1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58 PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66 TemporalResolutionValue,
67};
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading bytes sampled from a payload for magic-number sniffing.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Assumed on-disk size of one embedded-WAL entry header, used when sizing a
/// grow request for an entry that did not fit. TODO confirm against the WAL
/// encoder — a mismatch here only makes growth estimates conservative/loose.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Copy-buffer size (8 MiB) used when shifting file contents to enlarge the
/// embedded WAL region.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Default IANA timezone used by the temporal track when no other zone is
/// available.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

/// Fixed list of relative date/time phrases the temporal track recognizes
/// verbatim (in addition to whatever pattern-based detection exists elsewhere).
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staging area for an all-or-nothing commit: the archive is copied into an
/// atomic temp file, mutated there, and only renamed over the original on
/// success (see `Memvid::with_staging_lock`).
struct CommitStaging {
    // Handle from the `atomic_write_file` crate; `commit()` performs the rename.
    atomic: AtomicWriteFile,
}
126
impl CommitStaging {
    /// Opens an atomic temp file next to `path`, readable as well as writable
    /// so the staged copy can back an `EmbeddedWal`.
    fn prepare(path: &Path) -> Result<Self> {
        let mut options = AtomicWriteFile::options();
        options.read(true);
        let atomic = options.open(path)?;
        Ok(Self { atomic })
    }

    /// Replaces the staging file's contents with a full copy of `source`,
    /// then flushes and fsyncs so the copy is durable before any mutation.
    fn copy_from(&mut self, source: &File) -> Result<()> {
        // Clone the handle so we can rewind without disturbing the caller's
        // file cursor expectations on `source` itself.
        let mut reader = source.try_clone()?;
        reader.seek(SeekFrom::Start(0))?;

        let writer = self.atomic.as_file_mut();
        // Truncate + rewind: the temp file may hold leftovers from a prior use.
        writer.set_len(0)?;
        writer.seek(SeekFrom::Start(0))?;
        std::io::copy(&mut reader, writer)?;
        writer.flush()?;
        writer.sync_all()?;
        Ok(())
    }

    /// Returns an independent handle onto the staging file (shared cursor
    /// semantics of `try_clone` apply).
    fn clone_file(&self) -> Result<File> {
        Ok(self.atomic.as_file().try_clone()?)
    }

    /// Atomically renames the staging file over the original path.
    fn commit(self) -> Result<()> {
        self.atomic.commit().map_err(Into::into)
    }

    /// Drops the staging file without touching the original.
    fn discard(self) -> Result<()> {
        self.atomic.discard().map_err(Into::into)
    }
}
160
/// Accumulated side effects of replaying a batch of WAL records; drives which
/// indexes need rebuilding during commit/recovery.
#[derive(Debug, Default)]
struct IngestionDelta {
    // Frames newly added by the replay.
    inserted_frames: Vec<FrameId>,
    // Per-frame embedding vectors added alongside the frames.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    // New time-index entries produced by the replay.
    inserted_time_entries: Vec<TimeIndexEntry>,
    // True when existing frames were modified (not just inserted).
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// How much work a commit performs. `Full` is the default; `Incremental` is
/// accepted by `commit_with_options` (the visible commit path currently treats
/// both the same — the mode is forwarded to `commit_from_records`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Default commit mode.
    #[default]
    Full,
    Incremental,
}
201
/// Options accepted by `Memvid::commit_with_options`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    // Which commit strategy to use (defaults to `CommitMode::Full`).
    pub mode: CommitMode,
    // Request a background commit; currently ignored — commits always run
    // synchronously (see `commit_with_options`).
    pub background: bool,
}
207
208impl CommitOptions {
209 #[must_use]
210 pub fn new(mode: CommitMode) -> Self {
211 Self {
212 mode,
213 background: false,
214 }
215 }
216
217 #[must_use]
218 pub fn background(mut self, background: bool) -> Self {
219 self.background = background;
220 self
221 }
222}
223
/// Returns the process-wide default `ReaderRegistry`, built lazily on first
/// use and shared thereafter.
fn default_reader_registry() -> &'static ReaderRegistry {
    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
    REGISTRY.get_or_init(ReaderRegistry::default)
}
228
229fn infer_document_format(
230 mime: Option<&str>,
231 magic: Option<&[u8]>,
232 uri: Option<&str>,
233) -> Option<DocumentFormat> {
234 if detect_pdf_magic(magic) {
236 return Some(DocumentFormat::Pdf);
237 }
238
239 if let Some(magic_bytes) = magic {
242 if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243 if let Some(format) = infer_format_from_extension(uri) {
245 return Some(format);
246 }
247 }
248 }
249
250 if let Some(mime_str) = mime {
252 let mime_lower = mime_str.trim().to_ascii_lowercase();
253 let format = match mime_lower.as_str() {
254 "application/pdf" => Some(DocumentFormat::Pdf),
255 "text/plain" => Some(DocumentFormat::PlainText),
256 "text/markdown" => Some(DocumentFormat::Markdown),
257 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259 Some(DocumentFormat::Docx)
260 }
261 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262 Some(DocumentFormat::Xlsx)
263 }
264 "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266 Some(DocumentFormat::Pptx)
267 }
268 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269 _ => None,
270 };
271 if format.is_some() {
272 return format;
273 }
274 }
275
276 infer_format_from_extension(uri)
278}
279
280fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282 let uri = uri?;
283 let path = std::path::Path::new(uri);
284 let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285 match ext.as_str() {
286 "pdf" => Some(DocumentFormat::Pdf),
287 "docx" => Some(DocumentFormat::Docx),
288 "xlsx" => Some(DocumentFormat::Xlsx),
289 "xls" => Some(DocumentFormat::Xls),
290 "pptx" => Some(DocumentFormat::Pptx),
291 "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292 | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293 | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294 | "sql" => Some(DocumentFormat::PlainText),
295 "md" | "markdown" => Some(DocumentFormat::Markdown),
296 "html" | "htm" => Some(DocumentFormat::Html),
297 _ => None,
298 }
299}
300
/// Returns `true` when `magic` carries a PDF signature: an optional UTF-8 BOM
/// and any leading ASCII whitespace are skipped before checking for `%PDF`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else { return false };
    if bytes.is_empty() {
        return false;
    }
    // Tolerate a UTF-8 byte-order mark ahead of the signature.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    // Skip leading ASCII whitespace; an all-whitespace slice yields an empty
    // remainder and therefore `false`.
    let start = without_bom
        .iter()
        .take_while(|b| b.is_ascii_whitespace())
        .count();
    without_bom[start..].starts_with(b"%PDF")
}
318
#[instrument(
    target = "memvid::extract",
    skip_all,
    fields(mime = mime_hint, uri = uri)
)]
/// Extracts a document from raw bytes: picks a reader from the default
/// registry using MIME/magic/URI hints, and falls back to `PassthroughReader`
/// when no reader matches or the matched reader fails. Any fallback reason is
/// recorded as a warning on the output's diagnostics.
fn extract_via_registry(
    bytes: &[u8],
    mime_hint: Option<&str>,
    uri: Option<&str>,
) -> Result<ExtractedDocument> {
    let registry = default_reader_registry();
    // Sample the first MAGIC_SNIFF_BYTES for format sniffing; treat an empty
    // sample (or a payload shorter than the sniff window) as "no magic".
    let magic = bytes
        .get(..MAGIC_SNIFF_BYTES)
        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
        .with_uri(uri)
        .with_magic(magic);

    // If a registered reader both matches and succeeds, return early; otherwise
    // remember why we fell back so the diagnostics can carry it.
    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
        let start = Instant::now();
        match reader.extract(bytes, &hint) {
            Ok(output) => {
                return Ok(finalize_reader_output(output, start));
            }
            Err(err) => {
                tracing::error!(
                    target = "memvid::extract",
                    reader = reader.name(),
                    error = %err,
                    "reader failed; falling back"
                );
                Some(format!("reader {} failed: {err}", reader.name()))
            }
        }
    } else {
        tracing::debug!(
            target = "memvid::extract",
            format = hint.format.map(super::super::reader::DocumentFormat::label),
            "no reader matched; using default extractor"
        );
        Some("no registered reader matched; using default extractor".to_string())
    };

    // Fallback path: passthrough extraction, timed separately from the failed
    // reader attempt above.
    let start = Instant::now();
    let mut output = PassthroughReader.extract(bytes, &hint)?;
    if let Some(reason) = fallback_reason {
        output.diagnostics.track_warning(reason);
    }
    Ok(finalize_reader_output(output, start))
}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371 let elapsed = start.elapsed();
372 let ReaderOutput {
373 document,
374 reader_name,
375 diagnostics,
376 } = output;
377 log_reader_result(&reader_name, &diagnostics, elapsed);
378 document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382 let duration_ms = diagnostics
383 .duration_ms
384 .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385 let warnings = diagnostics.warnings.len();
386 let pages = diagnostics.pages_processed;
387
388 if warnings > 0 || diagnostics.fallback {
389 tracing::warn!(
390 target = "memvid::extract",
391 reader,
392 duration_ms,
393 pages,
394 warnings,
395 fallback = diagnostics.fallback,
396 "extraction completed with warnings"
397 );
398 for warning in &diagnostics.warnings {
399 tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400 }
401 } else {
402 tracing::info!(
403 target = "memvid::extract",
404 reader,
405 duration_ms,
406 pages,
407 "extraction completed"
408 );
409 }
410}
411
412impl Memvid {
    /// Runs `op` against an atomic staging copy of the archive so that a
    /// failed commit never corrupts the on-disk original. On success the
    /// staging file is renamed over the original and `self` reopens it; on
    /// any failure the original file handle, WAL, and all in-memory state
    /// captured below are restored.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        // Make the live file durable, then snapshot it into the staging copy.
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Point `self` at the staging copy and capture everything needed for
        // rollback. NOTE: ordering matters — the new WAL must be opened on the
        // staging handle before the swap.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        let original_lex_enabled = self.lex_enabled;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so the success path can drop them before reopening
        // the destination (releases the old handles first).
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Rename succeeded: drop stale handles, then reopen the
                        // (now replaced) destination and its embedded WAL.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Rename failed: restore handles and every captured
                        // piece of in-memory state, then surface the error.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        self.lex_enabled = original_lex_enabled;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err)
                    }
                }
            }
            Err(err) => {
                // `op` failed: throw away the staging file (best-effort) and
                // restore the original state exactly as above.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                self.lex_enabled = original_lex_enabled;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
497
498 pub(crate) fn catalog_data_end(&self) -> u64 {
499 let mut max_end = self.header.wal_offset + self.header.wal_size;
500
501 for descriptor in &self.toc.segment_catalog.lex_segments {
502 if descriptor.common.bytes_length == 0 {
503 continue;
504 }
505 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
506 }
507
508 for descriptor in &self.toc.segment_catalog.vec_segments {
509 if descriptor.common.bytes_length == 0 {
510 continue;
511 }
512 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
513 }
514
515 for descriptor in &self.toc.segment_catalog.time_segments {
516 if descriptor.common.bytes_length == 0 {
517 continue;
518 }
519 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
520 }
521
522 #[cfg(feature = "temporal_track")]
523 for descriptor in &self.toc.segment_catalog.temporal_segments {
524 if descriptor.common.bytes_length == 0 {
525 continue;
526 }
527 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
528 }
529
530 #[cfg(feature = "lex")]
531 for descriptor in &self.toc.segment_catalog.tantivy_segments {
532 if descriptor.common.bytes_length == 0 {
533 continue;
534 }
535 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
536 }
537
538 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
539 if manifest.bytes_length != 0 {
540 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
541 }
542 }
543
544 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
545 if manifest.bytes_length != 0 {
546 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
547 }
548 }
549
550 if let Some(manifest) = self.toc.time_index.as_ref() {
551 if manifest.bytes_length != 0 {
552 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
553 }
554 }
555
556 #[cfg(feature = "temporal_track")]
557 if let Some(track) = self.toc.temporal_track.as_ref() {
558 if track.bytes_length != 0 {
559 max_end = max_end.max(track.bytes_offset + track.bytes_length);
560 }
561 }
562
563 max_end
564 }
565
    /// Cached end offset of the payload region; maintained elsewhere
    /// (`cached_payload_end` is written outside this chunk).
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
569
    /// Appends one entry to the embedded WAL, growing the WAL region and
    /// retrying whenever the region is full or too small for the entry.
    /// Returns the sequence number assigned by the WAL.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // NOTE(review): capacity errors are identified by exact reason
                // strings — fragile if `EmbeddedWal` ever rewords them.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry"
                        || reason == "embedded WAL region full" =>
                {
                    // Request at least header + payload, and strictly more than
                    // the current region so growth always makes progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
589
590 fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
591 let mut new_size = self.header.wal_size;
592 let mut target = required_entry_size;
593 if target == 0 {
594 target = self.header.wal_size;
595 }
596 while new_size <= target {
597 new_size = new_size
598 .checked_mul(2)
599 .ok_or_else(|| MemvidError::CheckpointFailed {
600 reason: "wal_size overflow".into(),
601 })?;
602 }
603 let delta = new_size - self.header.wal_size;
604 if delta == 0 {
605 return Ok(());
606 }
607
608 self.shift_data_for_wal_growth(delta)?;
609 self.header.wal_size = new_size;
610 self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
611 self.data_end = self.data_end.saturating_add(delta);
612 self.adjust_offsets_after_wal_growth(delta);
613
614 let catalog_end = self.catalog_data_end();
615 self.header.footer_offset = catalog_end
616 .max(self.header.footer_offset)
617 .max(self.data_end);
618
619 self.rewrite_toc_footer()?;
620 self.header.toc_checksum = self.toc.toc_checksum;
621 crate::persist_header(&mut self.file, &self.header)?;
622 self.file.sync_all()?;
623 self.wal = EmbeddedWal::open(&self.file, &self.header)?;
624 Ok(())
625 }
626
    /// Physically shifts everything after the WAL region forward by `delta`
    /// bytes (to make room for a larger WAL), then zero-fills the vacated gap.
    /// Offsets recorded in the TOC are NOT touched here — see
    /// `adjust_offsets_after_wal_growth`.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        // Copy back-to-front so source and destination ranges (which overlap
        // when delta < data length) never clobber unread bytes.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the freshly opened gap so the enlarged WAL region starts clean.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
