Skip to main content

memvid_core/memvid/
mutation.rs

1//! Frame mutation and ingestion routines for `Memvid`.
2//!
3//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
4//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
5//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
6//!
7//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
8//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutOptions,
58    SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66    TemporalResolutionValue,
67};
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading bytes sniffed when guessing a document's format.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Per-entry header size assumed when estimating required WAL space.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Buffer size (8 MiB) used while sliding the data region during WAL growth.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback timezone applied when no timezone context is available.
/// NOTE(review): a US-centric default — confirm this is intentional for all deployments.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

/// Fixed vocabulary of relative temporal phrases scanned for during ingestion.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Holds the atomically-written staging copy of the container file used
/// during commits; the real file is only replaced on successful `commit`.
struct CommitStaging {
    // Temp-file handle from the atomic-write-file crate.
    atomic: AtomicWriteFile,
}
126
impl CommitStaging {
    /// Opens an atomic temp file targeting `path`; nothing at `path` changes
    /// until `commit` publishes the staged bytes.
    fn prepare(path: &Path) -> Result<Self> {
        let mut options = AtomicWriteFile::options();
        // Read access is required so the staging copy can back a live handle.
        options.read(true);
        let atomic = options.open(path)?;
        Ok(Self { atomic })
    }

    /// Copies the full contents of `source` into the staging file, flushing
    /// and fsyncing so the copy is durable before mutation begins.
    fn copy_from(&mut self, source: &File) -> Result<()> {
        let mut reader = source.try_clone()?;
        reader.seek(SeekFrom::Start(0))?;

        let writer = self.atomic.as_file_mut();
        // Truncate first so stale bytes from a prior attempt cannot survive.
        writer.set_len(0)?;
        writer.seek(SeekFrom::Start(0))?;
        std::io::copy(&mut reader, writer)?;
        writer.flush()?;
        writer.sync_all()?;
        Ok(())
    }

    /// Returns an independent handle onto the staging file.
    fn clone_file(&self) -> Result<File> {
        Ok(self.atomic.as_file().try_clone()?)
    }

    /// Publishes the staged bytes over the destination path.
    fn commit(self) -> Result<()> {
        self.atomic.commit().map_err(Into::into)
    }

