1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutOptions,
58 SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66 TemporalResolutionValue,
67};
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading bytes sniffed for magic-number format detection.
const MAGIC_SNIFF_BYTES: usize = 16;
/// On-disk size of a single embedded-WAL entry header, in bytes.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Buffer size (8 MiB) used when shifting archive data to grow the WAL region.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback IANA timezone applied when resolving temporal phrases.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

/// Fixed set of natural-language temporal phrases recognized during ingestion.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staging area for an atomic commit: mutations run against a temporary
/// copy of the archive held by an [`AtomicWriteFile`], which is either
/// renamed over the original on success or discarded on failure.
struct CommitStaging {
    atomic: AtomicWriteFile,
}
126
127impl CommitStaging {
128 fn prepare(path: &Path) -> Result<Self> {
129 let mut options = AtomicWriteFile::options();
130 options.read(true);
131 let atomic = options.open(path)?;
132 Ok(Self { atomic })
133 }
134
135 fn copy_from(&mut self, source: &File) -> Result<()> {
136 let mut reader = source.try_clone()?;
137 reader.seek(SeekFrom::Start(0))?;
138
139 let writer = self.atomic.as_file_mut();
140 writer.set_len(0)?;
141 writer.seek(SeekFrom::Start(0))?;
142 std::io::copy(&mut reader, writer)?;
143 writer.flush()?;
144 writer.sync_all()?;
145 Ok(())
146 }
147
148 fn clone_file(&self) -> Result<File> {
149 Ok(self.atomic.as_file().try_clone()?)
150 }
151
152 fn commit(self) -> Result<()> {
153 self.atomic.commit().map_err(Into::into)
154 }
155
156 fn discard(self) -> Result<()> {
157 self.atomic.discard().map_err(Into::into)
158 }
159}
160
/// Accumulated effects of replaying a batch of WAL records; consulted by the
/// commit paths to decide which indexes need rebuilding or persisting.
#[derive(Debug, Default)]
struct IngestionDelta {
    // Frame ids inserted by this batch.
    inserted_frames: Vec<FrameId>,
    // Embedding vectors (per frame) awaiting insertion into the vector index.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    // Time-index entries produced by this batch.
    inserted_time_entries: Vec<TimeIndexEntry>,
    // Set when existing frames were modified (not just appended); checked by
    // `is_empty` so pure mutations still trigger an index rebuild.
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// How much work a commit performs.
///
/// The manual `impl Default` was replaced with `#[derive(Default)]` plus the
/// `#[default]` variant attribute — same behavior, less boilerplate.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Rebuild everything (the historical behavior, and the default).
    #[default]
    Full,
    /// Fold only new work into existing structures.
    Incremental,
}
201
/// Tuning knobs for `Memvid::commit_with_options`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// How much index work the commit performs.
    pub mode: CommitMode,
    /// Request background processing. Currently ignored — commits always run
    /// synchronously (see `commit_with_options`, which logs and proceeds).
    pub background: bool,
}
207
208impl CommitOptions {
209 #[must_use]
210 pub fn new(mode: CommitMode) -> Self {
211 Self {
212 mode,
213 background: false,
214 }
215 }
216
217 #[must_use]
218 pub fn background(mut self, background: bool) -> Self {
219 self.background = background;
220 self
221 }
222}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226 REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230 mime: Option<&str>,
231 magic: Option<&[u8]>,
232 uri: Option<&str>,
233) -> Option<DocumentFormat> {
234 if detect_pdf_magic(magic) {
236 return Some(DocumentFormat::Pdf);
237 }
238
239 if let Some(magic_bytes) = magic {
242 if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243 if let Some(format) = infer_format_from_extension(uri) {
245 return Some(format);
246 }
247 }
248 }
249
250 if let Some(mime_str) = mime {
252 let mime_lower = mime_str.trim().to_ascii_lowercase();
253 let format = match mime_lower.as_str() {
254 "application/pdf" => Some(DocumentFormat::Pdf),
255 "text/plain" => Some(DocumentFormat::PlainText),
256 "text/markdown" => Some(DocumentFormat::Markdown),
257 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259 Some(DocumentFormat::Docx)
260 }
261 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262 Some(DocumentFormat::Xlsx)
263 }
264 "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266 Some(DocumentFormat::Pptx)
267 }
268 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269 _ => None,
270 };
271 if format.is_some() {
272 return format;
273 }
274 }
275
276 infer_format_from_extension(uri)
278}
279
280fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282 let uri = uri?;
283 let path = std::path::Path::new(uri);
284 let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285 match ext.as_str() {
286 "pdf" => Some(DocumentFormat::Pdf),
287 "docx" => Some(DocumentFormat::Docx),
288 "xlsx" => Some(DocumentFormat::Xlsx),
289 "xls" => Some(DocumentFormat::Xls),
290 "pptx" => Some(DocumentFormat::Pptx),
291 "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292 | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293 | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294 | "sql" => Some(DocumentFormat::PlainText),
295 "md" | "markdown" => Some(DocumentFormat::Markdown),
296 "html" | "htm" => Some(DocumentFormat::Html),
297 _ => None,
298 }
299}
300
/// Returns true when `magic` starts with the `%PDF` marker, tolerating a
/// leading UTF-8 BOM followed by ASCII whitespace.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    if bytes.is_empty() {
        return false;
    }
    // Strip a UTF-8 BOM at the very start, then any run of ASCII whitespace.
    let mut rest = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    while let [head, tail @ ..] = rest {
        if !head.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }
    rest.starts_with(b"%PDF")
}
318
319#[instrument(
320 target = "memvid::extract",
321 skip_all,
322 fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325 bytes: &[u8],
326 mime_hint: Option<&str>,
327 uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329 let registry = default_reader_registry();
330 let magic = bytes
331 .get(..MAGIC_SNIFF_BYTES)
332 .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333 let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334 .with_uri(uri)
335 .with_magic(magic);
336
337 let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338 let start = Instant::now();
339 match reader.extract(bytes, &hint) {
340 Ok(output) => {
341 return Ok(finalize_reader_output(output, start));
342 }
343 Err(err) => {
344 tracing::error!(
345 target = "memvid::extract",
346 reader = reader.name(),
347 error = %err,
348 "reader failed; falling back"
349 );
350 Some(format!("reader {} failed: {err}", reader.name()))
351 }
352 }
353 } else {
354 tracing::debug!(
355 target = "memvid::extract",
356 format = hint.format.map(super::super::reader::DocumentFormat::label),
357 "no reader matched; using default extractor"
358 );
359 Some("no registered reader matched; using default extractor".to_string())
360 };
361
362 let start = Instant::now();
363 let mut output = PassthroughReader.extract(bytes, &hint)?;
364 if let Some(reason) = fallback_reason {
365 output.diagnostics.track_warning(reason);
366 }
367 Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371 let elapsed = start.elapsed();
372 let ReaderOutput {
373 document,
374 reader_name,
375 diagnostics,
376 } = output;
377 log_reader_result(&reader_name, &diagnostics, elapsed);
378 document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382 let duration_ms = diagnostics
383 .duration_ms
384 .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385 let warnings = diagnostics.warnings.len();
386 let pages = diagnostics.pages_processed;
387
388 if warnings > 0 || diagnostics.fallback {
389 tracing::warn!(
390 target = "memvid::extract",
391 reader,
392 duration_ms,
393 pages,
394 warnings,
395 fallback = diagnostics.fallback,
396 "extraction completed with warnings"
397 );
398 for warning in &diagnostics.warnings {
399 tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400 }
401 } else {
402 tracing::info!(
403 target = "memvid::extract",
404 reader,
405 duration_ms,
406 pages,
407 "extraction completed"
408 );
409 }
410}
411
412impl Memvid {
    /// Runs `op` against a staged (atomic-write) copy of the archive so a
    /// failed commit can never corrupt the on-disk file.
    ///
    /// On success the staged file is renamed over the destination and fresh
    /// file/WAL handles are opened there. On any failure — from `op` or the
    /// rename itself — the staging file is discarded and every piece of
    /// in-memory state (file handle, WAL, header, TOC, offsets, flags) is
    /// restored to its pre-call snapshot.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Redirect all I/O at the staged copy; keep the originals for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        // Snapshot every field `op` may mutate.
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so the success path can drop the stale handles
        // before re-opening, while failure paths can move them back into self.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Release the pre-staging handles, then re-open at the
                        // (now replaced) destination path.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Rename failed: restore the full pre-call snapshot.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err)
                    }
                }
            }
            Err(err) => {
                // `op` failed: best-effort discard of the staging file, then
                // restore the full pre-call snapshot.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
494
495 pub(crate) fn catalog_data_end(&self) -> u64 {
496 let mut max_end = self.header.wal_offset + self.header.wal_size;
497
498 for descriptor in &self.toc.segment_catalog.lex_segments {
499 if descriptor.common.bytes_length == 0 {
500 continue;
501 }
502 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
503 }
504
505 for descriptor in &self.toc.segment_catalog.vec_segments {
506 if descriptor.common.bytes_length == 0 {
507 continue;
508 }
509 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
510 }
511
512 for descriptor in &self.toc.segment_catalog.time_segments {
513 if descriptor.common.bytes_length == 0 {
514 continue;
515 }
516 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
517 }
518
519 #[cfg(feature = "temporal_track")]
520 for descriptor in &self.toc.segment_catalog.temporal_segments {
521 if descriptor.common.bytes_length == 0 {
522 continue;
523 }
524 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
525 }
526
527 #[cfg(feature = "lex")]
528 for descriptor in &self.toc.segment_catalog.tantivy_segments {
529 if descriptor.common.bytes_length == 0 {
530 continue;
531 }
532 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
533 }
534
535 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
536 if manifest.bytes_length != 0 {
537 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
538 }
539 }
540
541 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
542 if manifest.bytes_length != 0 {
543 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
544 }
545 }
546
547 if let Some(manifest) = self.toc.time_index.as_ref() {
548 if manifest.bytes_length != 0 {
549 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
550 }
551 }
552
553 #[cfg(feature = "temporal_track")]
554 if let Some(track) = self.toc.temporal_track.as_ref() {
555 if track.bytes_length != 0 {
556 max_end = max_end.max(track.bytes_offset + track.bytes_length);
557 }
558 }
559
560 max_end
561 }
562
563 fn payload_region_end(&self) -> u64 {
564 let wal_region_end = self.header.wal_offset + self.header.wal_size;
565 let frames_with_payload: Vec<_> = self
566 .toc
567 .frames
568 .iter()
569 .filter(|frame| frame.payload_length != 0)
570 .collect();
571
572 tracing::info!(
573 "payload_region_end: found {} frames with payloads out of {} total frames, wal_region_end={}",
574 frames_with_payload.len(),
575 self.toc.frames.len(),
576 wal_region_end
577 );
578
579 for (idx, frame) in frames_with_payload.iter().enumerate().take(3) {
580 tracing::info!(
581 " frame[{}]: id={} offset={} length={} status={:?}",
582 idx,
583 frame.id,
584 frame.payload_offset,
585 frame.payload_length,
586 frame.status
587 );
588 }
589
590 let result = frames_with_payload
591 .iter()
592 .fold(wal_region_end, |max_end, frame| {
593 match frame.payload_offset.checked_add(frame.payload_length) {
594 Some(end) => max_end.max(end),
595 None => max_end,
596 }
597 });
598
599 tracing::info!("payload_region_end: returning {}", result);
600 result
601 }
602
    /// Appends `payload` as one entry to the embedded WAL, growing the WAL
    /// region on demand, and returns the entry's sequence number.
    ///
    /// Retries after each growth until the entry fits; any error other than
    /// the two "region too small / region full" conditions is returned as-is.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // NOTE(review): matching the error by its reason string is
                // brittle — a dedicated error variant would be safer. Confirm
                // these strings track `EmbeddedWal::append_entry`.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry"
                        || reason == "embedded WAL region full" =>
                {
                    // Need room for header + payload, and strictly more than
                    // the current region size so each growth makes progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
622
623 fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
624 let mut new_size = self.header.wal_size;
625 let mut target = required_entry_size;
626 if target == 0 {
627 target = self.header.wal_size;
628 }
629 while new_size <= target {
630 new_size = new_size
631 .checked_mul(2)
632 .ok_or_else(|| MemvidError::CheckpointFailed {
633 reason: "wal_size overflow".into(),
634 })?;
635 }
636 let delta = new_size - self.header.wal_size;
637 if delta == 0 {
638 return Ok(());
639 }
640
641 self.shift_data_for_wal_growth(delta)?;
642 self.header.wal_size = new_size;
643 self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
644 self.data_end = self.data_end.saturating_add(delta);
645 self.adjust_offsets_after_wal_growth(delta);
646
647 let catalog_end = self.catalog_data_end();
648 self.header.footer_offset = catalog_end
649 .max(self.header.footer_offset)
650 .max(self.data_end);
651
652 self.rewrite_toc_footer()?;
653 self.header.toc_checksum = self.toc.toc_checksum;
654 crate::persist_header(&mut self.file, &self.header)?;
655 self.file.sync_all()?;
656 self.wal = EmbeddedWal::open(&self.file, &self.header)?;
657 Ok(())
658 }
659
    /// Shifts everything after the current WAL region forward by `delta`
    /// bytes (extending the file first), then zero-fills the vacated gap.
    ///
    /// The copy walks from the END of the file backwards in
    /// `WAL_SHIFT_BUFFER_SIZE` chunks so that the overlapping source and
    /// destination ranges are never clobbered before being read.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        // First byte past the current WAL region; everything from here moves.
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            // Highest not-yet-moved chunk.
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the vacated bytes so stale data can't be misread as WAL content.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
694
695 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
696 if delta == 0 {
697 return;
698 }
699
700 for frame in &mut self.toc.frames {
701 if frame.payload_offset != 0 {
702 frame.payload_offset += delta;
703 }
704 }
705
706 for segment in &mut self.toc.segments {
707 if segment.bytes_offset != 0 {
708 segment.bytes_offset += delta;
709 }
710 }
711
712 if let Some(lex) = self.toc.indexes.lex.as_mut() {
713 if lex.bytes_offset != 0 {
714 lex.bytes_offset += delta;
715 }
716 }
717 for manifest in &mut self.toc.indexes.lex_segments {
718 if manifest.bytes_offset != 0 {
719 manifest.bytes_offset += delta;
720 }
721 }
722 if let Some(vec) = self.toc.indexes.vec.as_mut() {
723 if vec.bytes_offset != 0 {
724 vec.bytes_offset += delta;
725 }
726 }
727 if let Some(time_index) = self.toc.time_index.as_mut() {
728 if time_index.bytes_offset != 0 {
729 time_index.bytes_offset += delta;
730 }
731 }
732 #[cfg(feature = "temporal_track")]
733 if let Some(track) = self.toc.temporal_track.as_mut() {
734 if track.bytes_offset != 0 {
735 track.bytes_offset += delta;
736 }
737 }
738
739 let catalog = &mut self.toc.segment_catalog;
740 for descriptor in &mut catalog.lex_segments {
741 if descriptor.common.bytes_offset != 0 {
742 descriptor.common.bytes_offset += delta;
743 }
744 }
745 for descriptor in &mut catalog.vec_segments {
746 if descriptor.common.bytes_offset != 0 {
747 descriptor.common.bytes_offset += delta;
748 }
749 }
750 for descriptor in &mut catalog.time_segments {
751 if descriptor.common.bytes_offset != 0 {
752 descriptor.common.bytes_offset += delta;
753 }
754 }
755 #[cfg(feature = "temporal_track")]
756 for descriptor in &mut catalog.temporal_segments {
757 if descriptor.common.bytes_offset != 0 {
758 descriptor.common.bytes_offset += delta;
759 }
760 }
761 for descriptor in &mut catalog.tantivy_segments {
762 if descriptor.common.bytes_offset != 0 {
763 descriptor.common.bytes_offset += delta;
764 }
765 }
766
767 #[cfg(feature = "lex")]
768 if let Ok(mut storage) = self.lex_storage.write() {
769 storage.adjust_offsets(delta);
770 }
771 }
772 pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
773 self.ensure_writable()?;
774 if options.background {
775 tracing::debug!("commit background flag ignored; running synchronously");
776 }
777 let mode = options.mode;
778 let records = self.wal.pending_records()?;
779 if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
780 return Ok(());
781 }
782 self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
783 }
784
785 pub fn commit(&mut self) -> Result<()> {
786 self.ensure_writable()?;
787 self.commit_with_options(CommitOptions::new(CommitMode::Full))
788 }
789
    /// Core commit: replays the WAL `records`, rebuilds affected indexes,
    /// persists auxiliary tracks, and durably checkpoints TOC + header.
    ///
    /// Expected to run inside `with_staging_lock`, so partial writes are
    /// rolled back by the caller. `_mode` is currently unused — both commit
    /// modes share this path.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // The CLIP index may need persisting even when the WAL delta is empty.
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings)?;
            indexes_rebuilt = true;
        }

        // NOTE(review): the `!indexes_rebuilt` guards below assume
        // `rebuild_indexes` already persisted these structures — confirm.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // NOTE(review): unlike the tracks above, the sketch track is persisted
        // even when indexes were rebuilt — confirm this asymmetry is intended.
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Durability order: TOC/footer, WAL checkpoint, header, fsync; the
        // manifest WAL (parallel path) is flushed and truncated last.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
