Skip to main content

memvid_core/memvid/
mutation.rs

1//! Frame mutation and ingestion routines for `Memvid`.
2//!
3//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
4//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
5//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
6//!
7//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
8//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58    PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66    TemporalResolutionValue,
67};
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading input bytes handed to format sniffing during extraction.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Per-entry overhead, in bytes, added to the payload length when sizing WAL growth.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Size (8 MiB) of the scratch buffers used while shifting data to enlarge the WAL region.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
78
// NOTE(review): presumably the IANA timezone assumed when none is supplied for
// temporal resolution — the consumer is outside this view; confirm.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

// Lower-case relative/colloquial date and time phrases ("yesterday", "next friday", ...).
// NOTE(review): the matching logic is outside this view; presumably these are searched for
// in normalized frame text when collecting temporal mentions — confirm.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
123struct CommitStaging {
124    atomic: AtomicWriteFile,
125}
126
127impl CommitStaging {
128    fn prepare(path: &Path) -> Result<Self> {
129        let mut options = AtomicWriteFile::options();
130        options.read(true);
131        let atomic = options.open(path)?;
132        Ok(Self { atomic })
133    }
134
135    fn copy_from(&mut self, source: &File) -> Result<()> {
136        let mut reader = source.try_clone()?;
137        reader.seek(SeekFrom::Start(0))?;
138
139        let writer = self.atomic.as_file_mut();
140        writer.set_len(0)?;
141        writer.seek(SeekFrom::Start(0))?;
142        std::io::copy(&mut reader, writer)?;
143        writer.flush()?;
144        writer.sync_all()?;
145        Ok(())
146    }
147
148    fn clone_file(&self) -> Result<File> {
149        Ok(self.atomic.as_file().try_clone()?)
150    }
151
152    fn commit(self) -> Result<()> {
153        self.atomic.commit().map_err(Into::into)
154    }
155
156    fn discard(self) -> Result<()> {
157        self.atomic.discard().map_err(Into::into)
158    }
159}
160
161#[derive(Debug, Default)]
162struct IngestionDelta {
163    inserted_frames: Vec<FrameId>,
164    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
165    inserted_time_entries: Vec<TimeIndexEntry>,
166    mutated_frames: bool,
167    #[cfg(feature = "temporal_track")]
168    inserted_temporal_mentions: Vec<TemporalMention>,
169    #[cfg(feature = "temporal_track")]
170    inserted_temporal_anchors: Vec<TemporalAnchor>,
171}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
/// Strategy used when committing pending changes.
///
/// NOTE(review): how the two modes differ is decided by the commit routine, which is
/// outside this view.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// The default mode.
    #[default]
    Full,
    Incremental,
}

/// Options accepted by a commit.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// Commit strategy; defaults to [`CommitMode::Full`].
    pub mode: CommitMode,
    /// Whether a background commit was requested; defaults to `false`.
    pub background: bool,
}

impl CommitOptions {
    /// Creates options for `mode` with `background` disabled.
    #[must_use]
    pub fn new(mode: CommitMode) -> Self {
        Self {
            mode,
            background: false,
        }
    }

