Skip to main content

memvid_core/memvid/
mutation.rs

1//! Frame mutation and ingestion routines for `Memvid`.
2//!
3//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
4//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
5//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
6//!
7//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
8//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58    PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66    TemporalResolutionValue,
67};
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Maximum number of leading payload bytes handed to format detection as "magic" bytes.
const MAGIC_SNIFF_BYTES: usize = 16;
// NOTE(review): presumably mirrors the on-disk WAL entry header size defined in `io::wal` —
// confirm the two stay in sync.
/// Per-entry overhead (in bytes) added to the payload length when sizing WAL growth.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Size of the scratch buffers (8 MiB) used to move and zero bytes when the WAL region grows.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
78
// NOTE(review): presumably the IANA timezone assumed when a document carries no timezone of
// its own — the consumer is outside this excerpt, confirm.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

// Fixed list of lowercase relative-date/time phrases and weekday names.
// NOTE(review): presumably matched against ingested text to find temporal mentions — the
// consumer is outside this excerpt, confirm before editing entries.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
123struct CommitStaging {
124    atomic: AtomicWriteFile,
125}
126
127impl CommitStaging {
128    fn prepare(path: &Path) -> Result<Self> {
129        let mut options = AtomicWriteFile::options();
130        options.read(true);
131        let atomic = options.open(path)?;
132        Ok(Self { atomic })
133    }
134
135    fn copy_from(&mut self, source: &File) -> Result<()> {
136        let mut reader = source.try_clone()?;
137        reader.seek(SeekFrom::Start(0))?;
138
139        let writer = self.atomic.as_file_mut();
140        writer.set_len(0)?;
141        writer.seek(SeekFrom::Start(0))?;
142        std::io::copy(&mut reader, writer)?;
143        writer.flush()?;
144        writer.sync_all()?;
145        Ok(())
146    }
147
148    fn clone_file(&self) -> Result<File> {
149        Ok(self.atomic.as_file().try_clone()?)
150    }
151
152    fn commit(self) -> Result<()> {
153        self.atomic.commit().map_err(Into::into)
154    }
155
156    fn discard(self) -> Result<()> {
157        self.atomic.discard().map_err(Into::into)
158    }
159}
160
161#[derive(Debug, Default)]
162struct IngestionDelta {
163    inserted_frames: Vec<FrameId>,
164    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
165    inserted_time_entries: Vec<TimeIndexEntry>,
166    mutated_frames: bool,
167    #[cfg(feature = "temporal_track")]
168    inserted_temporal_mentions: Vec<TemporalMention>,
169    #[cfg(feature = "temporal_track")]
170    inserted_temporal_anchors: Vec<TemporalAnchor>,
171}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
/// Commit strategy requested by the caller.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Full commit; this is the default mode.
    #[default]
    Full,
    /// Incremental commit.
    Incremental,
}

/// Options accepted by `commit_with_options`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CommitOptions {
    /// Commit strategy to use.
    pub mode: CommitMode,
    /// Request for background execution.
    pub background: bool,
}

impl CommitOptions {
    /// Creates options for `mode` with `background` disabled.
    #[must_use]
    pub fn new(mode: CommitMode) -> Self {
        Self {
            mode,
            background: false,
        }
    }