857
858 #[cfg(feature = "parallel_segments")]
859 pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
860 self.ensure_writable()?;
861 if !self.dirty && !self.tantivy_index_pending() {
862 return Ok(());
863 }
864 let opts = opts.clone();
865 self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
866 }
867
    /// Parallel-segment commit path. Publishes the ingestion delta through
    /// the parallel publisher when possible, falling back to the serial
    /// `rebuild_indexes`, then persists auxiliary tracks and checkpoints
    /// TOC + header. Runs inside `with_staging_lock`.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // `used_parallel` tells us whether the parallel path handled the
            // delta or we must fall back to the serial rebuild below.
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    // A pre-existing engine already indexed these frames at
                    // put time (see the log below); only a freshly created
                    // engine needs the delta replayed into it.
                    let tantivy_was_present = self.tantivy.is_some();
                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    if tantivy_was_present {
                        tracing::info!(
                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
                        );
                    } else {
                        // Collect (frame, text) pairs first, then feed them to
                        // the engine in a second pass.
                        let max_payload = crate::memvid::search::max_index_payload();
                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                        for frame_id in &delta.inserted_frames {
                            let frame = match self.toc.frames.get(*frame_id as usize) {
                                Some(f) => f.clone(),
                                None => continue,
                            };

                            // Prefer explicit search text attached to the frame.
                            let explicit_text = frame.search_text.clone();
                            if let Some(ref search_text) = explicit_text {
                                if !search_text.trim().is_empty() {
                                    prepared_docs.push((frame, search_text.clone()));
                                    continue;
                                }
                            }

                            let mime = frame
                                .metadata
                                .as_ref()
                                .and_then(|m| m.mime.as_deref())
                                .unwrap_or("application/octet-stream");

                            // Skip non-text MIME types and oversized payloads.
                            if !crate::memvid::search::is_text_indexable_mime(mime) {
                                continue;
                            }

                            if frame.payload_length > max_payload {
                                continue;
                            }

                            let text = self.frame_search_text(&frame)?;
                            if !text.trim().is_empty() {
                                prepared_docs.push((frame, text));
                            }
                        }

                        if let Some(ref mut engine) = self.tantivy {
                            for (frame, text) in &prepared_docs {
                                engine.add_frame(frame, text)?;
                            }

                            if !prepared_docs.is_empty() {
                                engine.commit()?;
                                self.tantivy_dirty = true;
                            }

                            tracing::info!(
                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                                prepared_docs.len(),
                                engine.num_docs()
                            );
                        } else {
                            tracing::warn!(
                                "parallel_commit: Tantivy engine is None after init_tantivy"
                            );
                        }
                    }
                }

                // The time index is rebuilt from the full frame table (active
                // document frames only) and appended at the data end.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Parallel publish declined the delta; serial rebuild instead.
                self.rebuild_indexes(&delta.inserted_embeddings)?;
                indexes_rebuilt = true;
            }
        }

        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Durability order: TOC/footer, WAL checkpoint, header, fsync, then
        // flush + truncate the manifest WAL.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1067