    /// Abandons the staging file, leaving the destination untouched.
    fn discard(self) -> Result<()> {
        self.atomic.discard().map_err(Into::into)
    }
}
160
/// Accumulated changes produced by replaying WAL records during a commit;
/// drives which indexes need rebuilding afterwards.
#[derive(Debug, Default)]
struct IngestionDelta {
    // Frames newly inserted by the replayed records.
    inserted_frames: Vec<FrameId>,
    // Embeddings to fold into the vector index, keyed by frame.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    // New time-index entries to append.
    inserted_time_entries: Vec<TimeIndexEntry>,
    // True when existing frames changed (not just inserts).
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
/// Selects how much work a commit performs.
///
/// NOTE(review): `commit_from_records` currently receives the mode as
/// `_mode` and ignores it, so `Full` and `Incremental` behave identically
/// there — confirm before relying on the distinction.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Default commit mode.
    #[default]
    Full,
    /// Incremental commit mode.
    Incremental,
}
201
/// Per-call knobs for `Memvid::commit_with_options`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    // Commit mode to request (see `CommitMode`).
    pub mode: CommitMode,
    // Requested background execution; currently ignored — the commit path
    // logs and runs synchronously (see `commit_with_options`).
    pub background: bool,
}
207
208impl CommitOptions {
209    #[must_use]
210    pub fn new(mode: CommitMode) -> Self {
211        Self {
212            mode,
213            background: false,
214        }
215    }
216
217    #[must_use]
218    pub fn background(mut self, background: bool) -> Self {
219        self.background = background;
220        self
221    }
222}
223
/// Process-wide default [`ReaderRegistry`], built lazily on first use.
fn default_reader_registry() -> &'static ReaderRegistry {
    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
    REGISTRY.get_or_init(ReaderRegistry::default)
}
228
229fn infer_document_format(
230    mime: Option<&str>,
231    magic: Option<&[u8]>,
232    uri: Option<&str>,
233) -> Option<DocumentFormat> {
234    // Check PDF magic bytes first
235    if detect_pdf_magic(magic) {
236        return Some(DocumentFormat::Pdf);
237    }
238
239    // For ZIP-based OOXML formats (DOCX, XLSX, PPTX), magic bytes are just ZIP header
240    // so we need to check file extension to distinguish them
241    if let Some(magic_bytes) = magic {
242        if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243            // It's a ZIP file - check extension to determine OOXML type
244            if let Some(format) = infer_format_from_extension(uri) {
245                return Some(format);
246            }
247        }
248    }
249
250    // Try MIME type
251    if let Some(mime_str) = mime {
252        let mime_lower = mime_str.trim().to_ascii_lowercase();
253        let format = match mime_lower.as_str() {
254            "application/pdf" => Some(DocumentFormat::Pdf),
255            "text/plain" => Some(DocumentFormat::PlainText),
256            "text/markdown" => Some(DocumentFormat::Markdown),
257            "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258            "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259                Some(DocumentFormat::Docx)
260            }
261            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262                Some(DocumentFormat::Xlsx)
263            }
264            "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265            "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266                Some(DocumentFormat::Pptx)
267            }
268            other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269            _ => None,
270        };
271        if format.is_some() {
272            return format;
273        }
274    }
275
276    // Fall back to extension-based detection
277    infer_format_from_extension(uri)
278}
279
280/// Infer document format from file extension in URI/path
281fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282    let uri = uri?;
283    let path = std::path::Path::new(uri);
284    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285    match ext.as_str() {
286        "pdf" => Some(DocumentFormat::Pdf),
287        "docx" => Some(DocumentFormat::Docx),
288        "xlsx" => Some(DocumentFormat::Xlsx),
289        "xls" => Some(DocumentFormat::Xls),
290        "pptx" => Some(DocumentFormat::Pptx),
291        "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292        | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293        | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294        | "sql" => Some(DocumentFormat::PlainText),
295        "md" | "markdown" => Some(DocumentFormat::Markdown),
296        "html" | "htm" => Some(DocumentFormat::Html),
297        _ => None,
298    }
299}
300
/// Returns `true` when `magic` begins with a PDF header (`%PDF`), tolerating
/// an optional UTF-8 BOM followed by ASCII whitespace before the header.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    if bytes.is_empty() {
        return false;
    }
    // Skip a UTF-8 BOM when present.
    let bytes = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    // Skip any run of leading ASCII whitespace.
    let first_non_ws = bytes
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(bytes.len());
    bytes[first_non_ws..].starts_with(b"%PDF")
}
318
319#[instrument(
320    target = "memvid::extract",
321    skip_all,
322    fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325    bytes: &[u8],
326    mime_hint: Option<&str>,
327    uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329    let registry = default_reader_registry();
330    let magic = bytes
331        .get(..MAGIC_SNIFF_BYTES)
332        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334        .with_uri(uri)
335        .with_magic(magic);
336
337    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338        let start = Instant::now();
339        match reader.extract(bytes, &hint) {
340            Ok(output) => {
341                return Ok(finalize_reader_output(output, start));
342            }
343            Err(err) => {
344                tracing::error!(
345                    target = "memvid::extract",
346                    reader = reader.name(),
347                    error = %err,
348                    "reader failed; falling back"
349                );
350                Some(format!("reader {} failed: {err}", reader.name()))
351            }
352        }
353    } else {
354        tracing::debug!(
355            target = "memvid::extract",
356            format = hint.format.map(super::super::reader::DocumentFormat::label),
357            "no reader matched; using default extractor"
358        );
359        Some("no registered reader matched; using default extractor".to_string())
360    };
361
362    let start = Instant::now();
363    let mut output = PassthroughReader.extract(bytes, &hint)?;
364    if let Some(reason) = fallback_reason {
365        output.diagnostics.track_warning(reason);
366    }
367    Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371    let elapsed = start.elapsed();
372    let ReaderOutput {
373        document,
374        reader_name,
375        diagnostics,
376    } = output;
377    log_reader_result(&reader_name, &diagnostics, elapsed);
378    document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382    let duration_ms = diagnostics
383        .duration_ms
384        .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385    let warnings = diagnostics.warnings.len();
386    let pages = diagnostics.pages_processed;
387
388    if warnings > 0 || diagnostics.fallback {
389        tracing::warn!(
390            target = "memvid::extract",
391            reader,
392            duration_ms,
393            pages,
394            warnings,
395            fallback = diagnostics.fallback,
396            "extraction completed with warnings"
397        );
398        for warning in &diagnostics.warnings {
399            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400        }
401    } else {
402        tracing::info!(
403            target = "memvid::extract",
404            reader,
405            duration_ms,
406            pages,
407            "extraction completed"
408        );
409    }
410}
411
412impl Memvid {
413    // -- Public ingestion entrypoints ---------------------------------------------------------
414
    /// Runs `op` against a staged copy of the container file so a failed or
    /// interrupted commit can never corrupt the original.
    ///
    /// All mutable state touched by commits (`file`, `wal`, `header`, `toc`,
    /// `data_end`, `generation`, `dirty`, and the lex dirty flag) is
    /// snapshotted up front and restored verbatim on any failure — whether
    /// `op` itself errs or the final atomic rename does.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        self.file.sync_all()?;
        // Stage a byte-for-byte, fsynced copy of the current file.
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Repoint `self` at the staging copy; keep the originals for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so each rollback path can move them back at most once.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Rename succeeded: drop the stale handles and reopen
                        // the (now committed) destination file fresh.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Rename failed: restore every piece of snapshotted state.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err)
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staging file (best-effort) and
                // restore the snapshot, surfacing the original error.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
494
495    pub(crate) fn catalog_data_end(&self) -> u64 {
496        let mut max_end = self.header.wal_offset + self.header.wal_size;
497
498        for descriptor in &self.toc.segment_catalog.lex_segments {
499            if descriptor.common.bytes_length == 0 {
500                continue;
501            }
502            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
503        }
504
505        for descriptor in &self.toc.segment_catalog.vec_segments {
506            if descriptor.common.bytes_length == 0 {
507                continue;
508            }
509            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
510        }
511
512        for descriptor in &self.toc.segment_catalog.time_segments {
513            if descriptor.common.bytes_length == 0 {
514                continue;
515            }
516            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
517        }
518
519        #[cfg(feature = "temporal_track")]
520        for descriptor in &self.toc.segment_catalog.temporal_segments {
521            if descriptor.common.bytes_length == 0 {
522                continue;
523            }
524            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
525        }
526
527        #[cfg(feature = "lex")]
528        for descriptor in &self.toc.segment_catalog.tantivy_segments {
529            if descriptor.common.bytes_length == 0 {
530                continue;
531            }
532            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
533        }
534
535        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
536            if manifest.bytes_length != 0 {
537                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
538            }
539        }
540
541        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
542            if manifest.bytes_length != 0 {
543                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
544            }
545        }
546
547        if let Some(manifest) = self.toc.time_index.as_ref() {
548            if manifest.bytes_length != 0 {
549                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
550            }
551        }
552
553        #[cfg(feature = "temporal_track")]
554        if let Some(track) = self.toc.temporal_track.as_ref() {
555            if track.bytes_length != 0 {
556                max_end = max_end.max(track.bytes_offset + track.bytes_length);
557            }
558        }
559
560        max_end
561    }
562
563    fn payload_region_end(&self) -> u64 {
564        let wal_region_end = self.header.wal_offset + self.header.wal_size;
565        let frames_with_payload: Vec<_> = self
566            .toc
567            .frames
568            .iter()
569            .filter(|frame| frame.payload_length != 0)
570            .collect();
571
572        tracing::info!(
573            "payload_region_end: found {} frames with payloads out of {} total frames, wal_region_end={}",
574            frames_with_payload.len(),
575            self.toc.frames.len(),
576            wal_region_end
577        );
578
579        for (idx, frame) in frames_with_payload.iter().enumerate().take(3) {
580            tracing::info!(
581                "  frame[{}]: id={} offset={} length={} status={:?}",
582                idx,
583                frame.id,
584                frame.payload_offset,
585                frame.payload_length,
586                frame.status
587            );
588        }
589
590        let result = frames_with_payload
591            .iter()
592            .fold(wal_region_end, |max_end, frame| {
593                match frame.payload_offset.checked_add(frame.payload_length) {
594                    Some(end) => max_end.max(end),
595                    None => max_end,
596                }
597            });
598
599        tracing::info!("payload_region_end: returning {}", result);
600        result
601    }
602
    /// Appends `payload` as a single WAL entry, growing the embedded WAL
    /// region on demand until the append succeeds. Returns the entry's
    /// sequence number.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // NOTE(review): matching on the exact reason strings couples
                // this retry loop to the messages emitted by
                // `EmbeddedWal::append_entry`; a typed error variant would be
                // more robust.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry"
                        || reason == "embedded WAL region full" =>
                {
                    // WAL is either too small for this entry or full with pending entries.
                    // Grow the WAL to accommodate - doubling ensures we have space.
                    // `required` is at least header + payload, and always more
                    // than the current size so growth makes progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
622
623    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
624        let mut new_size = self.header.wal_size;
625        let mut target = required_entry_size;
626        if target == 0 {
627            target = self.header.wal_size;
628        }
629        while new_size <= target {
630            new_size = new_size
631                .checked_mul(2)
632                .ok_or_else(|| MemvidError::CheckpointFailed {
633                    reason: "wal_size overflow".into(),
634                })?;
635        }
636        let delta = new_size - self.header.wal_size;
637        if delta == 0 {
638            return Ok(());
639        }
640
641        self.shift_data_for_wal_growth(delta)?;
642        self.header.wal_size = new_size;
643        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
644        self.data_end = self.data_end.saturating_add(delta);
645        self.adjust_offsets_after_wal_growth(delta);
646
647        let catalog_end = self.catalog_data_end();
648        self.header.footer_offset = catalog_end
649            .max(self.header.footer_offset)
650            .max(self.data_end);
651
652        self.rewrite_toc_footer()?;
653        self.header.toc_checksum = self.toc.toc_checksum;
654        crate::persist_header(&mut self.file, &self.header)?;
655        self.file.sync_all()?;
656        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
657        Ok(())
658    }
659
    /// Makes room for a larger WAL by sliding everything after the current
    /// WAL region forward by `delta` bytes, then zero-filling the vacated gap.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        // Copy chunks from the END of the region backwards so overlapping
        // source bytes are never clobbered before they are read.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            // chunk <= buffer.len(), so the usize cast cannot truncate.
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the freed gap so the grown WAL region starts out clean.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
694
695    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
696        if delta == 0 {
697            return;
698        }
699
700        for frame in &mut self.toc.frames {
701            if frame.payload_offset != 0 {
702                frame.payload_offset += delta;
703            }
704        }
705
706        for segment in &mut self.toc.segments {
707            if segment.bytes_offset != 0 {
708                segment.bytes_offset += delta;
709            }
710        }
711
712        if let Some(lex) = self.toc.indexes.lex.as_mut() {
713            if lex.bytes_offset != 0 {
714                lex.bytes_offset += delta;
715            }
716        }
717        for manifest in &mut self.toc.indexes.lex_segments {
718            if manifest.bytes_offset != 0 {
719                manifest.bytes_offset += delta;
720            }
721        }
722        if let Some(vec) = self.toc.indexes.vec.as_mut() {
723            if vec.bytes_offset != 0 {
724                vec.bytes_offset += delta;
725            }
726        }
727        if let Some(time_index) = self.toc.time_index.as_mut() {
728            if time_index.bytes_offset != 0 {
729                time_index.bytes_offset += delta;
730            }
731        }
732        #[cfg(feature = "temporal_track")]
733        if let Some(track) = self.toc.temporal_track.as_mut() {
734            if track.bytes_offset != 0 {
735                track.bytes_offset += delta;
736            }
737        }
738
739        let catalog = &mut self.toc.segment_catalog;
740        for descriptor in &mut catalog.lex_segments {
741            if descriptor.common.bytes_offset != 0 {
742                descriptor.common.bytes_offset += delta;
743            }
744        }
745        for descriptor in &mut catalog.vec_segments {
746            if descriptor.common.bytes_offset != 0 {
747                descriptor.common.bytes_offset += delta;
748            }
749        }
750        for descriptor in &mut catalog.time_segments {
751            if descriptor.common.bytes_offset != 0 {
752                descriptor.common.bytes_offset += delta;
753            }
754        }
755        #[cfg(feature = "temporal_track")]
756        for descriptor in &mut catalog.temporal_segments {
757            if descriptor.common.bytes_offset != 0 {
758                descriptor.common.bytes_offset += delta;
759            }
760        }
761        for descriptor in &mut catalog.tantivy_segments {
762            if descriptor.common.bytes_offset != 0 {
763                descriptor.common.bytes_offset += delta;
764            }
765        }
766
767        #[cfg(feature = "lex")]
768        if let Ok(mut storage) = self.lex_storage.write() {
769            storage.adjust_offsets(delta);
770        }
771    }
772    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
773        self.ensure_writable()?;
774        if options.background {
775            tracing::debug!("commit background flag ignored; running synchronously");
776        }
777        let mode = options.mode;
778        let records = self.wal.pending_records()?;
779        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
780            return Ok(());
781        }
782        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
783    }
784
785    pub fn commit(&mut self) -> Result<()> {
786        self.ensure_writable()?;
787        self.commit_with_options(CommitOptions::new(CommitMode::Full))
788    }
789
    /// Core commit path: replays `records` into the in-memory state, rebuilds
    /// or flushes whichever indexes/tracks the resulting delta requires, then
    /// persists TOC, WAL checkpoint, and header in that order.
    ///
    /// NOTE(review): `_mode` is currently unused — Full and Incremental
    /// commits take the same path here.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // Check if CLIP index has pending embeddings that need to be persisted
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            // Full rebuild persists every index/track as a side effect, so
            // the targeted persist steps below are skipped once this runs.
            self.rebuild_indexes(&delta.inserted_embeddings)?;
            indexes_rebuilt = true;
        }

        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Persist sketch track if it has entries
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.

        // Durability order: TOC footer → WAL checkpoint → header → fsync.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        // record_checkpoint mutates the header, so the checksum is re-stamped
        // before the header itself is persisted.
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // The manifest WAL is only truncated once the container is durable.
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
857
858    #[cfg(feature = "parallel_segments")]
859    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
860        self.ensure_writable()?;
861        if !self.dirty && !self.tantivy_index_pending() {
862            return Ok(());
863        }
864        let opts = opts.clone();
865        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
866    }
867
    /// Parallel-segment variant of the commit path (driven by `commit_parallel_with_opts`).
    ///
    /// Replays pending WAL records into the TOC, then tries to publish the resulting
    /// delta through the parallel segment builder. On success it performs an
    /// *incremental* Tantivy update (only the delta's frames) and a full time-index
    /// rebuild; otherwise it falls back to the sequential `rebuild_indexes` path.
    /// It finishes with the same footer/header/WAL-checkpoint sequence as the serial
    /// commit so the file remains crash-safe.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        // Re-checked here (in addition to the wrapper) because this runs under the
        // staging lock and state may have been flushed in the meantime.
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        // Bump the commit generation; wrapping_add avoids a panic on overflow.
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // Try to use parallel segment builder first
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                // Segments were written at data_end; update footer_offset so
                // rewrite_toc_footer places the TOC after the new segment data
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                // OPTIMIZATION: Use incremental Tantivy indexing instead of full rebuild.
                // Only add new frames from the delta, not all frames.
                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    // If Tantivy engine already exists, frames were already indexed during put()
                    // (at the add_frame call in put_bytes_with_options). Skip re-indexing to avoid
                    // duplicates. Only initialize and index if Tantivy wasn't present during put.
                    let tantivy_was_present = self.tantivy.is_some();
                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    // Skip indexing if Tantivy was already present - frames were indexed during put
                    if tantivy_was_present {
                        tracing::info!(
                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
                        );
                    } else {
                        // First, collect all frames and their search text (to avoid borrow conflicts)
                        let max_payload = crate::memvid::search::max_index_payload();
                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                        // NOTE(review): the loop body below is indented shallower than its
                        // nesting level; run rustfmt on this region when convenient.
                        for frame_id in &delta.inserted_frames {
                        // Look up the actual Frame from the TOC
                        let frame = match self.toc.frames.get(*frame_id as usize) {
                            Some(f) => f.clone(),
                            None => continue,
                        };

                        // Check if frame has explicit search_text first - clone it for ownership
                        let explicit_text = frame.search_text.clone();
                        if let Some(ref search_text) = explicit_text {
                            if !search_text.trim().is_empty() {
                                prepared_docs.push((frame, search_text.clone()));
                                continue;
                            }
                        }

                        // Get MIME type and check if text-indexable
                        let mime = frame
                            .metadata
                            .as_ref()
                            .and_then(|m| m.mime.as_deref())
                            .unwrap_or("application/octet-stream");

                        if !crate::memvid::search::is_text_indexable_mime(mime) {
                            continue;
                        }

                        // Oversized payloads are skipped rather than truncated.
                        if frame.payload_length > max_payload {
                            continue;
                        }

                        let text = self.frame_search_text(&frame)?;
                        if !text.trim().is_empty() {
                            prepared_docs.push((frame, text));
                        }
                    }

                    // Now add to Tantivy engine (no borrow conflict)
                    if let Some(ref mut engine) = self.tantivy {
                        for (frame, text) in &prepared_docs {
                            engine.add_frame(frame, text)?;
                        }

                        if !prepared_docs.is_empty() {
                            engine.commit()?;
                            self.tantivy_dirty = true;
                        }

                        tracing::info!(
                            "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                            prepared_docs.len(),
                            engine.num_docs()
                        );
                    } else {
                        tracing::warn!(
                            "parallel_commit: Tantivy engine is None after init_tantivy"
                        );
                    }
                    }  // end of else !tantivy_was_present
                }

                // Time index stores all entries together for timeline queries.
                // Unlike Tantivy which is incremental, time index needs full rebuild.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                // Update data_end to account for the newly written time index
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Fall back to sequential rebuild if no segments were generated
                self.rebuild_indexes(&delta.inserted_embeddings)?;
                indexes_rebuilt = true;
            }
        }

        // Flush Tantivy index if dirty (from parallel path or pending updates)
        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        // Persist CLIP index if it has embeddings
        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        // Persist memories track if it has cards
        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        // Persist logic mesh if it has nodes
        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Persist sketch track if it has entries
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // flush_tantivy() has already set footer_offset correctly
        // DO NOT overwrite with catalog_data_end()
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        // Re-assert the checksum after the checkpoint, mirroring the serial commit
        // path; presumably record_checkpoint mutates header fields — TODO confirm.
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // Drain the manifest WAL only after the main file is durable on disk.
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1067
1068    pub(crate) fn recover_wal(&mut self) -> Result<()> {
1069        let records = self.wal.records_after(self.header.wal_sequence)?;
1070        if records.is_empty() {
1071            if self.tantivy_index_pending() {
1072                self.flush_tantivy()?;
1073            }
1074            return Ok(());
1075        }
1076        let delta = self.apply_records(records)?;
1077        if !delta.is_empty() {
1078            tracing::debug!(
1079                inserted_frames = delta.inserted_frames.len(),
1080                inserted_embeddings = delta.inserted_embeddings.len(),
1081                inserted_time_entries = delta.inserted_time_entries.len(),
1082                "recover applied delta"
1083            );
1084            self.rebuild_indexes(&delta.inserted_embeddings)?;
1085        } else if self.tantivy_index_pending() {
1086            self.flush_tantivy()?;
1087        }
1088        self.wal.record_checkpoint(&mut self.header)?;
1089        crate::persist_header(&mut self.file, &self.header)?;
1090        if !delta.is_empty() {
1091            // rebuild_indexes already flushed Tantivy, so nothing further to do.
1092        } else if self.tantivy_index_pending() {
1093            self.flush_tantivy()?;
1094            crate::persist_header(&mut self.file, &self.header)?;
1095        }
1096        self.file.sync_all()?;
1097        self.pending_frame_inserts = 0;
1098        self.dirty = false;
1099        Ok(())
1100    }
1101
    /// Replay decoded WAL records into the in-memory TOC, returning the
    /// `IngestionDelta` the caller uses to decide which indexes to (re)build.
    ///
    /// Insert records either write their inline payload at the `data_end` cursor
    /// (advancing it) or alias an existing frame's payload via `reuse_payload_from`.
    /// Each new frame is linked to its parent via the record's sequence number,
    /// optionally fed to the Tantivy engine and sketch track, and appended to the
    /// TOC. Tombstone records mark the target frame deleted. A second pass resolves
    /// DocumentChunk frames that still lack a `parent_id`.
    ///
    /// Nothing here persists the header or footer; commit/recover callers do that
    /// after this returns.
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // Use data_end instead of payload_region_end to avoid overwriting
        // vec/lex/time segments that were written after the payload region.
        // payload_region_end() only considers frame payloads, but data_end tracks
        // all data including index segments.
        let mut data_cursor = self.data_end;
        // Maps WAL sequence numbers to the frame ids assigned in this batch so that
        // chunk records can resolve their parent document.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        // NOTE(review): this guard is redundant — the early return above already
        // handled the empty case. Harmless, but it could be removed.
        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                // Lex batches are applied immediately; only frame entries fall through.
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // Frame ids are dense: the next id is the current TOC length.
                        let frame_id = self.toc.frames.len() as u64;

                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            // Dedup path: alias an existing frame's payload bytes.
                            // A reuse entry must not also carry inline bytes.
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source_idx = usize::try_from(source_id).map_err(|_| {
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "frame id too large for memory",
                                }
                            })?;
                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            // Inline path: append payload bytes at the cursor and
                            // record checksum + canonical (pre-compression) length.
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    if let Some(len) = entry.canonical_length {
                                        len
                                    } else {
                                        // Length not recorded in the WAL entry:
                                        // decode once to recover it.
                                        let decoded = crate::decode_canonical_bytes(
                                            &entry.payload,
                                            CanonicalEncoding::Zstd,
                                            frame_id,
                                        )?;
                                        decoded.len() as u64
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        // Anchor fields are only populated when temporal tracking
                        // is compiled in; otherwise they stay None.
                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                            source_sha256: entry.source_sha256,
                            source_path: entry.source_path.clone(),
                            enrichment_state: entry.enrichment_state,
                        };

                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            } else {
                                // Parent sequence not found in current batch - this can happen
                                // if parent was committed in a previous batch. Try to find parent
                                // by looking at recently inserted frames with matching characteristics.
                                // The parent should be the most recent Document frame that has
                                // a chunk_manifest matching this chunk's expected parent.
                                if entry.role == FrameRole::DocumentChunk {
                                    // Look backwards through recently inserted frames
                                    for &candidate_id in delta.inserted_frames.iter().rev() {
                                        if let Ok(idx) = usize::try_from(candidate_id) {
                                            if let Some(candidate) = self.toc.frames.get(idx) {
                                                if candidate.role == FrameRole::Document
                                                    && candidate.chunk_manifest.is_some()
                                                {
                                                    // Found a parent document - use it
                                                    frame.parent_id = Some(candidate_id);
                                                    tracing::debug!(
                                                        chunk_frame_id = frame_id,
                                                        parent_frame_id = candidate_id,
                                                        parent_seq = parent_seq,
                                                        "resolved chunk parent via fallback"
                                                    );
                                                    break;
                                                }
                                            }
                                        }
                                    }
                                }
                                if frame.parent_id.is_none() {
                                    tracing::warn!(
                                        chunk_frame_id = frame_id,
                                        parent_seq = parent_seq,
                                        "chunk has parent_sequence but parent not found in batch"
                                    );
                                }
                            }
                        }

                        // Pick the text to index: explicit non-empty search_text wins,
                        // otherwise fall back to the frame's decoded content.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;

                            // Generate sketch for fast candidate pre-filtering
                            // Uses the same text as tantivy indexing for consistency
                            if !text.trim().is_empty() {
                                let entry = crate::types::generate_sketch(
                                    frame_id,
                                    text,
                                    crate::types::SketchVariant::Small,
                                    None,
                                );
                                self.sketch_track.insert(entry);
                            }
                        }

                        if let Some(embedding) = entry.embedding.take() {
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only whole documents feed the time index and temporal track;
                        // chunks are reachable via their parent.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            // data_cursor only moves forward, but max() guards against a stale cursor.
            self.data_end = self.data_end.max(data_cursor);
        }

        // Second pass: resolve any orphan DocumentChunk frames that are missing parent_id.
        // This handles edge cases where chunks couldn't be linked during the first pass.
        // First, collect orphan chunks and their resolved parents to avoid borrow conflicts.
        let orphan_resolutions: Vec<(u64, u64)> = delta
            .inserted_frames
            .iter()
            .filter_map(|&frame_id| {
                let idx = usize::try_from(frame_id).ok()?;
                let frame = self.toc.frames.get(idx)?;
                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
                    return None;
                }
                // Find the most recent Document frame before this chunk that has a manifest
                for candidate_id in (0..frame_id).rev() {
                    if let Ok(idx) = usize::try_from(candidate_id) {
                        if let Some(candidate) = self.toc.frames.get(idx) {
                            if candidate.role == FrameRole::Document
                                && candidate.chunk_manifest.is_some()
                                && candidate.status == FrameStatus::Active
                            {
                                return Some((frame_id, candidate_id));
                            }
                        }
                    }
                }
                None
            })
            .collect();

        // Now apply the resolutions
        for (chunk_id, parent_id) in orphan_resolutions {
            if let Ok(idx) = usize::try_from(chunk_id) {
                if let Some(frame) = self.toc.frames.get_mut(idx) {
                    frame.parent_id = Some(parent_id);
                    tracing::debug!(
                        chunk_frame_id = chunk_id,
                        parent_frame_id = parent_id,
                        "resolved orphan chunk parent in second pass"
                    );
                }
            }
        }

        // Index rebuild now happens once per commit (Option A) instead of incremental append.
        // See commit_from_records() for where rebuild_indexes() is invoked.
        Ok(delta)
    }