    /// Returns a copy of the options with the `background` flag set to `background`.
    #[must_use]
    pub fn background(mut self, background: bool) -> Self {
        self.background = background;
        self
    }
}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226    REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230    mime: Option<&str>,
231    magic: Option<&[u8]>,
232    uri: Option<&str>,
233) -> Option<DocumentFormat> {
234    // Check PDF magic bytes first
235    if detect_pdf_magic(magic) {
236        return Some(DocumentFormat::Pdf);
237    }
238
239    // For ZIP-based OOXML formats (DOCX, XLSX, PPTX), magic bytes are just ZIP header
240    // so we need to check file extension to distinguish them
241    if let Some(magic_bytes) = magic {
242        if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243            // It's a ZIP file - check extension to determine OOXML type
244            if let Some(format) = infer_format_from_extension(uri) {
245                return Some(format);
246            }
247        }
248    }
249
250    // Try MIME type
251    if let Some(mime_str) = mime {
252        let mime_lower = mime_str.trim().to_ascii_lowercase();
253        let format = match mime_lower.as_str() {
254            "application/pdf" => Some(DocumentFormat::Pdf),
255            "text/plain" => Some(DocumentFormat::PlainText),
256            "text/markdown" => Some(DocumentFormat::Markdown),
257            "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258            "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259                Some(DocumentFormat::Docx)
260            }
261            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262                Some(DocumentFormat::Xlsx)
263            }
264            "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265            "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266                Some(DocumentFormat::Pptx)
267            }
268            other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269            _ => None,
270        };
271        if format.is_some() {
272            return format;
273        }
274    }
275
276    // Fall back to extension-based detection
277    infer_format_from_extension(uri)
278}
279
280/// Infer document format from file extension in URI/path
281fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282    let uri = uri?;
283    let path = std::path::Path::new(uri);
284    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285    match ext.as_str() {
286        "pdf" => Some(DocumentFormat::Pdf),
287        "docx" => Some(DocumentFormat::Docx),
288        "xlsx" => Some(DocumentFormat::Xlsx),
289        "xls" => Some(DocumentFormat::Xls),
290        "pptx" => Some(DocumentFormat::Pptx),
291        "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292        | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293        | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294        | "sql" => Some(DocumentFormat::PlainText),
295        "md" | "markdown" => Some(DocumentFormat::Markdown),
296        "html" | "htm" => Some(DocumentFormat::Html),
297        _ => None,
298    }
299}
300
/// Reports whether `magic` looks like the start of a PDF file.
///
/// An optional UTF-8 byte-order mark and any leading ASCII whitespace are skipped before
/// checking for the `%PDF` signature. Missing or empty input yields `false`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    const UTF8_BOM: &[u8] = &[0xEF, 0xBB, 0xBF];

    let Some(bytes) = magic else {
        return false;
    };
    let without_bom = bytes.strip_prefix(UTF8_BOM).unwrap_or(bytes);
    // Locate the first non-whitespace byte; all-whitespace (or empty) input cannot be a PDF.
    match without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
    {
        Some(start) => without_bom[start..].starts_with(b"%PDF"),
        None => false,
    }
}
318
319#[instrument(
320    target = "memvid::extract",
321    skip_all,
322    fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325    bytes: &[u8],
326    mime_hint: Option<&str>,
327    uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329    let registry = default_reader_registry();
330    let magic = bytes
331        .get(..MAGIC_SNIFF_BYTES)
332        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334        .with_uri(uri)
335        .with_magic(magic);
336
337    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338        let start = Instant::now();
339        match reader.extract(bytes, &hint) {
340            Ok(output) => {
341                return Ok(finalize_reader_output(output, start));
342            }
343            Err(err) => {
344                tracing::error!(
345                    target = "memvid::extract",
346                    reader = reader.name(),
347                    error = %err,
348                    "reader failed; falling back"
349                );
350                Some(format!("reader {} failed: {err}", reader.name()))
351            }
352        }
353    } else {
354        tracing::debug!(
355            target = "memvid::extract",
356            format = hint.format.map(super::super::reader::DocumentFormat::label),
357            "no reader matched; using default extractor"
358        );
359        Some("no registered reader matched; using default extractor".to_string())
360    };
361
362    let start = Instant::now();
363    let mut output = PassthroughReader.extract(bytes, &hint)?;
364    if let Some(reason) = fallback_reason {
365        output.diagnostics.track_warning(reason);
366    }
367    Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371    let elapsed = start.elapsed();
372    let ReaderOutput {
373        document,
374        reader_name,
375        diagnostics,
376    } = output;
377    log_reader_result(&reader_name, &diagnostics, elapsed);
378    document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382    let duration_ms = diagnostics
383        .duration_ms
384        .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385    let warnings = diagnostics.warnings.len();
386    let pages = diagnostics.pages_processed;
387
388    if warnings > 0 || diagnostics.fallback {
389        tracing::warn!(
390            target = "memvid::extract",
391            reader,
392            duration_ms,
393            pages,
394            warnings,
395            fallback = diagnostics.fallback,
396            "extraction completed with warnings"
397        );
398        for warning in &diagnostics.warnings {
399            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400        }
401    } else {
402        tracing::info!(
403            target = "memvid::extract",
404            reader,
405            duration_ms,
406            pages,
407            "extraction completed"
408        );
409    }
410}
411
412impl Memvid {
413    // -- Public ingestion entrypoints ---------------------------------------------------------
414
415    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
416    where
417        F: FnOnce(&mut Self) -> Result<()>,
418    {
419        self.file.sync_all()?;
420        let mut staging = CommitStaging::prepare(self.path())?;
421        staging.copy_from(&self.file)?;
422
423        let staging_handle = staging.clone_file()?;
424        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
425        let original_file = std::mem::replace(&mut self.file, staging_handle);
426        let original_wal = std::mem::replace(&mut self.wal, new_wal);
427        let original_header = self.header.clone();
428        let original_toc = self.toc.clone();
429        let original_data_end = self.data_end;
430        let original_generation = self.generation;
431        let original_dirty = self.dirty;
432        let original_lex_enabled = self.lex_enabled;
433        #[cfg(feature = "lex")]
434        let original_tantivy_dirty = self.tantivy_dirty;
435
436        let destination_path = self.path().to_path_buf();
437        let mut original_file = Some(original_file);
438        let mut original_wal = Some(original_wal);
439
440        match op(self) {
441            Ok(()) => {
442                self.file.sync_all()?;
443                match staging.commit() {
444                    Ok(()) => {
445                        drop(original_file.take());
446                        drop(original_wal.take());
447                        self.file = OpenOptions::new()
448                            .read(true)
449                            .write(true)
450                            .open(&destination_path)?;
451                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
452                        Ok(())
453                    }
454                    Err(commit_err) => {
455                        if let Some(file) = original_file.take() {
456                            self.file = file;
457                        }
458                        if let Some(wal) = original_wal.take() {
459                            self.wal = wal;
460                        }
461                        self.header = original_header;
462                        self.toc = original_toc;
463                        self.data_end = original_data_end;
464                        self.generation = original_generation;
465                        self.dirty = original_dirty;
466                        self.lex_enabled = original_lex_enabled;
467                        #[cfg(feature = "lex")]
468                        {
469                            self.tantivy_dirty = original_tantivy_dirty;
470                        }
471                        Err(commit_err)
472                    }
473                }
474            }
475            Err(err) => {
476                let _ = staging.discard();
477                if let Some(file) = original_file.take() {
478                    self.file = file;
479                }
480                if let Some(wal) = original_wal.take() {
481                    self.wal = wal;
482                }
483                self.header = original_header;
484                self.toc = original_toc;
485                self.data_end = original_data_end;
486                self.generation = original_generation;
487                self.dirty = original_dirty;
488                self.lex_enabled = original_lex_enabled;
489                #[cfg(feature = "lex")]
490                {
491                    self.tantivy_dirty = original_tantivy_dirty;
492                }
493                Err(err)
494            }
495        }
496    }
497
498    pub(crate) fn catalog_data_end(&self) -> u64 {
499        let mut max_end = self.header.wal_offset + self.header.wal_size;
500
501        for descriptor in &self.toc.segment_catalog.lex_segments {
502            if descriptor.common.bytes_length == 0 {
503                continue;
504            }
505            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
506        }
507
508        for descriptor in &self.toc.segment_catalog.vec_segments {
509            if descriptor.common.bytes_length == 0 {
510                continue;
511            }
512            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
513        }
514
515        for descriptor in &self.toc.segment_catalog.time_segments {
516            if descriptor.common.bytes_length == 0 {
517                continue;
518            }
519            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
520        }
521
522        #[cfg(feature = "temporal_track")]
523        for descriptor in &self.toc.segment_catalog.temporal_segments {
524            if descriptor.common.bytes_length == 0 {
525                continue;
526            }
527            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
528        }
529
530        #[cfg(feature = "lex")]
531        for descriptor in &self.toc.segment_catalog.tantivy_segments {
532            if descriptor.common.bytes_length == 0 {
533                continue;
534            }
535            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
536        }
537
538        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
539            if manifest.bytes_length != 0 {
540                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
541            }
542        }
543
544        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
545            if manifest.bytes_length != 0 {
546                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
547            }
548        }
549
550        if let Some(manifest) = self.toc.time_index.as_ref() {
551            if manifest.bytes_length != 0 {
552                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
553            }
554        }
555
556        #[cfg(feature = "temporal_track")]
557        if let Some(track) = self.toc.temporal_track.as_ref() {
558            if track.bytes_length != 0 {
559                max_end = max_end.max(track.bytes_offset + track.bytes_length);
560            }
561        }
562
563        max_end
564    }
565
    /// Returns the cached end offset of the payload region (`cached_payload_end`).
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
569
570    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
571        loop {
572            match self.wal.append_entry(payload) {
573                Ok(seq) => return Ok(seq),
574                Err(MemvidError::CheckpointFailed { reason })
575                    if reason == "embedded WAL region too small for entry"
576                        || reason == "embedded WAL region full" =>
577                {
578                    // WAL is either too small for this entry or full with pending entries.
579                    // Grow the WAL to accommodate - doubling ensures we have space.
580                    let required = WAL_ENTRY_HEADER_SIZE
581                        .saturating_add(payload.len() as u64)
582                        .max(self.header.wal_size + 1);
583                    self.grow_wal_region(required)?;
584                }
585                Err(err) => return Err(err),
586            }
587        }
588    }
589
590    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
591        let mut new_size = self.header.wal_size;
592        let mut target = required_entry_size;
593        if target == 0 {
594            target = self.header.wal_size;
595        }
596        while new_size <= target {
597            new_size = new_size
598                .checked_mul(2)
599                .ok_or_else(|| MemvidError::CheckpointFailed {
600                    reason: "wal_size overflow".into(),
601                })?;
602        }
603        let delta = new_size - self.header.wal_size;
604        if delta == 0 {
605            return Ok(());
606        }
607
608        self.shift_data_for_wal_growth(delta)?;
609        self.header.wal_size = new_size;
610        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
611        self.data_end = self.data_end.saturating_add(delta);
612        self.adjust_offsets_after_wal_growth(delta);
613
614        let catalog_end = self.catalog_data_end();
615        self.header.footer_offset = catalog_end
616            .max(self.header.footer_offset)
617            .max(self.data_end);
618
619        self.rewrite_toc_footer()?;
620        self.header.toc_checksum = self.toc.toc_checksum;
621        crate::persist_header(&mut self.file, &self.header)?;
622        self.file.sync_all()?;
623        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
624        Ok(())
625    }
626
627    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
628        if delta == 0 {
629            return Ok(());
630        }
631        let original_len = self.file.metadata()?.len();
632        let data_start = self.header.wal_offset + self.header.wal_size;
633        self.file.set_len(original_len + delta)?;
634
635        let mut remaining = original_len.saturating_sub(data_start);
636        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
637        while remaining > 0 {
638            let chunk = min(remaining, buffer.len() as u64);
639            let src = data_start + remaining - chunk;
640            self.file.seek(SeekFrom::Start(src))?;
641            #[allow(clippy::cast_possible_truncation)]
642            self.file.read_exact(&mut buffer[..chunk as usize])?;
643            let dst = src + delta;
644            self.file.seek(SeekFrom::Start(dst))?;
645            #[allow(clippy::cast_possible_truncation)]
646            self.file.write_all(&buffer[..chunk as usize])?;
647            remaining -= chunk;
648        }
649
650        self.file.seek(SeekFrom::Start(data_start))?;
651        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
652        let mut remaining = delta;
653        while remaining > 0 {
654            let write = min(remaining, zero_buf.len() as u64);
655            #[allow(clippy::cast_possible_truncation)]
656            self.file.write_all(&zero_buf[..write as usize])?;
657            remaining -= write;
658        }
659        Ok(())
660    }
661
662    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
663        if delta == 0 {
664            return;
665        }
666
667        for frame in &mut self.toc.frames {
668            if frame.payload_offset != 0 {
669                frame.payload_offset += delta;
670            }
671        }
672
673        for segment in &mut self.toc.segments {
674            if segment.bytes_offset != 0 {
675                segment.bytes_offset += delta;
676            }
677        }
678
679        if let Some(lex) = self.toc.indexes.lex.as_mut() {
680            if lex.bytes_offset != 0 {
681                lex.bytes_offset += delta;
682            }
683        }
684        for manifest in &mut self.toc.indexes.lex_segments {
685            if manifest.bytes_offset != 0 {
686                manifest.bytes_offset += delta;
687            }
688        }
689        if let Some(vec) = self.toc.indexes.vec.as_mut() {
690            if vec.bytes_offset != 0 {
691                vec.bytes_offset += delta;
692            }
693        }
694        if let Some(time_index) = self.toc.time_index.as_mut() {
695            if time_index.bytes_offset != 0 {
696                time_index.bytes_offset += delta;
697            }
698        }
699        #[cfg(feature = "temporal_track")]
700        if let Some(track) = self.toc.temporal_track.as_mut() {
701            if track.bytes_offset != 0 {
702                track.bytes_offset += delta;
703            }
704        }
705
706        let catalog = &mut self.toc.segment_catalog;
707        for descriptor in &mut catalog.lex_segments {
708            if descriptor.common.bytes_offset != 0 {
709                descriptor.common.bytes_offset += delta;
710            }
711        }
712        for descriptor in &mut catalog.vec_segments {
713            if descriptor.common.bytes_offset != 0 {
714                descriptor.common.bytes_offset += delta;
715            }
716        }
717        for descriptor in &mut catalog.time_segments {
718            if descriptor.common.bytes_offset != 0 {
719                descriptor.common.bytes_offset += delta;
720            }
721        }
722        #[cfg(feature = "temporal_track")]
723        for descriptor in &mut catalog.temporal_segments {
724            if descriptor.common.bytes_offset != 0 {
725                descriptor.common.bytes_offset += delta;
726            }
727        }
728        for descriptor in &mut catalog.tantivy_segments {
729            if descriptor.common.bytes_offset != 0 {
730                descriptor.common.bytes_offset += delta;
731            }
732        }
733
734        #[cfg(feature = "lex")]
735        if let Ok(mut storage) = self.lex_storage.write() {
736            storage.adjust_offsets(delta);
737        }
738    }
739    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
740        self.ensure_writable()?;
741        if options.background {
742            tracing::debug!("commit background flag ignored; running synchronously");
743        }
744        let mode = options.mode;
745        let records = self.wal.pending_records()?;
746        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
747            return Ok(());
748        }
749        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
750    }
751
752    pub fn commit(&mut self) -> Result<()> {
753        self.ensure_writable()?;
754        self.commit_with_options(CommitOptions::new(CommitMode::Full))
755    }
756
757    /// Enter batch mode for high-throughput ingestion.
758    ///
759    /// While batch mode is active:
760    /// - WAL fsync is skipped on every append (controlled by `opts.skip_sync`)
761    /// - Auto-checkpoint is suppressed (controlled by `opts.disable_auto_checkpoint`)
762    /// - Compression level is lowered (controlled by `opts.compression_level`)
763    /// - WAL is pre-sized to avoid expensive mid-batch growth (controlled by `opts.wal_pre_size_bytes`)
764    ///
765    /// **You must call [`end_batch()`](Self::end_batch) when done** to flush the WAL
766    /// and restore normal operation.
767    pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
768        if opts.wal_pre_size_bytes > 0 {
769            self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
770        }
771        self.wal.set_skip_sync(opts.skip_sync);
772        self.batch_opts = Some(opts);
773        Ok(())
774    }
775
776    /// Pre-grow the embedded WAL to at least `min_bytes` in a single operation.
777    ///
778    /// When `disable_auto_checkpoint` is true, all WAL entries accumulate for
779    /// the entire batch.  If the region is too small it must grow repeatedly,
780    /// and every growth shifts **all** payload data — O(file_size) per shift.
781    ///
782    /// Calling this once before the batch eliminates all mid-batch shifts.
783    /// On a freshly-created file the shift is essentially free (no data to move).
784    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
785        if min_bytes <= self.header.wal_size {
786            return Ok(());
787        }
788        // Jump directly to the target size (next power of two for alignment)
789        let target = min_bytes.next_power_of_two();
790        let delta = target.saturating_sub(self.header.wal_size);
791        if delta == 0 {
792            return Ok(());
793        }
794
795        tracing::info!(
796            current_wal = self.header.wal_size,
797            target_wal = target,
798            delta,
799            "pre-sizing WAL for batch mode"
800        );
801
802        self.shift_data_for_wal_growth(delta)?;
803        self.header.wal_size = target;
804        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
805        self.data_end = self.data_end.saturating_add(delta);
806        self.adjust_offsets_after_wal_growth(delta);
807
808        let catalog_end = self.catalog_data_end();
809        self.header.footer_offset = catalog_end
810            .max(self.header.footer_offset)
811            .max(self.data_end);
812
813        self.rewrite_toc_footer()?;
814        self.header.toc_checksum = self.toc.toc_checksum;
815        crate::persist_header(&mut self.file, &self.header)?;
816        self.file.sync_all()?;
817        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
818        Ok(())
819    }
820
821    /// Exit batch mode, flushing the WAL and restoring per-entry fsync.
822    ///
823    /// This performs a single `fsync` for all appends accumulated during the batch,
824    /// then clears batch options so subsequent puts use default behaviour.
825    pub fn end_batch(&mut self) -> Result<()> {
826        // Single fsync for the entire batch
827        self.wal.flush()?;
828        self.wal.set_skip_sync(false);
829        self.batch_opts = None;
830        Ok(())
831    }
832
833    /// Commit pending WAL records without rebuilding any indexes.
834    ///
835    /// Optimized for bulk ingestion: payloads and frame metadata are persisted,
836    /// but time index, Tantivy, vec index, and other index structures are NOT
837    /// rebuilt. The caller must do a full `commit()` (which triggers
838    /// `rebuild_indexes()`) after all batches are written.
839    ///
840    /// Skips the staging lock for performance — not crash-safe.
841    pub fn commit_skip_indexes(&mut self) -> Result<()> {
842        self.ensure_writable()?;
843        let records = self.wal.pending_records()?;
844        if records.is_empty() && !self.dirty {
845            return Ok(());
846        }
847        self.commit_skip_indexes_inner(records)
848    }
849
850    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
851        self.generation = self.generation.wrapping_add(1);
852
853        // Temporarily remove Tantivy engine to avoid per-frame indexing work
854        // and disk reads in apply_records(). We won't persist Tantivy state anyway.
855        #[cfg(feature = "lex")]
856        let tantivy_backup = self.tantivy.take();
857
858        let result = self.apply_records(records);
859
860        // Restore Tantivy engine (unchanged — no dirty frames added)
861        #[cfg(feature = "lex")]
862        {
863            self.tantivy = tantivy_backup;
864            self.tantivy_dirty = false;
865        }
866
867        let _delta = result?;
868
869        // Set footer_offset to right after payloads (no index data written)
870        self.header.footer_offset = self.data_end;
871
872        // Clear stale index manifests so next open() doesn't try to load
873        // garbage data. Indexes will be rebuilt in the finalize step.
874        self.toc.time_index = None;
875        self.toc.indexes.lex_segments.clear();
876        // Preserve vec manifest (holds dimension info) but zero out data pointers
877        if let Some(vec) = self.toc.indexes.vec.as_mut() {
878            vec.bytes_offset = 0;
879            vec.bytes_length = 0;
880            vec.vector_count = 0;
881        }
882        self.toc.indexes.lex = None;
883        self.toc.indexes.clip = None;
884        self.toc.segment_catalog.lex_segments.clear();
885        self.toc.segment_catalog.vec_segments.clear();
886        self.toc.segment_catalog.time_segments.clear();
887        self.toc.segment_catalog.tantivy_segments.clear();
888        #[cfg(feature = "temporal_track")]
889        {
890            self.toc.temporal_track = None;
891            self.toc.segment_catalog.temporal_segments.clear();
892        }
893        self.toc.memories_track = None;
894        self.toc.logic_mesh = None;
895        self.toc.sketch_track = None;
896
897        self.rewrite_toc_footer()?;
898        self.header.toc_checksum = self.toc.toc_checksum;
899        self.wal.record_checkpoint(&mut self.header)?;
900        self.header.toc_checksum = self.toc.toc_checksum;
901        crate::persist_header(&mut self.file, &self.header)?;
902        self.file.sync_all()?;
903        self.pending_frame_inserts = 0;
904        self.dirty = false;
905        Ok(())
906    }
907
908    /// Rebuild all indexes (time, Tantivy, vec) and persist the TOC.
909    ///
910    /// Use after bulk ingestion with `commit_skip_indexes()` to build
911    /// all search indexes in one O(n) pass. This is the complement to
912    /// `commit_skip_indexes()` — call it once after all batches are done.
913    pub fn finalize_indexes(&mut self) -> Result<()> {
914        self.ensure_writable()?;
915        self.rebuild_indexes(&[], &[])?;
916        self.rewrite_toc_footer()?;
917        self.header.toc_checksum = self.toc.toc_checksum;
918        crate::persist_header(&mut self.file, &self.header)?;
919        self.file.sync_all()?;
920        Ok(())
921    }
922
923    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
924        self.generation = self.generation.wrapping_add(1);
925
926        let delta = self.apply_records(records)?;
927        let mut indexes_rebuilt = false;
928
929        // Check if CLIP index has pending embeddings that need to be persisted
930        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());
931
932        if !delta.is_empty() || clip_needs_persist {
933            tracing::debug!(
934                inserted_frames = delta.inserted_frames.len(),
935                inserted_embeddings = delta.inserted_embeddings.len(),
936                inserted_time_entries = delta.inserted_time_entries.len(),
937                clip_needs_persist = clip_needs_persist,
938                "commit applied delta"
939            );
940            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
941            indexes_rebuilt = true;
942        }
943
944        if !indexes_rebuilt && self.tantivy_index_pending() {
945            self.flush_tantivy()?;
946        }
947
948        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
949        if !indexes_rebuilt && self.clip_enabled {
950            if let Some(ref clip_index) = self.clip_index {
951                if !clip_index.is_empty() {
952                    self.persist_clip_index()?;
953                }
954            }
955        }
956
957        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
958        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
959            self.persist_memories_track()?;
960        }
961
962        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
963        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
964            self.persist_logic_mesh()?;
965        }
966
967        // Persist sketch track if it has entries
968        if !self.sketch_track.is_empty() {
969            self.persist_sketch_track()?;
970        }
971
972        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
973        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.
974
975        self.rewrite_toc_footer()?;
976        self.header.toc_checksum = self.toc.toc_checksum;
977        self.wal.record_checkpoint(&mut self.header)?;
978        self.header.toc_checksum = self.toc.toc_checksum;
979        crate::persist_header(&mut self.file, &self.header)?;
980        self.file.sync_all()?;
981        #[cfg(feature = "parallel_segments")]
982        if let Some(wal) = self.manifest_wal.as_mut() {
983            wal.flush()?;
984            wal.truncate()?;
985        }
986        self.pending_frame_inserts = 0;
987        self.dirty = false;
988        Ok(())
989    }
990
991    #[cfg(feature = "parallel_segments")]
992    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
993        self.ensure_writable()?;
994        if !self.dirty && !self.tantivy_index_pending() {
995            return Ok(());
996        }
997        let opts = opts.clone();
998        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
999    }
1000
1001    #[cfg(feature = "parallel_segments")]
1002    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
1003        if !self.dirty && !self.tantivy_index_pending() {
1004            return Ok(());
1005        }
1006        let records = self.wal.pending_records()?;
1007        let delta = self.apply_records(records)?;
1008        self.generation = self.generation.wrapping_add(1);
1009        let mut indexes_rebuilt = false;
1010        if !delta.is_empty() {
1011            tracing::info!(
1012                inserted_frames = delta.inserted_frames.len(),
1013                inserted_embeddings = delta.inserted_embeddings.len(),
1014                inserted_time_entries = delta.inserted_time_entries.len(),
1015                "parallel commit applied delta"
1016            );
1017            // Try to use parallel segment builder first
1018            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
1019            tracing::info!(
1020                "parallel_commit: used_parallel={}, lex_enabled={}",
1021                used_parallel,
1022                self.lex_enabled
1023            );
1024            if used_parallel {
1025                // Segments were written at data_end; update footer_offset so
1026                // rewrite_toc_footer places the TOC after the new segment data
1027                self.header.footer_offset = self.data_end;
1028                indexes_rebuilt = true;
1029
1030                // OPTIMIZATION: Use incremental Tantivy indexing instead of full rebuild.
1031                // Only add new frames from the delta, not all frames.
1032                #[cfg(feature = "lex")]
1033                if self.lex_enabled {
1034                    tracing::info!(
1035                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
1036                        delta.inserted_frames.len(),
1037                        self.toc.frames.len()
1038                    );
1039
1040                    // If Tantivy engine already exists, frames were already indexed during put()
1041                    // (at the add_frame call in put_bytes_with_options). Skip re-indexing to avoid
1042                    // duplicates. Only initialize and index if Tantivy wasn't present during put.
1043                    let tantivy_was_present = self.tantivy.is_some();
1044                    if self.tantivy.is_none() {
1045                        self.init_tantivy()?;
1046                    }
1047
1048                    // Skip indexing if Tantivy was already present - frames were indexed during put
1049                    if tantivy_was_present {
1050                        tracing::info!(
1051                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
1052                        );
1053                    } else {
1054                        // First, collect all frames and their search text (to avoid borrow conflicts)
1055                        let max_payload = crate::memvid::search::max_index_payload();
1056                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
1057
1058                        for frame_id in &delta.inserted_frames {
1059                            // Look up the actual Frame from the TOC
1060                            let frame = match self.toc.frames.get(*frame_id as usize) {
1061                                Some(f) => f.clone(),
1062                                None => continue,
1063                            };
1064
1065                            // Check if frame has explicit search_text first - clone it for ownership
1066                            let explicit_text = frame.search_text.clone();
1067                            if let Some(ref search_text) = explicit_text {
1068                                if !search_text.trim().is_empty() {
1069                                    prepared_docs.push((frame, search_text.clone()));
1070                                    continue;
1071                                }
1072                            }
1073
1074                            // Get MIME type and check if text-indexable
1075                            let mime = frame
1076                                .metadata
1077                                .as_ref()
1078                                .and_then(|m| m.mime.as_deref())
1079                                .unwrap_or("application/octet-stream");
1080
1081                            if !crate::memvid::search::is_text_indexable_mime(mime) {
1082                                continue;
1083                            }
1084
1085                            if frame.payload_length > max_payload {
1086                                continue;
1087                            }
1088
1089                            let text = self.frame_search_text(&frame)?;
1090                            if !text.trim().is_empty() {
1091                                prepared_docs.push((frame, text));
1092                            }
1093                        }
1094
1095                        // Now add to Tantivy engine (no borrow conflict)
1096                        if let Some(ref mut engine) = self.tantivy {
1097                            for (frame, text) in &prepared_docs {
1098                                engine.add_frame(frame, text)?;
1099                            }
1100
1101                            if !prepared_docs.is_empty() {
1102                                engine.commit()?;
1103                                self.tantivy_dirty = true;
1104                            }
1105
1106                            tracing::info!(
1107                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
1108                                prepared_docs.len(),
1109                                engine.num_docs()
1110                            );
1111                        } else {
1112                            tracing::warn!(
1113                                "parallel_commit: Tantivy engine is None after init_tantivy"
1114                            );
1115                        }
1116                    } // end of else !tantivy_was_present
1117                }
1118
1119                // Time index stores all entries together for timeline queries.
1120                // Unlike Tantivy which is incremental, time index needs full rebuild.
1121                self.file.seek(SeekFrom::Start(self.data_end))?;
1122                let mut time_entries: Vec<TimeIndexEntry> = self
1123                    .toc
1124                    .frames
1125                    .iter()
1126                    .filter(|frame| {
1127                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
1128                    })
1129                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
1130                    .collect();
1131                let (ti_offset, ti_length, ti_checksum) =
1132                    time_index_append(&mut self.file, &mut time_entries)?;
1133                self.toc.time_index = Some(TimeIndexManifest {
1134                    bytes_offset: ti_offset,
1135                    bytes_length: ti_length,
1136                    entry_count: time_entries.len() as u64,
1137                    checksum: ti_checksum,
1138                });
1139                // Update data_end to account for the newly written time index
1140                self.data_end = ti_offset + ti_length;
1141                self.header.footer_offset = self.data_end;
1142                tracing::info!(
1143                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
1144                    ti_offset,
1145                    ti_length,
1146                    time_entries.len()
1147                );
1148            } else {
1149                // Fall back to sequential rebuild if no segments were generated
1150                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1151                indexes_rebuilt = true;
1152            }
1153        }
1154
1155        // Flush Tantivy index if dirty (from parallel path or pending updates)
1156        #[cfg(feature = "lex")]
1157        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
1158            self.flush_tantivy()?;
1159        }
1160
1161        // Persist CLIP index if it has embeddings
1162        if self.clip_enabled {
1163            if let Some(ref clip_index) = self.clip_index {
1164                if !clip_index.is_empty() {
1165                    self.persist_clip_index()?;
1166                }
1167            }
1168        }
1169
1170        // Persist memories track if it has cards
1171        if self.memories_track.card_count() > 0 {
1172            self.persist_memories_track()?;
1173        }
1174
1175        // Persist logic mesh if it has nodes
1176        if !self.logic_mesh.is_empty() {
1177            self.persist_logic_mesh()?;
1178        }
1179
1180        // Persist sketch track if it has entries
1181        if !self.sketch_track.is_empty() {
1182            self.persist_sketch_track()?;
1183        }
1184
1185        // flush_tantivy() has already set footer_offset correctly
1186        // DO NOT overwrite with catalog_data_end()
1187        self.rewrite_toc_footer()?;
1188        self.header.toc_checksum = self.toc.toc_checksum;
1189        self.wal.record_checkpoint(&mut self.header)?;
1190        self.header.toc_checksum = self.toc.toc_checksum;
1191        crate::persist_header(&mut self.file, &self.header)?;
1192        self.file.sync_all()?;
1193        if let Some(wal) = self.manifest_wal.as_mut() {
1194            wal.flush()?;
1195            wal.truncate()?;
1196        }
1197        self.pending_frame_inserts = 0;
1198        self.dirty = false;
1199        Ok(())
1200    }
1201
1202    pub(crate) fn recover_wal(&mut self) -> Result<()> {
1203        let records = self.wal.records_after(self.header.wal_sequence)?;
1204        if records.is_empty() {
1205            if self.tantivy_index_pending() {
1206                self.flush_tantivy()?;
1207            }
1208            return Ok(());
1209        }
1210        let delta = self.apply_records(records)?;
1211        if !delta.is_empty() {
1212            tracing::debug!(
1213                inserted_frames = delta.inserted_frames.len(),
1214                inserted_embeddings = delta.inserted_embeddings.len(),
1215                inserted_time_entries = delta.inserted_time_entries.len(),
1216                "recover applied delta"
1217            );
1218            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1219        } else if self.tantivy_index_pending() {
1220            self.flush_tantivy()?;
1221        }
1222        self.wal.record_checkpoint(&mut self.header)?;
1223        crate::persist_header(&mut self.file, &self.header)?;
1224        if !delta.is_empty() {
1225            // rebuild_indexes already flushed Tantivy, so nothing further to do.
1226        } else if self.tantivy_index_pending() {
1227            self.flush_tantivy()?;
1228            crate::persist_header(&mut self.file, &self.header)?;
1229        }
1230        self.file.sync_all()?;
1231        self.pending_frame_inserts = 0;
1232        self.dirty = false;
1233        Ok(())
1234    }
1235
1236    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1237        let mut delta = IngestionDelta::default();
1238        if records.is_empty() {
1239            return Ok(delta);
1240        }
1241
1242        // Use data_end instead of payload_region_end to avoid overwriting
1243        // vec/lex/time segments that were written after the payload region.
1244        // payload_region_end() only considers frame payloads, but data_end tracks
1245        // all data including index segments.
1246        let mut data_cursor = self.data_end;
1247        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1248
1249        if !records.is_empty() {
1250            self.file.seek(SeekFrom::Start(data_cursor))?;
1251            for record in records {
1252                let mut entry = match decode_wal_entry(&record.payload)? {
1253                    WalEntry::Frame(entry) => entry,
1254                    #[cfg(feature = "lex")]
1255                    WalEntry::Lex(batch) => {
1256                        self.apply_lex_wal(batch)?;
1257                        continue;
1258                    }
1259                };
1260
1261                match entry.op {
1262                    FrameWalOp::Insert => {
1263                        let frame_id = self.toc.frames.len() as u64;
1264
1265                        let (
1266                            payload_offset,
1267                            payload_length,
1268                            checksum_bytes,
1269                            canonical_length_value,
1270                        ) = if let Some(source_id) = entry.reuse_payload_from {
1271                            if !entry.payload.is_empty() {
1272                                return Err(MemvidError::InvalidFrame {
1273                                    frame_id: source_id,
1274                                    reason: "reused payload entry contained inline bytes",
1275                                });
1276                            }
1277                            let source_idx = usize::try_from(source_id).map_err(|_| {
1278                                MemvidError::InvalidFrame {
1279                                    frame_id: source_id,
1280                                    reason: "frame id too large for memory",
1281                                }
1282                            })?;
1283                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
1284                                MemvidError::InvalidFrame {
1285                                    frame_id: source_id,
1286                                    reason: "reused payload source missing",
1287                                },
1288                            )?;
1289                            (
1290                                source.payload_offset,
1291                                source.payload_length,
1292                                source.checksum,
1293                                entry
1294                                    .canonical_length
1295                                    .or(source.canonical_length)
1296                                    .unwrap_or(source.payload_length),
1297                            )
1298                        } else {
1299                            self.file.seek(SeekFrom::Start(data_cursor))?;
1300                            self.file.write_all(&entry.payload)?;
1301                            let checksum = hash(&entry.payload);
1302                            let payload_length = entry.payload.len() as u64;
1303                            let canonical_length =
1304                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
1305                                    if let Some(len) = entry.canonical_length {
1306                                        len
1307                                    } else {
1308                                        let decoded = crate::decode_canonical_bytes(
1309                                            &entry.payload,
1310                                            CanonicalEncoding::Zstd,
1311                                            frame_id,
1312                                        )?;
1313                                        decoded.len() as u64
1314                                    }
1315                                } else {
1316                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1317                                };
1318                            let payload_offset = data_cursor;
1319                            data_cursor += payload_length;
1320                            // Keep cached_payload_end in sync (monotonically increasing)
1321                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
1322                            (
1323                                payload_offset,
1324                                payload_length,
1325                                *checksum.as_bytes(),
1326                                canonical_length,
1327                            )
1328                        };
1329
1330                        let uri = entry
1331                            .uri
1332                            .clone()
1333                            .unwrap_or_else(|| crate::default_uri(frame_id));
1334                        let title = entry
1335                            .title
1336                            .clone()
1337                            .or_else(|| crate::infer_title_from_uri(&uri));
1338
1339                        #[cfg(feature = "temporal_track")]
1340                        let (anchor_ts, anchor_source) =
1341                            self.determine_temporal_anchor(entry.timestamp);
1342
1343                        let mut frame = Frame {
1344                            id: frame_id,
1345                            timestamp: entry.timestamp,
1346                            anchor_ts: {
1347                                #[cfg(feature = "temporal_track")]
1348                                {
1349                                    Some(anchor_ts)
1350                                }
1351                                #[cfg(not(feature = "temporal_track"))]
1352                                {
1353                                    None
1354                                }
1355                            },
1356                            anchor_source: {
1357                                #[cfg(feature = "temporal_track")]
1358                                {
1359                                    Some(anchor_source)
1360                                }
1361                                #[cfg(not(feature = "temporal_track"))]
1362                                {
1363                                    None
1364                                }
1365                            },
1366                            kind: entry.kind.clone(),
1367                            track: entry.track.clone(),
1368                            payload_offset,
1369                            payload_length,
1370                            checksum: checksum_bytes,
1371                            uri: Some(uri),
1372                            title,
1373                            canonical_encoding: entry.canonical_encoding,
1374                            canonical_length: Some(canonical_length_value),
1375                            metadata: entry.metadata.clone(),
1376                            search_text: entry.search_text.clone(),
1377                            tags: entry.tags.clone(),
1378                            labels: entry.labels.clone(),
1379                            extra_metadata: entry.extra_metadata.clone(),
1380                            content_dates: entry.content_dates.clone(),
1381                            chunk_manifest: entry.chunk_manifest.clone(),
1382                            role: entry.role,
1383                            parent_id: None,
1384                            chunk_index: entry.chunk_index,
1385                            chunk_count: entry.chunk_count,
1386                            status: FrameStatus::Active,
1387                            supersedes: entry.supersedes_frame_id,
1388                            superseded_by: None,
1389                            source_sha256: entry.source_sha256,
1390                            source_path: entry.source_path.clone(),
1391                            enrichment_state: entry.enrichment_state,
1392                        };
1393
1394                        if let Some(parent_seq) = entry.parent_sequence {
1395                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1396                                frame.parent_id = Some(*parent_frame_id);
1397                            } else {
1398                                // Parent sequence not found in current batch - this can happen
1399                                // if parent was committed in a previous batch. Try to find parent
1400                                // by looking at recently inserted frames with matching characteristics.
1401                                // The parent should be the most recent Document frame that has
1402                                // a chunk_manifest matching this chunk's expected parent.
1403                                if entry.role == FrameRole::DocumentChunk {
1404                                    // Look backwards through recently inserted frames
1405                                    for &candidate_id in delta.inserted_frames.iter().rev() {
1406                                        if let Ok(idx) = usize::try_from(candidate_id) {
1407                                            if let Some(candidate) = self.toc.frames.get(idx) {
1408                                                if candidate.role == FrameRole::Document
1409                                                    && candidate.chunk_manifest.is_some()
1410                                                {
1411                                                    // Found a parent document - use it
1412                                                    frame.parent_id = Some(candidate_id);
1413                                                    tracing::debug!(
1414                                                        chunk_frame_id = frame_id,
1415                                                        parent_frame_id = candidate_id,
1416                                                        parent_seq = parent_seq,
1417                                                        "resolved chunk parent via fallback"
1418                                                    );
1419                                                    break;
1420                                                }
1421                                            }
1422                                        }
1423                                    }
1424                                }
1425                                if frame.parent_id.is_none() {
1426                                    tracing::warn!(
1427                                        chunk_frame_id = frame_id,
1428                                        parent_seq = parent_seq,
1429                                        "chunk has parent_sequence but parent not found in batch"
1430                                    );
1431                                }
1432                            }
1433                        }
1434
1435                        #[cfg(feature = "lex")]
1436                        let index_text = if self.tantivy.is_some() {
1437                            if let Some(text) = entry.search_text.clone() {
1438                                if text.trim().is_empty() {
1439                                    None
1440                                } else {
1441                                    Some(text)
1442                                }
1443                            } else {
1444                                Some(self.frame_content(&frame)?)
1445                            }
1446                        } else {
1447                            None
1448                        };
1449                        #[cfg(feature = "lex")]
1450                        if let (Some(engine), Some(text)) =
1451                            (self.tantivy.as_mut(), index_text.as_ref())
1452                        {
1453                            engine.add_frame(&frame, text)?;
1454                            self.tantivy_dirty = true;
1455
1456                            // Generate sketch for fast candidate pre-filtering
1457                            // Uses the same text as tantivy indexing for consistency
1458                            if !text.trim().is_empty() {
1459                                let entry = crate::types::generate_sketch(
1460                                    frame_id,
1461                                    text,
1462                                    crate::types::SketchVariant::Small,
1463                                    None,
1464                                );
1465                                self.sketch_track.insert(entry);
1466                            }
1467                        }
1468
1469                        if let Some(embedding) = entry.embedding.take() {
1470                            delta
1471                                .inserted_embeddings
1472                                .push((frame_id, embedding.clone()));
1473                        }
1474
1475                        if entry.role == FrameRole::Document {
1476                            delta
1477                                .inserted_time_entries
1478                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1479                            #[cfg(feature = "temporal_track")]
1480                            {
1481                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1482                                    frame_id,
1483                                    anchor_ts,
1484                                    anchor_source,
1485                                ));
1486                                delta.inserted_temporal_mentions.extend(
1487                                    Self::collect_temporal_mentions(
1488                                        entry.search_text.as_deref(),
1489                                        frame_id,
1490                                        anchor_ts,
1491                                    ),
1492                                );
1493                            }
1494                        }
1495
1496                        if let Some(predecessor) = frame.supersedes {
1497                            self.mark_frame_superseded(predecessor, frame_id)?;
1498                        }
1499
1500                        self.toc.frames.push(frame);
1501                        delta.inserted_frames.push(frame_id);
1502                        sequence_to_frame.insert(record.sequence, frame_id);
1503                    }
1504                    FrameWalOp::Tombstone => {
1505                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1506                            frame_id: 0,
1507                            reason: "tombstone missing frame reference",
1508                        })?;
1509                        self.mark_frame_deleted(target)?;
1510                        delta.mutated_frames = true;
1511                    }
1512                }
1513            }
1514            self.data_end = self.data_end.max(data_cursor);
1515        }
1516
1517        // Second pass: resolve any orphan DocumentChunk frames that are missing parent_id.
1518        // This handles edge cases where chunks couldn't be linked during the first pass.
1519        // First, collect orphan chunks and their resolved parents to avoid borrow conflicts.
1520        let orphan_resolutions: Vec<(u64, u64)> = delta
1521            .inserted_frames
1522            .iter()
1523            .filter_map(|&frame_id| {
1524                let idx = usize::try_from(frame_id).ok()?;
1525                let frame = self.toc.frames.get(idx)?;
1526                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1527                    return None;
1528                }
1529                // Find the most recent Document frame before this chunk that has a manifest
1530                for candidate_id in (0..frame_id).rev() {
1531                    if let Ok(idx) = usize::try_from(candidate_id) {
1532                        if let Some(candidate) = self.toc.frames.get(idx) {
1533                            if candidate.role == FrameRole::Document
1534                                && candidate.chunk_manifest.is_some()
1535                                && candidate.status == FrameStatus::Active
1536                            {
1537                                return Some((frame_id, candidate_id));
1538                            }
1539                        }
1540                    }
1541                }
1542                None
1543            })
1544            .collect();
1545
1546        // Now apply the resolutions
1547        for (chunk_id, parent_id) in orphan_resolutions {
1548            if let Ok(idx) = usize::try_from(chunk_id) {
1549                if let Some(frame) = self.toc.frames.get_mut(idx) {
1550                    frame.parent_id = Some(parent_id);
1551                    tracing::debug!(
1552                        chunk_frame_id = chunk_id,
1553                        parent_frame_id = parent_id,
1554                        "resolved orphan chunk parent in second pass"
1555                    );
1556                }
1557            }
1558        }
1559
1560        // Index rebuild now happens once per commit (Option A) instead of incremental append.
1561        // See commit_from_records() for where rebuild_indexes() is invoked.
1562        Ok(delta)
1563    }
1564
1565    #[cfg(feature = "temporal_track")]
1566    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1567        (timestamp, AnchorSource::FrameTimestamp)
1568    }
1569
1570    #[cfg(feature = "temporal_track")]
1571    fn collect_temporal_mentions(
1572        text: Option<&str>,
1573        frame_id: FrameId,
1574        anchor_ts: i64,
1575    ) -> Vec<TemporalMention> {
1576        let text = match text {
1577            Some(value) if !value.trim().is_empty() => value,
1578            _ => return Vec::new(),
1579        };
1580
1581        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
1582            Ok(ts) => ts,
1583            Err(_) => return Vec::new(),
1584        };
1585
1586        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
1587        let normalizer = TemporalNormalizer::new(context);
1588        let mut spans: Vec<(usize, usize)> = Vec::new();
1589        let lower = text.to_ascii_lowercase();
1590
1591        for phrase in STATIC_TEMPORAL_PHRASES {
1592            let mut search_start = 0usize;
1593            while let Some(idx) = lower[search_start..].find(phrase) {
1594                let abs = search_start + idx;
1595                let end = abs + phrase.len();
1596                spans.push((abs, end));
1597                search_start = end;
1598            }
1599        }
1600
1601        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
1602        let regex = NUMERIC_DATE.get_or_init(|| {
1603            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
1604        });
1605        let regex = match regex {
1606            Ok(re) => re,
1607            Err(msg) => {
1608                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
1609                return Vec::new();
1610            }
1611        };
1612        for mat in regex.find_iter(text) {
1613            spans.push((mat.start(), mat.end()));
1614        }
1615
1616        spans.sort_unstable();
1617        spans.dedup();
1618
1619        let mut mentions: Vec<TemporalMention> = Vec::new();
1620        for (start, end) in spans {
1621            if end > text.len() || start >= end {
1622                continue;
1623            }
1624            let raw = &text[start..end];
1625            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
1626            if trimmed.is_empty() {
1627                continue;
1628            }
1629            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
1630            let finish = offset + trimmed.len();
1631            match normalizer.resolve(trimmed) {
1632                Ok(resolution) => {
1633                    mentions.extend(Self::resolution_to_mentions(
1634                        resolution, frame_id, offset, finish,
1635                    ));
1636                }
1637                Err(_) => continue,
1638            }
1639        }
1640
1641        mentions
1642    }
1643
1644    #[cfg(feature = "temporal_track")]
1645    fn resolution_to_mentions(
1646        resolution: TemporalResolution,
1647        frame_id: FrameId,
1648        byte_start: usize,
1649        byte_end: usize,
1650    ) -> Vec<TemporalMention> {
1651        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1652        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1653        let mut results = Vec::new();
1654
1655        let base_flags = Self::flags_from_resolution(&resolution.flags);
1656        match resolution.value {
1657            TemporalResolutionValue::Date(date) => {
1658                let ts = Self::date_to_timestamp(date);
1659                results.push(TemporalMention::new(
1660                    ts,
1661                    frame_id,
1662                    byte_start,
1663                    byte_len,
1664                    TemporalMentionKind::Date,
1665                    resolution.confidence,
1666                    0,
1667                    base_flags,
1668                ));
1669            }
1670            TemporalResolutionValue::DateTime(dt) => {
1671                let ts = dt.unix_timestamp();
1672                let tz_hint = dt.offset().whole_minutes() as i16;
1673                results.push(TemporalMention::new(
1674                    ts,
1675                    frame_id,
1676                    byte_start,
1677                    byte_len,
1678                    TemporalMentionKind::DateTime,
1679                    resolution.confidence,
1680                    tz_hint,
1681                    base_flags,
1682                ));
1683            }
1684            TemporalResolutionValue::DateRange { start, end } => {
1685                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1686                let start_ts = Self::date_to_timestamp(start);
1687                results.push(TemporalMention::new(
1688                    start_ts,
1689                    frame_id,
1690                    byte_start,
1691                    byte_len,
1692                    TemporalMentionKind::RangeStart,
1693                    resolution.confidence,
1694                    0,
1695                    flags,
1696                ));
1697                let end_ts = Self::date_to_timestamp(end);
1698                results.push(TemporalMention::new(
1699                    end_ts,
1700                    frame_id,
1701                    byte_start,
1702                    byte_len,
1703                    TemporalMentionKind::RangeEnd,
1704                    resolution.confidence,
1705                    0,
1706                    flags,
1707                ));
1708            }
1709            TemporalResolutionValue::DateTimeRange { start, end } => {
1710                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1711                results.push(TemporalMention::new(
1712                    start.unix_timestamp(),
1713                    frame_id,
1714                    byte_start,
1715                    byte_len,
1716                    TemporalMentionKind::RangeStart,
1717                    resolution.confidence,
1718                    start.offset().whole_minutes() as i16,
1719                    flags,
1720                ));
1721                results.push(TemporalMention::new(
1722                    end.unix_timestamp(),
1723                    frame_id,
1724                    byte_start,
1725                    byte_len,
1726                    TemporalMentionKind::RangeEnd,
1727                    resolution.confidence,
1728                    end.offset().whole_minutes() as i16,
1729                    flags,
1730                ));
1731            }
1732            TemporalResolutionValue::Month { year, month } => {
1733                let start_date = match Date::from_calendar_date(year, month, 1) {
1734                    Ok(date) => date,
1735                    Err(err) => {
1736                        tracing::warn!(
1737                            target = "memvid::temporal",
1738                            %err,
1739                            year,
1740                            month = month as u8,
1741                            "skipping invalid month resolution"
1742                        );
1743                        // Skip invalid range for this mention only.
1744                        return results;
1745                    }
1746                };
1747                let end_date = match Self::last_day_in_month(year, month) {
1748                    Some(date) => date,
1749                    None => {
1750                        tracing::warn!(
1751                            target = "memvid::temporal",
1752                            year,
1753                            month = month as u8,
1754                            "skipping month resolution with invalid calendar range"
1755                        );
1756                        return results;
1757                    }
1758                };
1759                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1760                results.push(TemporalMention::new(
1761                    Self::date_to_timestamp(start_date),
1762                    frame_id,
1763                    byte_start,
1764                    byte_len,
1765                    TemporalMentionKind::RangeStart,
1766                    resolution.confidence,
1767                    0,
1768                    flags,
1769                ));
1770                results.push(TemporalMention::new(
1771                    Self::date_to_timestamp(end_date),
1772                    frame_id,
1773                    byte_start,
1774                    byte_len,
1775                    TemporalMentionKind::RangeEnd,
1776                    resolution.confidence,
1777                    0,
1778                    flags,
1779                ));
1780            }
1781        }
1782
1783        results
1784    }
1785
1786    #[cfg(feature = "temporal_track")]
1787    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1788        let mut result = TemporalMentionFlags::empty();
1789        if flags
1790            .iter()
1791            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1792        {
1793            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1794        }
1795        if flags
1796            .iter()
1797            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1798        {
1799            result = result.set(TemporalMentionFlags::DERIVED, true);
1800        }
1801        result
1802    }
1803
1804    #[cfg(feature = "temporal_track")]
1805    fn date_to_timestamp(date: Date) -> i64 {
1806        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1807            .assume_offset(UtcOffset::UTC)
1808            .unix_timestamp()
1809    }
1810
1811    #[cfg(feature = "temporal_track")]
1812    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1813        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1814        while let Some(next) = date.next_day() {
1815            if next.month() == month {
1816                date = next;
1817            } else {
1818                break;
1819            }
1820        }
1821        Some(date)
1822    }
1823
1824    #[allow(dead_code)]
1825    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1826        if delta.inserted_frames.is_empty() || !self.lex_enabled {
1827            return Ok(false);
1828        }
1829
1830        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1831            Some(artifact) => artifact,
1832            None => return Ok(false),
1833        };
1834
1835        let segment_id = self.toc.segment_catalog.next_segment_id;
1836        #[cfg(feature = "parallel_segments")]
1837        let span =
1838            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1839
1840        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1841        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1842        #[cfg(feature = "parallel_segments")]
1843        if let Some(span) = span {
1844            Self::decorate_segment_common(&mut descriptor.common, span);
1845        }
1846        #[cfg(feature = "parallel_segments")]
1847        let descriptor_for_manifest = descriptor.clone();
1848        self.toc.segment_catalog.lex_segments.push(descriptor);
1849        #[cfg(feature = "parallel_segments")]
1850        if let Err(err) = self.record_index_segment(
1851            SegmentKind::Lexical,
1852            descriptor_for_manifest.common,
1853            SegmentStats {
1854                doc_count: artifact.doc_count,
1855                vector_count: 0,
1856                time_entries: 0,
1857                bytes_uncompressed: artifact.bytes.len() as u64,
1858                build_micros: 0,
1859            },
1860        ) {
1861            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1862        }
1863        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1864        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1865        Ok(true)
1866    }
1867
1868    #[allow(dead_code)]
1869    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1870        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1871            return Ok(false);
1872        }
1873
1874        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1875            Some(artifact) => artifact,
1876            None => return Ok(false),
1877        };
1878
1879        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1880            if existing_dim != artifact.dimension {
1881                return Err(MemvidError::VecDimensionMismatch {
1882                    expected: existing_dim,
1883                    actual: artifact.dimension as usize,
1884                });
1885            }
1886        }
1887
1888        let segment_id = self.toc.segment_catalog.next_segment_id;
1889        #[cfg(feature = "parallel_segments")]
1890        #[cfg(feature = "parallel_segments")]
1891        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1892
1893        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1894        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1895        #[cfg(feature = "parallel_segments")]
1896        if let Some(span) = span {
1897            Self::decorate_segment_common(&mut descriptor.common, span);
1898        }
1899        #[cfg(feature = "parallel_segments")]
1900        let descriptor_for_manifest = descriptor.clone();
1901        self.toc.segment_catalog.vec_segments.push(descriptor);
1902        #[cfg(feature = "parallel_segments")]
1903        if let Err(err) = self.record_index_segment(
1904            SegmentKind::Vector,
1905            descriptor_for_manifest.common,
1906            SegmentStats {
1907                doc_count: 0,
1908                vector_count: artifact.vector_count,
1909                time_entries: 0,
1910                bytes_uncompressed: artifact.bytes_uncompressed,
1911                build_micros: 0,
1912            },
1913        ) {
1914            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1915        }
1916        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1917        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1918
1919        // Keep the global vec manifest in sync for auto-detection and stats.
1920        if self.toc.indexes.vec.is_none() {
1921            let empty_offset = self.data_end;
1922            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1923                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1924            self.toc.indexes.vec = Some(VecIndexManifest {
1925                vector_count: 0,
1926                dimension: 0,
1927                bytes_offset: empty_offset,
1928                bytes_length: 0,
1929                checksum: empty_checksum,
1930                compression_mode: self.vec_compression.clone(),
1931                model: self.vec_model.clone(),
1932            });
1933        }
1934        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1935            if manifest.dimension == 0 {
1936                manifest.dimension = artifact.dimension;
1937            }
1938            if manifest.bytes_length == 0 {
1939                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1940                manifest.compression_mode = artifact.compression.clone();
1941            }
1942        }
1943
1944        self.vec_enabled = true;
1945        Ok(true)
1946    }
1947
1948    #[allow(dead_code)]
1949    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1950        if delta.inserted_time_entries.is_empty() {
1951            return Ok(false);
1952        }
1953
1954        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1955            Some(artifact) => artifact,
1956            None => return Ok(false),
1957        };
1958
1959        let segment_id = self.toc.segment_catalog.next_segment_id;
1960        #[cfg(feature = "parallel_segments")]
1961        #[cfg(feature = "parallel_segments")]
1962        let span = self.segment_span_from_iter(
1963            delta
1964                .inserted_time_entries
1965                .iter()
1966                .map(|entry| entry.frame_id),
1967        );
1968
1969        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1970        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1971        #[cfg(feature = "parallel_segments")]
1972        if let Some(span) = span {
1973            Self::decorate_segment_common(&mut descriptor.common, span);
1974        }
1975        #[cfg(feature = "parallel_segments")]
1976        let descriptor_for_manifest = descriptor.clone();
1977        self.toc.segment_catalog.time_segments.push(descriptor);
1978        #[cfg(feature = "parallel_segments")]
1979        if let Err(err) = self.record_index_segment(
1980            SegmentKind::Time,
1981            descriptor_for_manifest.common,
1982            SegmentStats {
1983                doc_count: 0,
1984                vector_count: 0,
1985                time_entries: artifact.entry_count,
1986                bytes_uncompressed: artifact.bytes.len() as u64,
1987                build_micros: 0,
1988            },
1989        ) {
1990            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1991        }
1992        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1993        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1994        Ok(true)
1995    }
1996
1997    #[cfg(feature = "temporal_track")]
1998    #[allow(dead_code)]
1999    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
2000        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
2001        {
2002            return Ok(false);
2003        }
2004
2005        debug_assert!(
2006            delta.inserted_temporal_mentions.len() < 1_000_000,
2007            "temporal delta mentions unexpectedly large: {}",
2008            delta.inserted_temporal_mentions.len()
2009        );
2010        debug_assert!(
2011            delta.inserted_temporal_anchors.len() < 1_000_000,
2012            "temporal delta anchors unexpectedly large: {}",
2013            delta.inserted_temporal_anchors.len()
2014        );
2015
2016        let artifact = match self.build_temporal_segment_from_records(
2017            &delta.inserted_temporal_mentions,
2018            &delta.inserted_temporal_anchors,
2019        )? {
2020            Some(artifact) => artifact,
2021            None => return Ok(false),
2022        };
2023
2024        let segment_id = self.toc.segment_catalog.next_segment_id;
2025        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
2026        self.toc
2027            .segment_catalog
2028            .temporal_segments
2029            .push(descriptor.clone());
2030        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2031        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2032
2033        self.toc.temporal_track = Some(TemporalTrackManifest {
2034            bytes_offset: descriptor.common.bytes_offset,
2035            bytes_length: descriptor.common.bytes_length,
2036            entry_count: artifact.entry_count,
2037            anchor_count: artifact.anchor_count,
2038            checksum: artifact.checksum,
2039            flags: artifact.flags,
2040        });
2041
2042        self.clear_temporal_track_cache();
2043
2044        Ok(true)
2045    }
2046
2047    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2048        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2049            frame_id,
2050            reason: "frame id too large",
2051        })?;
2052        let frame = self
2053            .toc
2054            .frames
2055            .get_mut(index)
2056            .ok_or(MemvidError::InvalidFrame {
2057                frame_id,
2058                reason: "supersede target missing",
2059            })?;
2060        frame.status = FrameStatus::Superseded;
2061        frame.superseded_by = Some(successor_id);
2062        self.remove_frame_from_indexes(frame_id)
2063    }
2064
2065    pub(crate) fn rebuild_indexes(
2066        &mut self,
2067        new_vec_docs: &[(FrameId, Vec<f32>)],
2068        inserted_frame_ids: &[FrameId],
2069    ) -> Result<()> {
2070        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2071            return Ok(());
2072        }
2073
2074        let payload_end = self.payload_region_end();
2075        self.data_end = payload_end;
2076        // Don't truncate if footer_offset is higher - there may be replay segments
2077        // or other data written after payload_end that must be preserved.
2078        let safe_truncate_len = self.header.footer_offset.max(payload_end);
2079        if self.file.metadata()?.len() > safe_truncate_len {
2080            self.file.set_len(safe_truncate_len)?;
2081        }
2082        self.file.seek(SeekFrom::Start(payload_end))?;
2083
2084        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
2085        self.toc.segment_catalog.lex_segments.clear();
2086        self.toc.segment_catalog.vec_segments.clear();
2087        self.toc.segment_catalog.time_segments.clear();
2088        #[cfg(feature = "temporal_track")]
2089        self.toc.segment_catalog.temporal_segments.clear();
2090        #[cfg(feature = "parallel_segments")]
2091        self.toc.segment_catalog.index_segments.clear();
2092        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
2093        self.toc.segment_catalog.tantivy_segments.clear();
2094        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
2095        self.toc.indexes.lex_segments.clear();
2096
2097        let mut time_entries: Vec<TimeIndexEntry> = self
2098            .toc
2099            .frames
2100            .iter()
2101            .filter(|frame| {
2102                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2103            })
2104            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2105            .collect();
2106        let (ti_offset, ti_length, ti_checksum) =
2107            time_index_append(&mut self.file, &mut time_entries)?;
2108        self.toc.time_index = Some(TimeIndexManifest {
2109            bytes_offset: ti_offset,
2110            bytes_length: ti_length,
2111            entry_count: time_entries.len() as u64,
2112            checksum: ti_checksum,
2113        });
2114
2115        let mut footer_offset = ti_offset + ti_length;
2116
2117        #[cfg(feature = "temporal_track")]
2118        {
2119            self.toc.temporal_track = None;
2120            self.toc.segment_catalog.temporal_segments.clear();
2121            self.clear_temporal_track_cache();
2122        }
2123
2124        if self.lex_enabled {
2125            #[cfg(feature = "lex")]
2126            {
2127                if self.tantivy_dirty {
2128                    // instant_index was used: frames were added to Tantivy with WAL
2129                    // sequence numbers as IDs, which don't match the actual frame IDs
2130                    // assigned during apply_records(). Must do a full rebuild to fix IDs.
2131                    if let Ok(mut storage) = self.lex_storage.write() {
2132                        storage.clear();
2133                        storage.set_generation(0);
2134                    }
2135                    self.init_tantivy()?;
2136                    if let Some(mut engine) = self.tantivy.take() {
2137                        self.rebuild_tantivy_engine(&mut engine)?;
2138                        self.tantivy = Some(engine);
2139                    } else {
2140                        return Err(MemvidError::InvalidToc {
2141                            reason: "tantivy engine missing during rebuild".into(),
2142                        });
2143                    }
2144                } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2145                    // Incremental path: engine exists (from open() or previous commit),
2146                    // instant_index was NOT used so no wrong IDs. Just add new frames.
2147                    // This is O(batch_size) — the key optimization for bulk ingestion.
2148                    //
2149                    // Collect frames + text first to avoid borrow conflicts with engine.
2150                    let max_payload = crate::memvid::search::max_index_payload();
2151                    let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2152                    for &frame_id in inserted_frame_ids {
2153                        let Ok(idx) = usize::try_from(frame_id) else {
2154                            continue;
2155                        };
2156                        let frame = match self.toc.frames.get(idx) {
2157                            Some(f) => f.clone(),
2158                            None => continue,
2159                        };
2160                        if frame.status != FrameStatus::Active {
2161                            continue;
2162                        }
2163                        if let Some(search_text) = frame.search_text.clone() {
2164                            if !search_text.trim().is_empty() {
2165                                prepared_docs.push((frame, search_text));
2166                                continue;
2167                            }
2168                        }
2169                        let mime = frame
2170                            .metadata
2171                            .as_ref()
2172                            .and_then(|m| m.mime.as_deref())
2173                            .unwrap_or("application/octet-stream");
2174                        if !crate::memvid::search::is_text_indexable_mime(mime) {
2175                            continue;
2176                        }
2177                        if frame.payload_length > max_payload {
2178                            continue;
2179                        }
2180                        let text = self.frame_search_text(&frame)?;
2181                        if !text.trim().is_empty() {
2182                            prepared_docs.push((frame, text));
2183                        }
2184                    }
2185                    if let Some(engine) = self.tantivy.as_mut() {
2186                        for (frame, text) in &prepared_docs {
2187                            engine.add_frame(frame, text)?;
2188                        }
2189                        engine.commit()?;
2190                    }
2191                } else {
2192                    // Full rebuild path: no engine or no new frames (e.g., doctor repair).
2193                    // Clear embedded storage to avoid carrying stale segments between rebuilds.
2194                    if let Ok(mut storage) = self.lex_storage.write() {
2195                        storage.clear();
2196                        storage.set_generation(0);
2197                    }
2198                    self.init_tantivy()?;
2199                    if let Some(mut engine) = self.tantivy.take() {
2200                        self.rebuild_tantivy_engine(&mut engine)?;
2201                        self.tantivy = Some(engine);
2202                    } else {
2203                        return Err(MemvidError::InvalidToc {
2204                            reason: "tantivy engine missing during rebuild".into(),
2205                        });
2206                    }
2207                }
2208
2209                // Set lex_enabled to ensure it persists
2210                self.lex_enabled = true;
2211
2212                // Mark Tantivy as dirty so it gets flushed
2213                self.tantivy_dirty = true;
2214
2215                // Position embedded Tantivy segments immediately after the time index.
2216                self.data_end = footer_offset;
2217
2218                // Flush Tantivy segments to file
2219                self.flush_tantivy()?;
2220
2221                // Update footer_offset after Tantivy flush
2222                footer_offset = self.header.footer_offset;
2223
2224                // Restore data_end to payload boundary so future payload writes stay before indexes.
2225                self.data_end = payload_end;
2226            }
2227            #[cfg(not(feature = "lex"))]
2228            {
2229                self.toc.indexes.lex = None;
2230                self.toc.indexes.lex_segments.clear();
2231            }
2232        } else {
2233            // Lex disabled: clear everything
2234            self.toc.indexes.lex = None;
2235            self.toc.indexes.lex_segments.clear();
2236            #[cfg(feature = "lex")]
2237            if let Ok(mut storage) = self.lex_storage.write() {
2238                storage.clear();
2239            }
2240        }
2241
2242        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2243            let vec_offset = footer_offset;
2244            self.file.seek(SeekFrom::Start(vec_offset))?;
2245            self.file.write_all(&artifact.bytes)?;
2246            footer_offset += artifact.bytes.len() as u64;
2247            self.toc.indexes.vec = Some(VecIndexManifest {
2248                vector_count: artifact.vector_count,
2249                dimension: artifact.dimension,
2250                bytes_offset: vec_offset,
2251                bytes_length: artifact.bytes.len() as u64,
2252                checksum: artifact.checksum,
2253                compression_mode: self.vec_compression.clone(),
2254                model: self.vec_model.clone(),
2255            });
2256            self.vec_index = Some(index);
2257        } else {
2258            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
2259            if !self.vec_enabled {
2260                self.toc.indexes.vec = None;
2261            }
2262            self.vec_index = None;
2263        }
2264
2265        // Persist CLIP index if it has embeddings
2266        if self.clip_enabled {
2267            if let Some(ref clip_index) = self.clip_index {
2268                if !clip_index.is_empty() {
2269                    let artifact = clip_index.encode()?;
2270                    let clip_offset = footer_offset;
2271                    self.file.seek(SeekFrom::Start(clip_offset))?;
2272                    self.file.write_all(&artifact.bytes)?;
2273                    footer_offset += artifact.bytes.len() as u64;
2274                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2275                        bytes_offset: clip_offset,
2276                        bytes_length: artifact.bytes.len() as u64,
2277                        vector_count: artifact.vector_count,
2278                        dimension: artifact.dimension,
2279                        checksum: artifact.checksum,
2280                        model_name: crate::clip::default_model_info().name.to_string(),
2281                    });
2282                    tracing::info!(
2283                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2284                        artifact.vector_count,
2285                        clip_offset
2286                    );
2287                }
2288            }
2289        } else {
2290            self.toc.indexes.clip = None;
2291        }
2292
2293        // Persist memories track if it has cards
2294        if self.memories_track.card_count() > 0 {
2295            let memories_offset = footer_offset;
2296            let memories_bytes = self.memories_track.serialize()?;
2297            let memories_checksum = blake3::hash(&memories_bytes).into();
2298            self.file.seek(SeekFrom::Start(memories_offset))?;
2299            self.file.write_all(&memories_bytes)?;
2300            footer_offset += memories_bytes.len() as u64;
2301
2302            let stats = self.memories_track.stats();
2303            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2304                bytes_offset: memories_offset,
2305                bytes_length: memories_bytes.len() as u64,
2306                card_count: stats.card_count as u64,
2307                entity_count: stats.entity_count as u64,
2308                checksum: memories_checksum,
2309            });
2310        } else {
2311            self.toc.memories_track = None;
2312        }
2313
2314        // Persist logic mesh if it has nodes
2315        if self.logic_mesh.is_empty() {
2316            self.toc.logic_mesh = None;
2317        } else {
2318            let mesh_offset = footer_offset;
2319            let mesh_bytes = self.logic_mesh.serialize()?;
2320            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2321            self.file.seek(SeekFrom::Start(mesh_offset))?;
2322            self.file.write_all(&mesh_bytes)?;
2323            footer_offset += mesh_bytes.len() as u64;
2324
2325            let stats = self.logic_mesh.stats();
2326            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2327                bytes_offset: mesh_offset,
2328                bytes_length: mesh_bytes.len() as u64,
2329                node_count: stats.node_count as u64,
2330                edge_count: stats.edge_count as u64,
2331                checksum: mesh_checksum,
2332            });
2333        }
2334
2335        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
2336        tracing::info!(
2337            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2338            ti_offset,
2339            ti_length,
2340            footer_offset,
2341            self.header.footer_offset
2342        );
2343
2344        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
2345        // This prevents overwriting data like replay segments that were written after index data
2346        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2347
2348        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
2349        if self.file.metadata()?.len() < self.header.footer_offset {
2350            self.file.set_len(self.header.footer_offset)?;
2351        }
2352
2353        self.rewrite_toc_footer()?;
2354        self.header.toc_checksum = self.toc.toc_checksum;
2355        crate::persist_header(&mut self.file, &self.header)?;
2356
2357        #[cfg(feature = "lex")]
2358        if self.lex_enabled {
2359            if let Some(ref engine) = self.tantivy {
2360                let doc_count = engine.num_docs();
2361                let active_frame_count = self
2362                    .toc
2363                    .frames
2364                    .iter()
2365                    .filter(|f| f.status == FrameStatus::Active)
2366                    .count();
2367
2368                // Count frames that would actually be indexed by rebuild_tantivy_engine
2369                // Uses the same logic: content-type based check + size limit
2370                let text_indexable_count = self
2371                    .toc
2372                    .frames
2373                    .iter()
2374                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2375                    .count();
2376
2377                // Only fail if we have text-indexable frames but none got indexed
2378                // This avoids false positives for binary files (videos, images)
2379                if doc_count == 0 && text_indexable_count > 0 {
2380                    return Err(MemvidError::Doctor {
2381                        reason: format!(
2382                            "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2383                            This indicates a critical failure in the rebuild process."
2384                        ),
2385                    });
2386                }
2387
2388                // Success! Log it
2389                log::info!(
2390                    "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2391                );
2392            }
2393        }
2394
2395        Ok(())
2396    }
2397
2398    /// Persist the memories track to the file without a full rebuild.
2399    ///
2400    /// This is used when the memories track has been modified but no frame
2401    /// changes were made (e.g., after running enrichment).
2402    fn persist_memories_track(&mut self) -> Result<()> {
2403        if self.memories_track.card_count() == 0 {
2404            self.toc.memories_track = None;
2405            return Ok(());
2406        }
2407
2408        // Write after the current footer_offset
2409        let memories_offset = self.header.footer_offset;
2410        let memories_bytes = self.memories_track.serialize()?;
2411        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2412
2413        self.file.seek(SeekFrom::Start(memories_offset))?;
2414        self.file.write_all(&memories_bytes)?;
2415
2416        let stats = self.memories_track.stats();
2417        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2418            bytes_offset: memories_offset,
2419            bytes_length: memories_bytes.len() as u64,
2420            card_count: stats.card_count as u64,
2421            entity_count: stats.entity_count as u64,
2422            checksum: memories_checksum,
2423        });
2424
2425        // Update footer_offset to account for the memories track
2426        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2427
2428        // Ensure the file length covers the memories track
2429        if self.file.metadata()?.len() < self.header.footer_offset {
2430            self.file.set_len(self.header.footer_offset)?;
2431        }
2432
2433        Ok(())
2434    }
2435
2436    /// Persist the CLIP index to the file without a full rebuild.
2437    ///
2438    /// This is used when CLIP embeddings have been added but no full
2439    /// index rebuild is needed (e.g., in parallel segments mode).
2440    fn persist_clip_index(&mut self) -> Result<()> {
2441        if !self.clip_enabled {
2442            self.toc.indexes.clip = None;
2443            return Ok(());
2444        }
2445
2446        let clip_index = match &self.clip_index {
2447            Some(idx) if !idx.is_empty() => idx,
2448            _ => {
2449                self.toc.indexes.clip = None;
2450                return Ok(());
2451            }
2452        };
2453
2454        // Encode the CLIP index
2455        let artifact = clip_index.encode()?;
2456
2457        // Write after the current footer_offset
2458        let clip_offset = self.header.footer_offset;
2459        self.file.seek(SeekFrom::Start(clip_offset))?;
2460        self.file.write_all(&artifact.bytes)?;
2461
2462        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2463            bytes_offset: clip_offset,
2464            bytes_length: artifact.bytes.len() as u64,
2465            vector_count: artifact.vector_count,
2466            dimension: artifact.dimension,
2467            checksum: artifact.checksum,
2468            model_name: crate::clip::default_model_info().name.to_string(),
2469        });
2470
2471        tracing::info!(
2472            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2473            artifact.vector_count,
2474            clip_offset
2475        );
2476
2477        // Update footer_offset to account for the CLIP index
2478        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2479
2480        // Ensure the file length covers the CLIP index
2481        if self.file.metadata()?.len() < self.header.footer_offset {
2482            self.file.set_len(self.header.footer_offset)?;
2483        }
2484
2485        Ok(())
2486    }
2487
2488    /// Persist the Logic-Mesh to the file without a full rebuild.
2489    ///
2490    /// This is used when the Logic-Mesh has been modified but no frame
2491    /// changes were made (e.g., after running NER enrichment).
2492    fn persist_logic_mesh(&mut self) -> Result<()> {
2493        if self.logic_mesh.is_empty() {
2494            self.toc.logic_mesh = None;
2495            return Ok(());
2496        }
2497
2498        // Write after the current footer_offset
2499        let mesh_offset = self.header.footer_offset;
2500        let mesh_bytes = self.logic_mesh.serialize()?;
2501        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2502
2503        self.file.seek(SeekFrom::Start(mesh_offset))?;
2504        self.file.write_all(&mesh_bytes)?;
2505
2506        let stats = self.logic_mesh.stats();
2507        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2508            bytes_offset: mesh_offset,
2509            bytes_length: mesh_bytes.len() as u64,
2510            node_count: stats.node_count as u64,
2511            edge_count: stats.edge_count as u64,
2512            checksum: mesh_checksum,
2513        });
2514
2515        // Update footer_offset to account for the logic mesh
2516        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2517
2518        // Ensure the file length covers the logic mesh
2519        if self.file.metadata()?.len() < self.header.footer_offset {
2520            self.file.set_len(self.header.footer_offset)?;
2521        }
2522
2523        Ok(())
2524    }
2525
2526    /// Persist the sketch track to the file without a full rebuild.
2527    ///
2528    /// This is used when the sketch track has been modified (e.g., after
2529    /// running `sketch build`).
2530    fn persist_sketch_track(&mut self) -> Result<()> {
2531        if self.sketch_track.is_empty() {
2532            self.toc.sketch_track = None;
2533            return Ok(());
2534        }
2535
2536        // Seek to write after the current footer_offset
2537        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2538
2539        // Write the sketch track and get (offset, length, checksum)
2540        let (sketch_offset, sketch_length, sketch_checksum) =
2541            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2542
2543        let stats = self.sketch_track.stats();
2544        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2545            bytes_offset: sketch_offset,
2546            bytes_length: sketch_length,
2547            entry_count: stats.entry_count,
2548            #[allow(clippy::cast_possible_truncation)]
2549            entry_size: stats.variant.entry_size() as u16,
2550            flags: 0,
2551            checksum: sketch_checksum,
2552        });
2553
2554        // Update footer_offset to account for the sketch track
2555        self.header.footer_offset = sketch_offset + sketch_length;
2556
2557        // Ensure the file length covers the sketch track
2558        if self.file.metadata()?.len() < self.header.footer_offset {
2559            self.file.set_len(self.header.footer_offset)?;
2560        }
2561
2562        tracing::debug!(
2563            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2564            stats.entry_count,
2565            sketch_offset
2566        );
2567
2568        Ok(())
2569    }
2570
2571    #[cfg(feature = "lex")]
2572    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2573        let LexWalBatch {
2574            generation,
2575            doc_count,
2576            checksum,
2577            segments,
2578        } = batch;
2579
2580        if let Ok(mut storage) = self.lex_storage.write() {
2581            storage.replace(doc_count, checksum, segments);
2582            storage.set_generation(generation);
2583        }
2584
2585        self.persist_lex_manifest()
2586    }
2587
2588    #[cfg(feature = "lex")]
2589    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2590        let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2591        self.append_wal_entry(&payload)?;
2592        Ok(())
2593    }
2594
2595    #[cfg(feature = "lex")]
2596    fn persist_lex_manifest(&mut self) -> Result<()> {
2597        let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2598            storage.to_manifest()
2599        } else {
2600            (None, Vec::new())
2601        };
2602
2603        // Update the manifest
2604        if let Some(storage_manifest) = index_manifest {
2605            // Old LexIndexArtifact format: set the manifest with actual offset/length
2606            self.toc.indexes.lex = Some(storage_manifest);
2607        } else {
2608            // Tantivy segments OR lex disabled: clear the manifest
2609            // Stats will check lex_segments instead of manifest
2610            self.toc.indexes.lex = None;
2611        }
2612
2613        self.toc.indexes.lex_segments = segments;
2614
2615        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
2616        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.
2617
2618        self.rewrite_toc_footer()?;
2619        self.header.toc_checksum = self.toc.toc_checksum;
2620        crate::persist_header(&mut self.file, &self.header)?;
2621        Ok(())
2622    }
2623
2624    #[cfg(feature = "lex")]
2625    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
2626        let TantivySnapshot {
2627            doc_count,
2628            checksum,
2629            segments,
2630        } = snapshot;
2631
2632        let mut footer_offset = self.data_end;
2633        self.file.seek(SeekFrom::Start(footer_offset))?;
2634
2635        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
2636        for segment in segments {
2637            let bytes_length = segment.bytes.len() as u64;
2638            self.file.write_all(&segment.bytes)?;
2639            self.file.flush()?; // Flush segment data to disk
2640            embedded_segments.push(EmbeddedLexSegment {
2641                path: segment.path,
2642                bytes_offset: footer_offset,
2643                bytes_length,
2644                checksum: segment.checksum,
2645            });
2646            footer_offset += bytes_length;
2647        }
2648        // Set footer_offset for TOC writing, but DON'T update data_end
2649        // data_end stays at end of payloads, so next commit overwrites these segments
2650        // Use max() to never decrease footer_offset - this preserves replay segments
2651        // that may have been written at a higher offset
2652        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2653
2654        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
2655        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
2656            Vec::with_capacity(embedded_segments.len());
2657        for segment in &embedded_segments {
2658            let descriptor = TantivySegmentDescriptor::from_common(
2659                SegmentCommon::new(
2660                    next_segment_id,
2661                    segment.bytes_offset,
2662                    segment.bytes_length,
2663                    segment.checksum,
2664                ),
2665                segment.path.clone(),
2666            );
2667            catalog_segments.push(descriptor);
2668            next_segment_id = next_segment_id.saturating_add(1);
2669        }
2670        if catalog_segments.is_empty() {
2671            self.toc.segment_catalog.tantivy_segments.clear();
2672        } else {
2673            self.toc.segment_catalog.tantivy_segments = catalog_segments;
2674            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2675        }
2676        self.toc.segment_catalog.next_segment_id = next_segment_id;
2677
2678        // REMOVED: catalog_data_end() check
2679        // This was causing orphaned Tantivy segments because it would see OLD segments
2680        // still in the catalog from previous commits, and push footer_offset forward.
2681        // We want Tantivy segments to overwrite at data_end, so footer_offset should
2682        // stay at the end of the newly written segments.
2683
2684        let generation = self
2685            .lex_storage
2686            .write()
2687            .map_err(|_| MemvidError::Tantivy {
2688                reason: "embedded lex storage lock poisoned".into(),
2689            })
2690            .map(|mut storage| {
2691                storage.replace(doc_count, checksum, embedded_segments.clone());
2692                storage.generation()
2693            })?;
2694
2695        let batch = LexWalBatch {
2696            generation,
2697            doc_count,
2698            checksum,
2699            segments: embedded_segments.clone(),
2700        };
2701        self.append_lex_batch(&batch)?;
2702        self.persist_lex_manifest()?;
2703        self.header.toc_checksum = self.toc.toc_checksum;
2704        crate::persist_header(&mut self.file, &self.header)?;
2705        Ok(())
2706    }
2707
2708    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2709        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2710            frame_id,
2711            reason: "frame id too large",
2712        })?;
2713        let frame = self
2714            .toc
2715            .frames
2716            .get_mut(index)
2717            .ok_or(MemvidError::InvalidFrame {
2718                frame_id,
2719                reason: "delete target missing",
2720            })?;
2721        frame.status = FrameStatus::Deleted;
2722        frame.superseded_by = None;
2723        self.remove_frame_from_indexes(frame_id)
2724    }
2725
2726    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2727        #[cfg(feature = "lex")]
2728        if let Some(engine) = self.tantivy.as_mut() {
2729            engine.delete_frame(frame_id)?;
2730            self.tantivy_dirty = true;
2731        }
2732        if let Some(index) = self.lex_index.as_mut() {
2733            index.remove_document(frame_id);
2734        }
2735        if let Some(index) = self.vec_index.as_mut() {
2736            index.remove(frame_id);
2737        }
2738        Ok(())
2739    }
2740
2741    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2742        let Ok(index) = usize::try_from(frame_id) else {
2743            return false;
2744        };
2745        self.toc
2746            .frames
2747            .get(index)
2748            .is_some_and(|frame| frame.status == FrameStatus::Active)
2749    }
2750
2751    #[cfg(feature = "parallel_segments")]
2752    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2753    where
2754        I: IntoIterator<Item = FrameId>,
2755    {
2756        let mut iter = iter.into_iter();
2757        let first_id = iter.next()?;
2758        let first_frame = self.toc.frames.get(first_id as usize);
2759        let mut min_id = first_id;
2760        let mut max_id = first_id;
2761        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2762        let mut page_end = first_frame
2763            .and_then(|frame| frame.chunk_count)
2764            .map(|count| page_start + count.saturating_sub(1))
2765            .unwrap_or(page_start);
2766        for frame_id in iter {
2767            if frame_id < min_id {
2768                min_id = frame_id;
2769            }
2770            if frame_id > max_id {
2771                max_id = frame_id;
2772            }
2773            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2774                if let Some(idx) = frame.chunk_index {
2775                    page_start = page_start.min(idx);
2776                    if let Some(count) = frame.chunk_count {
2777                        let end = idx + count.saturating_sub(1);
2778                        page_end = page_end.max(end);
2779                    } else {
2780                        page_end = page_end.max(idx);
2781                    }
2782                }
2783            }
2784        }
2785        Some(SegmentSpan {
2786            frame_start: min_id,
2787            frame_end: max_id,
2788            page_start,
2789            page_end,
2790            ..SegmentSpan::default()
2791        })
2792    }
2793
2794    #[cfg(feature = "parallel_segments")]
2795    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2796        common.span = Some(span);
2797        if common.codec_version == 0 {
2798            common.codec_version = 1;
2799        }
2800    }
2801
2802    #[cfg(feature = "parallel_segments")]
2803    pub(crate) fn record_index_segment(
2804        &mut self,
2805        kind: SegmentKind,
2806        common: SegmentCommon,
2807        stats: SegmentStats,
2808    ) -> Result<()> {
2809        let entry = IndexSegmentRef {
2810            kind,
2811            common,
2812            stats,
2813        };
2814        self.toc.segment_catalog.index_segments.push(entry.clone());
2815        if let Some(wal) = self.manifest_wal.as_mut() {
2816            wal.append_segments(&[entry])?;
2817        }
2818        Ok(())
2819    }
2820
2821    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2822        self.ensure_writable()?;
2823        if self.toc.ticket_ref.issuer == "free-tier" {
2824            return Ok(());
2825        }
2826        match self.tier() {
2827            Tier::Free => Ok(()),
2828            tier => {
2829                if self.toc.ticket_ref.issuer.trim().is_empty() {
2830                    Err(MemvidError::TicketRequired { tier })
2831                } else {
2832                    Ok(())
2833                }
2834            }
2835        }
2836    }
2837
2838    pub(crate) fn tier(&self) -> Tier {
2839        if self.header.wal_size >= WAL_SIZE_LARGE {
2840            Tier::Enterprise
2841        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2842            Tier::Dev
2843        } else {
2844            Tier::Free
2845        }
2846    }
2847
2848    pub(crate) fn capacity_limit(&self) -> u64 {
2849        if self.toc.ticket_ref.capacity_bytes != 0 {
2850            self.toc.ticket_ref.capacity_bytes
2851        } else {
2852            self.tier().capacity_bytes()
2853        }
2854    }
2855
2856    /// Get current storage capacity in bytes.
2857    ///
2858    /// Returns the capacity from the applied ticket, or the default
2859    /// tier capacity (1 GB for free tier).
2860    #[must_use]
2861    pub fn get_capacity(&self) -> u64 {
2862        self.capacity_limit()
2863    }
2864
2865    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
2866        tracing::info!(
2867            vec_segments = self.toc.segment_catalog.vec_segments.len(),
2868            lex_segments = self.toc.segment_catalog.lex_segments.len(),
2869            time_segments = self.toc.segment_catalog.time_segments.len(),
2870            footer_offset = self.header.footer_offset,
2871            data_end = self.data_end,
2872            "rewrite_toc_footer: about to serialize TOC"
2873        );
2874        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
2875        let footer_offset = self.header.footer_offset;
2876        self.file.seek(SeekFrom::Start(footer_offset))?;
2877        self.file.write_all(&toc_bytes)?;
2878        let footer = CommitFooter {
2879            toc_len: toc_bytes.len() as u64,
2880            toc_hash: *hash(&toc_bytes).as_bytes(),
2881            generation: self.generation,
2882        };
2883        let encoded_footer = footer.encode();
2884        self.file.write_all(&encoded_footer)?;
2885
2886        // The file must always be at least header + WAL size
2887        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
2888        let min_len = self.header.wal_offset + self.header.wal_size;
2889        let final_len = new_len.max(min_len);
2890
2891        if new_len < min_len {
2892            tracing::warn!(
2893                file.new_len = new_len,
2894                file.min_len = min_len,
2895                file.final_len = final_len,
2896                "truncation would cut into WAL region, clamping to min_len"
2897            );
2898        }
2899
2900        self.file.set_len(final_len)?;
2901        // Ensure footer is flushed to disk so mmap-based readers can find it
2902        self.file.sync_all()?;
2903        Ok(())
2904    }
2905}
2906
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Plans, builds and appends parallel segments for the frames in `delta`.
    ///
    /// Returns `Ok(false)` when there is nothing to publish: no non-empty
    /// chunks, no plans produced by the planner, or no results from the worker
    /// pool. Returns `Ok(true)` after the results have been appended.
    ///
    /// # Errors
    ///
    /// Propagates errors from chunk collection, the worker pool and
    /// `append_parallel_segments`.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Builds one `SegmentChunkPlan` per inserted frame that has non-blank
    /// content.
    ///
    /// Each plan carries the frame's text, timestamp, chunk position, a token
    /// estimate and, when the delta supplied one, the frame's embedding. Frames
    /// with a chunk index get a 1-based page number (`page_start == page_end`);
    /// frames without one get page 0.
    ///
    /// # Errors
    ///
    /// Returns `MemvidError::InvalidFrame` when an inserted frame id does not
    /// resolve to a TOC frame, and propagates errors from `frame_content`.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Index embeddings by frame id, borrowing the vectors so that only the
        // ones attached to an emitted chunk are cloned.
        let mut embedding_map: HashMap<FrameId, &Vec<f32>> = delta
            .inserted_embeddings
            .iter()
            .map(|(frame_id, embedding)| (*frame_id, embedding))
            .collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            // Checked conversion: an id that does not fit in `usize` is
            // reported as out of range instead of being truncated.
            let frame = usize::try_from(*frame_id)
                .ok()
                .and_then(|idx| self.toc.frames.get(idx))
                .cloned()
                .ok_or_else(|| MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                })?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based for chunked frames and 0 otherwise; each chunk
            // covers exactly one page, so start and end coincide.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = page_start;
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                // `remove` hands each embedding to at most one chunk.
                embedding: embedding_map.remove(frame_id).cloned(),
            });
        }
        Ok(chunks)
    }
}
2978
#[cfg(feature = "parallel_segments")]
/// Rough token estimate for `text`: the number of whitespace-separated words, never below 1.
fn estimate_tokens(text: &str) -> usize {
    match text.split_whitespace().count() {
        0 => 1,
        words => words,
    }
}
2983
2984impl Memvid {
2985    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2986        let catalog_end = self.catalog_data_end();
2987        if catalog_end <= self.header.footer_offset {
2988            return Ok(false);
2989        }
2990        self.header.footer_offset = catalog_end;
2991        self.rewrite_toc_footer()?;
2992        self.header.toc_checksum = self.toc.toc_checksum;
2993        crate::persist_header(&mut self.file, &self.header)?;
2994        Ok(true)
2995    }
2996}
2997
2998impl Memvid {
2999    pub fn vacuum(&mut self) -> Result<()> {
3000        self.commit()?;
3001
3002        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
3003        let frames: Vec<Frame> = self
3004            .toc
3005            .frames
3006            .iter()
3007            .filter(|frame| frame.status == FrameStatus::Active)
3008            .cloned()
3009            .collect();
3010        for frame in frames {
3011            let bytes = self.read_frame_payload_bytes(&frame)?;
3012            active_payloads.insert(frame.id, bytes);
3013        }
3014
3015        let mut cursor = self.header.wal_offset + self.header.wal_size;
3016        self.file.seek(SeekFrom::Start(cursor))?;
3017        for frame in &mut self.toc.frames {
3018            if frame.status == FrameStatus::Active {
3019                if let Some(bytes) = active_payloads.get(&frame.id) {
3020                    self.file.write_all(bytes)?;
3021                    frame.payload_offset = cursor;
3022                    frame.payload_length = bytes.len() as u64;
3023                    cursor += bytes.len() as u64;
3024                } else {
3025                    frame.payload_offset = 0;
3026                    frame.payload_length = 0;
3027                }
3028            } else {
3029                frame.payload_offset = 0;
3030                frame.payload_length = 0;
3031            }
3032        }
3033
3034        self.data_end = cursor;
3035
3036        self.toc.segments.clear();
3037        self.toc.indexes.lex_segments.clear();
3038        self.toc.segment_catalog.lex_segments.clear();
3039        self.toc.segment_catalog.vec_segments.clear();
3040        self.toc.segment_catalog.time_segments.clear();
3041        #[cfg(feature = "temporal_track")]
3042        {
3043            self.toc.temporal_track = None;
3044            self.toc.segment_catalog.temporal_segments.clear();
3045        }
3046        #[cfg(feature = "lex")]
3047        {
3048            self.toc.segment_catalog.tantivy_segments.clear();
3049        }
3050        #[cfg(feature = "parallel_segments")]
3051        {
3052            self.toc.segment_catalog.index_segments.clear();
3053        }
3054
3055        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
3056        #[cfg(feature = "lex")]
3057        {
3058            self.tantivy = None;
3059            self.tantivy_dirty = false;
3060        }
3061
3062        self.rebuild_indexes(&[], &[])?;
3063        self.file.sync_all()?;
3064        Ok(())
3065    }
3066
3067    /// Preview how a document would be chunked without actually ingesting it.
3068    ///
3069    /// This is useful when you need to compute embeddings for each chunk externally
3070    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
3071    /// is too small to be chunked (< 2400 chars after normalization).
3072    ///
3073    /// # Example
3074    /// ```ignore
3075    /// let chunks = mem.preview_chunks(b"long document text...")?;
3076    /// if let Some(chunk_texts) = chunks {
3077    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
3078    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
3079    /// } else {
3080    ///     let embedding = my_embedder.embed_query(text)?;
3081    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
3082    /// }
3083    /// ```
3084    #[must_use]
3085    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
3086        plan_document_chunks(payload).map(|plan| plan.chunks)
3087    }
3088
3089    /// Append raw bytes as a document frame.
3090    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
3091        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
3092    }
3093
3094    /// Append raw bytes with explicit metadata/options.
3095    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
3096        self.put_internal(Some(payload), None, None, None, options, None)
3097    }
3098
3099    /// Append bytes and an existing embedding (bypasses on-device embedding).
3100    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
3101        self.put_internal(
3102            Some(payload),
3103            None,
3104            Some(embedding),
3105            None,
3106            PutOptions::default(),
3107            None,
3108        )
3109    }
3110
3111    pub fn put_with_embedding_and_options(
3112        &mut self,
3113        payload: &[u8],
3114        embedding: Vec<f32>,
3115        options: PutOptions,
3116    ) -> Result<u64> {
3117        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
3118    }
3119
3120    /// Ingest a document with pre-computed embeddings for both parent and chunks.
3121    ///
3122    /// This is the recommended API for high-accuracy semantic search when chunking
3123    /// occurs. The caller provides:
3124    /// - `payload`: The document bytes
3125    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
3126    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
3127    /// - `options`: Standard put options
3128    ///
3129    /// The number of chunk embeddings should match the number of chunks that will be
3130    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
3131    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
3132    pub fn put_with_chunk_embeddings(
3133        &mut self,
3134        payload: &[u8],
3135        parent_embedding: Option<Vec<f32>>,
3136        chunk_embeddings: Vec<Vec<f32>>,
3137        options: PutOptions,
3138    ) -> Result<u64> {
3139        self.put_internal(
3140            Some(payload),
3141            None,
3142            parent_embedding,
3143            Some(chunk_embeddings),
3144            options,
3145            None,
3146        )
3147    }
3148
3149    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
3150    pub fn update_frame(
3151        &mut self,
3152        frame_id: FrameId,
3153        payload: Option<Vec<u8>>,
3154        mut options: PutOptions,
3155        embedding: Option<Vec<f32>>,
3156    ) -> Result<u64> {
3157        self.ensure_mutation_allowed()?;
3158        let existing = self.frame_by_id(frame_id)?;
3159        if existing.status != FrameStatus::Active {
3160            return Err(MemvidError::InvalidFrame {
3161                frame_id,
3162                reason: "frame is not active",
3163            });
3164        }
3165
3166        if options.timestamp.is_none() {
3167            options.timestamp = Some(existing.timestamp);
3168        }
3169        if options.track.is_none() {
3170            options.track = existing.track.clone();
3171        }
3172        if options.kind.is_none() {
3173            options.kind = existing.kind.clone();
3174        }
3175        if options.uri.is_none() {
3176            options.uri = existing.uri.clone();
3177        }
3178        if options.title.is_none() {
3179            options.title = existing.title.clone();
3180        }
3181        if options.metadata.is_none() {
3182            options.metadata = existing.metadata.clone();
3183        }
3184        if options.search_text.is_none() {
3185            options.search_text = existing.search_text.clone();
3186        }
3187        if options.tags.is_empty() {
3188            options.tags = existing.tags.clone();
3189        }
3190        if options.labels.is_empty() {
3191            options.labels = existing.labels.clone();
3192        }
3193        if options.extra_metadata.is_empty() {
3194            options.extra_metadata = existing.extra_metadata.clone();
3195        }
3196
3197        let reuse_frame = if payload.is_none() {
3198            options.auto_tag = false;
3199            options.extract_dates = false;
3200            Some(existing.clone())
3201        } else {
3202            None
3203        };
3204
3205        let effective_embedding = if let Some(explicit) = embedding {
3206            Some(explicit)
3207        } else if self.vec_enabled {
3208            self.frame_embedding(frame_id)?
3209        } else {
3210            None
3211        };
3212
3213        let payload_slice = payload.as_deref();
3214        let reuse_flag = reuse_frame.is_some();
3215        let replace_flag = payload_slice.is_some();
3216        let seq = self.put_internal(
3217            payload_slice,
3218            reuse_frame,
3219            effective_embedding,
3220            None, // No chunk embeddings for update
3221            options,
3222            Some(frame_id),
3223        )?;
3224        info!(
3225            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
3226        );
3227        Ok(seq)
3228    }
3229
3230    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
3231        self.ensure_mutation_allowed()?;
3232        let frame = self.frame_by_id(frame_id)?;
3233        if frame.status != FrameStatus::Active {
3234            return Err(MemvidError::InvalidFrame {
3235                frame_id,
3236                reason: "frame is not active",
3237            });
3238        }
3239
3240        let mut tombstone = WalEntryData {
3241            timestamp: SystemTime::now()
3242                .duration_since(UNIX_EPOCH)
3243                .map(|d| d.as_secs() as i64)
3244                .unwrap_or(frame.timestamp),
3245            kind: None,
3246            track: None,
3247            payload: Vec::new(),
3248            embedding: None,
3249            uri: frame.uri.clone(),
3250            title: frame.title.clone(),
3251            canonical_encoding: frame.canonical_encoding,
3252            canonical_length: frame.canonical_length,
3253            metadata: None,
3254            search_text: None,
3255            tags: Vec::new(),
3256            labels: Vec::new(),
3257            extra_metadata: BTreeMap::new(),
3258            content_dates: Vec::new(),
3259            chunk_manifest: None,
3260            role: frame.role,
3261            parent_sequence: None,
3262            chunk_index: frame.chunk_index,
3263            chunk_count: frame.chunk_count,
3264            op: FrameWalOp::Tombstone,
3265            target_frame_id: Some(frame_id),
3266            supersedes_frame_id: None,
3267            reuse_payload_from: None,
3268            source_sha256: None,
3269            source_path: None,
3270            enrichment_state: crate::types::EnrichmentState::default(),
3271        };
3272        tombstone.kind = frame.kind.clone();
3273        tombstone.track = frame.track.clone();
3274
3275        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
3276        let seq = self.append_wal_entry(&payload_bytes)?;
3277        self.dirty = true;
3278        let suppress_checkpoint = self
3279            .batch_opts
3280            .as_ref()
3281            .is_some_and(|o| o.disable_auto_checkpoint);
3282        if !suppress_checkpoint && self.wal.should_checkpoint() {
3283            self.commit()?;
3284        }
3285        info!("frame_delete frame_id={frame_id} seq={seq}");
3286        Ok(seq)
3287    }
3288}
3289
3290impl Memvid {
3291    fn put_internal(
3292        &mut self,
3293        payload: Option<&[u8]>,
3294        reuse_frame: Option<Frame>,
3295        embedding: Option<Vec<f32>>,
3296        chunk_embeddings: Option<Vec<Vec<f32>>>,
3297        mut options: PutOptions,
3298        supersedes: Option<FrameId>,
3299    ) -> Result<u64> {
3300        self.ensure_mutation_allowed()?;
3301
3302        // Deduplication: if enabled and we have payload, check if identical content exists
3303        if options.dedup {
3304            if let Some(bytes) = payload {
3305                let content_hash = hash(bytes);
3306                if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3307                    // Found existing frame with same content hash, skip ingestion
3308                    tracing::debug!(
3309                        frame_id = existing_frame.id,
3310                        "dedup: skipping ingestion, identical content already exists"
3311                    );
3312                    // Return existing frame's sequence number (which equals frame_id for committed frames)
3313                    return Ok(existing_frame.id);
3314                }
3315            }
3316        }
3317
3318        if payload.is_some() && reuse_frame.is_some() {
3319            let frame_id = reuse_frame
3320                .as_ref()
3321                .map(|frame| frame.id)
3322                .unwrap_or_default();
3323            return Err(MemvidError::InvalidFrame {
3324                frame_id,
3325                reason: "cannot reuse payload when bytes are provided",
3326            });
3327        }
3328
3329        // If the caller supplies embeddings, enforce a single vector dimension contract
3330        // for the entire memory (fail fast, never silently accept mixed dimensions).
3331        let incoming_dimension = {
3332            let mut dim: Option<u32> = None;
3333
3334            if let Some(ref vector) = embedding {
3335                if !vector.is_empty() {
3336                    #[allow(clippy::cast_possible_truncation)]
3337                    let len = vector.len() as u32;
3338                    dim = Some(len);
3339                }
3340            }
3341
3342            if let Some(ref vectors) = chunk_embeddings {
3343                for vector in vectors {
3344                    if vector.is_empty() {
3345                        continue;
3346                    }
3347                    let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3348                    match dim {
3349                        None => dim = Some(vec_dim),
3350                        Some(existing) if existing == vec_dim => {}
3351                        Some(existing) => {
3352                            return Err(MemvidError::VecDimensionMismatch {
3353                                expected: existing,
3354                                actual: vector.len(),
3355                            });
3356                        }
3357                    }
3358                }
3359            }
3360
3361            dim
3362        };
3363
3364        if let Some(incoming_dimension) = incoming_dimension {
3365            // Embeddings imply vector search should be enabled.
3366            if !self.vec_enabled {
3367                self.enable_vec()?;
3368            }
3369
3370            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3371                if existing_dimension != incoming_dimension {
3372                    return Err(MemvidError::VecDimensionMismatch {
3373                        expected: existing_dimension,
3374                        actual: incoming_dimension as usize,
3375                    });
3376                }
3377            }
3378
3379            // Persist the dimension early for better auto-detection (even before the next commit).
3380            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3381                if manifest.dimension == 0 {
3382                    manifest.dimension = incoming_dimension;
3383                }
3384            }
3385        }
3386
3387        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3388        let payload_tail = self.payload_region_end();
3389        let projected = if let Some(bytes) = payload {
3390            let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3391                prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3392            } else {
3393                prepare_canonical_payload(bytes)?
3394            };
3395            let len = prepared.len();
3396            prepared_payload = Some((prepared, encoding, length));
3397            payload_tail.saturating_add(len as u64)
3398        } else if reuse_frame.is_some() {
3399            payload_tail
3400        } else {
3401            return Err(MemvidError::InvalidFrame {
3402                frame_id: 0,
3403                reason: "payload required for frame insertion",
3404            });
3405        };
3406
3407        let capacity_limit = self.capacity_limit();
3408        if projected > capacity_limit {
3409            let incoming_size = projected.saturating_sub(payload_tail);
3410            return Err(MemvidError::CapacityExceeded {
3411                current: payload_tail,
3412                limit: capacity_limit,
3413                required: incoming_size,
3414            });
3415        }
3416        let timestamp = options.timestamp.take().unwrap_or_else(|| {
3417            SystemTime::now()
3418                .duration_since(UNIX_EPOCH)
3419                .map(|d| d.as_secs() as i64)
3420                .unwrap_or(0)
3421        });
3422
3423        #[allow(unused_assignments)]
3424        let mut reuse_bytes: Option<Vec<u8>> = None;
3425        let payload_for_processing = if let Some(bytes) = payload {
3426            Some(bytes)
3427        } else if let Some(frame) = reuse_frame.as_ref() {
3428            let bytes = self.frame_canonical_bytes(frame)?;
3429            reuse_bytes = Some(bytes);
3430            reuse_bytes.as_deref()
3431        } else {
3432            None
3433        };
3434
3435        // Try to create a chunk plan from raw UTF-8 bytes first
3436        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3437            (Some(bytes), None) => plan_document_chunks(bytes),
3438            _ => None,
3439        };
3440
3441        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
3442        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
3443        // For --no-raw mode, we store only the extracted text and a hash of the original binary
3444        let mut source_sha256: Option<[u8; 32]> = None;
3445        let source_path_value = options.source_path.take();
3446
3447        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3448            if raw_chunk_plan.is_some() {
3449                // UTF-8 text document - chunks contain the text, no parent payload needed
3450                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3451            } else if options.no_raw {
3452                // --no-raw mode: don't store the raw binary, only compute hash
3453                if let Some(bytes) = payload {
3454                    // Compute BLAKE3 hash of original binary for verification
3455                    let hash_result = hash(bytes);
3456                    source_sha256 = Some(*hash_result.as_bytes());
3457                    // Store empty payload - the extracted text is in search_text
3458                    (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3459                } else {
3460                    return Err(MemvidError::InvalidFrame {
3461                        frame_id: 0,
3462                        reason: "payload required for --no-raw mode",
3463                    });
3464                }
3465            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3466                (prepared, encoding, length, None)
3467            } else if let Some(bytes) = payload {
3468                let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3469                    prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3470                } else {
3471                    prepare_canonical_payload(bytes)?
3472                };
3473                (prepared, encoding, length, None)
3474            } else if let Some(frame) = reuse_frame.as_ref() {
3475                (
3476                    Vec::new(),
3477                    frame.canonical_encoding,
3478                    frame.canonical_length,
3479                    Some(frame.id),
3480                )
3481            } else {
3482                return Err(MemvidError::InvalidFrame {
3483                    frame_id: 0,
3484                    reason: "payload required for frame insertion",
3485                });
3486            };
3487
3488        // Track whether we'll create an extracted text chunk plan later
3489        let mut chunk_plan = raw_chunk_plan;
3490
3491        let mut metadata = options.metadata.take();
3492        let mut search_text = options
3493            .search_text
3494            .take()
3495            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3496        let mut tags = std::mem::take(&mut options.tags);
3497        let mut labels = std::mem::take(&mut options.labels);
3498        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3499        let mut content_dates: Vec<String> = Vec::new();
3500
3501        let need_search_text = search_text
3502            .as_ref()
3503            .is_none_or(|text| text.trim().is_empty());
3504        let need_metadata = metadata.is_none();
3505        let run_extractor = need_search_text || need_metadata || options.auto_tag;
3506
3507        let mut extraction_error = None;
3508        let mut is_skim_extraction = false; // Track if extraction was time-limited
3509
3510        let extracted = if run_extractor {
3511            if let Some(bytes) = payload_for_processing {
3512                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3513                let uri_hint = options.uri.as_deref();
3514
3515                // Use time-budgeted extraction for instant indexing with a budget
3516                let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3517
3518                if use_budgeted {
3519                    // Time-budgeted extraction for sub-second ingestion
3520                    let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3521                        options.extraction_budget_ms,
3522                    );
3523                    match crate::extract_budgeted::extract_with_budget(
3524                        bytes, mime_hint, uri_hint, budget,
3525                    ) {
3526                        Ok(result) => {
3527                            is_skim_extraction = result.is_skim();
3528                            if is_skim_extraction {
3529                                tracing::debug!(
3530                                    coverage = result.coverage,
3531                                    elapsed_ms = result.elapsed_ms,
3532                                    sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3533                                    "time-budgeted extraction (skim)"
3534                                );
3535                            }
3536                            // Convert BudgetedExtractionResult to ExtractedDocument
3537                            let doc = crate::extract::ExtractedDocument {
3538                                text: if result.text.is_empty() {
3539                                    None
3540                                } else {
3541                                    Some(result.text)
3542                                },
3543                                metadata: serde_json::json!({
3544                                    "skim": is_skim_extraction,
3545                                    "coverage": result.coverage,
3546                                    "sections_extracted": result.sections_extracted,
3547                                    "sections_total": result.sections_total,
3548                                }),
3549                                mime_type: mime_hint.map(std::string::ToString::to_string),
3550                            };
3551                            Some(doc)
3552                        }
3553                        Err(err) => {
3554                            // Fall back to full extraction on budgeted extraction error
3555                            tracing::warn!(
3556                                ?err,
3557                                "budgeted extraction failed, trying full extraction"
3558                            );
3559                            match extract_via_registry(bytes, mime_hint, uri_hint) {
3560                                Ok(doc) => Some(doc),
3561                                Err(err) => {
3562                                    extraction_error = Some(err);
3563                                    None
3564                                }
3565                            }
3566                        }
3567                    }
3568                } else {
3569                    // Full extraction (no time budget)
3570                    match extract_via_registry(bytes, mime_hint, uri_hint) {
3571                        Ok(doc) => Some(doc),
3572                        Err(err) => {
3573                            extraction_error = Some(err);
3574                            None
3575                        }
3576                    }
3577                }
3578            } else {
3579                None
3580            }
3581        } else {
3582            None
3583        };
3584
3585        if let Some(err) = extraction_error {
3586            return Err(err);
3587        }
3588
3589        if let Some(doc) = &extracted {
3590            if need_search_text {
3591                if let Some(text) = &doc.text {
3592                    if let Some(normalized) =
3593                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3594                    {
3595                        search_text = Some(normalized);
3596                    }
3597                }
3598            }
3599
3600            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
3601            // from extracted text. This ensures large documents like PDFs get fully indexed.
3602            if chunk_plan.is_none() {
3603                if let Some(text) = &doc.text {
3604                    chunk_plan = plan_text_chunks(text);
3605                }
3606            }
3607
3608            if let Some(mime) = doc.mime_type.as_ref() {
3609                if let Some(existing) = &mut metadata {
3610                    if existing.mime.is_none() {
3611                        existing.mime = Some(mime.clone());
3612                    }
3613                } else {
3614                    let mut doc_meta = DocMetadata::default();
3615                    doc_meta.mime = Some(mime.clone());
3616                    metadata = Some(doc_meta);
3617                }
3618            }
3619
3620            // Only add extractous_metadata when auto_tag is enabled
3621            // This allows callers to disable metadata pollution by setting auto_tag=false
3622            if options.auto_tag {
3623                if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3624                {
3625                    extra_metadata
3626                        .entry("extractous_metadata".to_string())
3627                        .or_insert(meta_json);
3628                }
3629            }
3630        }
3631
3632        if options.auto_tag {
3633            if let Some(ref text) = search_text {
3634                if !text.trim().is_empty() {
3635                    let result = AutoTagger.analyse(text, options.extract_dates);
3636                    merge_unique(&mut tags, result.tags);
3637                    merge_unique(&mut labels, result.labels);
3638                    if options.extract_dates && content_dates.is_empty() {
3639                        content_dates = result.content_dates;
3640                    }
3641                }
3642            }
3643        }
3644
3645        if content_dates.is_empty() {
3646            if let Some(frame) = reuse_frame.as_ref() {
3647                content_dates = frame.content_dates.clone();
3648            }
3649        }
3650
3651        let metadata_ref = metadata.as_ref();
3652        let mut search_text = augment_search_text(
3653            search_text,
3654            options.uri.as_deref(),
3655            options.title.as_deref(),
3656            options.track.as_deref(),
3657            &tags,
3658            &labels,
3659            &extra_metadata,
3660            &content_dates,
3661            metadata_ref,
3662        );
3663        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3664        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3665        let mut parent_chunk_count: Option<u32> = None;
3666
3667        let kind_value = options.kind.take();
3668        let track_value = options.track.take();
3669        let uri_value = options.uri.take();
3670        let title_value = options.title.take();
3671        let should_extract_triplets = options.extract_triplets;
3672        // Save references for triplet extraction (after search_text is moved into WAL entry)
3673        let triplet_uri = uri_value.clone();
3674        let triplet_title = title_value.clone();
3675
3676        if let Some(plan) = chunk_plan.as_ref() {
3677            let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3678            parent_chunk_manifest = Some(plan.manifest.clone());
3679            parent_chunk_count = Some(chunk_total);
3680
3681            if let Some(first_chunk) = plan.chunks.first() {
3682                if let Some(normalized) =
3683                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3684                {
3685                    if !normalized.trim().is_empty() {
3686                        search_text = Some(normalized);
3687                    }
3688                }
3689            }
3690
3691            let chunk_tags = tags.clone();
3692            let chunk_labels = labels.clone();
3693            let chunk_metadata = metadata.clone();
3694            let chunk_extra_metadata = extra_metadata.clone();
3695            let chunk_content_dates = content_dates.clone();
3696
3697            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3698                let (chunk_payload, chunk_encoding, chunk_length) =
3699                    prepare_canonical_payload(chunk_text.as_bytes())?;
3700                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3701                    .map(|n| n.text)
3702                    .filter(|text| !text.trim().is_empty());
3703
3704                let chunk_uri = uri_value
3705                    .as_ref()
3706                    .map(|uri| format!("{uri}#page-{}", idx + 1));
3707                let chunk_title = title_value
3708                    .as_ref()
3709                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3710
3711                // Use provided chunk embedding if available, otherwise None
3712                let chunk_embedding = chunk_embeddings
3713                    .as_ref()
3714                    .and_then(|embeddings| embeddings.get(idx).cloned());
3715
3716                chunk_entries.push(WalEntryData {
3717                    timestamp,
3718                    kind: kind_value.clone(),
3719                    track: track_value.clone(),
3720                    payload: chunk_payload,
3721                    embedding: chunk_embedding,
3722                    uri: chunk_uri,
3723                    title: chunk_title,
3724                    canonical_encoding: chunk_encoding,
3725                    canonical_length: chunk_length,
3726                    metadata: chunk_metadata.clone(),
3727                    search_text: chunk_search_text,
3728                    tags: chunk_tags.clone(),
3729                    labels: chunk_labels.clone(),
3730                    extra_metadata: chunk_extra_metadata.clone(),
3731                    content_dates: chunk_content_dates.clone(),
3732                    chunk_manifest: None,
3733                    role: FrameRole::DocumentChunk,
3734                    parent_sequence: None,
3735                    chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3736                    chunk_count: Some(chunk_total),
3737                    op: FrameWalOp::Insert,
3738                    target_frame_id: None,
3739                    supersedes_frame_id: None,
3740                    reuse_payload_from: None,
3741                    source_sha256: None, // Chunks don't have source references
3742                    source_path: None,
3743                    // Chunks are already extracted, so mark as Enriched
3744                    enrichment_state: crate::types::EnrichmentState::Enriched,
3745                });
3746            }
3747        }
3748
3749        let parent_uri = uri_value.clone();
3750        let parent_title = title_value.clone();
3751
3752        // Get parent_sequence from options.parent_id if provided
3753        // We need the WAL sequence of the parent frame to link them
3754        let parent_sequence = if let Some(parent_id) = options.parent_id {
3755            // Look up the parent frame to get its WAL sequence
3756            // Since frame.id corresponds to the array index, we need to find the sequence
3757            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
3758            // This works because sequence numbers are assigned incrementally
3759            usize::try_from(parent_id)
3760                .ok()
3761                .and_then(|idx| self.toc.frames.get(idx))
3762                .map(|_| parent_id + 2) // WAL sequences start at 2
3763        } else {
3764            None
3765        };
3766
3767        // Clone search_text for triplet extraction (before it's moved into WAL entry)
3768        let triplet_text = search_text.clone();
3769
3770        // Capture values needed for instant indexing BEFORE they're moved into entry
3771        #[cfg(feature = "lex")]
3772        let instant_index_tags = if options.instant_index {
3773            tags.clone()
3774        } else {
3775            Vec::new()
3776        };
3777        #[cfg(feature = "lex")]
3778        let instant_index_labels = if options.instant_index {
3779            labels.clone()
3780        } else {
3781            Vec::new()
3782        };
3783
3784        // Determine enrichment state: Searchable if needs background work, Enriched if complete
3785        #[cfg(feature = "lex")]
3786        let needs_enrichment =
3787            options.instant_index && (options.enable_embedding || is_skim_extraction);
3788        #[cfg(feature = "lex")]
3789        let enrichment_state = if needs_enrichment {
3790            crate::types::EnrichmentState::Searchable
3791        } else {
3792            crate::types::EnrichmentState::Enriched
3793        };
3794        #[cfg(not(feature = "lex"))]
3795        let enrichment_state = crate::types::EnrichmentState::Enriched;
3796
3797        let entry = WalEntryData {
3798            timestamp,
3799            kind: kind_value,
3800            track: track_value,
3801            payload: storage_payload,
3802            embedding,
3803            uri: parent_uri,
3804            title: parent_title,
3805            canonical_encoding,
3806            canonical_length,
3807            metadata,
3808            search_text,
3809            tags,
3810            labels,
3811            extra_metadata,
3812            content_dates,
3813            chunk_manifest: parent_chunk_manifest,
3814            role: options.role,
3815            parent_sequence,
3816            chunk_index: None,
3817            chunk_count: parent_chunk_count,
3818            op: FrameWalOp::Insert,
3819            target_frame_id: None,
3820            supersedes_frame_id: supersedes,
3821            reuse_payload_from,
3822            source_sha256,
3823            source_path: source_path_value,
3824            enrichment_state,
3825        };
3826
3827        let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3828        let parent_seq = self.append_wal_entry(&parent_bytes)?;
3829        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3830
3831        // Instant indexing: make frame searchable immediately (<1s) without full commit
3832        // This is Phase 1 of progressive ingestion - frame is searchable but not fully enriched
3833        #[cfg(feature = "lex")]
3834        if options.instant_index && self.tantivy.is_some() {
3835            // Create a minimal frame for indexing
3836            let frame_id = parent_seq as FrameId;
3837
3838            // Use triplet_text which was cloned before entry was created
3839            if let Some(ref text) = triplet_text {
3840                if !text.trim().is_empty() {
3841                    // Create temporary frame for indexing (minimal fields for Tantivy)
3842                    let temp_frame = Frame {
3843                        id: frame_id,
3844                        timestamp,
3845                        anchor_ts: None,
3846                        anchor_source: None,
3847                        kind: options.kind.clone(),
3848                        track: options.track.clone(),
3849                        payload_offset: 0,
3850                        payload_length: 0,
3851                        checksum: [0u8; 32],
3852                        uri: options
3853                            .uri
3854                            .clone()
3855                            .or_else(|| Some(crate::default_uri(frame_id))),
3856                        title: options.title.clone(),
3857                        canonical_encoding: crate::types::CanonicalEncoding::default(),
3858                        canonical_length: None,
3859                        metadata: None, // Not needed for text search
3860                        search_text: triplet_text.clone(),
3861                        tags: instant_index_tags.clone(),
3862                        labels: instant_index_labels.clone(),
3863                        extra_metadata: std::collections::BTreeMap::new(), // Not needed for search
3864                        content_dates: Vec::new(),                         // Not needed for search
3865                        chunk_manifest: None,
3866                        role: options.role,
3867                        parent_id: None,
3868                        chunk_index: None,
3869                        chunk_count: None,
3870                        status: FrameStatus::Active,
3871                        supersedes,
3872                        superseded_by: None,
3873                        source_sha256: None, // Not needed for search
3874                        source_path: None,   // Not needed for search
3875                        enrichment_state: crate::types::EnrichmentState::Searchable,
3876                    };
3877
3878                    // Get mutable reference to engine and index the frame
3879                    if let Some(engine) = self.tantivy.as_mut() {
3880                        engine.add_frame(&temp_frame, text)?;
3881                        engine.soft_commit()?;
3882                        self.tantivy_dirty = true;
3883
3884                        tracing::debug!(
3885                            frame_id = frame_id,
3886                            "instant index: frame searchable immediately"
3887                        );
3888                    }
3889                }
3890            }
3891        }
3892
3893        // Queue frame for background enrichment when using instant index path
3894        // Enrichment includes: embedding generation, full text re-extraction if time-limited
3895        // Note: enrichment_state is already set in the WAL entry, so it will be correct after replay
3896        #[cfg(feature = "lex")]
3897        if needs_enrichment {
3898            let frame_id = parent_seq as FrameId;
3899            self.toc.enrichment_queue.push(frame_id);
3900            tracing::debug!(
3901                frame_id = frame_id,
3902                is_skim = is_skim_extraction,
3903                needs_embedding = options.enable_embedding,
3904                "queued frame for background enrichment"
3905            );
3906        }
3907
3908        for mut chunk_entry in chunk_entries {
3909            chunk_entry.parent_sequence = Some(parent_seq);
3910            let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3911            self.append_wal_entry(&chunk_bytes)?;
3912            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3913        }
3914
3915        self.dirty = true;
3916        let suppress_checkpoint = self
3917            .batch_opts
3918            .as_ref()
3919            .is_some_and(|o| o.disable_auto_checkpoint);
3920        if !suppress_checkpoint && self.wal.should_checkpoint() {
3921            self.commit()?;
3922        }
3923
3924        // Record the put action if a replay session is active
3925        #[cfg(feature = "replay")]
3926        if let Some(input_bytes) = payload {
3927            self.record_put_action(parent_seq, input_bytes);
3928        }
3929
3930        // Extract triplets if enabled (default: true)
3931        // Triplets are stored as MemoryCards with entity/slot/value structure
3932        if should_extract_triplets {
3933            if let Some(ref text) = triplet_text {
3934                if !text.trim().is_empty() {
3935                    let extractor = TripletExtractor::default();
3936                    let frame_id = parent_seq as FrameId;
3937                    let (cards, _stats) = extractor.extract(
3938                        frame_id,
3939                        text,
3940                        triplet_uri.as_deref(),
3941                        triplet_title.as_deref(),
3942                        timestamp,
3943                    );
3944
3945                    if !cards.is_empty() {
3946                        // Add cards to memories track
3947                        let card_ids = self.memories_track.add_cards(cards);
3948
3949                        // Record enrichment for incremental processing
3950                        self.memories_track
3951                            .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3952                    }
3953                }
3954            }
3955        }
3956
3957        Ok(parent_seq)
3958    }
3959}
3960
3961#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3962#[serde(rename_all = "snake_case")]
3963pub(crate) enum FrameWalOp {
3964    Insert,
3965    Tombstone,
3966}
3967
3968impl Default for FrameWalOp {
3969    fn default() -> Self {
3970        Self::Insert
3971    }
3972}
3973
/// Top-level record that is encoded and appended to the embedded WAL.
///
/// NOTE(review): the variant order is likely part of the serialized format
/// (the enum is encoded through serde/bincode), so confirm before reordering
/// or inserting variants ahead of existing ones.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame mutation; the concrete operation is given by `WalEntryData::op`.
    Frame(WalEntryData),
    /// A `LexWalBatch` payload; only compiled in when the `lex` feature is enabled.
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3980
3981fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3982    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3983        return Ok(entry);
3984    }
3985    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3986    Ok(WalEntry::Frame(legacy))
3987}
3988
/// Serialized body of a frame WAL record.
///
/// Every field after `embedding` carries `#[serde(default)]`.
/// `decode_wal_entry` also accepts this struct on its own (without the
/// `WalEntry` wrapper) as a legacy layout.
///
/// NOTE(review): field order is presumably part of the serialized format;
/// confirm before reordering or inserting fields.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Timestamp recorded for the frame (unit is not visible in this section).
    pub(crate) timestamp: i64,
    /// Optional frame kind.
    pub(crate) kind: Option<String>,
    /// Optional track the frame belongs to.
    pub(crate) track: Option<String>,
    /// Payload bytes staged for the frame. For document chunks this is the
    /// output of `prepare_canonical_payload`.
    pub(crate) payload: Vec<u8>,
    /// Optional embedding vector supplied for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    /// Optional URI of the frame.
    #[serde(default)]
    pub(crate) uri: Option<String>,
    /// Optional title of the frame.
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// Encoding of the canonical payload (e.g. `Plain` or `Zstd`).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Length in bytes of the canonical payload before encoding, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    /// Optional document metadata.
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Text made available to search for this frame.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    /// Tags attached to the frame.
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    /// Labels attached to the frame.
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Additional key/value metadata.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Dates associated with the content, stored as strings.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Chunk manifest; set on the parent entry of a chunked document and
    /// `None` on the chunk entries themselves.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    /// Role of the frame (e.g. `FrameRole::DocumentChunk` for chunk entries).
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence of the parent entry, used to link a frame to its parent.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based position of this chunk within its parent document.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Total number of chunks in the parent document.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Operation this record represents; defaults to `Insert`.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    /// Frame the operation targets; `None` for the inserts built in this file section.
    // NOTE(review): presumably the frame a `Tombstone` applies to; confirm.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame that this entry supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// NOTE(review): presumably the frame whose stored payload should be
    /// reused instead of writing `payload` again; confirm against the commit path.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// SHA-256 hash of original source file (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Original source file path (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Enrichment state for progressive ingestion.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4044