1068 pub(crate) fn recover_wal(&mut self) -> Result<()> {
1069 let records = self.wal.records_after(self.header.wal_sequence)?;
1070 if records.is_empty() {
1071 if self.tantivy_index_pending() {
1072 self.flush_tantivy()?;
1073 }
1074 return Ok(());
1075 }
1076 let delta = self.apply_records(records)?;
1077 if !delta.is_empty() {
1078 tracing::debug!(
1079 inserted_frames = delta.inserted_frames.len(),
1080 inserted_embeddings = delta.inserted_embeddings.len(),
1081 inserted_time_entries = delta.inserted_time_entries.len(),
1082 "recover applied delta"
1083 );
1084 self.rebuild_indexes(&delta.inserted_embeddings)?;
1085 } else if self.tantivy_index_pending() {
1086 self.flush_tantivy()?;
1087 }
1088 self.wal.record_checkpoint(&mut self.header)?;
1089 crate::persist_header(&mut self.file, &self.header)?;
1090 if !delta.is_empty() {
1091 } else if self.tantivy_index_pending() {
1093 self.flush_tantivy()?;
1094 crate::persist_header(&mut self.file, &self.header)?;
1095 }
1096 self.file.sync_all()?;
1097 self.pending_frame_inserts = 0;
1098 self.dirty = false;
1099 Ok(())
1100 }
1101
    /// Replays decoded WAL `records` into the in-memory TOC and the payload
    /// region of the backing file.
    ///
    /// Inline payloads are appended starting at `self.data_end`; the returned
    /// [`IngestionDelta`] records which frames, embeddings, and time-index
    /// entries were added so the caller can rebuild the derived indexes.
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // Next free byte in the payload region; advanced after each inline write.
        let mut data_cursor = self.data_end;
        // Maps WAL sequence numbers to the frame ids they produced, so chunk
        // entries can resolve `parent_sequence` references within this batch.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        // NOTE(review): redundant guard — emptiness was already handled by the
        // early return above; kept as-is.
        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        // Lex batches are applied immediately and do not
                        // contribute to the ingestion delta.
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // New frames are always appended, so the id is the
                        // current frame count.
                        let frame_id = self.toc.frames.len() as u64;

                        // Either reuse an existing frame's payload bytes or
                        // append the inline payload carried by the WAL entry.
                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source_idx = usize::try_from(source_id).map_err(|_| {
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "frame id too large for memory",
                                }
                            })?;
                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            // Canonical length is the *decoded* size. For zstd
                            // payloads without an explicit length, decode once
                            // just to measure it.
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    if let Some(len) = entry.canonical_length {
                                        len
                                    } else {
                                        let decoded = crate::decode_canonical_bytes(
                                            &entry.payload,
                                            CanonicalEncoding::Zstd,
                                            frame_id,
                                        )?;
                                        decoded.len() as u64
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        // Fall back to a synthetic URI / inferred title when
                        // the WAL entry does not carry them.
                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            // Anchor fields are populated only when the
                            // temporal track feature is compiled in.
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            // Parent is resolved below via sequence mapping or
                            // the document-chunk fallback scan.
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                            source_sha256: entry.source_sha256,
                            source_path: entry.source_path.clone(),
                            enrichment_state: entry.enrichment_state,
                        };

                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            } else {
                                // Parent sequence not seen in this batch: for
                                // document chunks, fall back to the most recent
                                // document frame that carries a chunk manifest.
                                if entry.role == FrameRole::DocumentChunk {
                                    for &candidate_id in delta.inserted_frames.iter().rev() {
                                        if let Ok(idx) = usize::try_from(candidate_id) {
                                            if let Some(candidate) = self.toc.frames.get(idx) {
                                                if candidate.role == FrameRole::Document
                                                    && candidate.chunk_manifest.is_some()
                                                {
                                                    frame.parent_id = Some(candidate_id);
                                                    tracing::debug!(
                                                        chunk_frame_id = frame_id,
                                                        parent_frame_id = candidate_id,
                                                        parent_seq = parent_seq,
                                                        "resolved chunk parent via fallback"
                                                    );
                                                    break;
                                                }
                                            }
                                        }
                                    }
                                }
                                if frame.parent_id.is_none() {
                                    tracing::warn!(
                                        chunk_frame_id = frame_id,
                                        parent_seq = parent_seq,
                                        "chunk has parent_sequence but parent not found in batch"
                                    );
                                }
                            }
                        }

                        // Lexical indexing: prefer the explicit search text,
                        // otherwise index the frame's decoded content.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;

                            // Keep the sketch track in step with lex indexing.
                            if !text.trim().is_empty() {
                                let entry = crate::types::generate_sketch(
                                    frame_id,
                                    text,
                                    crate::types::SketchVariant::Small,
                                    None,
                                );
                                self.sketch_track.insert(entry);
                            }
                        }

                        if let Some(embedding) = entry.embedding.take() {
                            // NOTE(review): `embedding` was just moved out of
                            // the entry; the clone() here looks redundant.
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only top-level documents feed the time index and the
                        // temporal track; chunks inherit via their parent.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            // Never move the payload end backwards (reuse-only batches do not
            // advance the cursor).
            self.data_end = self.data_end.max(data_cursor);
        }

        // Second pass: any chunk still missing a parent gets attached to the
        // nearest preceding active document frame with a chunk manifest.
        let orphan_resolutions: Vec<(u64, u64)> = delta
            .inserted_frames
            .iter()
            .filter_map(|&frame_id| {
                let idx = usize::try_from(frame_id).ok()?;
                let frame = self.toc.frames.get(idx)?;
                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
                    return None;
                }
                for candidate_id in (0..frame_id).rev() {
                    if let Ok(idx) = usize::try_from(candidate_id) {
                        if let Some(candidate) = self.toc.frames.get(idx) {
                            if candidate.role == FrameRole::Document
                                && candidate.chunk_manifest.is_some()
                                && candidate.status == FrameStatus::Active
                            {
                                return Some((frame_id, candidate_id));
                            }
                        }
                    }
                }
                None
            })
            .collect();

        for (chunk_id, parent_id) in orphan_resolutions {
            if let Ok(idx) = usize::try_from(chunk_id) {
                if let Some(frame) = self.toc.frames.get_mut(idx) {
                    frame.parent_id = Some(parent_id);
                    tracing::debug!(
                        chunk_frame_id = chunk_id,
                        parent_frame_id = parent_id,
                        "resolved orphan chunk parent in second pass"
                    );
                }
            }
        }

        Ok(delta)
    }
1428
1429 #[cfg(feature = "temporal_track")]
1430 fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1431 (timestamp, AnchorSource::FrameTimestamp)
1432 }
1433
    /// Scans `text` for temporal expressions and resolves each one relative to
    /// `anchor_ts`, returning the mentions attributed to `frame_id`.
    ///
    /// Candidates come from two sources: a static phrase list matched
    /// case-insensitively, and a numeric date regex (`d/m/y`-style). Failed
    /// resolutions are silently skipped; any setup failure yields an empty set.
    #[cfg(feature = "temporal_track")]
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        // An anchor outside the representable datetime range disables
        // extraction entirely.
        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        // Candidate byte ranges into `text` (ASCII lowercasing preserves byte
        // offsets, so spans found in `lower` are valid in `text`).
        let mut spans: Vec<(usize, usize)> = Vec::new();
        let lower = text.to_ascii_lowercase();

        // Source 1: all non-overlapping occurrences of each static phrase.
        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                search_start = end;
            }
        }

        // Source 2: numeric dates. The regex is compiled once per process; a
        // compile failure is cached as Err so we never retry and never panic.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            // Defensive bounds check; degenerate spans are skipped.
            if end > text.len() || start >= end {
                continue;
            }
            let raw = &text[start..end];
            // Strip surrounding punctuation, then recompute the byte offsets
            // of the trimmed slice within the original text.
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                Err(_) => continue,
            }
        }

        mentions
    }
1507
    /// Converts one [`TemporalResolution`] into zero or more stored mentions.
    ///
    /// Point values (date / datetime) produce a single mention; ranged values
    /// (explicit ranges and whole months) produce a `RangeStart`/`RangeEnd`
    /// pair flagged with `HAS_RANGE`. `byte_start`/`byte_end` locate the
    /// source expression within the frame text.
    #[cfg(feature = "temporal_track")]
    fn resolution_to_mentions(
        resolution: TemporalResolution,
        frame_id: FrameId,
        byte_start: usize,
        byte_end: usize,
    ) -> Vec<TemporalMention> {
        // Clamp to u32 storage; the length is computed before clamping the
        // start so it reflects the true span size (saturating on underflow).
        let byte_len = byte_end.saturating_sub(byte_start) as u32;
        let byte_start = byte_start.min(u32::MAX as usize) as u32;
        let mut results = Vec::new();

        let base_flags = Self::flags_from_resolution(&resolution.flags);
        match resolution.value {
            TemporalResolutionValue::Date(date) => {
                let ts = Self::date_to_timestamp(date);
                results.push(TemporalMention::new(
                    ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::Date,
                    resolution.confidence,
                    0,
                    base_flags,
                ));
            }
            TemporalResolutionValue::DateTime(dt) => {
                let ts = dt.unix_timestamp();
                // Preserve the source offset (in minutes) as a timezone hint.
                let tz_hint = dt.offset().whole_minutes() as i16;
                results.push(TemporalMention::new(
                    ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::DateTime,
                    resolution.confidence,
                    tz_hint,
                    base_flags,
                ));
            }
            TemporalResolutionValue::DateRange { start, end } => {
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                let start_ts = Self::date_to_timestamp(start);
                results.push(TemporalMention::new(
                    start_ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    0,
                    flags,
                ));
                let end_ts = Self::date_to_timestamp(end);
                results.push(TemporalMention::new(
                    end_ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    0,
                    flags,
                ));
            }
            TemporalResolutionValue::DateTimeRange { start, end } => {
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                results.push(TemporalMention::new(
                    start.unix_timestamp(),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    start.offset().whole_minutes() as i16,
                    flags,
                ));
                results.push(TemporalMention::new(
                    end.unix_timestamp(),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    end.offset().whole_minutes() as i16,
                    flags,
                ));
            }
            // A bare month becomes a range spanning its first and last day.
            TemporalResolutionValue::Month { year, month } => {
                let start_date = match Date::from_calendar_date(year, month, 1) {
                    Ok(date) => date,
                    Err(err) => {
                        tracing::warn!(
                            target = "memvid::temporal",
                            %err,
                            year,
                            month = month as u8,
                            "skipping invalid month resolution"
                        );
                        return results;
                    }
                };
                let end_date = match Self::last_day_in_month(year, month) {
                    Some(date) => date,
                    None => {
                        tracing::warn!(
                            target = "memvid::temporal",
                            year,
                            month = month as u8,
                            "skipping month resolution with invalid calendar range"
                        );
                        return results;
                    }
                };
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                results.push(TemporalMention::new(
                    Self::date_to_timestamp(start_date),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    0,
                    flags,
                ));
                results.push(TemporalMention::new(
                    Self::date_to_timestamp(end_date),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    0,
                    flags,
                ));
            }
        }

        results
    }
1649
1650 #[cfg(feature = "temporal_track")]
1651 fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1652 let mut result = TemporalMentionFlags::empty();
1653 if flags
1654 .iter()
1655 .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1656 {
1657 result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1658 }
1659 if flags
1660 .iter()
1661 .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1662 {
1663 result = result.set(TemporalMentionFlags::DERIVED, true);
1664 }
1665 result
1666 }
1667
1668 #[cfg(feature = "temporal_track")]
1669 fn date_to_timestamp(date: Date) -> i64 {
1670 PrimitiveDateTime::new(date, Time::MIDNIGHT)
1671 .assume_offset(UtcOffset::UTC)
1672 .unix_timestamp()
1673 }
1674
1675 #[cfg(feature = "temporal_track")]
1676 fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1677 let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1678 while let Some(next) = date.next_day() {
1679 if next.month() == month {
1680 date = next;
1681 } else {
1682 break;
1683 }
1684 }
1685 Some(date)
1686 }
1687
    /// Builds and appends a lexical segment covering the frames inserted by
    /// `delta`, then registers it in the segment catalog.
    ///
    /// Returns `Ok(false)` when lex is disabled, the delta is empty, or the
    /// batch produced no indexable content.
    #[allow(dead_code)]
    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_frames.is_empty() || !self.lex_enabled {
            return Ok(false);
        }

        // `None` means the batch contained nothing worth indexing.
        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        let segment_id = self.toc.segment_catalog.next_segment_id;
        #[cfg(feature = "parallel_segments")]
        let span =
            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));

        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
        #[cfg(feature = "parallel_segments")]
        if let Some(span) = span {
            Self::decorate_segment_common(&mut descriptor.common, span);
        }
        #[cfg(feature = "parallel_segments")]
        let descriptor_for_manifest = descriptor.clone();
        self.toc.segment_catalog.lex_segments.push(descriptor);
        // Manifest WAL failures are logged but do not abort the publish; the
        // segment itself is already appended.
        #[cfg(feature = "parallel_segments")]
        if let Err(err) = self.record_index_segment(
            SegmentKind::Lexical,
            descriptor_for_manifest.common,
            SegmentStats {
                doc_count: artifact.doc_count,
                vector_count: 0,
                time_entries: 0,
                bytes_uncompressed: artifact.bytes.len() as u64,
                build_micros: 0,
            },
        ) {
            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
        }
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
        Ok(true)
    }