1428
1429    #[cfg(feature = "temporal_track")]
1430    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1431        (timestamp, AnchorSource::FrameTimestamp)
1432    }
1433
1434    #[cfg(feature = "temporal_track")]
1435    fn collect_temporal_mentions(
1436        text: Option<&str>,
1437        frame_id: FrameId,
1438        anchor_ts: i64,
1439    ) -> Vec<TemporalMention> {
1440        let text = match text {
1441            Some(value) if !value.trim().is_empty() => value,
1442            _ => return Vec::new(),
1443        };
1444
1445        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
1446            Ok(ts) => ts,
1447            Err(_) => return Vec::new(),
1448        };
1449
1450        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
1451        let normalizer = TemporalNormalizer::new(context);
1452        let mut spans: Vec<(usize, usize)> = Vec::new();
1453        let lower = text.to_ascii_lowercase();
1454
1455        for phrase in STATIC_TEMPORAL_PHRASES {
1456            let mut search_start = 0usize;
1457            while let Some(idx) = lower[search_start..].find(phrase) {
1458                let abs = search_start + idx;
1459                let end = abs + phrase.len();
1460                spans.push((abs, end));
1461                search_start = end;
1462            }
1463        }
1464
1465        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
1466        let regex = NUMERIC_DATE.get_or_init(|| {
1467            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
1468        });
1469        let regex = match regex {
1470            Ok(re) => re,
1471            Err(msg) => {
1472                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
1473                return Vec::new();
1474            }
1475        };
1476        for mat in regex.find_iter(text) {
1477            spans.push((mat.start(), mat.end()));
1478        }
1479
1480        spans.sort_unstable();
1481        spans.dedup();
1482
1483        let mut mentions: Vec<TemporalMention> = Vec::new();
1484        for (start, end) in spans {
1485            if end > text.len() || start >= end {
1486                continue;
1487            }
1488            let raw = &text[start..end];
1489            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
1490            if trimmed.is_empty() {
1491                continue;
1492            }
1493            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
1494            let finish = offset + trimmed.len();
1495            match normalizer.resolve(trimmed) {
1496                Ok(resolution) => {
1497                    mentions.extend(Self::resolution_to_mentions(
1498                        resolution, frame_id, offset, finish,
1499                    ));
1500                }
1501                Err(_) => continue,
1502            }
1503        }
1504
1505        mentions
1506    }
1507
1508    #[cfg(feature = "temporal_track")]
1509    fn resolution_to_mentions(
1510        resolution: TemporalResolution,
1511        frame_id: FrameId,
1512        byte_start: usize,
1513        byte_end: usize,
1514    ) -> Vec<TemporalMention> {
1515        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1516        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1517        let mut results = Vec::new();
1518
1519        let base_flags = Self::flags_from_resolution(&resolution.flags);
1520        match resolution.value {
1521            TemporalResolutionValue::Date(date) => {
1522                let ts = Self::date_to_timestamp(date);
1523                results.push(TemporalMention::new(
1524                    ts,
1525                    frame_id,
1526                    byte_start,
1527                    byte_len,
1528                    TemporalMentionKind::Date,
1529                    resolution.confidence,
1530                    0,
1531                    base_flags,
1532                ));
1533            }
1534            TemporalResolutionValue::DateTime(dt) => {
1535                let ts = dt.unix_timestamp();
1536                let tz_hint = dt.offset().whole_minutes() as i16;
1537                results.push(TemporalMention::new(
1538                    ts,
1539                    frame_id,
1540                    byte_start,
1541                    byte_len,
1542                    TemporalMentionKind::DateTime,
1543                    resolution.confidence,
1544                    tz_hint,
1545                    base_flags,
1546                ));
1547            }
1548            TemporalResolutionValue::DateRange { start, end } => {
1549                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1550                let start_ts = Self::date_to_timestamp(start);
1551                results.push(TemporalMention::new(
1552                    start_ts,
1553                    frame_id,
1554                    byte_start,
1555                    byte_len,
1556                    TemporalMentionKind::RangeStart,
1557                    resolution.confidence,
1558                    0,
1559                    flags,
1560                ));
1561                let end_ts = Self::date_to_timestamp(end);
1562                results.push(TemporalMention::new(
1563                    end_ts,
1564                    frame_id,
1565                    byte_start,
1566                    byte_len,
1567                    TemporalMentionKind::RangeEnd,
1568                    resolution.confidence,
1569                    0,
1570                    flags,
1571                ));
1572            }
1573            TemporalResolutionValue::DateTimeRange { start, end } => {
1574                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1575                results.push(TemporalMention::new(
1576                    start.unix_timestamp(),
1577                    frame_id,
1578                    byte_start,
1579                    byte_len,
1580                    TemporalMentionKind::RangeStart,
1581                    resolution.confidence,
1582                    start.offset().whole_minutes() as i16,
1583                    flags,
1584                ));
1585                results.push(TemporalMention::new(
1586                    end.unix_timestamp(),
1587                    frame_id,
1588                    byte_start,
1589                    byte_len,
1590                    TemporalMentionKind::RangeEnd,
1591                    resolution.confidence,
1592                    end.offset().whole_minutes() as i16,
1593                    flags,
1594                ));
1595            }
1596            TemporalResolutionValue::Month { year, month } => {
1597                let start_date = match Date::from_calendar_date(year, month, 1) {
1598                    Ok(date) => date,
1599                    Err(err) => {
1600                        tracing::warn!(
1601                            target = "memvid::temporal",
1602                            %err,
1603                            year,
1604                            month = month as u8,
1605                            "skipping invalid month resolution"
1606                        );
1607                        // Skip invalid range for this mention only.
1608                        return results;
1609                    }
1610                };
1611                let end_date = match Self::last_day_in_month(year, month) {
1612                    Some(date) => date,
1613                    None => {
1614                        tracing::warn!(
1615                            target = "memvid::temporal",
1616                            year,
1617                            month = month as u8,
1618                            "skipping month resolution with invalid calendar range"
1619                        );
1620                        return results;
1621                    }
1622                };
1623                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1624                results.push(TemporalMention::new(
1625                    Self::date_to_timestamp(start_date),
1626                    frame_id,
1627                    byte_start,
1628                    byte_len,
1629                    TemporalMentionKind::RangeStart,
1630                    resolution.confidence,
1631                    0,
1632                    flags,
1633                ));
1634                results.push(TemporalMention::new(
1635                    Self::date_to_timestamp(end_date),
1636                    frame_id,
1637                    byte_start,
1638                    byte_len,
1639                    TemporalMentionKind::RangeEnd,
1640                    resolution.confidence,
1641                    0,
1642                    flags,
1643                ));
1644            }
1645        }
1646
1647        results
1648    }
1649
1650    #[cfg(feature = "temporal_track")]
1651    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1652        let mut result = TemporalMentionFlags::empty();
1653        if flags
1654            .iter()
1655            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1656        {
1657            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1658        }
1659        if flags
1660            .iter()
1661            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1662        {
1663            result = result.set(TemporalMentionFlags::DERIVED, true);
1664        }
1665        result
1666    }
1667
1668    #[cfg(feature = "temporal_track")]
1669    fn date_to_timestamp(date: Date) -> i64 {
1670        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1671            .assume_offset(UtcOffset::UTC)
1672            .unix_timestamp()
1673    }
1674
1675    #[cfg(feature = "temporal_track")]
1676    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1677        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1678        while let Some(next) = date.next_day() {
1679            if next.month() == month {
1680                date = next;
1681            } else {
1682                break;
1683            }
1684        }
1685        Some(date)
1686    }
1687
1688    #[allow(dead_code)]
1689    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1690        if delta.inserted_frames.is_empty() || !self.lex_enabled {
1691            return Ok(false);
1692        }
1693
1694        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1695            Some(artifact) => artifact,
1696            None => return Ok(false),
1697        };
1698
1699        let segment_id = self.toc.segment_catalog.next_segment_id;
1700        #[cfg(feature = "parallel_segments")]
1701        let span =
1702            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1703
1704        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1705        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1706        #[cfg(feature = "parallel_segments")]
1707        if let Some(span) = span {
1708            Self::decorate_segment_common(&mut descriptor.common, span);
1709        }
1710        #[cfg(feature = "parallel_segments")]
1711        let descriptor_for_manifest = descriptor.clone();
1712        self.toc.segment_catalog.lex_segments.push(descriptor);
1713        #[cfg(feature = "parallel_segments")]
1714        if let Err(err) = self.record_index_segment(
1715            SegmentKind::Lexical,
1716            descriptor_for_manifest.common,
1717            SegmentStats {
1718                doc_count: artifact.doc_count,
1719                vector_count: 0,
1720                time_entries: 0,
1721                bytes_uncompressed: artifact.bytes.len() as u64,
1722                build_micros: 0,
1723            },
1724        ) {
1725            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1726        }
1727        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1728        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1729        Ok(true)
1730    }
1731
1732    #[allow(dead_code)]
1733    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1734        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1735            return Ok(false);
1736        }
1737
1738        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1739            Some(artifact) => artifact,
1740            None => return Ok(false),
1741        };
1742
1743        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1744            if existing_dim != artifact.dimension {
1745                return Err(MemvidError::VecDimensionMismatch {
1746                    expected: existing_dim,
1747                    actual: artifact.dimension as usize,
1748                });
1749            }
1750        }
1751
1752        let segment_id = self.toc.segment_catalog.next_segment_id;
1753        #[cfg(feature = "parallel_segments")]
1754        #[cfg(feature = "parallel_segments")]
1755        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1756
1757        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1758        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1759        #[cfg(feature = "parallel_segments")]
1760        if let Some(span) = span {
1761            Self::decorate_segment_common(&mut descriptor.common, span);
1762        }
1763        #[cfg(feature = "parallel_segments")]
1764        let descriptor_for_manifest = descriptor.clone();
1765        self.toc.segment_catalog.vec_segments.push(descriptor);
1766        #[cfg(feature = "parallel_segments")]
1767        if let Err(err) = self.record_index_segment(
1768            SegmentKind::Vector,
1769            descriptor_for_manifest.common,
1770            SegmentStats {
1771                doc_count: 0,
1772                vector_count: artifact.vector_count,
1773                time_entries: 0,
1774                bytes_uncompressed: artifact.bytes_uncompressed,
1775                build_micros: 0,
1776            },
1777        ) {
1778            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1779        }
1780        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1781        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1782
1783        // Keep the global vec manifest in sync for auto-detection and stats.
1784        if self.toc.indexes.vec.is_none() {
1785            let empty_offset = self.data_end;
1786            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1787                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1788            self.toc.indexes.vec = Some(VecIndexManifest {
1789                vector_count: 0,
1790                dimension: 0,
1791                bytes_offset: empty_offset,
1792                bytes_length: 0,
1793                checksum: empty_checksum,
1794                compression_mode: self.vec_compression.clone(),
1795            });
1796        }
1797        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1798            if manifest.dimension == 0 {
1799                manifest.dimension = artifact.dimension;
1800            }
1801            if manifest.bytes_length == 0 {
1802                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1803                manifest.compression_mode = artifact.compression.clone();
1804            }
1805        }
1806
1807        self.vec_enabled = true;
1808        Ok(true)
1809    }
1810
1811    #[allow(dead_code)]
1812    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1813        if delta.inserted_time_entries.is_empty() {
1814            return Ok(false);
1815        }
1816
1817        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1818            Some(artifact) => artifact,
1819            None => return Ok(false),
1820        };
1821
1822        let segment_id = self.toc.segment_catalog.next_segment_id;
1823        #[cfg(feature = "parallel_segments")]
1824        #[cfg(feature = "parallel_segments")]
1825        let span = self.segment_span_from_iter(
1826            delta
1827                .inserted_time_entries
1828                .iter()
1829                .map(|entry| entry.frame_id),
1830        );
1831
1832        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1833        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1834        #[cfg(feature = "parallel_segments")]
1835        if let Some(span) = span {
1836            Self::decorate_segment_common(&mut descriptor.common, span);
1837        }
1838        #[cfg(feature = "parallel_segments")]
1839        let descriptor_for_manifest = descriptor.clone();
1840        self.toc.segment_catalog.time_segments.push(descriptor);
1841        #[cfg(feature = "parallel_segments")]
1842        if let Err(err) = self.record_index_segment(
1843            SegmentKind::Time,
1844            descriptor_for_manifest.common,
1845            SegmentStats {
1846                doc_count: 0,
1847                vector_count: 0,
1848                time_entries: artifact.entry_count,
1849                bytes_uncompressed: artifact.bytes.len() as u64,
1850                build_micros: 0,
1851            },
1852        ) {
1853            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1854        }
1855        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1856        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1857        Ok(true)
1858    }
1859
1860    #[cfg(feature = "temporal_track")]
1861    #[allow(dead_code)]
1862    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1863        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1864        {
1865            return Ok(false);
1866        }
1867
1868        debug_assert!(
1869            delta.inserted_temporal_mentions.len() < 1_000_000,
1870            "temporal delta mentions unexpectedly large: {}",
1871            delta.inserted_temporal_mentions.len()
1872        );
1873        debug_assert!(
1874            delta.inserted_temporal_anchors.len() < 1_000_000,
1875            "temporal delta anchors unexpectedly large: {}",
1876            delta.inserted_temporal_anchors.len()
1877        );
1878
1879        let artifact = match self.build_temporal_segment_from_records(
1880            &delta.inserted_temporal_mentions,
1881            &delta.inserted_temporal_anchors,
1882        )? {
1883            Some(artifact) => artifact,
1884            None => return Ok(false),
1885        };
1886
1887        let segment_id = self.toc.segment_catalog.next_segment_id;
1888        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
1889        self.toc
1890            .segment_catalog
1891            .temporal_segments
1892            .push(descriptor.clone());
1893        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1894        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1895
1896        self.toc.temporal_track = Some(TemporalTrackManifest {
1897            bytes_offset: descriptor.common.bytes_offset,
1898            bytes_length: descriptor.common.bytes_length,
1899            entry_count: artifact.entry_count,
1900            anchor_count: artifact.anchor_count,
1901            checksum: artifact.checksum,
1902            flags: artifact.flags,
1903        });
1904
1905        self.clear_temporal_track_cache();
1906
1907        Ok(true)
1908    }
1909
1910    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
1911        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
1912            frame_id,
1913            reason: "frame id too large",
1914        })?;
1915        let frame = self
1916            .toc
1917            .frames
1918            .get_mut(index)
1919            .ok_or(MemvidError::InvalidFrame {
1920                frame_id,
1921                reason: "supersede target missing",
1922            })?;
1923        frame.status = FrameStatus::Superseded;
1924        frame.superseded_by = Some(successor_id);
1925        self.remove_frame_from_indexes(frame_id)
1926    }
1927
    /// Rebuild every derived index region from the authoritative frame TOC.
    ///
    /// Covers the time index, lexical (Tantivy) index, vector index, CLIP
    /// index, memories track, and logic mesh, in that on-disk order. Used by
    /// doctor/compaction-style full rebuilds. `new_vec_docs` supplies
    /// `(frame, embedding)` pairs to fold into the rebuilt vec index.
    ///
    /// # Errors
    /// Propagates file I/O errors, Tantivy engine errors, and returns
    /// `MemvidError::Doctor` when a lex rebuild indexed zero documents despite
    /// text-indexable frames being present.
    pub(crate) fn rebuild_indexes(&mut self, new_vec_docs: &[(FrameId, Vec<f32>)]) -> Result<()> {
        // Nothing persisted and no indexes enabled: rebuild is a no-op.
        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
            return Ok(());
        }

        let payload_end = self.payload_region_end();
        self.data_end = payload_end;
        // Don't truncate if footer_offset is higher - there may be replay segments
        // or other data written after payload_end that must be preserved.
        let safe_truncate_len = self.header.footer_offset.max(payload_end);
        if self.file.metadata()?.len() > safe_truncate_len {
            self.file.set_len(safe_truncate_len)?;
        }
        self.file.seek(SeekFrom::Start(payload_end))?;

        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        self.toc.segment_catalog.temporal_segments.clear();
        #[cfg(feature = "parallel_segments")]
        self.toc.segment_catalog.index_segments.clear();
        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
        self.toc.segment_catalog.tantivy_segments.clear();
        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
        self.toc.indexes.lex_segments.clear();

        // Time index: one entry per active document frame, written first so it
        // sits immediately after the payload region.
        let mut time_entries: Vec<TimeIndexEntry> = self
            .toc
            .frames
            .iter()
            .filter(|frame| {
                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
            })
            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
            .collect();
        let (ti_offset, ti_length, ti_checksum) =
            time_index_append(&mut self.file, &mut time_entries)?;
        self.toc.time_index = Some(TimeIndexManifest {
            bytes_offset: ti_offset,
            bytes_length: ti_length,
            entry_count: time_entries.len() as u64,
            checksum: ti_checksum,
        });

        // Running cursor: the byte offset where the next index region begins.
        let mut footer_offset = ti_offset + ti_length;

        #[cfg(feature = "temporal_track")]
        {
            // Temporal track is not rebuilt here; drop stale state so readers
            // never see offsets into the truncated region.
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
            self.clear_temporal_track_cache();
        }

        if self.lex_enabled {
            #[cfg(feature = "lex")]
            {
                // Clear embedded storage to avoid carrying stale segments between rebuilds.
                if let Ok(mut storage) = self.lex_storage.write() {
                    storage.clear();
                    storage.set_generation(0);
                }

                // Initialize Tantivy engine (loads existing segments if any)
                self.init_tantivy()?;

                // Take the engine out of `self` so the rebuild can borrow
                // `self` mutably, then restore it afterwards.
                if let Some(mut engine) = self.tantivy.take() {
                    self.rebuild_tantivy_engine(&mut engine)?;
                    self.tantivy = Some(engine);
                } else {
                    return Err(MemvidError::InvalidToc {
                        reason: "tantivy engine missing during doctor rebuild".into(),
                    });
                }

                // Set lex_enabled to ensure it persists
                self.lex_enabled = true;

                // Mark Tantivy as dirty so it gets flushed
                self.tantivy_dirty = true;

                // Position embedded Tantivy segments immediately after the time index.
                self.data_end = footer_offset;

                // Flush Tantivy segments to file
                self.flush_tantivy()?;

                // Update footer_offset after Tantivy flush
                footer_offset = self.header.footer_offset;

                // Restore data_end to payload boundary so future payload writes stay before indexes.
                self.data_end = payload_end;
            }
            #[cfg(not(feature = "lex"))]
            {
                // Built without lex support: ensure no lex manifests survive.
                self.toc.indexes.lex = None;
                self.toc.indexes.lex_segments.clear();
            }
        } else {
            // Lex disabled: clear everything
            self.toc.indexes.lex = None;
            self.toc.indexes.lex_segments.clear();
            #[cfg(feature = "lex")]
            if let Ok(mut storage) = self.lex_storage.write() {
                storage.clear();
            }
        }

        // Vec index region, written after the lex region.
        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
            let vec_offset = footer_offset;
            self.file.seek(SeekFrom::Start(vec_offset))?;
            self.file.write_all(&artifact.bytes)?;
            footer_offset += artifact.bytes.len() as u64;
            self.toc.indexes.vec = Some(VecIndexManifest {
                vector_count: artifact.vector_count,
                dimension: artifact.dimension,
                bytes_offset: vec_offset,
                bytes_length: artifact.bytes.len() as u64,
                checksum: artifact.checksum,
                compression_mode: self.vec_compression.clone(),
            });
            self.vec_index = Some(index);
        } else {
            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
            if !self.vec_enabled {
                self.toc.indexes.vec = None;
            }
            self.vec_index = None;
        }

        // Persist CLIP index if it has embeddings
        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    let artifact = clip_index.encode()?;
                    let clip_offset = footer_offset;
                    self.file.seek(SeekFrom::Start(clip_offset))?;
                    self.file.write_all(&artifact.bytes)?;
                    footer_offset += artifact.bytes.len() as u64;
                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
                        bytes_offset: clip_offset,
                        bytes_length: artifact.bytes.len() as u64,
                        vector_count: artifact.vector_count,
                        dimension: artifact.dimension,
                        checksum: artifact.checksum,
                        model_name: crate::clip::default_model_info().name.to_string(),
                    });
                    tracing::info!(
                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
                        artifact.vector_count,
                        clip_offset
                    );
                }
            }
        } else {
            self.toc.indexes.clip = None;
        }

        // Persist memories track if it has cards
        if self.memories_track.card_count() > 0 {
            let memories_offset = footer_offset;
            let memories_bytes = self.memories_track.serialize()?;
            let memories_checksum = blake3::hash(&memories_bytes).into();
            self.file.seek(SeekFrom::Start(memories_offset))?;
            self.file.write_all(&memories_bytes)?;
            footer_offset += memories_bytes.len() as u64;

            let stats = self.memories_track.stats();
            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
                bytes_offset: memories_offset,
                bytes_length: memories_bytes.len() as u64,
                card_count: stats.card_count as u64,
                entity_count: stats.entity_count as u64,
                checksum: memories_checksum,
            });
        } else {
            self.toc.memories_track = None;
        }

        // Persist logic mesh if it has nodes
        if self.logic_mesh.is_empty() {
            self.toc.logic_mesh = None;
        } else {
            let mesh_offset = footer_offset;
            let mesh_bytes = self.logic_mesh.serialize()?;
            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
            self.file.seek(SeekFrom::Start(mesh_offset))?;
            self.file.write_all(&mesh_bytes)?;
            footer_offset += mesh_bytes.len() as u64;

            let stats = self.logic_mesh.stats();
            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
                bytes_offset: mesh_offset,
                bytes_length: mesh_bytes.len() as u64,
                node_count: stats.node_count as u64,
                edge_count: stats.edge_count as u64,
                checksum: mesh_checksum,
            });
        }

        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
        tracing::info!(
            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
            ti_offset,
            ti_length,
            footer_offset,
            self.header.footer_offset
        );

        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
        // This prevents overwriting data like replay segments that were written after index data
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        // Persist the new TOC footer, then mirror its checksum into the header.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;

        // Post-rebuild verification: if lex is on, a non-empty set of
        // text-indexable frames must have produced at least one document.
        #[cfg(feature = "lex")]
        if self.lex_enabled {
            if let Some(ref engine) = self.tantivy {
                let doc_count = engine.num_docs();
                let active_frame_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| f.status == FrameStatus::Active)
                    .count();

                // Count frames that would actually be indexed by rebuild_tantivy_engine
                // Uses the same logic: content-type based check + size limit
                let text_indexable_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
                    .count();

                // Only fail if we have text-indexable frames but none got indexed
                // This avoids false positives for binary files (videos, images)
                if doc_count == 0 && text_indexable_count > 0 {
                    return Err(MemvidError::Doctor {
                        reason: format!(
                            "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
                            This indicates a critical failure in the rebuild process."
                        ),
                    });
                }

                // Success! Log it
                log::info!(
                    "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
                );
            }
        }

        Ok(())
    }