4045pub(crate) fn prepare_canonical_payload(
4046    payload: &[u8],
4047) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4048    prepare_canonical_payload_with_level(payload, 3)
4049}
4050
4051pub(crate) fn prepare_canonical_payload_with_level(
4052    payload: &[u8],
4053    level: i32,
4054) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4055    if level == 0 {
4056        // No compression — store as plain text
4057        return Ok((
4058            payload.to_vec(),
4059            CanonicalEncoding::Plain,
4060            Some(payload.len() as u64),
4061        ));
4062    }
4063    if std::str::from_utf8(payload).is_ok() {
4064        let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4065        Ok((
4066            compressed,
4067            CanonicalEncoding::Zstd,
4068            Some(payload.len() as u64),
4069        ))
4070    } else {
4071        Ok((
4072            payload.to_vec(),
4073            CanonicalEncoding::Plain,
4074            Some(payload.len() as u64),
4075        ))
4076    }
4077}
4078
4079pub(crate) fn augment_search_text(
4080    base: Option<String>,
4081    uri: Option<&str>,
4082    title: Option<&str>,
4083    track: Option<&str>,
4084    tags: &[String],
4085    labels: &[String],
4086    extra_metadata: &BTreeMap<String, String>,
4087    content_dates: &[String],
4088    metadata: Option<&DocMetadata>,
4089) -> Option<String> {
4090    let mut segments: Vec<String> = Vec::new();
4091    if let Some(text) = base {
4092        let trimmed = text.trim();
4093        if !trimmed.is_empty() {
4094            segments.push(trimmed.to_string());
4095        }
4096    }
4097
4098    if let Some(title) = title {
4099        if !title.trim().is_empty() {
4100            segments.push(format!("title: {}", title.trim()));
4101        }
4102    }
4103
4104    if let Some(uri) = uri {
4105        if !uri.trim().is_empty() {
4106            segments.push(format!("uri: {}", uri.trim()));
4107        }
4108    }
4109
4110    if let Some(track) = track {
4111        if !track.trim().is_empty() {
4112            segments.push(format!("track: {}", track.trim()));
4113        }
4114    }
4115
4116    if !tags.is_empty() {
4117        segments.push(format!("tags: {}", tags.join(" ")));
4118    }
4119
4120    if !labels.is_empty() {
4121        segments.push(format!("labels: {}", labels.join(" ")));
4122    }
4123
4124    if !extra_metadata.is_empty() {
4125        for (key, value) in extra_metadata {
4126            if value.trim().is_empty() {
4127                continue;
4128            }
4129            segments.push(format!("{key}: {value}"));
4130        }
4131    }
4132
4133    if !content_dates.is_empty() {
4134        segments.push(format!("dates: {}", content_dates.join(" ")));
4135    }
4136
4137    if let Some(meta) = metadata {
4138        if let Ok(meta_json) = serde_json::to_string(meta) {
4139            segments.push(format!("metadata: {meta_json}"));
4140        }
4141    }
4142
4143    if segments.is_empty() {
4144        None
4145    } else {
4146        Some(segments.join("\n"))
4147    }
4148}
4149
/// Appends the values of `additions` to `target`, skipping duplicates.
///
/// Each addition is trimmed first; blank values are ignored. A trimmed value
/// is appended only if it is not already present in `target` (compared
/// against the existing entries exactly as stored) and has not been appended
/// earlier in the same call. Existing entries and their order are untouched.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }

    // Owned snapshot of what is already present, so `target` can be extended below.
    let mut known: BTreeSet<String> = target.iter().cloned().collect();

    target.extend(
        additions
            .iter()
            .map(|raw| raw.trim())
            .filter(|item| !item.is_empty())
            // `insert` returns true only the first time a value is seen.
            .filter(|item| known.insert((*item).to_string()))
            .map(str::to_string),
    );
}