661
662 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
663 if delta == 0 {
664 return;
665 }
666
667 for frame in &mut self.toc.frames {
668 if frame.payload_offset != 0 {
669 frame.payload_offset += delta;
670 }
671 }
672
673 for segment in &mut self.toc.segments {
674 if segment.bytes_offset != 0 {
675 segment.bytes_offset += delta;
676 }
677 }
678
679 if let Some(lex) = self.toc.indexes.lex.as_mut() {
680 if lex.bytes_offset != 0 {
681 lex.bytes_offset += delta;
682 }
683 }
684 for manifest in &mut self.toc.indexes.lex_segments {
685 if manifest.bytes_offset != 0 {
686 manifest.bytes_offset += delta;
687 }
688 }
689 if let Some(vec) = self.toc.indexes.vec.as_mut() {
690 if vec.bytes_offset != 0 {
691 vec.bytes_offset += delta;
692 }
693 }
694 if let Some(time_index) = self.toc.time_index.as_mut() {
695 if time_index.bytes_offset != 0 {
696 time_index.bytes_offset += delta;
697 }
698 }
699 #[cfg(feature = "temporal_track")]
700 if let Some(track) = self.toc.temporal_track.as_mut() {
701 if track.bytes_offset != 0 {
702 track.bytes_offset += delta;
703 }
704 }
705
706 let catalog = &mut self.toc.segment_catalog;
707 for descriptor in &mut catalog.lex_segments {
708 if descriptor.common.bytes_offset != 0 {
709 descriptor.common.bytes_offset += delta;
710 }
711 }
712 for descriptor in &mut catalog.vec_segments {
713 if descriptor.common.bytes_offset != 0 {
714 descriptor.common.bytes_offset += delta;
715 }
716 }
717 for descriptor in &mut catalog.time_segments {
718 if descriptor.common.bytes_offset != 0 {
719 descriptor.common.bytes_offset += delta;
720 }
721 }
722 #[cfg(feature = "temporal_track")]
723 for descriptor in &mut catalog.temporal_segments {
724 if descriptor.common.bytes_offset != 0 {
725 descriptor.common.bytes_offset += delta;
726 }
727 }
728 for descriptor in &mut catalog.tantivy_segments {
729 if descriptor.common.bytes_offset != 0 {
730 descriptor.common.bytes_offset += delta;
731 }
732 }
733
734 #[cfg(feature = "lex")]
735 if let Ok(mut storage) = self.lex_storage.write() {
736 storage.adjust_offsets(delta);
737 }
738 }
    /// Commits pending WAL records and in-memory state under the given
    /// options. `background` is currently a no-op (commits run synchronously).
    /// Fast-returns when there is nothing to commit.
    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
        self.ensure_writable()?;
        if options.background {
            tracing::debug!("commit background flag ignored; running synchronously");
        }
        let mode = options.mode;
        let records = self.wal.pending_records()?;
        // Nothing staged, nothing dirty, no pending Tantivy work: done.
        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        // All mutation happens against an atomic staging copy of the archive.
        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
    }
751
752 pub fn commit(&mut self) -> Result<()> {
753 self.ensure_writable()?;
754 self.commit_with_options(CommitOptions::new(CommitMode::Full))
755 }
756
    /// Enters batch-ingest mode: optionally pre-sizes the WAL so the batch
    /// avoids incremental region growth, and applies the batch's sync policy.
    /// Paired with `end_batch`.
    pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
        if opts.wal_pre_size_bytes > 0 {
            self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
        }
        // skip_sync trades durability for throughput for the batch's duration.
        self.wal.set_skip_sync(opts.skip_sync);
        self.batch_opts = Some(opts);
        Ok(())
    }
775
    /// Grows the WAL region in one step to at least `min_bytes` (rounded up to
    /// the next power of two), shifting data and persisting header/TOC.
    /// No-op when the region is already large enough.
    ///
    /// NOTE(review): `next_power_of_two` on a value above 2^63 returns 0 in
    /// release builds (panics in debug); `delta` then saturates to 0 and the
    /// request is silently ignored — confirm inputs can never be that large.
    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
        if min_bytes <= self.header.wal_size {
            return Ok(());
        }
        let target = min_bytes.next_power_of_two();
        let delta = target.saturating_sub(self.header.wal_size);
        if delta == 0 {
            return Ok(());
        }

        tracing::info!(
            current_wal = self.header.wal_size,
            target_wal = target,
            delta,
            "pre-sizing WAL for batch mode"
        );

        // Same grow sequence as `grow_wal_region`: shift data, patch offsets,
        // recompute footer, persist, reopen WAL.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = target;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
820
    /// Leaves batch-ingest mode: flushes buffered WAL writes, then restores
    /// normal sync behavior and clears the batch options.
    pub fn end_batch(&mut self) -> Result<()> {
        self.wal.flush()?;
        self.wal.set_skip_sync(false);
        self.batch_opts = None;
        Ok(())
    }
832
    /// Commits pending WAL records WITHOUT rebuilding any indexes — all index
    /// manifests are dropped and must be regenerated later (see
    /// `finalize_indexes`). No-op when nothing is pending or dirty.
    pub fn commit_skip_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        let records = self.wal.pending_records()?;
        if records.is_empty() && !self.dirty {
            return Ok(());
        }
        self.commit_skip_indexes_inner(records)
    }
849
    /// Applies `records`, then clears every index manifest and catalog entry
    /// instead of rebuilding them, and persists TOC/header/WAL checkpoint.
    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        // Park the Tantivy engine so record replay cannot index into it; it is
        // restored (and marked clean) regardless of the replay result.
        #[cfg(feature = "lex")]
        let tantivy_backup = self.tantivy.take();

        let result = self.apply_records(records);

        #[cfg(feature = "lex")]
        {
            self.tantivy = tantivy_backup;
            self.tantivy_dirty = false;
        }

        // Propagate a replay error only after the engine has been restored.
        let _delta = result?;

        self.header.footer_offset = self.data_end;

        // Drop all index state: it is intentionally NOT rebuilt here.
        self.toc.time_index = None;
        self.toc.indexes.lex_segments.clear();
        if let Some(vec) = self.toc.indexes.vec.as_mut() {
            vec.bytes_offset = 0;
            vec.bytes_length = 0;
            vec.vector_count = 0;
        }
        self.toc.indexes.lex = None;
        self.toc.indexes.clip = None;
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        self.toc.memories_track = None;
        self.toc.logic_mesh = None;
        self.toc.sketch_track = None;

        // Persist: TOC footer, WAL checkpoint (which may update the header),
        // then the header itself — checksum refreshed around the checkpoint.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
907
    /// Rebuilds all indexes from current TOC state (no new embeddings/frames)
    /// and persists TOC + header. Companion to `commit_skip_indexes`, which
    /// drops the indexes this regenerates.
    pub fn finalize_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        self.rebuild_indexes(&[], &[])?;
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        Ok(())
    }
922
    /// Core commit path (runs inside `with_staging_lock`): replays `records`,
    /// rebuilds indexes when anything changed, persists auxiliary tracks, then
    /// writes TOC/checkpoint/header. `_mode` is currently unused here —
    /// `CommitMode` does not change behavior in this function.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // A populated CLIP index forces an index rebuild even with no delta.
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
            indexes_rebuilt = true;
        }

        // The persists below only run when `rebuild_indexes` did not —
        // presumably the rebuild already covers them; confirm if that changes.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Sketch track persists unconditionally (not gated on the rebuild).
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Persist order: TOC footer, WAL checkpoint (may touch the header),
        // then the header; checksum refreshed around the checkpoint.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // The manifest WAL is drained only after the main file is durable.
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
990
    /// Parallel-segment commit entry point: fast-returns when there is nothing
    /// pending, otherwise runs `commit_parallel_inner` under the atomic
    /// staging lock. `opts` is cloned because the closure must own it.
    #[cfg(feature = "parallel_segments")]
    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
        self.ensure_writable()?;
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let opts = opts.clone();
        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
    }
1000
    /// Parallel commit body (runs inside `with_staging_lock`): replays pending
    /// WAL records, tries the parallel segment publisher, and falls back to a
    /// full `rebuild_indexes` when the parallel path declines. When the
    /// parallel path is used, only the Tantivy index (incrementally) and the
    /// time index (fully) are refreshed here.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        // Re-checked here because state may have changed since the caller's check.
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // The publisher reports whether it actually took the parallel path.
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                // Incremental Tantivy maintenance: only frames NOT already
                // indexed during `put` (i.e. when no engine existed) are added.
                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    let tantivy_was_present = self.tantivy.is_some();
                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    if tantivy_was_present {
                        tracing::info!(
                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
                        );
                    } else {
                        let max_payload = crate::memvid::search::max_index_payload();
                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                        // Gather (frame, text) pairs eligible for indexing.
                        for frame_id in &delta.inserted_frames {
                            // NOTE(review): frames are looked up by id-as-index
                            // — assumes FrameId doubles as a dense Vec index.
                            let frame = match self.toc.frames.get(*frame_id as usize) {
                                Some(f) => f.clone(),
                                None => continue,
                            };

                            // Explicit search text wins; skip extraction then.
                            let explicit_text = frame.search_text.clone();
                            if let Some(ref search_text) = explicit_text {
                                if !search_text.trim().is_empty() {
                                    prepared_docs.push((frame, search_text.clone()));
                                    continue;
                                }
                            }

                            let mime = frame
                                .metadata
                                .as_ref()
                                .and_then(|m| m.mime.as_deref())
                                .unwrap_or("application/octet-stream");

                            // Skip non-text MIME types and oversized payloads.
                            if !crate::memvid::search::is_text_indexable_mime(mime) {
                                continue;
                            }

                            if frame.payload_length > max_payload {
                                continue;
                            }

                            let text = self.frame_search_text(&frame)?;
                            if !text.trim().is_empty() {
                                prepared_docs.push((frame, text));
                            }
                        }

                        if let Some(ref mut engine) = self.tantivy {
                            for (frame, text) in &prepared_docs {
                                engine.add_frame(frame, text)?;
                            }

                            if !prepared_docs.is_empty() {
                                engine.commit()?;
                                self.tantivy_dirty = true;
                            }

                            tracing::info!(
                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                                prepared_docs.len(),
                                engine.num_docs()
                            );
                        } else {
                            tracing::warn!(
                                "parallel_commit: Tantivy engine is None after init_tantivy"
                            );
                        }
                    }
                }

                // Rebuild the time index from scratch (active documents only)
                // and append it at the current data end.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Parallel path declined: do a conventional full index rebuild.
                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
                indexes_rebuilt = true;
            }
        }

        #[cfg(feature = "lex")]
        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Persist order mirrors `commit_from_records`: TOC footer, checkpoint,
        // header, fsync, then drain the manifest WAL.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1201
