memvid_core/memvid/
mutation.rs

1//! Frame mutation and ingestion routines for `Memvid`.
2//!
3//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
4//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
5//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
6//!
7//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
8//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutOptions,
58    SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66    TemporalResolutionValue,
67};
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
75const MAGIC_SNIFF_BYTES: usize = 16;
76const WAL_ENTRY_HEADER_SIZE: u64 = 48;
77const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
78
79#[cfg(feature = "temporal_track")]
80const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";
81
82#[cfg(feature = "temporal_track")]
83const STATIC_TEMPORAL_PHRASES: &[&str] = &[
84    "today",
85    "yesterday",
86    "tomorrow",
87    "two days ago",
88    "in 3 days",
89    "two weeks from now",
90    "2 weeks from now",
91    "two fridays ago",
92    "last friday",
93    "next friday",
94    "this friday",
95    "next week",
96    "last week",
97    "end of this month",
98    "start of next month",
99    "last month",
100    "3 months ago",
101    "in 90 minutes",
102    "at 5pm today",
103    "in the last 24 hours",
104    "this morning",
105    "on the sunday after next",
106    "next daylight saving change",
107    "midnight tomorrow",
108    "noon next tuesday",
109    "first business day of next month",
110    "the first business day of next month",
111    "end of q3",
112    "next wednesday at 9",
113    "sunday at 1:30am",
114    "monday",
115    "tuesday",
116    "wednesday",
117    "thursday",
118    "friday",
119    "saturday",
120    "sunday",
121];
122
123struct CommitStaging {
124    atomic: AtomicWriteFile,
125}
126
127impl CommitStaging {
128    fn prepare(path: &Path) -> Result<Self> {
129        let mut options = AtomicWriteFile::options();
130        options.read(true);
131        let atomic = options.open(path)?;
132        Ok(Self { atomic })
133    }
134
135    fn copy_from(&mut self, source: &File) -> Result<()> {
136        let mut reader = source.try_clone()?;
137        reader.seek(SeekFrom::Start(0))?;
138
139        let writer = self.atomic.as_file_mut();
140        writer.set_len(0)?;
141        writer.seek(SeekFrom::Start(0))?;
142        std::io::copy(&mut reader, writer)?;
143        writer.flush()?;
144        writer.sync_all()?;
145        Ok(())
146    }
147
148    fn clone_file(&self) -> Result<File> {
149        Ok(self.atomic.as_file().try_clone()?)
150    }
151
152    fn commit(self) -> Result<()> {
153        self.atomic.commit().map_err(Into::into)
154    }
155
156    fn discard(self) -> Result<()> {
157        self.atomic.discard().map_err(Into::into)
158    }
159}
160
161#[derive(Debug, Default)]
162struct IngestionDelta {
163    inserted_frames: Vec<FrameId>,
164    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
165    inserted_time_entries: Vec<TimeIndexEntry>,
166    mutated_frames: bool,
167    #[cfg(feature = "temporal_track")]
168    inserted_temporal_mentions: Vec<TemporalMention>,
169    #[cfg(feature = "temporal_track")]
170    inserted_temporal_anchors: Vec<TemporalAnchor>,
171}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
190#[derive(Clone, Copy, Debug, PartialEq, Eq)]
191pub enum CommitMode {
192    Full,
193    Incremental,
194}
195
196impl Default for CommitMode {
197    fn default() -> Self {
198        Self::Full
199    }
200}
201
202#[derive(Clone, Copy, Debug, Default)]
203pub struct CommitOptions {
204    pub mode: CommitMode,
205    pub background: bool,
206}
207
208impl CommitOptions {
209    pub fn new(mode: CommitMode) -> Self {
210        Self {
211            mode,
212            background: false,
213        }
214    }
215
216    pub fn background(mut self, background: bool) -> Self {
217        self.background = background;
218        self
219    }
220}
221
222fn default_reader_registry() -> &'static ReaderRegistry {
223    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
224    REGISTRY.get_or_init(ReaderRegistry::default)
225}
226
227fn infer_document_format(
228    mime: Option<&str>,
229    magic: Option<&[u8]>,
230    uri: Option<&str>,
231) -> Option<DocumentFormat> {
232    // Check PDF magic bytes first
233    if detect_pdf_magic(magic) {
234        return Some(DocumentFormat::Pdf);
235    }
236
237    // For ZIP-based OOXML formats (DOCX, XLSX, PPTX), magic bytes are just ZIP header
238    // so we need to check file extension to distinguish them
239    if let Some(magic_bytes) = magic {
240        if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
241            // It's a ZIP file - check extension to determine OOXML type
242            if let Some(format) = infer_format_from_extension(uri) {
243                return Some(format);
244            }
245        }
246    }
247
248    // Try MIME type
249    if let Some(mime_str) = mime {
250        let mime_lower = mime_str.trim().to_ascii_lowercase();
251        let format = match mime_lower.as_str() {
252            "application/pdf" => Some(DocumentFormat::Pdf),
253            "text/plain" => Some(DocumentFormat::PlainText),
254            "text/markdown" => Some(DocumentFormat::Markdown),
255            "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
256            "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
257                Some(DocumentFormat::Docx)
258            }
259            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
260                Some(DocumentFormat::Xlsx)
261            }
262            "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
263            "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
264                Some(DocumentFormat::Pptx)
265            }
266            other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
267            _ => None,
268        };
269        if format.is_some() {
270            return format;
271        }
272    }
273
274    // Fall back to extension-based detection
275    infer_format_from_extension(uri)
276}
277
278/// Infer document format from file extension in URI/path
279fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
280    let uri = uri?;
281    let path = std::path::Path::new(uri);
282    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
283    match ext.as_str() {
284        "pdf" => Some(DocumentFormat::Pdf),
285        "docx" => Some(DocumentFormat::Docx),
286        "xlsx" => Some(DocumentFormat::Xlsx),
287        "xls" => Some(DocumentFormat::Xls),
288        "pptx" => Some(DocumentFormat::Pptx),
289        "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
290        | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
291        | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
292        | "sql" => Some(DocumentFormat::PlainText),
293        "md" | "markdown" => Some(DocumentFormat::Markdown),
294        "html" | "htm" => Some(DocumentFormat::Html),
295        _ => None,
296    }
297}
298
299fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
300    let mut slice = match magic {
301        Some(slice) if !slice.is_empty() => slice,
302        _ => return false,
303    };
304    if slice.starts_with(&[0xEF, 0xBB, 0xBF]) {
305        slice = &slice[3..];
306    }
307    while let Some((first, rest)) = slice.split_first() {
308        if first.is_ascii_whitespace() {
309            slice = rest;
310        } else {
311            break;
312        }
313    }
314    slice.starts_with(b"%PDF")
315}
316
317#[instrument(
318    target = "memvid::extract",
319    skip_all,
320    fields(mime = mime_hint, uri = uri)
321)]
322fn extract_via_registry(
323    bytes: &[u8],
324    mime_hint: Option<&str>,
325    uri: Option<&str>,
326) -> Result<ExtractedDocument> {
327    let registry = default_reader_registry();
328    let magic = bytes
329        .get(..MAGIC_SNIFF_BYTES)
330        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
331    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
332        .with_uri(uri)
333        .with_magic(magic);
334
335    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
336        let start = Instant::now();
337        match reader.extract(bytes, &hint) {
338            Ok(output) => {
339                return Ok(finalize_reader_output(output, start));
340            }
341            Err(err) => {
342                tracing::error!(
343                    target = "memvid::extract",
344                    reader = reader.name(),
345                    error = %err,
346                    "reader failed; falling back"
347                );
348                Some(format!("reader {} failed: {err}", reader.name()))
349            }
350        }
351    } else {
352        tracing::debug!(
353            target = "memvid::extract",
354            format = hint.format.map(|f| f.label()),
355            "no reader matched; using default extractor"
356        );
357        Some("no registered reader matched; using default extractor".to_string())
358    };
359
360    let start = Instant::now();
361    let mut output = PassthroughReader.extract(bytes, &hint)?;
362    if let Some(reason) = fallback_reason {
363        output.diagnostics.track_warning(reason);
364    }
365    Ok(finalize_reader_output(output, start))
366}
367
368fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
369    let elapsed = start.elapsed();
370    let ReaderOutput {
371        document,
372        reader_name,
373        diagnostics,
374    } = output;
375    log_reader_result(&reader_name, &diagnostics, elapsed);
376    document
377}
378
379fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
380    let duration_ms = diagnostics
381        .duration_ms
382        .unwrap_or_else(|| elapsed.as_millis() as u64);
383    let warnings = diagnostics.warnings.len();
384    let pages = diagnostics.pages_processed;
385
386    if warnings > 0 || diagnostics.fallback {
387        tracing::warn!(
388            target = "memvid::extract",
389            reader,
390            duration_ms,
391            pages,
392            warnings,
393            fallback = diagnostics.fallback,
394            "extraction completed with warnings"
395        );
396        for warning in diagnostics.warnings.iter() {
397            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
398        }
399    } else {
400        tracing::info!(
401            target = "memvid::extract",
402            reader,
403            duration_ms,
404            pages,
405            "extraction completed"
406        );
407    }
408}
409
410impl Memvid {
411    // -- Public ingestion entrypoints ---------------------------------------------------------
412
413    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
414    where
415        F: FnOnce(&mut Self) -> Result<()>,
416    {
417        self.file.sync_all()?;
418        let mut staging = CommitStaging::prepare(self.path())?;
419        staging.copy_from(&self.file)?;
420
421        let staging_handle = staging.clone_file()?;
422        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
423        let original_file = std::mem::replace(&mut self.file, staging_handle);
424        let original_wal = std::mem::replace(&mut self.wal, new_wal);
425        let original_header = self.header.clone();
426        let original_toc = self.toc.clone();
427        let original_data_end = self.data_end;
428        let original_generation = self.generation;
429        let original_dirty = self.dirty;
430        #[cfg(feature = "lex")]
431        let original_tantivy_dirty = self.tantivy_dirty;
432
433        let destination_path = self.path().to_path_buf();
434        let mut original_file = Some(original_file);
435        let mut original_wal = Some(original_wal);
436
437        match op(self) {
438            Ok(()) => {
439                self.file.sync_all()?;
440                match staging.commit() {
441                    Ok(()) => {
442                        drop(original_file.take());
443                        drop(original_wal.take());
444                        self.file = OpenOptions::new()
445                            .read(true)
446                            .write(true)
447                            .open(&destination_path)?;
448                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
449                        Ok(())
450                    }
451                    Err(commit_err) => {
452                        if let Some(file) = original_file.take() {
453                            self.file = file;
454                        }
455                        if let Some(wal) = original_wal.take() {
456                            self.wal = wal;
457                        }
458                        self.header = original_header;
459                        self.toc = original_toc;
460                        self.data_end = original_data_end;
461                        self.generation = original_generation;
462                        self.dirty = original_dirty;
463                        #[cfg(feature = "lex")]
464                        {
465                            self.tantivy_dirty = original_tantivy_dirty;
466                        }
467                        Err(commit_err.into())
468                    }
469                }
470            }
471            Err(err) => {
472                let _ = staging.discard();
473                if let Some(file) = original_file.take() {
474                    self.file = file;
475                }
476                if let Some(wal) = original_wal.take() {
477                    self.wal = wal;
478                }
479                self.header = original_header;
480                self.toc = original_toc;
481                self.data_end = original_data_end;
482                self.generation = original_generation;
483                self.dirty = original_dirty;
484                #[cfg(feature = "lex")]
485                {
486                    self.tantivy_dirty = original_tantivy_dirty;
487                }
488                Err(err)
489            }
490        }
491    }
492
493    pub(crate) fn catalog_data_end(&self) -> u64 {
494        let mut max_end = self.header.wal_offset + self.header.wal_size;
495
496        for descriptor in &self.toc.segment_catalog.lex_segments {
497            if descriptor.common.bytes_length == 0 {
498                continue;
499            }
500            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
501        }
502
503        for descriptor in &self.toc.segment_catalog.vec_segments {
504            if descriptor.common.bytes_length == 0 {
505                continue;
506            }
507            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
508        }
509
510        for descriptor in &self.toc.segment_catalog.time_segments {
511            if descriptor.common.bytes_length == 0 {
512                continue;
513            }
514            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
515        }
516
517        #[cfg(feature = "temporal_track")]
518        for descriptor in &self.toc.segment_catalog.temporal_segments {
519            if descriptor.common.bytes_length == 0 {
520                continue;
521            }
522            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
523        }
524
525        #[cfg(feature = "lex")]
526        for descriptor in &self.toc.segment_catalog.tantivy_segments {
527            if descriptor.common.bytes_length == 0 {
528                continue;
529            }
530            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
531        }
532
533        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
534            if manifest.bytes_length != 0 {
535                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
536            }
537        }
538
539        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
540            if manifest.bytes_length != 0 {
541                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
542            }
543        }
544
545        if let Some(manifest) = self.toc.time_index.as_ref() {
546            if manifest.bytes_length != 0 {
547                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
548            }
549        }
550
551        #[cfg(feature = "temporal_track")]
552        if let Some(track) = self.toc.temporal_track.as_ref() {
553            if track.bytes_length != 0 {
554                max_end = max_end.max(track.bytes_offset + track.bytes_length);
555            }
556        }
557
558        max_end
559    }
560
561    fn payload_region_end(&self) -> u64 {
562        let wal_region_end = self.header.wal_offset + self.header.wal_size;
563        let frames_with_payload: Vec<_> = self
564            .toc
565            .frames
566            .iter()
567            .filter(|frame| frame.payload_length != 0)
568            .collect();
569
570        tracing::info!(
571            "payload_region_end: found {} frames with payloads out of {} total frames, wal_region_end={}",
572            frames_with_payload.len(),
573            self.toc.frames.len(),
574            wal_region_end
575        );
576
577        for (idx, frame) in frames_with_payload.iter().enumerate().take(3) {
578            tracing::info!(
579                "  frame[{}]: id={} offset={} length={} status={:?}",
580                idx,
581                frame.id,
582                frame.payload_offset,
583                frame.payload_length,
584                frame.status
585            );
586        }
587
588        let result = frames_with_payload
589            .iter()
590            .fold(wal_region_end, |max_end, frame| {
591                match frame.payload_offset.checked_add(frame.payload_length) {
592                    Some(end) => max_end.max(end),
593                    None => max_end,
594                }
595            });
596
597        tracing::info!("payload_region_end: returning {}", result);
598        result
599    }
600
601    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
602        loop {
603            match self.wal.append_entry(payload) {
604                Ok(seq) => return Ok(seq),
605                Err(MemvidError::CheckpointFailed { reason })
606                    if reason == "embedded WAL region too small for entry"
607                        || reason == "embedded WAL region full" =>
608                {
609                    // WAL is either too small for this entry or full with pending entries.
610                    // Grow the WAL to accommodate - doubling ensures we have space.
611                    let required = WAL_ENTRY_HEADER_SIZE
612                        .saturating_add(payload.len() as u64)
613                        .max(self.header.wal_size + 1);
614                    self.grow_wal_region(required)?;
615                }
616                Err(err) => return Err(err),
617            }
618        }
619    }
620
621    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
622        let mut new_size = self.header.wal_size;
623        let mut target = required_entry_size;
624        if target == 0 {
625            target = self.header.wal_size;
626        }
627        while new_size <= target {
628            new_size = new_size
629                .checked_mul(2)
630                .ok_or_else(|| MemvidError::CheckpointFailed {
631                    reason: "wal_size overflow".into(),
632                })?;
633        }
634        let delta = new_size - self.header.wal_size;
635        if delta == 0 {
636            return Ok(());
637        }
638
639        self.shift_data_for_wal_growth(delta)?;
640        self.header.wal_size = new_size;
641        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
642        self.data_end = self.data_end.saturating_add(delta);
643        self.adjust_offsets_after_wal_growth(delta);
644
645        let catalog_end = self.catalog_data_end();
646        self.header.footer_offset = catalog_end
647            .max(self.header.footer_offset)
648            .max(self.data_end);
649
650        self.rewrite_toc_footer()?;
651        self.header.toc_checksum = self.toc.toc_checksum;
652        crate::persist_header(&mut self.file, &self.header)?;
653        self.file.sync_all()?;
654        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
655        Ok(())
656    }
657
658    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
659        if delta == 0 {
660            return Ok(());
661        }
662        let original_len = self.file.metadata()?.len();
663        let data_start = self.header.wal_offset + self.header.wal_size;
664        self.file.set_len(original_len + delta)?;
665
666        let mut remaining = original_len.saturating_sub(data_start);
667        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
668        while remaining > 0 {
669            let chunk = min(remaining, buffer.len() as u64);
670            let src = data_start + remaining - chunk;
671            self.file.seek(SeekFrom::Start(src))?;
672            self.file.read_exact(&mut buffer[..chunk as usize])?;
673            let dst = src + delta;
674            self.file.seek(SeekFrom::Start(dst))?;
675            self.file.write_all(&buffer[..chunk as usize])?;
676            remaining -= chunk;
677        }
678
679        self.file.seek(SeekFrom::Start(data_start))?;
680        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
681        let mut remaining = delta;
682        while remaining > 0 {
683            let write = min(remaining, zero_buf.len() as u64);
684            self.file.write_all(&zero_buf[..write as usize])?;
685            remaining -= write;
686        }
687        Ok(())
688    }
689
690    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
691        if delta == 0 {
692            return;
693        }
694
695        for frame in &mut self.toc.frames {
696            if frame.payload_offset != 0 {
697                frame.payload_offset += delta;
698            }
699        }
700
701        for segment in &mut self.toc.segments {
702            if segment.bytes_offset != 0 {
703                segment.bytes_offset += delta;
704            }
705        }
706
707        if let Some(lex) = self.toc.indexes.lex.as_mut() {
708            if lex.bytes_offset != 0 {
709                lex.bytes_offset += delta;
710            }
711        }
712        for manifest in &mut self.toc.indexes.lex_segments {
713            if manifest.bytes_offset != 0 {
714                manifest.bytes_offset += delta;
715            }
716        }
717        if let Some(vec) = self.toc.indexes.vec.as_mut() {
718            if vec.bytes_offset != 0 {
719                vec.bytes_offset += delta;
720            }
721        }
722        if let Some(time_index) = self.toc.time_index.as_mut() {
723            if time_index.bytes_offset != 0 {
724                time_index.bytes_offset += delta;
725            }
726        }
727        #[cfg(feature = "temporal_track")]
728        if let Some(track) = self.toc.temporal_track.as_mut() {
729            if track.bytes_offset != 0 {
730                track.bytes_offset += delta;
731            }
732        }
733
734        let catalog = &mut self.toc.segment_catalog;
735        for descriptor in &mut catalog.lex_segments {
736            if descriptor.common.bytes_offset != 0 {
737                descriptor.common.bytes_offset += delta;
738            }
739        }
740        for descriptor in &mut catalog.vec_segments {
741            if descriptor.common.bytes_offset != 0 {
742                descriptor.common.bytes_offset += delta;
743            }
744        }
745        for descriptor in &mut catalog.time_segments {
746            if descriptor.common.bytes_offset != 0 {
747                descriptor.common.bytes_offset += delta;
748            }
749        }
750        #[cfg(feature = "temporal_track")]
751        for descriptor in &mut catalog.temporal_segments {
752            if descriptor.common.bytes_offset != 0 {
753                descriptor.common.bytes_offset += delta;
754            }
755        }
756        for descriptor in &mut catalog.tantivy_segments {
757            if descriptor.common.bytes_offset != 0 {
758                descriptor.common.bytes_offset += delta;
759            }
760        }
761
762        #[cfg(feature = "lex")]
763        if let Ok(mut storage) = self.lex_storage.write() {
764            storage.adjust_offsets(delta);
765        }
766    }
767    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
768        self.ensure_writable()?;
769        if options.background {
770            tracing::debug!("commit background flag ignored; running synchronously");
771        }
772        let mode = options.mode;
773        let records = self.wal.pending_records()?;
774        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
775            return Ok(());
776        }
777        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
778    }
779
780    pub fn commit(&mut self) -> Result<()> {
781        self.ensure_writable()?;
782        self.commit_with_options(CommitOptions::new(CommitMode::Full))
783    }
784
785    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
786        self.generation = self.generation.wrapping_add(1);
787
788        let delta = self.apply_records(records)?;
789        let mut indexes_rebuilt = false;
790
791        // Check if CLIP index has pending embeddings that need to be persisted
792        let clip_needs_persist = self
793            .clip_index
794            .as_ref()
795            .map_or(false, |idx| !idx.is_empty());
796
797        if !delta.is_empty() || clip_needs_persist {
798            tracing::debug!(
799                inserted_frames = delta.inserted_frames.len(),
800                inserted_embeddings = delta.inserted_embeddings.len(),
801                inserted_time_entries = delta.inserted_time_entries.len(),
802                clip_needs_persist = clip_needs_persist,
803                "commit applied delta"
804            );
805            self.rebuild_indexes(&delta.inserted_embeddings)?;
806            indexes_rebuilt = true;
807        }
808
809        if !indexes_rebuilt && self.tantivy_index_pending() {
810            self.flush_tantivy()?;
811        }
812
813        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
814        if !indexes_rebuilt && self.clip_enabled {
815            if let Some(ref clip_index) = self.clip_index {
816                if !clip_index.is_empty() {
817                    self.persist_clip_index()?;
818                }
819            }
820        }
821
822        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
823        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
824            self.persist_memories_track()?;
825        }
826
827        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
828        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
829            self.persist_logic_mesh()?;
830        }
831
832        // Persist sketch track if it has entries
833        if !self.sketch_track.is_empty() {
834            self.persist_sketch_track()?;
835        }
836
837        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
838        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.
839
840        self.rewrite_toc_footer()?;
841        self.header.toc_checksum = self.toc.toc_checksum;
842        self.wal.record_checkpoint(&mut self.header)?;
843        self.header.toc_checksum = self.toc.toc_checksum;
844        crate::persist_header(&mut self.file, &self.header)?;
845        self.file.sync_all()?;
846        #[cfg(feature = "parallel_segments")]
847        if let Some(wal) = self.manifest_wal.as_mut() {
848            wal.flush()?;
849            wal.truncate()?;
850        }
851        self.pending_frame_inserts = 0;
852        self.dirty = false;
853        Ok(())
854    }
855
856    #[cfg(feature = "parallel_segments")]
857    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
858        self.ensure_writable()?;
859        if !self.dirty && !self.tantivy_index_pending() {
860            return Ok(());
861        }
862        let opts = opts.clone();
863        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
864    }
865
866    #[cfg(feature = "parallel_segments")]
867    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
868        if !self.dirty && !self.tantivy_index_pending() {
869            return Ok(());
870        }
871        let records = self.wal.pending_records()?;
872        let delta = self.apply_records(records)?;
873        self.generation = self.generation.wrapping_add(1);
874        let mut indexes_rebuilt = false;
875        if !delta.is_empty() {
876            tracing::info!(
877                inserted_frames = delta.inserted_frames.len(),
878                inserted_embeddings = delta.inserted_embeddings.len(),
879                inserted_time_entries = delta.inserted_time_entries.len(),
880                "parallel commit applied delta"
881            );
882            // Try to use parallel segment builder first
883            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
884            tracing::info!(
885                "parallel_commit: used_parallel={}, lex_enabled={}",
886                used_parallel,
887                self.lex_enabled
888            );
889            if used_parallel {
890                // Segments were written at data_end; update footer_offset so
891                // rewrite_toc_footer places the TOC after the new segment data
892                self.header.footer_offset = self.data_end;
893                indexes_rebuilt = true;
894
895                // OPTIMIZATION: Use incremental Tantivy indexing instead of full rebuild.
896                // Only add new frames from the delta, not all frames.
897                #[cfg(feature = "lex")]
898                if self.lex_enabled {
899                    tracing::info!(
900                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
901                        delta.inserted_frames.len(),
902                        self.toc.frames.len()
903                    );
904
905                    // Initialize Tantivy engine if not already present
906                    if self.tantivy.is_none() {
907                        self.init_tantivy()?;
908                    }
909
910                    // First, collect all frames and their search text (to avoid borrow conflicts)
911                    let max_payload = crate::memvid::search::max_index_payload();
912                    let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
913
914                    for frame_id in &delta.inserted_frames {
915                        // Look up the actual Frame from the TOC
916                        let frame = match self.toc.frames.get(*frame_id as usize) {
917                            Some(f) => f.clone(),
918                            None => continue,
919                        };
920
921                        // Check if frame has explicit search_text first - clone it for ownership
922                        let explicit_text = frame.search_text.clone();
923                        if let Some(ref search_text) = explicit_text {
924                            if !search_text.trim().is_empty() {
925                                prepared_docs.push((frame, search_text.clone()));
926                                continue;
927                            }
928                        }
929
930                        // Get MIME type and check if text-indexable
931                        let mime = frame
932                            .metadata
933                            .as_ref()
934                            .and_then(|m| m.mime.as_deref())
935                            .unwrap_or("application/octet-stream");
936
937                        if !crate::memvid::search::is_text_indexable_mime(mime) {
938                            continue;
939                        }
940
941                        if frame.payload_length > max_payload {
942                            continue;
943                        }
944
945                        let text = self.frame_search_text(&frame)?;
946                        if !text.trim().is_empty() {
947                            prepared_docs.push((frame, text));
948                        }
949                    }
950
951                    // Now add to Tantivy engine (no borrow conflict)
952                    if let Some(ref mut engine) = self.tantivy {
953                        for (frame, text) in &prepared_docs {
954                            engine.add_frame(frame, text)?;
955                        }
956
957                        if !prepared_docs.is_empty() {
958                            engine.commit()?;
959                            self.tantivy_dirty = true;
960                        }
961
962                        tracing::info!(
963                            "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
964                            prepared_docs.len(),
965                            engine.num_docs()
966                        );
967                    } else {
968                        tracing::warn!(
969                            "parallel_commit: Tantivy engine is None after init_tantivy"
970                        );
971                    }
972                }
973
974                // Time index stores all entries together for timeline queries.
975                // Unlike Tantivy which is incremental, time index needs full rebuild.
976                self.file.seek(SeekFrom::Start(self.data_end))?;
977                let mut time_entries: Vec<TimeIndexEntry> = self
978                    .toc
979                    .frames
980                    .iter()
981                    .filter(|frame| {
982                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
983                    })
984                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
985                    .collect();
986                let (ti_offset, ti_length, ti_checksum) =
987                    time_index_append(&mut self.file, &mut time_entries)?;
988                self.toc.time_index = Some(TimeIndexManifest {
989                    bytes_offset: ti_offset,
990                    bytes_length: ti_length,
991                    entry_count: time_entries.len() as u64,
992                    checksum: ti_checksum,
993                });
994                // Update data_end to account for the newly written time index
995                self.data_end = ti_offset + ti_length;
996                self.header.footer_offset = self.data_end;
997                tracing::info!(
998                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
999                    ti_offset,
1000                    ti_length,
1001                    time_entries.len()
1002                );
1003            } else {
1004                // Fall back to sequential rebuild if no segments were generated
1005                self.rebuild_indexes(&delta.inserted_embeddings)?;
1006                indexes_rebuilt = true;
1007            }
1008        }
1009
1010        // Flush Tantivy index if dirty (from parallel path or pending updates)
1011        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
1012            self.flush_tantivy()?;
1013        }
1014
1015        // Persist CLIP index if it has embeddings
1016        if self.clip_enabled {
1017            if let Some(ref clip_index) = self.clip_index {
1018                if !clip_index.is_empty() {
1019                    self.persist_clip_index()?;
1020                }
1021            }
1022        }
1023
1024        // Persist memories track if it has cards
1025        if self.memories_track.card_count() > 0 {
1026            self.persist_memories_track()?;
1027        }
1028
1029        // Persist logic mesh if it has nodes
1030        if !self.logic_mesh.is_empty() {
1031            self.persist_logic_mesh()?;
1032        }
1033
1034        // Persist sketch track if it has entries
1035        if !self.sketch_track.is_empty() {
1036            self.persist_sketch_track()?;
1037        }
1038
1039        // flush_tantivy() has already set footer_offset correctly
1040        // DO NOT overwrite with catalog_data_end()
1041        self.rewrite_toc_footer()?;
1042        self.header.toc_checksum = self.toc.toc_checksum;
1043        self.wal.record_checkpoint(&mut self.header)?;
1044        self.header.toc_checksum = self.toc.toc_checksum;
1045        crate::persist_header(&mut self.file, &self.header)?;
1046        self.file.sync_all()?;
1047        if let Some(wal) = self.manifest_wal.as_mut() {
1048            wal.flush()?;
1049            wal.truncate()?;
1050        }
1051        self.pending_frame_inserts = 0;
1052        self.dirty = false;
1053        Ok(())
1054    }
1055
1056    pub(crate) fn recover_wal(&mut self) -> Result<()> {
1057        let records = self.wal.records_after(self.header.wal_sequence)?;
1058        if records.is_empty() {
1059            if self.tantivy_index_pending() {
1060                self.flush_tantivy()?;
1061            }
1062            return Ok(());
1063        }
1064        let delta = self.apply_records(records)?;
1065        if !delta.is_empty() {
1066            tracing::debug!(
1067                inserted_frames = delta.inserted_frames.len(),
1068                inserted_embeddings = delta.inserted_embeddings.len(),
1069                inserted_time_entries = delta.inserted_time_entries.len(),
1070                "recover applied delta"
1071            );
1072            self.rebuild_indexes(&delta.inserted_embeddings)?;
1073        } else if self.tantivy_index_pending() {
1074            self.flush_tantivy()?;
1075        }
1076        self.wal.record_checkpoint(&mut self.header)?;
1077        crate::persist_header(&mut self.file, &self.header)?;
1078        if !delta.is_empty() {
1079            // rebuild_indexes already flushed Tantivy, so nothing further to do.
1080        } else if self.tantivy_index_pending() {
1081            self.flush_tantivy()?;
1082            crate::persist_header(&mut self.file, &self.header)?;
1083        }
1084        self.file.sync_all()?;
1085        self.pending_frame_inserts = 0;
1086        self.dirty = false;
1087        Ok(())
1088    }
1089
1090    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1091        let mut delta = IngestionDelta::default();
1092        if records.is_empty() {
1093            return Ok(delta);
1094        }
1095
1096        // Use data_end instead of payload_region_end to avoid overwriting
1097        // vec/lex/time segments that were written after the payload region.
1098        // payload_region_end() only considers frame payloads, but data_end tracks
1099        // all data including index segments.
1100        let mut data_cursor = self.data_end;
1101        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1102
1103        if !records.is_empty() {
1104            self.file.seek(SeekFrom::Start(data_cursor))?;
1105            for record in records {
1106                let mut entry = match decode_wal_entry(&record.payload)? {
1107                    WalEntry::Frame(entry) => entry,
1108                    #[cfg(feature = "lex")]
1109                    WalEntry::Lex(batch) => {
1110                        self.apply_lex_wal(batch)?;
1111                        continue;
1112                    }
1113                };
1114
1115                match entry.op {
1116                    FrameWalOp::Insert => {
1117                        let frame_id = self.toc.frames.len() as u64;
1118
1119                        let (
1120                            payload_offset,
1121                            payload_length,
1122                            checksum_bytes,
1123                            canonical_length_value,
1124                        ) = if let Some(source_id) = entry.reuse_payload_from {
1125                            if !entry.payload.is_empty() {
1126                                return Err(MemvidError::InvalidFrame {
1127                                    frame_id: source_id,
1128                                    reason: "reused payload entry contained inline bytes",
1129                                });
1130                            }
1131                            let source = self.toc.frames.get(source_id as usize).cloned().ok_or(
1132                                MemvidError::InvalidFrame {
1133                                    frame_id: source_id,
1134                                    reason: "reused payload source missing",
1135                                },
1136                            )?;
1137                            (
1138                                source.payload_offset,
1139                                source.payload_length,
1140                                source.checksum,
1141                                entry
1142                                    .canonical_length
1143                                    .or(source.canonical_length)
1144                                    .unwrap_or(source.payload_length),
1145                            )
1146                        } else {
1147                            self.file.seek(SeekFrom::Start(data_cursor))?;
1148                            self.file.write_all(&entry.payload)?;
1149                            let checksum = hash(&entry.payload);
1150                            let payload_length = entry.payload.len() as u64;
1151                            let canonical_length =
1152                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
1153                                    match entry.canonical_length {
1154                                        Some(len) => len,
1155                                        None => {
1156                                            let decoded = crate::decode_canonical_bytes(
1157                                                &entry.payload,
1158                                                CanonicalEncoding::Zstd,
1159                                                frame_id,
1160                                            )?;
1161                                            decoded.len() as u64
1162                                        }
1163                                    }
1164                                } else {
1165                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1166                                };
1167                            let payload_offset = data_cursor;
1168                            data_cursor += payload_length;
1169                            (
1170                                payload_offset,
1171                                payload_length,
1172                                *checksum.as_bytes(),
1173                                canonical_length,
1174                            )
1175                        };
1176
1177                        let uri = entry
1178                            .uri
1179                            .clone()
1180                            .unwrap_or_else(|| crate::default_uri(frame_id));
1181                        let title = entry
1182                            .title
1183                            .clone()
1184                            .or_else(|| crate::infer_title_from_uri(&uri));
1185
1186                        #[cfg(feature = "temporal_track")]
1187                        let (anchor_ts, anchor_source) =
1188                            self.determine_temporal_anchor(entry.timestamp);
1189
1190                        let mut frame = Frame {
1191                            id: frame_id,
1192                            timestamp: entry.timestamp,
1193                            anchor_ts: {
1194                                #[cfg(feature = "temporal_track")]
1195                                {
1196                                    Some(anchor_ts)
1197                                }
1198                                #[cfg(not(feature = "temporal_track"))]
1199                                {
1200                                    None
1201                                }
1202                            },
1203                            anchor_source: {
1204                                #[cfg(feature = "temporal_track")]
1205                                {
1206                                    Some(anchor_source)
1207                                }
1208                                #[cfg(not(feature = "temporal_track"))]
1209                                {
1210                                    None
1211                                }
1212                            },
1213                            kind: entry.kind.clone(),
1214                            track: entry.track.clone(),
1215                            payload_offset,
1216                            payload_length,
1217                            checksum: checksum_bytes,
1218                            uri: Some(uri),
1219                            title,
1220                            canonical_encoding: entry.canonical_encoding,
1221                            canonical_length: Some(canonical_length_value),
1222                            metadata: entry.metadata.clone(),
1223                            search_text: entry.search_text.clone(),
1224                            tags: entry.tags.clone(),
1225                            labels: entry.labels.clone(),
1226                            extra_metadata: entry.extra_metadata.clone(),
1227                            content_dates: entry.content_dates.clone(),
1228                            chunk_manifest: entry.chunk_manifest.clone(),
1229                            role: entry.role,
1230                            parent_id: None,
1231                            chunk_index: entry.chunk_index,
1232                            chunk_count: entry.chunk_count,
1233                            status: FrameStatus::Active,
1234                            supersedes: entry.supersedes_frame_id,
1235                            superseded_by: None,
1236                            source_sha256: entry.source_sha256,
1237                            source_path: entry.source_path.clone(),
1238                            enrichment_state: entry.enrichment_state,
1239                        };
1240
1241                        if let Some(parent_seq) = entry.parent_sequence {
1242                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1243                                frame.parent_id = Some(*parent_frame_id);
1244                            } else {
1245                                // Parent sequence not found in current batch - this can happen
1246                                // if parent was committed in a previous batch. Try to find parent
1247                                // by looking at recently inserted frames with matching characteristics.
1248                                // The parent should be the most recent Document frame that has
1249                                // a chunk_manifest matching this chunk's expected parent.
1250                                if entry.role == FrameRole::DocumentChunk {
1251                                    // Look backwards through recently inserted frames
1252                                    for &candidate_id in delta.inserted_frames.iter().rev() {
1253                                        if let Some(candidate) =
1254                                            self.toc.frames.get(candidate_id as usize)
1255                                        {
1256                                            if candidate.role == FrameRole::Document
1257                                                && candidate.chunk_manifest.is_some()
1258                                            {
1259                                                // Found a parent document - use it
1260                                                frame.parent_id = Some(candidate_id);
1261                                                tracing::debug!(
1262                                                    chunk_frame_id = frame_id,
1263                                                    parent_frame_id = candidate_id,
1264                                                    parent_seq = parent_seq,
1265                                                    "resolved chunk parent via fallback"
1266                                                );
1267                                                break;
1268                                            }
1269                                        }
1270                                    }
1271                                }
1272                                if frame.parent_id.is_none() {
1273                                    tracing::warn!(
1274                                        chunk_frame_id = frame_id,
1275                                        parent_seq = parent_seq,
1276                                        "chunk has parent_sequence but parent not found in batch"
1277                                    );
1278                                }
1279                            }
1280                        }
1281
1282                        #[cfg(feature = "lex")]
1283                        let index_text = if self.tantivy.is_some() {
1284                            if let Some(text) = entry.search_text.clone() {
1285                                if text.trim().is_empty() {
1286                                    None
1287                                } else {
1288                                    Some(text)
1289                                }
1290                            } else {
1291                                Some(self.frame_content(&frame)?)
1292                            }
1293                        } else {
1294                            None
1295                        };
1296                        #[cfg(feature = "lex")]
1297                        if let (Some(engine), Some(ref text)) =
1298                            (self.tantivy.as_mut(), index_text.as_ref())
1299                        {
1300                            engine.add_frame(&frame, text)?;
1301                            self.tantivy_dirty = true;
1302
1303                            // Generate sketch for fast candidate pre-filtering
1304                            // Uses the same text as tantivy indexing for consistency
1305                            if !text.trim().is_empty() {
1306                                let entry = crate::types::generate_sketch(
1307                                    frame_id,
1308                                    text,
1309                                    crate::types::SketchVariant::Small,
1310                                    None,
1311                                );
1312                                self.sketch_track.insert(entry);
1313                            }
1314                        }
1315
1316                        if let Some(embedding) = entry.embedding.take() {
1317                            delta
1318                                .inserted_embeddings
1319                                .push((frame_id, embedding.clone()));
1320                        }
1321
1322                        if entry.role == FrameRole::Document {
1323                            delta
1324                                .inserted_time_entries
1325                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1326                            #[cfg(feature = "temporal_track")]
1327                            {
1328                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1329                                    frame_id,
1330                                    anchor_ts,
1331                                    anchor_source,
1332                                ));
1333                                delta.inserted_temporal_mentions.extend(
1334                                    Self::collect_temporal_mentions(
1335                                        entry.search_text.as_deref(),
1336                                        frame_id,
1337                                        anchor_ts,
1338                                    ),
1339                                );
1340                            }
1341                        }
1342
1343                        if let Some(predecessor) = frame.supersedes {
1344                            self.mark_frame_superseded(predecessor, frame_id)?;
1345                        }
1346
1347                        self.toc.frames.push(frame);
1348                        delta.inserted_frames.push(frame_id);
1349                        sequence_to_frame.insert(record.sequence, frame_id);
1350                    }
1351                    FrameWalOp::Tombstone => {
1352                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1353                            frame_id: 0,
1354                            reason: "tombstone missing frame reference",
1355                        })?;
1356                        self.mark_frame_deleted(target)?;
1357                        delta.mutated_frames = true;
1358                    }
1359                }
1360            }
1361            self.data_end = self.data_end.max(data_cursor);
1362        }
1363
1364        // Second pass: resolve any orphan DocumentChunk frames that are missing parent_id.
1365        // This handles edge cases where chunks couldn't be linked during the first pass.
1366        // First, collect orphan chunks and their resolved parents to avoid borrow conflicts.
1367        let orphan_resolutions: Vec<(u64, u64)> = delta
1368            .inserted_frames
1369            .iter()
1370            .filter_map(|&frame_id| {
1371                let frame = self.toc.frames.get(frame_id as usize)?;
1372                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1373                    return None;
1374                }
1375                // Find the most recent Document frame before this chunk that has a manifest
1376                for candidate_id in (0..frame_id).rev() {
1377                    if let Some(candidate) = self.toc.frames.get(candidate_id as usize) {
1378                        if candidate.role == FrameRole::Document
1379                            && candidate.chunk_manifest.is_some()
1380                            && candidate.status == FrameStatus::Active
1381                        {
1382                            return Some((frame_id, candidate_id));
1383                        }
1384                    }
1385                }
1386                None
1387            })
1388            .collect();
1389
1390        // Now apply the resolutions
1391        for (chunk_id, parent_id) in orphan_resolutions {
1392            if let Some(frame) = self.toc.frames.get_mut(chunk_id as usize) {
1393                frame.parent_id = Some(parent_id);
1394                tracing::debug!(
1395                    chunk_frame_id = chunk_id,
1396                    parent_frame_id = parent_id,
1397                    "resolved orphan chunk parent in second pass"
1398                );
1399            }
1400        }
1401
1402        // Index rebuild now happens once per commit (Option A) instead of incremental append.
1403        // See commit_from_records() for where rebuild_indexes() is invoked.
1404        Ok(delta)
1405    }
1406
1407    #[cfg(feature = "temporal_track")]
1408    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1409        (timestamp, AnchorSource::FrameTimestamp)
1410    }
1411
1412    #[cfg(feature = "temporal_track")]
1413    fn collect_temporal_mentions(
1414        text: Option<&str>,
1415        frame_id: FrameId,
1416        anchor_ts: i64,
1417    ) -> Vec<TemporalMention> {
1418        let text = match text {
1419            Some(value) if !value.trim().is_empty() => value,
1420            _ => return Vec::new(),
1421        };
1422
1423        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
1424            Ok(ts) => ts,
1425            Err(_) => return Vec::new(),
1426        };
1427
1428        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
1429        let normalizer = TemporalNormalizer::new(context);
1430        let mut spans: Vec<(usize, usize)> = Vec::new();
1431        let lower = text.to_ascii_lowercase();
1432
1433        for phrase in STATIC_TEMPORAL_PHRASES {
1434            let mut search_start = 0usize;
1435            while let Some(idx) = lower[search_start..].find(phrase) {
1436                let abs = search_start + idx;
1437                let end = abs + phrase.len();
1438                spans.push((abs, end));
1439                search_start = end;
1440            }
1441        }
1442
1443        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
1444        let regex = NUMERIC_DATE.get_or_init(|| {
1445            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
1446        });
1447        let regex = match regex {
1448            Ok(re) => re,
1449            Err(msg) => {
1450                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
1451                return Vec::new();
1452            }
1453        };
1454        for mat in regex.find_iter(text) {
1455            spans.push((mat.start(), mat.end()));
1456        }
1457
1458        spans.sort_unstable();
1459        spans.dedup();
1460
1461        let mut mentions: Vec<TemporalMention> = Vec::new();
1462        for (start, end) in spans {
1463            if end > text.len() || start >= end {
1464                continue;
1465            }
1466            let raw = &text[start..end];
1467            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
1468            if trimmed.is_empty() {
1469                continue;
1470            }
1471            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
1472            let finish = offset + trimmed.len();
1473            match normalizer.resolve(trimmed) {
1474                Ok(resolution) => {
1475                    mentions.extend(Self::resolution_to_mentions(
1476                        resolution, frame_id, offset, finish,
1477                    ));
1478                }
1479                Err(_) => continue,
1480            }
1481        }
1482
1483        mentions
1484    }
1485
1486    #[cfg(feature = "temporal_track")]
1487    fn resolution_to_mentions(
1488        resolution: TemporalResolution,
1489        frame_id: FrameId,
1490        byte_start: usize,
1491        byte_end: usize,
1492    ) -> Vec<TemporalMention> {
1493        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1494        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1495        let mut results = Vec::new();
1496
1497        let base_flags = Self::flags_from_resolution(&resolution.flags);
1498        match resolution.value {
1499            TemporalResolutionValue::Date(date) => {
1500                let ts = Self::date_to_timestamp(date);
1501                results.push(TemporalMention::new(
1502                    ts,
1503                    frame_id,
1504                    byte_start,
1505                    byte_len,
1506                    TemporalMentionKind::Date,
1507                    resolution.confidence,
1508                    0,
1509                    base_flags,
1510                ));
1511            }
1512            TemporalResolutionValue::DateTime(dt) => {
1513                let ts = dt.unix_timestamp();
1514                let tz_hint = dt.offset().whole_minutes() as i16;
1515                results.push(TemporalMention::new(
1516                    ts,
1517                    frame_id,
1518                    byte_start,
1519                    byte_len,
1520                    TemporalMentionKind::DateTime,
1521                    resolution.confidence,
1522                    tz_hint,
1523                    base_flags,
1524                ));
1525            }
1526            TemporalResolutionValue::DateRange { start, end } => {
1527                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1528                let start_ts = Self::date_to_timestamp(start);
1529                results.push(TemporalMention::new(
1530                    start_ts,
1531                    frame_id,
1532                    byte_start,
1533                    byte_len,
1534                    TemporalMentionKind::RangeStart,
1535                    resolution.confidence,
1536                    0,
1537                    flags,
1538                ));
1539                let end_ts = Self::date_to_timestamp(end);
1540                results.push(TemporalMention::new(
1541                    end_ts,
1542                    frame_id,
1543                    byte_start,
1544                    byte_len,
1545                    TemporalMentionKind::RangeEnd,
1546                    resolution.confidence,
1547                    0,
1548                    flags,
1549                ));
1550            }
1551            TemporalResolutionValue::DateTimeRange { start, end } => {
1552                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1553                results.push(TemporalMention::new(
1554                    start.unix_timestamp(),
1555                    frame_id,
1556                    byte_start,
1557                    byte_len,
1558                    TemporalMentionKind::RangeStart,
1559                    resolution.confidence,
1560                    start.offset().whole_minutes() as i16,
1561                    flags,
1562                ));
1563                results.push(TemporalMention::new(
1564                    end.unix_timestamp(),
1565                    frame_id,
1566                    byte_start,
1567                    byte_len,
1568                    TemporalMentionKind::RangeEnd,
1569                    resolution.confidence,
1570                    end.offset().whole_minutes() as i16,
1571                    flags,
1572                ));
1573            }
1574            TemporalResolutionValue::Month { year, month } => {
1575                let start_date = match Date::from_calendar_date(year, month, 1) {
1576                    Ok(date) => date,
1577                    Err(err) => {
1578                        tracing::warn!(
1579                            target = "memvid::temporal",
1580                            %err,
1581                            year,
1582                            month = month as u8,
1583                            "skipping invalid month resolution"
1584                        );
1585                        // Skip invalid range for this mention only.
1586                        return results;
1587                    }
1588                };
1589                let end_date = match Self::last_day_in_month(year, month) {
1590                    Some(date) => date,
1591                    None => {
1592                        tracing::warn!(
1593                            target = "memvid::temporal",
1594                            year,
1595                            month = month as u8,
1596                            "skipping month resolution with invalid calendar range"
1597                        );
1598                        return results;
1599                    }
1600                };
1601                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1602                results.push(TemporalMention::new(
1603                    Self::date_to_timestamp(start_date),
1604                    frame_id,
1605                    byte_start,
1606                    byte_len,
1607                    TemporalMentionKind::RangeStart,
1608                    resolution.confidence,
1609                    0,
1610                    flags,
1611                ));
1612                results.push(TemporalMention::new(
1613                    Self::date_to_timestamp(end_date),
1614                    frame_id,
1615                    byte_start,
1616                    byte_len,
1617                    TemporalMentionKind::RangeEnd,
1618                    resolution.confidence,
1619                    0,
1620                    flags,
1621                ));
1622            }
1623        }
1624
1625        results
1626    }
1627
1628    #[cfg(feature = "temporal_track")]
1629    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1630        let mut result = TemporalMentionFlags::empty();
1631        if flags
1632            .iter()
1633            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1634        {
1635            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1636        }
1637        if flags
1638            .iter()
1639            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1640        {
1641            result = result.set(TemporalMentionFlags::DERIVED, true);
1642        }
1643        result
1644    }
1645
1646    #[cfg(feature = "temporal_track")]
1647    fn date_to_timestamp(date: Date) -> i64 {
1648        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1649            .assume_offset(UtcOffset::UTC)
1650            .unix_timestamp()
1651    }
1652
1653    #[cfg(feature = "temporal_track")]
1654    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1655        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1656        while let Some(next) = date.next_day() {
1657            if next.month() == month {
1658                date = next;
1659            } else {
1660                break;
1661            }
1662        }
1663        Some(date)
1664    }
1665
1666    #[allow(dead_code)]
1667    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1668        if delta.inserted_frames.is_empty() || !self.lex_enabled {
1669            return Ok(false);
1670        }
1671
1672        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1673            Some(artifact) => artifact,
1674            None => return Ok(false),
1675        };
1676
1677        let segment_id = self.toc.segment_catalog.next_segment_id;
1678        #[cfg(feature = "parallel_segments")]
1679        let span =
1680            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1681
1682        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1683        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1684        #[cfg(feature = "parallel_segments")]
1685        if let Some(span) = span {
1686            Self::decorate_segment_common(&mut descriptor.common, span);
1687        }
1688        #[cfg(feature = "parallel_segments")]
1689        let descriptor_for_manifest = descriptor.clone();
1690        self.toc.segment_catalog.lex_segments.push(descriptor);
1691        #[cfg(feature = "parallel_segments")]
1692        if let Err(err) = self.record_index_segment(
1693            SegmentKind::Lexical,
1694            descriptor_for_manifest.common,
1695            SegmentStats {
1696                doc_count: artifact.doc_count,
1697                vector_count: 0,
1698                time_entries: 0,
1699                bytes_uncompressed: artifact.bytes.len() as u64,
1700                build_micros: 0,
1701            },
1702        ) {
1703            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1704        }
1705        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1706        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1707        Ok(true)
1708    }
1709
1710    #[allow(dead_code)]
1711    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1712        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1713            return Ok(false);
1714        }
1715
1716        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1717            Some(artifact) => artifact,
1718            None => return Ok(false),
1719        };
1720
1721        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1722            if existing_dim != artifact.dimension {
1723                return Err(MemvidError::VecDimensionMismatch {
1724                    expected: existing_dim,
1725                    actual: artifact.dimension as usize,
1726                });
1727            }
1728        }
1729
1730        let segment_id = self.toc.segment_catalog.next_segment_id;
1731        #[cfg(feature = "parallel_segments")]
1732        #[cfg(feature = "parallel_segments")]
1733        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1734
1735        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1736        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1737        #[cfg(feature = "parallel_segments")]
1738        if let Some(span) = span {
1739            Self::decorate_segment_common(&mut descriptor.common, span);
1740        }
1741        #[cfg(feature = "parallel_segments")]
1742        let descriptor_for_manifest = descriptor.clone();
1743        self.toc.segment_catalog.vec_segments.push(descriptor);
1744        #[cfg(feature = "parallel_segments")]
1745        if let Err(err) = self.record_index_segment(
1746            SegmentKind::Vector,
1747            descriptor_for_manifest.common,
1748            SegmentStats {
1749                doc_count: 0,
1750                vector_count: artifact.vector_count,
1751                time_entries: 0,
1752                bytes_uncompressed: artifact.bytes_uncompressed,
1753                build_micros: 0,
1754            },
1755        ) {
1756            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1757        }
1758        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1759        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1760
1761        // Keep the global vec manifest in sync for auto-detection and stats.
1762        if self.toc.indexes.vec.is_none() {
1763            let empty_offset = self.data_end;
1764            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1765                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1766            self.toc.indexes.vec = Some(VecIndexManifest {
1767                vector_count: 0,
1768                dimension: 0,
1769                bytes_offset: empty_offset,
1770                bytes_length: 0,
1771                checksum: empty_checksum,
1772                compression_mode: self.vec_compression.clone(),
1773            });
1774        }
1775        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1776            if manifest.dimension == 0 {
1777                manifest.dimension = artifact.dimension;
1778            }
1779            if manifest.bytes_length == 0 {
1780                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1781                manifest.compression_mode = artifact.compression.clone();
1782            }
1783        }
1784
1785        self.vec_enabled = true;
1786        Ok(true)
1787    }
1788
1789    #[allow(dead_code)]
1790    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1791        if delta.inserted_time_entries.is_empty() {
1792            return Ok(false);
1793        }
1794
1795        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1796            Some(artifact) => artifact,
1797            None => return Ok(false),
1798        };
1799
1800        let segment_id = self.toc.segment_catalog.next_segment_id;
1801        #[cfg(feature = "parallel_segments")]
1802        #[cfg(feature = "parallel_segments")]
1803        let span = self.segment_span_from_iter(
1804            delta
1805                .inserted_time_entries
1806                .iter()
1807                .map(|entry| entry.frame_id),
1808        );
1809
1810        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1811        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1812        #[cfg(feature = "parallel_segments")]
1813        if let Some(span) = span {
1814            Self::decorate_segment_common(&mut descriptor.common, span);
1815        }
1816        #[cfg(feature = "parallel_segments")]
1817        let descriptor_for_manifest = descriptor.clone();
1818        self.toc.segment_catalog.time_segments.push(descriptor);
1819        #[cfg(feature = "parallel_segments")]
1820        if let Err(err) = self.record_index_segment(
1821            SegmentKind::Time,
1822            descriptor_for_manifest.common,
1823            SegmentStats {
1824                doc_count: 0,
1825                vector_count: 0,
1826                time_entries: artifact.entry_count,
1827                bytes_uncompressed: artifact.bytes.len() as u64,
1828                build_micros: 0,
1829            },
1830        ) {
1831            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1832        }
1833        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1834        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1835        Ok(true)
1836    }
1837
1838    #[cfg(feature = "temporal_track")]
1839    #[allow(dead_code)]
1840    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1841        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1842        {
1843            return Ok(false);
1844        }
1845
1846        debug_assert!(
1847            delta.inserted_temporal_mentions.len() < 1_000_000,
1848            "temporal delta mentions unexpectedly large: {}",
1849            delta.inserted_temporal_mentions.len()
1850        );
1851        debug_assert!(
1852            delta.inserted_temporal_anchors.len() < 1_000_000,
1853            "temporal delta anchors unexpectedly large: {}",
1854            delta.inserted_temporal_anchors.len()
1855        );
1856
1857        let artifact = match self.build_temporal_segment_from_records(
1858            &delta.inserted_temporal_mentions,
1859            &delta.inserted_temporal_anchors,
1860        )? {
1861            Some(artifact) => artifact,
1862            None => return Ok(false),
1863        };
1864
1865        let segment_id = self.toc.segment_catalog.next_segment_id;
1866        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
1867        self.toc
1868            .segment_catalog
1869            .temporal_segments
1870            .push(descriptor.clone());
1871        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1872        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1873
1874        self.toc.temporal_track = Some(TemporalTrackManifest {
1875            bytes_offset: descriptor.common.bytes_offset,
1876            bytes_length: descriptor.common.bytes_length,
1877            entry_count: artifact.entry_count,
1878            anchor_count: artifact.anchor_count,
1879            checksum: artifact.checksum,
1880            flags: artifact.flags,
1881        });
1882
1883        self.clear_temporal_track_cache();
1884
1885        Ok(true)
1886    }
1887
1888    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
1889        let frame =
1890            self.toc
1891                .frames
1892                .get_mut(frame_id as usize)
1893                .ok_or(MemvidError::InvalidFrame {
1894                    frame_id,
1895                    reason: "supersede target missing",
1896                })?;
1897        frame.status = FrameStatus::Superseded;
1898        frame.superseded_by = Some(successor_id);
1899        self.remove_frame_from_indexes(frame_id)
1900    }
1901
1902    pub(crate) fn rebuild_indexes(&mut self, new_vec_docs: &[(FrameId, Vec<f32>)]) -> Result<()> {
1903        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
1904            return Ok(());
1905        }
1906
1907        let payload_end = self.payload_region_end();
1908        self.data_end = payload_end;
1909        // Don't truncate if footer_offset is higher - there may be replay segments
1910        // or other data written after payload_end that must be preserved.
1911        let safe_truncate_len = self.header.footer_offset.max(payload_end);
1912        if self.file.metadata()?.len() > safe_truncate_len {
1913            self.file.set_len(safe_truncate_len)?;
1914        }
1915        self.file.seek(SeekFrom::Start(payload_end))?;
1916
1917        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
1918        self.toc.segment_catalog.lex_segments.clear();
1919        self.toc.segment_catalog.vec_segments.clear();
1920        self.toc.segment_catalog.time_segments.clear();
1921        #[cfg(feature = "temporal_track")]
1922        self.toc.segment_catalog.temporal_segments.clear();
1923        #[cfg(feature = "parallel_segments")]
1924        self.toc.segment_catalog.index_segments.clear();
1925        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
1926        self.toc.segment_catalog.tantivy_segments.clear();
1927        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
1928        self.toc.indexes.lex_segments.clear();
1929
1930        let mut time_entries: Vec<TimeIndexEntry> = self
1931            .toc
1932            .frames
1933            .iter()
1934            .filter(|frame| {
1935                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
1936            })
1937            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
1938            .collect();
1939        let (ti_offset, ti_length, ti_checksum) =
1940            time_index_append(&mut self.file, &mut time_entries)?;
1941        self.toc.time_index = Some(TimeIndexManifest {
1942            bytes_offset: ti_offset,
1943            bytes_length: ti_length,
1944            entry_count: time_entries.len() as u64,
1945            checksum: ti_checksum,
1946        });
1947
1948        let mut footer_offset = ti_offset + ti_length;
1949
1950        #[cfg(feature = "temporal_track")]
1951        {
1952            self.toc.temporal_track = None;
1953            self.toc.segment_catalog.temporal_segments.clear();
1954            self.clear_temporal_track_cache();
1955        }
1956
1957        if self.lex_enabled {
1958            #[cfg(feature = "lex")]
1959            {
1960                // Clear embedded storage to avoid carrying stale segments between rebuilds.
1961                if let Ok(mut storage) = self.lex_storage.write() {
1962                    storage.clear();
1963                    storage.set_generation(0);
1964                }
1965
1966                // Initialize Tantivy engine (loads existing segments if any)
1967                self.init_tantivy()?;
1968
1969                if let Some(mut engine) = self.tantivy.take() {
1970                    self.rebuild_tantivy_engine(&mut engine)?;
1971                    self.tantivy = Some(engine);
1972                } else {
1973                    return Err(MemvidError::InvalidToc {
1974                        reason: "tantivy engine missing during doctor rebuild".into(),
1975                    });
1976                }
1977
1978                // Set lex_enabled to ensure it persists
1979                self.lex_enabled = true;
1980
1981                // Mark Tantivy as dirty so it gets flushed
1982                self.tantivy_dirty = true;
1983
1984                // Position embedded Tantivy segments immediately after the time index.
1985                self.data_end = footer_offset;
1986
1987                // Flush Tantivy segments to file
1988                self.flush_tantivy()?;
1989
1990                // Update footer_offset after Tantivy flush
1991                footer_offset = self.header.footer_offset;
1992
1993                // Restore data_end to payload boundary so future payload writes stay before indexes.
1994                self.data_end = payload_end;
1995            }
1996            #[cfg(not(feature = "lex"))]
1997            {
1998                self.toc.indexes.lex = None;
1999                self.toc.indexes.lex_segments.clear();
2000            }
2001        } else {
2002            // Lex disabled: clear everything
2003            self.toc.indexes.lex = None;
2004            self.toc.indexes.lex_segments.clear();
2005            #[cfg(feature = "lex")]
2006            if let Ok(mut storage) = self.lex_storage.write() {
2007                storage.clear();
2008            }
2009        }
2010
2011        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2012            let vec_offset = footer_offset;
2013            self.file.seek(SeekFrom::Start(vec_offset))?;
2014            self.file.write_all(&artifact.bytes)?;
2015            footer_offset += artifact.bytes.len() as u64;
2016            self.toc.indexes.vec = Some(VecIndexManifest {
2017                vector_count: artifact.vector_count,
2018                dimension: artifact.dimension,
2019                bytes_offset: vec_offset,
2020                bytes_length: artifact.bytes.len() as u64,
2021                checksum: artifact.checksum,
2022                compression_mode: self.vec_compression.clone(),
2023            });
2024            self.vec_index = Some(index);
2025        } else {
2026            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
2027            if !self.vec_enabled {
2028                self.toc.indexes.vec = None;
2029            }
2030            self.vec_index = None;
2031        }
2032
2033        // Persist CLIP index if it has embeddings
2034        if self.clip_enabled {
2035            if let Some(ref clip_index) = self.clip_index {
2036                if !clip_index.is_empty() {
2037                    let artifact = clip_index.encode()?;
2038                    let clip_offset = footer_offset;
2039                    self.file.seek(SeekFrom::Start(clip_offset))?;
2040                    self.file.write_all(&artifact.bytes)?;
2041                    footer_offset += artifact.bytes.len() as u64;
2042                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2043                        bytes_offset: clip_offset,
2044                        bytes_length: artifact.bytes.len() as u64,
2045                        vector_count: artifact.vector_count,
2046                        dimension: artifact.dimension,
2047                        checksum: artifact.checksum,
2048                        model_name: crate::clip::default_model_info().name.to_string(),
2049                    });
2050                    tracing::info!(
2051                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2052                        artifact.vector_count,
2053                        clip_offset
2054                    );
2055                }
2056            }
2057        } else {
2058            self.toc.indexes.clip = None;
2059        }
2060
2061        // Persist memories track if it has cards
2062        if self.memories_track.card_count() > 0 {
2063            let memories_offset = footer_offset;
2064            let memories_bytes = self.memories_track.serialize()?;
2065            let memories_checksum = blake3::hash(&memories_bytes).into();
2066            self.file.seek(SeekFrom::Start(memories_offset))?;
2067            self.file.write_all(&memories_bytes)?;
2068            footer_offset += memories_bytes.len() as u64;
2069
2070            let stats = self.memories_track.stats();
2071            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2072                bytes_offset: memories_offset,
2073                bytes_length: memories_bytes.len() as u64,
2074                card_count: stats.card_count as u64,
2075                entity_count: stats.entity_count as u64,
2076                checksum: memories_checksum,
2077            });
2078        } else {
2079            self.toc.memories_track = None;
2080        }
2081
2082        // Persist logic mesh if it has nodes
2083        if !self.logic_mesh.is_empty() {
2084            let mesh_offset = footer_offset;
2085            let mesh_bytes = self.logic_mesh.serialize()?;
2086            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2087            self.file.seek(SeekFrom::Start(mesh_offset))?;
2088            self.file.write_all(&mesh_bytes)?;
2089            footer_offset += mesh_bytes.len() as u64;
2090
2091            let stats = self.logic_mesh.stats();
2092            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2093                bytes_offset: mesh_offset,
2094                bytes_length: mesh_bytes.len() as u64,
2095                node_count: stats.node_count as u64,
2096                edge_count: stats.edge_count as u64,
2097                checksum: mesh_checksum,
2098            });
2099        } else {
2100            self.toc.logic_mesh = None;
2101        }
2102
2103        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
2104        tracing::info!(
2105            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2106            ti_offset,
2107            ti_length,
2108            footer_offset,
2109            self.header.footer_offset
2110        );
2111
2112        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
2113        // This prevents overwriting data like replay segments that were written after index data
2114        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2115
2116        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
2117        if self.file.metadata()?.len() < self.header.footer_offset {
2118            self.file.set_len(self.header.footer_offset)?;
2119        }
2120
2121        self.rewrite_toc_footer()?;
2122        self.header.toc_checksum = self.toc.toc_checksum;
2123        crate::persist_header(&mut self.file, &self.header)?;
2124
2125        #[cfg(feature = "lex")]
2126        if self.lex_enabled {
2127            if let Some(ref engine) = self.tantivy {
2128                let doc_count = engine.num_docs();
2129                let active_frame_count = self
2130                    .toc
2131                    .frames
2132                    .iter()
2133                    .filter(|f| f.status == FrameStatus::Active)
2134                    .count();
2135
2136                // Count frames that would actually be indexed by rebuild_tantivy_engine
2137                // Uses the same logic: content-type based check + size limit
2138                let text_indexable_count = self
2139                    .toc
2140                    .frames
2141                    .iter()
2142                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2143                    .count();
2144
2145                // Only fail if we have text-indexable frames but none got indexed
2146                // This avoids false positives for binary files (videos, images)
2147                if doc_count == 0 && text_indexable_count > 0 {
2148                    return Err(MemvidError::Doctor {
2149                        reason: format!(
2150                            "Lex index rebuild failed: 0 documents indexed from {} text-indexable frames. \
2151                            This indicates a critical failure in the rebuild process.",
2152                            text_indexable_count
2153                        ),
2154                    });
2155                }
2156
2157                // Success! Log it
2158                log::info!(
2159                    "✓ Doctor lex index rebuild succeeded: {} docs from {} frames ({} text-indexable)",
2160                    doc_count,
2161                    active_frame_count,
2162                    text_indexable_count
2163                );
2164            }
2165        }
2166
2167        Ok(())
2168    }
2169
2170    /// Persist the memories track to the file without a full rebuild.
2171    ///
2172    /// This is used when the memories track has been modified but no frame
2173    /// changes were made (e.g., after running enrichment).
2174    fn persist_memories_track(&mut self) -> Result<()> {
2175        if self.memories_track.card_count() == 0 {
2176            self.toc.memories_track = None;
2177            return Ok(());
2178        }
2179
2180        // Write after the current footer_offset
2181        let memories_offset = self.header.footer_offset;
2182        let memories_bytes = self.memories_track.serialize()?;
2183        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2184
2185        self.file.seek(SeekFrom::Start(memories_offset))?;
2186        self.file.write_all(&memories_bytes)?;
2187
2188        let stats = self.memories_track.stats();
2189        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2190            bytes_offset: memories_offset,
2191            bytes_length: memories_bytes.len() as u64,
2192            card_count: stats.card_count as u64,
2193            entity_count: stats.entity_count as u64,
2194            checksum: memories_checksum,
2195        });
2196
2197        // Update footer_offset to account for the memories track
2198        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2199
2200        // Ensure the file length covers the memories track
2201        if self.file.metadata()?.len() < self.header.footer_offset {
2202            self.file.set_len(self.header.footer_offset)?;
2203        }
2204
2205        Ok(())
2206    }
2207
2208    /// Persist the CLIP index to the file without a full rebuild.
2209    ///
2210    /// This is used when CLIP embeddings have been added but no full
2211    /// index rebuild is needed (e.g., in parallel segments mode).
2212    fn persist_clip_index(&mut self) -> Result<()> {
2213        if !self.clip_enabled {
2214            self.toc.indexes.clip = None;
2215            return Ok(());
2216        }
2217
2218        let clip_index = match &self.clip_index {
2219            Some(idx) if !idx.is_empty() => idx,
2220            _ => {
2221                self.toc.indexes.clip = None;
2222                return Ok(());
2223            }
2224        };
2225
2226        // Encode the CLIP index
2227        let artifact = clip_index.encode()?;
2228
2229        // Write after the current footer_offset
2230        let clip_offset = self.header.footer_offset;
2231        self.file.seek(SeekFrom::Start(clip_offset))?;
2232        self.file.write_all(&artifact.bytes)?;
2233
2234        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2235            bytes_offset: clip_offset,
2236            bytes_length: artifact.bytes.len() as u64,
2237            vector_count: artifact.vector_count,
2238            dimension: artifact.dimension,
2239            checksum: artifact.checksum,
2240            model_name: crate::clip::default_model_info().name.to_string(),
2241        });
2242
2243        tracing::info!(
2244            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2245            artifact.vector_count,
2246            clip_offset
2247        );
2248
2249        // Update footer_offset to account for the CLIP index
2250        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2251
2252        // Ensure the file length covers the CLIP index
2253        if self.file.metadata()?.len() < self.header.footer_offset {
2254            self.file.set_len(self.header.footer_offset)?;
2255        }
2256
2257        Ok(())
2258    }
2259
2260    /// Persist the Logic-Mesh to the file without a full rebuild.
2261    ///
2262    /// This is used when the Logic-Mesh has been modified but no frame
2263    /// changes were made (e.g., after running NER enrichment).
2264    fn persist_logic_mesh(&mut self) -> Result<()> {
2265        if self.logic_mesh.is_empty() {
2266            self.toc.logic_mesh = None;
2267            return Ok(());
2268        }
2269
2270        // Write after the current footer_offset
2271        let mesh_offset = self.header.footer_offset;
2272        let mesh_bytes = self.logic_mesh.serialize()?;
2273        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2274
2275        self.file.seek(SeekFrom::Start(mesh_offset))?;
2276        self.file.write_all(&mesh_bytes)?;
2277
2278        let stats = self.logic_mesh.stats();
2279        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2280            bytes_offset: mesh_offset,
2281            bytes_length: mesh_bytes.len() as u64,
2282            node_count: stats.node_count as u64,
2283            edge_count: stats.edge_count as u64,
2284            checksum: mesh_checksum,
2285        });
2286
2287        // Update footer_offset to account for the logic mesh
2288        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2289
2290        // Ensure the file length covers the logic mesh
2291        if self.file.metadata()?.len() < self.header.footer_offset {
2292            self.file.set_len(self.header.footer_offset)?;
2293        }
2294
2295        Ok(())
2296    }
2297
2298    /// Persist the sketch track to the file without a full rebuild.
2299    ///
2300    /// This is used when the sketch track has been modified (e.g., after
2301    /// running `sketch build`).
2302    fn persist_sketch_track(&mut self) -> Result<()> {
2303        if self.sketch_track.is_empty() {
2304            self.toc.sketch_track = None;
2305            return Ok(());
2306        }
2307
2308        // Seek to write after the current footer_offset
2309        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2310
2311        // Write the sketch track and get (offset, length, checksum)
2312        let (sketch_offset, sketch_length, sketch_checksum) =
2313            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2314
2315        let stats = self.sketch_track.stats();
2316        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2317            bytes_offset: sketch_offset,
2318            bytes_length: sketch_length,
2319            entry_count: stats.entry_count,
2320            entry_size: stats.variant.entry_size() as u16,
2321            flags: 0,
2322            checksum: sketch_checksum,
2323        });
2324
2325        // Update footer_offset to account for the sketch track
2326        self.header.footer_offset = sketch_offset + sketch_length;
2327
2328        // Ensure the file length covers the sketch track
2329        if self.file.metadata()?.len() < self.header.footer_offset {
2330            self.file.set_len(self.header.footer_offset)?;
2331        }
2332
2333        tracing::debug!(
2334            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2335            stats.entry_count,
2336            sketch_offset
2337        );
2338
2339        Ok(())
2340    }
2341
2342    #[cfg(feature = "lex")]
2343    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2344        let LexWalBatch {
2345            generation,
2346            doc_count,
2347            checksum,
2348            segments,
2349        } = batch;
2350
2351        if let Some(mut storage) = self.lex_storage.write().ok() {
2352            storage.replace(doc_count, checksum, segments);
2353            storage.set_generation(generation);
2354        }
2355
2356        let result = self.persist_lex_manifest();
2357        result
2358    }
2359
2360    #[cfg(feature = "lex")]
2361    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2362        let payload = encode_to_vec(&WalEntry::Lex(batch.clone()), wal_config())?;
2363        self.append_wal_entry(&payload)?;
2364        Ok(())
2365    }
2366
2367    #[cfg(feature = "lex")]
2368    fn persist_lex_manifest(&mut self) -> Result<()> {
2369        let (index_manifest, segments) = if let Some(storage) = self.lex_storage.read().ok() {
2370            storage.to_manifest()
2371        } else {
2372            (None, Vec::new())
2373        };
2374
2375        // Update the manifest
2376        if let Some(storage_manifest) = index_manifest {
2377            // Old LexIndexArtifact format: set the manifest with actual offset/length
2378            self.toc.indexes.lex = Some(storage_manifest);
2379        } else {
2380            // Tantivy segments OR lex disabled: clear the manifest
2381            // Stats will check lex_segments instead of manifest
2382            self.toc.indexes.lex = None;
2383        }
2384
2385        self.toc.indexes.lex_segments = segments;
2386
2387        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
2388        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.
2389
2390        self.rewrite_toc_footer()?;
2391        self.header.toc_checksum = self.toc.toc_checksum;
2392        crate::persist_header(&mut self.file, &self.header)?;
2393        Ok(())
2394    }
2395
2396    #[cfg(feature = "lex")]
2397    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
2398        let TantivySnapshot {
2399            doc_count,
2400            checksum,
2401            segments,
2402        } = snapshot;
2403
2404        let mut footer_offset = self.data_end;
2405        self.file.seek(SeekFrom::Start(footer_offset))?;
2406
2407        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
2408        for segment in segments {
2409            let bytes_length = segment.bytes.len() as u64;
2410            self.file.write_all(&segment.bytes)?;
2411            self.file.flush()?; // Flush segment data to disk
2412            embedded_segments.push(EmbeddedLexSegment {
2413                path: segment.path,
2414                bytes_offset: footer_offset,
2415                bytes_length,
2416                checksum: segment.checksum,
2417            });
2418            footer_offset += bytes_length;
2419        }
2420        // Set footer_offset for TOC writing, but DON'T update data_end
2421        // data_end stays at end of payloads, so next commit overwrites these segments
2422        // Use max() to never decrease footer_offset - this preserves replay segments
2423        // that may have been written at a higher offset
2424        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2425
2426        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
2427        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
2428            Vec::with_capacity(embedded_segments.len());
2429        for segment in &embedded_segments {
2430            let descriptor = TantivySegmentDescriptor::from_common(
2431                SegmentCommon::new(
2432                    next_segment_id,
2433                    segment.bytes_offset,
2434                    segment.bytes_length,
2435                    segment.checksum,
2436                ),
2437                segment.path.clone(),
2438            );
2439            catalog_segments.push(descriptor);
2440            next_segment_id = next_segment_id.saturating_add(1);
2441        }
2442        if catalog_segments.is_empty() {
2443            self.toc.segment_catalog.tantivy_segments.clear();
2444        } else {
2445            self.toc.segment_catalog.tantivy_segments = catalog_segments;
2446            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2447        }
2448        self.toc.segment_catalog.next_segment_id = next_segment_id;
2449
2450        // REMOVED: catalog_data_end() check
2451        // This was causing orphaned Tantivy segments because it would see OLD segments
2452        // still in the catalog from previous commits, and push footer_offset forward.
2453        // We want Tantivy segments to overwrite at data_end, so footer_offset should
2454        // stay at the end of the newly written segments.
2455
2456        let generation = self
2457            .lex_storage
2458            .write()
2459            .map_err(|_| MemvidError::Tantivy {
2460                reason: "embedded lex storage lock poisoned".into(),
2461            })
2462            .map(|mut storage| {
2463                storage.replace(doc_count, checksum, embedded_segments.clone());
2464                storage.generation()
2465            })?;
2466
2467        let batch = LexWalBatch {
2468            generation,
2469            doc_count,
2470            checksum,
2471            segments: embedded_segments.clone(),
2472        };
2473        self.append_lex_batch(&batch)?;
2474        self.persist_lex_manifest()?;
2475        self.header.toc_checksum = self.toc.toc_checksum;
2476        crate::persist_header(&mut self.file, &self.header)?;
2477        Ok(())
2478    }
2479
2480    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2481        let frame =
2482            self.toc
2483                .frames
2484                .get_mut(frame_id as usize)
2485                .ok_or(MemvidError::InvalidFrame {
2486                    frame_id,
2487                    reason: "delete target missing",
2488                })?;
2489        frame.status = FrameStatus::Deleted;
2490        frame.superseded_by = None;
2491        self.remove_frame_from_indexes(frame_id)
2492    }
2493
2494    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2495        #[cfg(feature = "lex")]
2496        if let Some(engine) = self.tantivy.as_mut() {
2497            engine.delete_frame(frame_id)?;
2498            self.tantivy_dirty = true;
2499        }
2500        if let Some(index) = self.lex_index.as_mut() {
2501            index.remove_document(frame_id);
2502        }
2503        if let Some(index) = self.vec_index.as_mut() {
2504            index.remove(frame_id);
2505        }
2506        Ok(())
2507    }
2508
2509    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2510        self.toc
2511            .frames
2512            .get(frame_id as usize)
2513            .map_or(false, |frame| frame.status == FrameStatus::Active)
2514    }
2515
2516    #[cfg(feature = "parallel_segments")]
2517    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2518    where
2519        I: IntoIterator<Item = FrameId>,
2520    {
2521        let mut iter = iter.into_iter();
2522        let first_id = iter.next()?;
2523        let first_frame = self.toc.frames.get(first_id as usize);
2524        let mut min_id = first_id;
2525        let mut max_id = first_id;
2526        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2527        let mut page_end = first_frame
2528            .and_then(|frame| frame.chunk_count)
2529            .map(|count| page_start + count.saturating_sub(1))
2530            .unwrap_or(page_start);
2531        for frame_id in iter {
2532            if frame_id < min_id {
2533                min_id = frame_id;
2534            }
2535            if frame_id > max_id {
2536                max_id = frame_id;
2537            }
2538            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2539                if let Some(idx) = frame.chunk_index {
2540                    page_start = page_start.min(idx);
2541                    if let Some(count) = frame.chunk_count {
2542                        let end = idx + count.saturating_sub(1);
2543                        page_end = page_end.max(end);
2544                    } else {
2545                        page_end = page_end.max(idx);
2546                    }
2547                }
2548            }
2549        }
2550        Some(SegmentSpan {
2551            frame_start: min_id,
2552            frame_end: max_id,
2553            page_start,
2554            page_end,
2555            ..SegmentSpan::default()
2556        })
2557    }
2558
2559    #[cfg(feature = "parallel_segments")]
2560    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2561        common.span = Some(span);
2562        if common.codec_version == 0 {
2563            common.codec_version = 1;
2564        }
2565    }
2566
2567    #[cfg(feature = "parallel_segments")]
2568    pub(crate) fn record_index_segment(
2569        &mut self,
2570        kind: SegmentKind,
2571        common: SegmentCommon,
2572        stats: SegmentStats,
2573    ) -> Result<()> {
2574        let entry = IndexSegmentRef {
2575            kind,
2576            common,
2577            stats,
2578        };
2579        self.toc.segment_catalog.index_segments.push(entry.clone());
2580        if let Some(wal) = self.manifest_wal.as_mut() {
2581            wal.append_segments(&[entry])?;
2582        }
2583        Ok(())
2584    }
2585
2586    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2587        self.ensure_writable()?;
2588        if self.toc.ticket_ref.issuer == "free-tier" {
2589            return Ok(());
2590        }
2591        match self.tier() {
2592            Tier::Free => Ok(()),
2593            tier => {
2594                if self.toc.ticket_ref.issuer.trim().is_empty() {
2595                    Err(MemvidError::TicketRequired { tier })
2596                } else {
2597                    Ok(())
2598                }
2599            }
2600        }
2601    }
2602
2603    pub(crate) fn tier(&self) -> Tier {
2604        if self.header.wal_size >= WAL_SIZE_LARGE {
2605            Tier::Enterprise
2606        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2607            Tier::Dev
2608        } else {
2609            Tier::Free
2610        }
2611    }
2612
2613    pub(crate) fn capacity_limit(&self) -> u64 {
2614        if self.toc.ticket_ref.capacity_bytes != 0 {
2615            self.toc.ticket_ref.capacity_bytes
2616        } else {
2617            self.tier().capacity_bytes()
2618        }
2619    }
2620
2621    /// Get current storage capacity in bytes.
2622    ///
2623    /// Returns the capacity from the applied ticket, or the default
2624    /// tier capacity (1 GB for free tier).
2625    pub fn get_capacity(&self) -> u64 {
2626        self.capacity_limit()
2627    }
2628
2629    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
2630        tracing::info!(
2631            vec_segments = self.toc.segment_catalog.vec_segments.len(),
2632            lex_segments = self.toc.segment_catalog.lex_segments.len(),
2633            time_segments = self.toc.segment_catalog.time_segments.len(),
2634            footer_offset = self.header.footer_offset,
2635            data_end = self.data_end,
2636            "rewrite_toc_footer: about to serialize TOC"
2637        );
2638        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
2639        let footer_offset = self.header.footer_offset;
2640        self.file.seek(SeekFrom::Start(footer_offset))?;
2641        self.file.write_all(&toc_bytes)?;
2642        let footer = CommitFooter {
2643            toc_len: toc_bytes.len() as u64,
2644            toc_hash: *hash(&toc_bytes).as_bytes(),
2645            generation: self.generation,
2646        };
2647        let encoded_footer = footer.encode();
2648        self.file.write_all(&encoded_footer)?;
2649
2650        // The file must always be at least header + WAL size
2651        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
2652        let min_len = self.header.wal_offset + self.header.wal_size;
2653        let final_len = new_len.max(min_len);
2654
2655        if new_len < min_len {
2656            tracing::warn!(
2657                file.new_len = new_len,
2658                file.min_len = min_len,
2659                file.final_len = final_len,
2660                "truncation would cut into WAL region, clamping to min_len"
2661            );
2662        }
2663
2664        self.file.set_len(final_len)?;
2665        // Ensure footer is flushed to disk so mmap-based readers can find it
2666        self.file.sync_all()?;
2667        Ok(())
2668    }
2669}
2670
2671#[cfg(feature = "parallel_segments")]
2672impl Memvid {
2673    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
2674        let chunks = self.collect_segment_chunks(delta)?;
2675        if chunks.is_empty() {
2676            return Ok(false);
2677        }
2678        let planner = SegmentPlanner::new(opts.clone());
2679        let plans = planner.plan_from_chunks(chunks);
2680        if plans.is_empty() {
2681            return Ok(false);
2682        }
2683        let worker_pool = SegmentWorkerPool::new(opts);
2684        let results = worker_pool.execute(plans)?;
2685        if results.is_empty() {
2686            return Ok(false);
2687        }
2688        self.append_parallel_segments(results)?;
2689        Ok(true)
2690    }
2691
2692    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
2693        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
2694            delta.inserted_embeddings.iter().cloned().collect();
2695        tracing::info!(
2696            inserted_frames = ?delta.inserted_frames,
2697            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
2698            "collect_segment_chunks: comparing frame IDs"
2699        );
2700        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
2701        for frame_id in &delta.inserted_frames {
2702            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
2703                MemvidError::InvalidFrame {
2704                    frame_id: *frame_id,
2705                    reason: "frame id out of range while planning segments",
2706                },
2707            )?;
2708            let text = self.frame_content(&frame)?;
2709            if text.trim().is_empty() {
2710                continue;
2711            }
2712            let token_estimate = estimate_tokens(&text);
2713            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
2714            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
2715            let page_start = if frame.chunk_index.is_some() {
2716                chunk_index + 1
2717            } else {
2718                0
2719            };
2720            let page_end = if frame.chunk_index.is_some() {
2721                page_start
2722            } else {
2723                0
2724            };
2725            chunks.push(SegmentChunkPlan {
2726                text,
2727                frame_id: *frame_id,
2728                timestamp: frame.timestamp,
2729                chunk_index,
2730                chunk_count: chunk_count.max(1),
2731                token_estimate,
2732                token_start: 0,
2733                token_end: 0,
2734                page_start,
2735                page_end,
2736                embedding: embedding_map.remove(frame_id),
2737            });
2738        }
2739        Ok(chunks)
2740    }
2741}
2742
2743#[cfg(feature = "parallel_segments")]
2744fn estimate_tokens(text: &str) -> usize {
2745    text.split_whitespace().count().max(1)
2746}
2747
2748impl Memvid {
2749    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2750        let catalog_end = self.catalog_data_end();
2751        if catalog_end <= self.header.footer_offset {
2752            return Ok(false);
2753        }
2754        self.header.footer_offset = catalog_end;
2755        self.rewrite_toc_footer()?;
2756        self.header.toc_checksum = self.toc.toc_checksum;
2757        crate::persist_header(&mut self.file, &self.header)?;
2758        Ok(true)
2759    }
2760}
2761
2762impl Memvid {
2763    pub fn vacuum(&mut self) -> Result<()> {
2764        self.commit()?;
2765
2766        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
2767        let frames: Vec<Frame> = self
2768            .toc
2769            .frames
2770            .iter()
2771            .filter(|frame| frame.status == FrameStatus::Active)
2772            .cloned()
2773            .collect();
2774        for frame in frames {
2775            let bytes = self.read_frame_payload_bytes(&frame)?;
2776            active_payloads.insert(frame.id, bytes);
2777        }
2778
2779        let mut cursor = self.header.wal_offset + self.header.wal_size;
2780        self.file.seek(SeekFrom::Start(cursor))?;
2781        for frame in &mut self.toc.frames {
2782            if frame.status == FrameStatus::Active {
2783                if let Some(bytes) = active_payloads.get(&frame.id) {
2784                    self.file.write_all(bytes)?;
2785                    frame.payload_offset = cursor;
2786                    frame.payload_length = bytes.len() as u64;
2787                    cursor += bytes.len() as u64;
2788                } else {
2789                    frame.payload_offset = 0;
2790                    frame.payload_length = 0;
2791                }
2792            } else {
2793                frame.payload_offset = 0;
2794                frame.payload_length = 0;
2795            }
2796        }
2797
2798        self.data_end = cursor;
2799
2800        self.toc.segments.clear();
2801        self.toc.indexes.lex_segments.clear();
2802        self.toc.segment_catalog.lex_segments.clear();
2803        self.toc.segment_catalog.vec_segments.clear();
2804        self.toc.segment_catalog.time_segments.clear();
2805        #[cfg(feature = "temporal_track")]
2806        {
2807            self.toc.temporal_track = None;
2808            self.toc.segment_catalog.temporal_segments.clear();
2809        }
2810        #[cfg(feature = "lex")]
2811        {
2812            self.toc.segment_catalog.tantivy_segments.clear();
2813        }
2814        #[cfg(feature = "parallel_segments")]
2815        {
2816            self.toc.segment_catalog.index_segments.clear();
2817        }
2818
2819        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
2820        #[cfg(feature = "lex")]
2821        {
2822            self.tantivy = None;
2823            self.tantivy_dirty = false;
2824        }
2825
2826        self.rebuild_indexes(&[])?;
2827        self.file.sync_all()?;
2828        Ok(())
2829    }
2830
2831    /// Preview how a document would be chunked without actually ingesting it.
2832    ///
2833    /// This is useful when you need to compute embeddings for each chunk externally
2834    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
2835    /// is too small to be chunked (< 2400 chars after normalization).
2836    ///
2837    /// # Example
2838    /// ```ignore
2839    /// let chunks = mem.preview_chunks(b"long document text...")?;
2840    /// if let Some(chunk_texts) = chunks {
2841    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
2842    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
2843    /// } else {
2844    ///     let embedding = my_embedder.embed_query(text)?;
2845    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
2846    /// }
2847    /// ```
2848    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
2849        plan_document_chunks(payload).map(|plan| plan.chunks)
2850    }
2851
2852    /// Append raw bytes as a document frame.
2853    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
2854        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
2855    }
2856
2857    /// Append raw bytes with explicit metadata/options.
2858    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
2859        self.put_internal(Some(payload), None, None, None, options, None)
2860    }
2861
2862    /// Append bytes and an existing embedding (bypasses on-device embedding).
2863    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
2864        self.put_internal(
2865            Some(payload),
2866            None,
2867            Some(embedding),
2868            None,
2869            PutOptions::default(),
2870            None,
2871        )
2872    }
2873
2874    pub fn put_with_embedding_and_options(
2875        &mut self,
2876        payload: &[u8],
2877        embedding: Vec<f32>,
2878        options: PutOptions,
2879    ) -> Result<u64> {
2880        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
2881    }
2882
2883    /// Ingest a document with pre-computed embeddings for both parent and chunks.
2884    ///
2885    /// This is the recommended API for high-accuracy semantic search when chunking
2886    /// occurs. The caller provides:
2887    /// - `payload`: The document bytes
2888    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
2889    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
2890    /// - `options`: Standard put options
2891    ///
2892    /// The number of chunk embeddings should match the number of chunks that will be
2893    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
2894    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
2895    pub fn put_with_chunk_embeddings(
2896        &mut self,
2897        payload: &[u8],
2898        parent_embedding: Option<Vec<f32>>,
2899        chunk_embeddings: Vec<Vec<f32>>,
2900        options: PutOptions,
2901    ) -> Result<u64> {
2902        self.put_internal(
2903            Some(payload),
2904            None,
2905            parent_embedding,
2906            Some(chunk_embeddings),
2907            options,
2908            None,
2909        )
2910    }
2911
2912    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
2913    pub fn update_frame(
2914        &mut self,
2915        frame_id: FrameId,
2916        payload: Option<Vec<u8>>,
2917        mut options: PutOptions,
2918        embedding: Option<Vec<f32>>,
2919    ) -> Result<u64> {
2920        self.ensure_mutation_allowed()?;
2921        let existing = self.frame_by_id(frame_id)?;
2922        if existing.status != FrameStatus::Active {
2923            return Err(MemvidError::InvalidFrame {
2924                frame_id,
2925                reason: "frame is not active",
2926            });
2927        }
2928
2929        if options.timestamp.is_none() {
2930            options.timestamp = Some(existing.timestamp);
2931        }
2932        if options.track.is_none() {
2933            options.track = existing.track.clone();
2934        }
2935        if options.kind.is_none() {
2936            options.kind = existing.kind.clone();
2937        }
2938        if options.uri.is_none() {
2939            options.uri = existing.uri.clone();
2940        }
2941        if options.title.is_none() {
2942            options.title = existing.title.clone();
2943        }
2944        if options.metadata.is_none() {
2945            options.metadata = existing.metadata.clone();
2946        }
2947        if options.search_text.is_none() {
2948            options.search_text = existing.search_text.clone();
2949        }
2950        if options.tags.is_empty() {
2951            options.tags = existing.tags.clone();
2952        }
2953        if options.labels.is_empty() {
2954            options.labels = existing.labels.clone();
2955        }
2956        if options.extra_metadata.is_empty() {
2957            options.extra_metadata = existing.extra_metadata.clone();
2958        }
2959
2960        let reuse_frame = if payload.is_none() {
2961            options.auto_tag = false;
2962            options.extract_dates = false;
2963            Some(existing.clone())
2964        } else {
2965            None
2966        };
2967
2968        let effective_embedding = if let Some(explicit) = embedding {
2969            Some(explicit)
2970        } else if self.vec_enabled {
2971            self.frame_embedding(frame_id)?
2972        } else {
2973            None
2974        };
2975
2976        let payload_slice = payload.as_deref();
2977        let reuse_flag = reuse_frame.is_some();
2978        let replace_flag = payload_slice.is_some();
2979        let seq = self.put_internal(
2980            payload_slice,
2981            reuse_frame,
2982            effective_embedding,
2983            None, // No chunk embeddings for update
2984            options,
2985            Some(frame_id),
2986        )?;
2987        info!(
2988            "frame_update frame_id={} seq={} reused_payload={} replaced_payload={}",
2989            frame_id, seq, reuse_flag, replace_flag
2990        );
2991        Ok(seq)
2992    }
2993
2994    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
2995        self.ensure_mutation_allowed()?;
2996        let frame = self.frame_by_id(frame_id)?;
2997        if frame.status != FrameStatus::Active {
2998            return Err(MemvidError::InvalidFrame {
2999                frame_id,
3000                reason: "frame is not active",
3001            });
3002        }
3003
3004        let mut tombstone = WalEntryData {
3005            timestamp: SystemTime::now()
3006                .duration_since(UNIX_EPOCH)
3007                .map(|d| d.as_secs() as i64)
3008                .unwrap_or(frame.timestamp),
3009            kind: None,
3010            track: None,
3011            payload: Vec::new(),
3012            embedding: None,
3013            uri: frame.uri.clone(),
3014            title: frame.title.clone(),
3015            canonical_encoding: frame.canonical_encoding,
3016            canonical_length: frame.canonical_length,
3017            metadata: None,
3018            search_text: None,
3019            tags: Vec::new(),
3020            labels: Vec::new(),
3021            extra_metadata: BTreeMap::new(),
3022            content_dates: Vec::new(),
3023            chunk_manifest: None,
3024            role: frame.role,
3025            parent_sequence: None,
3026            chunk_index: frame.chunk_index,
3027            chunk_count: frame.chunk_count,
3028            op: FrameWalOp::Tombstone,
3029            target_frame_id: Some(frame_id),
3030            supersedes_frame_id: None,
3031            reuse_payload_from: None,
3032            source_sha256: None,
3033            source_path: None,
3034            enrichment_state: crate::types::EnrichmentState::default(),
3035        };
3036        tombstone.kind = frame.kind.clone();
3037        tombstone.track = frame.track.clone();
3038
3039        let payload_bytes = encode_to_vec(&WalEntry::Frame(tombstone), wal_config())?;
3040        let seq = self.append_wal_entry(&payload_bytes)?;
3041        self.dirty = true;
3042        if self.wal.should_checkpoint() {
3043            self.commit()?;
3044        }
3045        info!("frame_delete frame_id={} seq={}", frame_id, seq);
3046        Ok(seq)
3047    }
3048}
3049
3050impl Memvid {
3051    fn put_internal(
3052        &mut self,
3053        payload: Option<&[u8]>,
3054        reuse_frame: Option<Frame>,
3055        embedding: Option<Vec<f32>>,
3056        chunk_embeddings: Option<Vec<Vec<f32>>>,
3057        mut options: PutOptions,
3058        supersedes: Option<FrameId>,
3059    ) -> Result<u64> {
3060        self.ensure_mutation_allowed()?;
3061
3062        // Deduplication: if enabled and we have payload, check if identical content exists
3063        if options.dedup {
3064            if let Some(bytes) = payload {
3065                let content_hash = hash(bytes);
3066                if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3067                    // Found existing frame with same content hash, skip ingestion
3068                    tracing::debug!(
3069                        frame_id = existing_frame.id,
3070                        "dedup: skipping ingestion, identical content already exists"
3071                    );
3072                    // Return existing frame's sequence number (which equals frame_id for committed frames)
3073                    return Ok(existing_frame.id);
3074                }
3075            }
3076        }
3077
3078        if payload.is_some() && reuse_frame.is_some() {
3079            let frame_id = reuse_frame
3080                .as_ref()
3081                .map(|frame| frame.id)
3082                .unwrap_or_default();
3083            return Err(MemvidError::InvalidFrame {
3084                frame_id,
3085                reason: "cannot reuse payload when bytes are provided",
3086            });
3087        }
3088
3089        // If the caller supplies embeddings, enforce a single vector dimension contract
3090        // for the entire memory (fail fast, never silently accept mixed dimensions).
3091        let incoming_dimension = {
3092            let mut dim: Option<u32> = None;
3093
3094            if let Some(ref vector) = embedding {
3095                if !vector.is_empty() {
3096                    dim = Some(vector.len() as u32);
3097                }
3098            }
3099
3100            if let Some(ref vectors) = chunk_embeddings {
3101                for vector in vectors {
3102                    if vector.is_empty() {
3103                        continue;
3104                    }
3105                    let vec_dim = vector.len() as u32;
3106                    match dim {
3107                        None => dim = Some(vec_dim),
3108                        Some(existing) if existing == vec_dim => {}
3109                        Some(existing) => {
3110                            return Err(MemvidError::VecDimensionMismatch {
3111                                expected: existing,
3112                                actual: vector.len(),
3113                            });
3114                        }
3115                    }
3116                }
3117            }
3118
3119            dim
3120        };
3121
3122        if let Some(incoming_dimension) = incoming_dimension {
3123            // Embeddings imply vector search should be enabled.
3124            if !self.vec_enabled {
3125                self.enable_vec()?;
3126            }
3127
3128            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3129                if existing_dimension != incoming_dimension {
3130                    return Err(MemvidError::VecDimensionMismatch {
3131                        expected: existing_dimension,
3132                        actual: incoming_dimension as usize,
3133                    });
3134                }
3135            }
3136
3137            // Persist the dimension early for better auto-detection (even before the next commit).
3138            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3139                if manifest.dimension == 0 {
3140                    manifest.dimension = incoming_dimension;
3141                }
3142            }
3143        }
3144
3145        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3146        let payload_tail = self.payload_region_end();
3147        let projected = if let Some(bytes) = payload {
3148            let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
3149            let len = prepared.len();
3150            prepared_payload = Some((prepared, encoding, length));
3151            payload_tail.saturating_add(len as u64)
3152        } else if reuse_frame.is_some() {
3153            payload_tail
3154        } else {
3155            return Err(MemvidError::InvalidFrame {
3156                frame_id: 0,
3157                reason: "payload required for frame insertion",
3158            });
3159        };
3160
3161        let capacity_limit = self.capacity_limit();
3162        if projected > capacity_limit {
3163            let incoming_size = projected.saturating_sub(payload_tail);
3164            return Err(MemvidError::CapacityExceeded {
3165                current: payload_tail,
3166                limit: capacity_limit,
3167                required: incoming_size,
3168            });
3169        }
3170        let timestamp = options.timestamp.take().unwrap_or_else(|| {
3171            SystemTime::now()
3172                .duration_since(UNIX_EPOCH)
3173                .map(|d| d.as_secs() as i64)
3174                .unwrap_or(0)
3175        });
3176
3177        let mut _reuse_bytes: Option<Vec<u8>> = None;
3178        let payload_for_processing = if let Some(bytes) = payload {
3179            Some(bytes)
3180        } else if let Some(frame) = reuse_frame.as_ref() {
3181            let bytes = self.frame_canonical_bytes(frame)?;
3182            _reuse_bytes = Some(bytes);
3183            _reuse_bytes.as_deref()
3184        } else {
3185            None
3186        };
3187
3188        // Try to create a chunk plan from raw UTF-8 bytes first
3189        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3190            (Some(bytes), None) => plan_document_chunks(bytes),
3191            _ => None,
3192        };
3193
3194        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
3195        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
3196        // For --no-raw mode, we store only the extracted text and a hash of the original binary
3197        let mut source_sha256: Option<[u8; 32]> = None;
3198        let source_path_value = options.source_path.take();
3199
3200        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3201            if raw_chunk_plan.is_some() {
3202                // UTF-8 text document - chunks contain the text, no parent payload needed
3203                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3204            } else if options.no_raw {
3205                // --no-raw mode: don't store the raw binary, only compute hash
3206                if let Some(bytes) = payload {
3207                    // Compute BLAKE3 hash of original binary for verification
3208                    let hash_result = hash(bytes);
3209                    source_sha256 = Some(*hash_result.as_bytes());
3210                    // Store empty payload - the extracted text is in search_text
3211                    (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3212                } else {
3213                    return Err(MemvidError::InvalidFrame {
3214                        frame_id: 0,
3215                        reason: "payload required for --no-raw mode",
3216                    });
3217                }
3218            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3219                (prepared, encoding, length, None)
3220            } else if let Some(bytes) = payload {
3221                let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
3222                (prepared, encoding, length, None)
3223            } else if let Some(frame) = reuse_frame.as_ref() {
3224                (
3225                    Vec::new(),
3226                    frame.canonical_encoding,
3227                    frame.canonical_length,
3228                    Some(frame.id),
3229                )
3230            } else {
3231                return Err(MemvidError::InvalidFrame {
3232                    frame_id: 0,
3233                    reason: "payload required for frame insertion",
3234                });
3235            };
3236
3237        // Track whether we'll create an extracted text chunk plan later
3238        let mut chunk_plan = raw_chunk_plan;
3239
3240        let mut metadata = options.metadata.take();
3241        let mut search_text = options
3242            .search_text
3243            .take()
3244            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3245        let mut tags = std::mem::take(&mut options.tags);
3246        let mut labels = std::mem::take(&mut options.labels);
3247        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3248        let mut content_dates: Vec<String> = Vec::new();
3249
3250        let need_search_text = search_text
3251            .as_ref()
3252            .map_or(true, |text| text.trim().is_empty());
3253        let need_metadata = metadata.is_none();
3254        let run_extractor = need_search_text || need_metadata || options.auto_tag;
3255
3256        let mut extraction_error = None;
3257        let mut is_skim_extraction = false; // Track if extraction was time-limited
3258
3259        let extracted = if run_extractor {
3260            if let Some(bytes) = payload_for_processing {
3261                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3262                let uri_hint = options.uri.as_deref();
3263
3264                // Use time-budgeted extraction for instant indexing with a budget
3265                let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3266
3267                if use_budgeted {
3268                    // Time-budgeted extraction for sub-second ingestion
3269                    let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3270                        options.extraction_budget_ms,
3271                    );
3272                    match crate::extract_budgeted::extract_with_budget(
3273                        bytes, mime_hint, uri_hint, budget,
3274                    ) {
3275                        Ok(result) => {
3276                            is_skim_extraction = result.is_skim();
3277                            if is_skim_extraction {
3278                                tracing::debug!(
3279                                    coverage = result.coverage,
3280                                    elapsed_ms = result.elapsed_ms,
3281                                    sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3282                                    "time-budgeted extraction (skim)"
3283                                );
3284                            }
3285                            // Convert BudgetedExtractionResult to ExtractedDocument
3286                            let doc = crate::extract::ExtractedDocument {
3287                                text: if result.text.is_empty() {
3288                                    None
3289                                } else {
3290                                    Some(result.text)
3291                                },
3292                                metadata: serde_json::json!({
3293                                    "skim": is_skim_extraction,
3294                                    "coverage": result.coverage,
3295                                    "sections_extracted": result.sections_extracted,
3296                                    "sections_total": result.sections_total,
3297                                }),
3298                                mime_type: mime_hint.map(|s| s.to_string()),
3299                            };
3300                            Some(doc)
3301                        }
3302                        Err(err) => {
3303                            // Fall back to full extraction on budgeted extraction error
3304                            tracing::warn!(
3305                                ?err,
3306                                "budgeted extraction failed, trying full extraction"
3307                            );
3308                            match extract_via_registry(bytes, mime_hint, uri_hint) {
3309                                Ok(doc) => Some(doc),
3310                                Err(err) => {
3311                                    extraction_error = Some(err);
3312                                    None
3313                                }
3314                            }
3315                        }
3316                    }
3317                } else {
3318                    // Full extraction (no time budget)
3319                    match extract_via_registry(bytes, mime_hint, uri_hint) {
3320                        Ok(doc) => Some(doc),
3321                        Err(err) => {
3322                            extraction_error = Some(err);
3323                            None
3324                        }
3325                    }
3326                }
3327            } else {
3328                None
3329            }
3330        } else {
3331            None
3332        };
3333
3334        if let Some(err) = extraction_error {
3335            return Err(err);
3336        }
3337
3338        if let Some(doc) = &extracted {
3339            if need_search_text {
3340                if let Some(text) = &doc.text {
3341                    if let Some(normalized) =
3342                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3343                    {
3344                        search_text = Some(normalized);
3345                    }
3346                }
3347            }
3348
3349            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
3350            // from extracted text. This ensures large documents like PDFs get fully indexed.
3351            if chunk_plan.is_none() {
3352                if let Some(text) = &doc.text {
3353                    chunk_plan = plan_text_chunks(text);
3354                }
3355            }
3356
3357            if let Some(mime) = doc.mime_type.as_ref() {
3358                match &mut metadata {
3359                    Some(existing) => {
3360                        if existing.mime.is_none() {
3361                            existing.mime = Some(mime.clone());
3362                        }
3363                    }
3364                    None => {
3365                        let mut doc_meta = DocMetadata::default();
3366                        doc_meta.mime = Some(mime.clone());
3367                        metadata = Some(doc_meta);
3368                    }
3369                }
3370            }
3371
3372            if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string()) {
3373                extra_metadata
3374                    .entry("extractous_metadata".to_string())
3375                    .or_insert(meta_json);
3376            }
3377        }
3378
3379        if options.auto_tag {
3380            if let Some(ref text) = search_text {
3381                if !text.trim().is_empty() {
3382                    let result = AutoTagger::default().analyse(text, options.extract_dates);
3383                    merge_unique(&mut tags, result.tags);
3384                    merge_unique(&mut labels, result.labels);
3385                    if options.extract_dates && content_dates.is_empty() {
3386                        content_dates = result.content_dates;
3387                    }
3388                }
3389            }
3390        }
3391
3392        if content_dates.is_empty() {
3393            if let Some(frame) = reuse_frame.as_ref() {
3394                content_dates = frame.content_dates.clone();
3395            }
3396        }
3397
3398        let metadata_ref = metadata.as_ref();
3399        let mut search_text = augment_search_text(
3400            search_text,
3401            options.uri.as_deref(),
3402            options.title.as_deref(),
3403            options.track.as_deref(),
3404            &tags,
3405            &labels,
3406            &extra_metadata,
3407            &content_dates,
3408            metadata_ref,
3409        );
3410        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3411        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3412        let mut parent_chunk_count: Option<u32> = None;
3413
3414        let kind_value = options.kind.take();
3415        let track_value = options.track.take();
3416        let uri_value = options.uri.take();
3417        let title_value = options.title.take();
3418        let should_extract_triplets = options.extract_triplets;
3419        // Save references for triplet extraction (after search_text is moved into WAL entry)
3420        let triplet_uri = uri_value.clone();
3421        let triplet_title = title_value.clone();
3422
3423        if let Some(plan) = chunk_plan.as_ref() {
3424            let chunk_total = plan.chunks.len() as u32;
3425            parent_chunk_manifest = Some(plan.manifest.clone());
3426            parent_chunk_count = Some(chunk_total);
3427
3428            if let Some(first_chunk) = plan.chunks.first() {
3429                if let Some(normalized) =
3430                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3431                {
3432                    if !normalized.trim().is_empty() {
3433                        search_text = Some(normalized);
3434                    }
3435                }
3436            }
3437
3438            let chunk_tags = tags.clone();
3439            let chunk_labels = labels.clone();
3440            let chunk_metadata = metadata.clone();
3441            let chunk_extra_metadata = extra_metadata.clone();
3442            let chunk_content_dates = content_dates.clone();
3443
3444            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3445                let (chunk_payload, chunk_encoding, chunk_length) =
3446                    prepare_canonical_payload(chunk_text.as_bytes())?;
3447                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3448                    .map(|n| n.text)
3449                    .filter(|text| !text.trim().is_empty());
3450
3451                let chunk_uri = uri_value
3452                    .as_ref()
3453                    .map(|uri| format!("{uri}#page-{}", idx + 1));
3454                let chunk_title = title_value
3455                    .as_ref()
3456                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3457
3458                // Use provided chunk embedding if available, otherwise None
3459                let chunk_embedding = chunk_embeddings
3460                    .as_ref()
3461                    .and_then(|embeddings| embeddings.get(idx).cloned());
3462
3463                chunk_entries.push(WalEntryData {
3464                    timestamp,
3465                    kind: kind_value.clone(),
3466                    track: track_value.clone(),
3467                    payload: chunk_payload,
3468                    embedding: chunk_embedding,
3469                    uri: chunk_uri,
3470                    title: chunk_title,
3471                    canonical_encoding: chunk_encoding,
3472                    canonical_length: chunk_length,
3473                    metadata: chunk_metadata.clone(),
3474                    search_text: chunk_search_text,
3475                    tags: chunk_tags.clone(),
3476                    labels: chunk_labels.clone(),
3477                    extra_metadata: chunk_extra_metadata.clone(),
3478                    content_dates: chunk_content_dates.clone(),
3479                    chunk_manifest: None,
3480                    role: FrameRole::DocumentChunk,
3481                    parent_sequence: None,
3482                    chunk_index: Some(idx as u32),
3483                    chunk_count: Some(chunk_total),
3484                    op: FrameWalOp::Insert,
3485                    target_frame_id: None,
3486                    supersedes_frame_id: None,
3487                    reuse_payload_from: None,
3488                    source_sha256: None, // Chunks don't have source references
3489                    source_path: None,
3490                    // Chunks are already extracted, so mark as Enriched
3491                    enrichment_state: crate::types::EnrichmentState::Enriched,
3492                });
3493            }
3494        }
3495
3496        let parent_uri = uri_value.clone();
3497        let parent_title = title_value.clone();
3498
3499        // Get parent_sequence from options.parent_id if provided
3500        // We need the WAL sequence of the parent frame to link them
3501        let parent_sequence = if let Some(parent_id) = options.parent_id {
3502            // Look up the parent frame to get its WAL sequence
3503            // Since frame.id corresponds to the array index, we need to find the sequence
3504            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
3505            // This works because sequence numbers are assigned incrementally
3506            self.toc
3507                .frames
3508                .get(parent_id as usize)
3509                .map(|_| parent_id + 2) // WAL sequences start at 2
3510        } else {
3511            None
3512        };
3513
3514        // Clone search_text for triplet extraction (before it's moved into WAL entry)
3515        let triplet_text = search_text.clone();
3516
3517        // Capture values needed for instant indexing BEFORE they're moved into entry
3518        #[cfg(feature = "lex")]
3519        let instant_index_tags = if options.instant_index {
3520            tags.clone()
3521        } else {
3522            Vec::new()
3523        };
3524        #[cfg(feature = "lex")]
3525        let instant_index_labels = if options.instant_index {
3526            labels.clone()
3527        } else {
3528            Vec::new()
3529        };
3530
3531        // Determine enrichment state: Searchable if needs background work, Enriched if complete
3532        #[cfg(feature = "lex")]
3533        let needs_enrichment =
3534            options.instant_index && (options.enable_embedding || is_skim_extraction);
3535        #[cfg(feature = "lex")]
3536        let enrichment_state = if needs_enrichment {
3537            crate::types::EnrichmentState::Searchable
3538        } else {
3539            crate::types::EnrichmentState::Enriched
3540        };
3541        #[cfg(not(feature = "lex"))]
3542        let enrichment_state = crate::types::EnrichmentState::Enriched;
3543
3544        let entry = WalEntryData {
3545            timestamp,
3546            kind: kind_value,
3547            track: track_value,
3548            payload: storage_payload,
3549            embedding,
3550            uri: parent_uri,
3551            title: parent_title,
3552            canonical_encoding,
3553            canonical_length,
3554            metadata,
3555            search_text,
3556            tags,
3557            labels,
3558            extra_metadata,
3559            content_dates,
3560            chunk_manifest: parent_chunk_manifest,
3561            role: options.role,
3562            parent_sequence,
3563            chunk_index: None,
3564            chunk_count: parent_chunk_count,
3565            op: FrameWalOp::Insert,
3566            target_frame_id: None,
3567            supersedes_frame_id: supersedes,
3568            reuse_payload_from,
3569            source_sha256,
3570            source_path: source_path_value,
3571            enrichment_state,
3572        };
3573
3574        let parent_bytes = encode_to_vec(&WalEntry::Frame(entry), wal_config())?;
3575        let parent_seq = self.append_wal_entry(&parent_bytes)?;
3576        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3577
3578        // Instant indexing: make frame searchable immediately (<1s) without full commit
3579        // This is Phase 1 of progressive ingestion - frame is searchable but not fully enriched
3580        #[cfg(feature = "lex")]
3581        if options.instant_index {
3582            if self.tantivy.is_some() {
3583                // Create a minimal frame for indexing
3584                let frame_id = parent_seq as FrameId;
3585
3586                // Use triplet_text which was cloned before entry was created
3587                if let Some(ref text) = triplet_text {
3588                    if !text.trim().is_empty() {
3589                        // Create temporary frame for indexing (minimal fields for Tantivy)
3590                        let temp_frame = Frame {
3591                            id: frame_id,
3592                            timestamp,
3593                            anchor_ts: None,
3594                            anchor_source: None,
3595                            kind: options.kind.clone(),
3596                            track: options.track.clone(),
3597                            payload_offset: 0,
3598                            payload_length: 0,
3599                            checksum: [0u8; 32],
3600                            uri: options
3601                                .uri
3602                                .clone()
3603                                .or_else(|| Some(crate::default_uri(frame_id))),
3604                            title: options.title.clone(),
3605                            canonical_encoding: crate::types::CanonicalEncoding::default(),
3606                            canonical_length: None,
3607                            metadata: None, // Not needed for text search
3608                            search_text: triplet_text.clone(),
3609                            tags: instant_index_tags.clone(),
3610                            labels: instant_index_labels.clone(),
3611                            extra_metadata: std::collections::BTreeMap::new(), // Not needed for search
3612                            content_dates: Vec::new(), // Not needed for search
3613                            chunk_manifest: None,
3614                            role: options.role,
3615                            parent_id: None,
3616                            chunk_index: None,
3617                            chunk_count: None,
3618                            status: FrameStatus::Active,
3619                            supersedes: supersedes,
3620                            superseded_by: None,
3621                            source_sha256: None, // Not needed for search
3622                            source_path: None,   // Not needed for search
3623                            enrichment_state: crate::types::EnrichmentState::Searchable,
3624                        };
3625
3626                        // Get mutable reference to engine and index the frame
3627                        if let Some(engine) = self.tantivy.as_mut() {
3628                            engine.add_frame(&temp_frame, text)?;
3629                            engine.soft_commit()?;
3630                            self.tantivy_dirty = true;
3631
3632                            tracing::debug!(
3633                                frame_id = frame_id,
3634                                "instant index: frame searchable immediately"
3635                            );
3636                        }
3637                    }
3638                }
3639            }
3640        }
3641
3642        // Queue frame for background enrichment when using instant index path
3643        // Enrichment includes: embedding generation, full text re-extraction if time-limited
3644        // Note: enrichment_state is already set in the WAL entry, so it will be correct after replay
3645        #[cfg(feature = "lex")]
3646        if needs_enrichment {
3647            let frame_id = parent_seq as FrameId;
3648            self.toc.enrichment_queue.push(frame_id);
3649            tracing::debug!(
3650                frame_id = frame_id,
3651                is_skim = is_skim_extraction,
3652                needs_embedding = options.enable_embedding,
3653                "queued frame for background enrichment"
3654            );
3655        }
3656
3657        for mut chunk_entry in chunk_entries {
3658            chunk_entry.parent_sequence = Some(parent_seq);
3659            let chunk_bytes = encode_to_vec(&WalEntry::Frame(chunk_entry), wal_config())?;
3660            self.append_wal_entry(&chunk_bytes)?;
3661            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3662        }
3663
3664        self.dirty = true;
3665        if self.wal.should_checkpoint() {
3666            self.commit()?;
3667        }
3668
3669        // Record the put action if a replay session is active
3670        #[cfg(feature = "replay")]
3671        if let Some(input_bytes) = payload {
3672            self.record_put_action(parent_seq, input_bytes);
3673        }
3674
3675        // Extract triplets if enabled (default: true)
3676        // Triplets are stored as MemoryCards with entity/slot/value structure
3677        if should_extract_triplets {
3678            if let Some(ref text) = triplet_text {
3679                if !text.trim().is_empty() {
3680                    let extractor = TripletExtractor::default();
3681                    let frame_id = parent_seq as FrameId;
3682                    let (cards, _stats) = extractor.extract(
3683                        frame_id,
3684                        text,
3685                        triplet_uri.as_deref(),
3686                        triplet_title.as_deref(),
3687                        timestamp,
3688                    );
3689
3690                    if !cards.is_empty() {
3691                        // Add cards to memories track
3692                        let card_ids = self.memories_track.add_cards(cards);
3693
3694                        // Record enrichment for incremental processing
3695                        self.memories_track
3696                            .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3697                    }
3698                }
3699            }
3700        }
3701
3702        Ok(parent_seq)
3703    }
3704}
3705
3706#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3707#[serde(rename_all = "snake_case")]
3708pub(crate) enum FrameWalOp {
3709    Insert,
3710    Tombstone,
3711}
3712
3713impl Default for FrameWalOp {
3714    fn default() -> Self {
3715        Self::Insert
3716    }
3717}
3718
3719#[derive(Debug, Serialize, Deserialize)]
3720enum WalEntry {
3721    Frame(WalEntryData),
3722    #[cfg(feature = "lex")]
3723    Lex(LexWalBatch),
3724}
3725
3726fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3727    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3728        return Ok(entry);
3729    }
3730    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3731    Ok(WalEntry::Frame(legacy))
3732}
3733
3734#[derive(Debug, Serialize, Deserialize)]
3735pub(crate) struct WalEntryData {
3736    pub(crate) timestamp: i64,
3737    pub(crate) kind: Option<String>,
3738    pub(crate) track: Option<String>,
3739    pub(crate) payload: Vec<u8>,
3740    pub(crate) embedding: Option<Vec<f32>>,
3741    #[serde(default)]
3742    pub(crate) uri: Option<String>,
3743    #[serde(default)]
3744    pub(crate) title: Option<String>,
3745    #[serde(default)]
3746    pub(crate) canonical_encoding: CanonicalEncoding,
3747    #[serde(default)]
3748    pub(crate) canonical_length: Option<u64>,
3749    #[serde(default)]
3750    pub(crate) metadata: Option<DocMetadata>,
3751    #[serde(default)]
3752    pub(crate) search_text: Option<String>,
3753    #[serde(default)]
3754    pub(crate) tags: Vec<String>,
3755    #[serde(default)]
3756    pub(crate) labels: Vec<String>,
3757    #[serde(default)]
3758    pub(crate) extra_metadata: BTreeMap<String, String>,
3759    #[serde(default)]
3760    pub(crate) content_dates: Vec<String>,
3761    #[serde(default)]
3762    pub(crate) chunk_manifest: Option<TextChunkManifest>,
3763    #[serde(default)]
3764    pub(crate) role: FrameRole,
3765    #[serde(default)]
3766    pub(crate) parent_sequence: Option<u64>,
3767    #[serde(default)]
3768    pub(crate) chunk_index: Option<u32>,
3769    #[serde(default)]
3770    pub(crate) chunk_count: Option<u32>,
3771    #[serde(default)]
3772    pub(crate) op: FrameWalOp,
3773    #[serde(default)]
3774    pub(crate) target_frame_id: Option<FrameId>,
3775    #[serde(default)]
3776    pub(crate) supersedes_frame_id: Option<FrameId>,
3777    #[serde(default)]
3778    pub(crate) reuse_payload_from: Option<FrameId>,
3779    /// SHA-256 hash of original source file (set when --no-raw is used).
3780    #[serde(default)]
3781    pub(crate) source_sha256: Option<[u8; 32]>,
3782    /// Original source file path (set when --no-raw is used).
3783    #[serde(default)]
3784    pub(crate) source_path: Option<String>,
3785    /// Enrichment state for progressive ingestion.
3786    #[serde(default)]
3787    pub(crate) enrichment_state: crate::types::EnrichmentState,
3788}
3789
3790pub(crate) fn prepare_canonical_payload(
3791    payload: &[u8],
3792) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
3793    if std::str::from_utf8(payload).is_ok() {
3794        let compressed = zstd::encode_all(std::io::Cursor::new(payload), 3)?;
3795        Ok((
3796            compressed,
3797            CanonicalEncoding::Zstd,
3798            Some(payload.len() as u64),
3799        ))
3800    } else {
3801        Ok((
3802            payload.to_vec(),
3803            CanonicalEncoding::Plain,
3804            Some(payload.len() as u64),
3805        ))
3806    }
3807}
3808
3809pub(crate) fn augment_search_text(
3810    base: Option<String>,
3811    uri: Option<&str>,
3812    title: Option<&str>,
3813    track: Option<&str>,
3814    tags: &[String],
3815    labels: &[String],
3816    extra_metadata: &BTreeMap<String, String>,
3817    content_dates: &[String],
3818    metadata: Option<&DocMetadata>,
3819) -> Option<String> {
3820    let mut segments: Vec<String> = Vec::new();
3821    if let Some(text) = base {
3822        let trimmed = text.trim();
3823        if !trimmed.is_empty() {
3824            segments.push(trimmed.to_string());
3825        }
3826    }
3827
3828    if let Some(title) = title {
3829        if !title.trim().is_empty() {
3830            segments.push(format!("title: {}", title.trim()));
3831        }
3832    }
3833
3834    if let Some(uri) = uri {
3835        if !uri.trim().is_empty() {
3836            segments.push(format!("uri: {}", uri.trim()));
3837        }
3838    }
3839
3840    if let Some(track) = track {
3841        if !track.trim().is_empty() {
3842            segments.push(format!("track: {}", track.trim()));
3843        }
3844    }
3845
3846    if !tags.is_empty() {
3847        segments.push(format!("tags: {}", tags.join(" ")));
3848    }
3849
3850    if !labels.is_empty() {
3851        segments.push(format!("labels: {}", labels.join(" ")));
3852    }
3853
3854    if !extra_metadata.is_empty() {
3855        for (key, value) in extra_metadata {
3856            if value.trim().is_empty() {
3857                continue;
3858            }
3859            segments.push(format!("{}: {}", key, value));
3860        }
3861    }
3862
3863    if !content_dates.is_empty() {
3864        segments.push(format!("dates: {}", content_dates.join(" ")));
3865    }
3866
3867    if let Some(meta) = metadata {
3868        if let Ok(meta_json) = serde_json::to_string(meta) {
3869            segments.push(format!("metadata: {}", meta_json));
3870        }
3871    }
3872
3873    if segments.is_empty() {
3874        None
3875    } else {
3876        Some(segments.join("\n"))
3877    }
3878}
3879
3880pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
3881    if additions.is_empty() {
3882        return;
3883    }
3884    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
3885    for value in additions {
3886        let trimmed = value.trim();
3887        if trimmed.is_empty() {
3888            continue;
3889        }
3890        let candidate = trimmed.to_string();
3891        if seen.insert(candidate.clone()) {
3892            target.push(candidate);
3893        }
3894    }
3895}