Skip to main content

memvid_core/memvid/
mutation.rs

1//! Frame mutation and ingestion routines for `Memvid`.
2//!
3//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
4//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
5//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
6//!
7//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
8//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58    PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66    TemporalResolutionValue,
67};
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading input bytes handed to format detection as the "magic" prefix
/// (see `extract_via_registry`).
const MAGIC_SNIFF_BYTES: usize = 16;
/// Fixed overhead added to a payload's length when sizing a WAL growth request
/// (see `append_wal_entry`).
// NOTE(review): presumably mirrors the embedded WAL's on-disk entry header size — confirm
// against `EmbeddedWal`.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Size (8 MiB) of the scratch buffer used to move and zero file contents when the WAL
/// region grows (see `shift_data_for_wal_growth`).
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
78
/// Time zone name used as the temporal-track default.
// NOTE(review): the consumer of this constant is outside this section; presumably it seeds
// temporal normalization when the caller supplies no zone — confirm.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

/// Fixed list of lowercase relative-date phrases and weekday names for the temporal track.
// NOTE(review): how these phrases are matched (substring, regex, ...) is not visible in this
// section — verify against the temporal mention extraction code before relying on ordering.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
123struct CommitStaging {
124    atomic: AtomicWriteFile,
125}
126
127impl CommitStaging {
128    fn prepare(path: &Path) -> Result<Self> {
129        let mut options = AtomicWriteFile::options();
130        options.read(true);
131        let atomic = options.open(path)?;
132        Ok(Self { atomic })
133    }
134
135    fn copy_from(&mut self, source: &File) -> Result<()> {
136        let mut reader = source.try_clone()?;
137        reader.seek(SeekFrom::Start(0))?;
138
139        let writer = self.atomic.as_file_mut();
140        writer.set_len(0)?;
141        writer.seek(SeekFrom::Start(0))?;
142        std::io::copy(&mut reader, writer)?;
143        writer.flush()?;
144        writer.sync_all()?;
145        Ok(())
146    }
147
148    fn clone_file(&self) -> Result<File> {
149        Ok(self.atomic.as_file().try_clone()?)
150    }
151
152    fn commit(self) -> Result<()> {
153        self.atomic.commit().map_err(Into::into)
154    }
155
156    fn discard(self) -> Result<()> {
157        self.atomic.discard().map_err(Into::into)
158    }
159}
160
161#[derive(Debug, Default)]
162struct IngestionDelta {
163    inserted_frames: Vec<FrameId>,
164    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
165    inserted_time_entries: Vec<TimeIndexEntry>,
166    mutated_frames: bool,
167    #[cfg(feature = "temporal_track")]
168    inserted_temporal_mentions: Vec<TemporalMention>,
169    #[cfg(feature = "temporal_track")]
170    inserted_temporal_anchors: Vec<TemporalAnchor>,
171}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
/// Selects how a commit is carried out; passed via [`CommitOptions`].
///
/// The default is [`CommitMode::Full`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Default mode; this is what `Memvid::commit` requests.
    #[default]
    Full,
    /// Alternative mode; its handling lives outside this section of the file.
    Incremental,
}
201
202#[derive(Clone, Copy, Debug, Default)]
203pub struct CommitOptions {
204    pub mode: CommitMode,
205    pub background: bool,
206}
207
208impl CommitOptions {
209    #[must_use]
210    pub fn new(mode: CommitMode) -> Self {
211        Self {
212            mode,
213            background: false,
214        }
215    }
216
217    #[must_use]
218    pub fn background(mut self, background: bool) -> Self {
219        self.background = background;
220        self
221    }
222}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226    REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230    mime: Option<&str>,
231    magic: Option<&[u8]>,
232    uri: Option<&str>,
233) -> Option<DocumentFormat> {
234    // Check PDF magic bytes first
235    if detect_pdf_magic(magic) {
236        return Some(DocumentFormat::Pdf);
237    }
238
239    // For ZIP-based OOXML formats (DOCX, XLSX, PPTX), magic bytes are just ZIP header
240    // so we need to check file extension to distinguish them
241    if let Some(magic_bytes) = magic {
242        if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243            // It's a ZIP file - check extension to determine OOXML type
244            if let Some(format) = infer_format_from_extension(uri) {
245                return Some(format);
246            }
247        }
248    }
249
250    // Try MIME type
251    if let Some(mime_str) = mime {
252        let mime_lower = mime_str.trim().to_ascii_lowercase();
253        let format = match mime_lower.as_str() {
254            "application/pdf" => Some(DocumentFormat::Pdf),
255            "text/plain" => Some(DocumentFormat::PlainText),
256            "text/markdown" => Some(DocumentFormat::Markdown),
257            "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258            "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259                Some(DocumentFormat::Docx)
260            }
261            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262                Some(DocumentFormat::Xlsx)
263            }
264            "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265            "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266                Some(DocumentFormat::Pptx)
267            }
268            other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269            _ => None,
270        };
271        if format.is_some() {
272            return format;
273        }
274    }
275
276    // Fall back to extension-based detection
277    infer_format_from_extension(uri)
278}
279
280/// Infer document format from file extension in URI/path
281fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282    let uri = uri?;
283    let path = std::path::Path::new(uri);
284    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285    match ext.as_str() {
286        "pdf" => Some(DocumentFormat::Pdf),
287        "docx" => Some(DocumentFormat::Docx),
288        "xlsx" => Some(DocumentFormat::Xlsx),
289        "xls" => Some(DocumentFormat::Xls),
290        "pptx" => Some(DocumentFormat::Pptx),
291        "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292        | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293        | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294        | "sql" => Some(DocumentFormat::PlainText),
295        "md" | "markdown" => Some(DocumentFormat::Markdown),
296        "html" | "htm" => Some(DocumentFormat::Html),
297        _ => None,
298    }
299}
300
/// Reports whether `magic` begins with the `%PDF` signature.
///
/// A leading UTF-8 byte-order mark and any ASCII whitespace after it are skipped before the
/// signature is checked. `None` and empty input yield `false`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF];

    let Some(bytes) = magic else {
        return false;
    };
    let bytes = bytes.strip_prefix(&UTF8_BOM).unwrap_or(bytes);
    let first_visible = bytes
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(bytes.len());
    bytes[first_visible..].starts_with(b"%PDF")
}
318
319#[instrument(
320    target = "memvid::extract",
321    skip_all,
322    fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325    bytes: &[u8],
326    mime_hint: Option<&str>,
327    uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329    let registry = default_reader_registry();
330    let magic = bytes
331        .get(..MAGIC_SNIFF_BYTES)
332        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334        .with_uri(uri)
335        .with_magic(magic);
336
337    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338        let start = Instant::now();
339        match reader.extract(bytes, &hint) {
340            Ok(output) => {
341                return Ok(finalize_reader_output(output, start));
342            }
343            Err(err) => {
344                tracing::error!(
345                    target = "memvid::extract",
346                    reader = reader.name(),
347                    error = %err,
348                    "reader failed; falling back"
349                );
350                Some(format!("reader {} failed: {err}", reader.name()))
351            }
352        }
353    } else {
354        tracing::debug!(
355            target = "memvid::extract",
356            format = hint.format.map(super::super::reader::DocumentFormat::label),
357            "no reader matched; using default extractor"
358        );
359        Some("no registered reader matched; using default extractor".to_string())
360    };
361
362    let start = Instant::now();
363    let mut output = PassthroughReader.extract(bytes, &hint)?;
364    if let Some(reason) = fallback_reason {
365        output.diagnostics.track_warning(reason);
366    }
367    Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371    let elapsed = start.elapsed();
372    let ReaderOutput {
373        document,
374        reader_name,
375        diagnostics,
376    } = output;
377    log_reader_result(&reader_name, &diagnostics, elapsed);
378    document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382    let duration_ms = diagnostics
383        .duration_ms
384        .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385    let warnings = diagnostics.warnings.len();
386    let pages = diagnostics.pages_processed;
387
388    if warnings > 0 || diagnostics.fallback {
389        tracing::warn!(
390            target = "memvid::extract",
391            reader,
392            duration_ms,
393            pages,
394            warnings,
395            fallback = diagnostics.fallback,
396            "extraction completed with warnings"
397        );
398        for warning in &diagnostics.warnings {
399            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400        }
401    } else {
402        tracing::info!(
403            target = "memvid::extract",
404            reader,
405            duration_ms,
406            pages,
407            "extraction completed"
408        );
409    }
410}
411
412impl Memvid {
413    // -- Public ingestion entrypoints ---------------------------------------------------------
414
415    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
416    where
417        F: FnOnce(&mut Self) -> Result<()>,
418    {
419        self.file.sync_all()?;
420        let mut staging = CommitStaging::prepare(self.path())?;
421        staging.copy_from(&self.file)?;
422
423        let staging_handle = staging.clone_file()?;
424        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
425        let original_file = std::mem::replace(&mut self.file, staging_handle);
426        let original_wal = std::mem::replace(&mut self.wal, new_wal);
427        let original_header = self.header.clone();
428        let original_toc = self.toc.clone();
429        let original_data_end = self.data_end;
430        let original_generation = self.generation;
431        let original_dirty = self.dirty;
432        let original_lex_enabled = self.lex_enabled;
433        #[cfg(feature = "lex")]
434        let original_tantivy_dirty = self.tantivy_dirty;
435
436        let destination_path = self.path().to_path_buf();
437        let mut original_file = Some(original_file);
438        let mut original_wal = Some(original_wal);
439
440        match op(self) {
441            Ok(()) => {
442                self.file.sync_all()?;
443                match staging.commit() {
444                    Ok(()) => {
445                        drop(original_file.take());
446                        drop(original_wal.take());
447                        self.file = OpenOptions::new()
448                            .read(true)
449                            .write(true)
450                            .open(&destination_path)?;
451                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
452                        Ok(())
453                    }
454                    Err(commit_err) => {
455                        if let Some(file) = original_file.take() {
456                            self.file = file;
457                        }
458                        if let Some(wal) = original_wal.take() {
459                            self.wal = wal;
460                        }
461                        self.header = original_header;
462                        self.toc = original_toc;
463                        self.data_end = original_data_end;
464                        self.generation = original_generation;
465                        self.dirty = original_dirty;
466                        self.lex_enabled = original_lex_enabled;
467                        #[cfg(feature = "lex")]
468                        {
469                            self.tantivy_dirty = original_tantivy_dirty;
470                        }
471                        Err(commit_err)
472                    }
473                }
474            }
475            Err(err) => {
476                let _ = staging.discard();
477                if let Some(file) = original_file.take() {
478                    self.file = file;
479                }
480                if let Some(wal) = original_wal.take() {
481                    self.wal = wal;
482                }
483                self.header = original_header;
484                self.toc = original_toc;
485                self.data_end = original_data_end;
486                self.generation = original_generation;
487                self.dirty = original_dirty;
488                self.lex_enabled = original_lex_enabled;
489                #[cfg(feature = "lex")]
490                {
491                    self.tantivy_dirty = original_tantivy_dirty;
492                }
493                Err(err)
494            }
495        }
496    }
497
498    pub(crate) fn catalog_data_end(&self) -> u64 {
499        let mut max_end = self.header.wal_offset + self.header.wal_size;
500
501        for descriptor in &self.toc.segment_catalog.lex_segments {
502            if descriptor.common.bytes_length == 0 {
503                continue;
504            }
505            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
506        }
507
508        for descriptor in &self.toc.segment_catalog.vec_segments {
509            if descriptor.common.bytes_length == 0 {
510                continue;
511            }
512            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
513        }
514
515        for descriptor in &self.toc.segment_catalog.time_segments {
516            if descriptor.common.bytes_length == 0 {
517                continue;
518            }
519            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
520        }
521
522        #[cfg(feature = "temporal_track")]
523        for descriptor in &self.toc.segment_catalog.temporal_segments {
524            if descriptor.common.bytes_length == 0 {
525                continue;
526            }
527            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
528        }
529
530        #[cfg(feature = "lex")]
531        for descriptor in &self.toc.segment_catalog.tantivy_segments {
532            if descriptor.common.bytes_length == 0 {
533                continue;
534            }
535            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
536        }
537
538        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
539            if manifest.bytes_length != 0 {
540                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
541            }
542        }
543
544        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
545            if manifest.bytes_length != 0 {
546                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
547            }
548        }
549
550        if let Some(manifest) = self.toc.time_index.as_ref() {
551            if manifest.bytes_length != 0 {
552                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
553            }
554        }
555
556        #[cfg(feature = "temporal_track")]
557        if let Some(track) = self.toc.temporal_track.as_ref() {
558            if track.bytes_length != 0 {
559                max_end = max_end.max(track.bytes_offset + track.bytes_length);
560            }
561        }
562
563        max_end
564    }
565
    /// Returns the cached end offset of the payload region (`cached_payload_end`).
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
569
570    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
571        loop {
572            match self.wal.append_entry(payload) {
573                Ok(seq) => return Ok(seq),
574                Err(MemvidError::CheckpointFailed { reason })
575                    if reason == "embedded WAL region too small for entry"
576                        || reason == "embedded WAL region full" =>
577                {
578                    // WAL is either too small for this entry or full with pending entries.
579                    // Grow the WAL to accommodate - doubling ensures we have space.
580                    let required = WAL_ENTRY_HEADER_SIZE
581                        .saturating_add(payload.len() as u64)
582                        .max(self.header.wal_size + 1);
583                    self.grow_wal_region(required)?;
584                }
585                Err(err) => return Err(err),
586            }
587        }
588    }
589
590    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
591        let mut new_size = self.header.wal_size;
592        let mut target = required_entry_size;
593        if target == 0 {
594            target = self.header.wal_size;
595        }
596        while new_size <= target {
597            new_size = new_size
598                .checked_mul(2)
599                .ok_or_else(|| MemvidError::CheckpointFailed {
600                    reason: "wal_size overflow".into(),
601                })?;
602        }
603        let delta = new_size - self.header.wal_size;
604        if delta == 0 {
605            return Ok(());
606        }
607
608        self.shift_data_for_wal_growth(delta)?;
609        self.header.wal_size = new_size;
610        self.apply_wal_growth_offsets(delta);
611
612        let catalog_end = self.catalog_data_end();
613        self.header.footer_offset = catalog_end
614            .max(self.header.footer_offset)
615            .max(self.data_end);
616
617        self.rewrite_toc_footer()?;
618        self.header.toc_checksum = self.toc.toc_checksum;
619        crate::persist_header(&mut self.file, &self.header)?;
620        self.file.sync_all()?;
621        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
622        Ok(())
623    }
624
625    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
626        if delta == 0 {
627            return Ok(());
628        }
629        let original_len = self.file.metadata()?.len();
630        let data_start = self.header.wal_offset + self.header.wal_size;
631        self.file.set_len(original_len + delta)?;
632
633        let mut remaining = original_len.saturating_sub(data_start);
634        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
635        while remaining > 0 {
636            let chunk = min(remaining, buffer.len() as u64);
637            let src = data_start + remaining - chunk;
638            self.file.seek(SeekFrom::Start(src))?;
639            #[allow(clippy::cast_possible_truncation)]
640            self.file.read_exact(&mut buffer[..chunk as usize])?;
641            let dst = src + delta;
642            self.file.seek(SeekFrom::Start(dst))?;
643            #[allow(clippy::cast_possible_truncation)]
644            self.file.write_all(&buffer[..chunk as usize])?;
645            remaining -= chunk;
646        }
647
648        self.file.seek(SeekFrom::Start(data_start))?;
649        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
650        let mut remaining = delta;
651        while remaining > 0 {
652            let write = min(remaining, zero_buf.len() as u64);
653            #[allow(clippy::cast_possible_truncation)]
654            self.file.write_all(&zero_buf[..write as usize])?;
655            remaining -= write;
656        }
657        Ok(())
658    }
659
660    fn apply_wal_growth_offsets(&mut self, delta: u64) {
661        if delta == 0 {
662            return;
663        }
664
665        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
666        self.data_end = self.data_end.saturating_add(delta);
667        // `cached_payload_end` mirrors `data_end` / footer positioning. After
668        // `shift_data_for_wal_growth` every byte past the old WAL boundary
669        // moved right by `delta`, so the cached payload boundary must also
670        // advance. Forgetting this caused `rebuild_indexes` (which seeks to
671        // `payload_region_end()`) to write embedded indexes back into the
672        // grown portion of the WAL region, corrupting WAL record payloads.
673        // See https://github.com/memvid/memvid/issues/230.
674        self.cached_payload_end = self.cached_payload_end.saturating_add(delta);
675        self.adjust_offsets_after_wal_growth(delta);
676    }
677
678    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
679        if delta == 0 {
680            return;
681        }
682
683        for frame in &mut self.toc.frames {
684            if frame.payload_offset != 0 {
685                frame.payload_offset += delta;
686            }
687        }
688
689        for segment in &mut self.toc.segments {
690            if segment.bytes_offset != 0 {
691                segment.bytes_offset += delta;
692            }
693        }
694
695        if let Some(lex) = self.toc.indexes.lex.as_mut() {
696            if lex.bytes_offset != 0 {
697                lex.bytes_offset += delta;
698            }
699        }
700        for manifest in &mut self.toc.indexes.lex_segments {
701            if manifest.bytes_offset != 0 {
702                manifest.bytes_offset += delta;
703            }
704        }
705        if let Some(vec) = self.toc.indexes.vec.as_mut() {
706            if vec.bytes_offset != 0 {
707                vec.bytes_offset += delta;
708            }
709        }
710        if let Some(time_index) = self.toc.time_index.as_mut() {
711            if time_index.bytes_offset != 0 {
712                time_index.bytes_offset += delta;
713            }
714        }
715        #[cfg(feature = "temporal_track")]
716        if let Some(track) = self.toc.temporal_track.as_mut() {
717            if track.bytes_offset != 0 {
718                track.bytes_offset += delta;
719            }
720        }
721
722        let catalog = &mut self.toc.segment_catalog;
723        for descriptor in &mut catalog.lex_segments {
724            if descriptor.common.bytes_offset != 0 {
725                descriptor.common.bytes_offset += delta;
726            }
727        }
728        for descriptor in &mut catalog.vec_segments {
729            if descriptor.common.bytes_offset != 0 {
730                descriptor.common.bytes_offset += delta;
731            }
732        }
733        for descriptor in &mut catalog.time_segments {
734            if descriptor.common.bytes_offset != 0 {
735                descriptor.common.bytes_offset += delta;
736            }
737        }
738        #[cfg(feature = "temporal_track")]
739        for descriptor in &mut catalog.temporal_segments {
740            if descriptor.common.bytes_offset != 0 {
741                descriptor.common.bytes_offset += delta;
742            }
743        }
744        for descriptor in &mut catalog.tantivy_segments {
745            if descriptor.common.bytes_offset != 0 {
746                descriptor.common.bytes_offset += delta;
747            }
748        }
749
750        #[cfg(feature = "lex")]
751        if let Ok(mut storage) = self.lex_storage.write() {
752            storage.adjust_offsets(delta);
753        }
754    }
755    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
756        self.ensure_writable()?;
757        if options.background {
758            tracing::debug!("commit background flag ignored; running synchronously");
759        }
760        let mode = options.mode;
761        let records = self.wal.pending_records()?;
762        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
763            return Ok(());
764        }
765        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
766    }
767
768    pub fn commit(&mut self) -> Result<()> {
769        self.ensure_writable()?;
770        self.commit_with_options(CommitOptions::new(CommitMode::Full))
771    }
772
773    /// Enter batch mode for high-throughput ingestion.
774    ///
775    /// While batch mode is active:
776    /// - WAL fsync is skipped on every append (controlled by `opts.skip_sync`)
777    /// - Auto-checkpoint is suppressed (controlled by `opts.disable_auto_checkpoint`)
778    /// - Compression level is lowered (controlled by `opts.compression_level`)
779    /// - WAL is pre-sized to avoid expensive mid-batch growth (controlled by `opts.wal_pre_size_bytes`)
780    ///
781    /// **You must call [`end_batch()`](Self::end_batch) when done** to flush the WAL
782    /// and restore normal operation.
783    pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
784        if opts.wal_pre_size_bytes > 0 {
785            self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
786        }
787        self.wal.set_skip_sync(opts.skip_sync);
788        self.batch_opts = Some(opts);
789        Ok(())
790    }
791
792    /// Pre-grow the embedded WAL to at least `min_bytes` in a single operation.
793    ///
794    /// When `disable_auto_checkpoint` is true, all WAL entries accumulate for
795    /// the entire batch.  If the region is too small it must grow repeatedly,
796    /// and every growth shifts **all** payload data — O(file_size) per shift.
797    ///
798    /// Calling this once before the batch eliminates all mid-batch shifts.
799    /// On a freshly-created file the shift is essentially free (no data to move).
800    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
801        if min_bytes <= self.header.wal_size {
802            return Ok(());
803        }
804        // Jump directly to the target size (next power of two for alignment)
805        let target = min_bytes.next_power_of_two();
806        let delta = target.saturating_sub(self.header.wal_size);
807        if delta == 0 {
808            return Ok(());
809        }
810
811        tracing::info!(
812            current_wal = self.header.wal_size,
813            target_wal = target,
814            delta,
815            "pre-sizing WAL for batch mode"
816        );
817
818        self.shift_data_for_wal_growth(delta)?;
819        self.header.wal_size = target;
820        self.apply_wal_growth_offsets(delta);
821
822        let catalog_end = self.catalog_data_end();
823        self.header.footer_offset = catalog_end
824            .max(self.header.footer_offset)
825            .max(self.data_end);
826
827        self.rewrite_toc_footer()?;
828        self.header.toc_checksum = self.toc.toc_checksum;
829        crate::persist_header(&mut self.file, &self.header)?;
830        self.file.sync_all()?;
831        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
832        Ok(())
833    }
834
835    /// Exit batch mode, flushing the WAL and restoring per-entry fsync.
836    ///
837    /// This performs a single `fsync` for all appends accumulated during the batch,
838    /// then clears batch options so subsequent puts use default behaviour.
839    pub fn end_batch(&mut self) -> Result<()> {
840        // Single fsync for the entire batch
841        self.wal.flush()?;
842        self.wal.set_skip_sync(false);
843        self.batch_opts = None;
844        Ok(())
845    }
846
847    /// Commit pending WAL records without rebuilding any indexes.
848    ///
849    /// Optimized for bulk ingestion: payloads and frame metadata are persisted,
850    /// but time index, Tantivy, vec index, and other index structures are NOT
851    /// rebuilt. The caller must do a full `commit()` (which triggers
852    /// `rebuild_indexes()`) after all batches are written.
853    ///
854    /// Skips the staging lock for performance — not crash-safe.
855    pub fn commit_skip_indexes(&mut self) -> Result<()> {
856        self.ensure_writable()?;
857        let records = self.wal.pending_records()?;
858        if records.is_empty() && !self.dirty {
859            return Ok(());
860        }
861        self.commit_skip_indexes_inner(records)
862    }
863
864    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
865        self.generation = self.generation.wrapping_add(1);
866
867        // Temporarily remove Tantivy engine to avoid per-frame indexing work
868        // and disk reads in apply_records(). We won't persist Tantivy state anyway.
869        #[cfg(feature = "lex")]
870        let tantivy_backup = self.tantivy.take();
871
872        let result = self.apply_records(records);
873
874        // Restore Tantivy engine (unchanged — no dirty frames added)
875        #[cfg(feature = "lex")]
876        {
877            self.tantivy = tantivy_backup;
878            self.tantivy_dirty = false;
879        }
880
881        let _delta = result?;
882
883        // Set footer_offset to right after payloads (no index data written)
884        self.header.footer_offset = self.data_end;
885
886        // Clear stale index manifests so next open() doesn't try to load
887        // garbage data. Indexes will be rebuilt in the finalize step.
888        self.toc.time_index = None;
889        self.toc.indexes.lex_segments.clear();
890        // Preserve vec manifest (holds dimension info) but zero out data pointers
891        if let Some(vec) = self.toc.indexes.vec.as_mut() {
892            vec.bytes_offset = 0;
893            vec.bytes_length = 0;
894            vec.vector_count = 0;
895        }
896        self.toc.indexes.lex = None;
897        self.toc.indexes.clip = None;
898        self.toc.segment_catalog.lex_segments.clear();
899        self.toc.segment_catalog.vec_segments.clear();
900        self.toc.segment_catalog.time_segments.clear();
901        self.toc.segment_catalog.tantivy_segments.clear();
902        #[cfg(feature = "temporal_track")]
903        {
904            self.toc.temporal_track = None;
905            self.toc.segment_catalog.temporal_segments.clear();
906        }
907        self.toc.memories_track = None;
908        self.toc.logic_mesh = None;
909        self.toc.sketch_track = None;
910
911        self.rewrite_toc_footer()?;
912        self.header.toc_checksum = self.toc.toc_checksum;
913        self.wal.record_checkpoint(&mut self.header)?;
914        self.header.toc_checksum = self.toc.toc_checksum;
915        crate::persist_header(&mut self.file, &self.header)?;
916        self.file.sync_all()?;
917        self.pending_frame_inserts = 0;
918        self.dirty = false;
919        Ok(())
920    }
921
922    /// Rebuild all indexes (time, Tantivy, vec) and persist the TOC.
923    ///
924    /// Use after bulk ingestion with `commit_skip_indexes()` to build
925    /// all search indexes in one O(n) pass. This is the complement to
926    /// `commit_skip_indexes()` — call it once after all batches are done.
927    pub fn finalize_indexes(&mut self) -> Result<()> {
928        self.ensure_writable()?;
929        self.rebuild_indexes(&[], &[])?;
930        self.rewrite_toc_footer()?;
931        self.header.toc_checksum = self.toc.toc_checksum;
932        crate::persist_header(&mut self.file, &self.header)?;
933        self.file.sync_all()?;
934        Ok(())
935    }
936
937    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
938        self.generation = self.generation.wrapping_add(1);
939
940        let delta = self.apply_records(records)?;
941        let mut indexes_rebuilt = false;
942
943        // Check if CLIP index has pending embeddings that need to be persisted
944        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());
945
946        if !delta.is_empty() || clip_needs_persist {
947            tracing::debug!(
948                inserted_frames = delta.inserted_frames.len(),
949                inserted_embeddings = delta.inserted_embeddings.len(),
950                inserted_time_entries = delta.inserted_time_entries.len(),
951                clip_needs_persist = clip_needs_persist,
952                "commit applied delta"
953            );
954            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
955            indexes_rebuilt = true;
956        }
957
958        if !indexes_rebuilt && self.tantivy_index_pending() {
959            self.flush_tantivy()?;
960        }
961
962        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
963        if !indexes_rebuilt && self.clip_enabled {
964            if let Some(ref clip_index) = self.clip_index {
965                if !clip_index.is_empty() {
966                    self.persist_clip_index()?;
967                }
968            }
969        }
970
971        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
972        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
973            self.persist_memories_track()?;
974        }
975
976        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
977        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
978            self.persist_logic_mesh()?;
979        }
980
981        // Persist sketch track if it has entries
982        if !self.sketch_track.is_empty() {
983            self.persist_sketch_track()?;
984        }
985
986        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
987        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.
988
989        self.rewrite_toc_footer()?;
990        self.header.toc_checksum = self.toc.toc_checksum;
991        self.wal.record_checkpoint(&mut self.header)?;
992        self.header.toc_checksum = self.toc.toc_checksum;
993        crate::persist_header(&mut self.file, &self.header)?;
994        self.file.sync_all()?;
995        #[cfg(feature = "parallel_segments")]
996        if let Some(wal) = self.manifest_wal.as_mut() {
997            wal.flush()?;
998            wal.truncate()?;
999        }
1000        self.pending_frame_inserts = 0;
1001        self.dirty = false;
1002        Ok(())
1003    }
1004
1005    #[cfg(feature = "parallel_segments")]
1006    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
1007        self.ensure_writable()?;
1008        if !self.dirty && !self.tantivy_index_pending() {
1009            return Ok(());
1010        }
1011        let opts = opts.clone();
1012        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
1013    }
1014
1015    #[cfg(feature = "parallel_segments")]
1016    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
1017        if !self.dirty && !self.tantivy_index_pending() {
1018            return Ok(());
1019        }
1020        let records = self.wal.pending_records()?;
1021        let delta = self.apply_records(records)?;
1022        self.generation = self.generation.wrapping_add(1);
1023        let mut indexes_rebuilt = false;
1024        if !delta.is_empty() {
1025            tracing::info!(
1026                inserted_frames = delta.inserted_frames.len(),
1027                inserted_embeddings = delta.inserted_embeddings.len(),
1028                inserted_time_entries = delta.inserted_time_entries.len(),
1029                "parallel commit applied delta"
1030            );
1031            // Try to use parallel segment builder first
1032            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
1033            tracing::info!(
1034                "parallel_commit: used_parallel={}, lex_enabled={}",
1035                used_parallel,
1036                self.lex_enabled
1037            );
1038            if used_parallel {
1039                // Segments were written at data_end; update footer_offset so
1040                // rewrite_toc_footer places the TOC after the new segment data
1041                self.header.footer_offset = self.data_end;
1042                indexes_rebuilt = true;
1043
1044                // OPTIMIZATION: Use incremental Tantivy indexing instead of full rebuild.
1045                // Only add new frames from the delta, not all frames.
1046                #[cfg(feature = "lex")]
1047                if self.lex_enabled {
1048                    tracing::info!(
1049                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
1050                        delta.inserted_frames.len(),
1051                        self.toc.frames.len()
1052                    );
1053
1054                    // If Tantivy engine already exists, frames were already indexed during put()
1055                    // (at the add_frame call in put_bytes_with_options). Skip re-indexing to avoid
1056                    // duplicates. Only initialize and index if Tantivy wasn't present during put.
1057                    let tantivy_was_present = self.tantivy.is_some();
1058                    if self.tantivy.is_none() {
1059                        self.init_tantivy()?;
1060                    }
1061
1062                    // Skip indexing if Tantivy was already present - frames were indexed during put
1063                    if tantivy_was_present {
1064                        tracing::info!(
1065                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
1066                        );
1067                    } else {
1068                        // First, collect all frames and their search text (to avoid borrow conflicts)
1069                        let max_payload = crate::memvid::search::max_index_payload();
1070                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
1071
1072                        for frame_id in &delta.inserted_frames {
1073                            // Look up the actual Frame from the TOC
1074                            let frame = match self.toc.frames.get(*frame_id as usize) {
1075                                Some(f) => f.clone(),
1076                                None => continue,
1077                            };
1078
1079                            // Check if frame has explicit search_text first - clone it for ownership
1080                            let explicit_text = frame.search_text.clone();
1081                            if let Some(ref search_text) = explicit_text {
1082                                if !search_text.trim().is_empty() {
1083                                    prepared_docs.push((frame, search_text.clone()));
1084                                    continue;
1085                                }
1086                            }
1087
1088                            // Get MIME type and check if text-indexable
1089                            let mime = frame
1090                                .metadata
1091                                .as_ref()
1092                                .and_then(|m| m.mime.as_deref())
1093                                .unwrap_or("application/octet-stream");
1094
1095                            if !crate::memvid::search::is_text_indexable_mime(mime) {
1096                                continue;
1097                            }
1098
1099                            if frame.payload_length > max_payload {
1100                                continue;
1101                            }
1102
1103                            let text = self.frame_search_text(&frame)?;
1104                            if !text.trim().is_empty() {
1105                                prepared_docs.push((frame, text));
1106                            }
1107                        }
1108
1109                        // Now add to Tantivy engine (no borrow conflict)
1110                        if let Some(ref mut engine) = self.tantivy {
1111                            for (frame, text) in &prepared_docs {
1112                                engine.add_frame(frame, text)?;
1113                            }
1114
1115                            if !prepared_docs.is_empty() {
1116                                engine.commit()?;
1117                                self.tantivy_dirty = true;
1118                            }
1119
1120                            tracing::info!(
1121                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
1122                                prepared_docs.len(),
1123                                engine.num_docs()
1124                            );
1125                        } else {
1126                            tracing::warn!(
1127                                "parallel_commit: Tantivy engine is None after init_tantivy"
1128                            );
1129                        }
1130                    } // end of else !tantivy_was_present
1131                }
1132
1133                // Time index stores all entries together for timeline queries.
1134                // Unlike Tantivy which is incremental, time index needs full rebuild.
1135                self.file.seek(SeekFrom::Start(self.data_end))?;
1136                let mut time_entries: Vec<TimeIndexEntry> = self
1137                    .toc
1138                    .frames
1139                    .iter()
1140                    .filter(|frame| {
1141                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
1142                    })
1143                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
1144                    .collect();
1145                let (ti_offset, ti_length, ti_checksum) =
1146                    time_index_append(&mut self.file, &mut time_entries)?;
1147                self.toc.time_index = Some(TimeIndexManifest {
1148                    bytes_offset: ti_offset,
1149                    bytes_length: ti_length,
1150                    entry_count: time_entries.len() as u64,
1151                    checksum: ti_checksum,
1152                });
1153                // Update data_end to account for the newly written time index
1154                self.data_end = ti_offset + ti_length;
1155                self.header.footer_offset = self.data_end;
1156                tracing::info!(
1157                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
1158                    ti_offset,
1159                    ti_length,
1160                    time_entries.len()
1161                );
1162            } else {
1163                // Fall back to sequential rebuild if no segments were generated
1164                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1165                indexes_rebuilt = true;
1166            }
1167        }
1168
1169        // Flush Tantivy index if dirty (from parallel path or pending updates)
1170        #[cfg(feature = "lex")]
1171        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
1172            self.flush_tantivy()?;
1173        }
1174
1175        // Persist CLIP index if it has embeddings
1176        if self.clip_enabled {
1177            if let Some(ref clip_index) = self.clip_index {
1178                if !clip_index.is_empty() {
1179                    self.persist_clip_index()?;
1180                }
1181            }
1182        }
1183
1184        // Persist memories track if it has cards
1185        if self.memories_track.card_count() > 0 {
1186            self.persist_memories_track()?;
1187        }
1188
1189        // Persist logic mesh if it has nodes
1190        if !self.logic_mesh.is_empty() {
1191            self.persist_logic_mesh()?;
1192        }
1193
1194        // Persist sketch track if it has entries
1195        if !self.sketch_track.is_empty() {
1196            self.persist_sketch_track()?;
1197        }
1198
1199        // flush_tantivy() has already set footer_offset correctly
1200        // DO NOT overwrite with catalog_data_end()
1201        self.rewrite_toc_footer()?;
1202        self.header.toc_checksum = self.toc.toc_checksum;
1203        self.wal.record_checkpoint(&mut self.header)?;
1204        self.header.toc_checksum = self.toc.toc_checksum;
1205        crate::persist_header(&mut self.file, &self.header)?;
1206        self.file.sync_all()?;
1207        if let Some(wal) = self.manifest_wal.as_mut() {
1208            wal.flush()?;
1209            wal.truncate()?;
1210        }
1211        self.pending_frame_inserts = 0;
1212        self.dirty = false;
1213        Ok(())
1214    }
1215
1216    pub(crate) fn recover_wal(&mut self) -> Result<()> {
1217        let records = self.wal.records_after(self.header.wal_sequence)?;
1218        if records.is_empty() {
1219            if self.tantivy_index_pending() {
1220                self.flush_tantivy()?;
1221            }
1222            return Ok(());
1223        }
1224        let delta = self.apply_records(records)?;
1225        if !delta.is_empty() {
1226            tracing::debug!(
1227                inserted_frames = delta.inserted_frames.len(),
1228                inserted_embeddings = delta.inserted_embeddings.len(),
1229                inserted_time_entries = delta.inserted_time_entries.len(),
1230                "recover applied delta"
1231            );
1232            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1233        } else if self.tantivy_index_pending() {
1234            self.flush_tantivy()?;
1235        }
1236        self.wal.record_checkpoint(&mut self.header)?;
1237        crate::persist_header(&mut self.file, &self.header)?;
1238        if !delta.is_empty() {
1239            // rebuild_indexes already flushed Tantivy, so nothing further to do.
1240        } else if self.tantivy_index_pending() {
1241            self.flush_tantivy()?;
1242            crate::persist_header(&mut self.file, &self.header)?;
1243        }
1244        self.file.sync_all()?;
1245        self.pending_frame_inserts = 0;
1246        self.dirty = false;
1247        Ok(())
1248    }
1249
1250    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1251        let mut delta = IngestionDelta::default();
1252        if records.is_empty() {
1253            return Ok(delta);
1254        }
1255
1256        // Use data_end instead of payload_region_end to avoid overwriting
1257        // vec/lex/time segments that were written after the payload region.
1258        // payload_region_end() only considers frame payloads, but data_end tracks
1259        // all data including index segments.
1260        let mut data_cursor = self.data_end;
1261        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1262
1263        if !records.is_empty() {
1264            self.file.seek(SeekFrom::Start(data_cursor))?;
1265            for record in records {
1266                let mut entry = match decode_wal_entry(&record.payload)? {
1267                    WalEntry::Frame(entry) => entry,
1268                    #[cfg(feature = "lex")]
1269                    WalEntry::Lex(batch) => {
1270                        self.apply_lex_wal(batch)?;
1271                        continue;
1272                    }
1273                };
1274
1275                match entry.op {
1276                    FrameWalOp::Insert => {
1277                        let frame_id = self.toc.frames.len() as u64;
1278
1279                        let (
1280                            payload_offset,
1281                            payload_length,
1282                            checksum_bytes,
1283                            canonical_length_value,
1284                        ) = if let Some(source_id) = entry.reuse_payload_from {
1285                            if !entry.payload.is_empty() {
1286                                return Err(MemvidError::InvalidFrame {
1287                                    frame_id: source_id,
1288                                    reason: "reused payload entry contained inline bytes",
1289                                });
1290                            }
1291                            let source_idx = usize::try_from(source_id).map_err(|_| {
1292                                MemvidError::InvalidFrame {
1293                                    frame_id: source_id,
1294                                    reason: "frame id too large for memory",
1295                                }
1296                            })?;
1297                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
1298                                MemvidError::InvalidFrame {
1299                                    frame_id: source_id,
1300                                    reason: "reused payload source missing",
1301                                },
1302                            )?;
1303                            (
1304                                source.payload_offset,
1305                                source.payload_length,
1306                                source.checksum,
1307                                entry
1308                                    .canonical_length
1309                                    .or(source.canonical_length)
1310                                    .unwrap_or(source.payload_length),
1311                            )
1312                        } else {
1313                            self.file.seek(SeekFrom::Start(data_cursor))?;
1314                            self.file.write_all(&entry.payload)?;
1315                            let checksum = hash(&entry.payload);
1316                            let payload_length = entry.payload.len() as u64;
1317                            let canonical_length =
1318                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
1319                                    if let Some(len) = entry.canonical_length {
1320                                        len
1321                                    } else {
1322                                        let decoded = crate::decode_canonical_bytes(
1323                                            &entry.payload,
1324                                            CanonicalEncoding::Zstd,
1325                                            frame_id,
1326                                        )?;
1327                                        decoded.len() as u64
1328                                    }
1329                                } else {
1330                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1331                                };
1332                            let payload_offset = data_cursor;
1333                            data_cursor += payload_length;
1334                            // Keep cached_payload_end in sync (monotonically increasing)
1335                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
1336                            (
1337                                payload_offset,
1338                                payload_length,
1339                                *checksum.as_bytes(),
1340                                canonical_length,
1341                            )
1342                        };
1343
1344                        let uri = entry
1345                            .uri
1346                            .clone()
1347                            .unwrap_or_else(|| crate::default_uri(frame_id));
1348                        let title = entry
1349                            .title
1350                            .clone()
1351                            .or_else(|| crate::infer_title_from_uri(&uri));
1352
1353                        #[cfg(feature = "temporal_track")]
1354                        let (anchor_ts, anchor_source) =
1355                            self.determine_temporal_anchor(entry.timestamp);
1356
1357                        let mut frame = Frame {
1358                            id: frame_id,
1359                            timestamp: entry.timestamp,
1360                            anchor_ts: {
1361                                #[cfg(feature = "temporal_track")]
1362                                {
1363                                    Some(anchor_ts)
1364                                }
1365                                #[cfg(not(feature = "temporal_track"))]
1366                                {
1367                                    None
1368                                }
1369                            },
1370                            anchor_source: {
1371                                #[cfg(feature = "temporal_track")]
1372                                {
1373                                    Some(anchor_source)
1374                                }
1375                                #[cfg(not(feature = "temporal_track"))]
1376                                {
1377                                    None
1378                                }
1379                            },
1380                            kind: entry.kind.clone(),
1381                            track: entry.track.clone(),
1382                            payload_offset,
1383                            payload_length,
1384                            checksum: checksum_bytes,
1385                            uri: Some(uri),
1386                            title,
1387                            canonical_encoding: entry.canonical_encoding,
1388                            canonical_length: Some(canonical_length_value),
1389                            metadata: entry.metadata.clone(),
1390                            search_text: entry.search_text.clone(),
1391                            tags: entry.tags.clone(),
1392                            labels: entry.labels.clone(),
1393                            extra_metadata: entry.extra_metadata.clone(),
1394                            content_dates: entry.content_dates.clone(),
1395                            chunk_manifest: entry.chunk_manifest.clone(),
1396                            role: entry.role,
1397                            parent_id: None,
1398                            chunk_index: entry.chunk_index,
1399                            chunk_count: entry.chunk_count,
1400                            status: FrameStatus::Active,
1401                            supersedes: entry.supersedes_frame_id,
1402                            superseded_by: None,
1403                            source_sha256: entry.source_sha256,
1404                            source_path: entry.source_path.clone(),
1405                            enrichment_state: entry.enrichment_state,
1406                        };
1407
1408                        if let Some(parent_seq) = entry.parent_sequence {
1409                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1410                                frame.parent_id = Some(*parent_frame_id);
1411                            } else {
1412                                // Parent sequence not found in current batch - this can happen
1413                                // if parent was committed in a previous batch. Try to find parent
1414                                // by looking at recently inserted frames with matching characteristics.
1415                                // The parent should be the most recent Document frame that has
1416                                // a chunk_manifest matching this chunk's expected parent.
1417                                if entry.role == FrameRole::DocumentChunk {
1418                                    // Look backwards through recently inserted frames
1419                                    for &candidate_id in delta.inserted_frames.iter().rev() {
1420                                        if let Ok(idx) = usize::try_from(candidate_id) {
1421                                            if let Some(candidate) = self.toc.frames.get(idx) {
1422                                                if candidate.role == FrameRole::Document
1423                                                    && candidate.chunk_manifest.is_some()
1424                                                {
1425                                                    // Found a parent document - use it
1426                                                    frame.parent_id = Some(candidate_id);
1427                                                    tracing::debug!(
1428                                                        chunk_frame_id = frame_id,
1429                                                        parent_frame_id = candidate_id,
1430                                                        parent_seq = parent_seq,
1431                                                        "resolved chunk parent via fallback"
1432                                                    );
1433                                                    break;
1434                                                }
1435                                            }
1436                                        }
1437                                    }
1438                                }
1439                                if frame.parent_id.is_none() {
1440                                    tracing::warn!(
1441                                        chunk_frame_id = frame_id,
1442                                        parent_seq = parent_seq,
1443                                        "chunk has parent_sequence but parent not found in batch"
1444                                    );
1445                                }
1446                            }
1447                        }
1448
1449                        #[cfg(feature = "lex")]
1450                        let index_text = if self.tantivy.is_some() {
1451                            if let Some(text) = entry.search_text.clone() {
1452                                if text.trim().is_empty() {
1453                                    None
1454                                } else {
1455                                    Some(text)
1456                                }
1457                            } else {
1458                                Some(self.frame_content(&frame)?)
1459                            }
1460                        } else {
1461                            None
1462                        };
1463                        #[cfg(feature = "lex")]
1464                        if let (Some(engine), Some(text)) =
1465                            (self.tantivy.as_mut(), index_text.as_ref())
1466                        {
1467                            engine.add_frame(&frame, text)?;
1468                            self.tantivy_dirty = true;
1469
1470                            // Generate sketch for fast candidate pre-filtering
1471                            // Uses the same text as tantivy indexing for consistency
1472                            if !text.trim().is_empty() {
1473                                let entry = crate::types::generate_sketch(
1474                                    frame_id,
1475                                    text,
1476                                    crate::types::SketchVariant::Small,
1477                                    None,
1478                                );
1479                                self.sketch_track.insert(entry);
1480                            }
1481                        }
1482
1483                        if let Some(embedding) = entry.embedding.take() {
1484                            delta
1485                                .inserted_embeddings
1486                                .push((frame_id, embedding.clone()));
1487                        }
1488
1489                        if entry.role == FrameRole::Document {
1490                            delta
1491                                .inserted_time_entries
1492                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1493                            #[cfg(feature = "temporal_track")]
1494                            {
1495                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1496                                    frame_id,
1497                                    anchor_ts,
1498                                    anchor_source,
1499                                ));
1500                                delta.inserted_temporal_mentions.extend(
1501                                    Self::collect_temporal_mentions(
1502                                        entry.search_text.as_deref(),
1503                                        frame_id,
1504                                        anchor_ts,
1505                                    ),
1506                                );
1507                            }
1508                        }
1509
1510                        if let Some(predecessor) = frame.supersedes {
1511                            self.mark_frame_superseded(predecessor, frame_id)?;
1512                        }
1513
1514                        self.toc.frames.push(frame);
1515                        delta.inserted_frames.push(frame_id);
1516                        sequence_to_frame.insert(record.sequence, frame_id);
1517                    }
1518                    FrameWalOp::Tombstone => {
1519                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1520                            frame_id: 0,
1521                            reason: "tombstone missing frame reference",
1522                        })?;
1523                        self.mark_frame_deleted(target)?;
1524                        delta.mutated_frames = true;
1525                    }
1526                }
1527            }
1528            self.data_end = self.data_end.max(data_cursor);
1529        }
1530
1531        // Second pass: resolve any orphan DocumentChunk frames that are missing parent_id.
1532        // This handles edge cases where chunks couldn't be linked during the first pass.
1533        // First, collect orphan chunks and their resolved parents to avoid borrow conflicts.
1534        let orphan_resolutions: Vec<(u64, u64)> = delta
1535            .inserted_frames
1536            .iter()
1537            .filter_map(|&frame_id| {
1538                let idx = usize::try_from(frame_id).ok()?;
1539                let frame = self.toc.frames.get(idx)?;
1540                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1541                    return None;
1542                }
1543                // Find the most recent Document frame before this chunk that has a manifest
1544                for candidate_id in (0..frame_id).rev() {
1545                    if let Ok(idx) = usize::try_from(candidate_id) {
1546                        if let Some(candidate) = self.toc.frames.get(idx) {
1547                            if candidate.role == FrameRole::Document
1548                                && candidate.chunk_manifest.is_some()
1549                                && candidate.status == FrameStatus::Active
1550                            {
1551                                return Some((frame_id, candidate_id));
1552                            }
1553                        }
1554                    }
1555                }
1556                None
1557            })
1558            .collect();
1559
1560        // Now apply the resolutions
1561        for (chunk_id, parent_id) in orphan_resolutions {
1562            if let Ok(idx) = usize::try_from(chunk_id) {
1563                if let Some(frame) = self.toc.frames.get_mut(idx) {
1564                    frame.parent_id = Some(parent_id);
1565                    tracing::debug!(
1566                        chunk_frame_id = chunk_id,
1567                        parent_frame_id = parent_id,
1568                        "resolved orphan chunk parent in second pass"
1569                    );
1570                }
1571            }
1572        }
1573
1574        // Index rebuild now happens once per commit (Option A) instead of incremental append.
1575        // See commit_from_records() for where rebuild_indexes() is invoked.
1576        Ok(delta)
1577    }
1578
1579    #[cfg(feature = "temporal_track")]
1580    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1581        (timestamp, AnchorSource::FrameTimestamp)
1582    }
1583
1584    #[cfg(feature = "temporal_track")]
1585    fn collect_temporal_mentions(
1586        text: Option<&str>,
1587        frame_id: FrameId,
1588        anchor_ts: i64,
1589    ) -> Vec<TemporalMention> {
1590        let text = match text {
1591            Some(value) if !value.trim().is_empty() => value,
1592            _ => return Vec::new(),
1593        };
1594
1595        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
1596            Ok(ts) => ts,
1597            Err(_) => return Vec::new(),
1598        };
1599
1600        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
1601        let normalizer = TemporalNormalizer::new(context);
1602        let mut spans: Vec<(usize, usize)> = Vec::new();
1603        let lower = text.to_ascii_lowercase();
1604
1605        for phrase in STATIC_TEMPORAL_PHRASES {
1606            let mut search_start = 0usize;
1607            while let Some(idx) = lower[search_start..].find(phrase) {
1608                let abs = search_start + idx;
1609                let end = abs + phrase.len();
1610                spans.push((abs, end));
1611                search_start = end;
1612            }
1613        }
1614
1615        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
1616        let regex = NUMERIC_DATE.get_or_init(|| {
1617            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
1618        });
1619        let regex = match regex {
1620            Ok(re) => re,
1621            Err(msg) => {
1622                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
1623                return Vec::new();
1624            }
1625        };
1626        for mat in regex.find_iter(text) {
1627            spans.push((mat.start(), mat.end()));
1628        }
1629
1630        spans.sort_unstable();
1631        spans.dedup();
1632
1633        let mut mentions: Vec<TemporalMention> = Vec::new();
1634        for (start, end) in spans {
1635            if end > text.len() || start >= end {
1636                continue;
1637            }
1638            let raw = &text[start..end];
1639            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
1640            if trimmed.is_empty() {
1641                continue;
1642            }
1643            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
1644            let finish = offset + trimmed.len();
1645            match normalizer.resolve(trimmed) {
1646                Ok(resolution) => {
1647                    mentions.extend(Self::resolution_to_mentions(
1648                        resolution, frame_id, offset, finish,
1649                    ));
1650                }
1651                Err(_) => continue,
1652            }
1653        }
1654
1655        mentions
1656    }
1657
1658    #[cfg(feature = "temporal_track")]
1659    fn resolution_to_mentions(
1660        resolution: TemporalResolution,
1661        frame_id: FrameId,
1662        byte_start: usize,
1663        byte_end: usize,
1664    ) -> Vec<TemporalMention> {
1665        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1666        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1667        let mut results = Vec::new();
1668
1669        let base_flags = Self::flags_from_resolution(&resolution.flags);
1670        match resolution.value {
1671            TemporalResolutionValue::Date(date) => {
1672                let ts = Self::date_to_timestamp(date);
1673                results.push(TemporalMention::new(
1674                    ts,
1675                    frame_id,
1676                    byte_start,
1677                    byte_len,
1678                    TemporalMentionKind::Date,
1679                    resolution.confidence,
1680                    0,
1681                    base_flags,
1682                ));
1683            }
1684            TemporalResolutionValue::DateTime(dt) => {
1685                let ts = dt.unix_timestamp();
1686                let tz_hint = dt.offset().whole_minutes() as i16;
1687                results.push(TemporalMention::new(
1688                    ts,
1689                    frame_id,
1690                    byte_start,
1691                    byte_len,
1692                    TemporalMentionKind::DateTime,
1693                    resolution.confidence,
1694                    tz_hint,
1695                    base_flags,
1696                ));
1697            }
1698            TemporalResolutionValue::DateRange { start, end } => {
1699                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1700                let start_ts = Self::date_to_timestamp(start);
1701                results.push(TemporalMention::new(
1702                    start_ts,
1703                    frame_id,
1704                    byte_start,
1705                    byte_len,
1706                    TemporalMentionKind::RangeStart,
1707                    resolution.confidence,
1708                    0,
1709                    flags,
1710                ));
1711                let end_ts = Self::date_to_timestamp(end);
1712                results.push(TemporalMention::new(
1713                    end_ts,
1714                    frame_id,
1715                    byte_start,
1716                    byte_len,
1717                    TemporalMentionKind::RangeEnd,
1718                    resolution.confidence,
1719                    0,
1720                    flags,
1721                ));
1722            }
1723            TemporalResolutionValue::DateTimeRange { start, end } => {
1724                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1725                results.push(TemporalMention::new(
1726                    start.unix_timestamp(),
1727                    frame_id,
1728                    byte_start,
1729                    byte_len,
1730                    TemporalMentionKind::RangeStart,
1731                    resolution.confidence,
1732                    start.offset().whole_minutes() as i16,
1733                    flags,
1734                ));
1735                results.push(TemporalMention::new(
1736                    end.unix_timestamp(),
1737                    frame_id,
1738                    byte_start,
1739                    byte_len,
1740                    TemporalMentionKind::RangeEnd,
1741                    resolution.confidence,
1742                    end.offset().whole_minutes() as i16,
1743                    flags,
1744                ));
1745            }
1746            TemporalResolutionValue::Month { year, month } => {
1747                let start_date = match Date::from_calendar_date(year, month, 1) {
1748                    Ok(date) => date,
1749                    Err(err) => {
1750                        tracing::warn!(
1751                            target = "memvid::temporal",
1752                            %err,
1753                            year,
1754                            month = month as u8,
1755                            "skipping invalid month resolution"
1756                        );
1757                        // Skip invalid range for this mention only.
1758                        return results;
1759                    }
1760                };
1761                let end_date = match Self::last_day_in_month(year, month) {
1762                    Some(date) => date,
1763                    None => {
1764                        tracing::warn!(
1765                            target = "memvid::temporal",
1766                            year,
1767                            month = month as u8,
1768                            "skipping month resolution with invalid calendar range"
1769                        );
1770                        return results;
1771                    }
1772                };
1773                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1774                results.push(TemporalMention::new(
1775                    Self::date_to_timestamp(start_date),
1776                    frame_id,
1777                    byte_start,
1778                    byte_len,
1779                    TemporalMentionKind::RangeStart,
1780                    resolution.confidence,
1781                    0,
1782                    flags,
1783                ));
1784                results.push(TemporalMention::new(
1785                    Self::date_to_timestamp(end_date),
1786                    frame_id,
1787                    byte_start,
1788                    byte_len,
1789                    TemporalMentionKind::RangeEnd,
1790                    resolution.confidence,
1791                    0,
1792                    flags,
1793                ));
1794            }
1795        }
1796
1797        results
1798    }
1799
1800    #[cfg(feature = "temporal_track")]
1801    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1802        let mut result = TemporalMentionFlags::empty();
1803        if flags
1804            .iter()
1805            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1806        {
1807            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1808        }
1809        if flags
1810            .iter()
1811            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1812        {
1813            result = result.set(TemporalMentionFlags::DERIVED, true);
1814        }
1815        result
1816    }
1817
1818    #[cfg(feature = "temporal_track")]
1819    fn date_to_timestamp(date: Date) -> i64 {
1820        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1821            .assume_offset(UtcOffset::UTC)
1822            .unix_timestamp()
1823    }
1824
1825    #[cfg(feature = "temporal_track")]
1826    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1827        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1828        while let Some(next) = date.next_day() {
1829            if next.month() == month {
1830                date = next;
1831            } else {
1832                break;
1833            }
1834        }
1835        Some(date)
1836    }
1837
1838    #[allow(dead_code)]
1839    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1840        if delta.inserted_frames.is_empty() || !self.lex_enabled {
1841            return Ok(false);
1842        }
1843
1844        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1845            Some(artifact) => artifact,
1846            None => return Ok(false),
1847        };
1848
1849        let segment_id = self.toc.segment_catalog.next_segment_id;
1850        #[cfg(feature = "parallel_segments")]
1851        let span =
1852            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1853
1854        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1855        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1856        #[cfg(feature = "parallel_segments")]
1857        if let Some(span) = span {
1858            Self::decorate_segment_common(&mut descriptor.common, span);
1859        }
1860        #[cfg(feature = "parallel_segments")]
1861        let descriptor_for_manifest = descriptor.clone();
1862        self.toc.segment_catalog.lex_segments.push(descriptor);
1863        #[cfg(feature = "parallel_segments")]
1864        if let Err(err) = self.record_index_segment(
1865            SegmentKind::Lexical,
1866            descriptor_for_manifest.common,
1867            SegmentStats {
1868                doc_count: artifact.doc_count,
1869                vector_count: 0,
1870                time_entries: 0,
1871                bytes_uncompressed: artifact.bytes.len() as u64,
1872                build_micros: 0,
1873            },
1874        ) {
1875            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1876        }
1877        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1878        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1879        Ok(true)
1880    }
1881
1882    #[allow(dead_code)]
1883    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1884        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1885            return Ok(false);
1886        }
1887
1888        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1889            Some(artifact) => artifact,
1890            None => return Ok(false),
1891        };
1892
1893        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1894            if existing_dim != artifact.dimension {
1895                return Err(MemvidError::VecDimensionMismatch {
1896                    expected: existing_dim,
1897                    actual: artifact.dimension as usize,
1898                });
1899            }
1900        }
1901
1902        let segment_id = self.toc.segment_catalog.next_segment_id;
1903        #[cfg(feature = "parallel_segments")]
1904        #[cfg(feature = "parallel_segments")]
1905        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1906
1907        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1908        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1909        #[cfg(feature = "parallel_segments")]
1910        if let Some(span) = span {
1911            Self::decorate_segment_common(&mut descriptor.common, span);
1912        }
1913        #[cfg(feature = "parallel_segments")]
1914        let descriptor_for_manifest = descriptor.clone();
1915        self.toc.segment_catalog.vec_segments.push(descriptor);
1916        #[cfg(feature = "parallel_segments")]
1917        if let Err(err) = self.record_index_segment(
1918            SegmentKind::Vector,
1919            descriptor_for_manifest.common,
1920            SegmentStats {
1921                doc_count: 0,
1922                vector_count: artifact.vector_count,
1923                time_entries: 0,
1924                bytes_uncompressed: artifact.bytes_uncompressed,
1925                build_micros: 0,
1926            },
1927        ) {
1928            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1929        }
1930        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1931        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1932
1933        // Keep the global vec manifest in sync for auto-detection and stats.
1934        if self.toc.indexes.vec.is_none() {
1935            let empty_offset = self.data_end;
1936            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1937                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1938            self.toc.indexes.vec = Some(VecIndexManifest {
1939                vector_count: 0,
1940                dimension: 0,
1941                bytes_offset: empty_offset,
1942                bytes_length: 0,
1943                checksum: empty_checksum,
1944                compression_mode: self.vec_compression.clone(),
1945                model: self.vec_model.clone(),
1946            });
1947        }
1948        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1949            if manifest.dimension == 0 {
1950                manifest.dimension = artifact.dimension;
1951            }
1952            if manifest.bytes_length == 0 {
1953                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1954                manifest.compression_mode = artifact.compression.clone();
1955            }
1956        }
1957
1958        self.vec_enabled = true;
1959        Ok(true)
1960    }
1961
1962    #[allow(dead_code)]
1963    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1964        if delta.inserted_time_entries.is_empty() {
1965            return Ok(false);
1966        }
1967
1968        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1969            Some(artifact) => artifact,
1970            None => return Ok(false),
1971        };
1972
1973        let segment_id = self.toc.segment_catalog.next_segment_id;
1974        #[cfg(feature = "parallel_segments")]
1975        #[cfg(feature = "parallel_segments")]
1976        let span = self.segment_span_from_iter(
1977            delta
1978                .inserted_time_entries
1979                .iter()
1980                .map(|entry| entry.frame_id),
1981        );
1982
1983        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1984        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1985        #[cfg(feature = "parallel_segments")]
1986        if let Some(span) = span {
1987            Self::decorate_segment_common(&mut descriptor.common, span);
1988        }
1989        #[cfg(feature = "parallel_segments")]
1990        let descriptor_for_manifest = descriptor.clone();
1991        self.toc.segment_catalog.time_segments.push(descriptor);
1992        #[cfg(feature = "parallel_segments")]
1993        if let Err(err) = self.record_index_segment(
1994            SegmentKind::Time,
1995            descriptor_for_manifest.common,
1996            SegmentStats {
1997                doc_count: 0,
1998                vector_count: 0,
1999                time_entries: artifact.entry_count,
2000                bytes_uncompressed: artifact.bytes.len() as u64,
2001                build_micros: 0,
2002            },
2003        ) {
2004            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
2005        }
2006        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2007        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2008        Ok(true)
2009    }
2010
2011    #[cfg(feature = "temporal_track")]
2012    #[allow(dead_code)]
2013    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
2014        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
2015        {
2016            return Ok(false);
2017        }
2018
2019        debug_assert!(
2020            delta.inserted_temporal_mentions.len() < 1_000_000,
2021            "temporal delta mentions unexpectedly large: {}",
2022            delta.inserted_temporal_mentions.len()
2023        );
2024        debug_assert!(
2025            delta.inserted_temporal_anchors.len() < 1_000_000,
2026            "temporal delta anchors unexpectedly large: {}",
2027            delta.inserted_temporal_anchors.len()
2028        );
2029
2030        let artifact = match self.build_temporal_segment_from_records(
2031            &delta.inserted_temporal_mentions,
2032            &delta.inserted_temporal_anchors,
2033        )? {
2034            Some(artifact) => artifact,
2035            None => return Ok(false),
2036        };
2037
2038        let segment_id = self.toc.segment_catalog.next_segment_id;
2039        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
2040        self.toc
2041            .segment_catalog
2042            .temporal_segments
2043            .push(descriptor.clone());
2044        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2045        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2046
2047        self.toc.temporal_track = Some(TemporalTrackManifest {
2048            bytes_offset: descriptor.common.bytes_offset,
2049            bytes_length: descriptor.common.bytes_length,
2050            entry_count: artifact.entry_count,
2051            anchor_count: artifact.anchor_count,
2052            checksum: artifact.checksum,
2053            flags: artifact.flags,
2054        });
2055
2056        self.clear_temporal_track_cache();
2057
2058        Ok(true)
2059    }
2060
2061    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2062        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2063            frame_id,
2064            reason: "frame id too large",
2065        })?;
2066        let frame = self
2067            .toc
2068            .frames
2069            .get_mut(index)
2070            .ok_or(MemvidError::InvalidFrame {
2071                frame_id,
2072                reason: "supersede target missing",
2073            })?;
2074        frame.status = FrameStatus::Superseded;
2075        frame.superseded_by = Some(successor_id);
2076        self.remove_frame_from_indexes(frame_id)
2077    }
2078
2079    pub(crate) fn rebuild_indexes(
2080        &mut self,
2081        new_vec_docs: &[(FrameId, Vec<f32>)],
2082        inserted_frame_ids: &[FrameId],
2083    ) -> Result<()> {
2084        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2085            return Ok(());
2086        }
2087
2088        let payload_end = self.payload_region_end();
2089        self.data_end = payload_end;
2090        // Don't truncate if footer_offset is higher - there may be replay segments
2091        // or other data written after payload_end that must be preserved.
2092        let safe_truncate_len = self.header.footer_offset.max(payload_end);
2093        if self.file.metadata()?.len() > safe_truncate_len {
2094            self.file.set_len(safe_truncate_len)?;
2095        }
2096        self.file.seek(SeekFrom::Start(payload_end))?;
2097
2098        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
2099        self.toc.segment_catalog.lex_segments.clear();
2100        self.toc.segment_catalog.vec_segments.clear();
2101        self.toc.segment_catalog.time_segments.clear();
2102        #[cfg(feature = "temporal_track")]
2103        self.toc.segment_catalog.temporal_segments.clear();
2104        #[cfg(feature = "parallel_segments")]
2105        self.toc.segment_catalog.index_segments.clear();
2106        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
2107        self.toc.segment_catalog.tantivy_segments.clear();
2108        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
2109        self.toc.indexes.lex_segments.clear();
2110
2111        let mut time_entries: Vec<TimeIndexEntry> = self
2112            .toc
2113            .frames
2114            .iter()
2115            .filter(|frame| {
2116                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2117            })
2118            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2119            .collect();
2120        let (ti_offset, ti_length, ti_checksum) =
2121            time_index_append(&mut self.file, &mut time_entries)?;
2122        self.toc.time_index = Some(TimeIndexManifest {
2123            bytes_offset: ti_offset,
2124            bytes_length: ti_length,
2125            entry_count: time_entries.len() as u64,
2126            checksum: ti_checksum,
2127        });
2128
2129        let mut footer_offset = ti_offset + ti_length;
2130
2131        #[cfg(feature = "temporal_track")]
2132        {
2133            self.toc.temporal_track = None;
2134            self.toc.segment_catalog.temporal_segments.clear();
2135            self.clear_temporal_track_cache();
2136        }
2137
2138        if self.lex_enabled {
2139            #[cfg(feature = "lex")]
2140            {
2141                if self.tantivy_dirty {
2142                    // instant_index was used: frames were added to Tantivy with WAL
2143                    // sequence numbers as IDs, which don't match the actual frame IDs
2144                    // assigned during apply_records(). Must do a full rebuild to fix IDs.
2145                    if let Ok(mut storage) = self.lex_storage.write() {
2146                        storage.clear();
2147                        storage.set_generation(0);
2148                    }
2149                    self.init_tantivy()?;
2150                    if let Some(mut engine) = self.tantivy.take() {
2151                        self.rebuild_tantivy_engine(&mut engine)?;
2152                        self.tantivy = Some(engine);
2153                    } else {
2154                        return Err(MemvidError::InvalidToc {
2155                            reason: "tantivy engine missing during rebuild".into(),
2156                        });
2157                    }
2158                } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2159                    // Incremental path: engine exists (from open() or previous commit),
2160                    // instant_index was NOT used so no wrong IDs. Just add new frames.
2161                    // This is O(batch_size) — the key optimization for bulk ingestion.
2162                    //
2163                    // Collect frames + text first to avoid borrow conflicts with engine.
2164                    let max_payload = crate::memvid::search::max_index_payload();
2165                    let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2166                    for &frame_id in inserted_frame_ids {
2167                        let Ok(idx) = usize::try_from(frame_id) else {
2168                            continue;
2169                        };
2170                        let frame = match self.toc.frames.get(idx) {
2171                            Some(f) => f.clone(),
2172                            None => continue,
2173                        };
2174                        if frame.status != FrameStatus::Active {
2175                            continue;
2176                        }
2177                        if let Some(search_text) = frame.search_text.clone() {
2178                            if !search_text.trim().is_empty() {
2179                                prepared_docs.push((frame, search_text));
2180                                continue;
2181                            }
2182                        }
2183                        let mime = frame
2184                            .metadata
2185                            .as_ref()
2186                            .and_then(|m| m.mime.as_deref())
2187                            .unwrap_or("application/octet-stream");
2188                        if !crate::memvid::search::is_text_indexable_mime(mime) {
2189                            continue;
2190                        }
2191                        if frame.payload_length > max_payload {
2192                            continue;
2193                        }
2194                        let text = self.frame_search_text(&frame)?;
2195                        if !text.trim().is_empty() {
2196                            prepared_docs.push((frame, text));
2197                        }
2198                    }
2199                    if let Some(engine) = self.tantivy.as_mut() {
2200                        for (frame, text) in &prepared_docs {
2201                            engine.add_frame(frame, text)?;
2202                        }
2203                        engine.commit()?;
2204                    }
2205                } else {
2206                    // Full rebuild path: no engine or no new frames (e.g., doctor repair).
2207                    // Clear embedded storage to avoid carrying stale segments between rebuilds.
2208                    if let Ok(mut storage) = self.lex_storage.write() {
2209                        storage.clear();
2210                        storage.set_generation(0);
2211                    }
2212                    self.init_tantivy()?;
2213                    if let Some(mut engine) = self.tantivy.take() {
2214                        self.rebuild_tantivy_engine(&mut engine)?;
2215                        self.tantivy = Some(engine);
2216                    } else {
2217                        return Err(MemvidError::InvalidToc {
2218                            reason: "tantivy engine missing during rebuild".into(),
2219                        });
2220                    }
2221                }
2222
2223                // Set lex_enabled to ensure it persists
2224                self.lex_enabled = true;
2225
2226                // Mark Tantivy as dirty so it gets flushed
2227                self.tantivy_dirty = true;
2228
2229                // Position embedded Tantivy segments immediately after the time index.
2230                self.data_end = footer_offset;
2231
2232                // Flush Tantivy segments to file
2233                self.flush_tantivy()?;
2234
2235                // Update footer_offset after Tantivy flush
2236                footer_offset = self.header.footer_offset;
2237
2238                // Restore data_end to payload boundary so future payload writes stay before indexes.
2239                self.data_end = payload_end;
2240            }
2241            #[cfg(not(feature = "lex"))]
2242            {
2243                self.toc.indexes.lex = None;
2244                self.toc.indexes.lex_segments.clear();
2245            }
2246        } else {
2247            // Lex disabled: clear everything
2248            self.toc.indexes.lex = None;
2249            self.toc.indexes.lex_segments.clear();
2250            #[cfg(feature = "lex")]
2251            if let Ok(mut storage) = self.lex_storage.write() {
2252                storage.clear();
2253            }
2254        }
2255
2256        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2257            let vec_offset = footer_offset;
2258            self.file.seek(SeekFrom::Start(vec_offset))?;
2259            self.file.write_all(&artifact.bytes)?;
2260            footer_offset += artifact.bytes.len() as u64;
2261            self.toc.indexes.vec = Some(VecIndexManifest {
2262                vector_count: artifact.vector_count,
2263                dimension: artifact.dimension,
2264                bytes_offset: vec_offset,
2265                bytes_length: artifact.bytes.len() as u64,
2266                checksum: artifact.checksum,
2267                compression_mode: self.vec_compression.clone(),
2268                model: self.vec_model.clone(),
2269            });
2270            self.vec_index = Some(index);
2271        } else {
2272            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
2273            if !self.vec_enabled {
2274                self.toc.indexes.vec = None;
2275            }
2276            self.vec_index = None;
2277        }
2278
2279        // Persist CLIP index if it has embeddings
2280        if self.clip_enabled {
2281            if let Some(ref clip_index) = self.clip_index {
2282                if !clip_index.is_empty() {
2283                    let artifact = clip_index.encode()?;
2284                    let clip_offset = footer_offset;
2285                    self.file.seek(SeekFrom::Start(clip_offset))?;
2286                    self.file.write_all(&artifact.bytes)?;
2287                    footer_offset += artifact.bytes.len() as u64;
2288                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2289                        bytes_offset: clip_offset,
2290                        bytes_length: artifact.bytes.len() as u64,
2291                        vector_count: artifact.vector_count,
2292                        dimension: artifact.dimension,
2293                        checksum: artifact.checksum,
2294                        model_name: crate::clip::default_model_info().name.to_string(),
2295                    });
2296                    tracing::info!(
2297                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2298                        artifact.vector_count,
2299                        clip_offset
2300                    );
2301                }
2302            }
2303        } else {
2304            self.toc.indexes.clip = None;
2305        }
2306
2307        // Persist memories track if it has cards
2308        if self.memories_track.card_count() > 0 {
2309            let memories_offset = footer_offset;
2310            let memories_bytes = self.memories_track.serialize()?;
2311            let memories_checksum = blake3::hash(&memories_bytes).into();
2312            self.file.seek(SeekFrom::Start(memories_offset))?;
2313            self.file.write_all(&memories_bytes)?;
2314            footer_offset += memories_bytes.len() as u64;
2315
2316            let stats = self.memories_track.stats();
2317            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2318                bytes_offset: memories_offset,
2319                bytes_length: memories_bytes.len() as u64,
2320                card_count: stats.card_count as u64,
2321                entity_count: stats.entity_count as u64,
2322                checksum: memories_checksum,
2323            });
2324        } else {
2325            self.toc.memories_track = None;
2326        }
2327
2328        // Persist logic mesh if it has nodes
2329        if self.logic_mesh.is_empty() {
2330            self.toc.logic_mesh = None;
2331        } else {
2332            let mesh_offset = footer_offset;
2333            let mesh_bytes = self.logic_mesh.serialize()?;
2334            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2335            self.file.seek(SeekFrom::Start(mesh_offset))?;
2336            self.file.write_all(&mesh_bytes)?;
2337            footer_offset += mesh_bytes.len() as u64;
2338
2339            let stats = self.logic_mesh.stats();
2340            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2341                bytes_offset: mesh_offset,
2342                bytes_length: mesh_bytes.len() as u64,
2343                node_count: stats.node_count as u64,
2344                edge_count: stats.edge_count as u64,
2345                checksum: mesh_checksum,
2346            });
2347        }
2348
2349        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
2350        tracing::info!(
2351            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2352            ti_offset,
2353            ti_length,
2354            footer_offset,
2355            self.header.footer_offset
2356        );
2357
2358        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
2359        // This prevents overwriting data like replay segments that were written after index data
2360        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2361
2362        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
2363        if self.file.metadata()?.len() < self.header.footer_offset {
2364            self.file.set_len(self.header.footer_offset)?;
2365        }
2366
2367        self.rewrite_toc_footer()?;
2368        self.header.toc_checksum = self.toc.toc_checksum;
2369        crate::persist_header(&mut self.file, &self.header)?;
2370
2371        #[cfg(feature = "lex")]
2372        if self.lex_enabled {
2373            if let Some(ref engine) = self.tantivy {
2374                let doc_count = engine.num_docs();
2375                let active_frame_count = self
2376                    .toc
2377                    .frames
2378                    .iter()
2379                    .filter(|f| f.status == FrameStatus::Active)
2380                    .count();
2381
2382                // Count frames that would actually be indexed by rebuild_tantivy_engine
2383                // Uses the same logic: content-type based check + size limit
2384                let text_indexable_count = self
2385                    .toc
2386                    .frames
2387                    .iter()
2388                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2389                    .count();
2390
2391                // Only fail if we have text-indexable frames but none got indexed
2392                // This avoids false positives for binary files (videos, images)
2393                if doc_count == 0 && text_indexable_count > 0 {
2394                    return Err(MemvidError::Doctor {
2395                        reason: format!(
2396                            "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2397                            This indicates a critical failure in the rebuild process."
2398                        ),
2399                    });
2400                }
2401
2402                // Success! Log it
2403                log::info!(
2404                    "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2405                );
2406            }
2407        }
2408
2409        Ok(())
2410    }
2411
2412    /// Persist the memories track to the file without a full rebuild.
2413    ///
2414    /// This is used when the memories track has been modified but no frame
2415    /// changes were made (e.g., after running enrichment).
2416    fn persist_memories_track(&mut self) -> Result<()> {
2417        if self.memories_track.card_count() == 0 {
2418            self.toc.memories_track = None;
2419            return Ok(());
2420        }
2421
2422        // Write after the current footer_offset
2423        let memories_offset = self.header.footer_offset;
2424        let memories_bytes = self.memories_track.serialize()?;
2425        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2426
2427        self.file.seek(SeekFrom::Start(memories_offset))?;
2428        self.file.write_all(&memories_bytes)?;
2429
2430        let stats = self.memories_track.stats();
2431        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2432            bytes_offset: memories_offset,
2433            bytes_length: memories_bytes.len() as u64,
2434            card_count: stats.card_count as u64,
2435            entity_count: stats.entity_count as u64,
2436            checksum: memories_checksum,
2437        });
2438
2439        // Update footer_offset to account for the memories track
2440        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2441
2442        // Ensure the file length covers the memories track
2443        if self.file.metadata()?.len() < self.header.footer_offset {
2444            self.file.set_len(self.header.footer_offset)?;
2445        }
2446
2447        Ok(())
2448    }
2449
2450    /// Persist the CLIP index to the file without a full rebuild.
2451    ///
2452    /// This is used when CLIP embeddings have been added but no full
2453    /// index rebuild is needed (e.g., in parallel segments mode).
2454    fn persist_clip_index(&mut self) -> Result<()> {
2455        if !self.clip_enabled {
2456            self.toc.indexes.clip = None;
2457            return Ok(());
2458        }
2459
2460        let clip_index = match &self.clip_index {
2461            Some(idx) if !idx.is_empty() => idx,
2462            _ => {
2463                self.toc.indexes.clip = None;
2464                return Ok(());
2465            }
2466        };
2467
2468        // Encode the CLIP index
2469        let artifact = clip_index.encode()?;
2470
2471        // Write after the current footer_offset
2472        let clip_offset = self.header.footer_offset;
2473        self.file.seek(SeekFrom::Start(clip_offset))?;
2474        self.file.write_all(&artifact.bytes)?;
2475
2476        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2477            bytes_offset: clip_offset,
2478            bytes_length: artifact.bytes.len() as u64,
2479            vector_count: artifact.vector_count,
2480            dimension: artifact.dimension,
2481            checksum: artifact.checksum,
2482            model_name: crate::clip::default_model_info().name.to_string(),
2483        });
2484
2485        tracing::info!(
2486            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2487            artifact.vector_count,
2488            clip_offset
2489        );
2490
2491        // Update footer_offset to account for the CLIP index
2492        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2493
2494        // Ensure the file length covers the CLIP index
2495        if self.file.metadata()?.len() < self.header.footer_offset {
2496            self.file.set_len(self.header.footer_offset)?;
2497        }
2498
2499        Ok(())
2500    }
2501
2502    /// Persist the Logic-Mesh to the file without a full rebuild.
2503    ///
2504    /// This is used when the Logic-Mesh has been modified but no frame
2505    /// changes were made (e.g., after running NER enrichment).
2506    fn persist_logic_mesh(&mut self) -> Result<()> {
2507        if self.logic_mesh.is_empty() {
2508            self.toc.logic_mesh = None;
2509            return Ok(());
2510        }
2511
2512        // Write after the current footer_offset
2513        let mesh_offset = self.header.footer_offset;
2514        let mesh_bytes = self.logic_mesh.serialize()?;
2515        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2516
2517        self.file.seek(SeekFrom::Start(mesh_offset))?;
2518        self.file.write_all(&mesh_bytes)?;
2519
2520        let stats = self.logic_mesh.stats();
2521        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2522            bytes_offset: mesh_offset,
2523            bytes_length: mesh_bytes.len() as u64,
2524            node_count: stats.node_count as u64,
2525            edge_count: stats.edge_count as u64,
2526            checksum: mesh_checksum,
2527        });
2528
2529        // Update footer_offset to account for the logic mesh
2530        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2531
2532        // Ensure the file length covers the logic mesh
2533        if self.file.metadata()?.len() < self.header.footer_offset {
2534            self.file.set_len(self.header.footer_offset)?;
2535        }
2536
2537        Ok(())
2538    }
2539
2540    /// Persist the sketch track to the file without a full rebuild.
2541    ///
2542    /// This is used when the sketch track has been modified (e.g., after
2543    /// running `sketch build`).
2544    fn persist_sketch_track(&mut self) -> Result<()> {
2545        if self.sketch_track.is_empty() {
2546            self.toc.sketch_track = None;
2547            return Ok(());
2548        }
2549
2550        // Seek to write after the current footer_offset
2551        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2552
2553        // Write the sketch track and get (offset, length, checksum)
2554        let (sketch_offset, sketch_length, sketch_checksum) =
2555            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2556
2557        let stats = self.sketch_track.stats();
2558        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2559            bytes_offset: sketch_offset,
2560            bytes_length: sketch_length,
2561            entry_count: stats.entry_count,
2562            #[allow(clippy::cast_possible_truncation)]
2563            entry_size: stats.variant.entry_size() as u16,
2564            flags: 0,
2565            checksum: sketch_checksum,
2566        });
2567
2568        // Update footer_offset to account for the sketch track
2569        self.header.footer_offset = sketch_offset + sketch_length;
2570
2571        // Ensure the file length covers the sketch track
2572        if self.file.metadata()?.len() < self.header.footer_offset {
2573            self.file.set_len(self.header.footer_offset)?;
2574        }
2575
2576        tracing::debug!(
2577            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2578            stats.entry_count,
2579            sketch_offset
2580        );
2581
2582        Ok(())
2583    }
2584
2585    #[cfg(feature = "lex")]
2586    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2587        let LexWalBatch {
2588            generation,
2589            doc_count,
2590            checksum,
2591            segments,
2592        } = batch;
2593
2594        if let Ok(mut storage) = self.lex_storage.write() {
2595            storage.replace(doc_count, checksum, segments);
2596            storage.set_generation(generation);
2597        }
2598
2599        self.persist_lex_manifest()
2600    }
2601
2602    #[cfg(feature = "lex")]
2603    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2604        let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2605        self.append_wal_entry(&payload)?;
2606        Ok(())
2607    }
2608
2609    #[cfg(feature = "lex")]
2610    fn persist_lex_manifest(&mut self) -> Result<()> {
2611        let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2612            storage.to_manifest()
2613        } else {
2614            (None, Vec::new())
2615        };
2616
2617        // Update the manifest
2618        if let Some(storage_manifest) = index_manifest {
2619            // Old LexIndexArtifact format: set the manifest with actual offset/length
2620            self.toc.indexes.lex = Some(storage_manifest);
2621        } else {
2622            // Tantivy segments OR lex disabled: clear the manifest
2623            // Stats will check lex_segments instead of manifest
2624            self.toc.indexes.lex = None;
2625        }
2626
2627        self.toc.indexes.lex_segments = segments;
2628
2629        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
2630        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.
2631
2632        self.rewrite_toc_footer()?;
2633        self.header.toc_checksum = self.toc.toc_checksum;
2634        crate::persist_header(&mut self.file, &self.header)?;
2635        Ok(())
2636    }
2637
2638    #[cfg(feature = "lex")]
2639    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
2640        let TantivySnapshot {
2641            doc_count,
2642            checksum,
2643            segments,
2644        } = snapshot;
2645
2646        let mut footer_offset = self.data_end;
2647        self.file.seek(SeekFrom::Start(footer_offset))?;
2648
2649        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
2650        for segment in segments {
2651            let bytes_length = segment.bytes.len() as u64;
2652            self.file.write_all(&segment.bytes)?;
2653            self.file.flush()?; // Flush segment data to disk
2654            embedded_segments.push(EmbeddedLexSegment {
2655                path: segment.path,
2656                bytes_offset: footer_offset,
2657                bytes_length,
2658                checksum: segment.checksum,
2659            });
2660            footer_offset += bytes_length;
2661        }
2662        // Set footer_offset for TOC writing, but DON'T update data_end
2663        // data_end stays at end of payloads, so next commit overwrites these segments
2664        // Use max() to never decrease footer_offset - this preserves replay segments
2665        // that may have been written at a higher offset
2666        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2667
2668        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
2669        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
2670            Vec::with_capacity(embedded_segments.len());
2671        for segment in &embedded_segments {
2672            let descriptor = TantivySegmentDescriptor::from_common(
2673                SegmentCommon::new(
2674                    next_segment_id,
2675                    segment.bytes_offset,
2676                    segment.bytes_length,
2677                    segment.checksum,
2678                ),
2679                segment.path.clone(),
2680            );
2681            catalog_segments.push(descriptor);
2682            next_segment_id = next_segment_id.saturating_add(1);
2683        }
2684        if catalog_segments.is_empty() {
2685            self.toc.segment_catalog.tantivy_segments.clear();
2686        } else {
2687            self.toc.segment_catalog.tantivy_segments = catalog_segments;
2688            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2689        }
2690        self.toc.segment_catalog.next_segment_id = next_segment_id;
2691
2692        // REMOVED: catalog_data_end() check
2693        // This was causing orphaned Tantivy segments because it would see OLD segments
2694        // still in the catalog from previous commits, and push footer_offset forward.
2695        // We want Tantivy segments to overwrite at data_end, so footer_offset should
2696        // stay at the end of the newly written segments.
2697
2698        let generation = self
2699            .lex_storage
2700            .write()
2701            .map_err(|_| MemvidError::Tantivy {
2702                reason: "embedded lex storage lock poisoned".into(),
2703            })
2704            .map(|mut storage| {
2705                storage.replace(doc_count, checksum, embedded_segments.clone());
2706                storage.generation()
2707            })?;
2708
2709        let batch = LexWalBatch {
2710            generation,
2711            doc_count,
2712            checksum,
2713            segments: embedded_segments.clone(),
2714        };
2715        self.append_lex_batch(&batch)?;
2716        self.persist_lex_manifest()?;
2717        self.header.toc_checksum = self.toc.toc_checksum;
2718        crate::persist_header(&mut self.file, &self.header)?;
2719        Ok(())
2720    }
2721
2722    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2723        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2724            frame_id,
2725            reason: "frame id too large",
2726        })?;
2727        let frame = self
2728            .toc
2729            .frames
2730            .get_mut(index)
2731            .ok_or(MemvidError::InvalidFrame {
2732                frame_id,
2733                reason: "delete target missing",
2734            })?;
2735        frame.status = FrameStatus::Deleted;
2736        frame.superseded_by = None;
2737        self.remove_frame_from_indexes(frame_id)
2738    }
2739
2740    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2741        #[cfg(feature = "lex")]
2742        if let Some(engine) = self.tantivy.as_mut() {
2743            engine.delete_frame(frame_id)?;
2744            self.tantivy_dirty = true;
2745        }
2746        if let Some(index) = self.lex_index.as_mut() {
2747            index.remove_document(frame_id);
2748        }
2749        if let Some(index) = self.vec_index.as_mut() {
2750            index.remove(frame_id);
2751        }
2752        Ok(())
2753    }
2754
2755    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2756        let Ok(index) = usize::try_from(frame_id) else {
2757            return false;
2758        };
2759        self.toc
2760            .frames
2761            .get(index)
2762            .is_some_and(|frame| frame.status == FrameStatus::Active)
2763    }
2764
2765    #[cfg(feature = "parallel_segments")]
2766    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2767    where
2768        I: IntoIterator<Item = FrameId>,
2769    {
2770        let mut iter = iter.into_iter();
2771        let first_id = iter.next()?;
2772        let first_frame = self.toc.frames.get(first_id as usize);
2773        let mut min_id = first_id;
2774        let mut max_id = first_id;
2775        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2776        let mut page_end = first_frame
2777            .and_then(|frame| frame.chunk_count)
2778            .map(|count| page_start + count.saturating_sub(1))
2779            .unwrap_or(page_start);
2780        for frame_id in iter {
2781            if frame_id < min_id {
2782                min_id = frame_id;
2783            }
2784            if frame_id > max_id {
2785                max_id = frame_id;
2786            }
2787            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2788                if let Some(idx) = frame.chunk_index {
2789                    page_start = page_start.min(idx);
2790                    if let Some(count) = frame.chunk_count {
2791                        let end = idx + count.saturating_sub(1);
2792                        page_end = page_end.max(end);
2793                    } else {
2794                        page_end = page_end.max(idx);
2795                    }
2796                }
2797            }
2798        }
2799        Some(SegmentSpan {
2800            frame_start: min_id,
2801            frame_end: max_id,
2802            page_start,
2803            page_end,
2804            ..SegmentSpan::default()
2805        })
2806    }
2807
2808    #[cfg(feature = "parallel_segments")]
2809    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2810        common.span = Some(span);
2811        if common.codec_version == 0 {
2812            common.codec_version = 1;
2813        }
2814    }
2815
2816    #[cfg(feature = "parallel_segments")]
2817    pub(crate) fn record_index_segment(
2818        &mut self,
2819        kind: SegmentKind,
2820        common: SegmentCommon,
2821        stats: SegmentStats,
2822    ) -> Result<()> {
2823        let entry = IndexSegmentRef {
2824            kind,
2825            common,
2826            stats,
2827        };
2828        self.toc.segment_catalog.index_segments.push(entry.clone());
2829        if let Some(wal) = self.manifest_wal.as_mut() {
2830            wal.append_segments(&[entry])?;
2831        }
2832        Ok(())
2833    }
2834
2835    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2836        self.ensure_writable()?;
2837        if self.toc.ticket_ref.issuer == "free-tier" {
2838            return Ok(());
2839        }
2840        match self.tier() {
2841            Tier::Free => Ok(()),
2842            tier => {
2843                if self.toc.ticket_ref.issuer.trim().is_empty() {
2844                    Err(MemvidError::TicketRequired { tier })
2845                } else {
2846                    Ok(())
2847                }
2848            }
2849        }
2850    }
2851
2852    pub(crate) fn tier(&self) -> Tier {
2853        if self.header.wal_size >= WAL_SIZE_LARGE {
2854            Tier::Enterprise
2855        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2856            Tier::Dev
2857        } else {
2858            Tier::Free
2859        }
2860    }
2861
2862    pub(crate) fn capacity_limit(&self) -> u64 {
2863        if self.toc.ticket_ref.capacity_bytes != 0 {
2864            self.toc.ticket_ref.capacity_bytes
2865        } else {
2866            self.tier().capacity_bytes()
2867        }
2868    }
2869
2870    /// Get current storage capacity in bytes.
2871    ///
2872    /// Returns the capacity from the applied ticket, or the default
2873    /// tier capacity (1 GB for free tier).
2874    #[must_use]
2875    pub fn get_capacity(&self) -> u64 {
2876        self.capacity_limit()
2877    }
2878
2879    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
2880        tracing::info!(
2881            vec_segments = self.toc.segment_catalog.vec_segments.len(),
2882            lex_segments = self.toc.segment_catalog.lex_segments.len(),
2883            time_segments = self.toc.segment_catalog.time_segments.len(),
2884            footer_offset = self.header.footer_offset,
2885            data_end = self.data_end,
2886            "rewrite_toc_footer: about to serialize TOC"
2887        );
2888        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
2889        let footer_offset = self.header.footer_offset;
2890        self.file.seek(SeekFrom::Start(footer_offset))?;
2891        self.file.write_all(&toc_bytes)?;
2892        let footer = CommitFooter {
2893            toc_len: toc_bytes.len() as u64,
2894            toc_hash: *hash(&toc_bytes).as_bytes(),
2895            generation: self.generation,
2896        };
2897        let encoded_footer = footer.encode();
2898        self.file.write_all(&encoded_footer)?;
2899
2900        // The file must always be at least header + WAL size
2901        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
2902        let min_len = self.header.wal_offset + self.header.wal_size;
2903        let final_len = new_len.max(min_len);
2904
2905        if new_len < min_len {
2906            tracing::warn!(
2907                file.new_len = new_len,
2908                file.min_len = min_len,
2909                file.final_len = final_len,
2910                "truncation would cut into WAL region, clamping to min_len"
2911            );
2912        }
2913
2914        self.file.set_len(final_len)?;
2915        // Ensure footer is flushed to disk so mmap-based readers can find it
2916        self.file.sync_all()?;
2917        Ok(())
2918    }
2919}
2920
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Plan, build and append index segments for the frames inserted by `delta`.
    ///
    /// Returns `Ok(false)` when there is nothing to publish (no non-empty chunks, no plans,
    /// or no worker results) and `Ok(true)` once the built segments have been appended.
    ///
    /// # Errors
    /// Propagates failures from chunk collection, the worker pool, and segment appending.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Build one `SegmentChunkPlan` per inserted frame that has non-blank content.
    ///
    /// Each plan carries the frame's text, timestamp, chunk position, a token estimate and,
    /// when `delta` holds one for that frame, its embedding. Page numbers are
    /// `chunk_index + 1` for frames that record a chunk index and 0 otherwise; `page_end`
    /// always equals `page_start`. Token offsets are left at 0.
    ///
    /// # Errors
    /// Returns `MemvidError::InvalidFrame` when an inserted frame id does not fit in `usize`
    /// or is out of range, and propagates failures from reading frame content.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            // Checked conversion: an id that does not fit in usize must be rejected, not
            // truncated into the index of some other frame.
            let frame = usize::try_from(*frame_id)
                .ok()
                .and_then(|slot| self.toc.frames.get(slot))
                .cloned()
                .ok_or(MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                })?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based for chunked frames; 0 marks "no page information".
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = page_start;
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                // Each embedding is handed to at most one chunk.
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2992
2993#[cfg(feature = "parallel_segments")]
/// Estimates the token count of `text` as its number of whitespace-separated words.
///
/// The estimate is never below one, so empty or all-whitespace input reports `1`.
fn estimate_tokens(text: &str) -> usize {
    let word_count = text.split_whitespace().count();
    if word_count == 0 { 1 } else { word_count }
}
2997
2998impl Memvid {
2999    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
3000        let catalog_end = self.catalog_data_end();
3001        if catalog_end <= self.header.footer_offset {
3002            return Ok(false);
3003        }
3004        self.header.footer_offset = catalog_end;
3005        self.rewrite_toc_footer()?;
3006        self.header.toc_checksum = self.toc.toc_checksum;
3007        crate::persist_header(&mut self.file, &self.header)?;
3008        Ok(true)
3009    }
3010}
3011
3012impl Memvid {
3013    pub fn vacuum(&mut self) -> Result<()> {
3014        self.commit()?;
3015
3016        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
3017        let frames: Vec<Frame> = self
3018            .toc
3019            .frames
3020            .iter()
3021            .filter(|frame| frame.status == FrameStatus::Active)
3022            .cloned()
3023            .collect();
3024        for frame in frames {
3025            let bytes = self.read_frame_payload_bytes(&frame)?;
3026            active_payloads.insert(frame.id, bytes);
3027        }
3028
3029        let mut cursor = self.header.wal_offset + self.header.wal_size;
3030        self.file.seek(SeekFrom::Start(cursor))?;
3031        for frame in &mut self.toc.frames {
3032            if frame.status == FrameStatus::Active {
3033                if let Some(bytes) = active_payloads.get(&frame.id) {
3034                    self.file.write_all(bytes)?;
3035                    frame.payload_offset = cursor;
3036                    frame.payload_length = bytes.len() as u64;
3037                    cursor += bytes.len() as u64;
3038                } else {
3039                    frame.payload_offset = 0;
3040                    frame.payload_length = 0;
3041                }
3042            } else {
3043                frame.payload_offset = 0;
3044                frame.payload_length = 0;
3045            }
3046        }
3047
3048        self.data_end = cursor;
3049
3050        self.toc.segments.clear();
3051        self.toc.indexes.lex_segments.clear();
3052        self.toc.segment_catalog.lex_segments.clear();
3053        self.toc.segment_catalog.vec_segments.clear();
3054        self.toc.segment_catalog.time_segments.clear();
3055        #[cfg(feature = "temporal_track")]
3056        {
3057            self.toc.temporal_track = None;
3058            self.toc.segment_catalog.temporal_segments.clear();
3059        }
3060        #[cfg(feature = "lex")]
3061        {
3062            self.toc.segment_catalog.tantivy_segments.clear();
3063        }
3064        #[cfg(feature = "parallel_segments")]
3065        {
3066            self.toc.segment_catalog.index_segments.clear();
3067        }
3068
3069        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
3070        #[cfg(feature = "lex")]
3071        {
3072            self.tantivy = None;
3073            self.tantivy_dirty = false;
3074        }
3075
3076        self.rebuild_indexes(&[], &[])?;
3077        self.file.sync_all()?;
3078        Ok(())
3079    }
3080
3081    /// Preview how a document would be chunked without actually ingesting it.
3082    ///
3083    /// This is useful when you need to compute embeddings for each chunk externally
3084    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
3085    /// is too small to be chunked (< 2400 chars after normalization).
3086    ///
3087    /// # Example
3088    /// ```ignore
3089    /// let chunks = mem.preview_chunks(b"long document text...")?;
3090    /// if let Some(chunk_texts) = chunks {
3091    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
3092    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
3093    /// } else {
3094    ///     let embedding = my_embedder.embed_query(text)?;
3095    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
3096    /// }
3097    /// ```
3098    #[must_use]
3099    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
3100        plan_document_chunks(payload).map(|plan| plan.chunks)
3101    }
3102
3103    /// Append raw bytes as a document frame.
3104    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
3105        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
3106    }
3107
3108    /// Append raw bytes with explicit metadata/options.
3109    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
3110        self.put_internal(Some(payload), None, None, None, options, None)
3111    }
3112
3113    /// Append bytes and an existing embedding (bypasses on-device embedding).
3114    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
3115        self.put_internal(
3116            Some(payload),
3117            None,
3118            Some(embedding),
3119            None,
3120            PutOptions::default(),
3121            None,
3122        )
3123    }
3124
3125    pub fn put_with_embedding_and_options(
3126        &mut self,
3127        payload: &[u8],
3128        embedding: Vec<f32>,
3129        options: PutOptions,
3130    ) -> Result<u64> {
3131        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
3132    }
3133
3134    /// Ingest a document with pre-computed embeddings for both parent and chunks.
3135    ///
3136    /// This is the recommended API for high-accuracy semantic search when chunking
3137    /// occurs. The caller provides:
3138    /// - `payload`: The document bytes
3139    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
3140    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
3141    /// - `options`: Standard put options
3142    ///
3143    /// The number of chunk embeddings should match the number of chunks that will be
3144    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
3145    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
3146    pub fn put_with_chunk_embeddings(
3147        &mut self,
3148        payload: &[u8],
3149        parent_embedding: Option<Vec<f32>>,
3150        chunk_embeddings: Vec<Vec<f32>>,
3151        options: PutOptions,
3152    ) -> Result<u64> {
3153        self.put_internal(
3154            Some(payload),
3155            None,
3156            parent_embedding,
3157            Some(chunk_embeddings),
3158            options,
3159            None,
3160        )
3161    }
3162
3163    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
3164    pub fn update_frame(
3165        &mut self,
3166        frame_id: FrameId,
3167        payload: Option<Vec<u8>>,
3168        mut options: PutOptions,
3169        embedding: Option<Vec<f32>>,
3170    ) -> Result<u64> {
3171        self.ensure_mutation_allowed()?;
3172        let existing = self.frame_by_id(frame_id)?;
3173        if existing.status != FrameStatus::Active {
3174            return Err(MemvidError::InvalidFrame {
3175                frame_id,
3176                reason: "frame is not active",
3177            });
3178        }
3179
3180        if options.timestamp.is_none() {
3181            options.timestamp = Some(existing.timestamp);
3182        }
3183        if options.track.is_none() {
3184            options.track = existing.track.clone();
3185        }
3186        if options.kind.is_none() {
3187            options.kind = existing.kind.clone();
3188        }
3189        if options.uri.is_none() {
3190            options.uri = existing.uri.clone();
3191        }
3192        if options.title.is_none() {
3193            options.title = existing.title.clone();
3194        }
3195        if options.metadata.is_none() {
3196            options.metadata = existing.metadata.clone();
3197        }
3198        if options.search_text.is_none() {
3199            options.search_text = existing.search_text.clone();
3200        }
3201        if options.tags.is_empty() {
3202            options.tags = existing.tags.clone();
3203        }
3204        if options.labels.is_empty() {
3205            options.labels = existing.labels.clone();
3206        }
3207        if options.extra_metadata.is_empty() {
3208            options.extra_metadata = existing.extra_metadata.clone();
3209        }
3210
3211        let reuse_frame = if payload.is_none() {
3212            options.auto_tag = false;
3213            options.extract_dates = false;
3214            Some(existing.clone())
3215        } else {
3216            None
3217        };
3218
3219        let effective_embedding = if let Some(explicit) = embedding {
3220            Some(explicit)
3221        } else if self.vec_enabled {
3222            self.frame_embedding(frame_id)?
3223        } else {
3224            None
3225        };
3226
3227        let payload_slice = payload.as_deref();
3228        let reuse_flag = reuse_frame.is_some();
3229        let replace_flag = payload_slice.is_some();
3230        let seq = self.put_internal(
3231            payload_slice,
3232            reuse_frame,
3233            effective_embedding,
3234            None, // No chunk embeddings for update
3235            options,
3236            Some(frame_id),
3237        )?;
3238        info!(
3239            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
3240        );
3241        Ok(seq)
3242    }
3243
3244    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
3245        self.ensure_mutation_allowed()?;
3246        let frame = self.frame_by_id(frame_id)?;
3247        if frame.status != FrameStatus::Active {
3248            return Err(MemvidError::InvalidFrame {
3249                frame_id,
3250                reason: "frame is not active",
3251            });
3252        }
3253
3254        let mut tombstone = WalEntryData {
3255            timestamp: SystemTime::now()
3256                .duration_since(UNIX_EPOCH)
3257                .map(|d| d.as_secs() as i64)
3258                .unwrap_or(frame.timestamp),
3259            kind: None,
3260            track: None,
3261            payload: Vec::new(),
3262            embedding: None,
3263            uri: frame.uri.clone(),
3264            title: frame.title.clone(),
3265            canonical_encoding: frame.canonical_encoding,
3266            canonical_length: frame.canonical_length,
3267            metadata: None,
3268            search_text: None,
3269            tags: Vec::new(),
3270            labels: Vec::new(),
3271            extra_metadata: BTreeMap::new(),
3272            content_dates: Vec::new(),
3273            chunk_manifest: None,
3274            role: frame.role,
3275            parent_sequence: None,
3276            chunk_index: frame.chunk_index,
3277            chunk_count: frame.chunk_count,
3278            op: FrameWalOp::Tombstone,
3279            target_frame_id: Some(frame_id),
3280            supersedes_frame_id: None,
3281            reuse_payload_from: None,
3282            source_sha256: None,
3283            source_path: None,
3284            enrichment_state: crate::types::EnrichmentState::default(),
3285        };
3286        tombstone.kind = frame.kind.clone();
3287        tombstone.track = frame.track.clone();
3288
3289        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
3290        let seq = self.append_wal_entry(&payload_bytes)?;
3291        self.dirty = true;
3292        let suppress_checkpoint = self
3293            .batch_opts
3294            .as_ref()
3295            .is_some_and(|o| o.disable_auto_checkpoint);
3296        if !suppress_checkpoint && self.wal.should_checkpoint() {
3297            self.commit()?;
3298        }
3299        info!("frame_delete frame_id={frame_id} seq={seq}");
3300        Ok(seq)
3301    }
3302}
3303
3304impl Memvid {
3305    fn put_internal(
3306        &mut self,
3307        payload: Option<&[u8]>,
3308        reuse_frame: Option<Frame>,
3309        embedding: Option<Vec<f32>>,
3310        chunk_embeddings: Option<Vec<Vec<f32>>>,
3311        mut options: PutOptions,
3312        supersedes: Option<FrameId>,
3313    ) -> Result<u64> {
3314        self.ensure_mutation_allowed()?;
3315
3316        // Deduplication: if enabled and we have payload, check if identical content exists
3317        if options.dedup {
3318            if let Some(bytes) = payload {
3319                let content_hash = hash(bytes);
3320                if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3321                    // Found existing frame with same content hash, skip ingestion
3322                    tracing::debug!(
3323                        frame_id = existing_frame.id,
3324                        "dedup: skipping ingestion, identical content already exists"
3325                    );
3326                    // Return existing frame's sequence number (which equals frame_id for committed frames)
3327                    return Ok(existing_frame.id);
3328                }
3329            }
3330        }
3331
3332        if payload.is_some() && reuse_frame.is_some() {
3333            let frame_id = reuse_frame
3334                .as_ref()
3335                .map(|frame| frame.id)
3336                .unwrap_or_default();
3337            return Err(MemvidError::InvalidFrame {
3338                frame_id,
3339                reason: "cannot reuse payload when bytes are provided",
3340            });
3341        }
3342
3343        // If the caller supplies embeddings, enforce a single vector dimension contract
3344        // for the entire memory (fail fast, never silently accept mixed dimensions).
3345        let incoming_dimension = {
3346            let mut dim: Option<u32> = None;
3347
3348            if let Some(ref vector) = embedding {
3349                if !vector.is_empty() {
3350                    #[allow(clippy::cast_possible_truncation)]
3351                    let len = vector.len() as u32;
3352                    dim = Some(len);
3353                }
3354            }
3355
3356            if let Some(ref vectors) = chunk_embeddings {
3357                for vector in vectors {
3358                    if vector.is_empty() {
3359                        continue;
3360                    }
3361                    let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3362                    match dim {
3363                        None => dim = Some(vec_dim),
3364                        Some(existing) if existing == vec_dim => {}
3365                        Some(existing) => {
3366                            return Err(MemvidError::VecDimensionMismatch {
3367                                expected: existing,
3368                                actual: vector.len(),
3369                            });
3370                        }
3371                    }
3372                }
3373            }
3374
3375            dim
3376        };
3377
3378        if let Some(incoming_dimension) = incoming_dimension {
3379            // Embeddings imply vector search should be enabled.
3380            if !self.vec_enabled {
3381                self.enable_vec()?;
3382            }
3383
3384            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3385                if existing_dimension != incoming_dimension {
3386                    return Err(MemvidError::VecDimensionMismatch {
3387                        expected: existing_dimension,
3388                        actual: incoming_dimension as usize,
3389                    });
3390                }
3391            }
3392
3393            // Persist the dimension early for better auto-detection (even before the next commit).
3394            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3395                if manifest.dimension == 0 {
3396                    manifest.dimension = incoming_dimension;
3397                }
3398            }
3399        }
3400
3401        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3402        let payload_tail = self.payload_region_end();
3403        let projected = if let Some(bytes) = payload {
3404            let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3405                prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3406            } else {
3407                prepare_canonical_payload(bytes)?
3408            };
3409            let len = prepared.len();
3410            prepared_payload = Some((prepared, encoding, length));
3411            payload_tail.saturating_add(len as u64)
3412        } else if reuse_frame.is_some() {
3413            payload_tail
3414        } else {
3415            return Err(MemvidError::InvalidFrame {
3416                frame_id: 0,
3417                reason: "payload required for frame insertion",
3418            });
3419        };
3420
3421        let capacity_limit = self.capacity_limit();
3422        if projected > capacity_limit {
3423            let incoming_size = projected.saturating_sub(payload_tail);
3424            return Err(MemvidError::CapacityExceeded {
3425                current: payload_tail,
3426                limit: capacity_limit,
3427                required: incoming_size,
3428            });
3429        }
3430        let timestamp = options.timestamp.take().unwrap_or_else(|| {
3431            SystemTime::now()
3432                .duration_since(UNIX_EPOCH)
3433                .map(|d| d.as_secs() as i64)
3434                .unwrap_or(0)
3435        });
3436
3437        #[allow(unused_assignments)]
3438        let mut reuse_bytes: Option<Vec<u8>> = None;
3439        let payload_for_processing = if let Some(bytes) = payload {
3440            Some(bytes)
3441        } else if let Some(frame) = reuse_frame.as_ref() {
3442            let bytes = self.frame_canonical_bytes(frame)?;
3443            reuse_bytes = Some(bytes);
3444            reuse_bytes.as_deref()
3445        } else {
3446            None
3447        };
3448
3449        // Try to create a chunk plan from raw UTF-8 bytes first
3450        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3451            (Some(bytes), None) => plan_document_chunks(bytes),
3452            _ => None,
3453        };
3454
3455        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
3456        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
3457        // For --no-raw mode, we store only the extracted text and a hash of the original binary
3458        let mut source_sha256: Option<[u8; 32]> = None;
3459        let source_path_value = options.source_path.take();
3460
3461        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3462            if raw_chunk_plan.is_some() {
3463                // UTF-8 text document - chunks contain the text, no parent payload needed
3464                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3465            } else if options.no_raw {
3466                // --no-raw mode: don't store the raw binary, only compute hash
3467                if let Some(bytes) = payload {
3468                    // Compute BLAKE3 hash of original binary for verification
3469                    let hash_result = hash(bytes);
3470                    source_sha256 = Some(*hash_result.as_bytes());
3471                    // Store empty payload - the extracted text is in search_text
3472                    (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3473                } else {
3474                    return Err(MemvidError::InvalidFrame {
3475                        frame_id: 0,
3476                        reason: "payload required for --no-raw mode",
3477                    });
3478                }
3479            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3480                (prepared, encoding, length, None)
3481            } else if let Some(bytes) = payload {
3482                let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3483                    prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3484                } else {
3485                    prepare_canonical_payload(bytes)?
3486                };
3487                (prepared, encoding, length, None)
3488            } else if let Some(frame) = reuse_frame.as_ref() {
3489                (
3490                    Vec::new(),
3491                    frame.canonical_encoding,
3492                    frame.canonical_length,
3493                    Some(frame.id),
3494                )
3495            } else {
3496                return Err(MemvidError::InvalidFrame {
3497                    frame_id: 0,
3498                    reason: "payload required for frame insertion",
3499                });
3500            };
3501
3502        // Track whether we'll create an extracted text chunk plan later
3503        let mut chunk_plan = raw_chunk_plan;
3504
3505        let mut metadata = options.metadata.take();
3506        let mut search_text = options
3507            .search_text
3508            .take()
3509            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3510        let mut tags = std::mem::take(&mut options.tags);
3511        let mut labels = std::mem::take(&mut options.labels);
3512        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3513        let mut content_dates: Vec<String> = Vec::new();
3514
3515        let need_search_text = search_text
3516            .as_ref()
3517            .is_none_or(|text| text.trim().is_empty());
3518        let need_metadata = metadata.is_none();
3519        let run_extractor = need_search_text || need_metadata || options.auto_tag;
3520
3521        let mut extraction_error = None;
3522        let mut is_skim_extraction = false; // Track if extraction was time-limited
3523
3524        let extracted = if run_extractor {
3525            if let Some(bytes) = payload_for_processing {
3526                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3527                let uri_hint = options.uri.as_deref();
3528
3529                // Use time-budgeted extraction for instant indexing with a budget
3530                let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3531
3532                if use_budgeted {
3533                    // Time-budgeted extraction for sub-second ingestion
3534                    let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3535                        options.extraction_budget_ms,
3536                    );
3537                    match crate::extract_budgeted::extract_with_budget(
3538                        bytes, mime_hint, uri_hint, budget,
3539                    ) {
3540                        Ok(result) => {
3541                            is_skim_extraction = result.is_skim();
3542                            if is_skim_extraction {
3543                                tracing::debug!(
3544                                    coverage = result.coverage,
3545                                    elapsed_ms = result.elapsed_ms,
3546                                    sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3547                                    "time-budgeted extraction (skim)"
3548                                );
3549                            }
3550                            // Convert BudgetedExtractionResult to ExtractedDocument
3551                            let doc = crate::extract::ExtractedDocument {
3552                                text: if result.text.is_empty() {
3553                                    None
3554                                } else {
3555                                    Some(result.text)
3556                                },
3557                                metadata: serde_json::json!({
3558                                    "skim": is_skim_extraction,
3559                                    "coverage": result.coverage,
3560                                    "sections_extracted": result.sections_extracted,
3561                                    "sections_total": result.sections_total,
3562                                }),
3563                                mime_type: mime_hint.map(std::string::ToString::to_string),
3564                            };
3565                            Some(doc)
3566                        }
3567                        Err(err) => {
3568                            // Fall back to full extraction on budgeted extraction error
3569                            tracing::warn!(
3570                                ?err,
3571                                "budgeted extraction failed, trying full extraction"
3572                            );
3573                            match extract_via_registry(bytes, mime_hint, uri_hint) {
3574                                Ok(doc) => Some(doc),
3575                                Err(err) => {
3576                                    extraction_error = Some(err);
3577                                    None
3578                                }
3579                            }
3580                        }
3581                    }
3582                } else {
3583                    // Full extraction (no time budget)
3584                    match extract_via_registry(bytes, mime_hint, uri_hint) {
3585                        Ok(doc) => Some(doc),
3586                        Err(err) => {
3587                            extraction_error = Some(err);
3588                            None
3589                        }
3590                    }
3591                }
3592            } else {
3593                None
3594            }
3595        } else {
3596            None
3597        };
3598
3599        if let Some(err) = extraction_error {
3600            return Err(err);
3601        }
3602
3603        if let Some(doc) = &extracted {
3604            if need_search_text {
3605                if let Some(text) = &doc.text {
3606                    if let Some(normalized) =
3607                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3608                    {
3609                        search_text = Some(normalized);
3610                    }
3611                }
3612            }
3613
3614            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
3615            // from extracted text. This ensures large documents like PDFs get fully indexed.
3616            if chunk_plan.is_none() {
3617                if let Some(text) = &doc.text {
3618                    chunk_plan = plan_text_chunks(text);
3619                }
3620            }
3621
3622            if let Some(mime) = doc.mime_type.as_ref() {
3623                if let Some(existing) = &mut metadata {
3624                    if existing.mime.is_none() {
3625                        existing.mime = Some(mime.clone());
3626                    }
3627                } else {
3628                    let mut doc_meta = DocMetadata::default();
3629                    doc_meta.mime = Some(mime.clone());
3630                    metadata = Some(doc_meta);
3631                }
3632            }
3633
3634            // Only add extractous_metadata when auto_tag is enabled
3635            // This allows callers to disable metadata pollution by setting auto_tag=false
3636            if options.auto_tag {
3637                if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3638                {
3639                    extra_metadata
3640                        .entry("extractous_metadata".to_string())
3641                        .or_insert(meta_json);
3642                }
3643            }
3644        }
3645
3646        if options.auto_tag {
3647            if let Some(ref text) = search_text {
3648                if !text.trim().is_empty() {
3649                    let result = AutoTagger.analyse(text, options.extract_dates);
3650                    merge_unique(&mut tags, result.tags);
3651                    merge_unique(&mut labels, result.labels);
3652                    if options.extract_dates && content_dates.is_empty() {
3653                        content_dates = result.content_dates;
3654                    }
3655                }
3656            }
3657        }
3658
3659        if content_dates.is_empty() {
3660            if let Some(frame) = reuse_frame.as_ref() {
3661                content_dates = frame.content_dates.clone();
3662            }
3663        }
3664
3665        let metadata_ref = metadata.as_ref();
3666        let mut search_text = augment_search_text(
3667            search_text,
3668            options.uri.as_deref(),
3669            options.title.as_deref(),
3670            options.track.as_deref(),
3671            &tags,
3672            &labels,
3673            &extra_metadata,
3674            &content_dates,
3675            metadata_ref,
3676        );
3677        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3678        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3679        let mut parent_chunk_count: Option<u32> = None;
3680
3681        let kind_value = options.kind.take();
3682        let track_value = options.track.take();
3683        let uri_value = options.uri.take();
3684        let title_value = options.title.take();
3685        let should_extract_triplets = options.extract_triplets;
3686        // Save references for triplet extraction (after search_text is moved into WAL entry)
3687        let triplet_uri = uri_value.clone();
3688        let triplet_title = title_value.clone();
3689
3690        if let Some(plan) = chunk_plan.as_ref() {
3691            let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3692            parent_chunk_manifest = Some(plan.manifest.clone());
3693            parent_chunk_count = Some(chunk_total);
3694
3695            if let Some(first_chunk) = plan.chunks.first() {
3696                if let Some(normalized) =
3697                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3698                {
3699                    if !normalized.trim().is_empty() {
3700                        search_text = Some(normalized);
3701                    }
3702                }
3703            }
3704
3705            let chunk_tags = tags.clone();
3706            let chunk_labels = labels.clone();
3707            let chunk_metadata = metadata.clone();
3708            let chunk_extra_metadata = extra_metadata.clone();
3709            let chunk_content_dates = content_dates.clone();
3710
3711            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3712                let (chunk_payload, chunk_encoding, chunk_length) =
3713                    prepare_canonical_payload(chunk_text.as_bytes())?;
3714                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3715                    .map(|n| n.text)
3716                    .filter(|text| !text.trim().is_empty());
3717
3718                let chunk_uri = uri_value
3719                    .as_ref()
3720                    .map(|uri| format!("{uri}#page-{}", idx + 1));
3721                let chunk_title = title_value
3722                    .as_ref()
3723                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3724
3725                // Use provided chunk embedding if available, otherwise None
3726                let chunk_embedding = chunk_embeddings
3727                    .as_ref()
3728                    .and_then(|embeddings| embeddings.get(idx).cloned());
3729
3730                chunk_entries.push(WalEntryData {
3731                    timestamp,
3732                    kind: kind_value.clone(),
3733                    track: track_value.clone(),
3734                    payload: chunk_payload,
3735                    embedding: chunk_embedding,
3736                    uri: chunk_uri,
3737                    title: chunk_title,
3738                    canonical_encoding: chunk_encoding,
3739                    canonical_length: chunk_length,
3740                    metadata: chunk_metadata.clone(),
3741                    search_text: chunk_search_text,
3742                    tags: chunk_tags.clone(),
3743                    labels: chunk_labels.clone(),
3744                    extra_metadata: chunk_extra_metadata.clone(),
3745                    content_dates: chunk_content_dates.clone(),
3746                    chunk_manifest: None,
3747                    role: FrameRole::DocumentChunk,
3748                    parent_sequence: None,
3749                    chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3750                    chunk_count: Some(chunk_total),
3751                    op: FrameWalOp::Insert,
3752                    target_frame_id: None,
3753                    supersedes_frame_id: None,
3754                    reuse_payload_from: None,
3755                    source_sha256: None, // Chunks don't have source references
3756                    source_path: None,
3757                    // Chunks are already extracted, so mark as Enriched
3758                    enrichment_state: crate::types::EnrichmentState::Enriched,
3759                });
3760            }
3761        }
3762
3763        let parent_uri = uri_value.clone();
3764        let parent_title = title_value.clone();
3765
3766        // Get parent_sequence from options.parent_id if provided
3767        // We need the WAL sequence of the parent frame to link them
3768        let parent_sequence = if let Some(parent_id) = options.parent_id {
3769            // Look up the parent frame to get its WAL sequence
3770            // Since frame.id corresponds to the array index, we need to find the sequence
3771            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
3772            // This works because sequence numbers are assigned incrementally
3773            usize::try_from(parent_id)
3774                .ok()
3775                .and_then(|idx| self.toc.frames.get(idx))
3776                .map(|_| parent_id + 2) // WAL sequences start at 2
3777        } else {
3778            None
3779        };
3780
3781        // Clone search_text for triplet extraction (before it's moved into WAL entry)
3782        let triplet_text = search_text.clone();
3783
3784        // Capture values needed for instant indexing BEFORE they're moved into entry
3785        #[cfg(feature = "lex")]
3786        let instant_index_tags = if options.instant_index {
3787            tags.clone()
3788        } else {
3789            Vec::new()
3790        };
3791        #[cfg(feature = "lex")]
3792        let instant_index_labels = if options.instant_index {
3793            labels.clone()
3794        } else {
3795            Vec::new()
3796        };
3797
3798        // Determine enrichment state: Searchable if needs background work, Enriched if complete
3799        #[cfg(feature = "lex")]
3800        let needs_enrichment =
3801            options.instant_index && (options.enable_embedding || is_skim_extraction);
3802        #[cfg(feature = "lex")]
3803        let enrichment_state = if needs_enrichment {
3804            crate::types::EnrichmentState::Searchable
3805        } else {
3806            crate::types::EnrichmentState::Enriched
3807        };
3808        #[cfg(not(feature = "lex"))]
3809        let enrichment_state = crate::types::EnrichmentState::Enriched;
3810
3811        let entry = WalEntryData {
3812            timestamp,
3813            kind: kind_value,
3814            track: track_value,
3815            payload: storage_payload,
3816            embedding,
3817            uri: parent_uri,
3818            title: parent_title,
3819            canonical_encoding,
3820            canonical_length,
3821            metadata,
3822            search_text,
3823            tags,
3824            labels,
3825            extra_metadata,
3826            content_dates,
3827            chunk_manifest: parent_chunk_manifest,
3828            role: options.role,
3829            parent_sequence,
3830            chunk_index: None,
3831            chunk_count: parent_chunk_count,
3832            op: FrameWalOp::Insert,
3833            target_frame_id: None,
3834            supersedes_frame_id: supersedes,
3835            reuse_payload_from,
3836            source_sha256,
3837            source_path: source_path_value,
3838            enrichment_state,
3839        };
3840
3841        let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3842        let parent_seq = self.append_wal_entry(&parent_bytes)?;
3843        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3844
3845        // Instant indexing: make frame searchable immediately (<1s) without full commit
3846        // This is Phase 1 of progressive ingestion - frame is searchable but not fully enriched
3847        #[cfg(feature = "lex")]
3848        if options.instant_index && self.tantivy.is_some() {
3849            // Create a minimal frame for indexing
3850            let frame_id = parent_seq as FrameId;
3851
3852            // Use triplet_text which was cloned before entry was created
3853            if let Some(ref text) = triplet_text {
3854                if !text.trim().is_empty() {
3855                    // Create temporary frame for indexing (minimal fields for Tantivy)
3856                    let temp_frame = Frame {
3857                        id: frame_id,
3858                        timestamp,
3859                        anchor_ts: None,
3860                        anchor_source: None,
3861                        kind: options.kind.clone(),
3862                        track: options.track.clone(),
3863                        payload_offset: 0,
3864                        payload_length: 0,
3865                        checksum: [0u8; 32],
3866                        uri: options
3867                            .uri
3868                            .clone()
3869                            .or_else(|| Some(crate::default_uri(frame_id))),
3870                        title: options.title.clone(),
3871                        canonical_encoding: crate::types::CanonicalEncoding::default(),
3872                        canonical_length: None,
3873                        metadata: None, // Not needed for text search
3874                        search_text: triplet_text.clone(),
3875                        tags: instant_index_tags.clone(),
3876                        labels: instant_index_labels.clone(),
3877                        extra_metadata: std::collections::BTreeMap::new(), // Not needed for search
3878                        content_dates: Vec::new(),                         // Not needed for search
3879                        chunk_manifest: None,
3880                        role: options.role,
3881                        parent_id: None,
3882                        chunk_index: None,
3883                        chunk_count: None,
3884                        status: FrameStatus::Active,
3885                        supersedes,
3886                        superseded_by: None,
3887                        source_sha256: None, // Not needed for search
3888                        source_path: None,   // Not needed for search
3889                        enrichment_state: crate::types::EnrichmentState::Searchable,
3890                    };
3891
3892                    // Get mutable reference to engine and index the frame
3893                    if let Some(engine) = self.tantivy.as_mut() {
3894                        engine.add_frame(&temp_frame, text)?;
3895                        engine.soft_commit()?;
3896                        self.tantivy_dirty = true;
3897
3898                        tracing::debug!(
3899                            frame_id = frame_id,
3900                            "instant index: frame searchable immediately"
3901                        );
3902                    }
3903                }
3904            }
3905        }
3906
3907        // Queue frame for background enrichment when using instant index path
3908        // Enrichment includes: embedding generation, full text re-extraction if time-limited
3909        // Note: enrichment_state is already set in the WAL entry, so it will be correct after replay
3910        #[cfg(feature = "lex")]
3911        if needs_enrichment {
3912            let frame_id = parent_seq as FrameId;
3913            self.toc.enrichment_queue.push(frame_id);
3914            tracing::debug!(
3915                frame_id = frame_id,
3916                is_skim = is_skim_extraction,
3917                needs_embedding = options.enable_embedding,
3918                "queued frame for background enrichment"
3919            );
3920        }
3921
3922        for mut chunk_entry in chunk_entries {
3923            chunk_entry.parent_sequence = Some(parent_seq);
3924            let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3925            self.append_wal_entry(&chunk_bytes)?;
3926            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3927        }
3928
3929        self.dirty = true;
3930        let suppress_checkpoint = self
3931            .batch_opts
3932            .as_ref()
3933            .is_some_and(|o| o.disable_auto_checkpoint);
3934        if !suppress_checkpoint && self.wal.should_checkpoint() {
3935            self.commit()?;
3936        }
3937
3938        // Record the put action if a replay session is active
3939        #[cfg(feature = "replay")]
3940        if let Some(input_bytes) = payload {
3941            self.record_put_action(parent_seq, input_bytes);
3942        }
3943
3944        // Extract triplets if enabled (default: true)
3945        // Triplets are stored as MemoryCards with entity/slot/value structure
3946        if should_extract_triplets {
3947            if let Some(ref text) = triplet_text {
3948                if !text.trim().is_empty() {
3949                    let extractor = TripletExtractor::default();
3950                    let frame_id = parent_seq as FrameId;
3951                    let (cards, _stats) = extractor.extract(
3952                        frame_id,
3953                        text,
3954                        triplet_uri.as_deref(),
3955                        triplet_title.as_deref(),
3956                        timestamp,
3957                    );
3958
3959                    if !cards.is_empty() {
3960                        // Add cards to memories track
3961                        let card_ids = self.memories_track.add_cards(cards);
3962
3963                        // Record enrichment for incremental processing
3964                        self.memories_track
3965                            .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3966                    }
3967                }
3968            }
3969        }
3970
3971        Ok(parent_seq)
3972    }
3973}
3974
3975#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3976#[serde(rename_all = "snake_case")]
3977pub(crate) enum FrameWalOp {
3978    Insert,
3979    Tombstone,
3980}
3981
3982impl Default for FrameWalOp {
3983    fn default() -> Self {
3984        Self::Insert
3985    }
3986}
3987
/// Envelope for one record appended to the embedded WAL.
///
/// Values are encoded with `encode_to_vec(.., wal_config())` before being appended and are
/// recovered through `decode_wal_entry`.
///
/// NOTE(review): the variant order is presumably part of the encoded representation —
/// confirm before reordering or inserting variants.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame mutation described by a [`WalEntryData`].
    Frame(WalEntryData),
    /// A `LexWalBatch` from `crate::search`; this variant only exists when the `lex`
    /// feature is enabled.
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3994
3995fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3996    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3997        return Ok(entry);
3998    }
3999    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
4000    Ok(WalEntry::Frame(legacy))
4001}
4002
/// Description of a single frame mutation as it is written to the WAL.
///
/// Instances are wrapped in `WalEntry::Frame` before encoding; `decode_wal_entry` also
/// accepts a bare `WalEntryData` as a legacy form.
///
/// NOTE(review): field order and types presumably define the encoded layout — confirm
/// before reordering, removing, or inserting fields.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Timestamp recorded for the frame (units are not visible here — TODO confirm).
    pub(crate) timestamp: i64,
    /// Optional frame kind supplied by the caller.
    pub(crate) kind: Option<String>,
    /// Optional track name supplied by the caller.
    pub(crate) track: Option<String>,
    /// Payload bytes in their stored form (see `canonical_encoding`).
    pub(crate) payload: Vec<u8>,
    /// Optional embedding vector for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    /// Optional URI; chunk entries use the parent URI suffixed with `#page-N`.
    #[serde(default)]
    pub(crate) uri: Option<String>,
    /// Optional title; chunk entries use the parent title suffixed with ` (page N/total)`.
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// Encoding applied to `payload`, as reported by `prepare_canonical_payload`.
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Length in bytes of the payload before encoding, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    /// Optional document metadata.
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Optional text made available for search.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    /// Tags attached to the frame.
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    /// Labels attached to the frame.
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Additional key/value metadata.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Dates associated with the content, stored as strings.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Chunk manifest; populated on a parent entry that has a chunk plan, `None` on chunk entries.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    /// Role of the frame; chunk entries use `FrameRole::DocumentChunk`.
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence of the parent entry; chunk entries receive the sequence returned when
    /// their parent was appended.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based position of a chunk within its parent; `None` on the parent entry.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Total number of chunks planned for the document.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Operation this entry represents.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    /// NOTE(review): presumably the frame a non-insert operation applies to — the insert
    /// path always writes `None`; confirm against the tombstone path.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame that this entry supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// NOTE(review): presumably a frame whose stored payload is reused instead of
    /// `payload` — confirm against the WAL replay code.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// SHA-256 hash of original source file (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Original source file path (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Enrichment state for progressive ingestion.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4058