1731
1732 #[allow(dead_code)]
1733 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1734 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1735 return Ok(false);
1736 }
1737
1738 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1739 Some(artifact) => artifact,
1740 None => return Ok(false),
1741 };
1742
1743 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1744 if existing_dim != artifact.dimension {
1745 return Err(MemvidError::VecDimensionMismatch {
1746 expected: existing_dim,
1747 actual: artifact.dimension as usize,
1748 });
1749 }
1750 }
1751
1752 let segment_id = self.toc.segment_catalog.next_segment_id;
1753 #[cfg(feature = "parallel_segments")]
1754 #[cfg(feature = "parallel_segments")]
1755 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1756
1757 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1758 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1759 #[cfg(feature = "parallel_segments")]
1760 if let Some(span) = span {
1761 Self::decorate_segment_common(&mut descriptor.common, span);
1762 }
1763 #[cfg(feature = "parallel_segments")]
1764 let descriptor_for_manifest = descriptor.clone();
1765 self.toc.segment_catalog.vec_segments.push(descriptor);
1766 #[cfg(feature = "parallel_segments")]
1767 if let Err(err) = self.record_index_segment(
1768 SegmentKind::Vector,
1769 descriptor_for_manifest.common,
1770 SegmentStats {
1771 doc_count: 0,
1772 vector_count: artifact.vector_count,
1773 time_entries: 0,
1774 bytes_uncompressed: artifact.bytes_uncompressed,
1775 build_micros: 0,
1776 },
1777 ) {
1778 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1779 }
1780 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1781 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1782
1783 if self.toc.indexes.vec.is_none() {
1785 let empty_offset = self.data_end;
1786 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1787 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1788 self.toc.indexes.vec = Some(VecIndexManifest {
1789 vector_count: 0,
1790 dimension: 0,
1791 bytes_offset: empty_offset,
1792 bytes_length: 0,
1793 checksum: empty_checksum,
1794 compression_mode: self.vec_compression.clone(),
1795 });
1796 }
1797 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1798 if manifest.dimension == 0 {
1799 manifest.dimension = artifact.dimension;
1800 }
1801 if manifest.bytes_length == 0 {
1802 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1803 manifest.compression_mode = artifact.compression.clone();
1804 }
1805 }
1806
1807 self.vec_enabled = true;
1808 Ok(true)
1809 }
1810
1811 #[allow(dead_code)]
1812 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1813 if delta.inserted_time_entries.is_empty() {
1814 return Ok(false);
1815 }
1816
1817 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1818 Some(artifact) => artifact,
1819 None => return Ok(false),
1820 };
1821
1822 let segment_id = self.toc.segment_catalog.next_segment_id;
1823 #[cfg(feature = "parallel_segments")]
1824 #[cfg(feature = "parallel_segments")]
1825 let span = self.segment_span_from_iter(
1826 delta
1827 .inserted_time_entries
1828 .iter()
1829 .map(|entry| entry.frame_id),
1830 );
1831
1832 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1833 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1834 #[cfg(feature = "parallel_segments")]
1835 if let Some(span) = span {
1836 Self::decorate_segment_common(&mut descriptor.common, span);
1837 }
1838 #[cfg(feature = "parallel_segments")]
1839 let descriptor_for_manifest = descriptor.clone();
1840 self.toc.segment_catalog.time_segments.push(descriptor);
1841 #[cfg(feature = "parallel_segments")]
1842 if let Err(err) = self.record_index_segment(
1843 SegmentKind::Time,
1844 descriptor_for_manifest.common,
1845 SegmentStats {
1846 doc_count: 0,
1847 vector_count: 0,
1848 time_entries: artifact.entry_count,
1849 bytes_uncompressed: artifact.bytes.len() as u64,
1850 build_micros: 0,
1851 },
1852 ) {
1853 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1854 }
1855 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1856 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1857 Ok(true)
1858 }
1859
    /// Builds and appends a temporal segment for the mentions/anchors inserted
    /// by `delta`, updates the temporal track manifest, and invalidates the
    /// in-memory temporal cache.
    ///
    /// Returns `Ok(false)` when the delta carries no temporal records or no
    /// artifact could be built.
    #[cfg(feature = "temporal_track")]
    #[allow(dead_code)]
    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
        {
            return Ok(false);
        }

        // Sanity bounds (debug builds only): a single delta should never carry
        // millions of temporal records.
        debug_assert!(
            delta.inserted_temporal_mentions.len() < 1_000_000,
            "temporal delta mentions unexpectedly large: {}",
            delta.inserted_temporal_mentions.len()
        );
        debug_assert!(
            delta.inserted_temporal_anchors.len() < 1_000_000,
            "temporal delta anchors unexpectedly large: {}",
            delta.inserted_temporal_anchors.len()
        );

        let artifact = match self.build_temporal_segment_from_records(
            &delta.inserted_temporal_mentions,
            &delta.inserted_temporal_anchors,
        )? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        let segment_id = self.toc.segment_catalog.next_segment_id;
        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
        self.toc
            .segment_catalog
            .temporal_segments
            .push(descriptor.clone());
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);

        // Point the track manifest at the freshly appended segment bytes.
        self.toc.temporal_track = Some(TemporalTrackManifest {
            bytes_offset: descriptor.common.bytes_offset,
            bytes_length: descriptor.common.bytes_length,
            entry_count: artifact.entry_count,
            anchor_count: artifact.anchor_count,
            checksum: artifact.checksum,
            flags: artifact.flags,
        });

        // Cached temporal data is now stale.
        self.clear_temporal_track_cache();

        Ok(true)
    }
1909
1910 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
1911 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
1912 frame_id,
1913 reason: "frame id too large",
1914 })?;
1915 let frame = self
1916 .toc
1917 .frames
1918 .get_mut(index)
1919 .ok_or(MemvidError::InvalidFrame {
1920 frame_id,
1921 reason: "supersede target missing",
1922 })?;
1923 frame.status = FrameStatus::Superseded;
1924 frame.superseded_by = Some(successor_id);
1925 self.remove_frame_from_indexes(frame_id)
1926 }
1927
    /// Rebuilds every derived index (time, lex/tantivy, vec, clip, memories,
    /// logic mesh) from the authoritative frame list, rewriting the index
    /// region of the file after the payload area.
    ///
    /// `new_vec_docs` supplies embeddings for freshly ingested frames that are
    /// not yet part of the persisted vec index. Statement order matters here:
    /// each index is appended at the running `footer_offset`, which the next
    /// index then continues from.
    pub(crate) fn rebuild_indexes(&mut self, new_vec_docs: &[(FrameId, Vec<f32>)]) -> Result<()> {
        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
            return Ok(());
        }

        // Truncate away any stale index bytes beyond the payload region, but
        // never cut into a footer that extends past it.
        let payload_end = self.payload_region_end();
        self.data_end = payload_end;
        let safe_truncate_len = self.header.footer_offset.max(payload_end);
        if self.file.metadata()?.len() > safe_truncate_len {
            self.file.set_len(safe_truncate_len)?;
        }
        self.file.seek(SeekFrom::Start(payload_end))?;

        // All catalog entries describe bytes we are about to overwrite.
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        self.toc.segment_catalog.temporal_segments.clear();
        #[cfg(feature = "parallel_segments")]
        self.toc.segment_catalog.index_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        self.toc.indexes.lex_segments.clear();

        // Time index: rebuilt from all active document frames.
        let mut time_entries: Vec<TimeIndexEntry> = self
            .toc
            .frames
            .iter()
            .filter(|frame| {
                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
            })
            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
            .collect();
        let (ti_offset, ti_length, ti_checksum) =
            time_index_append(&mut self.file, &mut time_entries)?;
        self.toc.time_index = Some(TimeIndexManifest {
            bytes_offset: ti_offset,
            bytes_length: ti_length,
            entry_count: time_entries.len() as u64,
            checksum: ti_checksum,
        });

        // Running end of the index region; each index appends here.
        let mut footer_offset = ti_offset + ti_length;

        // The temporal track is invalidated by a rebuild; it is repopulated
        // lazily elsewhere.
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
            self.clear_temporal_track_cache();
        }

        if self.lex_enabled {
            #[cfg(feature = "lex")]
            {
                // Reset embedded lex storage before re-indexing from scratch.
                if let Ok(mut storage) = self.lex_storage.write() {
                    storage.clear();
                    storage.set_generation(0);
                }

                self.init_tantivy()?;

                // Take the engine out to avoid aliasing &mut self during the
                // rebuild, then put it back.
                if let Some(mut engine) = self.tantivy.take() {
                    self.rebuild_tantivy_engine(&mut engine)?;
                    self.tantivy = Some(engine);
                } else {
                    return Err(MemvidError::InvalidToc {
                        reason: "tantivy engine missing during doctor rebuild".into(),
                    });
                }

                self.lex_enabled = true;

                self.tantivy_dirty = true;

                // flush_tantivy writes at data_end and advances
                // header.footer_offset; temporarily point data_end at the
                // current index tail so the snapshot lands after the time index.
                self.data_end = footer_offset;

                self.flush_tantivy()?;

                // Continue appending after whatever the flush wrote.
                footer_offset = self.header.footer_offset;

                self.data_end = payload_end;
            }
            #[cfg(not(feature = "lex"))]
            {
                self.toc.indexes.lex = None;
                self.toc.indexes.lex_segments.clear();
            }
        } else {
            self.toc.indexes.lex = None;
            self.toc.indexes.lex_segments.clear();
            #[cfg(feature = "lex")]
            if let Ok(mut storage) = self.lex_storage.write() {
                storage.clear();
            }
        }

        // Vector index: persisted bytes plus the in-memory search structure.
        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
            let vec_offset = footer_offset;
            self.file.seek(SeekFrom::Start(vec_offset))?;
            self.file.write_all(&artifact.bytes)?;
            footer_offset += artifact.bytes.len() as u64;
            self.toc.indexes.vec = Some(VecIndexManifest {
                vector_count: artifact.vector_count,
                dimension: artifact.dimension,
                bytes_offset: vec_offset,
                bytes_length: artifact.bytes.len() as u64,
                checksum: artifact.checksum,
                compression_mode: self.vec_compression.clone(),
            });
            self.vec_index = Some(index);
        } else {
            // Keep the manifest when vec search is enabled (placeholder may be
            // meaningful); otherwise drop it.
            if !self.vec_enabled {
                self.toc.indexes.vec = None;
            }
            self.vec_index = None;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    let artifact = clip_index.encode()?;
                    let clip_offset = footer_offset;
                    self.file.seek(SeekFrom::Start(clip_offset))?;
                    self.file.write_all(&artifact.bytes)?;
                    footer_offset += artifact.bytes.len() as u64;
                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
                        bytes_offset: clip_offset,
                        bytes_length: artifact.bytes.len() as u64,
                        vector_count: artifact.vector_count,
                        dimension: artifact.dimension,
                        checksum: artifact.checksum,
                        model_name: crate::clip::default_model_info().name.to_string(),
                    });
                    tracing::info!(
                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
                        artifact.vector_count,
                        clip_offset
                    );
                }
            }
        } else {
            self.toc.indexes.clip = None;
        }

        if self.memories_track.card_count() > 0 {
            let memories_offset = footer_offset;
            let memories_bytes = self.memories_track.serialize()?;
            let memories_checksum = blake3::hash(&memories_bytes).into();
            self.file.seek(SeekFrom::Start(memories_offset))?;
            self.file.write_all(&memories_bytes)?;
            footer_offset += memories_bytes.len() as u64;

            let stats = self.memories_track.stats();
            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
                bytes_offset: memories_offset,
                bytes_length: memories_bytes.len() as u64,
                card_count: stats.card_count as u64,
                entity_count: stats.entity_count as u64,
                checksum: memories_checksum,
            });
        } else {
            self.toc.memories_track = None;
        }

        if self.logic_mesh.is_empty() {
            self.toc.logic_mesh = None;
        } else {
            let mesh_offset = footer_offset;
            let mesh_bytes = self.logic_mesh.serialize()?;
            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
            self.file.seek(SeekFrom::Start(mesh_offset))?;
            self.file.write_all(&mesh_bytes)?;
            footer_offset += mesh_bytes.len() as u64;

            let stats = self.logic_mesh.stats();
            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
                bytes_offset: mesh_offset,
                bytes_length: mesh_bytes.len() as u64,
                node_count: stats.node_count as u64,
                edge_count: stats.edge_count as u64,
                checksum: mesh_checksum,
            });
        }

        tracing::info!(
            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
            ti_offset,
            ti_length,
            footer_offset,
            self.header.footer_offset
        );

        // Never shrink the footer offset: a flush above may have pushed it
        // further than our computed tail.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;

        // Post-rebuild sanity check: a non-empty corpus must never yield an
        // empty lex index.
        #[cfg(feature = "lex")]
        if self.lex_enabled {
            if let Some(ref engine) = self.tantivy {
                let doc_count = engine.num_docs();
                let active_frame_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| f.status == FrameStatus::Active)
                    .count();

                let text_indexable_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
                    .count();

                if doc_count == 0 && text_indexable_count > 0 {
                    return Err(MemvidError::Doctor {
                        reason: format!(
                            "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
                             This indicates a critical failure in the rebuild process."
                        ),
                    });
                }

                log::info!(
                    "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
                );
            }
        }

        Ok(())
    }