1202 pub(crate) fn recover_wal(&mut self) -> Result<()> {
1203 let records = self.wal.records_after(self.header.wal_sequence)?;
1204 if records.is_empty() {
1205 if self.tantivy_index_pending() {
1206 self.flush_tantivy()?;
1207 }
1208 return Ok(());
1209 }
1210 let delta = self.apply_records(records)?;
1211 if !delta.is_empty() {
1212 tracing::debug!(
1213 inserted_frames = delta.inserted_frames.len(),
1214 inserted_embeddings = delta.inserted_embeddings.len(),
1215 inserted_time_entries = delta.inserted_time_entries.len(),
1216 "recover applied delta"
1217 );
1218 self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1219 } else if self.tantivy_index_pending() {
1220 self.flush_tantivy()?;
1221 }
1222 self.wal.record_checkpoint(&mut self.header)?;
1223 crate::persist_header(&mut self.file, &self.header)?;
1224 if !delta.is_empty() {
1225 } else if self.tantivy_index_pending() {
1227 self.flush_tantivy()?;
1228 crate::persist_header(&mut self.file, &self.header)?;
1229 }
1230 self.file.sync_all()?;
1231 self.pending_frame_inserts = 0;
1232 self.dirty = false;
1233 Ok(())
1234 }
1235
    /// Replays decoded WAL records against the in-memory TOC and the payload
    /// region of the file.
    ///
    /// Frame inserts append their payload bytes at `self.data_end` (unless
    /// the entry reuses another frame's payload) and push a new `Frame` onto
    /// the TOC; tombstones mark an existing frame deleted. Returns an
    /// `IngestionDelta` describing every change so callers can rebuild or
    /// flush the derived indexes.
    ///
    /// # Errors
    /// Fails on WAL decode errors, payload I/O errors, or malformed entries
    /// (inline bytes on a reuse entry, missing reuse source, missing
    /// tombstone target).
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // Running write cursor into the payload region; advanced as inline
        // payloads are appended.
        let mut data_cursor = self.data_end;
        // Maps WAL sequence numbers to the frame ids they produced, so chunk
        // entries can resolve `parent_sequence` within the same batch.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        // NOTE(review): always true here — the empty case returned above.
        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        // Lexical batches are applied immediately and do not
                        // contribute to the frame delta.
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // New frame id is the next slot in the TOC.
                        let frame_id = self.toc.frames.len() as u64;

                        // Either borrow the payload of an existing frame
                        // (dedup path) or append this entry's inline bytes.
                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            // A reuse entry must not also carry inline bytes.
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source_idx = usize::try_from(source_id).map_err(|_| {
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "frame id too large for memory",
                                }
                            })?;
                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                // Prefer the entry's canonical length, then
                                // the source's, then the raw payload length.
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            // Inline payload: append at the cursor and record
                            // its blake3 checksum and length.
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            // Canonical (uncompressed) length: trust the entry
                            // when present; otherwise decode zstd payloads to
                            // measure it.
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    if let Some(len) = entry.canonical_length {
                                        len
                                    } else {
                                        let decoded = crate::decode_canonical_bytes(
                                            &entry.payload,
                                            CanonicalEncoding::Zstd,
                                            frame_id,
                                        )?;
                                        decoded.len() as u64
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        // Fill in URI/title defaults when the entry omitted
                        // them.
                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            // Anchor fields only exist with the temporal
                            // track feature enabled.
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                            source_sha256: entry.source_sha256,
                            source_path: entry.source_path.clone(),
                            enrichment_state: entry.enrichment_state,
                        };

                        // Resolve chunk->parent links. First try the exact
                        // WAL sequence mapping; failing that, fall back to
                        // the most recent inserted Document frame that has a
                        // chunk manifest.
                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            } else {
                                if entry.role == FrameRole::DocumentChunk {
                                    for &candidate_id in delta.inserted_frames.iter().rev() {
                                        if let Ok(idx) = usize::try_from(candidate_id) {
                                            if let Some(candidate) = self.toc.frames.get(idx) {
                                                if candidate.role == FrameRole::Document
                                                    && candidate.chunk_manifest.is_some()
                                                {
                                                    frame.parent_id = Some(candidate_id);
                                                    tracing::debug!(
                                                        chunk_frame_id = frame_id,
                                                        parent_frame_id = candidate_id,
                                                        parent_seq = parent_seq,
                                                        "resolved chunk parent via fallback"
                                                    );
                                                    break;
                                                }
                                            }
                                        }
                                    }
                                }
                                if frame.parent_id.is_none() {
                                    tracing::warn!(
                                        chunk_frame_id = frame_id,
                                        parent_seq = parent_seq,
                                        "chunk has parent_sequence but parent not found in batch"
                                    );
                                }
                            }
                        }

                        // Choose the text to index: explicit non-blank
                        // search_text wins, otherwise fall back to the frame
                        // content. `None` disables indexing for this frame.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;

                            // Also record a small text sketch for this frame.
                            if !text.trim().is_empty() {
                                let entry = crate::types::generate_sketch(
                                    frame_id,
                                    text,
                                    crate::types::SketchVariant::Small,
                                    None,
                                );
                                self.sketch_track.insert(entry);
                            }
                        }

                        if let Some(embedding) = entry.embedding.take() {
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only top-level Document frames feed the time index
                        // and (optionally) the temporal track.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        // Link supersession before the new frame is pushed so
                        // the predecessor is removed from live indexes.
                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            // Extend the payload region to cover everything just written.
            self.data_end = self.data_end.max(data_cursor);
        }

        // Second pass: any chunk still orphaned gets adopted by the nearest
        // earlier active Document frame with a chunk manifest.
        let orphan_resolutions: Vec<(u64, u64)> = delta
            .inserted_frames
            .iter()
            .filter_map(|&frame_id| {
                let idx = usize::try_from(frame_id).ok()?;
                let frame = self.toc.frames.get(idx)?;
                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
                    return None;
                }
                for candidate_id in (0..frame_id).rev() {
                    if let Ok(idx) = usize::try_from(candidate_id) {
                        if let Some(candidate) = self.toc.frames.get(idx) {
                            if candidate.role == FrameRole::Document
                                && candidate.chunk_manifest.is_some()
                                && candidate.status == FrameStatus::Active
                            {
                                return Some((frame_id, candidate_id));
                            }
                        }
                    }
                }
                None
            })
            .collect();

        for (chunk_id, parent_id) in orphan_resolutions {
            if let Ok(idx) = usize::try_from(chunk_id) {
                if let Some(frame) = self.toc.frames.get_mut(idx) {
                    frame.parent_id = Some(parent_id);
                    tracing::debug!(
                        chunk_frame_id = chunk_id,
                        parent_frame_id = parent_id,
                        "resolved orphan chunk parent in second pass"
                    );
                }
            }
        }

        Ok(delta)
    }
1564
1565 #[cfg(feature = "temporal_track")]
1566 fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1567 (timestamp, AnchorSource::FrameTimestamp)
1568 }
1569
    /// Scans `text` for temporal expressions and resolves them into
    /// `TemporalMention` records relative to `anchor_ts`.
    ///
    /// Returns an empty vec for absent/blank text or an anchor timestamp
    /// outside the representable `OffsetDateTime` range.
    #[cfg(feature = "temporal_track")]
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        // Candidate (start, end) byte spans within `text`.
        let mut spans: Vec<(usize, usize)> = Vec::new();
        // ASCII lowercasing preserves byte length and UTF-8 boundaries, so
        // offsets found in `lower` can be used to slice `text` directly.
        let lower = text.to_ascii_lowercase();

        // Pass 1: literal matches of the static phrase list (phrases are
        // assumed lowercase — they are matched against `lower`).
        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                search_start = end;
            }
        }

        // Pass 2: numeric dates (d/m/yy .. dd/mm/yyyy shapes). The compiled
        // regex is cached process-wide; a compile failure is cached as a
        // message so a bad pattern logs instead of panicking.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        // Merge the two passes: canonical order, duplicates dropped.
        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            // Defensive bounds check before slicing `text`.
            if end > text.len() || start >= end {
                continue;
            }
            let raw = &text[start..end];
            // Strip surrounding quotes/punctuation, then recompute the byte
            // offsets of the trimmed token within `text`.
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            // Unresolvable candidates are silently skipped.
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                Err(_) => continue,
            }
        }

        mentions
    }
1643
    /// Converts one resolver result into zero or more `TemporalMention`
    /// records anchored at the mention's byte span in the source text.
    ///
    /// Point values (Date/DateTime) yield one mention; ranges and months
    /// yield a RangeStart/RangeEnd pair with the HAS_RANGE flag set. Invalid
    /// month resolutions are logged and skipped.
    #[cfg(feature = "temporal_track")]
    fn resolution_to_mentions(
        resolution: TemporalResolution,
        frame_id: FrameId,
        byte_start: usize,
        byte_end: usize,
    ) -> Vec<TemporalMention> {
        // Length is computed before clamping the start; both are narrowed to
        // u32 for the compact on-disk representation.
        let byte_len = byte_end.saturating_sub(byte_start) as u32;
        let byte_start = byte_start.min(u32::MAX as usize) as u32;
        let mut results = Vec::new();

        let base_flags = Self::flags_from_resolution(&resolution.flags);
        match resolution.value {
            // Calendar date: midnight UTC, no timezone hint.
            TemporalResolutionValue::Date(date) => {
                let ts = Self::date_to_timestamp(date);
                results.push(TemporalMention::new(
                    ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::Date,
                    resolution.confidence,
                    0,
                    base_flags,
                ));
            }
            // Full datetime: carries the source offset (minutes) as tz hint.
            TemporalResolutionValue::DateTime(dt) => {
                let ts = dt.unix_timestamp();
                let tz_hint = dt.offset().whole_minutes() as i16;
                results.push(TemporalMention::new(
                    ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::DateTime,
                    resolution.confidence,
                    tz_hint,
                    base_flags,
                ));
            }
            // Date range: two mentions sharing the same span and flags.
            TemporalResolutionValue::DateRange { start, end } => {
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                let start_ts = Self::date_to_timestamp(start);
                results.push(TemporalMention::new(
                    start_ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    0,
                    flags,
                ));
                let end_ts = Self::date_to_timestamp(end);
                results.push(TemporalMention::new(
                    end_ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    0,
                    flags,
                ));
            }
            // Datetime range: like DateRange but each endpoint keeps its own
            // timezone hint.
            TemporalResolutionValue::DateTimeRange { start, end } => {
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                results.push(TemporalMention::new(
                    start.unix_timestamp(),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    start.offset().whole_minutes() as i16,
                    flags,
                ));
                results.push(TemporalMention::new(
                    end.unix_timestamp(),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    end.offset().whole_minutes() as i16,
                    flags,
                ));
            }
            // Whole month: expanded to a [first day, last day] range.
            TemporalResolutionValue::Month { year, month } => {
                let start_date = match Date::from_calendar_date(year, month, 1) {
                    Ok(date) => date,
                    Err(err) => {
                        tracing::warn!(
                            target = "memvid::temporal",
                            %err,
                            year,
                            month = month as u8,
                            "skipping invalid month resolution"
                        );
                        return results;
                    }
                };
                let end_date = match Self::last_day_in_month(year, month) {
                    Some(date) => date,
                    None => {
                        tracing::warn!(
                            target = "memvid::temporal",
                            year,
                            month = month as u8,
                            "skipping month resolution with invalid calendar range"
                        );
                        return results;
                    }
                };
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                results.push(TemporalMention::new(
                    Self::date_to_timestamp(start_date),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    0,
                    flags,
                ));
                results.push(TemporalMention::new(
                    Self::date_to_timestamp(end_date),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    0,
                    flags,
                ));
            }
        }

        results
    }