2191
2192    /// Persist the memories track to the file without a full rebuild.
2193    ///
2194    /// This is used when the memories track has been modified but no frame
2195    /// changes were made (e.g., after running enrichment).
2196    fn persist_memories_track(&mut self) -> Result<()> {
2197        if self.memories_track.card_count() == 0 {
2198            self.toc.memories_track = None;
2199            return Ok(());
2200        }
2201
2202        // Write after the current footer_offset
2203        let memories_offset = self.header.footer_offset;
2204        let memories_bytes = self.memories_track.serialize()?;
2205        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2206
2207        self.file.seek(SeekFrom::Start(memories_offset))?;
2208        self.file.write_all(&memories_bytes)?;
2209
2210        let stats = self.memories_track.stats();
2211        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2212            bytes_offset: memories_offset,
2213            bytes_length: memories_bytes.len() as u64,
2214            card_count: stats.card_count as u64,
2215            entity_count: stats.entity_count as u64,
2216            checksum: memories_checksum,
2217        });
2218
2219        // Update footer_offset to account for the memories track
2220        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2221
2222        // Ensure the file length covers the memories track
2223        if self.file.metadata()?.len() < self.header.footer_offset {
2224            self.file.set_len(self.header.footer_offset)?;
2225        }
2226
2227        Ok(())
2228    }
2229
2230    /// Persist the CLIP index to the file without a full rebuild.
2231    ///
2232    /// This is used when CLIP embeddings have been added but no full
2233    /// index rebuild is needed (e.g., in parallel segments mode).
2234    fn persist_clip_index(&mut self) -> Result<()> {
2235        if !self.clip_enabled {
2236            self.toc.indexes.clip = None;
2237            return Ok(());
2238        }
2239
2240        let clip_index = match &self.clip_index {
2241            Some(idx) if !idx.is_empty() => idx,
2242            _ => {
2243                self.toc.indexes.clip = None;
2244                return Ok(());
2245            }
2246        };
2247
2248        // Encode the CLIP index
2249        let artifact = clip_index.encode()?;
2250
2251        // Write after the current footer_offset
2252        let clip_offset = self.header.footer_offset;
2253        self.file.seek(SeekFrom::Start(clip_offset))?;
2254        self.file.write_all(&artifact.bytes)?;
2255
2256        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2257            bytes_offset: clip_offset,
2258            bytes_length: artifact.bytes.len() as u64,
2259            vector_count: artifact.vector_count,
2260            dimension: artifact.dimension,
2261            checksum: artifact.checksum,
2262            model_name: crate::clip::default_model_info().name.to_string(),
2263        });
2264
2265        tracing::info!(
2266            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2267            artifact.vector_count,
2268            clip_offset
2269        );
2270
2271        // Update footer_offset to account for the CLIP index
2272        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2273
2274        // Ensure the file length covers the CLIP index
2275        if self.file.metadata()?.len() < self.header.footer_offset {
2276            self.file.set_len(self.header.footer_offset)?;
2277        }
2278
2279        Ok(())
2280    }
2281
2282    /// Persist the Logic-Mesh to the file without a full rebuild.
2283    ///
2284    /// This is used when the Logic-Mesh has been modified but no frame
2285    /// changes were made (e.g., after running NER enrichment).
2286    fn persist_logic_mesh(&mut self) -> Result<()> {
2287        if self.logic_mesh.is_empty() {
2288            self.toc.logic_mesh = None;
2289            return Ok(());
2290        }
2291
2292        // Write after the current footer_offset
2293        let mesh_offset = self.header.footer_offset;
2294        let mesh_bytes = self.logic_mesh.serialize()?;
2295        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2296
2297        self.file.seek(SeekFrom::Start(mesh_offset))?;
2298        self.file.write_all(&mesh_bytes)?;
2299
2300        let stats = self.logic_mesh.stats();
2301        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2302            bytes_offset: mesh_offset,
2303            bytes_length: mesh_bytes.len() as u64,
2304            node_count: stats.node_count as u64,
2305            edge_count: stats.edge_count as u64,
2306            checksum: mesh_checksum,
2307        });
2308
2309        // Update footer_offset to account for the logic mesh
2310        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2311
2312        // Ensure the file length covers the logic mesh
2313        if self.file.metadata()?.len() < self.header.footer_offset {
2314            self.file.set_len(self.header.footer_offset)?;
2315        }
2316
2317        Ok(())
2318    }
2319
2320    /// Persist the sketch track to the file without a full rebuild.
2321    ///
2322    /// This is used when the sketch track has been modified (e.g., after
2323    /// running `sketch build`).
2324    fn persist_sketch_track(&mut self) -> Result<()> {
2325        if self.sketch_track.is_empty() {
2326            self.toc.sketch_track = None;
2327            return Ok(());
2328        }
2329
2330        // Seek to write after the current footer_offset
2331        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2332
2333        // Write the sketch track and get (offset, length, checksum)
2334        let (sketch_offset, sketch_length, sketch_checksum) =
2335            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2336
2337        let stats = self.sketch_track.stats();
2338        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2339            bytes_offset: sketch_offset,
2340            bytes_length: sketch_length,
2341            entry_count: stats.entry_count,
2342            #[allow(clippy::cast_possible_truncation)]
2343            entry_size: stats.variant.entry_size() as u16,
2344            flags: 0,
2345            checksum: sketch_checksum,
2346        });
2347
2348        // Update footer_offset to account for the sketch track
2349        self.header.footer_offset = sketch_offset + sketch_length;
2350
2351        // Ensure the file length covers the sketch track
2352        if self.file.metadata()?.len() < self.header.footer_offset {
2353            self.file.set_len(self.header.footer_offset)?;
2354        }
2355
2356        tracing::debug!(
2357            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2358            stats.entry_count,
2359            sketch_offset
2360        );
2361
2362        Ok(())
2363    }
2364
2365    #[cfg(feature = "lex")]
2366    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2367        let LexWalBatch {
2368            generation,
2369            doc_count,
2370            checksum,
2371            segments,
2372        } = batch;
2373
2374        if let Ok(mut storage) = self.lex_storage.write() {
2375            storage.replace(doc_count, checksum, segments);
2376            storage.set_generation(generation);
2377        }
2378
2379        self.persist_lex_manifest()
2380    }
2381
2382    #[cfg(feature = "lex")]
2383    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2384        let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2385        self.append_wal_entry(&payload)?;
2386        Ok(())
2387    }
2388
2389    #[cfg(feature = "lex")]
2390    fn persist_lex_manifest(&mut self) -> Result<()> {
2391        let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2392            storage.to_manifest()
2393        } else {
2394            (None, Vec::new())
2395        };
2396
2397        // Update the manifest
2398        if let Some(storage_manifest) = index_manifest {
2399            // Old LexIndexArtifact format: set the manifest with actual offset/length
2400            self.toc.indexes.lex = Some(storage_manifest);
2401        } else {
2402            // Tantivy segments OR lex disabled: clear the manifest
2403            // Stats will check lex_segments instead of manifest
2404            self.toc.indexes.lex = None;
2405        }
2406
2407        self.toc.indexes.lex_segments = segments;
2408
2409        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
2410        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.
2411
2412        self.rewrite_toc_footer()?;
2413        self.header.toc_checksum = self.toc.toc_checksum;
2414        crate::persist_header(&mut self.file, &self.header)?;
2415        Ok(())
2416    }
2417
    #[cfg(feature = "lex")]
    /// Write a fresh Tantivy snapshot's segment bytes into the file and publish them.
    ///
    /// Segments are appended starting at `data_end`, deliberately overwriting the
    /// previous generation's segments; they are then registered in the segment
    /// catalog, mirrored into embedded lex storage, staged as a WAL batch, and
    /// finally the manifest/TOC/header are rewritten.
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // Start writing at data_end (end of frame payloads), not footer_offset.
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        // Write each segment's bytes sequentially, recording where each landed.
        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            self.file.flush()?; // Flush segment data to disk
            embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Set footer_offset for TOC writing, but DON'T update data_end
        // data_end stays at end of payloads, so next commit overwrites these segments
        // Use max() to never decrease footer_offset - this preserves replay segments
        // that may have been written at a higher offset
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Rebuild the Tantivy segment catalog from the freshly written segments,
        // allocating monotonically increasing segment ids.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // REMOVED: catalog_data_end() check
        // This was causing orphaned Tantivy segments because it would see OLD segments
        // still in the catalog from previous commits, and push footer_offset forward.
        // We want Tantivy segments to overwrite at data_end, so footer_offset should
        // stay at the end of the newly written segments.

        // Mirror the snapshot into embedded lex storage; a poisoned lock is a
        // hard error here (unlike apply_lex_wal, which skips it silently).
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Stage the batch in the WAL and publish the manifest/TOC/header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2501
2502    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2503        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2504            frame_id,
2505            reason: "frame id too large",
2506        })?;
2507        let frame = self
2508            .toc
2509            .frames
2510            .get_mut(index)
2511            .ok_or(MemvidError::InvalidFrame {
2512                frame_id,
2513                reason: "delete target missing",
2514            })?;
2515        frame.status = FrameStatus::Deleted;
2516        frame.superseded_by = None;
2517        self.remove_frame_from_indexes(frame_id)
2518    }
2519
2520    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2521        #[cfg(feature = "lex")]
2522        if let Some(engine) = self.tantivy.as_mut() {
2523            engine.delete_frame(frame_id)?;
2524            self.tantivy_dirty = true;
2525        }
2526        if let Some(index) = self.lex_index.as_mut() {
2527            index.remove_document(frame_id);
2528        }
2529        if let Some(index) = self.vec_index.as_mut() {
2530            index.remove(frame_id);
2531        }
2532        Ok(())
2533    }
2534
2535    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2536        let Ok(index) = usize::try_from(frame_id) else {
2537            return false;
2538        };
2539        self.toc
2540            .frames
2541            .get(index)
2542            .is_some_and(|frame| frame.status == FrameStatus::Active)
2543    }
2544
2545    #[cfg(feature = "parallel_segments")]
2546    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2547    where
2548        I: IntoIterator<Item = FrameId>,
2549    {
2550        let mut iter = iter.into_iter();
2551        let first_id = iter.next()?;
2552        let first_frame = self.toc.frames.get(first_id as usize);
2553        let mut min_id = first_id;
2554        let mut max_id = first_id;
2555        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2556        let mut page_end = first_frame
2557            .and_then(|frame| frame.chunk_count)
2558            .map(|count| page_start + count.saturating_sub(1))
2559            .unwrap_or(page_start);
2560        for frame_id in iter {
2561            if frame_id < min_id {
2562                min_id = frame_id;
2563            }
2564            if frame_id > max_id {
2565                max_id = frame_id;
2566            }
2567            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2568                if let Some(idx) = frame.chunk_index {
2569                    page_start = page_start.min(idx);
2570                    if let Some(count) = frame.chunk_count {
2571                        let end = idx + count.saturating_sub(1);
2572                        page_end = page_end.max(end);
2573                    } else {
2574                        page_end = page_end.max(idx);
2575                    }
2576                }
2577            }
2578        }
2579        Some(SegmentSpan {
2580            frame_start: min_id,
2581            frame_end: max_id,
2582            page_start,
2583            page_end,
2584            ..SegmentSpan::default()
2585        })
2586    }
2587
    /// Attach `span` to a segment descriptor's common metadata.
    ///
    /// Also promotes a `codec_version` of 0 to 1 (0 presumably means "unset" —
    /// confirm against the segment catalog readers).
    #[cfg(feature = "parallel_segments")]
    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
        common.span = Some(span);
        if common.codec_version == 0 {
            common.codec_version = 1;
        }
    }