2191
2192 fn persist_memories_track(&mut self) -> Result<()> {
2197 if self.memories_track.card_count() == 0 {
2198 self.toc.memories_track = None;
2199 return Ok(());
2200 }
2201
2202 let memories_offset = self.header.footer_offset;
2204 let memories_bytes = self.memories_track.serialize()?;
2205 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2206
2207 self.file.seek(SeekFrom::Start(memories_offset))?;
2208 self.file.write_all(&memories_bytes)?;
2209
2210 let stats = self.memories_track.stats();
2211 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2212 bytes_offset: memories_offset,
2213 bytes_length: memories_bytes.len() as u64,
2214 card_count: stats.card_count as u64,
2215 entity_count: stats.entity_count as u64,
2216 checksum: memories_checksum,
2217 });
2218
2219 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2221
2222 if self.file.metadata()?.len() < self.header.footer_offset {
2224 self.file.set_len(self.header.footer_offset)?;
2225 }
2226
2227 Ok(())
2228 }
2229
2230 fn persist_clip_index(&mut self) -> Result<()> {
2235 if !self.clip_enabled {
2236 self.toc.indexes.clip = None;
2237 return Ok(());
2238 }
2239
2240 let clip_index = match &self.clip_index {
2241 Some(idx) if !idx.is_empty() => idx,
2242 _ => {
2243 self.toc.indexes.clip = None;
2244 return Ok(());
2245 }
2246 };
2247
2248 let artifact = clip_index.encode()?;
2250
2251 let clip_offset = self.header.footer_offset;
2253 self.file.seek(SeekFrom::Start(clip_offset))?;
2254 self.file.write_all(&artifact.bytes)?;
2255
2256 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2257 bytes_offset: clip_offset,
2258 bytes_length: artifact.bytes.len() as u64,
2259 vector_count: artifact.vector_count,
2260 dimension: artifact.dimension,
2261 checksum: artifact.checksum,
2262 model_name: crate::clip::default_model_info().name.to_string(),
2263 });
2264
2265 tracing::info!(
2266 "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2267 artifact.vector_count,
2268 clip_offset
2269 );
2270
2271 self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2273
2274 if self.file.metadata()?.len() < self.header.footer_offset {
2276 self.file.set_len(self.header.footer_offset)?;
2277 }
2278
2279 Ok(())
2280 }
2281
2282 fn persist_logic_mesh(&mut self) -> Result<()> {
2287 if self.logic_mesh.is_empty() {
2288 self.toc.logic_mesh = None;
2289 return Ok(());
2290 }
2291
2292 let mesh_offset = self.header.footer_offset;
2294 let mesh_bytes = self.logic_mesh.serialize()?;
2295 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2296
2297 self.file.seek(SeekFrom::Start(mesh_offset))?;
2298 self.file.write_all(&mesh_bytes)?;
2299
2300 let stats = self.logic_mesh.stats();
2301 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2302 bytes_offset: mesh_offset,
2303 bytes_length: mesh_bytes.len() as u64,
2304 node_count: stats.node_count as u64,
2305 edge_count: stats.edge_count as u64,
2306 checksum: mesh_checksum,
2307 });
2308
2309 self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2311
2312 if self.file.metadata()?.len() < self.header.footer_offset {
2314 self.file.set_len(self.header.footer_offset)?;
2315 }
2316
2317 Ok(())
2318 }
2319
    /// Write the sketch track at the current footer offset and record its
    /// manifest in the TOC; clears the manifest when the track is empty.
    fn persist_sketch_track(&mut self) -> Result<()> {
        if self.sketch_track.is_empty() {
            self.toc.sketch_track = None;
            return Ok(());
        }

        // Position at the footer region; the helper appends the track
        // there and reports where it landed.
        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;

        let (sketch_offset, sketch_length, sketch_checksum) =
            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;

        let stats = self.sketch_track.stats();
        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
            bytes_offset: sketch_offset,
            bytes_length: sketch_length,
            entry_count: stats.entry_count,
            #[allow(clippy::cast_possible_truncation)]
            entry_size: stats.variant.entry_size() as u16,
            flags: 0,
            checksum: sketch_checksum,
        });

        // Footer now begins immediately after the sketch bytes.
        self.header.footer_offset = sketch_offset + sketch_length;

        // Extend the file if the write ended past the current length.
        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        tracing::debug!(
            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
            stats.entry_count,
            sketch_offset
        );

        Ok(())
    }
2364
2365 #[cfg(feature = "lex")]
2366 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2367 let LexWalBatch {
2368 generation,
2369 doc_count,
2370 checksum,
2371 segments,
2372 } = batch;
2373
2374 if let Ok(mut storage) = self.lex_storage.write() {
2375 storage.replace(doc_count, checksum, segments);
2376 storage.set_generation(generation);
2377 }
2378
2379 self.persist_lex_manifest()
2380 }
2381
2382 #[cfg(feature = "lex")]
2383 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2384 let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2385 self.append_wal_entry(&payload)?;
2386 Ok(())
2387 }
2388
2389 #[cfg(feature = "lex")]
2390 fn persist_lex_manifest(&mut self) -> Result<()> {
2391 let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2392 storage.to_manifest()
2393 } else {
2394 (None, Vec::new())
2395 };
2396
2397 if let Some(storage_manifest) = index_manifest {
2399 self.toc.indexes.lex = Some(storage_manifest);
2401 } else {
2402 self.toc.indexes.lex = None;
2405 }
2406
2407 self.toc.indexes.lex_segments = segments;
2408
2409 self.rewrite_toc_footer()?;
2413 self.header.toc_checksum = self.toc.toc_checksum;
2414 crate::persist_header(&mut self.file, &self.header)?;
2415 Ok(())
2416 }
2417
    /// Persist a fresh tantivy snapshot into the container: append each
    /// segment's bytes after `data_end`, mirror them into the segment
    /// catalog, swap the snapshot into shared lex storage, journal the
    /// batch to the WAL, and rewrite manifest + header.
    #[cfg(feature = "lex")]
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // Segments are laid out sequentially starting at the end of the
        // payload region.
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            self.file.flush()?;
            embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Never move the footer backwards; other tracks may already sit
        // beyond the lex segments.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Mirror the embedded segments into the TOC catalog with freshly
        // allocated, monotonically increasing segment ids.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // Swap the new snapshot into shared lex storage; a poisoned lock
        // is surfaced as a tantivy error.
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Journal the batch so crash recovery can replay it, then persist
        // the manifest, TOC checksum, and header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2501
2502 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2503 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2504 frame_id,
2505 reason: "frame id too large",
2506 })?;
2507 let frame = self
2508 .toc
2509 .frames
2510 .get_mut(index)
2511 .ok_or(MemvidError::InvalidFrame {
2512 frame_id,
2513 reason: "delete target missing",
2514 })?;
2515 frame.status = FrameStatus::Deleted;
2516 frame.superseded_by = None;
2517 self.remove_frame_from_indexes(frame_id)
2518 }
2519
2520 fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2521 #[cfg(feature = "lex")]
2522 if let Some(engine) = self.tantivy.as_mut() {
2523 engine.delete_frame(frame_id)?;
2524 self.tantivy_dirty = true;
2525 }
2526 if let Some(index) = self.lex_index.as_mut() {
2527 index.remove_document(frame_id);
2528 }
2529 if let Some(index) = self.vec_index.as_mut() {
2530 index.remove(frame_id);
2531 }
2532 Ok(())
2533 }
2534
2535 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2536 let Ok(index) = usize::try_from(frame_id) else {
2537 return false;
2538 };
2539 self.toc
2540 .frames
2541 .get(index)
2542 .is_some_and(|frame| frame.status == FrameStatus::Active)
2543 }
2544
    /// Compute the frame-id and page span covered by an iterator of frame
    /// ids, or `None` when the iterator is empty.
    #[cfg(feature = "parallel_segments")]
    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
    where
        I: IntoIterator<Item = FrameId>,
    {
        let mut iter = iter.into_iter();
        // Seed the span from the first id; bail out if there is none.
        let first_id = iter.next()?;
        let first_frame = self.toc.frames.get(first_id as usize);
        let mut min_id = first_id;
        let mut max_id = first_id;
        // Page bounds default to 0 when the frame carries no chunk info.
        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
        let mut page_end = first_frame
            .and_then(|frame| frame.chunk_count)
            .map(|count| page_start + count.saturating_sub(1))
            .unwrap_or(page_start);
        for frame_id in iter {
            if frame_id < min_id {
                min_id = frame_id;
            }
            if frame_id > max_id {
                max_id = frame_id;
            }
            // Out-of-range ids are skipped rather than treated as errors.
            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
                if let Some(idx) = frame.chunk_index {
                    page_start = page_start.min(idx);
                    if let Some(count) = frame.chunk_count {
                        let end = idx + count.saturating_sub(1);
                        page_end = page_end.max(end);
                    } else {
                        // No count: the single chunk index bounds the page.
                        page_end = page_end.max(idx);
                    }
                }
            }
        }
        Some(SegmentSpan {
            frame_start: min_id,
            frame_end: max_id,
            page_start,
            page_end,
            ..SegmentSpan::default()
        })
    }