4059pub(crate) fn prepare_canonical_payload(
4060    payload: &[u8],
4061) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4062    prepare_canonical_payload_with_level(payload, 3)
4063}
4064
4065pub(crate) fn prepare_canonical_payload_with_level(
4066    payload: &[u8],
4067    level: i32,
4068) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4069    if level == 0 {
4070        // No compression — store as plain text
4071        return Ok((
4072            payload.to_vec(),
4073            CanonicalEncoding::Plain,
4074            Some(payload.len() as u64),
4075        ));
4076    }
4077    if std::str::from_utf8(payload).is_ok() {
4078        let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4079        Ok((
4080            compressed,
4081            CanonicalEncoding::Zstd,
4082            Some(payload.len() as u64),
4083        ))
4084    } else {
4085        Ok((
4086            payload.to_vec(),
4087            CanonicalEncoding::Plain,
4088            Some(payload.len() as u64),
4089        ))
4090    }
4091}
4092
4093pub(crate) fn augment_search_text(
4094    base: Option<String>,
4095    uri: Option<&str>,
4096    title: Option<&str>,
4097    track: Option<&str>,
4098    tags: &[String],
4099    labels: &[String],
4100    extra_metadata: &BTreeMap<String, String>,
4101    content_dates: &[String],
4102    metadata: Option<&DocMetadata>,
4103) -> Option<String> {
4104    let mut segments: Vec<String> = Vec::new();
4105    if let Some(text) = base {
4106        let trimmed = text.trim();
4107        if !trimmed.is_empty() {
4108            segments.push(trimmed.to_string());
4109        }
4110    }
4111
4112    if let Some(title) = title {
4113        if !title.trim().is_empty() {
4114            segments.push(format!("title: {}", title.trim()));
4115        }
4116    }
4117
4118    if let Some(uri) = uri {
4119        if !uri.trim().is_empty() {
4120            segments.push(format!("uri: {}", uri.trim()));
4121        }
4122    }
4123
4124    if let Some(track) = track {
4125        if !track.trim().is_empty() {
4126            segments.push(format!("track: {}", track.trim()));
4127        }
4128    }
4129
4130    if !tags.is_empty() {
4131        segments.push(format!("tags: {}", tags.join(" ")));
4132    }
4133
4134    if !labels.is_empty() {
4135        segments.push(format!("labels: {}", labels.join(" ")));
4136    }
4137
4138    if !extra_metadata.is_empty() {
4139        for (key, value) in extra_metadata {
4140            if value.trim().is_empty() {
4141                continue;
4142            }
4143            segments.push(format!("{key}: {value}"));
4144        }
4145    }
4146
4147    if !content_dates.is_empty() {
4148        segments.push(format!("dates: {}", content_dates.join(" ")));
4149    }
4150
4151    if let Some(meta) = metadata {
4152        if let Ok(meta_json) = serde_json::to_string(meta) {
4153            segments.push(format!("metadata: {meta_json}"));
4154        }
4155    }
4156
4157    if segments.is_empty() {
4158        None
4159    } else {
4160        Some(segments.join("\n"))
4161    }
4162}
4163
/// Appends the values of `additions` to `target`, skipping anything already present.
///
/// Each addition is trimmed first; values that are empty after trimming are ignored.
/// Comparison is exact (case-sensitive) against the current contents of `target` and against
/// earlier additions. Existing elements of `target` are left untouched, including any
/// duplicates it already contained, and new values keep their order of appearance.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }

    let mut known: BTreeSet<String> = target.iter().cloned().collect();
    for raw in &additions {
        let item = raw.trim();
        if item.is_empty() || known.contains(item) {
            continue;
        }
        known.insert(item.to_owned());
        target.push(item.to_owned());
    }
}