2595
2596    #[cfg(feature = "parallel_segments")]
2597    pub(crate) fn record_index_segment(
2598        &mut self,
2599        kind: SegmentKind,
2600        common: SegmentCommon,
2601        stats: SegmentStats,
2602    ) -> Result<()> {
2603        let entry = IndexSegmentRef {
2604            kind,
2605            common,
2606            stats,
2607        };
2608        self.toc.segment_catalog.index_segments.push(entry.clone());
2609        if let Some(wal) = self.manifest_wal.as_mut() {
2610            wal.append_segments(&[entry])?;
2611        }
2612        Ok(())
2613    }
2614
2615    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2616        self.ensure_writable()?;
2617        if self.toc.ticket_ref.issuer == "free-tier" {
2618            return Ok(());
2619        }
2620        match self.tier() {
2621            Tier::Free => Ok(()),
2622            tier => {
2623                if self.toc.ticket_ref.issuer.trim().is_empty() {
2624                    Err(MemvidError::TicketRequired { tier })
2625                } else {
2626                    Ok(())
2627                }
2628            }
2629        }
2630    }
2631
2632    pub(crate) fn tier(&self) -> Tier {
2633        if self.header.wal_size >= WAL_SIZE_LARGE {
2634            Tier::Enterprise
2635        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2636            Tier::Dev
2637        } else {
2638            Tier::Free
2639        }
2640    }
2641
2642    pub(crate) fn capacity_limit(&self) -> u64 {
2643        if self.toc.ticket_ref.capacity_bytes != 0 {
2644            self.toc.ticket_ref.capacity_bytes
2645        } else {
2646            self.tier().capacity_bytes()
2647        }
2648    }
2649
    /// Get current storage capacity in bytes.
    ///
    /// Returns the capacity from the applied ticket, or the default
    /// tier capacity (1 GB for free tier).
    ///
    /// Thin public wrapper over the crate-internal `capacity_limit`.
    #[must_use]
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2658
    /// Serialize the TOC, write it plus a `CommitFooter` at `footer_offset`,
    /// then truncate and fsync the file.
    ///
    /// Truncation is clamped so the header + WAL region is never cut; the
    /// final `sync_all` makes the new footer visible to mmap-based readers.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The footer records the TOC length/hash so readers can locate and
        // verify the TOC they just scanned back from the end of the file.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // The file must always be at least header + WAL size
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Ensure footer is flushed to disk so mmap-based readers can find it
        self.file.sync_all()?;
        Ok(())
    }