2587
2588 #[cfg(feature = "parallel_segments")]
2589 pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2590 common.span = Some(span);
2591 if common.codec_version == 0 {
2592 common.codec_version = 1;
2593 }
2594 }
2595
2596 #[cfg(feature = "parallel_segments")]
2597 pub(crate) fn record_index_segment(
2598 &mut self,
2599 kind: SegmentKind,
2600 common: SegmentCommon,
2601 stats: SegmentStats,
2602 ) -> Result<()> {
2603 let entry = IndexSegmentRef {
2604 kind,
2605 common,
2606 stats,
2607 };
2608 self.toc.segment_catalog.index_segments.push(entry.clone());
2609 if let Some(wal) = self.manifest_wal.as_mut() {
2610 wal.append_segments(&[entry])?;
2611 }
2612 Ok(())
2613 }
2614
2615 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2616 self.ensure_writable()?;
2617 if self.toc.ticket_ref.issuer == "free-tier" {
2618 return Ok(());
2619 }
2620 match self.tier() {
2621 Tier::Free => Ok(()),
2622 tier => {
2623 if self.toc.ticket_ref.issuer.trim().is_empty() {
2624 Err(MemvidError::TicketRequired { tier })
2625 } else {
2626 Ok(())
2627 }
2628 }
2629 }
2630 }
2631
2632 pub(crate) fn tier(&self) -> Tier {
2633 if self.header.wal_size >= WAL_SIZE_LARGE {
2634 Tier::Enterprise
2635 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2636 Tier::Dev
2637 } else {
2638 Tier::Free
2639 }
2640 }
2641
2642 pub(crate) fn capacity_limit(&self) -> u64 {
2643 if self.toc.ticket_ref.capacity_bytes != 0 {
2644 self.toc.ticket_ref.capacity_bytes
2645 } else {
2646 self.tier().capacity_bytes()
2647 }
2648 }
2649
    /// Public accessor for the effective capacity limit in bytes.
    ///
    /// Delegates to the internal `capacity_limit`: the ticket's explicit
    /// capacity when set, otherwise the tier default.
    #[must_use]
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2658
    /// Serialize the TOC at `footer_offset`, append the commit footer, and
    /// resize the file so the WAL region is never truncated.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The footer records the TOC length/hash so readers can validate
        // the serialized TOC on open.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // Truncate trailing garbage, but never shrink below the end of
        // the WAL region.
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Durability point: everything above must hit disk.
        self.file.sync_all()?;
        Ok(())
    }
2699}
2700
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Build and append index segments for a freshly ingested delta using
    /// the parallel segment pipeline.
    ///
    /// Returns `Ok(false)` when there was nothing to plan or execute.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        // Fan the plans out to the worker pool; each result is a built
        // segment ready to be appended to the container.
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Turn the delta's inserted frames into per-chunk segment plans,
    /// pairing each frame with its embedding when one was supplied.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Move embeddings into a map so each can be claimed per frame.
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            // Frames without indexable text contribute no segment chunk.
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based when chunk metadata exists, 0 otherwise.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                // Claim (remove) the embedding so it is used exactly once.
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2772
/// Cheap whitespace-token count used for segment planning; never returns
/// zero so even empty text costs one token.
#[cfg(feature = "parallel_segments")]
fn estimate_tokens(text: &str) -> usize {
    let words = text.split_whitespace().count();
    if words == 0 { 1 } else { words }
}
2777
2778impl Memvid {
2779 pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2780 let catalog_end = self.catalog_data_end();
2781 if catalog_end <= self.header.footer_offset {
2782 return Ok(false);
2783 }
2784 self.header.footer_offset = catalog_end;
2785 self.rewrite_toc_footer()?;
2786 self.header.toc_checksum = self.toc.toc_checksum;
2787 crate::persist_header(&mut self.file, &self.header)?;
2788 Ok(true)
2789 }
2790}
2791
2792impl Memvid {
    /// Compact the container: commit pending state, rewrite only active
    /// frame payloads contiguously after the WAL, drop all derived
    /// segments/indexes, and rebuild indexes from the compacted layout.
    pub fn vacuum(&mut self) -> Result<()> {
        // Flush any pending WAL entries before relocating payloads.
        self.commit()?;

        // Snapshot every active payload into memory first, since the
        // rewrite below overwrites the payload region in place.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Rewrite payloads back-to-back starting right after the WAL.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    // Active frame without stored payload bytes.
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Deleted/superseded frames lose their payload entirely.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // All derived segment data referenced old offsets; drop it so the
        // rebuild below regenerates everything from the compacted layout.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        #[cfg(feature = "lex")]
        {
            // Discard the in-memory tantivy engine; it is rebuilt later.
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[])?;
        self.file.sync_all()?;
        Ok(())
    }
2860
2861 #[must_use]
2879 pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
2880 plan_document_chunks(payload).map(|plan| plan.chunks)
2881 }
2882
    /// Ingest `payload` as a new frame with default options.
    ///
    /// Returns the WAL sequence number assigned to the insertion.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }
2887
    /// Ingest `payload` as a new frame using the caller-supplied options.
    ///
    /// Returns the WAL sequence number assigned to the insertion.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }
2892
    /// Ingest `payload` together with a precomputed frame-level embedding,
    /// using default options.
    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            Some(embedding),
            None,
            PutOptions::default(),
            None,
        )
    }
2904
    /// Ingest `payload` together with a precomputed frame-level embedding
    /// and caller-supplied options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }
2913
    /// Ingest `payload` with an optional parent-level embedding plus one
    /// embedding per planned chunk (paired by chunk index downstream).
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }
2942
    /// Supersede an active frame with an updated payload and/or metadata.
    ///
    /// Options the caller left unset inherit the existing frame's values;
    /// empty tag/label/extra-metadata collections likewise fall back to
    /// the existing frame's. When `payload` is `None` the old payload is
    /// reused and content-derived enrichment (auto-tag, date extraction)
    /// is skipped, since the content is unchanged.
    ///
    /// Returns the WAL sequence number of the update entry.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Inherit every field the caller left unset from the current frame.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // Reusing the payload means the content is unchanged, so skip
        // content-derived enrichment.
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        // Prefer an explicit embedding; otherwise carry the stored one
        // forward when vector search is enabled.
        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None,
            options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
        );
        Ok(seq)
    }
3023
    /// Append a tombstone WAL entry for an active frame; the frame is
    /// actually marked deleted when the WAL entry is applied.
    ///
    /// Returns the WAL sequence number of the tombstone.
    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let frame = self.frame_by_id(frame_id)?;
        if frame.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // The tombstone carries just enough identity (uri/title/encoding/
        // chunk info) for replay; payload and index data stay empty.
        let mut tombstone = WalEntryData {
            // Fall back to the frame's own timestamp if the clock reads
            // before the Unix epoch.
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(frame.timestamp),
            kind: None,
            track: None,
            payload: Vec::new(),
            embedding: None,
            uri: frame.uri.clone(),
            title: frame.title.clone(),
            canonical_encoding: frame.canonical_encoding,
            canonical_length: frame.canonical_length,
            metadata: None,
            search_text: None,
            tags: Vec::new(),
            labels: Vec::new(),
            extra_metadata: BTreeMap::new(),
            content_dates: Vec::new(),
            chunk_manifest: None,
            role: frame.role,
            parent_sequence: None,
            chunk_index: frame.chunk_index,
            chunk_count: frame.chunk_count,
            op: FrameWalOp::Tombstone,
            target_frame_id: Some(frame_id),
            supersedes_frame_id: None,
            reuse_payload_from: None,
            source_sha256: None,
            source_path: None,
            enrichment_state: crate::types::EnrichmentState::default(),
        };
        tombstone.kind = frame.kind.clone();
        tombstone.track = frame.track.clone();

        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
        let seq = self.append_wal_entry(&payload_bytes)?;
        self.dirty = true;
        // Opportunistically checkpoint when the WAL is getting full.
        if self.wal.should_checkpoint() {
            self.commit()?;
        }
        info!("frame_delete frame_id={frame_id} seq={seq}");
        Ok(seq)
    }