1785
1786 #[cfg(feature = "temporal_track")]
1787 fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1788 let mut result = TemporalMentionFlags::empty();
1789 if flags
1790 .iter()
1791 .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1792 {
1793 result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1794 }
1795 if flags
1796 .iter()
1797 .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1798 {
1799 result = result.set(TemporalMentionFlags::DERIVED, true);
1800 }
1801 result
1802 }
1803
1804 #[cfg(feature = "temporal_track")]
1805 fn date_to_timestamp(date: Date) -> i64 {
1806 PrimitiveDateTime::new(date, Time::MIDNIGHT)
1807 .assume_offset(UtcOffset::UTC)
1808 .unix_timestamp()
1809 }
1810
1811 #[cfg(feature = "temporal_track")]
1812 fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1813 let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1814 while let Some(next) = date.next_day() {
1815 if next.month() == month {
1816 date = next;
1817 } else {
1818 break;
1819 }
1820 }
1821 Some(date)
1822 }
1823
    /// Builds a lexical segment from the frames inserted in `delta`, appends
    /// it to the file, and registers it in the segment catalog.
    ///
    /// Returns `Ok(false)` when lex indexing is disabled, the delta is empty,
    /// or no segment artifact was produced; `Ok(true)` once the catalog has
    /// been updated.
    #[allow(dead_code)]
    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_frames.is_empty() || !self.lex_enabled {
            return Ok(false);
        }

        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        let segment_id = self.toc.segment_catalog.next_segment_id;
        // Frame-id span covered by this segment (parallel-segments metadata).
        #[cfg(feature = "parallel_segments")]
        let span =
            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));

        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
        #[cfg(feature = "parallel_segments")]
        if let Some(span) = span {
            Self::decorate_segment_common(&mut descriptor.common, span);
        }
        #[cfg(feature = "parallel_segments")]
        let descriptor_for_manifest = descriptor.clone();
        self.toc.segment_catalog.lex_segments.push(descriptor);
        // Best-effort manifest WAL append; failure is logged, not fatal.
        #[cfg(feature = "parallel_segments")]
        if let Err(err) = self.record_index_segment(
            SegmentKind::Lexical,
            descriptor_for_manifest.common,
            SegmentStats {
                doc_count: artifact.doc_count,
                vector_count: 0,
                time_entries: 0,
                bytes_uncompressed: artifact.bytes.len() as u64,
                build_micros: 0,
            },
        ) {
            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
        }
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
        Ok(true)
    }