2699}
2700
2701#[cfg(feature = "parallel_segments")]
2702impl Memvid {
2703    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
2704        let chunks = self.collect_segment_chunks(delta)?;
2705        if chunks.is_empty() {
2706            return Ok(false);
2707        }
2708        let planner = SegmentPlanner::new(opts.clone());
2709        let plans = planner.plan_from_chunks(chunks);
2710        if plans.is_empty() {
2711            return Ok(false);
2712        }
2713        let worker_pool = SegmentWorkerPool::new(opts);
2714        let results = worker_pool.execute(plans)?;
2715        if results.is_empty() {
2716            return Ok(false);
2717        }
2718        self.append_parallel_segments(results)?;
2719        Ok(true)
2720    }
2721
    /// Build per-frame `SegmentChunkPlan`s for the frames inserted by `delta`.
    ///
    /// Embeddings from the delta are matched to frames by id and moved into
    /// the plans; frames whose extracted text is blank are skipped entirely.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            // Nothing indexable in this frame; skip it.
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // NOTE(review): pages are 1-based here (chunk_index + 1), while
            // segment_span_from_iter uses 0-based chunk indexes directly —
            // confirm the off-by-one between the two is intentional.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                // Move the embedding out of the map so each is used at most once.
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
2771}
2772
#[cfg(feature = "parallel_segments")]
/// Rough token estimate: whitespace-delimited word count, floored at 1 so
/// even empty text costs one token.
fn estimate_tokens(text: &str) -> usize {
    let words = text.split_whitespace().count();
    if words == 0 { 1 } else { words }
}
2777
2778impl Memvid {
2779    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2780        let catalog_end = self.catalog_data_end();
2781        if catalog_end <= self.header.footer_offset {
2782            return Ok(false);
2783        }
2784        self.header.footer_offset = catalog_end;
2785        self.rewrite_toc_footer()?;
2786        self.header.toc_checksum = self.toc.toc_checksum;
2787        crate::persist_header(&mut self.file, &self.header)?;
2788        Ok(true)
2789    }
2790}
2791
2792impl Memvid {
    /// Compact the payload region in place: re-pack active frames' payloads
    /// contiguously after the WAL and drop everything else (deleted payloads,
    /// old segments, index artifacts), then rebuild the in-memory indexes.
    pub fn vacuum(&mut self) -> Result<()> {
        // Flush pending state first so compaction works from a committed view.
        self.commit()?;

        // Snapshot every active frame's payload before the region is rewritten.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Re-pack payloads contiguously starting just past the WAL region,
        // updating each frame's offset/length as it is written.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Non-active frames keep no payload after compaction.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // Everything written past the payloads is now orphaned; clear every
        // segment catalog so nothing references the old offsets.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[])?;
        self.file.sync_all()?;
        Ok(())
    }
2860
2861    /// Preview how a document would be chunked without actually ingesting it.
2862    ///
2863    /// This is useful when you need to compute embeddings for each chunk externally
2864    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
2865    /// is too small to be chunked (< 2400 chars after normalization).
2866    ///
2867    /// # Example
2868    /// ```ignore
2869    /// let chunks = mem.preview_chunks(b"long document text...")?;
2870    /// if let Some(chunk_texts) = chunks {
2871    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
2872    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
2873    /// } else {
2874    ///     let embedding = my_embedder.embed_query(text)?;
2875    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
2876    /// }
2877    /// ```
2878    #[must_use]
2879    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
2880        plan_document_chunks(payload).map(|plan| plan.chunks)
2881    }
2882
    /// Append raw bytes as a document frame.
    ///
    /// Uses default `PutOptions` and no caller-supplied embedding; returns the
    /// sequence value reported by `put_internal`.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }
2887
    /// Append raw bytes with explicit metadata/options.
    ///
    /// Same as `put_bytes` but with caller-supplied `PutOptions`.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }
2892
    /// Append bytes and an existing embedding (bypasses on-device embedding).
    ///
    /// Uses default `PutOptions`; the embedding occupies the same argument
    /// position `put_with_chunk_embeddings` uses for the parent embedding.
    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            Some(embedding),
            None,
            PutOptions::default(),
            None,
        )
    }
2904
    /// Append bytes with both a caller-supplied embedding and explicit options.
    ///
    /// Combination of `put_with_embedding` and `put_bytes_with_options`.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }
2913
    /// Ingest a document with pre-computed embeddings for both parent and chunks.
    ///
    /// This is the recommended API for high-accuracy semantic search when chunking
    /// occurs. The caller provides:
    /// - `payload`: The document bytes
    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
    /// - `options`: Standard put options
    ///
    /// The number of chunk embeddings should match the number of chunks that will be
    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
    ///
    /// Use `preview_chunks` to see the chunking a payload will receive before
    /// computing embeddings. Returns the sequence value from `put_internal`.
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }
2942
    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
    ///
    /// A `payload` of `None` reuses the existing payload bytes (and disables
    /// auto-tagging/date extraction, since the content is unchanged). Any
    /// option left unset falls back to the existing frame's value, so the
    /// caller only overrides what it explicitly provides. Returns the sequence
    /// value produced by `put_internal`.
    ///
    /// # Errors
    /// Fails when mutation is not allowed, the frame id is unknown, or the
    /// target frame is not `Active`.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Inherit every unset option from the existing frame.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        // Collection-valued options inherit only when the caller left them empty.
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // No new payload: reuse the existing frame's bytes and skip content
        // re-analysis, since the content has not changed.
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        // Prefer an explicit embedding; otherwise, when vectors are enabled,
        // carry the frame's stored embedding forward so it isn't lost.
        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None, // No chunk embeddings for update
            options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
        );
        Ok(seq)
    }
3023
3024    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
3025        self.ensure_mutation_allowed()?;
3026        let frame = self.frame_by_id(frame_id)?;
3027        if frame.status != FrameStatus::Active {
3028            return Err(MemvidError::InvalidFrame {
3029                frame_id,
3030                reason: "frame is not active",
3031            });
3032        }
3033
3034        let mut tombstone = WalEntryData {
3035            timestamp: SystemTime::now()
3036                .duration_since(UNIX_EPOCH)
3037                .map(|d| d.as_secs() as i64)
3038                .unwrap_or(frame.timestamp),
3039            kind: None,
3040            track: None,
3041            payload: Vec::new(),
3042            embedding: None,
3043            uri: frame.uri.clone(),
3044            title: frame.title.clone(),
3045            canonical_encoding: frame.canonical_encoding,
3046            canonical_length: frame.canonical_length,
3047            metadata: None,
3048            search_text: None,
3049            tags: Vec::new(),
3050            labels: Vec::new(),
3051            extra_metadata: BTreeMap::new(),
3052            content_dates: Vec::new(),
3053            chunk_manifest: None,
3054            role: frame.role,
3055            parent_sequence: None,
3056            chunk_index: frame.chunk_index,
3057            chunk_count: frame.chunk_count,
3058            op: FrameWalOp::Tombstone,
3059            target_frame_id: Some(frame_id),
3060            supersedes_frame_id: None,
3061            reuse_payload_from: None,
3062            source_sha256: None,
3063            source_path: None,
3064            enrichment_state: crate::types::EnrichmentState::default(),
3065        };
3066        tombstone.kind = frame.kind.clone();
3067        tombstone.track = frame.track.clone();
3068
3069        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
3070        let seq = self.append_wal_entry(&payload_bytes)?;
3071        self.dirty = true;
3072        if self.wal.should_checkpoint() {
3073            self.commit()?;
3074        }
3075        info!("frame_delete frame_id={frame_id} seq={seq}");
3076        Ok(seq)
3077    }
3078}
3079
impl Memvid {
    /// Core ingestion path shared by inserts, updates, and metadata-only edits.
    ///
    /// Stages one parent frame — plus optional per-chunk child frames — as
    /// embedded-WAL entries; no bytes reach the data region until the next
    /// commit, which may be triggered here when the WAL hits its checkpoint
    /// threshold.
    ///
    /// # Arguments
    /// * `payload` — raw bytes to ingest; mutually exclusive with `reuse_frame`.
    /// * `reuse_frame` — existing frame whose stored payload is reused
    ///   (metadata-only update path); records `reuse_payload_from` instead of
    ///   writing new bytes.
    /// * `embedding` — caller-supplied frame-level vector, if any.
    /// * `chunk_embeddings` — caller-supplied per-chunk vectors, if any. All
    ///   supplied vectors must share a single dimension.
    /// * `options` — ingestion knobs (tags, URI, auto-tag, instant index, …);
    ///   consumed field-by-field via `take`/`mem::take`.
    /// * `supersedes` — frame id this insert logically replaces, if any.
    ///
    /// # Errors
    /// Fails on mixed embedding dimensions, exceeded capacity, missing
    /// payload, extraction failures, or WAL/commit I/O errors.
    ///
    /// Returns the WAL sequence number assigned to the parent entry.
    fn put_internal(
        &mut self,
        payload: Option<&[u8]>,
        reuse_frame: Option<Frame>,
        embedding: Option<Vec<f32>>,
        chunk_embeddings: Option<Vec<Vec<f32>>>,
        mut options: PutOptions,
        supersedes: Option<FrameId>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;

        // Deduplication: if enabled and we have payload, check if identical content exists
        if options.dedup {
            if let Some(bytes) = payload {
                let content_hash = hash(bytes);
                if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
                    // Found existing frame with same content hash, skip ingestion
                    tracing::debug!(
                        frame_id = existing_frame.id,
                        "dedup: skipping ingestion, identical content already exists"
                    );
                    // Return existing frame's sequence number (which equals frame_id for committed frames)
                    return Ok(existing_frame.id);
                }
            }
        }