3078}
3079
3080impl Memvid {
3081 fn put_internal(
3082 &mut self,
3083 payload: Option<&[u8]>,
3084 reuse_frame: Option<Frame>,
3085 embedding: Option<Vec<f32>>,
3086 chunk_embeddings: Option<Vec<Vec<f32>>>,
3087 mut options: PutOptions,
3088 supersedes: Option<FrameId>,
3089 ) -> Result<u64> {
3090 self.ensure_mutation_allowed()?;
3091
3092 if options.dedup {
3094 if let Some(bytes) = payload {
3095 let content_hash = hash(bytes);
3096 if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3097 tracing::debug!(
3099 frame_id = existing_frame.id,
3100 "dedup: skipping ingestion, identical content already exists"
3101 );
3102 return Ok(existing_frame.id);
3104 }
3105 }
3106 }
3107
3108 if payload.is_some() && reuse_frame.is_some() {
3109 let frame_id = reuse_frame
3110 .as_ref()
3111 .map(|frame| frame.id)
3112 .unwrap_or_default();
3113 return Err(MemvidError::InvalidFrame {
3114 frame_id,
3115 reason: "cannot reuse payload when bytes are provided",
3116 });
3117 }
3118
3119 let incoming_dimension = {
3122 let mut dim: Option<u32> = None;
3123
3124 if let Some(ref vector) = embedding {
3125 if !vector.is_empty() {
3126 #[allow(clippy::cast_possible_truncation)]
3127 let len = vector.len() as u32;
3128 dim = Some(len);
3129 }
3130 }
3131
3132 if let Some(ref vectors) = chunk_embeddings {
3133 for vector in vectors {
3134 if vector.is_empty() {
3135 continue;
3136 }
3137 let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3138 match dim {
3139 None => dim = Some(vec_dim),
3140 Some(existing) if existing == vec_dim => {}
3141 Some(existing) => {
3142 return Err(MemvidError::VecDimensionMismatch {
3143 expected: existing,
3144 actual: vector.len(),
3145 });
3146 }
3147 }
3148 }
3149 }
3150
3151 dim
3152 };
3153
3154 if let Some(incoming_dimension) = incoming_dimension {
3155 if !self.vec_enabled {
3157 self.enable_vec()?;
3158 }
3159
3160 if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3161 if existing_dimension != incoming_dimension {
3162 return Err(MemvidError::VecDimensionMismatch {
3163 expected: existing_dimension,
3164 actual: incoming_dimension as usize,
3165 });
3166 }
3167 }
3168
3169 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3171 if manifest.dimension == 0 {
3172 manifest.dimension = incoming_dimension;
3173 }
3174 }
3175 }
3176
3177 let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3178 let payload_tail = self.payload_region_end();
3179 let projected = if let Some(bytes) = payload {
3180 let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
3181 let len = prepared.len();
3182 prepared_payload = Some((prepared, encoding, length));
3183 payload_tail.saturating_add(len as u64)
3184 } else if reuse_frame.is_some() {
3185 payload_tail
3186 } else {
3187 return Err(MemvidError::InvalidFrame {
3188 frame_id: 0,
3189 reason: "payload required for frame insertion",
3190 });
3191 };
3192
3193 let capacity_limit = self.capacity_limit();
3194 if projected > capacity_limit {
3195 let incoming_size = projected.saturating_sub(payload_tail);
3196 return Err(MemvidError::CapacityExceeded {
3197 current: payload_tail,
3198 limit: capacity_limit,
3199 required: incoming_size,
3200 });
3201 }
3202 let timestamp = options.timestamp.take().unwrap_or_else(|| {
3203 SystemTime::now()
3204 .duration_since(UNIX_EPOCH)
3205 .map(|d| d.as_secs() as i64)
3206 .unwrap_or(0)
3207 });
3208
3209 #[allow(unused_assignments)]
3210 let mut reuse_bytes: Option<Vec<u8>> = None;
3211 let payload_for_processing = if let Some(bytes) = payload {
3212 Some(bytes)
3213 } else if let Some(frame) = reuse_frame.as_ref() {
3214 let bytes = self.frame_canonical_bytes(frame)?;
3215 reuse_bytes = Some(bytes);
3216 reuse_bytes.as_deref()
3217 } else {
3218 None
3219 };
3220
3221 let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3223 (Some(bytes), None) => plan_document_chunks(bytes),
3224 _ => None,
3225 };
3226
3227 let mut source_sha256: Option<[u8; 32]> = None;
3231 let source_path_value = options.source_path.take();
3232
3233 let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3234 if raw_chunk_plan.is_some() {
3235 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3237 } else if options.no_raw {
3238 if let Some(bytes) = payload {
3240 let hash_result = hash(bytes);
3242 source_sha256 = Some(*hash_result.as_bytes());
3243 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3245 } else {
3246 return Err(MemvidError::InvalidFrame {
3247 frame_id: 0,
3248 reason: "payload required for --no-raw mode",
3249 });
3250 }
3251 } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3252 (prepared, encoding, length, None)
3253 } else if let Some(bytes) = payload {
3254 let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
3255 (prepared, encoding, length, None)
3256 } else if let Some(frame) = reuse_frame.as_ref() {
3257 (
3258 Vec::new(),
3259 frame.canonical_encoding,
3260 frame.canonical_length,
3261 Some(frame.id),
3262 )
3263 } else {
3264 return Err(MemvidError::InvalidFrame {
3265 frame_id: 0,
3266 reason: "payload required for frame insertion",
3267 });
3268 };
3269
3270 let mut chunk_plan = raw_chunk_plan;
3272
3273 let mut metadata = options.metadata.take();
3274 let mut search_text = options
3275 .search_text
3276 .take()
3277 .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3278 let mut tags = std::mem::take(&mut options.tags);
3279 let mut labels = std::mem::take(&mut options.labels);
3280 let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3281 let mut content_dates: Vec<String> = Vec::new();
3282
3283 let need_search_text = search_text
3284 .as_ref()
3285 .is_none_or(|text| text.trim().is_empty());
3286 let need_metadata = metadata.is_none();
3287 let run_extractor = need_search_text || need_metadata || options.auto_tag;
3288
3289 let mut extraction_error = None;
3290 let mut is_skim_extraction = false; let extracted = if run_extractor {
3293 if let Some(bytes) = payload_for_processing {
3294 let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3295 let uri_hint = options.uri.as_deref();
3296
3297 let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3299
3300 if use_budgeted {
3301 let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3303 options.extraction_budget_ms,
3304 );
3305 match crate::extract_budgeted::extract_with_budget(
3306 bytes, mime_hint, uri_hint, budget,
3307 ) {
3308 Ok(result) => {
3309 is_skim_extraction = result.is_skim();
3310 if is_skim_extraction {
3311 tracing::debug!(
3312 coverage = result.coverage,
3313 elapsed_ms = result.elapsed_ms,
3314 sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3315 "time-budgeted extraction (skim)"
3316 );
3317 }
3318 let doc = crate::extract::ExtractedDocument {
3320 text: if result.text.is_empty() {
3321 None
3322 } else {
3323 Some(result.text)
3324 },
3325 metadata: serde_json::json!({
3326 "skim": is_skim_extraction,
3327 "coverage": result.coverage,
3328 "sections_extracted": result.sections_extracted,
3329 "sections_total": result.sections_total,
3330 }),
3331 mime_type: mime_hint.map(std::string::ToString::to_string),
3332 };
3333 Some(doc)
3334 }
3335 Err(err) => {
3336 tracing::warn!(
3338 ?err,
3339 "budgeted extraction failed, trying full extraction"
3340 );
3341 match extract_via_registry(bytes, mime_hint, uri_hint) {
3342 Ok(doc) => Some(doc),
3343 Err(err) => {
3344 extraction_error = Some(err);
3345 None
3346 }
3347 }
3348 }
3349 }
3350 } else {
3351 match extract_via_registry(bytes, mime_hint, uri_hint) {
3353 Ok(doc) => Some(doc),
3354 Err(err) => {
3355 extraction_error = Some(err);
3356 None
3357 }
3358 }
3359 }
3360 } else {
3361 None
3362 }
3363 } else {
3364 None
3365 };
3366
3367 if let Some(err) = extraction_error {
3368 return Err(err);
3369 }
3370
3371 if let Some(doc) = &extracted {
3372 if need_search_text {
3373 if let Some(text) = &doc.text {
3374 if let Some(normalized) =
3375 normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3376 {
3377 search_text = Some(normalized);
3378 }
3379 }
3380 }
3381
3382 if chunk_plan.is_none() {
3385 if let Some(text) = &doc.text {
3386 chunk_plan = plan_text_chunks(text);
3387 }
3388 }
3389
3390 if let Some(mime) = doc.mime_type.as_ref() {
3391 if let Some(existing) = &mut metadata {
3392 if existing.mime.is_none() {
3393 existing.mime = Some(mime.clone());
3394 }
3395 } else {
3396 let mut doc_meta = DocMetadata::default();
3397 doc_meta.mime = Some(mime.clone());
3398 metadata = Some(doc_meta);
3399 }
3400 }
3401
3402 if options.auto_tag {
3405 if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string()) {
3406 extra_metadata
3407 .entry("extractous_metadata".to_string())
3408 .or_insert(meta_json);
3409 }
3410 }
3411 }
3412
3413 if options.auto_tag {
3414 if let Some(ref text) = search_text {
3415 if !text.trim().is_empty() {
3416 let result = AutoTagger.analyse(text, options.extract_dates);
3417 merge_unique(&mut tags, result.tags);
3418 merge_unique(&mut labels, result.labels);
3419 if options.extract_dates && content_dates.is_empty() {
3420 content_dates = result.content_dates;
3421 }
3422 }
3423 }
3424 }
3425
3426 if content_dates.is_empty() {
3427 if let Some(frame) = reuse_frame.as_ref() {
3428 content_dates = frame.content_dates.clone();
3429 }
3430 }
3431
3432 let metadata_ref = metadata.as_ref();
3433 let mut search_text = augment_search_text(
3434 search_text,
3435 options.uri.as_deref(),
3436 options.title.as_deref(),
3437 options.track.as_deref(),
3438 &tags,
3439 &labels,
3440 &extra_metadata,
3441 &content_dates,
3442 metadata_ref,
3443 );
3444 let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3445 let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3446 let mut parent_chunk_count: Option<u32> = None;
3447
3448 let kind_value = options.kind.take();
3449 let track_value = options.track.take();
3450 let uri_value = options.uri.take();
3451 let title_value = options.title.take();
3452 let should_extract_triplets = options.extract_triplets;
3453 let triplet_uri = uri_value.clone();
3455 let triplet_title = title_value.clone();
3456
3457 if let Some(plan) = chunk_plan.as_ref() {
3458 let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3459 parent_chunk_manifest = Some(plan.manifest.clone());
3460 parent_chunk_count = Some(chunk_total);
3461
3462 if let Some(first_chunk) = plan.chunks.first() {
3463 if let Some(normalized) =
3464 normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3465 {
3466 if !normalized.trim().is_empty() {
3467 search_text = Some(normalized);
3468 }
3469 }
3470 }
3471
3472 let chunk_tags = tags.clone();
3473 let chunk_labels = labels.clone();
3474 let chunk_metadata = metadata.clone();
3475 let chunk_extra_metadata = extra_metadata.clone();
3476 let chunk_content_dates = content_dates.clone();
3477
3478 for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3479 let (chunk_payload, chunk_encoding, chunk_length) =
3480 prepare_canonical_payload(chunk_text.as_bytes())?;
3481 let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3482 .map(|n| n.text)
3483 .filter(|text| !text.trim().is_empty());
3484
3485 let chunk_uri = uri_value
3486 .as_ref()
3487 .map(|uri| format!("{uri}#page-{}", idx + 1));
3488 let chunk_title = title_value
3489 .as_ref()
3490 .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3491
3492 let chunk_embedding = chunk_embeddings
3494 .as_ref()
3495 .and_then(|embeddings| embeddings.get(idx).cloned());
3496
3497 chunk_entries.push(WalEntryData {
3498 timestamp,
3499 kind: kind_value.clone(),
3500 track: track_value.clone(),
3501 payload: chunk_payload,
3502 embedding: chunk_embedding,
3503 uri: chunk_uri,
3504 title: chunk_title,
3505 canonical_encoding: chunk_encoding,
3506 canonical_length: chunk_length,
3507 metadata: chunk_metadata.clone(),
3508 search_text: chunk_search_text,
3509 tags: chunk_tags.clone(),
3510 labels: chunk_labels.clone(),
3511 extra_metadata: chunk_extra_metadata.clone(),
3512 content_dates: chunk_content_dates.clone(),
3513 chunk_manifest: None,
3514 role: FrameRole::DocumentChunk,
3515 parent_sequence: None,
3516 chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3517 chunk_count: Some(chunk_total),
3518 op: FrameWalOp::Insert,
3519 target_frame_id: None,
3520 supersedes_frame_id: None,
3521 reuse_payload_from: None,
3522 source_sha256: None, source_path: None,
3524 enrichment_state: crate::types::EnrichmentState::Enriched,
3526 });
3527 }
3528 }
3529
3530 let parent_uri = uri_value.clone();
3531 let parent_title = title_value.clone();
3532
3533 let parent_sequence = if let Some(parent_id) = options.parent_id {
3536 usize::try_from(parent_id)
3541 .ok()
3542 .and_then(|idx| self.toc.frames.get(idx))
3543 .map(|_| parent_id + 2) } else {
3545 None
3546 };
3547
3548 let triplet_text = search_text.clone();
3550
3551 #[cfg(feature = "lex")]
3553 let instant_index_tags = if options.instant_index {
3554 tags.clone()
3555 } else {
3556 Vec::new()
3557 };
3558 #[cfg(feature = "lex")]
3559 let instant_index_labels = if options.instant_index {
3560 labels.clone()
3561 } else {
3562 Vec::new()
3563 };
3564
3565 #[cfg(feature = "lex")]
3567 let needs_enrichment =
3568 options.instant_index && (options.enable_embedding || is_skim_extraction);
3569 #[cfg(feature = "lex")]
3570 let enrichment_state = if needs_enrichment {
3571 crate::types::EnrichmentState::Searchable
3572 } else {
3573 crate::types::EnrichmentState::Enriched
3574 };
3575 #[cfg(not(feature = "lex"))]
3576 let enrichment_state = crate::types::EnrichmentState::Enriched;
3577
3578 let entry = WalEntryData {
3579 timestamp,
3580 kind: kind_value,
3581 track: track_value,
3582 payload: storage_payload,
3583 embedding,
3584 uri: parent_uri,
3585 title: parent_title,
3586 canonical_encoding,
3587 canonical_length,
3588 metadata,
3589 search_text,
3590 tags,
3591 labels,
3592 extra_metadata,
3593 content_dates,
3594 chunk_manifest: parent_chunk_manifest,
3595 role: options.role,
3596 parent_sequence,
3597 chunk_index: None,
3598 chunk_count: parent_chunk_count,
3599 op: FrameWalOp::Insert,
3600 target_frame_id: None,
3601 supersedes_frame_id: supersedes,
3602 reuse_payload_from,
3603 source_sha256,
3604 source_path: source_path_value,
3605 enrichment_state,
3606 };
3607
3608 let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3609 let parent_seq = self.append_wal_entry(&parent_bytes)?;
3610 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3611
3612 #[cfg(feature = "lex")]
3615 if options.instant_index && self.tantivy.is_some() {
3616 let frame_id = parent_seq as FrameId;
3618
3619 if let Some(ref text) = triplet_text {
3621 if !text.trim().is_empty() {
3622 let temp_frame = Frame {
3624 id: frame_id,
3625 timestamp,
3626 anchor_ts: None,
3627 anchor_source: None,
3628 kind: options.kind.clone(),
3629 track: options.track.clone(),
3630 payload_offset: 0,
3631 payload_length: 0,
3632 checksum: [0u8; 32],
3633 uri: options
3634 .uri
3635 .clone()
3636 .or_else(|| Some(crate::default_uri(frame_id))),
3637 title: options.title.clone(),
3638 canonical_encoding: crate::types::CanonicalEncoding::default(),
3639 canonical_length: None,
3640 metadata: None, search_text: triplet_text.clone(),
3642 tags: instant_index_tags.clone(),
3643 labels: instant_index_labels.clone(),
3644 extra_metadata: std::collections::BTreeMap::new(), content_dates: Vec::new(), chunk_manifest: None,
3647 role: options.role,
3648 parent_id: None,
3649 chunk_index: None,
3650 chunk_count: None,
3651 status: FrameStatus::Active,
3652 supersedes,
3653 superseded_by: None,
3654 source_sha256: None, source_path: None, enrichment_state: crate::types::EnrichmentState::Searchable,
3657 };
3658
3659 if let Some(engine) = self.tantivy.as_mut() {
3661 engine.add_frame(&temp_frame, text)?;
3662 engine.soft_commit()?;
3663 self.tantivy_dirty = true;
3664
3665 tracing::debug!(
3666 frame_id = frame_id,
3667 "instant index: frame searchable immediately"
3668 );
3669 }
3670 }
3671 }
3672 }
3673
3674 #[cfg(feature = "lex")]
3678 if needs_enrichment {
3679 let frame_id = parent_seq as FrameId;
3680 self.toc.enrichment_queue.push(frame_id);
3681 tracing::debug!(
3682 frame_id = frame_id,
3683 is_skim = is_skim_extraction,
3684 needs_embedding = options.enable_embedding,
3685 "queued frame for background enrichment"
3686 );
3687 }
3688
3689 for mut chunk_entry in chunk_entries {
3690 chunk_entry.parent_sequence = Some(parent_seq);
3691 let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3692 self.append_wal_entry(&chunk_bytes)?;
3693 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3694 }
3695
3696 self.dirty = true;
3697 if self.wal.should_checkpoint() {
3698 self.commit()?;
3699 }
3700
3701 #[cfg(feature = "replay")]
3703 if let Some(input_bytes) = payload {
3704 self.record_put_action(parent_seq, input_bytes);
3705 }
3706
3707 if should_extract_triplets {
3710 if let Some(ref text) = triplet_text {
3711 if !text.trim().is_empty() {
3712 let extractor = TripletExtractor::default();
3713 let frame_id = parent_seq as FrameId;
3714 let (cards, _stats) = extractor.extract(
3715 frame_id,
3716 text,
3717 triplet_uri.as_deref(),
3718 triplet_title.as_deref(),
3719 timestamp,
3720 );
3721
3722 if !cards.is_empty() {
3723 let card_ids = self.memories_track.add_cards(cards);
3725
3726 self.memories_track
3728 .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3729 }
3730 }
3731 }
3732 }
3733
3734 Ok(parent_seq)
3735 }
3736}
3737
3738#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3739#[serde(rename_all = "snake_case")]
3740pub(crate) enum FrameWalOp {
3741 Insert,
3742 Tombstone,
3743}
3744
3745impl Default for FrameWalOp {
3746 fn default() -> Self {
3747 Self::Insert
3748 }
3749}
3750
/// A single record in the embedded write-ahead log.
///
/// Decoded leniently by `decode_wal_entry`: older logs that predate this
/// wrapper stored a bare `WalEntryData`, which the decoder re-wraps as
/// `Frame`.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame insert/tombstone record.
    Frame(WalEntryData),
    /// A batch of lexical-index updates (only with the `lex` feature).
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3757
3758fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3759 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3760 return Ok(entry);
3761 }
3762 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3763 Ok(WalEntry::Frame(legacy))
3764}
3765
/// Payload of a `WalEntry::Frame` record: everything needed to materialize
/// a frame when the WAL is replayed at commit time.
///
/// Fields introduced after the first format revision carry
/// `#[serde(default)]` so that records written by older versions still
/// decode (see `decode_wal_entry` for the untagged legacy fallback).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    // Frame timestamp as an epoch-based i64 (units not visible here —
    // presumably seconds since the Unix epoch; confirm against the caller).
    pub(crate) timestamp: i64,
    // Optional frame kind label.
    pub(crate) kind: Option<String>,
    // Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    // Canonical payload bytes, possibly zstd-compressed
    // (see `prepare_canonical_payload`).
    pub(crate) payload: Vec<u8>,
    // Optional embedding vector for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    // Encoding of `payload` (`Plain` or `Zstd`).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    // Uncompressed length of the canonical payload, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    // Normalized text used for indexing (built via `augment_search_text`).
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    // Free-form key/value metadata; also folded into the search text.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    // Dates extracted from the content (auto-tagging path).
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    // Manifest describing how a parent document was split into chunks.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    // Role of this frame (e.g. parent document vs. `DocumentChunk`).
    #[serde(default)]
    pub(crate) role: FrameRole,
    // WAL sequence number of the parent frame, set on chunk entries.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    // 0-based index of this chunk within its parent.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    // Total number of chunks the parent was split into.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    // Insert vs. tombstone; defaults to `Insert` for legacy records.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    // Frame targeted by the operation — presumably the tombstone target;
    // confirm against the replay code.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    // Frame that this entry supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    // NOTE(review): looks like this lets the entry reference an existing
    // frame's stored payload instead of `payload` — confirm at replay.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    // 32-byte digest of the original source bytes (name suggests SHA-256).
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    // Filesystem path the source was ingested from, when tracked.
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    // Whether the frame is fully enriched or still queued for background
    // enrichment (see the `enrichment_queue` handling in the put path).
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
3821
3822pub(crate) fn prepare_canonical_payload(
3823 payload: &[u8],
3824) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
3825 if std::str::from_utf8(payload).is_ok() {
3826 let compressed = zstd::encode_all(std::io::Cursor::new(payload), 3)?;
3827 Ok((
3828 compressed,
3829 CanonicalEncoding::Zstd,
3830 Some(payload.len() as u64),
3831 ))
3832 } else {
3833 Ok((
3834 payload.to_vec(),
3835 CanonicalEncoding::Plain,
3836 Some(payload.len() as u64),
3837 ))
3838 }
3839}
3840
3841pub(crate) fn augment_search_text(
3842 base: Option<String>,
3843 uri: Option<&str>,
3844 title: Option<&str>,
3845 track: Option<&str>,
3846 tags: &[String],
3847 labels: &[String],
3848 extra_metadata: &BTreeMap<String, String>,
3849 content_dates: &[String],
3850 metadata: Option<&DocMetadata>,
3851) -> Option<String> {
3852 let mut segments: Vec<String> = Vec::new();
3853 if let Some(text) = base {
3854 let trimmed = text.trim();
3855 if !trimmed.is_empty() {
3856 segments.push(trimmed.to_string());
3857 }
3858 }
3859
3860 if let Some(title) = title {
3861 if !title.trim().is_empty() {
3862 segments.push(format!("title: {}", title.trim()));
3863 }
3864 }
3865
3866 if let Some(uri) = uri {
3867 if !uri.trim().is_empty() {
3868 segments.push(format!("uri: {}", uri.trim()));
3869 }
3870 }
3871
3872 if let Some(track) = track {
3873 if !track.trim().is_empty() {
3874 segments.push(format!("track: {}", track.trim()));
3875 }
3876 }
3877
3878 if !tags.is_empty() {
3879 segments.push(format!("tags: {}", tags.join(" ")));
3880 }
3881
3882 if !labels.is_empty() {
3883 segments.push(format!("labels: {}", labels.join(" ")));
3884 }
3885
3886 if !extra_metadata.is_empty() {
3887 for (key, value) in extra_metadata {
3888 if value.trim().is_empty() {
3889 continue;
3890 }
3891 segments.push(format!("{key}: {value}"));
3892 }
3893 }
3894
3895 if !content_dates.is_empty() {
3896 segments.push(format!("dates: {}", content_dates.join(" ")));
3897 }
3898
3899 if let Some(meta) = metadata {
3900 if let Ok(meta_json) = serde_json::to_string(meta) {
3901 segments.push(format!("metadata: {meta_json}"));
3902 }
3903 }
3904
3905 if segments.is_empty() {
3906 None
3907 } else {
3908 Some(segments.join("\n"))
3909 }
3910}
3911
/// Append each trimmed, non-empty value from `additions` to `target`,
/// skipping anything already present (existing entries are compared
/// verbatim; incoming entries are compared after trimming). Preserves
/// the order of `target` and the relative order of new values.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    // Seed the seen-set with the current contents so duplicates against
    // either `target` or earlier additions are rejected.
    let mut known: BTreeSet<String> = target.iter().cloned().collect();
    let fresh = additions
        .into_iter()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
        .filter(|value| known.insert(value.clone()));
    target.extend(fresh);
}