1867
1868 #[allow(dead_code)]
1869 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1870 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1871 return Ok(false);
1872 }
1873
1874 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1875 Some(artifact) => artifact,
1876 None => return Ok(false),
1877 };
1878
1879 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1880 if existing_dim != artifact.dimension {
1881 return Err(MemvidError::VecDimensionMismatch {
1882 expected: existing_dim,
1883 actual: artifact.dimension as usize,
1884 });
1885 }
1886 }
1887
1888 let segment_id = self.toc.segment_catalog.next_segment_id;
1889 #[cfg(feature = "parallel_segments")]
1890 #[cfg(feature = "parallel_segments")]
1891 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1892
1893 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1894 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1895 #[cfg(feature = "parallel_segments")]
1896 if let Some(span) = span {
1897 Self::decorate_segment_common(&mut descriptor.common, span);
1898 }
1899 #[cfg(feature = "parallel_segments")]
1900 let descriptor_for_manifest = descriptor.clone();
1901 self.toc.segment_catalog.vec_segments.push(descriptor);
1902 #[cfg(feature = "parallel_segments")]
1903 if let Err(err) = self.record_index_segment(
1904 SegmentKind::Vector,
1905 descriptor_for_manifest.common,
1906 SegmentStats {
1907 doc_count: 0,
1908 vector_count: artifact.vector_count,
1909 time_entries: 0,
1910 bytes_uncompressed: artifact.bytes_uncompressed,
1911 build_micros: 0,
1912 },
1913 ) {
1914 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1915 }
1916 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1917 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1918
1919 if self.toc.indexes.vec.is_none() {
1921 let empty_offset = self.data_end;
1922 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1923 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1924 self.toc.indexes.vec = Some(VecIndexManifest {
1925 vector_count: 0,
1926 dimension: 0,
1927 bytes_offset: empty_offset,
1928 bytes_length: 0,
1929 checksum: empty_checksum,
1930 compression_mode: self.vec_compression.clone(),
1931 model: self.vec_model.clone(),
1932 });
1933 }
1934 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1935 if manifest.dimension == 0 {
1936 manifest.dimension = artifact.dimension;
1937 }
1938 if manifest.bytes_length == 0 {
1939 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1940 manifest.compression_mode = artifact.compression.clone();
1941 }
1942 }
1943
1944 self.vec_enabled = true;
1945 Ok(true)
1946 }
1947
1948 #[allow(dead_code)]
1949 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1950 if delta.inserted_time_entries.is_empty() {
1951 return Ok(false);
1952 }
1953
1954 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1955 Some(artifact) => artifact,
1956 None => return Ok(false),
1957 };
1958
1959 let segment_id = self.toc.segment_catalog.next_segment_id;
1960 #[cfg(feature = "parallel_segments")]
1961 #[cfg(feature = "parallel_segments")]
1962 let span = self.segment_span_from_iter(
1963 delta
1964 .inserted_time_entries
1965 .iter()
1966 .map(|entry| entry.frame_id),
1967 );
1968
1969 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1970 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1971 #[cfg(feature = "parallel_segments")]
1972 if let Some(span) = span {
1973 Self::decorate_segment_common(&mut descriptor.common, span);
1974 }
1975 #[cfg(feature = "parallel_segments")]
1976 let descriptor_for_manifest = descriptor.clone();
1977 self.toc.segment_catalog.time_segments.push(descriptor);
1978 #[cfg(feature = "parallel_segments")]
1979 if let Err(err) = self.record_index_segment(
1980 SegmentKind::Time,
1981 descriptor_for_manifest.common,
1982 SegmentStats {
1983 doc_count: 0,
1984 vector_count: 0,
1985 time_entries: artifact.entry_count,
1986 bytes_uncompressed: artifact.bytes.len() as u64,
1987 build_micros: 0,
1988 },
1989 ) {
1990 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1991 }
1992 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1993 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1994 Ok(true)
1995 }
1996
1997 #[cfg(feature = "temporal_track")]
1998 #[allow(dead_code)]
1999 fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
2000 if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
2001 {
2002 return Ok(false);
2003 }
2004
2005 debug_assert!(
2006 delta.inserted_temporal_mentions.len() < 1_000_000,
2007 "temporal delta mentions unexpectedly large: {}",
2008 delta.inserted_temporal_mentions.len()
2009 );
2010 debug_assert!(
2011 delta.inserted_temporal_anchors.len() < 1_000_000,
2012 "temporal delta anchors unexpectedly large: {}",
2013 delta.inserted_temporal_anchors.len()
2014 );
2015
2016 let artifact = match self.build_temporal_segment_from_records(
2017 &delta.inserted_temporal_mentions,
2018 &delta.inserted_temporal_anchors,
2019 )? {
2020 Some(artifact) => artifact,
2021 None => return Ok(false),
2022 };
2023
2024 let segment_id = self.toc.segment_catalog.next_segment_id;
2025 let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
2026 self.toc
2027 .segment_catalog
2028 .temporal_segments
2029 .push(descriptor.clone());
2030 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2031 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2032
2033 self.toc.temporal_track = Some(TemporalTrackManifest {
2034 bytes_offset: descriptor.common.bytes_offset,
2035 bytes_length: descriptor.common.bytes_length,
2036 entry_count: artifact.entry_count,
2037 anchor_count: artifact.anchor_count,
2038 checksum: artifact.checksum,
2039 flags: artifact.flags,
2040 });
2041
2042 self.clear_temporal_track_cache();
2043
2044 Ok(true)
2045 }
2046
2047 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2048 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2049 frame_id,
2050 reason: "frame id too large",
2051 })?;
2052 let frame = self
2053 .toc
2054 .frames
2055 .get_mut(index)
2056 .ok_or(MemvidError::InvalidFrame {
2057 frame_id,
2058 reason: "supersede target missing",
2059 })?;
2060 frame.status = FrameStatus::Superseded;
2061 frame.superseded_by = Some(successor_id);
2062 self.remove_frame_from_indexes(frame_id)
2063 }
2064
2065 pub(crate) fn rebuild_indexes(
2066 &mut self,
2067 new_vec_docs: &[(FrameId, Vec<f32>)],
2068 inserted_frame_ids: &[FrameId],
2069 ) -> Result<()> {
2070 if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2071 return Ok(());
2072 }
2073
2074 let payload_end = self.payload_region_end();
2075 self.data_end = payload_end;
2076 let safe_truncate_len = self.header.footer_offset.max(payload_end);
2079 if self.file.metadata()?.len() > safe_truncate_len {
2080 self.file.set_len(safe_truncate_len)?;
2081 }
2082 self.file.seek(SeekFrom::Start(payload_end))?;
2083
2084 self.toc.segment_catalog.lex_segments.clear();
2086 self.toc.segment_catalog.vec_segments.clear();
2087 self.toc.segment_catalog.time_segments.clear();
2088 #[cfg(feature = "temporal_track")]
2089 self.toc.segment_catalog.temporal_segments.clear();
2090 #[cfg(feature = "parallel_segments")]
2091 self.toc.segment_catalog.index_segments.clear();
2092 self.toc.segment_catalog.tantivy_segments.clear();
2094 self.toc.indexes.lex_segments.clear();
2096
2097 let mut time_entries: Vec<TimeIndexEntry> = self
2098 .toc
2099 .frames
2100 .iter()
2101 .filter(|frame| {
2102 frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2103 })
2104 .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2105 .collect();
2106 let (ti_offset, ti_length, ti_checksum) =
2107 time_index_append(&mut self.file, &mut time_entries)?;
2108 self.toc.time_index = Some(TimeIndexManifest {
2109 bytes_offset: ti_offset,
2110 bytes_length: ti_length,
2111 entry_count: time_entries.len() as u64,
2112 checksum: ti_checksum,
2113 });
2114
2115 let mut footer_offset = ti_offset + ti_length;
2116
2117 #[cfg(feature = "temporal_track")]
2118 {
2119 self.toc.temporal_track = None;
2120 self.toc.segment_catalog.temporal_segments.clear();
2121 self.clear_temporal_track_cache();
2122 }
2123
2124 if self.lex_enabled {
2125 #[cfg(feature = "lex")]
2126 {
2127 if self.tantivy_dirty {
2128 if let Ok(mut storage) = self.lex_storage.write() {
2132 storage.clear();
2133 storage.set_generation(0);
2134 }
2135 self.init_tantivy()?;
2136 if let Some(mut engine) = self.tantivy.take() {
2137 self.rebuild_tantivy_engine(&mut engine)?;
2138 self.tantivy = Some(engine);
2139 } else {
2140 return Err(MemvidError::InvalidToc {
2141 reason: "tantivy engine missing during rebuild".into(),
2142 });
2143 }
2144 } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2145 let max_payload = crate::memvid::search::max_index_payload();
2151 let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2152 for &frame_id in inserted_frame_ids {
2153 let Ok(idx) = usize::try_from(frame_id) else {
2154 continue;
2155 };
2156 let frame = match self.toc.frames.get(idx) {
2157 Some(f) => f.clone(),
2158 None => continue,
2159 };
2160 if frame.status != FrameStatus::Active {
2161 continue;
2162 }
2163 if let Some(search_text) = frame.search_text.clone() {
2164 if !search_text.trim().is_empty() {
2165 prepared_docs.push((frame, search_text));
2166 continue;
2167 }
2168 }
2169 let mime = frame
2170 .metadata
2171 .as_ref()
2172 .and_then(|m| m.mime.as_deref())
2173 .unwrap_or("application/octet-stream");
2174 if !crate::memvid::search::is_text_indexable_mime(mime) {
2175 continue;
2176 }
2177 if frame.payload_length > max_payload {
2178 continue;
2179 }
2180 let text = self.frame_search_text(&frame)?;
2181 if !text.trim().is_empty() {
2182 prepared_docs.push((frame, text));
2183 }
2184 }
2185 if let Some(engine) = self.tantivy.as_mut() {
2186 for (frame, text) in &prepared_docs {
2187 engine.add_frame(frame, text)?;
2188 }
2189 engine.commit()?;
2190 }
2191 } else {
2192 if let Ok(mut storage) = self.lex_storage.write() {
2195 storage.clear();
2196 storage.set_generation(0);
2197 }
2198 self.init_tantivy()?;
2199 if let Some(mut engine) = self.tantivy.take() {
2200 self.rebuild_tantivy_engine(&mut engine)?;
2201 self.tantivy = Some(engine);
2202 } else {
2203 return Err(MemvidError::InvalidToc {
2204 reason: "tantivy engine missing during rebuild".into(),
2205 });
2206 }
2207 }
2208
2209 self.lex_enabled = true;
2211
2212 self.tantivy_dirty = true;
2214
2215 self.data_end = footer_offset;
2217
2218 self.flush_tantivy()?;
2220
2221 footer_offset = self.header.footer_offset;
2223
2224 self.data_end = payload_end;
2226 }
2227 #[cfg(not(feature = "lex"))]
2228 {
2229 self.toc.indexes.lex = None;
2230 self.toc.indexes.lex_segments.clear();
2231 }
2232 } else {
2233 self.toc.indexes.lex = None;
2235 self.toc.indexes.lex_segments.clear();
2236 #[cfg(feature = "lex")]
2237 if let Ok(mut storage) = self.lex_storage.write() {
2238 storage.clear();
2239 }
2240 }
2241
2242 if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2243 let vec_offset = footer_offset;
2244 self.file.seek(SeekFrom::Start(vec_offset))?;
2245 self.file.write_all(&artifact.bytes)?;
2246 footer_offset += artifact.bytes.len() as u64;
2247 self.toc.indexes.vec = Some(VecIndexManifest {
2248 vector_count: artifact.vector_count,
2249 dimension: artifact.dimension,
2250 bytes_offset: vec_offset,
2251 bytes_length: artifact.bytes.len() as u64,
2252 checksum: artifact.checksum,
2253 compression_mode: self.vec_compression.clone(),
2254 model: self.vec_model.clone(),
2255 });
2256 self.vec_index = Some(index);
2257 } else {
2258 if !self.vec_enabled {
2260 self.toc.indexes.vec = None;
2261 }
2262 self.vec_index = None;
2263 }
2264
2265 if self.clip_enabled {
2267 if let Some(ref clip_index) = self.clip_index {
2268 if !clip_index.is_empty() {
2269 let artifact = clip_index.encode()?;
2270 let clip_offset = footer_offset;
2271 self.file.seek(SeekFrom::Start(clip_offset))?;
2272 self.file.write_all(&artifact.bytes)?;
2273 footer_offset += artifact.bytes.len() as u64;
2274 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2275 bytes_offset: clip_offset,
2276 bytes_length: artifact.bytes.len() as u64,
2277 vector_count: artifact.vector_count,
2278 dimension: artifact.dimension,
2279 checksum: artifact.checksum,
2280 model_name: crate::clip::default_model_info().name.to_string(),
2281 });
2282 tracing::info!(
2283 "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2284 artifact.vector_count,
2285 clip_offset
2286 );
2287 }
2288 }
2289 } else {
2290 self.toc.indexes.clip = None;
2291 }
2292
2293 if self.memories_track.card_count() > 0 {
2295 let memories_offset = footer_offset;
2296 let memories_bytes = self.memories_track.serialize()?;
2297 let memories_checksum = blake3::hash(&memories_bytes).into();
2298 self.file.seek(SeekFrom::Start(memories_offset))?;
2299 self.file.write_all(&memories_bytes)?;
2300 footer_offset += memories_bytes.len() as u64;
2301
2302 let stats = self.memories_track.stats();
2303 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2304 bytes_offset: memories_offset,
2305 bytes_length: memories_bytes.len() as u64,
2306 card_count: stats.card_count as u64,
2307 entity_count: stats.entity_count as u64,
2308 checksum: memories_checksum,
2309 });
2310 } else {
2311 self.toc.memories_track = None;
2312 }
2313
2314 if self.logic_mesh.is_empty() {
2316 self.toc.logic_mesh = None;
2317 } else {
2318 let mesh_offset = footer_offset;
2319 let mesh_bytes = self.logic_mesh.serialize()?;
2320 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2321 self.file.seek(SeekFrom::Start(mesh_offset))?;
2322 self.file.write_all(&mesh_bytes)?;
2323 footer_offset += mesh_bytes.len() as u64;
2324
2325 let stats = self.logic_mesh.stats();
2326 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2327 bytes_offset: mesh_offset,
2328 bytes_length: mesh_bytes.len() as u64,
2329 node_count: stats.node_count as u64,
2330 edge_count: stats.edge_count as u64,
2331 checksum: mesh_checksum,
2332 });
2333 }
2334
2335 tracing::info!(
2337 "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2338 ti_offset,
2339 ti_length,
2340 footer_offset,
2341 self.header.footer_offset
2342 );
2343
2344 self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2347
2348 if self.file.metadata()?.len() < self.header.footer_offset {
2350 self.file.set_len(self.header.footer_offset)?;
2351 }
2352
2353 self.rewrite_toc_footer()?;
2354 self.header.toc_checksum = self.toc.toc_checksum;
2355 crate::persist_header(&mut self.file, &self.header)?;
2356
2357 #[cfg(feature = "lex")]
2358 if self.lex_enabled {
2359 if let Some(ref engine) = self.tantivy {
2360 let doc_count = engine.num_docs();
2361 let active_frame_count = self
2362 .toc
2363 .frames
2364 .iter()
2365 .filter(|f| f.status == FrameStatus::Active)
2366 .count();
2367
2368 let text_indexable_count = self
2371 .toc
2372 .frames
2373 .iter()
2374 .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2375 .count();
2376
2377 if doc_count == 0 && text_indexable_count > 0 {
2380 return Err(MemvidError::Doctor {
2381 reason: format!(
2382 "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2383 This indicates a critical failure in the rebuild process."
2384 ),
2385 });
2386 }
2387
2388 log::info!(
2390 "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2391 );
2392 }
2393 }
2394
2395 Ok(())
2396 }
2397
2398 fn persist_memories_track(&mut self) -> Result<()> {
2403 if self.memories_track.card_count() == 0 {
2404 self.toc.memories_track = None;
2405 return Ok(());
2406 }
2407
2408 let memories_offset = self.header.footer_offset;
2410 let memories_bytes = self.memories_track.serialize()?;
2411 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2412
2413 self.file.seek(SeekFrom::Start(memories_offset))?;
2414 self.file.write_all(&memories_bytes)?;
2415
2416 let stats = self.memories_track.stats();
2417 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2418 bytes_offset: memories_offset,
2419 bytes_length: memories_bytes.len() as u64,
2420 card_count: stats.card_count as u64,
2421 entity_count: stats.entity_count as u64,
2422 checksum: memories_checksum,
2423 });
2424
2425 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2427
2428 if self.file.metadata()?.len() < self.header.footer_offset {
2430 self.file.set_len(self.header.footer_offset)?;
2431 }
2432
2433 Ok(())
2434 }
2435
2436 fn persist_clip_index(&mut self) -> Result<()> {
2441 if !self.clip_enabled {
2442 self.toc.indexes.clip = None;
2443 return Ok(());
2444 }
2445
2446 let clip_index = match &self.clip_index {
2447 Some(idx) if !idx.is_empty() => idx,
2448 _ => {
2449 self.toc.indexes.clip = None;
2450 return Ok(());
2451 }
2452 };
2453
2454 let artifact = clip_index.encode()?;
2456
2457 let clip_offset = self.header.footer_offset;
2459 self.file.seek(SeekFrom::Start(clip_offset))?;
2460 self.file.write_all(&artifact.bytes)?;
2461
2462 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2463 bytes_offset: clip_offset,
2464 bytes_length: artifact.bytes.len() as u64,
2465 vector_count: artifact.vector_count,
2466 dimension: artifact.dimension,
2467 checksum: artifact.checksum,
2468 model_name: crate::clip::default_model_info().name.to_string(),
2469 });
2470
2471 tracing::info!(
2472 "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2473 artifact.vector_count,
2474 clip_offset
2475 );
2476
2477 self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2479
2480 if self.file.metadata()?.len() < self.header.footer_offset {
2482 self.file.set_len(self.header.footer_offset)?;
2483 }
2484
2485 Ok(())
2486 }
2487
2488 fn persist_logic_mesh(&mut self) -> Result<()> {
2493 if self.logic_mesh.is_empty() {
2494 self.toc.logic_mesh = None;
2495 return Ok(());
2496 }
2497
2498 let mesh_offset = self.header.footer_offset;
2500 let mesh_bytes = self.logic_mesh.serialize()?;
2501 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2502
2503 self.file.seek(SeekFrom::Start(mesh_offset))?;
2504 self.file.write_all(&mesh_bytes)?;
2505
2506 let stats = self.logic_mesh.stats();
2507 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2508 bytes_offset: mesh_offset,
2509 bytes_length: mesh_bytes.len() as u64,
2510 node_count: stats.node_count as u64,
2511 edge_count: stats.edge_count as u64,
2512 checksum: mesh_checksum,
2513 });
2514
2515 self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2517
2518 if self.file.metadata()?.len() < self.header.footer_offset {
2520 self.file.set_len(self.header.footer_offset)?;
2521 }
2522
2523 Ok(())
2524 }
2525
2526 fn persist_sketch_track(&mut self) -> Result<()> {
2531 if self.sketch_track.is_empty() {
2532 self.toc.sketch_track = None;
2533 return Ok(());
2534 }
2535
2536 self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2538
2539 let (sketch_offset, sketch_length, sketch_checksum) =
2541 crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2542
2543 let stats = self.sketch_track.stats();
2544 self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2545 bytes_offset: sketch_offset,
2546 bytes_length: sketch_length,
2547 entry_count: stats.entry_count,
2548 #[allow(clippy::cast_possible_truncation)]
2549 entry_size: stats.variant.entry_size() as u16,
2550 flags: 0,
2551 checksum: sketch_checksum,
2552 });
2553
2554 self.header.footer_offset = sketch_offset + sketch_length;
2556
2557 if self.file.metadata()?.len() < self.header.footer_offset {
2559 self.file.set_len(self.header.footer_offset)?;
2560 }
2561
2562 tracing::debug!(
2563 "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2564 stats.entry_count,
2565 sketch_offset
2566 );
2567
2568 Ok(())
2569 }
2570
2571 #[cfg(feature = "lex")]
2572 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2573 let LexWalBatch {
2574 generation,
2575 doc_count,
2576 checksum,
2577 segments,
2578 } = batch;
2579
2580 if let Ok(mut storage) = self.lex_storage.write() {
2581 storage.replace(doc_count, checksum, segments);
2582 storage.set_generation(generation);
2583 }
2584
2585 self.persist_lex_manifest()
2586 }
2587
2588 #[cfg(feature = "lex")]
2589 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2590 let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2591 self.append_wal_entry(&payload)?;
2592 Ok(())
2593 }
2594
2595 #[cfg(feature = "lex")]
2596 fn persist_lex_manifest(&mut self) -> Result<()> {
2597 let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2598 storage.to_manifest()
2599 } else {
2600 (None, Vec::new())
2601 };
2602
2603 if let Some(storage_manifest) = index_manifest {
2605 self.toc.indexes.lex = Some(storage_manifest);
2607 } else {
2608 self.toc.indexes.lex = None;
2611 }
2612
2613 self.toc.indexes.lex_segments = segments;
2614
2615 self.rewrite_toc_footer()?;
2619 self.header.toc_checksum = self.toc.toc_checksum;
2620 crate::persist_header(&mut self.file, &self.header)?;
2621 Ok(())
2622 }
2623
    #[cfg(feature = "lex")]
    /// Embeds a fresh tantivy snapshot into the file: appends every segment's
    /// bytes after the payload region, mirrors them into the segment catalog
    /// and in-memory lex storage, logs the batch to the WAL, and persists the
    /// manifest/TOC/header.
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // Segments are written back-to-back starting at the end of the
        // payload region; each EmbeddedLexSegment records where its bytes
        // landed (offset captured before advancing).
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            // NOTE(review): flushing once per segment looks conservative; a
            // single flush after the loop may suffice — confirm before changing.
            self.file.flush()?;
            embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Only ever move the footer forward.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Mirror the embedded segments into the segment catalog, assigning
        // fresh monotonically increasing ids.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // Swap the new segment set into in-memory lex storage; a poisoned
        // lock is a hard error here (unlike the best-effort paths elsewhere).
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Durably log the batch, then persist manifest/TOC/header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        // NOTE(review): persist_lex_manifest already rewrites the TOC and
        // persists the header; this second checksum/persist pair looks
        // redundant — confirm before removing.
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2707
2708 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2709 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2710 frame_id,
2711 reason: "frame id too large",
2712 })?;
2713 let frame = self
2714 .toc
2715 .frames
2716 .get_mut(index)
2717 .ok_or(MemvidError::InvalidFrame {
2718 frame_id,
2719 reason: "delete target missing",
2720 })?;
2721 frame.status = FrameStatus::Deleted;
2722 frame.superseded_by = None;
2723 self.remove_frame_from_indexes(frame_id)
2724 }
2725
2726 fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2727 #[cfg(feature = "lex")]
2728 if let Some(engine) = self.tantivy.as_mut() {
2729 engine.delete_frame(frame_id)?;
2730 self.tantivy_dirty = true;
2731 }
2732 if let Some(index) = self.lex_index.as_mut() {
2733 index.remove_document(frame_id);
2734 }
2735 if let Some(index) = self.vec_index.as_mut() {
2736 index.remove(frame_id);
2737 }
2738 Ok(())
2739 }
2740
2741 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2742 let Ok(index) = usize::try_from(frame_id) else {
2743 return false;
2744 };
2745 self.toc
2746 .frames
2747 .get(index)
2748 .is_some_and(|frame| frame.status == FrameStatus::Active)
2749 }
2750
2751 #[cfg(feature = "parallel_segments")]
2752 fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2753 where
2754 I: IntoIterator<Item = FrameId>,
2755 {
2756 let mut iter = iter.into_iter();
2757 let first_id = iter.next()?;
2758 let first_frame = self.toc.frames.get(first_id as usize);
2759 let mut min_id = first_id;
2760 let mut max_id = first_id;
2761 let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2762 let mut page_end = first_frame
2763 .and_then(|frame| frame.chunk_count)
2764 .map(|count| page_start + count.saturating_sub(1))
2765 .unwrap_or(page_start);
2766 for frame_id in iter {
2767 if frame_id < min_id {
2768 min_id = frame_id;
2769 }
2770 if frame_id > max_id {
2771 max_id = frame_id;
2772 }
2773 if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2774 if let Some(idx) = frame.chunk_index {
2775 page_start = page_start.min(idx);
2776 if let Some(count) = frame.chunk_count {
2777 let end = idx + count.saturating_sub(1);
2778 page_end = page_end.max(end);
2779 } else {
2780 page_end = page_end.max(idx);
2781 }
2782 }
2783 }
2784 }
2785 Some(SegmentSpan {
2786 frame_start: min_id,
2787 frame_end: max_id,
2788 page_start,
2789 page_end,
2790 ..SegmentSpan::default()
2791 })
2792 }
2793
    #[cfg(feature = "parallel_segments")]
    /// Attaches the computed span to a segment descriptor and lifts a zero
    /// (unset) codec version to 1; non-zero versions are left untouched.
    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
        common.span = Some(span);
        if common.codec_version == 0 {
            common.codec_version = 1;
        }
    }