        // Guard: fresh bytes and a payload-reuse request are mutually exclusive.
        if payload.is_some() && reuse_frame.is_some() {
            let frame_id = reuse_frame
                .as_ref()
                .map(|frame| frame.id)
                .unwrap_or_default();
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "cannot reuse payload when bytes are provided",
            });
        }

        // If the caller supplies embeddings, enforce a single vector dimension contract
        // for the entire memory (fail fast, never silently accept mixed dimensions).
        let incoming_dimension = {
            let mut dim: Option<u32> = None;

            if let Some(ref vector) = embedding {
                if !vector.is_empty() {
                    #[allow(clippy::cast_possible_truncation)]
                    let len = vector.len() as u32;
                    dim = Some(len);
                }
            }

            if let Some(ref vectors) = chunk_embeddings {
                for vector in vectors {
                    if vector.is_empty() {
                        continue;
                    }
                    let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
                    match dim {
                        None => dim = Some(vec_dim),
                        Some(existing) if existing == vec_dim => {}
                        Some(existing) => {
                            return Err(MemvidError::VecDimensionMismatch {
                                expected: existing,
                                actual: vector.len(),
                            });
                        }
                    }
                }
            }

            dim
        };

        if let Some(incoming_dimension) = incoming_dimension {
            // Embeddings imply vector search should be enabled.
            if !self.vec_enabled {
                self.enable_vec()?;
            }

            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
                if existing_dimension != incoming_dimension {
                    return Err(MemvidError::VecDimensionMismatch {
                        expected: existing_dimension,
                        actual: incoming_dimension as usize,
                    });
                }
            }

            // Persist the dimension early for better auto-detection (even before the next commit).
            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
                if manifest.dimension == 0 {
                    manifest.dimension = incoming_dimension;
                }
            }
        }

        // Pre-encode the payload once so the projected post-append size can be
        // checked against the capacity limit before anything is staged.
        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
        let payload_tail = self.payload_region_end();
        let projected = if let Some(bytes) = payload {
            let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
            let len = prepared.len();
            prepared_payload = Some((prepared, encoding, length));
            payload_tail.saturating_add(len as u64)
        } else if reuse_frame.is_some() {
            payload_tail
        } else {
            return Err(MemvidError::InvalidFrame {
                frame_id: 0,
                reason: "payload required for frame insertion",
            });
        };

        let capacity_limit = self.capacity_limit();
        if projected > capacity_limit {
            let incoming_size = projected.saturating_sub(payload_tail);
            return Err(MemvidError::CapacityExceeded {
                current: payload_tail,
                limit: capacity_limit,
                required: incoming_size,
            });
        }
        // Caller-supplied timestamp wins; otherwise stamp with wall-clock seconds.
        let timestamp = options.timestamp.take().unwrap_or_else(|| {
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(0)
        });

        // Keeps canonical bytes loaded from an existing frame alive so the
        // borrow held in `payload_for_processing` remains valid below.
        #[allow(unused_assignments)]
        let mut reuse_bytes: Option<Vec<u8>> = None;
        let payload_for_processing = if let Some(bytes) = payload {
            Some(bytes)
        } else if let Some(frame) = reuse_frame.as_ref() {
            let bytes = self.frame_canonical_bytes(frame)?;
            reuse_bytes = Some(bytes);
            reuse_bytes.as_deref()
        } else {
            None
        };

        // Try to create a chunk plan from raw UTF-8 bytes first
        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
            (Some(bytes), None) => plan_document_chunks(bytes),
            _ => None,
        };

        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
        // For --no-raw mode, we store only the extracted text and a hash of the original binary
        let mut source_sha256: Option<[u8; 32]> = None;
        let source_path_value = options.source_path.take();

        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
            if raw_chunk_plan.is_some() {
                // UTF-8 text document - chunks contain the text, no parent payload needed
                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
            } else if options.no_raw {
                // --no-raw mode: don't store the raw binary, only compute hash
                if let Some(bytes) = payload {
                    // Compute BLAKE3 hash of original binary for verification
                    let hash_result = hash(bytes);
                    source_sha256 = Some(*hash_result.as_bytes());
                    // Store empty payload - the extracted text is in search_text
                    (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
                } else {
                    return Err(MemvidError::InvalidFrame {
                        frame_id: 0,
                        reason: "payload required for --no-raw mode",
                    });
                }
            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
                (prepared, encoding, length, None)
            } else if let Some(bytes) = payload {
                let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
                (prepared, encoding, length, None)
            } else if let Some(frame) = reuse_frame.as_ref() {
                (
                    Vec::new(),
                    frame.canonical_encoding,
                    frame.canonical_length,
                    Some(frame.id),
                )
            } else {
                return Err(MemvidError::InvalidFrame {
                    frame_id: 0,
                    reason: "payload required for frame insertion",
                });
            };

        // Track whether we'll create an extracted text chunk plan later
        let mut chunk_plan = raw_chunk_plan;

        let mut metadata = options.metadata.take();
        let mut search_text = options
            .search_text
            .take()
            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
        let mut tags = std::mem::take(&mut options.tags);
        let mut labels = std::mem::take(&mut options.labels);
        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
        let mut content_dates: Vec<String> = Vec::new();

        // Run the extractor only when something it produces is actually needed.
        let need_search_text = search_text
            .as_ref()
            .is_none_or(|text| text.trim().is_empty());
        let need_metadata = metadata.is_none();
        let run_extractor = need_search_text || need_metadata || options.auto_tag;

        let mut extraction_error = None;
        let mut is_skim_extraction = false; // Track if extraction was time-limited

        let extracted = if run_extractor {
            if let Some(bytes) = payload_for_processing {
                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
                let uri_hint = options.uri.as_deref();

                // Use time-budgeted extraction for instant indexing with a budget
                let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;

                if use_budgeted {
                    // Time-budgeted extraction for sub-second ingestion
                    let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
                        options.extraction_budget_ms,
                    );
                    match crate::extract_budgeted::extract_with_budget(
                        bytes, mime_hint, uri_hint, budget,
                    ) {
                        Ok(result) => {
                            is_skim_extraction = result.is_skim();
                            if is_skim_extraction {
                                tracing::debug!(
                                    coverage = result.coverage,
                                    elapsed_ms = result.elapsed_ms,
                                    sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
                                    "time-budgeted extraction (skim)"
                                );
                            }
                            // Convert BudgetedExtractionResult to ExtractedDocument
                            let doc = crate::extract::ExtractedDocument {
                                text: if result.text.is_empty() {
                                    None
                                } else {
                                    Some(result.text)
                                },
                                metadata: serde_json::json!({
                                    "skim": is_skim_extraction,
                                    "coverage": result.coverage,
                                    "sections_extracted": result.sections_extracted,
                                    "sections_total": result.sections_total,
                                }),
                                mime_type: mime_hint.map(std::string::ToString::to_string),
                            };
                            Some(doc)
                        }
                        Err(err) => {
                            // Fall back to full extraction on budgeted extraction error
                            tracing::warn!(
                                ?err,
                                "budgeted extraction failed, trying full extraction"
                            );
                            match extract_via_registry(bytes, mime_hint, uri_hint) {
                                Ok(doc) => Some(doc),
                                Err(err) => {
                                    extraction_error = Some(err);
                                    None
                                }
                            }
                        }
                    }
                } else {
                    // Full extraction (no time budget)
                    match extract_via_registry(bytes, mime_hint, uri_hint) {
                        Ok(doc) => Some(doc),
                        Err(err) => {
                            extraction_error = Some(err);
                            None
                        }
                    }
                }
            } else {
                None
            }
        } else {
            None
        };

        // Surface extraction failure only after the budgeted fallback had its chance.
        if let Some(err) = extraction_error {
            return Err(err);
        }

        if let Some(doc) = &extracted {
            if need_search_text {
                if let Some(text) = &doc.text {
                    if let Some(normalized) =
                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
                    {
                        search_text = Some(normalized);
                    }
                }
            }

            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
            // from extracted text. This ensures large documents like PDFs get fully indexed.
            if chunk_plan.is_none() {
                if let Some(text) = &doc.text {
                    chunk_plan = plan_text_chunks(text);
                }
            }

            if let Some(mime) = doc.mime_type.as_ref() {
                if let Some(existing) = &mut metadata {
                    if existing.mime.is_none() {
                        existing.mime = Some(mime.clone());
                    }
                } else {
                    let mut doc_meta = DocMetadata::default();
                    doc_meta.mime = Some(mime.clone());
                    metadata = Some(doc_meta);
                }
            }

            // Only add extractous_metadata when auto_tag is enabled
            // This allows callers to disable metadata pollution by setting auto_tag=false
            if options.auto_tag {
                if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string()) {
                    extra_metadata
                        .entry("extractous_metadata".to_string())
                        .or_insert(meta_json);
                }
            }
        }

        // Auto-tagging derives tags/labels (and optionally content dates) from
        // the search text; explicit caller values are merged, not replaced.
        if options.auto_tag {
            if let Some(ref text) = search_text {
                if !text.trim().is_empty() {
                    let result = AutoTagger.analyse(text, options.extract_dates);
                    merge_unique(&mut tags, result.tags);
                    merge_unique(&mut labels, result.labels);
                    if options.extract_dates && content_dates.is_empty() {
                        content_dates = result.content_dates;
                    }
                }
            }
        }

        // Metadata-only updates keep the dates previously extracted for the frame.
        if content_dates.is_empty() {
            if let Some(frame) = reuse_frame.as_ref() {
                content_dates = frame.content_dates.clone();
            }
        }

        let metadata_ref = metadata.as_ref();
        let mut search_text = augment_search_text(
            search_text,
            options.uri.as_deref(),
            options.title.as_deref(),
            options.track.as_deref(),
            &tags,
            &labels,
            &extra_metadata,
            &content_dates,
            metadata_ref,
        );
        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
        let mut parent_chunk_count: Option<u32> = None;

        let kind_value = options.kind.take();
        let track_value = options.track.take();
        let uri_value = options.uri.take();
        let title_value = options.title.take();
        let should_extract_triplets = options.extract_triplets;
        // Save references for triplet extraction (after search_text is moved into WAL entry)
        let triplet_uri = uri_value.clone();
        let triplet_title = title_value.clone();

        // Build one child WAL entry per planned chunk; the parent keeps the
        // manifest and chunk count, and its search text mirrors chunk 0.
        if let Some(plan) = chunk_plan.as_ref() {
            let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
            parent_chunk_manifest = Some(plan.manifest.clone());
            parent_chunk_count = Some(chunk_total);

            if let Some(first_chunk) = plan.chunks.first() {
                if let Some(normalized) =
                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
                {
                    if !normalized.trim().is_empty() {
                        search_text = Some(normalized);
                    }
                }
            }

            let chunk_tags = tags.clone();
            let chunk_labels = labels.clone();
            let chunk_metadata = metadata.clone();
            let chunk_extra_metadata = extra_metadata.clone();
            let chunk_content_dates = content_dates.clone();

            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
                let (chunk_payload, chunk_encoding, chunk_length) =
                    prepare_canonical_payload(chunk_text.as_bytes())?;
                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
                    .map(|n| n.text)
                    .filter(|text| !text.trim().is_empty());

                let chunk_uri = uri_value
                    .as_ref()
                    .map(|uri| format!("{uri}#page-{}", idx + 1));
                let chunk_title = title_value
                    .as_ref()
                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));

                // Use provided chunk embedding if available, otherwise None
                let chunk_embedding = chunk_embeddings
                    .as_ref()
                    .and_then(|embeddings| embeddings.get(idx).cloned());

                chunk_entries.push(WalEntryData {
                    timestamp,
                    kind: kind_value.clone(),
                    track: track_value.clone(),
                    payload: chunk_payload,
                    embedding: chunk_embedding,
                    uri: chunk_uri,
                    title: chunk_title,
                    canonical_encoding: chunk_encoding,
                    canonical_length: chunk_length,
                    metadata: chunk_metadata.clone(),
                    search_text: chunk_search_text,
                    tags: chunk_tags.clone(),
                    labels: chunk_labels.clone(),
                    extra_metadata: chunk_extra_metadata.clone(),
                    content_dates: chunk_content_dates.clone(),
                    chunk_manifest: None,
                    role: FrameRole::DocumentChunk,
                    parent_sequence: None,
                    chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
                    chunk_count: Some(chunk_total),
                    op: FrameWalOp::Insert,
                    target_frame_id: None,
                    supersedes_frame_id: None,
                    reuse_payload_from: None,
                    source_sha256: None, // Chunks don't have source references
                    source_path: None,
                    // Chunks are already extracted, so mark as Enriched
                    enrichment_state: crate::types::EnrichmentState::Enriched,
                });
            }
        }

        let parent_uri = uri_value.clone();
        let parent_title = title_value.clone();

        // Get parent_sequence from options.parent_id if provided
        // We need the WAL sequence of the parent frame to link them
        let parent_sequence = if let Some(parent_id) = options.parent_id {
            // Look up the parent frame to get its WAL sequence
            // Since frame.id corresponds to the array index, we need to find the sequence
            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
            // This works because sequence numbers are assigned incrementally
            usize::try_from(parent_id)
                .ok()
                .and_then(|idx| self.toc.frames.get(idx))
                .map(|_| parent_id + 2) // WAL sequences start at 2
        } else {
            None
        };

        // Clone search_text for triplet extraction (before it's moved into WAL entry)
        let triplet_text = search_text.clone();

        // Capture values needed for instant indexing BEFORE they're moved into entry
        #[cfg(feature = "lex")]
        let instant_index_tags = if options.instant_index {
            tags.clone()
        } else {
            Vec::new()
        };
        #[cfg(feature = "lex")]
        let instant_index_labels = if options.instant_index {
            labels.clone()
        } else {
            Vec::new()
        };

        // Determine enrichment state: Searchable if needs background work, Enriched if complete
        #[cfg(feature = "lex")]
        let needs_enrichment =
            options.instant_index && (options.enable_embedding || is_skim_extraction);
        #[cfg(feature = "lex")]
        let enrichment_state = if needs_enrichment {
            crate::types::EnrichmentState::Searchable
        } else {
            crate::types::EnrichmentState::Enriched
        };
        #[cfg(not(feature = "lex"))]
        let enrichment_state = crate::types::EnrichmentState::Enriched;

        let entry = WalEntryData {
            timestamp,
            kind: kind_value,
            track: track_value,
            payload: storage_payload,
            embedding,
            uri: parent_uri,
            title: parent_title,
            canonical_encoding,
            canonical_length,
            metadata,
            search_text,
            tags,
            labels,
            extra_metadata,
            content_dates,
            chunk_manifest: parent_chunk_manifest,
            role: options.role,
            parent_sequence,
            chunk_index: None,
            chunk_count: parent_chunk_count,
            op: FrameWalOp::Insert,
            target_frame_id: None,
            supersedes_frame_id: supersedes,
            reuse_payload_from,
            source_sha256,
            source_path: source_path_value,
            enrichment_state,
        };

        // The parent is appended first so chunk entries can reference its sequence.
        let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
        let parent_seq = self.append_wal_entry(&parent_bytes)?;
        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);

        // Instant indexing: make frame searchable immediately (<1s) without full commit
        // This is Phase 1 of progressive ingestion - frame is searchable but not fully enriched
        #[cfg(feature = "lex")]
        if options.instant_index && self.tantivy.is_some() {
            // Create a minimal frame for indexing
            let frame_id = parent_seq as FrameId;

            // Use triplet_text which was cloned before entry was created
            if let Some(ref text) = triplet_text {
                if !text.trim().is_empty() {
                    // Create temporary frame for indexing (minimal fields for Tantivy)
                    let temp_frame = Frame {
                        id: frame_id,
                        timestamp,
                        anchor_ts: None,
                        anchor_source: None,
                        kind: options.kind.clone(),
                        track: options.track.clone(),
                        payload_offset: 0,
                        payload_length: 0,
                        checksum: [0u8; 32],
                        uri: options
                            .uri
                            .clone()
                            .or_else(|| Some(crate::default_uri(frame_id))),
                        title: options.title.clone(),
                        canonical_encoding: crate::types::CanonicalEncoding::default(),
                        canonical_length: None,
                        metadata: None, // Not needed for text search
                        search_text: triplet_text.clone(),
                        tags: instant_index_tags.clone(),
                        labels: instant_index_labels.clone(),
                        extra_metadata: std::collections::BTreeMap::new(), // Not needed for search
                        content_dates: Vec::new(),                         // Not needed for search
                        chunk_manifest: None,
                        role: options.role,
                        parent_id: None,
                        chunk_index: None,
                        chunk_count: None,
                        status: FrameStatus::Active,
                        supersedes,
                        superseded_by: None,
                        source_sha256: None, // Not needed for search
                        source_path: None,   // Not needed for search
                        enrichment_state: crate::types::EnrichmentState::Searchable,
                    };

                    // Get mutable reference to engine and index the frame
                    if let Some(engine) = self.tantivy.as_mut() {
                        engine.add_frame(&temp_frame, text)?;
                        engine.soft_commit()?;
                        self.tantivy_dirty = true;

                        tracing::debug!(
                            frame_id = frame_id,
                            "instant index: frame searchable immediately"
                        );
                    }
                }
            }
        }

        // Queue frame for background enrichment when using instant index path
        // Enrichment includes: embedding generation, full text re-extraction if time-limited
        // Note: enrichment_state is already set in the WAL entry, so it will be correct after replay
        #[cfg(feature = "lex")]
        if needs_enrichment {
            let frame_id = parent_seq as FrameId;
            self.toc.enrichment_queue.push(frame_id);
            tracing::debug!(
                frame_id = frame_id,
                is_skim = is_skim_extraction,
                needs_embedding = options.enable_embedding,
                "queued frame for background enrichment"
            );
        }

        // Stage the child chunks, each linked back to the parent's sequence.
        for mut chunk_entry in chunk_entries {
            chunk_entry.parent_sequence = Some(parent_seq);
            let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
            self.append_wal_entry(&chunk_bytes)?;
            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
        }

        self.dirty = true;
        if self.wal.should_checkpoint() {
            self.commit()?;
        }

        // Record the put action if a replay session is active
        #[cfg(feature = "replay")]
        if let Some(input_bytes) = payload {
            self.record_put_action(parent_seq, input_bytes);
        }

        // Extract triplets if enabled (default: true)
        // Triplets are stored as MemoryCards with entity/slot/value structure
        if should_extract_triplets {
            if let Some(ref text) = triplet_text {
                if !text.trim().is_empty() {
                    let extractor = TripletExtractor::default();
                    let frame_id = parent_seq as FrameId;
                    let (cards, _stats) = extractor.extract(
                        frame_id,
                        text,
                        triplet_uri.as_deref(),
                        triplet_title.as_deref(),
                        timestamp,
                    );

                    if !cards.is_empty() {
                        // Add cards to memories track
                        let card_ids = self.memories_track.add_cards(cards);

                        // Record enrichment for incremental processing
                        self.memories_track
                            .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
                    }
                }
            }
        }

        Ok(parent_seq)
    }
}
3737
3738#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3739#[serde(rename_all = "snake_case")]
3740pub(crate) enum FrameWalOp {
3741    Insert,
3742    Tombstone,
3743}
3744
3745impl Default for FrameWalOp {
3746    fn default() -> Self {
3747        Self::Insert
3748    }
3749}
3750
/// Tagged envelope for one WAL record. `decode_wal_entry` also accepts the
/// legacy layout where a bare [`WalEntryData`] was written without this tag.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// Frame insert/tombstone data (see [`WalEntryData`]).
    Frame(WalEntryData),
    /// Batch of lexical-index updates staged in the WAL (lex feature only).
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3757
3758fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3759    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3760        return Ok(entry);
3761    }
3762    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3763    Ok(WalEntry::Frame(legacy))
3764}
3765
/// Payload of a single frame-level WAL record.
///
/// NOTE(review): records are encoded with bincode, which is sensitive to
/// field order — append new fields with `#[serde(default)]` rather than
/// reordering existing ones, or previously written WALs will not decode.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    // Frame timestamp (epoch-based i64; exact unit not visible here — confirm).
    pub(crate) timestamp: i64,
    // Optional frame kind discriminator.
    pub(crate) kind: Option<String>,
    // Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    // Canonical payload bytes, encoded per `canonical_encoding`.
    pub(crate) payload: Vec<u8>,
    // Optional dense embedding vector for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    // How `payload` is encoded; see `prepare_canonical_payload` (Zstd for
    // UTF-8 text, Plain otherwise).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    // Uncompressed payload length in bytes, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    // Pre-built search text; see `augment_search_text` for how it is composed.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    // Free-form key/value metadata merged into the search text.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    // Date strings extracted from the content (format not visible here).
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    // Present on parent frames that were split into chunk frames.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    #[serde(default)]
    pub(crate) role: FrameRole,
    // Sequence number of the parent frame; set on chunk entries during ingest.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    // Position of this chunk within its parent (when this is a chunk frame).
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    // Total number of chunks for the parent (when this is a chunk frame).
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    // Insert vs tombstone; defaults to Insert for legacy records.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    // Frame targeted by a tombstone/mutation op, if any.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    // Frame this entry logically replaces, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    // Reuse the stored payload of an earlier frame instead of `payload`.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// SHA-256 hash of original source file (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Original source file path (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Enrichment state for progressive ingestion.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
3821
3822pub(crate) fn prepare_canonical_payload(
3823    payload: &[u8],
3824) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
3825    if std::str::from_utf8(payload).is_ok() {
3826        let compressed = zstd::encode_all(std::io::Cursor::new(payload), 3)?;
3827        Ok((
3828            compressed,
3829            CanonicalEncoding::Zstd,
3830            Some(payload.len() as u64),
3831        ))
3832    } else {
3833        Ok((
3834            payload.to_vec(),
3835            CanonicalEncoding::Plain,
3836            Some(payload.len() as u64),
3837        ))
3838    }
3839}
3840
3841pub(crate) fn augment_search_text(
3842    base: Option<String>,
3843    uri: Option<&str>,
3844    title: Option<&str>,
3845    track: Option<&str>,
3846    tags: &[String],
3847    labels: &[String],
3848    extra_metadata: &BTreeMap<String, String>,
3849    content_dates: &[String],
3850    metadata: Option<&DocMetadata>,
3851) -> Option<String> {
3852    let mut segments: Vec<String> = Vec::new();
3853    if let Some(text) = base {
3854        let trimmed = text.trim();
3855        if !trimmed.is_empty() {
3856            segments.push(trimmed.to_string());
3857        }
3858    }
3859
3860    if let Some(title) = title {
3861        if !title.trim().is_empty() {
3862            segments.push(format!("title: {}", title.trim()));
3863        }
3864    }
3865
3866    if let Some(uri) = uri {
3867        if !uri.trim().is_empty() {
3868            segments.push(format!("uri: {}", uri.trim()));
3869        }
3870    }
3871
3872    if let Some(track) = track {
3873        if !track.trim().is_empty() {
3874            segments.push(format!("track: {}", track.trim()));
3875        }
3876    }
3877
3878    if !tags.is_empty() {
3879        segments.push(format!("tags: {}", tags.join(" ")));
3880    }
3881
3882    if !labels.is_empty() {
3883        segments.push(format!("labels: {}", labels.join(" ")));
3884    }
3885
3886    if !extra_metadata.is_empty() {
3887        for (key, value) in extra_metadata {
3888            if value.trim().is_empty() {
3889                continue;
3890            }
3891            segments.push(format!("{key}: {value}"));
3892        }
3893    }
3894
3895    if !content_dates.is_empty() {
3896        segments.push(format!("dates: {}", content_dates.join(" ")));
3897    }
3898
3899    if let Some(meta) = metadata {
3900        if let Ok(meta_json) = serde_json::to_string(meta) {
3901            segments.push(format!("metadata: {meta_json}"));
3902        }
3903    }
3904
3905    if segments.is_empty() {
3906        None
3907    } else {
3908        Some(segments.join("\n"))
3909    }
3910}
3911
/// Append `additions` to `target`, trimming each value and skipping empties
/// and anything already seen (either present in `target` or added earlier in
/// the same call). Relative order of surviving additions is preserved.
///
/// NOTE(review): existing `target` entries are compared verbatim while
/// additions are trimmed first, so `" a "` already in `target` does not block
/// adding `"a"` — this matches the original behavior.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    // Seed the de-dup set with everything already in the target list.
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    for addition in additions {
        let candidate = addition.trim();
        if candidate.is_empty() || seen.contains(candidate) {
            continue;
        }
        seen.insert(candidate.to_string());
        target.push(candidate.to_string());
    }
}