    /// Returns these options with the `background` flag replaced.
    #[must_use]
    pub fn background(mut self, background: bool) -> Self {
        self.background = background;
        self
    }
}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226    REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230    mime: Option<&str>,
231    magic: Option<&[u8]>,
232    uri: Option<&str>,
233) -> Option<DocumentFormat> {
234    // Check PDF magic bytes first
235    if detect_pdf_magic(magic) {
236        return Some(DocumentFormat::Pdf);
237    }
238
239    // For ZIP-based OOXML formats (DOCX, XLSX, PPTX), magic bytes are just ZIP header
240    // so we need to check file extension to distinguish them
241    if let Some(magic_bytes) = magic {
242        if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243            // It's a ZIP file - check extension to determine OOXML type
244            if let Some(format) = infer_format_from_extension(uri) {
245                return Some(format);
246            }
247        }
248    }
249
250    // Try MIME type
251    if let Some(mime_str) = mime {
252        let mime_lower = mime_str.trim().to_ascii_lowercase();
253        let format = match mime_lower.as_str() {
254            "application/pdf" => Some(DocumentFormat::Pdf),
255            "text/plain" => Some(DocumentFormat::PlainText),
256            "text/markdown" => Some(DocumentFormat::Markdown),
257            "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258            "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259                Some(DocumentFormat::Docx)
260            }
261            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262                Some(DocumentFormat::Xlsx)
263            }
264            "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265            "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266                Some(DocumentFormat::Pptx)
267            }
268            other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269            _ => None,
270        };
271        if format.is_some() {
272            return format;
273        }
274    }
275
276    // Fall back to extension-based detection
277    infer_format_from_extension(uri)
278}
279
280/// Infer document format from file extension in URI/path
281fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282    let uri = uri?;
283    let path = std::path::Path::new(uri);
284    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285    match ext.as_str() {
286        "pdf" => Some(DocumentFormat::Pdf),
287        "docx" => Some(DocumentFormat::Docx),
288        "xlsx" => Some(DocumentFormat::Xlsx),
289        "xls" => Some(DocumentFormat::Xls),
290        "pptx" => Some(DocumentFormat::Pptx),
291        "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292        | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293        | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294        | "sql" => Some(DocumentFormat::PlainText),
295        "md" | "markdown" => Some(DocumentFormat::Markdown),
296        "html" | "htm" => Some(DocumentFormat::Html),
297        _ => None,
298    }
299}
300
/// Reports whether `magic` looks like the start of a PDF.
///
/// An optional UTF-8 byte-order mark and any ASCII whitespace after it are skipped before
/// checking for the `%PDF` signature. Absent or empty input yields `false`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    const UTF8_BOM: &[u8] = &[0xEF, 0xBB, 0xBF];

    let Some(bytes) = magic else {
        return false;
    };
    let after_bom = bytes.strip_prefix(UTF8_BOM).unwrap_or(bytes);
    let content_start = after_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(after_bom.len());
    after_bom[content_start..].starts_with(b"%PDF")
}
318
319#[instrument(
320    target = "memvid::extract",
321    skip_all,
322    fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325    bytes: &[u8],
326    mime_hint: Option<&str>,
327    uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329    let registry = default_reader_registry();
330    let magic = bytes
331        .get(..MAGIC_SNIFF_BYTES)
332        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334        .with_uri(uri)
335        .with_magic(magic);
336
337    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338        let start = Instant::now();
339        match reader.extract(bytes, &hint) {
340            Ok(output) => {
341                return Ok(finalize_reader_output(output, start));
342            }
343            Err(err) => {
344                tracing::error!(
345                    target = "memvid::extract",
346                    reader = reader.name(),
347                    error = %err,
348                    "reader failed; falling back"
349                );
350                Some(format!("reader {} failed: {err}", reader.name()))
351            }
352        }
353    } else {
354        tracing::debug!(
355            target = "memvid::extract",
356            format = hint.format.map(super::super::reader::DocumentFormat::label),
357            "no reader matched; using default extractor"
358        );
359        Some("no registered reader matched; using default extractor".to_string())
360    };
361
362    let start = Instant::now();
363    let mut output = PassthroughReader.extract(bytes, &hint)?;
364    if let Some(reason) = fallback_reason {
365        output.diagnostics.track_warning(reason);
366    }
367    Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371    let elapsed = start.elapsed();
372    let ReaderOutput {
373        document,
374        reader_name,
375        diagnostics,
376    } = output;
377    log_reader_result(&reader_name, &diagnostics, elapsed);
378    document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382    let duration_ms = diagnostics
383        .duration_ms
384        .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385    let warnings = diagnostics.warnings.len();
386    let pages = diagnostics.pages_processed;
387
388    if warnings > 0 || diagnostics.fallback {
389        tracing::warn!(
390            target = "memvid::extract",
391            reader,
392            duration_ms,
393            pages,
394            warnings,
395            fallback = diagnostics.fallback,
396            "extraction completed with warnings"
397        );
398        for warning in &diagnostics.warnings {
399            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400        }
401    } else {
402        tracing::info!(
403            target = "memvid::extract",
404            reader,
405            duration_ms,
406            pages,
407            "extraction completed"
408        );
409    }
410}
411
412impl Memvid {
413    // -- Public ingestion entrypoints ---------------------------------------------------------
414
415    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
416    where
417        F: FnOnce(&mut Self) -> Result<()>,
418    {
419        self.file.sync_all()?;
420        let mut staging = CommitStaging::prepare(self.path())?;
421        staging.copy_from(&self.file)?;
422
423        let staging_handle = staging.clone_file()?;
424        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
425        let original_file = std::mem::replace(&mut self.file, staging_handle);
426        let original_wal = std::mem::replace(&mut self.wal, new_wal);
427        let original_header = self.header.clone();
428        let original_toc = self.toc.clone();
429        let original_data_end = self.data_end;
430        let original_generation = self.generation;
431        let original_dirty = self.dirty;
432        #[cfg(feature = "lex")]
433        let original_tantivy_dirty = self.tantivy_dirty;
434
435        let destination_path = self.path().to_path_buf();
436        let mut original_file = Some(original_file);
437        let mut original_wal = Some(original_wal);
438
439        match op(self) {
440            Ok(()) => {
441                self.file.sync_all()?;
442                match staging.commit() {
443                    Ok(()) => {
444                        drop(original_file.take());
445                        drop(original_wal.take());
446                        self.file = OpenOptions::new()
447                            .read(true)
448                            .write(true)
449                            .open(&destination_path)?;
450                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
451                        Ok(())
452                    }
453                    Err(commit_err) => {
454                        if let Some(file) = original_file.take() {
455                            self.file = file;
456                        }
457                        if let Some(wal) = original_wal.take() {
458                            self.wal = wal;
459                        }
460                        self.header = original_header;
461                        self.toc = original_toc;
462                        self.data_end = original_data_end;
463                        self.generation = original_generation;
464                        self.dirty = original_dirty;
465                        #[cfg(feature = "lex")]
466                        {
467                            self.tantivy_dirty = original_tantivy_dirty;
468                        }
469                        Err(commit_err)
470                    }
471                }
472            }
473            Err(err) => {
474                let _ = staging.discard();
475                if let Some(file) = original_file.take() {
476                    self.file = file;
477                }
478                if let Some(wal) = original_wal.take() {
479                    self.wal = wal;
480                }
481                self.header = original_header;
482                self.toc = original_toc;
483                self.data_end = original_data_end;
484                self.generation = original_generation;
485                self.dirty = original_dirty;
486                #[cfg(feature = "lex")]
487                {
488                    self.tantivy_dirty = original_tantivy_dirty;
489                }
490                Err(err)
491            }
492        }
493    }
494
495    pub(crate) fn catalog_data_end(&self) -> u64 {
496        let mut max_end = self.header.wal_offset + self.header.wal_size;
497
498        for descriptor in &self.toc.segment_catalog.lex_segments {
499            if descriptor.common.bytes_length == 0 {
500                continue;
501            }
502            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
503        }
504
505        for descriptor in &self.toc.segment_catalog.vec_segments {
506            if descriptor.common.bytes_length == 0 {
507                continue;
508            }
509            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
510        }
511
512        for descriptor in &self.toc.segment_catalog.time_segments {
513            if descriptor.common.bytes_length == 0 {
514                continue;
515            }
516            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
517        }
518
519        #[cfg(feature = "temporal_track")]
520        for descriptor in &self.toc.segment_catalog.temporal_segments {
521            if descriptor.common.bytes_length == 0 {
522                continue;
523            }
524            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
525        }
526
527        #[cfg(feature = "lex")]
528        for descriptor in &self.toc.segment_catalog.tantivy_segments {
529            if descriptor.common.bytes_length == 0 {
530                continue;
531            }
532            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
533        }
534
535        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
536            if manifest.bytes_length != 0 {
537                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
538            }
539        }
540
541        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
542            if manifest.bytes_length != 0 {
543                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
544            }
545        }
546
547        if let Some(manifest) = self.toc.time_index.as_ref() {
548            if manifest.bytes_length != 0 {
549                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
550            }
551        }
552
553        #[cfg(feature = "temporal_track")]
554        if let Some(track) = self.toc.temporal_track.as_ref() {
555            if track.bytes_length != 0 {
556                max_end = max_end.max(track.bytes_offset + track.bytes_length);
557            }
558        }
559
560        max_end
561    }
562
    /// Returns the cached end offset of the payload region (`cached_payload_end`).
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
566
567    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
568        loop {
569            match self.wal.append_entry(payload) {
570                Ok(seq) => return Ok(seq),
571                Err(MemvidError::CheckpointFailed { reason })
572                    if reason == "embedded WAL region too small for entry"
573                        || reason == "embedded WAL region full" =>
574                {
575                    // WAL is either too small for this entry or full with pending entries.
576                    // Grow the WAL to accommodate - doubling ensures we have space.
577                    let required = WAL_ENTRY_HEADER_SIZE
578                        .saturating_add(payload.len() as u64)
579                        .max(self.header.wal_size + 1);
580                    self.grow_wal_region(required)?;
581                }
582                Err(err) => return Err(err),
583            }
584        }
585    }
586
587    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
588        let mut new_size = self.header.wal_size;
589        let mut target = required_entry_size;
590        if target == 0 {
591            target = self.header.wal_size;
592        }
593        while new_size <= target {
594            new_size = new_size
595                .checked_mul(2)
596                .ok_or_else(|| MemvidError::CheckpointFailed {
597                    reason: "wal_size overflow".into(),
598                })?;
599        }
600        let delta = new_size - self.header.wal_size;
601        if delta == 0 {
602            return Ok(());
603        }
604
605        self.shift_data_for_wal_growth(delta)?;
606        self.header.wal_size = new_size;
607        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
608        self.data_end = self.data_end.saturating_add(delta);
609        self.adjust_offsets_after_wal_growth(delta);
610
611        let catalog_end = self.catalog_data_end();
612        self.header.footer_offset = catalog_end
613            .max(self.header.footer_offset)
614            .max(self.data_end);
615
616        self.rewrite_toc_footer()?;
617        self.header.toc_checksum = self.toc.toc_checksum;
618        crate::persist_header(&mut self.file, &self.header)?;
619        self.file.sync_all()?;
620        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
621        Ok(())
622    }
623
    /// Moves everything stored after the WAL region forward by `delta` bytes, in place.
    ///
    /// The file is first extended by `delta`. The bytes between the current end of the WAL
    /// (`wal_offset + wal_size`) and the old end of file are then copied to their new
    /// position, and the vacated `delta` bytes right after the old WAL end are zero-filled.
    /// `self.header` is not modified here.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from resizing, seeking, reading or writing the file.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        // Copy from the tail towards the front: source and destination ranges overlap, so
        // moving the last chunk first never overwrites bytes that still have to be read.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            // Start of the last not-yet-moved chunk.
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the gap that now belongs to the enlarged WAL region.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
658
659    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
660        if delta == 0 {
661            return;
662        }
663
664        for frame in &mut self.toc.frames {
665            if frame.payload_offset != 0 {
666                frame.payload_offset += delta;
667            }
668        }
669
670        for segment in &mut self.toc.segments {
671            if segment.bytes_offset != 0 {
672                segment.bytes_offset += delta;
673            }
674        }
675
676        if let Some(lex) = self.toc.indexes.lex.as_mut() {
677            if lex.bytes_offset != 0 {
678                lex.bytes_offset += delta;
679            }
680        }
681        for manifest in &mut self.toc.indexes.lex_segments {
682            if manifest.bytes_offset != 0 {
683                manifest.bytes_offset += delta;
684            }
685        }
686        if let Some(vec) = self.toc.indexes.vec.as_mut() {
687            if vec.bytes_offset != 0 {
688                vec.bytes_offset += delta;
689            }
690        }
691        if let Some(time_index) = self.toc.time_index.as_mut() {
692            if time_index.bytes_offset != 0 {
693                time_index.bytes_offset += delta;
694            }
695        }
696        #[cfg(feature = "temporal_track")]
697        if let Some(track) = self.toc.temporal_track.as_mut() {
698            if track.bytes_offset != 0 {
699                track.bytes_offset += delta;
700            }
701        }
702
703        let catalog = &mut self.toc.segment_catalog;
704        for descriptor in &mut catalog.lex_segments {
705            if descriptor.common.bytes_offset != 0 {
706                descriptor.common.bytes_offset += delta;
707            }
708        }
709        for descriptor in &mut catalog.vec_segments {
710            if descriptor.common.bytes_offset != 0 {
711                descriptor.common.bytes_offset += delta;
712            }
713        }
714        for descriptor in &mut catalog.time_segments {
715            if descriptor.common.bytes_offset != 0 {
716                descriptor.common.bytes_offset += delta;
717            }
718        }
719        #[cfg(feature = "temporal_track")]
720        for descriptor in &mut catalog.temporal_segments {
721            if descriptor.common.bytes_offset != 0 {
722                descriptor.common.bytes_offset += delta;
723            }
724        }
725        for descriptor in &mut catalog.tantivy_segments {
726            if descriptor.common.bytes_offset != 0 {
727                descriptor.common.bytes_offset += delta;
728            }
729        }
730
731        #[cfg(feature = "lex")]
732        if let Ok(mut storage) = self.lex_storage.write() {
733            storage.adjust_offsets(delta);
734        }
735    }
736    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
737        self.ensure_writable()?;
738        if options.background {
739            tracing::debug!("commit background flag ignored; running synchronously");
740        }
741        let mode = options.mode;
742        let records = self.wal.pending_records()?;
743        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
744            return Ok(());
745        }
746        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
747    }
748
749    pub fn commit(&mut self) -> Result<()> {
750        self.ensure_writable()?;
751        self.commit_with_options(CommitOptions::new(CommitMode::Full))
752    }
753
754    /// Enter batch mode for high-throughput ingestion.
755    ///
756    /// While batch mode is active:
757    /// - WAL fsync is skipped on every append (controlled by `opts.skip_sync`)
758    /// - Auto-checkpoint is suppressed (controlled by `opts.disable_auto_checkpoint`)
759    /// - Compression level is lowered (controlled by `opts.compression_level`)
760    /// - WAL is pre-sized to avoid expensive mid-batch growth (controlled by `opts.wal_pre_size_bytes`)
761    ///
762    /// **You must call [`end_batch()`](Self::end_batch) when done** to flush the WAL
763    /// and restore normal operation.
764    pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
765        if opts.wal_pre_size_bytes > 0 {
766            self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
767        }
768        self.wal.set_skip_sync(opts.skip_sync);
769        self.batch_opts = Some(opts);
770        Ok(())
771    }
772
773    /// Pre-grow the embedded WAL to at least `min_bytes` in a single operation.
774    ///
775    /// When `disable_auto_checkpoint` is true, all WAL entries accumulate for
776    /// the entire batch.  If the region is too small it must grow repeatedly,
777    /// and every growth shifts **all** payload data — O(file_size) per shift.
778    ///
779    /// Calling this once before the batch eliminates all mid-batch shifts.
780    /// On a freshly-created file the shift is essentially free (no data to move).
781    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
782        if min_bytes <= self.header.wal_size {
783            return Ok(());
784        }
785        // Jump directly to the target size (next power of two for alignment)
786        let target = min_bytes.next_power_of_two();
787        let delta = target.saturating_sub(self.header.wal_size);
788        if delta == 0 {
789            return Ok(());
790        }
791
792        tracing::info!(
793            current_wal = self.header.wal_size,
794            target_wal = target,
795            delta,
796            "pre-sizing WAL for batch mode"
797        );
798
799        self.shift_data_for_wal_growth(delta)?;
800        self.header.wal_size = target;
801        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
802        self.data_end = self.data_end.saturating_add(delta);
803        self.adjust_offsets_after_wal_growth(delta);
804
805        let catalog_end = self.catalog_data_end();
806        self.header.footer_offset = catalog_end
807            .max(self.header.footer_offset)
808            .max(self.data_end);
809
810        self.rewrite_toc_footer()?;
811        self.header.toc_checksum = self.toc.toc_checksum;
812        crate::persist_header(&mut self.file, &self.header)?;
813        self.file.sync_all()?;
814        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
815        Ok(())
816    }
817
818    /// Exit batch mode, flushing the WAL and restoring per-entry fsync.
819    ///
820    /// This performs a single `fsync` for all appends accumulated during the batch,
821    /// then clears batch options so subsequent puts use default behaviour.
822    pub fn end_batch(&mut self) -> Result<()> {
823        // Single fsync for the entire batch
824        self.wal.flush()?;
825        self.wal.set_skip_sync(false);
826        self.batch_opts = None;
827        Ok(())
828    }
829
830    /// Commit pending WAL records without rebuilding any indexes.
831    ///
832    /// Optimized for bulk ingestion: payloads and frame metadata are persisted,
833    /// but time index, Tantivy, vec index, and other index structures are NOT
834    /// rebuilt. The caller must do a full `commit()` (which triggers
835    /// `rebuild_indexes()`) after all batches are written.
836    ///
837    /// Skips the staging lock for performance — not crash-safe.
838    pub fn commit_skip_indexes(&mut self) -> Result<()> {
839        self.ensure_writable()?;
840        let records = self.wal.pending_records()?;
841        if records.is_empty() && !self.dirty {
842            return Ok(());
843        }
844        self.commit_skip_indexes_inner(records)
845    }
846
847    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
848        self.generation = self.generation.wrapping_add(1);
849
850        // Temporarily remove Tantivy engine to avoid per-frame indexing work
851        // and disk reads in apply_records(). We won't persist Tantivy state anyway.
852        #[cfg(feature = "lex")]
853        let tantivy_backup = self.tantivy.take();
854
855        let result = self.apply_records(records);
856
857        // Restore Tantivy engine (unchanged — no dirty frames added)
858        #[cfg(feature = "lex")]
859        {
860            self.tantivy = tantivy_backup;
861            self.tantivy_dirty = false;
862        }
863
864        let _delta = result?;
865
866        // Set footer_offset to right after payloads (no index data written)
867        self.header.footer_offset = self.data_end;
868
869        // Clear stale index manifests so next open() doesn't try to load
870        // garbage data. Indexes will be rebuilt in the finalize step.
871        self.toc.time_index = None;
872        self.toc.indexes.lex_segments.clear();
873        // Preserve vec manifest (holds dimension info) but zero out data pointers
874        if let Some(vec) = self.toc.indexes.vec.as_mut() {
875            vec.bytes_offset = 0;
876            vec.bytes_length = 0;
877            vec.vector_count = 0;
878        }
879        self.toc.indexes.lex = None;
880        self.toc.indexes.clip = None;
881        self.toc.segment_catalog.lex_segments.clear();
882        self.toc.segment_catalog.vec_segments.clear();
883        self.toc.segment_catalog.time_segments.clear();
884        self.toc.segment_catalog.tantivy_segments.clear();
885        #[cfg(feature = "temporal_track")]
886        {
887            self.toc.temporal_track = None;
888            self.toc.segment_catalog.temporal_segments.clear();
889        }
890        self.toc.memories_track = None;
891        self.toc.logic_mesh = None;
892        self.toc.sketch_track = None;
893
894        self.rewrite_toc_footer()?;
895        self.header.toc_checksum = self.toc.toc_checksum;
896        self.wal.record_checkpoint(&mut self.header)?;
897        self.header.toc_checksum = self.toc.toc_checksum;
898        crate::persist_header(&mut self.file, &self.header)?;
899        self.file.sync_all()?;
900        self.pending_frame_inserts = 0;
901        self.dirty = false;
902        Ok(())
903    }
904
905    /// Rebuild all indexes (time, Tantivy, vec) and persist the TOC.
906    ///
907    /// Use after bulk ingestion with `commit_skip_indexes()` to build
908    /// all search indexes in one O(n) pass. This is the complement to
909    /// `commit_skip_indexes()` — call it once after all batches are done.
910    pub fn finalize_indexes(&mut self) -> Result<()> {
911        self.ensure_writable()?;
912        self.rebuild_indexes(&[], &[])?;
913        self.rewrite_toc_footer()?;
914        self.header.toc_checksum = self.toc.toc_checksum;
915        crate::persist_header(&mut self.file, &self.header)?;
916        self.file.sync_all()?;
917        Ok(())
918    }
919
920    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
921        self.generation = self.generation.wrapping_add(1);
922
923        let delta = self.apply_records(records)?;
924        let mut indexes_rebuilt = false;
925
926        // Check if CLIP index has pending embeddings that need to be persisted
927        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());
928
929        if !delta.is_empty() || clip_needs_persist {
930            tracing::debug!(
931                inserted_frames = delta.inserted_frames.len(),
932                inserted_embeddings = delta.inserted_embeddings.len(),
933                inserted_time_entries = delta.inserted_time_entries.len(),
934                clip_needs_persist = clip_needs_persist,
935                "commit applied delta"
936            );
937            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
938            indexes_rebuilt = true;
939        }
940
941        if !indexes_rebuilt && self.tantivy_index_pending() {
942            self.flush_tantivy()?;
943        }
944
945        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
946        if !indexes_rebuilt && self.clip_enabled {
947            if let Some(ref clip_index) = self.clip_index {
948                if !clip_index.is_empty() {
949                    self.persist_clip_index()?;
950                }
951            }
952        }
953
954        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
955        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
956            self.persist_memories_track()?;
957        }
958
959        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
960        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
961            self.persist_logic_mesh()?;
962        }
963
964        // Persist sketch track if it has entries
965        if !self.sketch_track.is_empty() {
966            self.persist_sketch_track()?;
967        }
968
969        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
970        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.
971
972        self.rewrite_toc_footer()?;
973        self.header.toc_checksum = self.toc.toc_checksum;
974        self.wal.record_checkpoint(&mut self.header)?;
975        self.header.toc_checksum = self.toc.toc_checksum;
976        crate::persist_header(&mut self.file, &self.header)?;
977        self.file.sync_all()?;
978        #[cfg(feature = "parallel_segments")]
979        if let Some(wal) = self.manifest_wal.as_mut() {
980            wal.flush()?;
981            wal.truncate()?;
982        }
983        self.pending_frame_inserts = 0;
984        self.dirty = false;
985        Ok(())
986    }
987
988    #[cfg(feature = "parallel_segments")]
989    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
990        self.ensure_writable()?;
991        if !self.dirty && !self.tantivy_index_pending() {
992            return Ok(());
993        }
994        let opts = opts.clone();
995        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
996    }
997
998    #[cfg(feature = "parallel_segments")]
999    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
1000        if !self.dirty && !self.tantivy_index_pending() {
1001            return Ok(());
1002        }
1003        let records = self.wal.pending_records()?;
1004        let delta = self.apply_records(records)?;
1005        self.generation = self.generation.wrapping_add(1);
1006        let mut indexes_rebuilt = false;
1007        if !delta.is_empty() {
1008            tracing::info!(
1009                inserted_frames = delta.inserted_frames.len(),
1010                inserted_embeddings = delta.inserted_embeddings.len(),
1011                inserted_time_entries = delta.inserted_time_entries.len(),
1012                "parallel commit applied delta"
1013            );
1014            // Try to use parallel segment builder first
1015            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
1016            tracing::info!(
1017                "parallel_commit: used_parallel={}, lex_enabled={}",
1018                used_parallel,
1019                self.lex_enabled
1020            );
1021            if used_parallel {
1022                // Segments were written at data_end; update footer_offset so
1023                // rewrite_toc_footer places the TOC after the new segment data
1024                self.header.footer_offset = self.data_end;
1025                indexes_rebuilt = true;
1026
1027                // OPTIMIZATION: Use incremental Tantivy indexing instead of full rebuild.
1028                // Only add new frames from the delta, not all frames.
1029                #[cfg(feature = "lex")]
1030                if self.lex_enabled {
1031                    tracing::info!(
1032                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
1033                        delta.inserted_frames.len(),
1034                        self.toc.frames.len()
1035                    );
1036
1037                    // If Tantivy engine already exists, frames were already indexed during put()
1038                    // (at the add_frame call in put_bytes_with_options). Skip re-indexing to avoid
1039                    // duplicates. Only initialize and index if Tantivy wasn't present during put.
1040                    let tantivy_was_present = self.tantivy.is_some();
1041                    if self.tantivy.is_none() {
1042                        self.init_tantivy()?;
1043                    }
1044
1045                    // Skip indexing if Tantivy was already present - frames were indexed during put
1046                    if tantivy_was_present {
1047                        tracing::info!(
1048                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
1049                        );
1050                    } else {
1051                        // First, collect all frames and their search text (to avoid borrow conflicts)
1052                        let max_payload = crate::memvid::search::max_index_payload();
1053                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
1054
1055                        for frame_id in &delta.inserted_frames {
1056                            // Look up the actual Frame from the TOC
1057                            let frame = match self.toc.frames.get(*frame_id as usize) {
1058                                Some(f) => f.clone(),
1059                                None => continue,
1060                            };
1061
1062                            // Check if frame has explicit search_text first - clone it for ownership
1063                            let explicit_text = frame.search_text.clone();
1064                            if let Some(ref search_text) = explicit_text {
1065                                if !search_text.trim().is_empty() {
1066                                    prepared_docs.push((frame, search_text.clone()));
1067                                    continue;
1068                                }
1069                            }
1070
1071                            // Get MIME type and check if text-indexable
1072                            let mime = frame
1073                                .metadata
1074                                .as_ref()
1075                                .and_then(|m| m.mime.as_deref())
1076                                .unwrap_or("application/octet-stream");
1077
1078                            if !crate::memvid::search::is_text_indexable_mime(mime) {
1079                                continue;
1080                            }
1081
1082                            if frame.payload_length > max_payload {
1083                                continue;
1084                            }
1085
1086                            let text = self.frame_search_text(&frame)?;
1087                            if !text.trim().is_empty() {
1088                                prepared_docs.push((frame, text));
1089                            }
1090                        }
1091
1092                        // Now add to Tantivy engine (no borrow conflict)
1093                        if let Some(ref mut engine) = self.tantivy {
1094                            for (frame, text) in &prepared_docs {
1095                                engine.add_frame(frame, text)?;
1096                            }
1097
1098                            if !prepared_docs.is_empty() {
1099                                engine.commit()?;
1100                                self.tantivy_dirty = true;
1101                            }
1102
1103                            tracing::info!(
1104                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
1105                                prepared_docs.len(),
1106                                engine.num_docs()
1107                            );
1108                        } else {
1109                            tracing::warn!(
1110                                "parallel_commit: Tantivy engine is None after init_tantivy"
1111                            );
1112                        }
1113                    } // end of else !tantivy_was_present
1114                }
1115
1116                // Time index stores all entries together for timeline queries.
1117                // Unlike Tantivy which is incremental, time index needs full rebuild.
1118                self.file.seek(SeekFrom::Start(self.data_end))?;
1119                let mut time_entries: Vec<TimeIndexEntry> = self
1120                    .toc
1121                    .frames
1122                    .iter()
1123                    .filter(|frame| {
1124                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
1125                    })
1126                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
1127                    .collect();
1128                let (ti_offset, ti_length, ti_checksum) =
1129                    time_index_append(&mut self.file, &mut time_entries)?;
1130                self.toc.time_index = Some(TimeIndexManifest {
1131                    bytes_offset: ti_offset,
1132                    bytes_length: ti_length,
1133                    entry_count: time_entries.len() as u64,
1134                    checksum: ti_checksum,
1135                });
1136                // Update data_end to account for the newly written time index
1137                self.data_end = ti_offset + ti_length;
1138                self.header.footer_offset = self.data_end;
1139                tracing::info!(
1140                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
1141                    ti_offset,
1142                    ti_length,
1143                    time_entries.len()
1144                );
1145            } else {
1146                // Fall back to sequential rebuild if no segments were generated
1147                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1148                indexes_rebuilt = true;
1149            }
1150        }
1151
1152        // Flush Tantivy index if dirty (from parallel path or pending updates)
1153        #[cfg(feature = "lex")]
1154        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
1155            self.flush_tantivy()?;
1156        }
1157
1158        // Persist CLIP index if it has embeddings
1159        if self.clip_enabled {
1160            if let Some(ref clip_index) = self.clip_index {
1161                if !clip_index.is_empty() {
1162                    self.persist_clip_index()?;
1163                }
1164            }
1165        }
1166
1167        // Persist memories track if it has cards
1168        if self.memories_track.card_count() > 0 {
1169            self.persist_memories_track()?;
1170        }
1171
1172        // Persist logic mesh if it has nodes
1173        if !self.logic_mesh.is_empty() {
1174            self.persist_logic_mesh()?;
1175        }
1176
1177        // Persist sketch track if it has entries
1178        if !self.sketch_track.is_empty() {
1179            self.persist_sketch_track()?;
1180        }
1181
1182        // flush_tantivy() has already set footer_offset correctly
1183        // DO NOT overwrite with catalog_data_end()
1184        self.rewrite_toc_footer()?;
1185        self.header.toc_checksum = self.toc.toc_checksum;
1186        self.wal.record_checkpoint(&mut self.header)?;
1187        self.header.toc_checksum = self.toc.toc_checksum;
1188        crate::persist_header(&mut self.file, &self.header)?;
1189        self.file.sync_all()?;
1190        if let Some(wal) = self.manifest_wal.as_mut() {
1191            wal.flush()?;
1192            wal.truncate()?;
1193        }
1194        self.pending_frame_inserts = 0;
1195        self.dirty = false;
1196        Ok(())
1197    }
1198
1199    pub(crate) fn recover_wal(&mut self) -> Result<()> {
1200        let records = self.wal.records_after(self.header.wal_sequence)?;
1201        if records.is_empty() {
1202            if self.tantivy_index_pending() {
1203                self.flush_tantivy()?;
1204            }
1205            return Ok(());
1206        }
1207        let delta = self.apply_records(records)?;
1208        if !delta.is_empty() {
1209            tracing::debug!(
1210                inserted_frames = delta.inserted_frames.len(),
1211                inserted_embeddings = delta.inserted_embeddings.len(),
1212                inserted_time_entries = delta.inserted_time_entries.len(),
1213                "recover applied delta"
1214            );
1215            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1216        } else if self.tantivy_index_pending() {
1217            self.flush_tantivy()?;
1218        }
1219        self.wal.record_checkpoint(&mut self.header)?;
1220        crate::persist_header(&mut self.file, &self.header)?;
1221        if !delta.is_empty() {
1222            // rebuild_indexes already flushed Tantivy, so nothing further to do.
1223        } else if self.tantivy_index_pending() {
1224            self.flush_tantivy()?;
1225            crate::persist_header(&mut self.file, &self.header)?;
1226        }
1227        self.file.sync_all()?;
1228        self.pending_frame_inserts = 0;
1229        self.dirty = false;
1230        Ok(())
1231    }
1232
1233    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1234        let mut delta = IngestionDelta::default();
1235        if records.is_empty() {
1236            return Ok(delta);
1237        }
1238
1239        // Use data_end instead of payload_region_end to avoid overwriting
1240        // vec/lex/time segments that were written after the payload region.
1241        // payload_region_end() only considers frame payloads, but data_end tracks
1242        // all data including index segments.
1243        let mut data_cursor = self.data_end;
1244        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1245
1246        if !records.is_empty() {
1247            self.file.seek(SeekFrom::Start(data_cursor))?;
1248            for record in records {
1249                let mut entry = match decode_wal_entry(&record.payload)? {
1250                    WalEntry::Frame(entry) => entry,
1251                    #[cfg(feature = "lex")]
1252                    WalEntry::Lex(batch) => {
1253                        self.apply_lex_wal(batch)?;
1254                        continue;
1255                    }
1256                };
1257
1258                match entry.op {
1259                    FrameWalOp::Insert => {
1260                        let frame_id = self.toc.frames.len() as u64;
1261
1262                        let (
1263                            payload_offset,
1264                            payload_length,
1265                            checksum_bytes,
1266                            canonical_length_value,
1267                        ) = if let Some(source_id) = entry.reuse_payload_from {
1268                            if !entry.payload.is_empty() {
1269                                return Err(MemvidError::InvalidFrame {
1270                                    frame_id: source_id,
1271                                    reason: "reused payload entry contained inline bytes",
1272                                });
1273                            }
1274                            let source_idx = usize::try_from(source_id).map_err(|_| {
1275                                MemvidError::InvalidFrame {
1276                                    frame_id: source_id,
1277                                    reason: "frame id too large for memory",
1278                                }
1279                            })?;
1280                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
1281                                MemvidError::InvalidFrame {
1282                                    frame_id: source_id,
1283                                    reason: "reused payload source missing",
1284                                },
1285                            )?;
1286                            (
1287                                source.payload_offset,
1288                                source.payload_length,
1289                                source.checksum,
1290                                entry
1291                                    .canonical_length
1292                                    .or(source.canonical_length)
1293                                    .unwrap_or(source.payload_length),
1294                            )
1295                        } else {
1296                            self.file.seek(SeekFrom::Start(data_cursor))?;
1297                            self.file.write_all(&entry.payload)?;
1298                            let checksum = hash(&entry.payload);
1299                            let payload_length = entry.payload.len() as u64;
1300                            let canonical_length =
1301                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
1302                                    if let Some(len) = entry.canonical_length {
1303                                        len
1304                                    } else {
1305                                        let decoded = crate::decode_canonical_bytes(
1306                                            &entry.payload,
1307                                            CanonicalEncoding::Zstd,
1308                                            frame_id,
1309                                        )?;
1310                                        decoded.len() as u64
1311                                    }
1312                                } else {
1313                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1314                                };
1315                            let payload_offset = data_cursor;
1316                            data_cursor += payload_length;
1317                            // Keep cached_payload_end in sync (monotonically increasing)
1318                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
1319                            (
1320                                payload_offset,
1321                                payload_length,
1322                                *checksum.as_bytes(),
1323                                canonical_length,
1324                            )
1325                        };
1326
1327                        let uri = entry
1328                            .uri
1329                            .clone()
1330                            .unwrap_or_else(|| crate::default_uri(frame_id));
1331                        let title = entry
1332                            .title
1333                            .clone()
1334                            .or_else(|| crate::infer_title_from_uri(&uri));
1335
1336                        #[cfg(feature = "temporal_track")]
1337                        let (anchor_ts, anchor_source) =
1338                            self.determine_temporal_anchor(entry.timestamp);
1339
1340                        let mut frame = Frame {
1341                            id: frame_id,
1342                            timestamp: entry.timestamp,
1343                            anchor_ts: {
1344                                #[cfg(feature = "temporal_track")]
1345                                {
1346                                    Some(anchor_ts)
1347                                }
1348                                #[cfg(not(feature = "temporal_track"))]
1349                                {
1350                                    None
1351                                }
1352                            },
1353                            anchor_source: {
1354                                #[cfg(feature = "temporal_track")]
1355                                {
1356                                    Some(anchor_source)
1357                                }
1358                                #[cfg(not(feature = "temporal_track"))]
1359                                {
1360                                    None
1361                                }
1362                            },
1363                            kind: entry.kind.clone(),
1364                            track: entry.track.clone(),
1365                            payload_offset,
1366                            payload_length,
1367                            checksum: checksum_bytes,
1368                            uri: Some(uri),
1369                            title,
1370                            canonical_encoding: entry.canonical_encoding,
1371                            canonical_length: Some(canonical_length_value),
1372                            metadata: entry.metadata.clone(),
1373                            search_text: entry.search_text.clone(),
1374                            tags: entry.tags.clone(),
1375                            labels: entry.labels.clone(),
1376                            extra_metadata: entry.extra_metadata.clone(),
1377                            content_dates: entry.content_dates.clone(),
1378                            chunk_manifest: entry.chunk_manifest.clone(),
1379                            role: entry.role,
1380                            parent_id: None,
1381                            chunk_index: entry.chunk_index,
1382                            chunk_count: entry.chunk_count,
1383                            status: FrameStatus::Active,
1384                            supersedes: entry.supersedes_frame_id,
1385                            superseded_by: None,
1386                            source_sha256: entry.source_sha256,
1387                            source_path: entry.source_path.clone(),
1388                            enrichment_state: entry.enrichment_state,
1389                        };
1390
1391                        if let Some(parent_seq) = entry.parent_sequence {
1392                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1393                                frame.parent_id = Some(*parent_frame_id);
1394                            } else {
1395                                // Parent sequence not found in current batch - this can happen
1396                                // if parent was committed in a previous batch. Try to find parent
1397                                // by looking at recently inserted frames with matching characteristics.
1398                                // The parent should be the most recent Document frame that has
1399                                // a chunk_manifest matching this chunk's expected parent.
1400                                if entry.role == FrameRole::DocumentChunk {
1401                                    // Look backwards through recently inserted frames
1402                                    for &candidate_id in delta.inserted_frames.iter().rev() {
1403                                        if let Ok(idx) = usize::try_from(candidate_id) {
1404                                            if let Some(candidate) = self.toc.frames.get(idx) {
1405                                                if candidate.role == FrameRole::Document
1406                                                    && candidate.chunk_manifest.is_some()
1407                                                {
1408                                                    // Found a parent document - use it
1409                                                    frame.parent_id = Some(candidate_id);
1410                                                    tracing::debug!(
1411                                                        chunk_frame_id = frame_id,
1412                                                        parent_frame_id = candidate_id,
1413                                                        parent_seq = parent_seq,
1414                                                        "resolved chunk parent via fallback"
1415                                                    );
1416                                                    break;
1417                                                }
1418                                            }
1419                                        }
1420                                    }
1421                                }
1422                                if frame.parent_id.is_none() {
1423                                    tracing::warn!(
1424                                        chunk_frame_id = frame_id,
1425                                        parent_seq = parent_seq,
1426                                        "chunk has parent_sequence but parent not found in batch"
1427                                    );
1428                                }
1429                            }
1430                        }
1431
1432                        #[cfg(feature = "lex")]
1433                        let index_text = if self.tantivy.is_some() {
1434                            if let Some(text) = entry.search_text.clone() {
1435                                if text.trim().is_empty() {
1436                                    None
1437                                } else {
1438                                    Some(text)
1439                                }
1440                            } else {
1441                                Some(self.frame_content(&frame)?)
1442                            }
1443                        } else {
1444                            None
1445                        };
1446                        #[cfg(feature = "lex")]
1447                        if let (Some(engine), Some(text)) =
1448                            (self.tantivy.as_mut(), index_text.as_ref())
1449                        {
1450                            engine.add_frame(&frame, text)?;
1451                            self.tantivy_dirty = true;
1452
1453                            // Generate sketch for fast candidate pre-filtering
1454                            // Uses the same text as tantivy indexing for consistency
1455                            if !text.trim().is_empty() {
1456                                let entry = crate::types::generate_sketch(
1457                                    frame_id,
1458                                    text,
1459                                    crate::types::SketchVariant::Small,
1460                                    None,
1461                                );
1462                                self.sketch_track.insert(entry);
1463                            }
1464                        }
1465
1466                        if let Some(embedding) = entry.embedding.take() {
1467                            delta
1468                                .inserted_embeddings
1469                                .push((frame_id, embedding.clone()));
1470                        }
1471
1472                        if entry.role == FrameRole::Document {
1473                            delta
1474                                .inserted_time_entries
1475                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1476                            #[cfg(feature = "temporal_track")]
1477                            {
1478                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1479                                    frame_id,
1480                                    anchor_ts,
1481                                    anchor_source,
1482                                ));
1483                                delta.inserted_temporal_mentions.extend(
1484                                    Self::collect_temporal_mentions(
1485                                        entry.search_text.as_deref(),
1486                                        frame_id,
1487                                        anchor_ts,
1488                                    ),
1489                                );
1490                            }
1491                        }
1492
1493                        if let Some(predecessor) = frame.supersedes {
1494                            self.mark_frame_superseded(predecessor, frame_id)?;
1495                        }
1496
1497                        self.toc.frames.push(frame);
1498                        delta.inserted_frames.push(frame_id);
1499                        sequence_to_frame.insert(record.sequence, frame_id);
1500                    }
1501                    FrameWalOp::Tombstone => {
1502                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1503                            frame_id: 0,
1504                            reason: "tombstone missing frame reference",
1505                        })?;
1506                        self.mark_frame_deleted(target)?;
1507                        delta.mutated_frames = true;
1508                    }
1509                }
1510            }
1511            self.data_end = self.data_end.max(data_cursor);
1512        }
1513
1514        // Second pass: resolve any orphan DocumentChunk frames that are missing parent_id.
1515        // This handles edge cases where chunks couldn't be linked during the first pass.
1516        // First, collect orphan chunks and their resolved parents to avoid borrow conflicts.
1517        let orphan_resolutions: Vec<(u64, u64)> = delta
1518            .inserted_frames
1519            .iter()
1520            .filter_map(|&frame_id| {
1521                let idx = usize::try_from(frame_id).ok()?;
1522                let frame = self.toc.frames.get(idx)?;
1523                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1524                    return None;
1525                }
1526                // Find the most recent Document frame before this chunk that has a manifest
1527                for candidate_id in (0..frame_id).rev() {
1528                    if let Ok(idx) = usize::try_from(candidate_id) {
1529                        if let Some(candidate) = self.toc.frames.get(idx) {
1530                            if candidate.role == FrameRole::Document
1531                                && candidate.chunk_manifest.is_some()
1532                                && candidate.status == FrameStatus::Active
1533                            {
1534                                return Some((frame_id, candidate_id));
1535                            }
1536                        }
1537                    }
1538                }
1539                None
1540            })
1541            .collect();
1542
1543        // Now apply the resolutions
1544        for (chunk_id, parent_id) in orphan_resolutions {
1545            if let Ok(idx) = usize::try_from(chunk_id) {
1546                if let Some(frame) = self.toc.frames.get_mut(idx) {
1547                    frame.parent_id = Some(parent_id);
1548                    tracing::debug!(
1549                        chunk_frame_id = chunk_id,
1550                        parent_frame_id = parent_id,
1551                        "resolved orphan chunk parent in second pass"
1552                    );
1553                }
1554            }
1555        }
1556
1557        // Index rebuild now happens once per commit (Option A) instead of incremental append.
1558        // See commit_from_records() for where rebuild_indexes() is invoked.
1559        Ok(delta)
1560    }
1561
1562    #[cfg(feature = "temporal_track")]
1563    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1564        (timestamp, AnchorSource::FrameTimestamp)
1565    }
1566
1567    #[cfg(feature = "temporal_track")]
1568    fn collect_temporal_mentions(
1569        text: Option<&str>,
1570        frame_id: FrameId,
1571        anchor_ts: i64,
1572    ) -> Vec<TemporalMention> {
1573        let text = match text {
1574            Some(value) if !value.trim().is_empty() => value,
1575            _ => return Vec::new(),
1576        };
1577
1578        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
1579            Ok(ts) => ts,
1580            Err(_) => return Vec::new(),
1581        };
1582
1583        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
1584        let normalizer = TemporalNormalizer::new(context);
1585        let mut spans: Vec<(usize, usize)> = Vec::new();
1586        let lower = text.to_ascii_lowercase();
1587
1588        for phrase in STATIC_TEMPORAL_PHRASES {
1589            let mut search_start = 0usize;
1590            while let Some(idx) = lower[search_start..].find(phrase) {
1591                let abs = search_start + idx;
1592                let end = abs + phrase.len();
1593                spans.push((abs, end));
1594                search_start = end;
1595            }
1596        }
1597
1598        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
1599        let regex = NUMERIC_DATE.get_or_init(|| {
1600            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
1601        });
1602        let regex = match regex {
1603            Ok(re) => re,
1604            Err(msg) => {
1605                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
1606                return Vec::new();
1607            }
1608        };
1609        for mat in regex.find_iter(text) {
1610            spans.push((mat.start(), mat.end()));
1611        }
1612
1613        spans.sort_unstable();
1614        spans.dedup();
1615
1616        let mut mentions: Vec<TemporalMention> = Vec::new();
1617        for (start, end) in spans {
1618            if end > text.len() || start >= end {
1619                continue;
1620            }
1621            let raw = &text[start..end];
1622            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
1623            if trimmed.is_empty() {
1624                continue;
1625            }
1626            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
1627            let finish = offset + trimmed.len();
1628            match normalizer.resolve(trimmed) {
1629                Ok(resolution) => {
1630                    mentions.extend(Self::resolution_to_mentions(
1631                        resolution, frame_id, offset, finish,
1632                    ));
1633                }
1634                Err(_) => continue,
1635            }
1636        }
1637
1638        mentions
1639    }
1640
1641    #[cfg(feature = "temporal_track")]
1642    fn resolution_to_mentions(
1643        resolution: TemporalResolution,
1644        frame_id: FrameId,
1645        byte_start: usize,
1646        byte_end: usize,
1647    ) -> Vec<TemporalMention> {
1648        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1649        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1650        let mut results = Vec::new();
1651
1652        let base_flags = Self::flags_from_resolution(&resolution.flags);
1653        match resolution.value {
1654            TemporalResolutionValue::Date(date) => {
1655                let ts = Self::date_to_timestamp(date);
1656                results.push(TemporalMention::new(
1657                    ts,
1658                    frame_id,
1659                    byte_start,
1660                    byte_len,
1661                    TemporalMentionKind::Date,
1662                    resolution.confidence,
1663                    0,
1664                    base_flags,
1665                ));
1666            }
1667            TemporalResolutionValue::DateTime(dt) => {
1668                let ts = dt.unix_timestamp();
1669                let tz_hint = dt.offset().whole_minutes() as i16;
1670                results.push(TemporalMention::new(
1671                    ts,
1672                    frame_id,
1673                    byte_start,
1674                    byte_len,
1675                    TemporalMentionKind::DateTime,
1676                    resolution.confidence,
1677                    tz_hint,
1678                    base_flags,
1679                ));
1680            }
1681            TemporalResolutionValue::DateRange { start, end } => {
1682                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1683                let start_ts = Self::date_to_timestamp(start);
1684                results.push(TemporalMention::new(
1685                    start_ts,
1686                    frame_id,
1687                    byte_start,
1688                    byte_len,
1689                    TemporalMentionKind::RangeStart,
1690                    resolution.confidence,
1691                    0,
1692                    flags,
1693                ));
1694                let end_ts = Self::date_to_timestamp(end);
1695                results.push(TemporalMention::new(
1696                    end_ts,
1697                    frame_id,
1698                    byte_start,
1699                    byte_len,
1700                    TemporalMentionKind::RangeEnd,
1701                    resolution.confidence,
1702                    0,
1703                    flags,
1704                ));
1705            }
1706            TemporalResolutionValue::DateTimeRange { start, end } => {
1707                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1708                results.push(TemporalMention::new(
1709                    start.unix_timestamp(),
1710                    frame_id,
1711                    byte_start,
1712                    byte_len,
1713                    TemporalMentionKind::RangeStart,
1714                    resolution.confidence,
1715                    start.offset().whole_minutes() as i16,
1716                    flags,
1717                ));
1718                results.push(TemporalMention::new(
1719                    end.unix_timestamp(),
1720                    frame_id,
1721                    byte_start,
1722                    byte_len,
1723                    TemporalMentionKind::RangeEnd,
1724                    resolution.confidence,
1725                    end.offset().whole_minutes() as i16,
1726                    flags,
1727                ));
1728            }
1729            TemporalResolutionValue::Month { year, month } => {
1730                let start_date = match Date::from_calendar_date(year, month, 1) {
1731                    Ok(date) => date,
1732                    Err(err) => {
1733                        tracing::warn!(
1734                            target = "memvid::temporal",
1735                            %err,
1736                            year,
1737                            month = month as u8,
1738                            "skipping invalid month resolution"
1739                        );
1740                        // Skip invalid range for this mention only.
1741                        return results;
1742                    }
1743                };
1744                let end_date = match Self::last_day_in_month(year, month) {
1745                    Some(date) => date,
1746                    None => {
1747                        tracing::warn!(
1748                            target = "memvid::temporal",
1749                            year,
1750                            month = month as u8,
1751                            "skipping month resolution with invalid calendar range"
1752                        );
1753                        return results;
1754                    }
1755                };
1756                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1757                results.push(TemporalMention::new(
1758                    Self::date_to_timestamp(start_date),
1759                    frame_id,
1760                    byte_start,
1761                    byte_len,
1762                    TemporalMentionKind::RangeStart,
1763                    resolution.confidence,
1764                    0,
1765                    flags,
1766                ));
1767                results.push(TemporalMention::new(
1768                    Self::date_to_timestamp(end_date),
1769                    frame_id,
1770                    byte_start,
1771                    byte_len,
1772                    TemporalMentionKind::RangeEnd,
1773                    resolution.confidence,
1774                    0,
1775                    flags,
1776                ));
1777            }
1778        }
1779
1780        results
1781    }
1782
1783    #[cfg(feature = "temporal_track")]
1784    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1785        let mut result = TemporalMentionFlags::empty();
1786        if flags
1787            .iter()
1788            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1789        {
1790            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1791        }
1792        if flags
1793            .iter()
1794            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1795        {
1796            result = result.set(TemporalMentionFlags::DERIVED, true);
1797        }
1798        result
1799    }
1800
1801    #[cfg(feature = "temporal_track")]
1802    fn date_to_timestamp(date: Date) -> i64 {
1803        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1804            .assume_offset(UtcOffset::UTC)
1805            .unix_timestamp()
1806    }
1807
1808    #[cfg(feature = "temporal_track")]
1809    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1810        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1811        while let Some(next) = date.next_day() {
1812            if next.month() == month {
1813                date = next;
1814            } else {
1815                break;
1816            }
1817        }
1818        Some(date)
1819    }
1820
1821    #[allow(dead_code)]
1822    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1823        if delta.inserted_frames.is_empty() || !self.lex_enabled {
1824            return Ok(false);
1825        }
1826
1827        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1828            Some(artifact) => artifact,
1829            None => return Ok(false),
1830        };
1831
1832        let segment_id = self.toc.segment_catalog.next_segment_id;
1833        #[cfg(feature = "parallel_segments")]
1834        let span =
1835            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1836
1837        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1838        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1839        #[cfg(feature = "parallel_segments")]
1840        if let Some(span) = span {
1841            Self::decorate_segment_common(&mut descriptor.common, span);
1842        }
1843        #[cfg(feature = "parallel_segments")]
1844        let descriptor_for_manifest = descriptor.clone();
1845        self.toc.segment_catalog.lex_segments.push(descriptor);
1846        #[cfg(feature = "parallel_segments")]
1847        if let Err(err) = self.record_index_segment(
1848            SegmentKind::Lexical,
1849            descriptor_for_manifest.common,
1850            SegmentStats {
1851                doc_count: artifact.doc_count,
1852                vector_count: 0,
1853                time_entries: 0,
1854                bytes_uncompressed: artifact.bytes.len() as u64,
1855                build_micros: 0,
1856            },
1857        ) {
1858            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1859        }
1860        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1861        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1862        Ok(true)
1863    }
1864
1865    #[allow(dead_code)]
1866    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1867        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1868            return Ok(false);
1869        }
1870
1871        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1872            Some(artifact) => artifact,
1873            None => return Ok(false),
1874        };
1875
1876        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1877            if existing_dim != artifact.dimension {
1878                return Err(MemvidError::VecDimensionMismatch {
1879                    expected: existing_dim,
1880                    actual: artifact.dimension as usize,
1881                });
1882            }
1883        }
1884
1885        let segment_id = self.toc.segment_catalog.next_segment_id;
1886        #[cfg(feature = "parallel_segments")]
1887        #[cfg(feature = "parallel_segments")]
1888        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1889
1890        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1891        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1892        #[cfg(feature = "parallel_segments")]
1893        if let Some(span) = span {
1894            Self::decorate_segment_common(&mut descriptor.common, span);
1895        }
1896        #[cfg(feature = "parallel_segments")]
1897        let descriptor_for_manifest = descriptor.clone();
1898        self.toc.segment_catalog.vec_segments.push(descriptor);
1899        #[cfg(feature = "parallel_segments")]
1900        if let Err(err) = self.record_index_segment(
1901            SegmentKind::Vector,
1902            descriptor_for_manifest.common,
1903            SegmentStats {
1904                doc_count: 0,
1905                vector_count: artifact.vector_count,
1906                time_entries: 0,
1907                bytes_uncompressed: artifact.bytes_uncompressed,
1908                build_micros: 0,
1909            },
1910        ) {
1911            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1912        }
1913        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1914        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1915
1916        // Keep the global vec manifest in sync for auto-detection and stats.
1917        if self.toc.indexes.vec.is_none() {
1918            let empty_offset = self.data_end;
1919            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1920                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1921            self.toc.indexes.vec = Some(VecIndexManifest {
1922                vector_count: 0,
1923                dimension: 0,
1924                bytes_offset: empty_offset,
1925                bytes_length: 0,
1926                checksum: empty_checksum,
1927                compression_mode: self.vec_compression.clone(),
1928                model: self.vec_model.clone(),
1929            });
1930        }
1931        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1932            if manifest.dimension == 0 {
1933                manifest.dimension = artifact.dimension;
1934            }
1935            if manifest.bytes_length == 0 {
1936                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1937                manifest.compression_mode = artifact.compression.clone();
1938            }
1939        }
1940
1941        self.vec_enabled = true;
1942        Ok(true)
1943    }
1944
1945    #[allow(dead_code)]
1946    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1947        if delta.inserted_time_entries.is_empty() {
1948            return Ok(false);
1949        }
1950
1951        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1952            Some(artifact) => artifact,
1953            None => return Ok(false),
1954        };
1955
1956        let segment_id = self.toc.segment_catalog.next_segment_id;
1957        #[cfg(feature = "parallel_segments")]
1958        #[cfg(feature = "parallel_segments")]
1959        let span = self.segment_span_from_iter(
1960            delta
1961                .inserted_time_entries
1962                .iter()
1963                .map(|entry| entry.frame_id),
1964        );
1965
1966        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1967        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1968        #[cfg(feature = "parallel_segments")]
1969        if let Some(span) = span {
1970            Self::decorate_segment_common(&mut descriptor.common, span);
1971        }
1972        #[cfg(feature = "parallel_segments")]
1973        let descriptor_for_manifest = descriptor.clone();
1974        self.toc.segment_catalog.time_segments.push(descriptor);
1975        #[cfg(feature = "parallel_segments")]
1976        if let Err(err) = self.record_index_segment(
1977            SegmentKind::Time,
1978            descriptor_for_manifest.common,
1979            SegmentStats {
1980                doc_count: 0,
1981                vector_count: 0,
1982                time_entries: artifact.entry_count,
1983                bytes_uncompressed: artifact.bytes.len() as u64,
1984                build_micros: 0,
1985            },
1986        ) {
1987            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1988        }
1989        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1990        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1991        Ok(true)
1992    }
1993
1994    #[cfg(feature = "temporal_track")]
1995    #[allow(dead_code)]
1996    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1997        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1998        {
1999            return Ok(false);
2000        }
2001
2002        debug_assert!(
2003            delta.inserted_temporal_mentions.len() < 1_000_000,
2004            "temporal delta mentions unexpectedly large: {}",
2005            delta.inserted_temporal_mentions.len()
2006        );
2007        debug_assert!(
2008            delta.inserted_temporal_anchors.len() < 1_000_000,
2009            "temporal delta anchors unexpectedly large: {}",
2010            delta.inserted_temporal_anchors.len()
2011        );
2012
2013        let artifact = match self.build_temporal_segment_from_records(
2014            &delta.inserted_temporal_mentions,
2015            &delta.inserted_temporal_anchors,
2016        )? {
2017            Some(artifact) => artifact,
2018            None => return Ok(false),
2019        };
2020
2021        let segment_id = self.toc.segment_catalog.next_segment_id;
2022        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
2023        self.toc
2024            .segment_catalog
2025            .temporal_segments
2026            .push(descriptor.clone());
2027        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2028        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2029
2030        self.toc.temporal_track = Some(TemporalTrackManifest {
2031            bytes_offset: descriptor.common.bytes_offset,
2032            bytes_length: descriptor.common.bytes_length,
2033            entry_count: artifact.entry_count,
2034            anchor_count: artifact.anchor_count,
2035            checksum: artifact.checksum,
2036            flags: artifact.flags,
2037        });
2038
2039        self.clear_temporal_track_cache();
2040
2041        Ok(true)
2042    }
2043
2044    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2045        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2046            frame_id,
2047            reason: "frame id too large",
2048        })?;
2049        let frame = self
2050            .toc
2051            .frames
2052            .get_mut(index)
2053            .ok_or(MemvidError::InvalidFrame {
2054                frame_id,
2055                reason: "supersede target missing",
2056            })?;
2057        frame.status = FrameStatus::Superseded;
2058        frame.superseded_by = Some(successor_id);
2059        self.remove_frame_from_indexes(frame_id)
2060    }
2061
2062    pub(crate) fn rebuild_indexes(
2063        &mut self,
2064        new_vec_docs: &[(FrameId, Vec<f32>)],
2065        inserted_frame_ids: &[FrameId],
2066    ) -> Result<()> {
2067        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2068            return Ok(());
2069        }
2070
2071        let payload_end = self.payload_region_end();
2072        self.data_end = payload_end;
2073        // Don't truncate if footer_offset is higher - there may be replay segments
2074        // or other data written after payload_end that must be preserved.
2075        let safe_truncate_len = self.header.footer_offset.max(payload_end);
2076        if self.file.metadata()?.len() > safe_truncate_len {
2077            self.file.set_len(safe_truncate_len)?;
2078        }
2079        self.file.seek(SeekFrom::Start(payload_end))?;
2080
2081        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
2082        self.toc.segment_catalog.lex_segments.clear();
2083        self.toc.segment_catalog.vec_segments.clear();
2084        self.toc.segment_catalog.time_segments.clear();
2085        #[cfg(feature = "temporal_track")]
2086        self.toc.segment_catalog.temporal_segments.clear();
2087        #[cfg(feature = "parallel_segments")]
2088        self.toc.segment_catalog.index_segments.clear();
2089        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
2090        self.toc.segment_catalog.tantivy_segments.clear();
2091        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
2092        self.toc.indexes.lex_segments.clear();
2093
2094        let mut time_entries: Vec<TimeIndexEntry> = self
2095            .toc
2096            .frames
2097            .iter()
2098            .filter(|frame| {
2099                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2100            })
2101            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2102            .collect();
2103        let (ti_offset, ti_length, ti_checksum) =
2104            time_index_append(&mut self.file, &mut time_entries)?;
2105        self.toc.time_index = Some(TimeIndexManifest {
2106            bytes_offset: ti_offset,
2107            bytes_length: ti_length,
2108            entry_count: time_entries.len() as u64,
2109            checksum: ti_checksum,
2110        });
2111
2112        let mut footer_offset = ti_offset + ti_length;
2113
2114        #[cfg(feature = "temporal_track")]
2115        {
2116            self.toc.temporal_track = None;
2117            self.toc.segment_catalog.temporal_segments.clear();
2118            self.clear_temporal_track_cache();
2119        }
2120
2121        if self.lex_enabled {
2122            #[cfg(feature = "lex")]
2123            {
2124                if self.tantivy_dirty {
2125                    // instant_index was used: frames were added to Tantivy with WAL
2126                    // sequence numbers as IDs, which don't match the actual frame IDs
2127                    // assigned during apply_records(). Must do a full rebuild to fix IDs.
2128                    if let Ok(mut storage) = self.lex_storage.write() {
2129                        storage.clear();
2130                        storage.set_generation(0);
2131                    }
2132                    self.init_tantivy()?;
2133                    if let Some(mut engine) = self.tantivy.take() {
2134                        self.rebuild_tantivy_engine(&mut engine)?;
2135                        self.tantivy = Some(engine);
2136                    } else {
2137                        return Err(MemvidError::InvalidToc {
2138                            reason: "tantivy engine missing during rebuild".into(),
2139                        });
2140                    }
2141                } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2142                    // Incremental path: engine exists (from open() or previous commit),
2143                    // instant_index was NOT used so no wrong IDs. Just add new frames.
2144                    // This is O(batch_size) — the key optimization for bulk ingestion.
2145                    //
2146                    // Collect frames + text first to avoid borrow conflicts with engine.
2147                    let max_payload = crate::memvid::search::max_index_payload();
2148                    let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2149                    for &frame_id in inserted_frame_ids {
2150                        let Ok(idx) = usize::try_from(frame_id) else {
2151                            continue;
2152                        };
2153                        let frame = match self.toc.frames.get(idx) {
2154                            Some(f) => f.clone(),
2155                            None => continue,
2156                        };
2157                        if frame.status != FrameStatus::Active {
2158                            continue;
2159                        }
2160                        if let Some(search_text) = frame.search_text.clone() {
2161                            if !search_text.trim().is_empty() {
2162                                prepared_docs.push((frame, search_text));
2163                                continue;
2164                            }
2165                        }
2166                        let mime = frame
2167                            .metadata
2168                            .as_ref()
2169                            .and_then(|m| m.mime.as_deref())
2170                            .unwrap_or("application/octet-stream");
2171                        if !crate::memvid::search::is_text_indexable_mime(mime) {
2172                            continue;
2173                        }
2174                        if frame.payload_length > max_payload {
2175                            continue;
2176                        }
2177                        let text = self.frame_search_text(&frame)?;
2178                        if !text.trim().is_empty() {
2179                            prepared_docs.push((frame, text));
2180                        }
2181                    }
2182                    if let Some(engine) = self.tantivy.as_mut() {
2183                        for (frame, text) in &prepared_docs {
2184                            engine.add_frame(frame, text)?;
2185                        }
2186                        engine.commit()?;
2187                    }
2188                } else {
2189                    // Full rebuild path: no engine or no new frames (e.g., doctor repair).
2190                    // Clear embedded storage to avoid carrying stale segments between rebuilds.
2191                    if let Ok(mut storage) = self.lex_storage.write() {
2192                        storage.clear();
2193                        storage.set_generation(0);
2194                    }
2195                    self.init_tantivy()?;
2196                    if let Some(mut engine) = self.tantivy.take() {
2197                        self.rebuild_tantivy_engine(&mut engine)?;
2198                        self.tantivy = Some(engine);
2199                    } else {
2200                        return Err(MemvidError::InvalidToc {
2201                            reason: "tantivy engine missing during rebuild".into(),
2202                        });
2203                    }
2204                }
2205
2206                // Set lex_enabled to ensure it persists
2207                self.lex_enabled = true;
2208
2209                // Mark Tantivy as dirty so it gets flushed
2210                self.tantivy_dirty = true;
2211
2212                // Position embedded Tantivy segments immediately after the time index.
2213                self.data_end = footer_offset;
2214
2215                // Flush Tantivy segments to file
2216                self.flush_tantivy()?;
2217
2218                // Update footer_offset after Tantivy flush
2219                footer_offset = self.header.footer_offset;
2220
2221                // Restore data_end to payload boundary so future payload writes stay before indexes.
2222                self.data_end = payload_end;
2223            }
2224            #[cfg(not(feature = "lex"))]
2225            {
2226                self.toc.indexes.lex = None;
2227                self.toc.indexes.lex_segments.clear();
2228            }
2229        } else {
2230            // Lex disabled: clear everything
2231            self.toc.indexes.lex = None;
2232            self.toc.indexes.lex_segments.clear();
2233            #[cfg(feature = "lex")]
2234            if let Ok(mut storage) = self.lex_storage.write() {
2235                storage.clear();
2236            }
2237        }
2238
2239        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2240            let vec_offset = footer_offset;
2241            self.file.seek(SeekFrom::Start(vec_offset))?;
2242            self.file.write_all(&artifact.bytes)?;
2243            footer_offset += artifact.bytes.len() as u64;
2244            self.toc.indexes.vec = Some(VecIndexManifest {
2245                vector_count: artifact.vector_count,
2246                dimension: artifact.dimension,
2247                bytes_offset: vec_offset,
2248                bytes_length: artifact.bytes.len() as u64,
2249                checksum: artifact.checksum,
2250                compression_mode: self.vec_compression.clone(),
2251                model: self.vec_model.clone(),
2252            });
2253            self.vec_index = Some(index);
2254        } else {
2255            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
2256            if !self.vec_enabled {
2257                self.toc.indexes.vec = None;
2258            }
2259            self.vec_index = None;
2260        }
2261
2262        // Persist CLIP index if it has embeddings
2263        if self.clip_enabled {
2264            if let Some(ref clip_index) = self.clip_index {
2265                if !clip_index.is_empty() {
2266                    let artifact = clip_index.encode()?;
2267                    let clip_offset = footer_offset;
2268                    self.file.seek(SeekFrom::Start(clip_offset))?;
2269                    self.file.write_all(&artifact.bytes)?;
2270                    footer_offset += artifact.bytes.len() as u64;
2271                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2272                        bytes_offset: clip_offset,
2273                        bytes_length: artifact.bytes.len() as u64,
2274                        vector_count: artifact.vector_count,
2275                        dimension: artifact.dimension,
2276                        checksum: artifact.checksum,
2277                        model_name: crate::clip::default_model_info().name.to_string(),
2278                    });
2279                    tracing::info!(
2280                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2281                        artifact.vector_count,
2282                        clip_offset
2283                    );
2284                }
2285            }
2286        } else {
2287            self.toc.indexes.clip = None;
2288        }
2289
2290        // Persist memories track if it has cards
2291        if self.memories_track.card_count() > 0 {
2292            let memories_offset = footer_offset;
2293            let memories_bytes = self.memories_track.serialize()?;
2294            let memories_checksum = blake3::hash(&memories_bytes).into();
2295            self.file.seek(SeekFrom::Start(memories_offset))?;
2296            self.file.write_all(&memories_bytes)?;
2297            footer_offset += memories_bytes.len() as u64;
2298
2299            let stats = self.memories_track.stats();
2300            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2301                bytes_offset: memories_offset,
2302                bytes_length: memories_bytes.len() as u64,
2303                card_count: stats.card_count as u64,
2304                entity_count: stats.entity_count as u64,
2305                checksum: memories_checksum,
2306            });
2307        } else {
2308            self.toc.memories_track = None;
2309        }
2310
2311        // Persist logic mesh if it has nodes
2312        if self.logic_mesh.is_empty() {
2313            self.toc.logic_mesh = None;
2314        } else {
2315            let mesh_offset = footer_offset;
2316            let mesh_bytes = self.logic_mesh.serialize()?;
2317            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2318            self.file.seek(SeekFrom::Start(mesh_offset))?;
2319            self.file.write_all(&mesh_bytes)?;
2320            footer_offset += mesh_bytes.len() as u64;
2321
2322            let stats = self.logic_mesh.stats();
2323            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2324                bytes_offset: mesh_offset,
2325                bytes_length: mesh_bytes.len() as u64,
2326                node_count: stats.node_count as u64,
2327                edge_count: stats.edge_count as u64,
2328                checksum: mesh_checksum,
2329            });
2330        }
2331
2332        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
2333        tracing::info!(
2334            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2335            ti_offset,
2336            ti_length,
2337            footer_offset,
2338            self.header.footer_offset
2339        );
2340
2341        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
2342        // This prevents overwriting data like replay segments that were written after index data
2343        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2344
2345        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
2346        if self.file.metadata()?.len() < self.header.footer_offset {
2347            self.file.set_len(self.header.footer_offset)?;
2348        }
2349
2350        self.rewrite_toc_footer()?;
2351        self.header.toc_checksum = self.toc.toc_checksum;
2352        crate::persist_header(&mut self.file, &self.header)?;
2353
2354        #[cfg(feature = "lex")]
2355        if self.lex_enabled {
2356            if let Some(ref engine) = self.tantivy {
2357                let doc_count = engine.num_docs();
2358                let active_frame_count = self
2359                    .toc
2360                    .frames
2361                    .iter()
2362                    .filter(|f| f.status == FrameStatus::Active)
2363                    .count();
2364
2365                // Count frames that would actually be indexed by rebuild_tantivy_engine
2366                // Uses the same logic: content-type based check + size limit
2367                let text_indexable_count = self
2368                    .toc
2369                    .frames
2370                    .iter()
2371                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2372                    .count();
2373
2374                // Only fail if we have text-indexable frames but none got indexed
2375                // This avoids false positives for binary files (videos, images)
2376                if doc_count == 0 && text_indexable_count > 0 {
2377                    return Err(MemvidError::Doctor {
2378                        reason: format!(
2379                            "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2380                            This indicates a critical failure in the rebuild process."
2381                        ),
2382                    });
2383                }
2384
2385                // Success! Log it
2386                log::info!(
2387                    "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2388                );
2389            }
2390        }
2391
2392        Ok(())
2393    }
2394
2395    /// Persist the memories track to the file without a full rebuild.
2396    ///
2397    /// This is used when the memories track has been modified but no frame
2398    /// changes were made (e.g., after running enrichment).
2399    fn persist_memories_track(&mut self) -> Result<()> {
2400        if self.memories_track.card_count() == 0 {
2401            self.toc.memories_track = None;
2402            return Ok(());
2403        }
2404
2405        // Write after the current footer_offset
2406        let memories_offset = self.header.footer_offset;
2407        let memories_bytes = self.memories_track.serialize()?;
2408        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2409
2410        self.file.seek(SeekFrom::Start(memories_offset))?;
2411        self.file.write_all(&memories_bytes)?;
2412
2413        let stats = self.memories_track.stats();
2414        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2415            bytes_offset: memories_offset,
2416            bytes_length: memories_bytes.len() as u64,
2417            card_count: stats.card_count as u64,
2418            entity_count: stats.entity_count as u64,
2419            checksum: memories_checksum,
2420        });
2421
2422        // Update footer_offset to account for the memories track
2423        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2424
2425        // Ensure the file length covers the memories track
2426        if self.file.metadata()?.len() < self.header.footer_offset {
2427            self.file.set_len(self.header.footer_offset)?;
2428        }
2429
2430        Ok(())
2431    }
2432
2433    /// Persist the CLIP index to the file without a full rebuild.
2434    ///
2435    /// This is used when CLIP embeddings have been added but no full
2436    /// index rebuild is needed (e.g., in parallel segments mode).
2437    fn persist_clip_index(&mut self) -> Result<()> {
2438        if !self.clip_enabled {
2439            self.toc.indexes.clip = None;
2440            return Ok(());
2441        }
2442
2443        let clip_index = match &self.clip_index {
2444            Some(idx) if !idx.is_empty() => idx,
2445            _ => {
2446                self.toc.indexes.clip = None;
2447                return Ok(());
2448            }
2449        };
2450
2451        // Encode the CLIP index
2452        let artifact = clip_index.encode()?;
2453
2454        // Write after the current footer_offset
2455        let clip_offset = self.header.footer_offset;
2456        self.file.seek(SeekFrom::Start(clip_offset))?;
2457        self.file.write_all(&artifact.bytes)?;
2458
2459        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2460            bytes_offset: clip_offset,
2461            bytes_length: artifact.bytes.len() as u64,
2462            vector_count: artifact.vector_count,
2463            dimension: artifact.dimension,
2464            checksum: artifact.checksum,
2465            model_name: crate::clip::default_model_info().name.to_string(),
2466        });
2467
2468        tracing::info!(
2469            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2470            artifact.vector_count,
2471            clip_offset
2472        );
2473
2474        // Update footer_offset to account for the CLIP index
2475        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2476
2477        // Ensure the file length covers the CLIP index
2478        if self.file.metadata()?.len() < self.header.footer_offset {
2479            self.file.set_len(self.header.footer_offset)?;
2480        }
2481
2482        Ok(())
2483    }
2484
2485    /// Persist the Logic-Mesh to the file without a full rebuild.
2486    ///
2487    /// This is used when the Logic-Mesh has been modified but no frame
2488    /// changes were made (e.g., after running NER enrichment).
2489    fn persist_logic_mesh(&mut self) -> Result<()> {
2490        if self.logic_mesh.is_empty() {
2491            self.toc.logic_mesh = None;
2492            return Ok(());
2493        }
2494
2495        // Write after the current footer_offset
2496        let mesh_offset = self.header.footer_offset;
2497        let mesh_bytes = self.logic_mesh.serialize()?;
2498        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2499
2500        self.file.seek(SeekFrom::Start(mesh_offset))?;
2501        self.file.write_all(&mesh_bytes)?;
2502
2503        let stats = self.logic_mesh.stats();
2504        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2505            bytes_offset: mesh_offset,
2506            bytes_length: mesh_bytes.len() as u64,
2507            node_count: stats.node_count as u64,
2508            edge_count: stats.edge_count as u64,
2509            checksum: mesh_checksum,
2510        });
2511
2512        // Update footer_offset to account for the logic mesh
2513        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2514
2515        // Ensure the file length covers the logic mesh
2516        if self.file.metadata()?.len() < self.header.footer_offset {
2517            self.file.set_len(self.header.footer_offset)?;
2518        }
2519
2520        Ok(())
2521    }
2522
2523    /// Persist the sketch track to the file without a full rebuild.
2524    ///
2525    /// This is used when the sketch track has been modified (e.g., after
2526    /// running `sketch build`).
2527    fn persist_sketch_track(&mut self) -> Result<()> {
2528        if self.sketch_track.is_empty() {
2529            self.toc.sketch_track = None;
2530            return Ok(());
2531        }
2532
2533        // Seek to write after the current footer_offset
2534        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2535
2536        // Write the sketch track and get (offset, length, checksum)
2537        let (sketch_offset, sketch_length, sketch_checksum) =
2538            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2539
2540        let stats = self.sketch_track.stats();
2541        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2542            bytes_offset: sketch_offset,
2543            bytes_length: sketch_length,
2544            entry_count: stats.entry_count,
2545            #[allow(clippy::cast_possible_truncation)]
2546            entry_size: stats.variant.entry_size() as u16,
2547            flags: 0,
2548            checksum: sketch_checksum,
2549        });
2550
2551        // Update footer_offset to account for the sketch track
2552        self.header.footer_offset = sketch_offset + sketch_length;
2553
2554        // Ensure the file length covers the sketch track
2555        if self.file.metadata()?.len() < self.header.footer_offset {
2556            self.file.set_len(self.header.footer_offset)?;
2557        }
2558
2559        tracing::debug!(
2560            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2561            stats.entry_count,
2562            sketch_offset
2563        );
2564
2565        Ok(())
2566    }
2567
2568    #[cfg(feature = "lex")]
2569    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2570        let LexWalBatch {
2571            generation,
2572            doc_count,
2573            checksum,
2574            segments,
2575        } = batch;
2576
2577        if let Ok(mut storage) = self.lex_storage.write() {
2578            storage.replace(doc_count, checksum, segments);
2579            storage.set_generation(generation);
2580        }
2581
2582        self.persist_lex_manifest()
2583    }
2584
2585    #[cfg(feature = "lex")]
2586    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2587        let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2588        self.append_wal_entry(&payload)?;
2589        Ok(())
2590    }
2591
2592    #[cfg(feature = "lex")]
2593    fn persist_lex_manifest(&mut self) -> Result<()> {
2594        let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2595            storage.to_manifest()
2596        } else {
2597            (None, Vec::new())
2598        };
2599
2600        // Update the manifest
2601        if let Some(storage_manifest) = index_manifest {
2602            // Old LexIndexArtifact format: set the manifest with actual offset/length
2603            self.toc.indexes.lex = Some(storage_manifest);
2604        } else {
2605            // Tantivy segments OR lex disabled: clear the manifest
2606            // Stats will check lex_segments instead of manifest
2607            self.toc.indexes.lex = None;
2608        }
2609
2610        self.toc.indexes.lex_segments = segments;
2611
2612        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
2613        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.
2614
2615        self.rewrite_toc_footer()?;
2616        self.header.toc_checksum = self.toc.toc_checksum;
2617        crate::persist_header(&mut self.file, &self.header)?;
2618        Ok(())
2619    }
2620
2621    #[cfg(feature = "lex")]
2622    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
2623        let TantivySnapshot {
2624            doc_count,
2625            checksum,
2626            segments,
2627        } = snapshot;
2628
2629        let mut footer_offset = self.data_end;
2630        self.file.seek(SeekFrom::Start(footer_offset))?;
2631
2632        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
2633        for segment in segments {
2634            let bytes_length = segment.bytes.len() as u64;
2635            self.file.write_all(&segment.bytes)?;
2636            self.file.flush()?; // Flush segment data to disk
2637            embedded_segments.push(EmbeddedLexSegment {
2638                path: segment.path,
2639                bytes_offset: footer_offset,
2640                bytes_length,
2641                checksum: segment.checksum,
2642            });
2643            footer_offset += bytes_length;
2644        }
2645        // Set footer_offset for TOC writing, but DON'T update data_end
2646        // data_end stays at end of payloads, so next commit overwrites these segments
2647        // Use max() to never decrease footer_offset - this preserves replay segments
2648        // that may have been written at a higher offset
2649        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2650
2651        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
2652        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
2653            Vec::with_capacity(embedded_segments.len());
2654        for segment in &embedded_segments {
2655            let descriptor = TantivySegmentDescriptor::from_common(
2656                SegmentCommon::new(
2657                    next_segment_id,
2658                    segment.bytes_offset,
2659                    segment.bytes_length,
2660                    segment.checksum,
2661                ),
2662                segment.path.clone(),
2663            );
2664            catalog_segments.push(descriptor);
2665            next_segment_id = next_segment_id.saturating_add(1);
2666        }
2667        if catalog_segments.is_empty() {
2668            self.toc.segment_catalog.tantivy_segments.clear();
2669        } else {
2670            self.toc.segment_catalog.tantivy_segments = catalog_segments;
2671            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2672        }
2673        self.toc.segment_catalog.next_segment_id = next_segment_id;
2674
2675        // REMOVED: catalog_data_end() check
2676        // This was causing orphaned Tantivy segments because it would see OLD segments
2677        // still in the catalog from previous commits, and push footer_offset forward.
2678        // We want Tantivy segments to overwrite at data_end, so footer_offset should
2679        // stay at the end of the newly written segments.
2680
2681        let generation = self
2682            .lex_storage
2683            .write()
2684            .map_err(|_| MemvidError::Tantivy {
2685                reason: "embedded lex storage lock poisoned".into(),
2686            })
2687            .map(|mut storage| {
2688                storage.replace(doc_count, checksum, embedded_segments.clone());
2689                storage.generation()
2690            })?;
2691
2692        let batch = LexWalBatch {
2693            generation,
2694            doc_count,
2695            checksum,
2696            segments: embedded_segments.clone(),
2697        };
2698        self.append_lex_batch(&batch)?;
2699        self.persist_lex_manifest()?;
2700        self.header.toc_checksum = self.toc.toc_checksum;
2701        crate::persist_header(&mut self.file, &self.header)?;
2702        Ok(())
2703    }
2704
2705    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2706        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2707            frame_id,
2708            reason: "frame id too large",
2709        })?;
2710        let frame = self
2711            .toc
2712            .frames
2713            .get_mut(index)
2714            .ok_or(MemvidError::InvalidFrame {
2715                frame_id,
2716                reason: "delete target missing",
2717            })?;
2718        frame.status = FrameStatus::Deleted;
2719        frame.superseded_by = None;
2720        self.remove_frame_from_indexes(frame_id)
2721    }
2722
2723    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2724        #[cfg(feature = "lex")]
2725        if let Some(engine) = self.tantivy.as_mut() {
2726            engine.delete_frame(frame_id)?;
2727            self.tantivy_dirty = true;
2728        }
2729        if let Some(index) = self.lex_index.as_mut() {
2730            index.remove_document(frame_id);
2731        }
2732        if let Some(index) = self.vec_index.as_mut() {
2733            index.remove(frame_id);
2734        }
2735        Ok(())
2736    }
2737
2738    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2739        let Ok(index) = usize::try_from(frame_id) else {
2740            return false;
2741        };
2742        self.toc
2743            .frames
2744            .get(index)
2745            .is_some_and(|frame| frame.status == FrameStatus::Active)
2746    }
2747
2748    #[cfg(feature = "parallel_segments")]
2749    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2750    where
2751        I: IntoIterator<Item = FrameId>,
2752    {
2753        let mut iter = iter.into_iter();
2754        let first_id = iter.next()?;
2755        let first_frame = self.toc.frames.get(first_id as usize);
2756        let mut min_id = first_id;
2757        let mut max_id = first_id;
2758        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2759        let mut page_end = first_frame
2760            .and_then(|frame| frame.chunk_count)
2761            .map(|count| page_start + count.saturating_sub(1))
2762            .unwrap_or(page_start);
2763        for frame_id in iter {
2764            if frame_id < min_id {
2765                min_id = frame_id;
2766            }
2767            if frame_id > max_id {
2768                max_id = frame_id;
2769            }
2770            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2771                if let Some(idx) = frame.chunk_index {
2772                    page_start = page_start.min(idx);
2773                    if let Some(count) = frame.chunk_count {
2774                        let end = idx + count.saturating_sub(1);
2775                        page_end = page_end.max(end);
2776                    } else {
2777                        page_end = page_end.max(idx);
2778                    }
2779                }
2780            }
2781        }
2782        Some(SegmentSpan {
2783            frame_start: min_id,
2784            frame_end: max_id,
2785            page_start,
2786            page_end,
2787            ..SegmentSpan::default()
2788        })
2789    }
2790
2791    #[cfg(feature = "parallel_segments")]
2792    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2793        common.span = Some(span);
2794        if common.codec_version == 0 {
2795            common.codec_version = 1;
2796        }
2797    }
2798
2799    #[cfg(feature = "parallel_segments")]
2800    pub(crate) fn record_index_segment(
2801        &mut self,
2802        kind: SegmentKind,
2803        common: SegmentCommon,
2804        stats: SegmentStats,
2805    ) -> Result<()> {
2806        let entry = IndexSegmentRef {
2807            kind,
2808            common,
2809            stats,
2810        };
2811        self.toc.segment_catalog.index_segments.push(entry.clone());
2812        if let Some(wal) = self.manifest_wal.as_mut() {
2813            wal.append_segments(&[entry])?;
2814        }
2815        Ok(())
2816    }
2817
2818    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2819        self.ensure_writable()?;
2820        if self.toc.ticket_ref.issuer == "free-tier" {
2821            return Ok(());
2822        }
2823        match self.tier() {
2824            Tier::Free => Ok(()),
2825            tier => {
2826                if self.toc.ticket_ref.issuer.trim().is_empty() {
2827                    Err(MemvidError::TicketRequired { tier })
2828                } else {
2829                    Ok(())
2830                }
2831            }
2832        }
2833    }
2834
2835    pub(crate) fn tier(&self) -> Tier {
2836        if self.header.wal_size >= WAL_SIZE_LARGE {
2837            Tier::Enterprise
2838        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2839            Tier::Dev
2840        } else {
2841            Tier::Free
2842        }
2843    }
2844
2845    pub(crate) fn capacity_limit(&self) -> u64 {
2846        if self.toc.ticket_ref.capacity_bytes != 0 {
2847            self.toc.ticket_ref.capacity_bytes
2848        } else {
2849            self.tier().capacity_bytes()
2850        }
2851    }
2852
2853    /// Get current storage capacity in bytes.
2854    ///
2855    /// Returns the capacity from the applied ticket, or the default
2856    /// tier capacity (1 GB for free tier).
2857    #[must_use]
2858    pub fn get_capacity(&self) -> u64 {
2859        self.capacity_limit()
2860    }
2861
2862    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
2863        tracing::info!(
2864            vec_segments = self.toc.segment_catalog.vec_segments.len(),
2865            lex_segments = self.toc.segment_catalog.lex_segments.len(),
2866            time_segments = self.toc.segment_catalog.time_segments.len(),
2867            footer_offset = self.header.footer_offset,
2868            data_end = self.data_end,
2869            "rewrite_toc_footer: about to serialize TOC"
2870        );
2871        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
2872        let footer_offset = self.header.footer_offset;
2873        self.file.seek(SeekFrom::Start(footer_offset))?;
2874        self.file.write_all(&toc_bytes)?;
2875        let footer = CommitFooter {
2876            toc_len: toc_bytes.len() as u64,
2877            toc_hash: *hash(&toc_bytes).as_bytes(),
2878            generation: self.generation,
2879        };
2880        let encoded_footer = footer.encode();
2881        self.file.write_all(&encoded_footer)?;
2882
2883        // The file must always be at least header + WAL size
2884        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
2885        let min_len = self.header.wal_offset + self.header.wal_size;
2886        let final_len = new_len.max(min_len);
2887
2888        if new_len < min_len {
2889            tracing::warn!(
2890                file.new_len = new_len,
2891                file.min_len = min_len,
2892                file.final_len = final_len,
2893                "truncation would cut into WAL region, clamping to min_len"
2894            );
2895        }
2896
2897        self.file.set_len(final_len)?;
2898        // Ensure footer is flushed to disk so mmap-based readers can find it
2899        self.file.sync_all()?;
2900        Ok(())
2901    }
2902}
2903
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Build and append index segments for the frames in `delta` using the
    /// parallel planner and worker pool.
    ///
    /// Returns `Ok(false)` when there is nothing to publish (no chunks, no
    /// plans, or no worker results) and `Ok(true)` once the produced segments
    /// were appended.
    ///
    /// # Errors
    ///
    /// Propagates failures from chunk collection, worker execution, and
    /// segment appending.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Turn the frames inserted by `delta` into chunk plans for the segment planner.
    ///
    /// Frames whose content is blank are skipped. Each plan carries the frame's
    /// text, timestamp, chunk position, a whitespace-based token estimate, and
    /// the matching embedding from `delta.inserted_embeddings` when one exists.
    ///
    /// # Errors
    ///
    /// Returns `MemvidError::InvalidFrame` when an inserted id does not address
    /// a frame in the TOC. That includes ids that do not fit in `usize`, which
    /// an `as` cast would silently truncate. Also propagates failures from
    /// `frame_content`.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            // Checked conversion keeps an oversized id from aliasing another frame.
            let frame = usize::try_from(*frame_id)
                .ok()
                .and_then(|index| self.toc.frames.get(index))
                .cloned()
                .ok_or(MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                })?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based for chunked frames; unchunked frames report page 0.
            let (page_start, page_end) = match frame.chunk_index {
                Some(_) => (chunk_index + 1, chunk_index + 1),
                None => (0, 0),
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2975
2976#[cfg(feature = "parallel_segments")]
/// Approximates the token count of `text` as its number of whitespace-separated
/// words, never reporting fewer than one token.
fn estimate_tokens(text: &str) -> usize {
    let words = text.split_whitespace().count();
    if words == 0 { 1 } else { words }
}
2980
2981impl Memvid {
2982    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2983        let catalog_end = self.catalog_data_end();
2984        if catalog_end <= self.header.footer_offset {
2985            return Ok(false);
2986        }
2987        self.header.footer_offset = catalog_end;
2988        self.rewrite_toc_footer()?;
2989        self.header.toc_checksum = self.toc.toc_checksum;
2990        crate::persist_header(&mut self.file, &self.header)?;
2991        Ok(true)
2992    }
2993}
2994
2995impl Memvid {
2996    pub fn vacuum(&mut self) -> Result<()> {
2997        self.commit()?;
2998
2999        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
3000        let frames: Vec<Frame> = self
3001            .toc
3002            .frames
3003            .iter()
3004            .filter(|frame| frame.status == FrameStatus::Active)
3005            .cloned()
3006            .collect();
3007        for frame in frames {
3008            let bytes = self.read_frame_payload_bytes(&frame)?;
3009            active_payloads.insert(frame.id, bytes);
3010        }
3011
3012        let mut cursor = self.header.wal_offset + self.header.wal_size;
3013        self.file.seek(SeekFrom::Start(cursor))?;
3014        for frame in &mut self.toc.frames {
3015            if frame.status == FrameStatus::Active {
3016                if let Some(bytes) = active_payloads.get(&frame.id) {
3017                    self.file.write_all(bytes)?;
3018                    frame.payload_offset = cursor;
3019                    frame.payload_length = bytes.len() as u64;
3020                    cursor += bytes.len() as u64;
3021                } else {
3022                    frame.payload_offset = 0;
3023                    frame.payload_length = 0;
3024                }
3025            } else {
3026                frame.payload_offset = 0;
3027                frame.payload_length = 0;
3028            }
3029        }
3030
3031        self.data_end = cursor;
3032
3033        self.toc.segments.clear();
3034        self.toc.indexes.lex_segments.clear();
3035        self.toc.segment_catalog.lex_segments.clear();
3036        self.toc.segment_catalog.vec_segments.clear();
3037        self.toc.segment_catalog.time_segments.clear();
3038        #[cfg(feature = "temporal_track")]
3039        {
3040            self.toc.temporal_track = None;
3041            self.toc.segment_catalog.temporal_segments.clear();
3042        }
3043        #[cfg(feature = "lex")]
3044        {
3045            self.toc.segment_catalog.tantivy_segments.clear();
3046        }
3047        #[cfg(feature = "parallel_segments")]
3048        {
3049            self.toc.segment_catalog.index_segments.clear();
3050        }
3051
3052        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
3053        #[cfg(feature = "lex")]
3054        {
3055            self.tantivy = None;
3056            self.tantivy_dirty = false;
3057        }
3058
3059        self.rebuild_indexes(&[], &[])?;
3060        self.file.sync_all()?;
3061        Ok(())
3062    }
3063
3064    /// Preview how a document would be chunked without actually ingesting it.
3065    ///
3066    /// This is useful when you need to compute embeddings for each chunk externally
3067    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
3068    /// is too small to be chunked (< 2400 chars after normalization).
3069    ///
3070    /// # Example
3071    /// ```ignore
3072    /// let chunks = mem.preview_chunks(b"long document text...")?;
3073    /// if let Some(chunk_texts) = chunks {
3074    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
3075    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
3076    /// } else {
3077    ///     let embedding = my_embedder.embed_query(text)?;
3078    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
3079    /// }
3080    /// ```
3081    #[must_use]
3082    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
3083        plan_document_chunks(payload).map(|plan| plan.chunks)
3084    }
3085
3086    /// Append raw bytes as a document frame.
3087    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
3088        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
3089    }
3090
3091    /// Append raw bytes with explicit metadata/options.
3092    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
3093        self.put_internal(Some(payload), None, None, None, options, None)
3094    }
3095
3096    /// Append bytes and an existing embedding (bypasses on-device embedding).
3097    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
3098        self.put_internal(
3099            Some(payload),
3100            None,
3101            Some(embedding),
3102            None,
3103            PutOptions::default(),
3104            None,
3105        )
3106    }
3107
3108    pub fn put_with_embedding_and_options(
3109        &mut self,
3110        payload: &[u8],
3111        embedding: Vec<f32>,
3112        options: PutOptions,
3113    ) -> Result<u64> {
3114        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
3115    }
3116
3117    /// Ingest a document with pre-computed embeddings for both parent and chunks.
3118    ///
3119    /// This is the recommended API for high-accuracy semantic search when chunking
3120    /// occurs. The caller provides:
3121    /// - `payload`: The document bytes
3122    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
3123    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
3124    /// - `options`: Standard put options
3125    ///
3126    /// The number of chunk embeddings should match the number of chunks that will be
3127    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
3128    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
3129    pub fn put_with_chunk_embeddings(
3130        &mut self,
3131        payload: &[u8],
3132        parent_embedding: Option<Vec<f32>>,
3133        chunk_embeddings: Vec<Vec<f32>>,
3134        options: PutOptions,
3135    ) -> Result<u64> {
3136        self.put_internal(
3137            Some(payload),
3138            None,
3139            parent_embedding,
3140            Some(chunk_embeddings),
3141            options,
3142            None,
3143        )
3144    }
3145
3146    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
3147    pub fn update_frame(
3148        &mut self,
3149        frame_id: FrameId,
3150        payload: Option<Vec<u8>>,
3151        mut options: PutOptions,
3152        embedding: Option<Vec<f32>>,
3153    ) -> Result<u64> {
3154        self.ensure_mutation_allowed()?;
3155        let existing = self.frame_by_id(frame_id)?;
3156        if existing.status != FrameStatus::Active {
3157            return Err(MemvidError::InvalidFrame {
3158                frame_id,
3159                reason: "frame is not active",
3160            });
3161        }
3162
3163        if options.timestamp.is_none() {
3164            options.timestamp = Some(existing.timestamp);
3165        }
3166        if options.track.is_none() {
3167            options.track = existing.track.clone();
3168        }
3169        if options.kind.is_none() {
3170            options.kind = existing.kind.clone();
3171        }
3172        if options.uri.is_none() {
3173            options.uri = existing.uri.clone();
3174        }
3175        if options.title.is_none() {
3176            options.title = existing.title.clone();
3177        }
3178        if options.metadata.is_none() {
3179            options.metadata = existing.metadata.clone();
3180        }
3181        if options.search_text.is_none() {
3182            options.search_text = existing.search_text.clone();
3183        }
3184        if options.tags.is_empty() {
3185            options.tags = existing.tags.clone();
3186        }
3187        if options.labels.is_empty() {
3188            options.labels = existing.labels.clone();
3189        }
3190        if options.extra_metadata.is_empty() {
3191            options.extra_metadata = existing.extra_metadata.clone();
3192        }
3193
3194        let reuse_frame = if payload.is_none() {
3195            options.auto_tag = false;
3196            options.extract_dates = false;
3197            Some(existing.clone())
3198        } else {
3199            None
3200        };
3201
3202        let effective_embedding = if let Some(explicit) = embedding {
3203            Some(explicit)
3204        } else if self.vec_enabled {
3205            self.frame_embedding(frame_id)?
3206        } else {
3207            None
3208        };
3209
3210        let payload_slice = payload.as_deref();
3211        let reuse_flag = reuse_frame.is_some();
3212        let replace_flag = payload_slice.is_some();
3213        let seq = self.put_internal(
3214            payload_slice,
3215            reuse_frame,
3216            effective_embedding,
3217            None, // No chunk embeddings for update
3218            options,
3219            Some(frame_id),
3220        )?;
3221        info!(
3222            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
3223        );
3224        Ok(seq)
3225    }
3226
3227    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
3228        self.ensure_mutation_allowed()?;
3229        let frame = self.frame_by_id(frame_id)?;
3230        if frame.status != FrameStatus::Active {
3231            return Err(MemvidError::InvalidFrame {
3232                frame_id,
3233                reason: "frame is not active",
3234            });
3235        }
3236
3237        let mut tombstone = WalEntryData {
3238            timestamp: SystemTime::now()
3239                .duration_since(UNIX_EPOCH)
3240                .map(|d| d.as_secs() as i64)
3241                .unwrap_or(frame.timestamp),
3242            kind: None,
3243            track: None,
3244            payload: Vec::new(),
3245            embedding: None,
3246            uri: frame.uri.clone(),
3247            title: frame.title.clone(),
3248            canonical_encoding: frame.canonical_encoding,
3249            canonical_length: frame.canonical_length,
3250            metadata: None,
3251            search_text: None,
3252            tags: Vec::new(),
3253            labels: Vec::new(),
3254            extra_metadata: BTreeMap::new(),
3255            content_dates: Vec::new(),
3256            chunk_manifest: None,
3257            role: frame.role,
3258            parent_sequence: None,
3259            chunk_index: frame.chunk_index,
3260            chunk_count: frame.chunk_count,
3261            op: FrameWalOp::Tombstone,
3262            target_frame_id: Some(frame_id),
3263            supersedes_frame_id: None,
3264            reuse_payload_from: None,
3265            source_sha256: None,
3266            source_path: None,
3267            enrichment_state: crate::types::EnrichmentState::default(),
3268        };
3269        tombstone.kind = frame.kind.clone();
3270        tombstone.track = frame.track.clone();
3271
3272        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
3273        let seq = self.append_wal_entry(&payload_bytes)?;
3274        self.dirty = true;
3275        let suppress_checkpoint = self
3276            .batch_opts
3277            .as_ref()
3278            .is_some_and(|o| o.disable_auto_checkpoint);
3279        if !suppress_checkpoint && self.wal.should_checkpoint() {
3280            self.commit()?;
3281        }
3282        info!("frame_delete frame_id={frame_id} seq={seq}");
3283        Ok(seq)
3284    }
3285}
3286
3287impl Memvid {
3288    fn put_internal(
3289        &mut self,
3290        payload: Option<&[u8]>,
3291        reuse_frame: Option<Frame>,
3292        embedding: Option<Vec<f32>>,
3293        chunk_embeddings: Option<Vec<Vec<f32>>>,
3294        mut options: PutOptions,
3295        supersedes: Option<FrameId>,
3296    ) -> Result<u64> {
3297        self.ensure_mutation_allowed()?;
3298
3299        // Deduplication: if enabled and we have payload, check if identical content exists
3300        if options.dedup {
3301            if let Some(bytes) = payload {
3302                let content_hash = hash(bytes);
3303                if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3304                    // Found existing frame with same content hash, skip ingestion
3305                    tracing::debug!(
3306                        frame_id = existing_frame.id,
3307                        "dedup: skipping ingestion, identical content already exists"
3308                    );
3309                    // Return existing frame's sequence number (which equals frame_id for committed frames)
3310                    return Ok(existing_frame.id);
3311                }
3312            }
3313        }
3314
3315        if payload.is_some() && reuse_frame.is_some() {
3316            let frame_id = reuse_frame
3317                .as_ref()
3318                .map(|frame| frame.id)
3319                .unwrap_or_default();
3320            return Err(MemvidError::InvalidFrame {
3321                frame_id,
3322                reason: "cannot reuse payload when bytes are provided",
3323            });
3324        }
3325
3326        // If the caller supplies embeddings, enforce a single vector dimension contract
3327        // for the entire memory (fail fast, never silently accept mixed dimensions).
3328        let incoming_dimension = {
3329            let mut dim: Option<u32> = None;
3330
3331            if let Some(ref vector) = embedding {
3332                if !vector.is_empty() {
3333                    #[allow(clippy::cast_possible_truncation)]
3334                    let len = vector.len() as u32;
3335                    dim = Some(len);
3336                }
3337            }
3338
3339            if let Some(ref vectors) = chunk_embeddings {
3340                for vector in vectors {
3341                    if vector.is_empty() {
3342                        continue;
3343                    }
3344                    let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3345                    match dim {
3346                        None => dim = Some(vec_dim),
3347                        Some(existing) if existing == vec_dim => {}
3348                        Some(existing) => {
3349                            return Err(MemvidError::VecDimensionMismatch {
3350                                expected: existing,
3351                                actual: vector.len(),
3352                            });
3353                        }
3354                    }
3355                }
3356            }
3357
3358            dim
3359        };
3360
3361        if let Some(incoming_dimension) = incoming_dimension {
3362            // Embeddings imply vector search should be enabled.
3363            if !self.vec_enabled {
3364                self.enable_vec()?;
3365            }
3366
3367            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3368                if existing_dimension != incoming_dimension {
3369                    return Err(MemvidError::VecDimensionMismatch {
3370                        expected: existing_dimension,
3371                        actual: incoming_dimension as usize,
3372                    });
3373                }
3374            }
3375
3376            // Persist the dimension early for better auto-detection (even before the next commit).
3377            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3378                if manifest.dimension == 0 {
3379                    manifest.dimension = incoming_dimension;
3380                }
3381            }
3382        }
3383
3384        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3385        let payload_tail = self.payload_region_end();
3386        let projected = if let Some(bytes) = payload {
3387            let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3388                prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3389            } else {
3390                prepare_canonical_payload(bytes)?
3391            };
3392            let len = prepared.len();
3393            prepared_payload = Some((prepared, encoding, length));
3394            payload_tail.saturating_add(len as u64)
3395        } else if reuse_frame.is_some() {
3396            payload_tail
3397        } else {
3398            return Err(MemvidError::InvalidFrame {
3399                frame_id: 0,
3400                reason: "payload required for frame insertion",
3401            });
3402        };
3403
3404        let capacity_limit = self.capacity_limit();
3405        if projected > capacity_limit {
3406            let incoming_size = projected.saturating_sub(payload_tail);
3407            return Err(MemvidError::CapacityExceeded {
3408                current: payload_tail,
3409                limit: capacity_limit,
3410                required: incoming_size,
3411            });
3412        }
3413        let timestamp = options.timestamp.take().unwrap_or_else(|| {
3414            SystemTime::now()
3415                .duration_since(UNIX_EPOCH)
3416                .map(|d| d.as_secs() as i64)
3417                .unwrap_or(0)
3418        });
3419
3420        #[allow(unused_assignments)]
3421        let mut reuse_bytes: Option<Vec<u8>> = None;
3422        let payload_for_processing = if let Some(bytes) = payload {
3423            Some(bytes)
3424        } else if let Some(frame) = reuse_frame.as_ref() {
3425            let bytes = self.frame_canonical_bytes(frame)?;
3426            reuse_bytes = Some(bytes);
3427            reuse_bytes.as_deref()
3428        } else {
3429            None
3430        };
3431
3432        // Try to create a chunk plan from raw UTF-8 bytes first
3433        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3434            (Some(bytes), None) => plan_document_chunks(bytes),
3435            _ => None,
3436        };
3437
3438        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
3439        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
3440        // For --no-raw mode, we store only the extracted text and a hash of the original binary
3441        let mut source_sha256: Option<[u8; 32]> = None;
3442        let source_path_value = options.source_path.take();
3443
3444        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3445            if raw_chunk_plan.is_some() {
3446                // UTF-8 text document - chunks contain the text, no parent payload needed
3447                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3448            } else if options.no_raw {
3449                // --no-raw mode: don't store the raw binary, only compute hash
3450                if let Some(bytes) = payload {
3451                    // Compute BLAKE3 hash of original binary for verification
3452                    let hash_result = hash(bytes);
3453                    source_sha256 = Some(*hash_result.as_bytes());
3454                    // Store empty payload - the extracted text is in search_text
3455                    (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3456                } else {
3457                    return Err(MemvidError::InvalidFrame {
3458                        frame_id: 0,
3459                        reason: "payload required for --no-raw mode",
3460                    });
3461                }
3462            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3463                (prepared, encoding, length, None)
3464            } else if let Some(bytes) = payload {
3465                let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3466                    prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3467                } else {
3468                    prepare_canonical_payload(bytes)?
3469                };
3470                (prepared, encoding, length, None)
3471            } else if let Some(frame) = reuse_frame.as_ref() {
3472                (
3473                    Vec::new(),
3474                    frame.canonical_encoding,
3475                    frame.canonical_length,
3476                    Some(frame.id),
3477                )
3478            } else {
3479                return Err(MemvidError::InvalidFrame {
3480                    frame_id: 0,
3481                    reason: "payload required for frame insertion",
3482                });
3483            };
3484
3485        // Track whether we'll create an extracted text chunk plan later
3486        let mut chunk_plan = raw_chunk_plan;
3487
3488        let mut metadata = options.metadata.take();
3489        let mut search_text = options
3490            .search_text
3491            .take()
3492            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3493        let mut tags = std::mem::take(&mut options.tags);
3494        let mut labels = std::mem::take(&mut options.labels);
3495        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3496        let mut content_dates: Vec<String> = Vec::new();
3497
3498        let need_search_text = search_text
3499            .as_ref()
3500            .is_none_or(|text| text.trim().is_empty());
3501        let need_metadata = metadata.is_none();
3502        let run_extractor = need_search_text || need_metadata || options.auto_tag;
3503
3504        let mut extraction_error = None;
3505        let mut is_skim_extraction = false; // Track if extraction was time-limited
3506
3507        let extracted = if run_extractor {
3508            if let Some(bytes) = payload_for_processing {
3509                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3510                let uri_hint = options.uri.as_deref();
3511
3512                // Use time-budgeted extraction for instant indexing with a budget
3513                let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3514
3515                if use_budgeted {
3516                    // Time-budgeted extraction for sub-second ingestion
3517                    let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3518                        options.extraction_budget_ms,
3519                    );
3520                    match crate::extract_budgeted::extract_with_budget(
3521                        bytes, mime_hint, uri_hint, budget,
3522                    ) {
3523                        Ok(result) => {
3524                            is_skim_extraction = result.is_skim();
3525                            if is_skim_extraction {
3526                                tracing::debug!(
3527                                    coverage = result.coverage,
3528                                    elapsed_ms = result.elapsed_ms,
3529                                    sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3530                                    "time-budgeted extraction (skim)"
3531                                );
3532                            }
3533                            // Convert BudgetedExtractionResult to ExtractedDocument
3534                            let doc = crate::extract::ExtractedDocument {
3535                                text: if result.text.is_empty() {
3536                                    None
3537                                } else {
3538                                    Some(result.text)
3539                                },
3540                                metadata: serde_json::json!({
3541                                    "skim": is_skim_extraction,
3542                                    "coverage": result.coverage,
3543                                    "sections_extracted": result.sections_extracted,
3544                                    "sections_total": result.sections_total,
3545                                }),
3546                                mime_type: mime_hint.map(std::string::ToString::to_string),
3547                            };
3548                            Some(doc)
3549                        }
3550                        Err(err) => {
3551                            // Fall back to full extraction on budgeted extraction error
3552                            tracing::warn!(
3553                                ?err,
3554                                "budgeted extraction failed, trying full extraction"
3555                            );
3556                            match extract_via_registry(bytes, mime_hint, uri_hint) {
3557                                Ok(doc) => Some(doc),
3558                                Err(err) => {
3559                                    extraction_error = Some(err);
3560                                    None
3561                                }
3562                            }
3563                        }
3564                    }
3565                } else {
3566                    // Full extraction (no time budget)
3567                    match extract_via_registry(bytes, mime_hint, uri_hint) {
3568                        Ok(doc) => Some(doc),
3569                        Err(err) => {
3570                            extraction_error = Some(err);
3571                            None
3572                        }
3573                    }
3574                }
3575            } else {
3576                None
3577            }
3578        } else {
3579            None
3580        };
3581
3582        if let Some(err) = extraction_error {
3583            return Err(err);
3584        }
3585
3586        if let Some(doc) = &extracted {
3587            if need_search_text {
3588                if let Some(text) = &doc.text {
3589                    if let Some(normalized) =
3590                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3591                    {
3592                        search_text = Some(normalized);
3593                    }
3594                }
3595            }
3596
3597            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
3598            // from extracted text. This ensures large documents like PDFs get fully indexed.
3599            if chunk_plan.is_none() {
3600                if let Some(text) = &doc.text {
3601                    chunk_plan = plan_text_chunks(text);
3602                }
3603            }
3604
3605            if let Some(mime) = doc.mime_type.as_ref() {
3606                if let Some(existing) = &mut metadata {
3607                    if existing.mime.is_none() {
3608                        existing.mime = Some(mime.clone());
3609                    }
3610                } else {
3611                    let mut doc_meta = DocMetadata::default();
3612                    doc_meta.mime = Some(mime.clone());
3613                    metadata = Some(doc_meta);
3614                }
3615            }
3616
3617            // Only add extractous_metadata when auto_tag is enabled
3618            // This allows callers to disable metadata pollution by setting auto_tag=false
3619            if options.auto_tag {
3620                if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3621                {
3622                    extra_metadata
3623                        .entry("extractous_metadata".to_string())
3624                        .or_insert(meta_json);
3625                }
3626            }
3627        }
3628
3629        if options.auto_tag {
3630            if let Some(ref text) = search_text {
3631                if !text.trim().is_empty() {
3632                    let result = AutoTagger.analyse(text, options.extract_dates);
3633                    merge_unique(&mut tags, result.tags);
3634                    merge_unique(&mut labels, result.labels);
3635                    if options.extract_dates && content_dates.is_empty() {
3636                        content_dates = result.content_dates;
3637                    }
3638                }
3639            }
3640        }
3641
3642        if content_dates.is_empty() {
3643            if let Some(frame) = reuse_frame.as_ref() {
3644                content_dates = frame.content_dates.clone();
3645            }
3646        }
3647
3648        let metadata_ref = metadata.as_ref();
3649        let mut search_text = augment_search_text(
3650            search_text,
3651            options.uri.as_deref(),
3652            options.title.as_deref(),
3653            options.track.as_deref(),
3654            &tags,
3655            &labels,
3656            &extra_metadata,
3657            &content_dates,
3658            metadata_ref,
3659        );
3660        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3661        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3662        let mut parent_chunk_count: Option<u32> = None;
3663
3664        let kind_value = options.kind.take();
3665        let track_value = options.track.take();
3666        let uri_value = options.uri.take();
3667        let title_value = options.title.take();
3668        let should_extract_triplets = options.extract_triplets;
3669        // Save references for triplet extraction (after search_text is moved into WAL entry)
3670        let triplet_uri = uri_value.clone();
3671        let triplet_title = title_value.clone();
3672
3673        if let Some(plan) = chunk_plan.as_ref() {
3674            let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3675            parent_chunk_manifest = Some(plan.manifest.clone());
3676            parent_chunk_count = Some(chunk_total);
3677
3678            if let Some(first_chunk) = plan.chunks.first() {
3679                if let Some(normalized) =
3680                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3681                {
3682                    if !normalized.trim().is_empty() {
3683                        search_text = Some(normalized);
3684                    }
3685                }
3686            }
3687
3688            let chunk_tags = tags.clone();
3689            let chunk_labels = labels.clone();
3690            let chunk_metadata = metadata.clone();
3691            let chunk_extra_metadata = extra_metadata.clone();
3692            let chunk_content_dates = content_dates.clone();
3693
3694            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3695                let (chunk_payload, chunk_encoding, chunk_length) =
3696                    prepare_canonical_payload(chunk_text.as_bytes())?;
3697                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3698                    .map(|n| n.text)
3699                    .filter(|text| !text.trim().is_empty());
3700
3701                let chunk_uri = uri_value
3702                    .as_ref()
3703                    .map(|uri| format!("{uri}#page-{}", idx + 1));
3704                let chunk_title = title_value
3705                    .as_ref()
3706                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3707
3708                // Use provided chunk embedding if available, otherwise None
3709                let chunk_embedding = chunk_embeddings
3710                    .as_ref()
3711                    .and_then(|embeddings| embeddings.get(idx).cloned());
3712
3713                chunk_entries.push(WalEntryData {
3714                    timestamp,
3715                    kind: kind_value.clone(),
3716                    track: track_value.clone(),
3717                    payload: chunk_payload,
3718                    embedding: chunk_embedding,
3719                    uri: chunk_uri,
3720                    title: chunk_title,
3721                    canonical_encoding: chunk_encoding,
3722                    canonical_length: chunk_length,
3723                    metadata: chunk_metadata.clone(),
3724                    search_text: chunk_search_text,
3725                    tags: chunk_tags.clone(),
3726                    labels: chunk_labels.clone(),
3727                    extra_metadata: chunk_extra_metadata.clone(),
3728                    content_dates: chunk_content_dates.clone(),
3729                    chunk_manifest: None,
3730                    role: FrameRole::DocumentChunk,
3731                    parent_sequence: None,
3732                    chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3733                    chunk_count: Some(chunk_total),
3734                    op: FrameWalOp::Insert,
3735                    target_frame_id: None,
3736                    supersedes_frame_id: None,
3737                    reuse_payload_from: None,
3738                    source_sha256: None, // Chunks don't have source references
3739                    source_path: None,
3740                    // Chunks are already extracted, so mark as Enriched
3741                    enrichment_state: crate::types::EnrichmentState::Enriched,
3742                });
3743            }
3744        }
3745
3746        let parent_uri = uri_value.clone();
3747        let parent_title = title_value.clone();
3748
3749        // Get parent_sequence from options.parent_id if provided
3750        // We need the WAL sequence of the parent frame to link them
3751        let parent_sequence = if let Some(parent_id) = options.parent_id {
3752            // Look up the parent frame to get its WAL sequence
3753            // Since frame.id corresponds to the array index, we need to find the sequence
3754            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
3755            // This works because sequence numbers are assigned incrementally
3756            usize::try_from(parent_id)
3757                .ok()
3758                .and_then(|idx| self.toc.frames.get(idx))
3759                .map(|_| parent_id + 2) // WAL sequences start at 2
3760        } else {
3761            None
3762        };
3763
3764        // Clone search_text for triplet extraction (before it's moved into WAL entry)
3765        let triplet_text = search_text.clone();
3766
3767        // Capture values needed for instant indexing BEFORE they're moved into entry
3768        #[cfg(feature = "lex")]
3769        let instant_index_tags = if options.instant_index {
3770            tags.clone()
3771        } else {
3772            Vec::new()
3773        };
3774        #[cfg(feature = "lex")]
3775        let instant_index_labels = if options.instant_index {
3776            labels.clone()
3777        } else {
3778            Vec::new()
3779        };
3780
3781        // Determine enrichment state: Searchable if needs background work, Enriched if complete
3782        #[cfg(feature = "lex")]
3783        let needs_enrichment =
3784            options.instant_index && (options.enable_embedding || is_skim_extraction);
3785        #[cfg(feature = "lex")]
3786        let enrichment_state = if needs_enrichment {
3787            crate::types::EnrichmentState::Searchable
3788        } else {
3789            crate::types::EnrichmentState::Enriched
3790        };
3791        #[cfg(not(feature = "lex"))]
3792        let enrichment_state = crate::types::EnrichmentState::Enriched;
3793
3794        let entry = WalEntryData {
3795            timestamp,
3796            kind: kind_value,
3797            track: track_value,
3798            payload: storage_payload,
3799            embedding,
3800            uri: parent_uri,
3801            title: parent_title,
3802            canonical_encoding,
3803            canonical_length,
3804            metadata,
3805            search_text,
3806            tags,
3807            labels,
3808            extra_metadata,
3809            content_dates,
3810            chunk_manifest: parent_chunk_manifest,
3811            role: options.role,
3812            parent_sequence,
3813            chunk_index: None,
3814            chunk_count: parent_chunk_count,
3815            op: FrameWalOp::Insert,
3816            target_frame_id: None,
3817            supersedes_frame_id: supersedes,
3818            reuse_payload_from,
3819            source_sha256,
3820            source_path: source_path_value,
3821            enrichment_state,
3822        };
3823
3824        let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3825        let parent_seq = self.append_wal_entry(&parent_bytes)?;
3826        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3827
3828        // Instant indexing: make frame searchable immediately (<1s) without full commit
3829        // This is Phase 1 of progressive ingestion - frame is searchable but not fully enriched
3830        #[cfg(feature = "lex")]
3831        if options.instant_index && self.tantivy.is_some() {
3832            // Create a minimal frame for indexing
3833            let frame_id = parent_seq as FrameId;
3834
3835            // Use triplet_text which was cloned before entry was created
3836            if let Some(ref text) = triplet_text {
3837                if !text.trim().is_empty() {
3838                    // Create temporary frame for indexing (minimal fields for Tantivy)
3839                    let temp_frame = Frame {
3840                        id: frame_id,
3841                        timestamp,
3842                        anchor_ts: None,
3843                        anchor_source: None,
3844                        kind: options.kind.clone(),
3845                        track: options.track.clone(),
3846                        payload_offset: 0,
3847                        payload_length: 0,
3848                        checksum: [0u8; 32],
3849                        uri: options
3850                            .uri
3851                            .clone()
3852                            .or_else(|| Some(crate::default_uri(frame_id))),
3853                        title: options.title.clone(),
3854                        canonical_encoding: crate::types::CanonicalEncoding::default(),
3855                        canonical_length: None,
3856                        metadata: None, // Not needed for text search
3857                        search_text: triplet_text.clone(),
3858                        tags: instant_index_tags.clone(),
3859                        labels: instant_index_labels.clone(),
3860                        extra_metadata: std::collections::BTreeMap::new(), // Not needed for search
3861                        content_dates: Vec::new(),                         // Not needed for search
3862                        chunk_manifest: None,
3863                        role: options.role,
3864                        parent_id: None,
3865                        chunk_index: None,
3866                        chunk_count: None,
3867                        status: FrameStatus::Active,
3868                        supersedes,
3869                        superseded_by: None,
3870                        source_sha256: None, // Not needed for search
3871                        source_path: None,   // Not needed for search
3872                        enrichment_state: crate::types::EnrichmentState::Searchable,
3873                    };
3874
3875                    // Get mutable reference to engine and index the frame
3876                    if let Some(engine) = self.tantivy.as_mut() {
3877                        engine.add_frame(&temp_frame, text)?;
3878                        engine.soft_commit()?;
3879                        self.tantivy_dirty = true;
3880
3881                        tracing::debug!(
3882                            frame_id = frame_id,
3883                            "instant index: frame searchable immediately"
3884                        );
3885                    }
3886                }
3887            }
3888        }
3889
3890        // Queue frame for background enrichment when using instant index path
3891        // Enrichment includes: embedding generation, full text re-extraction if time-limited
3892        // Note: enrichment_state is already set in the WAL entry, so it will be correct after replay
3893        #[cfg(feature = "lex")]
3894        if needs_enrichment {
3895            let frame_id = parent_seq as FrameId;
3896            self.toc.enrichment_queue.push(frame_id);
3897            tracing::debug!(
3898                frame_id = frame_id,
3899                is_skim = is_skim_extraction,
3900                needs_embedding = options.enable_embedding,
3901                "queued frame for background enrichment"
3902            );
3903        }
3904
3905        for mut chunk_entry in chunk_entries {
3906            chunk_entry.parent_sequence = Some(parent_seq);
3907            let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3908            self.append_wal_entry(&chunk_bytes)?;
3909            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3910        }
3911
3912        self.dirty = true;
3913        let suppress_checkpoint = self
3914            .batch_opts
3915            .as_ref()
3916            .is_some_and(|o| o.disable_auto_checkpoint);
3917        if !suppress_checkpoint && self.wal.should_checkpoint() {
3918            self.commit()?;
3919        }
3920
3921        // Record the put action if a replay session is active
3922        #[cfg(feature = "replay")]
3923        if let Some(input_bytes) = payload {
3924            self.record_put_action(parent_seq, input_bytes);
3925        }
3926
3927        // Extract triplets if enabled (default: true)
3928        // Triplets are stored as MemoryCards with entity/slot/value structure
3929        if should_extract_triplets {
3930            if let Some(ref text) = triplet_text {
3931                if !text.trim().is_empty() {
3932                    let extractor = TripletExtractor::default();
3933                    let frame_id = parent_seq as FrameId;
3934                    let (cards, _stats) = extractor.extract(
3935                        frame_id,
3936                        text,
3937                        triplet_uri.as_deref(),
3938                        triplet_title.as_deref(),
3939                        timestamp,
3940                    );
3941
3942                    if !cards.is_empty() {
3943                        // Add cards to memories track
3944                        let card_ids = self.memories_track.add_cards(cards);
3945
3946                        // Record enrichment for incremental processing
3947                        self.memories_track
3948                            .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3949                    }
3950                }
3951            }
3952        }
3953
3954        Ok(parent_seq)
3955    }
3956}
3957
3958#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3959#[serde(rename_all = "snake_case")]
3960pub(crate) enum FrameWalOp {
3961    Insert,
3962    Tombstone,
3963}
3964
3965impl Default for FrameWalOp {
3966    fn default() -> Self {
3967        Self::Insert
3968    }
3969}
3970
3971#[derive(Debug, Serialize, Deserialize)]
3972enum WalEntry {
3973    Frame(WalEntryData),
3974    #[cfg(feature = "lex")]
3975    Lex(LexWalBatch),
3976}
3977
3978fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3979    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3980        return Ok(entry);
3981    }
3982    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3983    Ok(WalEntry::Frame(legacy))
3984}
3985
3986#[derive(Debug, Serialize, Deserialize)]
3987pub(crate) struct WalEntryData {
3988    pub(crate) timestamp: i64,
3989    pub(crate) kind: Option<String>,
3990    pub(crate) track: Option<String>,
3991    pub(crate) payload: Vec<u8>,
3992    pub(crate) embedding: Option<Vec<f32>>,
3993    #[serde(default)]
3994    pub(crate) uri: Option<String>,
3995    #[serde(default)]
3996    pub(crate) title: Option<String>,
3997    #[serde(default)]
3998    pub(crate) canonical_encoding: CanonicalEncoding,
3999    #[serde(default)]
4000    pub(crate) canonical_length: Option<u64>,
4001    #[serde(default)]
4002    pub(crate) metadata: Option<DocMetadata>,
4003    #[serde(default)]
4004    pub(crate) search_text: Option<String>,
4005    #[serde(default)]
4006    pub(crate) tags: Vec<String>,
4007    #[serde(default)]
4008    pub(crate) labels: Vec<String>,
4009    #[serde(default)]
4010    pub(crate) extra_metadata: BTreeMap<String, String>,
4011    #[serde(default)]
4012    pub(crate) content_dates: Vec<String>,
4013    #[serde(default)]
4014    pub(crate) chunk_manifest: Option<TextChunkManifest>,
4015    #[serde(default)]
4016    pub(crate) role: FrameRole,
4017    #[serde(default)]
4018    pub(crate) parent_sequence: Option<u64>,
4019    #[serde(default)]
4020    pub(crate) chunk_index: Option<u32>,
4021    #[serde(default)]
4022    pub(crate) chunk_count: Option<u32>,
4023    #[serde(default)]
4024    pub(crate) op: FrameWalOp,
4025    #[serde(default)]
4026    pub(crate) target_frame_id: Option<FrameId>,
4027    #[serde(default)]
4028    pub(crate) supersedes_frame_id: Option<FrameId>,
4029    #[serde(default)]
4030    pub(crate) reuse_payload_from: Option<FrameId>,
4031    /// SHA-256 hash of original source file (set when --no-raw is used).
4032    #[serde(default)]
4033    pub(crate) source_sha256: Option<[u8; 32]>,
4034    /// Original source file path (set when --no-raw is used).
4035    #[serde(default)]
4036    pub(crate) source_path: Option<String>,
4037    /// Enrichment state for progressive ingestion.
4038    #[serde(default)]
4039    pub(crate) enrichment_state: crate::types::EnrichmentState,
4040}
4041
4042pub(crate) fn prepare_canonical_payload(
4043    payload: &[u8],
4044) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4045    prepare_canonical_payload_with_level(payload, 3)
4046}
4047
4048pub(crate) fn prepare_canonical_payload_with_level(
4049    payload: &[u8],
4050    level: i32,
4051) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4052    if level == 0 {
4053        // No compression — store as plain text
4054        return Ok((
4055            payload.to_vec(),
4056            CanonicalEncoding::Plain,
4057            Some(payload.len() as u64),
4058        ));
4059    }
4060    if std::str::from_utf8(payload).is_ok() {
4061        let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4062        Ok((
4063            compressed,
4064            CanonicalEncoding::Zstd,
4065            Some(payload.len() as u64),
4066        ))
4067    } else {
4068        Ok((
4069            payload.to_vec(),
4070            CanonicalEncoding::Plain,
4071            Some(payload.len() as u64),
4072        ))
4073    }
4074}
4075
4076pub(crate) fn augment_search_text(
4077    base: Option<String>,
4078    uri: Option<&str>,
4079    title: Option<&str>,
4080    track: Option<&str>,
4081    tags: &[String],
4082    labels: &[String],
4083    extra_metadata: &BTreeMap<String, String>,
4084    content_dates: &[String],
4085    metadata: Option<&DocMetadata>,
4086) -> Option<String> {
4087    let mut segments: Vec<String> = Vec::new();
4088    if let Some(text) = base {
4089        let trimmed = text.trim();
4090        if !trimmed.is_empty() {
4091            segments.push(trimmed.to_string());
4092        }
4093    }
4094
4095    if let Some(title) = title {
4096        if !title.trim().is_empty() {
4097            segments.push(format!("title: {}", title.trim()));
4098        }
4099    }
4100
4101    if let Some(uri) = uri {
4102        if !uri.trim().is_empty() {
4103            segments.push(format!("uri: {}", uri.trim()));
4104        }
4105    }
4106
4107    if let Some(track) = track {
4108        if !track.trim().is_empty() {
4109            segments.push(format!("track: {}", track.trim()));
4110        }
4111    }
4112
4113    if !tags.is_empty() {
4114        segments.push(format!("tags: {}", tags.join(" ")));
4115    }
4116
4117    if !labels.is_empty() {
4118        segments.push(format!("labels: {}", labels.join(" ")));
4119    }
4120
4121    if !extra_metadata.is_empty() {
4122        for (key, value) in extra_metadata {
4123            if value.trim().is_empty() {
4124                continue;
4125            }
4126            segments.push(format!("{key}: {value}"));
4127        }
4128    }
4129
4130    if !content_dates.is_empty() {
4131        segments.push(format!("dates: {}", content_dates.join(" ")));
4132    }
4133
4134    if let Some(meta) = metadata {
4135        if let Ok(meta_json) = serde_json::to_string(meta) {
4136            segments.push(format!("metadata: {meta_json}"));
4137        }
4138    }
4139
4140    if segments.is_empty() {
4141        None
4142    } else {
4143        Some(segments.join("\n"))
4144    }
4145}
4146
/// Appends each new, non-blank value from `additions` to `target`, preserving order.
///
/// Every addition is trimmed first. A trimmed value is appended only if it is non-empty and
/// not equal to an existing `target` entry or to an addition already appended by this call.
/// Existing `target` entries are left untouched (they are neither trimmed nor de-duplicated).
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }

    let mut known: BTreeSet<String> = target.iter().cloned().collect();
    let fresh = additions.into_iter().filter_map(|raw| {
        let cleaned = raw.trim();
        // `insert` returns true only the first time a value is seen.
        let is_new = !cleaned.is_empty() && known.insert(cleaned.to_string());
        is_new.then(|| cleaned.to_string())
    });
    target.extend(fresh);
}