2801
2802 #[cfg(feature = "parallel_segments")]
2803 pub(crate) fn record_index_segment(
2804 &mut self,
2805 kind: SegmentKind,
2806 common: SegmentCommon,
2807 stats: SegmentStats,
2808 ) -> Result<()> {
2809 let entry = IndexSegmentRef {
2810 kind,
2811 common,
2812 stats,
2813 };
2814 self.toc.segment_catalog.index_segments.push(entry.clone());
2815 if let Some(wal) = self.manifest_wal.as_mut() {
2816 wal.append_segments(&[entry])?;
2817 }
2818 Ok(())
2819 }
2820
2821 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2822 self.ensure_writable()?;
2823 if self.toc.ticket_ref.issuer == "free-tier" {
2824 return Ok(());
2825 }
2826 match self.tier() {
2827 Tier::Free => Ok(()),
2828 tier => {
2829 if self.toc.ticket_ref.issuer.trim().is_empty() {
2830 Err(MemvidError::TicketRequired { tier })
2831 } else {
2832 Ok(())
2833 }
2834 }
2835 }
2836 }
2837
2838 pub(crate) fn tier(&self) -> Tier {
2839 if self.header.wal_size >= WAL_SIZE_LARGE {
2840 Tier::Enterprise
2841 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2842 Tier::Dev
2843 } else {
2844 Tier::Free
2845 }
2846 }
2847
2848 pub(crate) fn capacity_limit(&self) -> u64 {
2849 if self.toc.ticket_ref.capacity_bytes != 0 {
2850 self.toc.ticket_ref.capacity_bytes
2851 } else {
2852 self.tier().capacity_bytes()
2853 }
2854 }
2855
    /// Public accessor for the effective capacity limit in bytes.
    ///
    /// Returns the ticket's explicit capacity when one is set, otherwise the
    /// current tier's default capacity.
    #[must_use]
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2864
    /// Serializes the TOC at `header.footer_offset`, appends the commit
    /// footer, then resizes the file to exactly cover the result — never
    /// shrinking into the WAL region — and syncs to disk.
    ///
    /// Callers are expected to persist the header afterwards so the stored
    /// `toc_checksum` matches (see `persist_lex_manifest` for the pattern).
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        // NOTE(review): prepare_toc_bytes takes &mut self.toc, presumably to
        // stamp toc_checksum during serialization — confirm against its impl.
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The commit footer records the TOC length + blake3 hash and the
        // current generation, written immediately after the TOC bytes.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // Final length: end of TOC + footer, clamped so truncation can never
        // cut into the WAL region.
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Flush everything above to disk before returning.
        self.file.sync_all()?;
        Ok(())
    }
2905}
2906
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Builds and appends index segments for the frames in `delta` using the
    /// parallel segment pipeline (plan -> worker pool -> append).
    ///
    /// Returns `Ok(false)` when any stage produces nothing (no chunks, no
    /// plans, no worker results) and `Ok(true)` once segments were appended.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Converts the frames inserted by `delta` into `SegmentChunkPlan`s,
    /// pairing each frame's text content with its embedding when one was
    /// provided in the delta.
    ///
    /// Frames with blank content are skipped; a frame id outside the TOC is
    /// an `InvalidFrame` error.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Embeddings are keyed by frame id and consumed (removed) as each
        // frame is planned.
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages here are 1-based for chunked frames, 0 otherwise.
            // NOTE(review): segment_span_from_iter uses raw 0-based chunk
            // indices for its page range — confirm which base is intended.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2978
2979#[cfg(feature = "parallel_segments")]
fn estimate_tokens(text: &str) -> usize {
    // Cheap token estimate: one token per whitespace-separated word, floored
    // at 1 so even empty or all-whitespace text counts as a single token.
    match text.split_whitespace().count() {
        0 => 1,
        words => words,
    }
}
2983
2984impl Memvid {
2985 pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2986 let catalog_end = self.catalog_data_end();
2987 if catalog_end <= self.header.footer_offset {
2988 return Ok(false);
2989 }
2990 self.header.footer_offset = catalog_end;
2991 self.rewrite_toc_footer()?;
2992 self.header.toc_checksum = self.toc.toc_checksum;
2993 crate::persist_header(&mut self.file, &self.header)?;
2994 Ok(true)
2995 }
2996}
2997
impl Memvid {
    /// Compacts the file: commits pending state, rewrites all active frame
    /// payloads contiguously right after the WAL region, clears every
    /// segment catalog, and rebuilds the indexes from scratch.
    ///
    /// Non-active frames (and active frames whose payload could not be
    /// snapshotted) end up with zeroed payload offset/length.
    pub fn vacuum(&mut self) -> Result<()> {
        self.commit()?;

        // Snapshot every active frame's payload into memory before the
        // payload region is overwritten below.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Rewrite payloads back-to-back starting immediately after the WAL
        // region. NOTE(review): this overwrites the live payload region in
        // place (not copy-on-write), so a crash mid-vacuum could leave the
        // region inconsistent — confirm recovery expectations.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Non-active frames keep no payload after compaction.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // The payload move invalidates all segment bookkeeping.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Drop the tantivy engine so the rebuild starts clean.
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[], &[])?;
        self.file.sync_all()?;
        Ok(())
    }

    /// Previews how `payload` would be chunked, without ingesting anything.
    /// Returns `None` when the document chunk planner does not apply.
    #[must_use]
    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
        plan_document_chunks(payload).map(|plan| plan.chunks)
    }

    /// Ingests `payload` with default options; see `put_internal`.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }

    /// Ingests `payload` with caller-supplied options.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }

    /// Ingests `payload` together with a precomputed embedding vector.
    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            Some(embedding),
            None,
            PutOptions::default(),
            None,
        )
    }

    /// Ingests `payload` with a precomputed embedding and explicit options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }

    /// Ingests `payload` with an optional parent embedding plus one embedding
    /// per chunk.
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }

    /// Updates an existing active frame, inheriting every option field the
    /// caller left unset from the current frame.
    ///
    /// With `payload == None` the stored payload is reused (auto-tagging and
    /// date extraction are disabled in that case); with `Some(bytes)` the
    /// payload is replaced. An explicit `embedding` wins over the frame's
    /// stored embedding, which is looked up only when vec search is enabled.
    /// Returns the sequence produced by `put_internal`.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Backfill unset options from the existing frame so an update only
        // changes what the caller explicitly provided.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // No new payload: reuse the existing frame's payload and skip
        // content-derived enrichment.
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None,
            options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
        );
        Ok(seq)
    }

    /// Soft-deletes an active frame by appending a tombstone record to the
    /// WAL; returns the tombstone's WAL sequence number.
    ///
    /// Errors when the frame is missing or not active. May trigger an
    /// automatic commit when the WAL reaches its checkpoint threshold,
    /// unless batch options suppress auto-checkpointing.
    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let frame = self.frame_by_id(frame_id)?;
        if frame.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Tombstone entry: identifying metadata from the victim frame, empty
        // payload. Falls back to the frame's own timestamp if the system
        // clock reads before the epoch.
        let mut tombstone = WalEntryData {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(frame.timestamp),
            kind: None,
            track: None,
            payload: Vec::new(),
            embedding: None,
            uri: frame.uri.clone(),
            title: frame.title.clone(),
            canonical_encoding: frame.canonical_encoding,
            canonical_length: frame.canonical_length,
            metadata: None,
            search_text: None,
            tags: Vec::new(),
            labels: Vec::new(),
            extra_metadata: BTreeMap::new(),
            content_dates: Vec::new(),
            chunk_manifest: None,
            role: frame.role,
            parent_sequence: None,
            chunk_index: frame.chunk_index,
            chunk_count: frame.chunk_count,
            op: FrameWalOp::Tombstone,
            target_frame_id: Some(frame_id),
            supersedes_frame_id: None,
            reuse_payload_from: None,
            source_sha256: None,
            source_path: None,
            enrichment_state: crate::types::EnrichmentState::default(),
        };
        tombstone.kind = frame.kind.clone();
        tombstone.track = frame.track.clone();

        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
        let seq = self.append_wal_entry(&payload_bytes)?;
        self.dirty = true;
        // Batched ingestion can opt out of auto-checkpointing.
        let suppress_checkpoint = self
            .batch_opts
            .as_ref()
            .is_some_and(|o| o.disable_auto_checkpoint);
        if !suppress_checkpoint && self.wal.should_checkpoint() {
            self.commit()?;
        }
        info!("frame_delete frame_id={frame_id} seq={seq}");
        Ok(seq)
    }
}
3289
3290impl Memvid {
3291 fn put_internal(
3292 &mut self,
3293 payload: Option<&[u8]>,
3294 reuse_frame: Option<Frame>,
3295 embedding: Option<Vec<f32>>,
3296 chunk_embeddings: Option<Vec<Vec<f32>>>,
3297 mut options: PutOptions,
3298 supersedes: Option<FrameId>,
3299 ) -> Result<u64> {
3300 self.ensure_mutation_allowed()?;
3301
3302 if options.dedup {
3304 if let Some(bytes) = payload {
3305 let content_hash = hash(bytes);
3306 if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3307 tracing::debug!(
3309 frame_id = existing_frame.id,
3310 "dedup: skipping ingestion, identical content already exists"
3311 );
3312 return Ok(existing_frame.id);
3314 }
3315 }
3316 }
3317
3318 if payload.is_some() && reuse_frame.is_some() {
3319 let frame_id = reuse_frame
3320 .as_ref()
3321 .map(|frame| frame.id)
3322 .unwrap_or_default();
3323 return Err(MemvidError::InvalidFrame {
3324 frame_id,
3325 reason: "cannot reuse payload when bytes are provided",
3326 });
3327 }
3328
3329 let incoming_dimension = {
3332 let mut dim: Option<u32> = None;
3333
3334 if let Some(ref vector) = embedding {
3335 if !vector.is_empty() {
3336 #[allow(clippy::cast_possible_truncation)]
3337 let len = vector.len() as u32;
3338 dim = Some(len);
3339 }
3340 }
3341
3342 if let Some(ref vectors) = chunk_embeddings {
3343 for vector in vectors {
3344 if vector.is_empty() {
3345 continue;
3346 }
3347 let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3348 match dim {
3349 None => dim = Some(vec_dim),
3350 Some(existing) if existing == vec_dim => {}
3351 Some(existing) => {
3352 return Err(MemvidError::VecDimensionMismatch {
3353 expected: existing,
3354 actual: vector.len(),
3355 });
3356 }
3357 }
3358 }
3359 }
3360
3361 dim
3362 };
3363
3364 if let Some(incoming_dimension) = incoming_dimension {
3365 if !self.vec_enabled {
3367 self.enable_vec()?;
3368 }
3369
3370 if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3371 if existing_dimension != incoming_dimension {
3372 return Err(MemvidError::VecDimensionMismatch {
3373 expected: existing_dimension,
3374 actual: incoming_dimension as usize,
3375 });
3376 }
3377 }
3378
3379 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3381 if manifest.dimension == 0 {
3382 manifest.dimension = incoming_dimension;
3383 }
3384 }
3385 }
3386
3387 let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3388 let payload_tail = self.payload_region_end();
3389 let projected = if let Some(bytes) = payload {
3390 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3391 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3392 } else {
3393 prepare_canonical_payload(bytes)?
3394 };
3395 let len = prepared.len();
3396 prepared_payload = Some((prepared, encoding, length));
3397 payload_tail.saturating_add(len as u64)
3398 } else if reuse_frame.is_some() {
3399 payload_tail
3400 } else {
3401 return Err(MemvidError::InvalidFrame {
3402 frame_id: 0,
3403 reason: "payload required for frame insertion",
3404 });
3405 };
3406
3407 let capacity_limit = self.capacity_limit();
3408 if projected > capacity_limit {
3409 let incoming_size = projected.saturating_sub(payload_tail);
3410 return Err(MemvidError::CapacityExceeded {
3411 current: payload_tail,
3412 limit: capacity_limit,
3413 required: incoming_size,
3414 });
3415 }
3416 let timestamp = options.timestamp.take().unwrap_or_else(|| {
3417 SystemTime::now()
3418 .duration_since(UNIX_EPOCH)
3419 .map(|d| d.as_secs() as i64)
3420 .unwrap_or(0)
3421 });
3422
3423 #[allow(unused_assignments)]
3424 let mut reuse_bytes: Option<Vec<u8>> = None;
3425 let payload_for_processing = if let Some(bytes) = payload {
3426 Some(bytes)
3427 } else if let Some(frame) = reuse_frame.as_ref() {
3428 let bytes = self.frame_canonical_bytes(frame)?;
3429 reuse_bytes = Some(bytes);
3430 reuse_bytes.as_deref()
3431 } else {
3432 None
3433 };
3434
3435 let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3437 (Some(bytes), None) => plan_document_chunks(bytes),
3438 _ => None,
3439 };
3440
3441 let mut source_sha256: Option<[u8; 32]> = None;
3445 let source_path_value = options.source_path.take();
3446
3447 let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3448 if raw_chunk_plan.is_some() {
3449 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3451 } else if options.no_raw {
3452 if let Some(bytes) = payload {
3454 let hash_result = hash(bytes);
3456 source_sha256 = Some(*hash_result.as_bytes());
3457 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3459 } else {
3460 return Err(MemvidError::InvalidFrame {
3461 frame_id: 0,
3462 reason: "payload required for --no-raw mode",
3463 });
3464 }
3465 } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3466 (prepared, encoding, length, None)
3467 } else if let Some(bytes) = payload {
3468 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3469 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3470 } else {
3471 prepare_canonical_payload(bytes)?
3472 };
3473 (prepared, encoding, length, None)
3474 } else if let Some(frame) = reuse_frame.as_ref() {
3475 (
3476 Vec::new(),
3477 frame.canonical_encoding,
3478 frame.canonical_length,
3479 Some(frame.id),
3480 )
3481 } else {
3482 return Err(MemvidError::InvalidFrame {
3483 frame_id: 0,
3484 reason: "payload required for frame insertion",
3485 });
3486 };
3487
3488 let mut chunk_plan = raw_chunk_plan;
3490
3491 let mut metadata = options.metadata.take();
3492 let mut search_text = options
3493 .search_text
3494 .take()
3495 .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3496 let mut tags = std::mem::take(&mut options.tags);
3497 let mut labels = std::mem::take(&mut options.labels);
3498 let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3499 let mut content_dates: Vec<String> = Vec::new();
3500
3501 let need_search_text = search_text
3502 .as_ref()
3503 .is_none_or(|text| text.trim().is_empty());
3504 let need_metadata = metadata.is_none();
3505 let run_extractor = need_search_text || need_metadata || options.auto_tag;
3506
3507 let mut extraction_error = None;
3508 let mut is_skim_extraction = false; let extracted = if run_extractor {
3511 if let Some(bytes) = payload_for_processing {
3512 let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3513 let uri_hint = options.uri.as_deref();
3514
3515 let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3517
3518 if use_budgeted {
3519 let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3521 options.extraction_budget_ms,
3522 );
3523 match crate::extract_budgeted::extract_with_budget(
3524 bytes, mime_hint, uri_hint, budget,
3525 ) {
3526 Ok(result) => {
3527 is_skim_extraction = result.is_skim();
3528 if is_skim_extraction {
3529 tracing::debug!(
3530 coverage = result.coverage,
3531 elapsed_ms = result.elapsed_ms,
3532 sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3533 "time-budgeted extraction (skim)"
3534 );
3535 }
3536 let doc = crate::extract::ExtractedDocument {
3538 text: if result.text.is_empty() {
3539 None
3540 } else {
3541 Some(result.text)
3542 },
3543 metadata: serde_json::json!({
3544 "skim": is_skim_extraction,
3545 "coverage": result.coverage,
3546 "sections_extracted": result.sections_extracted,
3547 "sections_total": result.sections_total,
3548 }),
3549 mime_type: mime_hint.map(std::string::ToString::to_string),
3550 };
3551 Some(doc)
3552 }
3553 Err(err) => {
3554 tracing::warn!(
3556 ?err,
3557 "budgeted extraction failed, trying full extraction"
3558 );
3559 match extract_via_registry(bytes, mime_hint, uri_hint) {
3560 Ok(doc) => Some(doc),
3561 Err(err) => {
3562 extraction_error = Some(err);
3563 None
3564 }
3565 }
3566 }
3567 }
3568 } else {
3569 match extract_via_registry(bytes, mime_hint, uri_hint) {
3571 Ok(doc) => Some(doc),
3572 Err(err) => {
3573 extraction_error = Some(err);
3574 None
3575 }
3576 }
3577 }
3578 } else {
3579 None
3580 }
3581 } else {
3582 None
3583 };
3584
3585 if let Some(err) = extraction_error {
3586 return Err(err);
3587 }
3588
3589 if let Some(doc) = &extracted {
3590 if need_search_text {
3591 if let Some(text) = &doc.text {
3592 if let Some(normalized) =
3593 normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3594 {
3595 search_text = Some(normalized);
3596 }
3597 }
3598 }
3599
3600 if chunk_plan.is_none() {
3603 if let Some(text) = &doc.text {
3604 chunk_plan = plan_text_chunks(text);
3605 }
3606 }
3607
3608 if let Some(mime) = doc.mime_type.as_ref() {
3609 if let Some(existing) = &mut metadata {
3610 if existing.mime.is_none() {
3611 existing.mime = Some(mime.clone());
3612 }
3613 } else {
3614 let mut doc_meta = DocMetadata::default();
3615 doc_meta.mime = Some(mime.clone());
3616 metadata = Some(doc_meta);
3617 }
3618 }
3619
3620 if options.auto_tag {
3623 if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3624 {
3625 extra_metadata
3626 .entry("extractous_metadata".to_string())
3627 .or_insert(meta_json);
3628 }
3629 }
3630 }
3631
3632 if options.auto_tag {
3633 if let Some(ref text) = search_text {
3634 if !text.trim().is_empty() {
3635 let result = AutoTagger.analyse(text, options.extract_dates);
3636 merge_unique(&mut tags, result.tags);
3637 merge_unique(&mut labels, result.labels);
3638 if options.extract_dates && content_dates.is_empty() {
3639 content_dates = result.content_dates;
3640 }
3641 }
3642 }
3643 }
3644
3645 if content_dates.is_empty() {
3646 if let Some(frame) = reuse_frame.as_ref() {
3647 content_dates = frame.content_dates.clone();
3648 }
3649 }
3650
3651 let metadata_ref = metadata.as_ref();
3652 let mut search_text = augment_search_text(
3653 search_text,
3654 options.uri.as_deref(),
3655 options.title.as_deref(),
3656 options.track.as_deref(),
3657 &tags,
3658 &labels,
3659 &extra_metadata,
3660 &content_dates,
3661 metadata_ref,
3662 );
3663 let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3664 let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3665 let mut parent_chunk_count: Option<u32> = None;
3666
3667 let kind_value = options.kind.take();
3668 let track_value = options.track.take();
3669 let uri_value = options.uri.take();
3670 let title_value = options.title.take();
3671 let should_extract_triplets = options.extract_triplets;
3672 let triplet_uri = uri_value.clone();
3674 let triplet_title = title_value.clone();
3675
3676 if let Some(plan) = chunk_plan.as_ref() {
3677 let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3678 parent_chunk_manifest = Some(plan.manifest.clone());
3679 parent_chunk_count = Some(chunk_total);
3680
3681 if let Some(first_chunk) = plan.chunks.first() {
3682 if let Some(normalized) =
3683 normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3684 {
3685 if !normalized.trim().is_empty() {
3686 search_text = Some(normalized);
3687 }
3688 }
3689 }
3690
3691 let chunk_tags = tags.clone();
3692 let chunk_labels = labels.clone();
3693 let chunk_metadata = metadata.clone();
3694 let chunk_extra_metadata = extra_metadata.clone();
3695 let chunk_content_dates = content_dates.clone();
3696
3697 for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3698 let (chunk_payload, chunk_encoding, chunk_length) =
3699 prepare_canonical_payload(chunk_text.as_bytes())?;
3700 let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3701 .map(|n| n.text)
3702 .filter(|text| !text.trim().is_empty());
3703
3704 let chunk_uri = uri_value
3705 .as_ref()
3706 .map(|uri| format!("{uri}#page-{}", idx + 1));
3707 let chunk_title = title_value
3708 .as_ref()
3709 .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3710
3711 let chunk_embedding = chunk_embeddings
3713 .as_ref()
3714 .and_then(|embeddings| embeddings.get(idx).cloned());
3715
3716 chunk_entries.push(WalEntryData {
3717 timestamp,
3718 kind: kind_value.clone(),
3719 track: track_value.clone(),
3720 payload: chunk_payload,
3721 embedding: chunk_embedding,
3722 uri: chunk_uri,
3723 title: chunk_title,
3724 canonical_encoding: chunk_encoding,
3725 canonical_length: chunk_length,
3726 metadata: chunk_metadata.clone(),
3727 search_text: chunk_search_text,
3728 tags: chunk_tags.clone(),
3729 labels: chunk_labels.clone(),
3730 extra_metadata: chunk_extra_metadata.clone(),
3731 content_dates: chunk_content_dates.clone(),
3732 chunk_manifest: None,
3733 role: FrameRole::DocumentChunk,
3734 parent_sequence: None,
3735 chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3736 chunk_count: Some(chunk_total),
3737 op: FrameWalOp::Insert,
3738 target_frame_id: None,
3739 supersedes_frame_id: None,
3740 reuse_payload_from: None,
3741 source_sha256: None, source_path: None,
3743 enrichment_state: crate::types::EnrichmentState::Enriched,
3745 });
3746 }
3747 }
3748
3749 let parent_uri = uri_value.clone();
3750 let parent_title = title_value.clone();
3751
3752 let parent_sequence = if let Some(parent_id) = options.parent_id {
3755 usize::try_from(parent_id)
3760 .ok()
3761 .and_then(|idx| self.toc.frames.get(idx))
3762 .map(|_| parent_id + 2) } else {
3764 None
3765 };
3766
3767 let triplet_text = search_text.clone();
3769
3770 #[cfg(feature = "lex")]
3772 let instant_index_tags = if options.instant_index {
3773 tags.clone()
3774 } else {
3775 Vec::new()
3776 };
3777 #[cfg(feature = "lex")]
3778 let instant_index_labels = if options.instant_index {
3779 labels.clone()
3780 } else {
3781 Vec::new()
3782 };
3783
3784 #[cfg(feature = "lex")]
3786 let needs_enrichment =
3787 options.instant_index && (options.enable_embedding || is_skim_extraction);
3788 #[cfg(feature = "lex")]
3789 let enrichment_state = if needs_enrichment {
3790 crate::types::EnrichmentState::Searchable
3791 } else {
3792 crate::types::EnrichmentState::Enriched
3793 };
3794 #[cfg(not(feature = "lex"))]
3795 let enrichment_state = crate::types::EnrichmentState::Enriched;
3796
3797 let entry = WalEntryData {
3798 timestamp,
3799 kind: kind_value,
3800 track: track_value,
3801 payload: storage_payload,
3802 embedding,
3803 uri: parent_uri,
3804 title: parent_title,
3805 canonical_encoding,
3806 canonical_length,
3807 metadata,
3808 search_text,
3809 tags,
3810 labels,
3811 extra_metadata,
3812 content_dates,
3813 chunk_manifest: parent_chunk_manifest,
3814 role: options.role,
3815 parent_sequence,
3816 chunk_index: None,
3817 chunk_count: parent_chunk_count,
3818 op: FrameWalOp::Insert,
3819 target_frame_id: None,
3820 supersedes_frame_id: supersedes,
3821 reuse_payload_from,
3822 source_sha256,
3823 source_path: source_path_value,
3824 enrichment_state,
3825 };
3826
3827 let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3828 let parent_seq = self.append_wal_entry(&parent_bytes)?;
3829 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3830
3831 #[cfg(feature = "lex")]
3834 if options.instant_index && self.tantivy.is_some() {
3835 let frame_id = parent_seq as FrameId;
3837
3838 if let Some(ref text) = triplet_text {
3840 if !text.trim().is_empty() {
3841 let temp_frame = Frame {
3843 id: frame_id,
3844 timestamp,
3845 anchor_ts: None,
3846 anchor_source: None,
3847 kind: options.kind.clone(),
3848 track: options.track.clone(),
3849 payload_offset: 0,
3850 payload_length: 0,
3851 checksum: [0u8; 32],
3852 uri: options
3853 .uri
3854 .clone()
3855 .or_else(|| Some(crate::default_uri(frame_id))),
3856 title: options.title.clone(),
3857 canonical_encoding: crate::types::CanonicalEncoding::default(),
3858 canonical_length: None,
3859 metadata: None, search_text: triplet_text.clone(),
3861 tags: instant_index_tags.clone(),
3862 labels: instant_index_labels.clone(),
3863 extra_metadata: std::collections::BTreeMap::new(), content_dates: Vec::new(), chunk_manifest: None,
3866 role: options.role,
3867 parent_id: None,
3868 chunk_index: None,
3869 chunk_count: None,
3870 status: FrameStatus::Active,
3871 supersedes,
3872 superseded_by: None,
3873 source_sha256: None, source_path: None, enrichment_state: crate::types::EnrichmentState::Searchable,
3876 };
3877
3878 if let Some(engine) = self.tantivy.as_mut() {
3880 engine.add_frame(&temp_frame, text)?;
3881 engine.soft_commit()?;
3882 self.tantivy_dirty = true;
3883
3884 tracing::debug!(
3885 frame_id = frame_id,
3886 "instant index: frame searchable immediately"
3887 );
3888 }
3889 }
3890 }
3891 }
3892
3893 #[cfg(feature = "lex")]
3897 if needs_enrichment {
3898 let frame_id = parent_seq as FrameId;
3899 self.toc.enrichment_queue.push(frame_id);
3900 tracing::debug!(
3901 frame_id = frame_id,
3902 is_skim = is_skim_extraction,
3903 needs_embedding = options.enable_embedding,
3904 "queued frame for background enrichment"
3905 );
3906 }
3907
3908 for mut chunk_entry in chunk_entries {
3909 chunk_entry.parent_sequence = Some(parent_seq);
3910 let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3911 self.append_wal_entry(&chunk_bytes)?;
3912 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3913 }
3914
3915 self.dirty = true;
3916 let suppress_checkpoint = self
3917 .batch_opts
3918 .as_ref()
3919 .is_some_and(|o| o.disable_auto_checkpoint);
3920 if !suppress_checkpoint && self.wal.should_checkpoint() {
3921 self.commit()?;
3922 }
3923
3924 #[cfg(feature = "replay")]
3926 if let Some(input_bytes) = payload {
3927 self.record_put_action(parent_seq, input_bytes);
3928 }
3929
3930 if should_extract_triplets {
3933 if let Some(ref text) = triplet_text {
3934 if !text.trim().is_empty() {
3935 let extractor = TripletExtractor::default();
3936 let frame_id = parent_seq as FrameId;
3937 let (cards, _stats) = extractor.extract(
3938 frame_id,
3939 text,
3940 triplet_uri.as_deref(),
3941 triplet_title.as_deref(),
3942 timestamp,
3943 );
3944
3945 if !cards.is_empty() {
3946 let card_ids = self.memories_track.add_cards(cards);
3948
3949 self.memories_track
3951 .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3952 }
3953 }
3954 }
3955 }
3956
3957 Ok(parent_seq)
3958 }
3959}
3960
/// Operation type carried by a frame WAL entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum FrameWalOp {
    /// Insert a new frame (also the default for legacy records that
    /// predate the `op` field — see `WalEntryData::op`).
    Insert,
    /// Delete marker — presumably tombstones the frame named by
    /// `WalEntryData::target_frame_id`; TODO confirm against WAL replay.
    Tombstone,
}
3967
3968impl Default for FrameWalOp {
3969 fn default() -> Self {
3970 Self::Insert
3971 }
3972}
3973
/// Top-level record written to the embedded WAL.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame operation (insert or tombstone) plus its payload/metadata.
    Frame(WalEntryData),
    /// A batch of lexical-index updates (only with the `lex` feature).
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3980
3981fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3982 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3983 return Ok(entry);
3984 }
3985 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3986 Ok(WalEntry::Frame(legacy))
3987}
3988
/// Decoded payload of a `WalEntry::Frame` record: one frame operation
/// with its canonical bytes and all metadata captured at insert time.
///
/// Fields added after the initial format carry `#[serde(default)]` so
/// that records written by older versions still decode (see
/// `decode_wal_entry` for the legacy fallback path).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    // Unix timestamp in seconds chosen when the entry was created.
    pub(crate) timestamp: i64,
    pub(crate) kind: Option<String>,
    pub(crate) track: Option<String>,
    // Canonical payload bytes; empty when the payload is elided
    // (no-raw mode, chunked parents, or reuse via `reuse_payload_from`).
    pub(crate) payload: Vec<u8>,
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    // Encoding of `payload` (plain bytes or zstd-compressed text).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    // Uncompressed length of the canonical payload, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    // Normalized text used for search indexing.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    // Date strings extracted from the content (auto-tagging).
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    // Manifest describing how a parent document was split into chunks.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    #[serde(default)]
    pub(crate) role: FrameRole,
    // WAL sequence number of the parent frame, set on document chunks.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    // Zero-based position of this chunk within its parent document.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    // Total number of chunks produced for the parent document.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    // Insert or tombstone; legacy records default to insert.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    // Frame targeted by a tombstone op — presumably; confirm in replay.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    // Frame that this entry supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    // Existing frame whose stored payload bytes this entry reuses.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    // Content hash recorded in no-raw mode. NOTE(review): computed with
    // BLAKE3 (`blake3::hash`) despite the `sha256` name — worth renaming.
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    // Original filesystem path of the source document, if supplied.
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    // Whether the frame is fully enriched or still queued for background
    // enrichment (searchable-but-skimmed state).
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4044
4045pub(crate) fn prepare_canonical_payload(
4046 payload: &[u8],
4047) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4048 prepare_canonical_payload_with_level(payload, 3)
4049}
4050
4051pub(crate) fn prepare_canonical_payload_with_level(
4052 payload: &[u8],
4053 level: i32,
4054) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4055 if level == 0 {
4056 return Ok((
4058 payload.to_vec(),
4059 CanonicalEncoding::Plain,
4060 Some(payload.len() as u64),
4061 ));
4062 }
4063 if std::str::from_utf8(payload).is_ok() {
4064 let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4065 Ok((
4066 compressed,
4067 CanonicalEncoding::Zstd,
4068 Some(payload.len() as u64),
4069 ))
4070 } else {
4071 Ok((
4072 payload.to_vec(),
4073 CanonicalEncoding::Plain,
4074 Some(payload.len() as u64),
4075 ))
4076 }
4077}
4078
4079pub(crate) fn augment_search_text(
4080 base: Option<String>,
4081 uri: Option<&str>,
4082 title: Option<&str>,
4083 track: Option<&str>,
4084 tags: &[String],
4085 labels: &[String],
4086 extra_metadata: &BTreeMap<String, String>,
4087 content_dates: &[String],
4088 metadata: Option<&DocMetadata>,
4089) -> Option<String> {
4090 let mut segments: Vec<String> = Vec::new();
4091 if let Some(text) = base {
4092 let trimmed = text.trim();
4093 if !trimmed.is_empty() {
4094 segments.push(trimmed.to_string());
4095 }
4096 }
4097
4098 if let Some(title) = title {
4099 if !title.trim().is_empty() {
4100 segments.push(format!("title: {}", title.trim()));
4101 }
4102 }
4103
4104 if let Some(uri) = uri {
4105 if !uri.trim().is_empty() {
4106 segments.push(format!("uri: {}", uri.trim()));
4107 }
4108 }
4109
4110 if let Some(track) = track {
4111 if !track.trim().is_empty() {
4112 segments.push(format!("track: {}", track.trim()));
4113 }
4114 }
4115
4116 if !tags.is_empty() {
4117 segments.push(format!("tags: {}", tags.join(" ")));
4118 }
4119
4120 if !labels.is_empty() {
4121 segments.push(format!("labels: {}", labels.join(" ")));
4122 }
4123
4124 if !extra_metadata.is_empty() {
4125 for (key, value) in extra_metadata {
4126 if value.trim().is_empty() {
4127 continue;
4128 }
4129 segments.push(format!("{key}: {value}"));
4130 }
4131 }
4132
4133 if !content_dates.is_empty() {
4134 segments.push(format!("dates: {}", content_dates.join(" ")));
4135 }
4136
4137 if let Some(meta) = metadata {
4138 if let Ok(meta_json) = serde_json::to_string(meta) {
4139 segments.push(format!("metadata: {meta_json}"));
4140 }
4141 }
4142
4143 if segments.is_empty() {
4144 None
4145 } else {
4146 Some(segments.join("\n"))
4147 }
4148}
4149
/// Appends trimmed, non-empty `additions` to `target`, skipping any value
/// that exactly matches an existing entry in `target` or one added earlier
/// in this call. Existing entries are compared as-is (not re-trimmed).
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    let fresh = additions
        .into_iter()
        .filter_map(|raw| {
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        })
        // `insert` returns false for duplicates, filtering them out while
        // recording first occurrences.
        .filter(|candidate| seen.insert(candidate.clone()));
    target.extend(fresh);
}