//! Frame mutation and ingestion routines for `Memvid`.
//!
//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
//!
//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58    PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66    TemporalResolutionValue,
67};
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading payload bytes sampled for magic-number sniffing.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Fixed per-entry header size (bytes) used when estimating how large the
/// embedded WAL must grow to accommodate a payload.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Scratch-buffer size (8 MiB) for the chunked copy that shifts payload data
/// when the WAL region grows.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback time zone applied during temporal normalization.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";
81
/// Fixed set of relative/natural-language temporal phrases recognized by the
/// temporal track. NOTE(review): presumably matched during mention detection
/// elsewhere in this module — the consumer is not visible here; confirm before
/// editing. Entries are matched as-is, so keep them lowercase.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staging area for crash-safe commits: wraps an `AtomicWriteFile` that
/// receives a full copy of the archive and is atomically renamed over the
/// original on success, or discarded on failure.
struct CommitStaging {
    // Handle to the temporary file managed by the atomic-write-file crate.
    atomic: AtomicWriteFile,
}
126
127impl CommitStaging {
128    fn prepare(path: &Path) -> Result<Self> {
129        let mut options = AtomicWriteFile::options();
130        options.read(true);
131        let atomic = options.open(path)?;
132        Ok(Self { atomic })
133    }
134
135    fn copy_from(&mut self, source: &File) -> Result<()> {
136        let mut reader = source.try_clone()?;
137        reader.seek(SeekFrom::Start(0))?;
138
139        let writer = self.atomic.as_file_mut();
140        writer.set_len(0)?;
141        writer.seek(SeekFrom::Start(0))?;
142        std::io::copy(&mut reader, writer)?;
143        writer.flush()?;
144        writer.sync_all()?;
145        Ok(())
146    }
147
148    fn clone_file(&self) -> Result<File> {
149        Ok(self.atomic.as_file().try_clone()?)
150    }
151
152    fn commit(self) -> Result<()> {
153        self.atomic.commit().map_err(Into::into)
154    }
155
156    fn discard(self) -> Result<()> {
157        self.atomic.discard().map_err(Into::into)
158    }
159}
160
/// Accumulated effects of applying a batch of WAL records, used to decide
/// which index structures need updating after ingestion.
#[derive(Debug, Default)]
struct IngestionDelta {
    // Frames newly appended during this pass.
    inserted_frames: Vec<FrameId>,
    // Embeddings staged for the vector index, keyed by owning frame.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    // Entries staged for the time index.
    inserted_time_entries: Vec<TimeIndexEntry>,
    // True when existing frames were modified (not just appended).
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
/// Commit mode selector; [`CommitMode::Full`] is the default.
///
/// Replaces the previous hand-written `impl Default` with a derived one
/// (`#[default]` on the variant) — the file already requires Rust ≥ 1.70
/// (`std::sync::OnceLock`), well past the 1.62 stabilization of this form.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Default mode.
    #[default]
    Full,
    /// Incremental mode.
    Incremental,
}
201
/// Options controlling how a commit runs.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// Commit mode to use (defaults to `CommitMode::Full`).
    pub mode: CommitMode,
    /// Request background execution. NOTE(review): currently ignored —
    /// `commit_with_options` logs the flag and always runs synchronously.
    pub background: bool,
}
207
208impl CommitOptions {
209    #[must_use]
210    pub fn new(mode: CommitMode) -> Self {
211        Self {
212            mode,
213            background: false,
214        }
215    }
216
217    #[must_use]
218    pub fn background(mut self, background: bool) -> Self {
219        self.background = background;
220        self
221    }
222}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226    REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230    mime: Option<&str>,
231    magic: Option<&[u8]>,
232    uri: Option<&str>,
233) -> Option<DocumentFormat> {
234    // Check PDF magic bytes first
235    if detect_pdf_magic(magic) {
236        return Some(DocumentFormat::Pdf);
237    }
238
239    // For ZIP-based OOXML formats (DOCX, XLSX, PPTX), magic bytes are just ZIP header
240    // so we need to check file extension to distinguish them
241    if let Some(magic_bytes) = magic {
242        if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243            // It's a ZIP file - check extension to determine OOXML type
244            if let Some(format) = infer_format_from_extension(uri) {
245                return Some(format);
246            }
247        }
248    }
249
250    // Try MIME type
251    if let Some(mime_str) = mime {
252        let mime_lower = mime_str.trim().to_ascii_lowercase();
253        let format = match mime_lower.as_str() {
254            "application/pdf" => Some(DocumentFormat::Pdf),
255            "text/plain" => Some(DocumentFormat::PlainText),
256            "text/markdown" => Some(DocumentFormat::Markdown),
257            "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258            "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259                Some(DocumentFormat::Docx)
260            }
261            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262                Some(DocumentFormat::Xlsx)
263            }
264            "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265            "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266                Some(DocumentFormat::Pptx)
267            }
268            other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269            _ => None,
270        };
271        if format.is_some() {
272            return format;
273        }
274    }
275
276    // Fall back to extension-based detection
277    infer_format_from_extension(uri)
278}
279
280/// Infer document format from file extension in URI/path
281fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282    let uri = uri?;
283    let path = std::path::Path::new(uri);
284    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285    match ext.as_str() {
286        "pdf" => Some(DocumentFormat::Pdf),
287        "docx" => Some(DocumentFormat::Docx),
288        "xlsx" => Some(DocumentFormat::Xlsx),
289        "xls" => Some(DocumentFormat::Xls),
290        "pptx" => Some(DocumentFormat::Pptx),
291        "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292        | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293        | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294        | "sql" => Some(DocumentFormat::PlainText),
295        "md" | "markdown" => Some(DocumentFormat::Markdown),
296        "html" | "htm" => Some(DocumentFormat::Html),
297        _ => None,
298    }
299}
300
/// True when `magic` looks like the start of a PDF: an optional UTF-8 BOM,
/// optional ASCII whitespace, then the literal `%PDF`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    if bytes.is_empty() {
        return false;
    }
    // Drop a UTF-8 BOM if present, then skip leading ASCII whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let skip = without_bom
        .iter()
        .take_while(|byte| byte.is_ascii_whitespace())
        .count();
    without_bom[skip..].starts_with(b"%PDF")
}
318
319#[instrument(
320    target = "memvid::extract",
321    skip_all,
322    fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325    bytes: &[u8],
326    mime_hint: Option<&str>,
327    uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329    let registry = default_reader_registry();
330    let magic = bytes
331        .get(..MAGIC_SNIFF_BYTES)
332        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334        .with_uri(uri)
335        .with_magic(magic);
336
337    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338        let start = Instant::now();
339        match reader.extract(bytes, &hint) {
340            Ok(output) => {
341                return Ok(finalize_reader_output(output, start));
342            }
343            Err(err) => {
344                tracing::error!(
345                    target = "memvid::extract",
346                    reader = reader.name(),
347                    error = %err,
348                    "reader failed; falling back"
349                );
350                Some(format!("reader {} failed: {err}", reader.name()))
351            }
352        }
353    } else {
354        tracing::debug!(
355            target = "memvid::extract",
356            format = hint.format.map(super::super::reader::DocumentFormat::label),
357            "no reader matched; using default extractor"
358        );
359        Some("no registered reader matched; using default extractor".to_string())
360    };
361
362    let start = Instant::now();
363    let mut output = PassthroughReader.extract(bytes, &hint)?;
364    if let Some(reason) = fallback_reason {
365        output.diagnostics.track_warning(reason);
366    }
367    Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371    let elapsed = start.elapsed();
372    let ReaderOutput {
373        document,
374        reader_name,
375        diagnostics,
376    } = output;
377    log_reader_result(&reader_name, &diagnostics, elapsed);
378    document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382    let duration_ms = diagnostics
383        .duration_ms
384        .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385    let warnings = diagnostics.warnings.len();
386    let pages = diagnostics.pages_processed;
387
388    if warnings > 0 || diagnostics.fallback {
389        tracing::warn!(
390            target = "memvid::extract",
391            reader,
392            duration_ms,
393            pages,
394            warnings,
395            fallback = diagnostics.fallback,
396            "extraction completed with warnings"
397        );
398        for warning in &diagnostics.warnings {
399            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400        }
401    } else {
402        tracing::info!(
403            target = "memvid::extract",
404            reader,
405            duration_ms,
406            pages,
407            "extraction completed"
408        );
409    }
410}
411
412impl Memvid {
413    // -- Public ingestion entrypoints ---------------------------------------------------------
414
    /// Run `op` against an atomically-staged copy of the archive file.
    ///
    /// The live file is fsynced and copied into an `AtomicWriteFile`; the
    /// staged handle temporarily replaces `self.file` / `self.wal` while `op`
    /// runs. On success the staged copy is atomically renamed over the
    /// original and the destination is re-opened. On any failure (of `op` or
    /// of the rename) the staged copy is discarded and every captured piece
    /// of state — file, WAL, header, TOC, data_end, generation, dirty flags —
    /// is rolled back, leaving `self` unchanged.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        // Ensure on-disk state is current before copying it into staging.
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Point self at the staged copy; snapshot originals for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so rollback paths can move them back selectively.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                // Flush the staged file before the atomic rename.
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Commit renamed the staging file over the original:
                        // drop stale handles and re-open the destination.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Rename failed: restore every captured piece of state.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err)
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staged copy (best effort) and
                // restore all captured state.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
494
495    pub(crate) fn catalog_data_end(&self) -> u64 {
496        let mut max_end = self.header.wal_offset + self.header.wal_size;
497
498        for descriptor in &self.toc.segment_catalog.lex_segments {
499            if descriptor.common.bytes_length == 0 {
500                continue;
501            }
502            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
503        }
504
505        for descriptor in &self.toc.segment_catalog.vec_segments {
506            if descriptor.common.bytes_length == 0 {
507                continue;
508            }
509            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
510        }
511
512        for descriptor in &self.toc.segment_catalog.time_segments {
513            if descriptor.common.bytes_length == 0 {
514                continue;
515            }
516            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
517        }
518
519        #[cfg(feature = "temporal_track")]
520        for descriptor in &self.toc.segment_catalog.temporal_segments {
521            if descriptor.common.bytes_length == 0 {
522                continue;
523            }
524            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
525        }
526
527        #[cfg(feature = "lex")]
528        for descriptor in &self.toc.segment_catalog.tantivy_segments {
529            if descriptor.common.bytes_length == 0 {
530                continue;
531            }
532            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
533        }
534
535        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
536            if manifest.bytes_length != 0 {
537                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
538            }
539        }
540
541        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
542            if manifest.bytes_length != 0 {
543                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
544            }
545        }
546
547        if let Some(manifest) = self.toc.time_index.as_ref() {
548            if manifest.bytes_length != 0 {
549                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
550            }
551        }
552
553        #[cfg(feature = "temporal_track")]
554        if let Some(track) = self.toc.temporal_track.as_ref() {
555            if track.bytes_length != 0 {
556                max_end = max_end.max(track.bytes_offset + track.bytes_length);
557            }
558        }
559
560        max_end
561    }
562
    /// End offset of the payload region, as cached on `self`.
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
566
    /// Append `payload` to the embedded WAL, growing the WAL region as many
    /// times as needed when it is full or too small, and return the entry's
    /// sequence number.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // NOTE(review): growth is triggered by matching exact reason
                // strings produced by EmbeddedWal — keep in sync with io::wal.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry"
                        || reason == "embedded WAL region full" =>
                {
                    // WAL is either too small for this entry or full with pending entries.
                    // Grow to at least header + payload, and always strictly
                    // larger than the current region so doubling makes progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
586
587    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
588        let mut new_size = self.header.wal_size;
589        let mut target = required_entry_size;
590        if target == 0 {
591            target = self.header.wal_size;
592        }
593        while new_size <= target {
594            new_size = new_size
595                .checked_mul(2)
596                .ok_or_else(|| MemvidError::CheckpointFailed {
597                    reason: "wal_size overflow".into(),
598                })?;
599        }
600        let delta = new_size - self.header.wal_size;
601        if delta == 0 {
602            return Ok(());
603        }
604
605        self.shift_data_for_wal_growth(delta)?;
606        self.header.wal_size = new_size;
607        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
608        self.data_end = self.data_end.saturating_add(delta);
609        self.adjust_offsets_after_wal_growth(delta);
610
611        let catalog_end = self.catalog_data_end();
612        self.header.footer_offset = catalog_end
613            .max(self.header.footer_offset)
614            .max(self.data_end);
615
616        self.rewrite_toc_footer()?;
617        self.header.toc_checksum = self.toc.toc_checksum;
618        crate::persist_header(&mut self.file, &self.header)?;
619        self.file.sync_all()?;
620        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
621        Ok(())
622    }
623
    /// Make room for `delta` extra WAL bytes by moving everything after the
    /// current WAL region up by `delta`, then zero-filling the vacated gap.
    ///
    /// The copy runs tail-first in `WAL_SHIFT_BUFFER_SIZE` chunks so a source
    /// range is always read before the shifted data overwrites it. Cost is
    /// O(bytes after the WAL region).
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        // Grow the file first so the shifted tail has somewhere to land.
        self.file.set_len(original_len + delta)?;

        // Copy backwards from the end of the old data toward data_start.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the `delta` bytes freed at data_start — this becomes new WAL space.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
658
659    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
660        if delta == 0 {
661            return;
662        }
663
664        for frame in &mut self.toc.frames {
665            if frame.payload_offset != 0 {
666                frame.payload_offset += delta;
667            }
668        }
669
670        for segment in &mut self.toc.segments {
671            if segment.bytes_offset != 0 {
672                segment.bytes_offset += delta;
673            }
674        }
675
676        if let Some(lex) = self.toc.indexes.lex.as_mut() {
677            if lex.bytes_offset != 0 {
678                lex.bytes_offset += delta;
679            }
680        }
681        for manifest in &mut self.toc.indexes.lex_segments {
682            if manifest.bytes_offset != 0 {
683                manifest.bytes_offset += delta;
684            }
685        }
686        if let Some(vec) = self.toc.indexes.vec.as_mut() {
687            if vec.bytes_offset != 0 {
688                vec.bytes_offset += delta;
689            }
690        }
691        if let Some(time_index) = self.toc.time_index.as_mut() {
692            if time_index.bytes_offset != 0 {
693                time_index.bytes_offset += delta;
694            }
695        }
696        #[cfg(feature = "temporal_track")]
697        if let Some(track) = self.toc.temporal_track.as_mut() {
698            if track.bytes_offset != 0 {
699                track.bytes_offset += delta;
700            }
701        }
702
703        let catalog = &mut self.toc.segment_catalog;
704        for descriptor in &mut catalog.lex_segments {
705            if descriptor.common.bytes_offset != 0 {
706                descriptor.common.bytes_offset += delta;
707            }
708        }
709        for descriptor in &mut catalog.vec_segments {
710            if descriptor.common.bytes_offset != 0 {
711                descriptor.common.bytes_offset += delta;
712            }
713        }
714        for descriptor in &mut catalog.time_segments {
715            if descriptor.common.bytes_offset != 0 {
716                descriptor.common.bytes_offset += delta;
717            }
718        }
719        #[cfg(feature = "temporal_track")]
720        for descriptor in &mut catalog.temporal_segments {
721            if descriptor.common.bytes_offset != 0 {
722                descriptor.common.bytes_offset += delta;
723            }
724        }
725        for descriptor in &mut catalog.tantivy_segments {
726            if descriptor.common.bytes_offset != 0 {
727                descriptor.common.bytes_offset += delta;
728            }
729        }
730
731        #[cfg(feature = "lex")]
732        if let Ok(mut storage) = self.lex_storage.write() {
733            storage.adjust_offsets(delta);
734        }
735    }
    /// Persist all pending WAL records and dirty state under the staging
    /// lock; a no-op when nothing is pending.
    ///
    /// `options.background` is currently ignored: the commit always runs
    /// synchronously (a debug line records the ignored flag).
    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
        self.ensure_writable()?;
        if options.background {
            tracing::debug!("commit background flag ignored; running synchronously");
        }
        let mode = options.mode;
        let records = self.wal.pending_records()?;
        // No staged records, nothing dirty, no pending Tantivy work: done.
        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
    }
748
749    pub fn commit(&mut self) -> Result<()> {
750        self.ensure_writable()?;
751        self.commit_with_options(CommitOptions::new(CommitMode::Full))
752    }
753
    /// Enter batch mode for high-throughput ingestion.
    ///
    /// While batch mode is active:
    /// - WAL fsync is skipped on every append (controlled by `opts.skip_sync`)
    /// - Auto-checkpoint is suppressed (controlled by `opts.disable_auto_checkpoint`)
    /// - Compression level is lowered (controlled by `opts.compression_level`)
    /// - WAL is pre-sized to avoid expensive mid-batch growth (controlled by `opts.wal_pre_size_bytes`)
    ///
    /// **You must call [`end_batch()`](Self::end_batch) when done** to flush the WAL
    /// and restore normal operation.
    pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
        // Pre-size the WAL first so the batch never pays mid-batch shifts.
        if opts.wal_pre_size_bytes > 0 {
            self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
        }
        self.wal.set_skip_sync(opts.skip_sync);
        self.batch_opts = Some(opts);
        Ok(())
    }
772
    /// Pre-grow the embedded WAL to at least `min_bytes` in a single operation.
    ///
    /// When `disable_auto_checkpoint` is true, all WAL entries accumulate for
    /// the entire batch.  If the region is too small it must grow repeatedly,
    /// and every growth shifts **all** payload data — O(file_size) per shift.
    ///
    /// Calling this once before the batch eliminates all mid-batch shifts.
    /// On a freshly-created file the shift is essentially free (no data to move).
    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
        if min_bytes <= self.header.wal_size {
            return Ok(());
        }
        // Jump directly to the target size (next power of two for alignment)
        let target = min_bytes.next_power_of_two();
        let delta = target.saturating_sub(self.header.wal_size);
        if delta == 0 {
            return Ok(());
        }

        tracing::info!(
            current_wal = self.header.wal_size,
            target_wal = target,
            delta,
            "pre-sizing WAL for batch mode"
        );

        // NOTE(review): the sequence below mirrors grow_wal_region(); keep
        // the two paths in sync if either changes.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = target;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        // The footer must sit past all cataloged regions and the payload end.
        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // Re-open the WAL over the enlarged region.
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
817
    /// Exit batch mode, flushing the WAL and restoring per-entry fsync.
    ///
    /// This performs a single `fsync` for all appends accumulated during the batch,
    /// then clears batch options so subsequent puts use default behaviour.
    pub fn end_batch(&mut self) -> Result<()> {
        // Single fsync covers every append made while skip_sync was active.
        self.wal.flush()?;
        self.wal.set_skip_sync(false);
        self.batch_opts = None;
        Ok(())
    }
829
    /// Commit pending WAL records without rebuilding any indexes.
    ///
    /// Optimized for bulk ingestion: payloads and frame metadata are persisted,
    /// but time index, Tantivy, vec index, and other index structures are NOT
    /// rebuilt. The caller must do a full `commit()` (which triggers
    /// `rebuild_indexes()`) after all batches are written.
    ///
    /// Skips the staging lock for performance — not crash-safe.
    pub fn commit_skip_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        let records = self.wal.pending_records()?;
        // Unlike commit_with_options, pending Tantivy work is deliberately
        // not checked here — indexes stay stale until the finalize step.
        if records.is_empty() && !self.dirty {
            return Ok(());
        }
        self.commit_skip_indexes_inner(records)
    }
846
    /// Worker for `commit_skip_indexes()`: persist payloads/frames from the
    /// WAL `records`, then invalidate every index manifest in the TOC.
    ///
    /// Stale manifests are cleared so a later `open()` never tries to load
    /// index data that was not rewritten here; `finalize_indexes()` (or a
    /// full commit) is expected to rebuild them afterwards.
    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        // Temporarily remove Tantivy engine to avoid per-frame indexing work
        // and disk reads in apply_records(). We won't persist Tantivy state anyway.
        #[cfg(feature = "lex")]
        let tantivy_backup = self.tantivy.take();

        let result = self.apply_records(records);

        // Restore Tantivy engine (unchanged — no dirty frames added)
        #[cfg(feature = "lex")]
        {
            self.tantivy = tantivy_backup;
            self.tantivy_dirty = false;
        }

        // Propagate an apply_records error only after the engine is restored.
        let _delta = result?;

        // Set footer_offset to right after payloads (no index data written)
        self.header.footer_offset = self.data_end;

        // Clear stale index manifests so next open() doesn't try to load
        // garbage data. Indexes will be rebuilt in the finalize step.
        self.toc.time_index = None;
        self.toc.indexes.lex_segments.clear();
        // Preserve vec manifest (holds dimension info) but zero out data pointers
        if let Some(vec) = self.toc.indexes.vec.as_mut() {
            vec.bytes_offset = 0;
            vec.bytes_length = 0;
            vec.vector_count = 0;
        }
        self.toc.indexes.lex = None;
        self.toc.indexes.clip = None;
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        self.toc.memories_track = None;
        self.toc.logic_mesh = None;
        self.toc.sketch_track = None;

        // Durable publish: TOC footer → checkpoint → header → fsync.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // NOTE(review): toc_checksum is mirrored again after record_checkpoint —
        // presumably record_checkpoint can mutate header fields; confirm before
        // removing the second assignment.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
904
    /// Rebuild all indexes (time, Tantivy, vec) and persist the TOC.
    ///
    /// Use after bulk ingestion with `commit_skip_indexes()` to build
    /// all search indexes in one O(n) pass. This is the complement to
    /// `commit_skip_indexes()` — call it once after all batches are done.
    pub fn finalize_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        // Empty delta slices: rebuild from the full TOC rather than inserts.
        self.rebuild_indexes(&[], &[])?;
        // Persist the TOC footer, mirror its checksum into the header, then
        // flush both so the rebuilt indexes survive a crash.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        Ok(())
    }
919
    /// Apply `records` to the data region, refresh affected indexes/tracks,
    /// and durably publish the new TOC, WAL checkpoint, and header.
    ///
    /// `_mode` is currently unused; the commit path is the same for all modes.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // Check if CLIP index has pending embeddings that need to be persisted
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
            indexes_rebuilt = true;
        }

        // No delta: still flush Tantivy if it has uncommitted documents.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Persist sketch track if it has entries
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.

        // Durable publish: TOC footer → checkpoint → header → fsync.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // NOTE(review): toc_checksum is mirrored again after record_checkpoint —
        // presumably record_checkpoint can mutate header fields; confirm before
        // removing the second assignment.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // The manifest WAL only exists on parallel_segments builds.
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
987
988    #[cfg(feature = "parallel_segments")]
989    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
990        self.ensure_writable()?;
991        if !self.dirty && !self.tantivy_index_pending() {
992            return Ok(());
993        }
994        let opts = opts.clone();
995        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
996    }
997
    /// Parallel-segment commit body.
    ///
    /// Applies pending WAL records, publishes index segments via the parallel
    /// builder when possible (with incremental Tantivy updates and a full
    /// time-index rewrite), and falls back to a sequential `rebuild_indexes`
    /// when no segments were produced. Ends with the standard durable-publish
    /// sequence: TOC footer → WAL checkpoint → header → fsync.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        // Re-check after acquiring the staging lock: state may have changed.
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // Try to use parallel segment builder first
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                // Segments were written at data_end; update footer_offset so
                // rewrite_toc_footer places the TOC after the new segment data
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                // OPTIMIZATION: Use incremental Tantivy indexing instead of full rebuild.
                // Only add new frames from the delta, not all frames.
                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    // If Tantivy engine already exists, frames were already indexed during put()
                    // (at the add_frame call in put_bytes_with_options). Skip re-indexing to avoid
                    // duplicates. Only initialize and index if Tantivy wasn't present during put.
                    let tantivy_was_present = self.tantivy.is_some();
                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    // Skip indexing if Tantivy was already present - frames were indexed during put
                    if tantivy_was_present {
                        tracing::info!(
                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
                        );
                    } else {
                        // First, collect all frames and their search text (to avoid borrow conflicts)
                        let max_payload = crate::memvid::search::max_index_payload();
                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                        for frame_id in &delta.inserted_frames {
                            // Look up the actual Frame from the TOC
                            let frame = match self.toc.frames.get(*frame_id as usize) {
                                Some(f) => f.clone(),
                                None => continue,
                            };

                            // Check if frame has explicit search_text first - clone it for ownership
                            let explicit_text = frame.search_text.clone();
                            if let Some(ref search_text) = explicit_text {
                                if !search_text.trim().is_empty() {
                                    prepared_docs.push((frame, search_text.clone()));
                                    continue;
                                }
                            }

                            // Get MIME type and check if text-indexable
                            let mime = frame
                                .metadata
                                .as_ref()
                                .and_then(|m| m.mime.as_deref())
                                .unwrap_or("application/octet-stream");

                            if !crate::memvid::search::is_text_indexable_mime(mime) {
                                continue;
                            }

                            // Skip oversized payloads; they are not worth indexing inline.
                            if frame.payload_length > max_payload {
                                continue;
                            }

                            let text = self.frame_search_text(&frame)?;
                            if !text.trim().is_empty() {
                                prepared_docs.push((frame, text));
                            }
                        }

                        // Now add to Tantivy engine (no borrow conflict)
                        if let Some(ref mut engine) = self.tantivy {
                            for (frame, text) in &prepared_docs {
                                engine.add_frame(frame, text)?;
                            }

                            if !prepared_docs.is_empty() {
                                engine.commit()?;
                                self.tantivy_dirty = true;
                            }

                            tracing::info!(
                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                                prepared_docs.len(),
                                engine.num_docs()
                            );
                        } else {
                            tracing::warn!(
                                "parallel_commit: Tantivy engine is None after init_tantivy"
                            );
                        }
                    } // end of else !tantivy_was_present
                }

                // Time index stores all entries together for timeline queries.
                // Unlike Tantivy which is incremental, time index needs full rebuild.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                // Update data_end to account for the newly written time index
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Fall back to sequential rebuild if no segments were generated
                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
                indexes_rebuilt = true;
            }
        }

        // Flush Tantivy index if dirty (from parallel path or pending updates)
        #[cfg(feature = "lex")]
        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        // Persist CLIP index if it has embeddings
        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        // Persist memories track if it has cards
        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        // Persist logic mesh if it has nodes
        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Persist sketch track if it has entries
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // flush_tantivy() has already set footer_offset correctly
        // DO NOT overwrite with catalog_data_end()
        // Durable publish: TOC footer → checkpoint → header → fsync.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // NOTE(review): toc_checksum is mirrored again after record_checkpoint —
        // presumably record_checkpoint can mutate header fields; confirm.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1198
    /// Replay WAL records recorded after the last checkpoint (crash recovery).
    ///
    /// With nothing to replay, only flushes a pending Tantivy index. Otherwise
    /// applies the records, rebuilds indexes for the resulting delta, and
    /// checkpoints the WAL so recovery is not repeated on the next open.
    pub(crate) fn recover_wal(&mut self) -> Result<()> {
        let records = self.wal.records_after(self.header.wal_sequence)?;
        if records.is_empty() {
            if self.tantivy_index_pending() {
                self.flush_tantivy()?;
            }
            return Ok(());
        }
        let delta = self.apply_records(records)?;
        if !delta.is_empty() {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "recover applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
        } else if self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }
        self.wal.record_checkpoint(&mut self.header)?;
        crate::persist_header(&mut self.file, &self.header)?;
        if !delta.is_empty() {
            // rebuild_indexes already flushed Tantivy, so nothing further to do.
        } else if self.tantivy_index_pending() {
            // NOTE(review): the earlier flush_tantivy() above should normally
            // clear the pending flag, which would make this branch a
            // belt-and-braces re-check — confirm flush_tantivy semantics
            // before simplifying.
            self.flush_tantivy()?;
            crate::persist_header(&mut self.file, &self.header)?;
        }
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1232
1233    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1234        let mut delta = IngestionDelta::default();
1235        if records.is_empty() {
1236            return Ok(delta);
1237        }
1238
1239        // Use data_end instead of payload_region_end to avoid overwriting
1240        // vec/lex/time segments that were written after the payload region.
1241        // payload_region_end() only considers frame payloads, but data_end tracks
1242        // all data including index segments.
1243        let mut data_cursor = self.data_end;
1244        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1245
1246        if !records.is_empty() {
1247            self.file.seek(SeekFrom::Start(data_cursor))?;
1248            for record in records {
1249                let mut entry = match decode_wal_entry(&record.payload)? {
1250                    WalEntry::Frame(entry) => entry,
1251                    #[cfg(feature = "lex")]
1252                    WalEntry::Lex(batch) => {
1253                        self.apply_lex_wal(batch)?;
1254                        continue;
1255                    }
1256                };
1257
1258                match entry.op {
1259                    FrameWalOp::Insert => {
1260                        let frame_id = self.toc.frames.len() as u64;
1261
1262                        let (
1263                            payload_offset,
1264                            payload_length,
1265                            checksum_bytes,
1266                            canonical_length_value,
1267                        ) = if let Some(source_id) = entry.reuse_payload_from {
1268                            if !entry.payload.is_empty() {
1269                                return Err(MemvidError::InvalidFrame {
1270                                    frame_id: source_id,
1271                                    reason: "reused payload entry contained inline bytes",
1272                                });
1273                            }
1274                            let source_idx = usize::try_from(source_id).map_err(|_| {
1275                                MemvidError::InvalidFrame {
1276                                    frame_id: source_id,
1277                                    reason: "frame id too large for memory",
1278                                }
1279                            })?;
1280                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
1281                                MemvidError::InvalidFrame {
1282                                    frame_id: source_id,
1283                                    reason: "reused payload source missing",
1284                                },
1285                            )?;
1286                            (
1287                                source.payload_offset,
1288                                source.payload_length,
1289                                source.checksum,
1290                                entry
1291                                    .canonical_length
1292                                    .or(source.canonical_length)
1293                                    .unwrap_or(source.payload_length),
1294                            )
1295                        } else {
1296                            self.file.seek(SeekFrom::Start(data_cursor))?;
1297                            self.file.write_all(&entry.payload)?;
1298                            let checksum = hash(&entry.payload);
1299                            let payload_length = entry.payload.len() as u64;
1300                            let canonical_length =
1301                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
1302                                    if let Some(len) = entry.canonical_length {
1303                                        len
1304                                    } else {
1305                                        let decoded = crate::decode_canonical_bytes(
1306                                            &entry.payload,
1307                                            CanonicalEncoding::Zstd,
1308                                            frame_id,
1309                                        )?;
1310                                        decoded.len() as u64
1311                                    }
1312                                } else {
1313                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1314                                };
1315                            let payload_offset = data_cursor;
1316                            data_cursor += payload_length;
1317                            // Keep cached_payload_end in sync (monotonically increasing)
1318                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
1319                            (
1320                                payload_offset,
1321                                payload_length,
1322                                *checksum.as_bytes(),
1323                                canonical_length,
1324                            )
1325                        };
1326
1327                        let uri = entry
1328                            .uri
1329                            .clone()
1330                            .unwrap_or_else(|| crate::default_uri(frame_id));
1331                        let title = entry
1332                            .title
1333                            .clone()
1334                            .or_else(|| crate::infer_title_from_uri(&uri));
1335
1336                        #[cfg(feature = "temporal_track")]
1337                        let (anchor_ts, anchor_source) =
1338                            self.determine_temporal_anchor(entry.timestamp);
1339
1340                        let mut frame = Frame {
1341                            id: frame_id,
1342                            timestamp: entry.timestamp,
1343                            anchor_ts: {
1344                                #[cfg(feature = "temporal_track")]
1345                                {
1346                                    Some(anchor_ts)
1347                                }
1348                                #[cfg(not(feature = "temporal_track"))]
1349                                {
1350                                    None
1351                                }
1352                            },
1353                            anchor_source: {
1354                                #[cfg(feature = "temporal_track")]
1355                                {
1356                                    Some(anchor_source)
1357                                }
1358                                #[cfg(not(feature = "temporal_track"))]
1359                                {
1360                                    None
1361                                }
1362                            },
1363                            kind: entry.kind.clone(),
1364                            track: entry.track.clone(),
1365                            payload_offset,
1366                            payload_length,
1367                            checksum: checksum_bytes,
1368                            uri: Some(uri),
1369                            title,
1370                            canonical_encoding: entry.canonical_encoding,
1371                            canonical_length: Some(canonical_length_value),
1372                            metadata: entry.metadata.clone(),
1373                            search_text: entry.search_text.clone(),
1374                            tags: entry.tags.clone(),
1375                            labels: entry.labels.clone(),
1376                            extra_metadata: entry.extra_metadata.clone(),
1377                            content_dates: entry.content_dates.clone(),
1378                            chunk_manifest: entry.chunk_manifest.clone(),
1379                            role: entry.role,
1380                            parent_id: None,
1381                            chunk_index: entry.chunk_index,
1382                            chunk_count: entry.chunk_count,
1383                            status: FrameStatus::Active,
1384                            supersedes: entry.supersedes_frame_id,
1385                            superseded_by: None,
1386                            source_sha256: entry.source_sha256,
1387                            source_path: entry.source_path.clone(),
1388                            enrichment_state: entry.enrichment_state,
1389                        };
1390
1391                        if let Some(parent_seq) = entry.parent_sequence {
1392                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1393                                frame.parent_id = Some(*parent_frame_id);
1394                            } else {
1395                                // Parent sequence not found in current batch - this can happen
1396                                // if parent was committed in a previous batch. Try to find parent
1397                                // by looking at recently inserted frames with matching characteristics.
1398                                // The parent should be the most recent Document frame that has
1399                                // a chunk_manifest matching this chunk's expected parent.
1400                                if entry.role == FrameRole::DocumentChunk {
1401                                    // Look backwards through recently inserted frames
1402                                    for &candidate_id in delta.inserted_frames.iter().rev() {
1403                                        if let Ok(idx) = usize::try_from(candidate_id) {
1404                                            if let Some(candidate) = self.toc.frames.get(idx) {
1405                                                if candidate.role == FrameRole::Document
1406                                                    && candidate.chunk_manifest.is_some()
1407                                                {
1408                                                    // Found a parent document - use it
1409                                                    frame.parent_id = Some(candidate_id);
1410                                                    tracing::debug!(
1411                                                        chunk_frame_id = frame_id,
1412                                                        parent_frame_id = candidate_id,
1413                                                        parent_seq = parent_seq,
1414                                                        "resolved chunk parent via fallback"
1415                                                    );
1416                                                    break;
1417                                                }
1418                                            }
1419                                        }
1420                                    }
1421                                }
1422                                if frame.parent_id.is_none() {
1423                                    tracing::warn!(
1424                                        chunk_frame_id = frame_id,
1425                                        parent_seq = parent_seq,
1426                                        "chunk has parent_sequence but parent not found in batch"
1427                                    );
1428                                }
1429                            }
1430                        }
1431
1432                        #[cfg(feature = "lex")]
1433                        let index_text = if self.tantivy.is_some() {
1434                            if let Some(text) = entry.search_text.clone() {
1435                                if text.trim().is_empty() {
1436                                    None
1437                                } else {
1438                                    Some(text)
1439                                }
1440                            } else {
1441                                Some(self.frame_content(&frame)?)
1442                            }
1443                        } else {
1444                            None
1445                        };
1446                        #[cfg(feature = "lex")]
1447                        if let (Some(engine), Some(text)) =
1448                            (self.tantivy.as_mut(), index_text.as_ref())
1449                        {
1450                            engine.add_frame(&frame, text)?;
1451                            self.tantivy_dirty = true;
1452
1453                            // Generate sketch for fast candidate pre-filtering
1454                            // Uses the same text as tantivy indexing for consistency
1455                            if !text.trim().is_empty() {
1456                                let entry = crate::types::generate_sketch(
1457                                    frame_id,
1458                                    text,
1459                                    crate::types::SketchVariant::Small,
1460                                    None,
1461                                );
1462                                self.sketch_track.insert(entry);
1463                            }
1464                        }
1465
1466                        if let Some(embedding) = entry.embedding.take() {
1467                            delta
1468                                .inserted_embeddings
1469                                .push((frame_id, embedding.clone()));
1470                        }
1471
1472                        if entry.role == FrameRole::Document {
1473                            delta
1474                                .inserted_time_entries
1475                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1476                            #[cfg(feature = "temporal_track")]
1477                            {
1478                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1479                                    frame_id,
1480                                    anchor_ts,
1481                                    anchor_source,
1482                                ));
1483                                delta.inserted_temporal_mentions.extend(
1484                                    Self::collect_temporal_mentions(
1485                                        entry.search_text.as_deref(),
1486                                        frame_id,
1487                                        anchor_ts,
1488                                    ),
1489                                );
1490                            }
1491                        }
1492
1493                        if let Some(predecessor) = frame.supersedes {
1494                            self.mark_frame_superseded(predecessor, frame_id)?;
1495                        }
1496
1497                        self.toc.frames.push(frame);
1498                        delta.inserted_frames.push(frame_id);
1499                        sequence_to_frame.insert(record.sequence, frame_id);
1500                    }
1501                    FrameWalOp::Tombstone => {
1502                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1503                            frame_id: 0,
1504                            reason: "tombstone missing frame reference",
1505                        })?;
1506                        self.mark_frame_deleted(target)?;
1507                        delta.mutated_frames = true;
1508                    }
1509                }
1510            }
1511            self.data_end = self.data_end.max(data_cursor);
1512        }
1513
1514        // Second pass: resolve any orphan DocumentChunk frames that are missing parent_id.
1515        // This handles edge cases where chunks couldn't be linked during the first pass.
1516        // First, collect orphan chunks and their resolved parents to avoid borrow conflicts.
1517        let orphan_resolutions: Vec<(u64, u64)> = delta
1518            .inserted_frames
1519            .iter()
1520            .filter_map(|&frame_id| {
1521                let idx = usize::try_from(frame_id).ok()?;
1522                let frame = self.toc.frames.get(idx)?;
1523                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1524                    return None;
1525                }
1526                // Find the most recent Document frame before this chunk that has a manifest
1527                for candidate_id in (0..frame_id).rev() {
1528                    if let Ok(idx) = usize::try_from(candidate_id) {
1529                        if let Some(candidate) = self.toc.frames.get(idx) {
1530                            if candidate.role == FrameRole::Document
1531                                && candidate.chunk_manifest.is_some()
1532                                && candidate.status == FrameStatus::Active
1533                            {
1534                                return Some((frame_id, candidate_id));
1535                            }
1536                        }
1537                    }
1538                }
1539                None
1540            })
1541            .collect();
1542
1543        // Now apply the resolutions
1544        for (chunk_id, parent_id) in orphan_resolutions {
1545            if let Ok(idx) = usize::try_from(chunk_id) {
1546                if let Some(frame) = self.toc.frames.get_mut(idx) {
1547                    frame.parent_id = Some(parent_id);
1548                    tracing::debug!(
1549                        chunk_frame_id = chunk_id,
1550                        parent_frame_id = parent_id,
1551                        "resolved orphan chunk parent in second pass"
1552                    );
1553                }
1554            }
1555        }
1556
1557        // Index rebuild now happens once per commit (Option A) instead of incremental append.
1558        // See commit_from_records() for where rebuild_indexes() is invoked.
1559        Ok(delta)
1560    }
1561
1562    #[cfg(feature = "temporal_track")]
1563    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1564        (timestamp, AnchorSource::FrameTimestamp)
1565    }
1566
1567    #[cfg(feature = "temporal_track")]
1568    fn collect_temporal_mentions(
1569        text: Option<&str>,
1570        frame_id: FrameId,
1571        anchor_ts: i64,
1572    ) -> Vec<TemporalMention> {
1573        let text = match text {
1574            Some(value) if !value.trim().is_empty() => value,
1575            _ => return Vec::new(),
1576        };
1577
1578        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
1579            Ok(ts) => ts,
1580            Err(_) => return Vec::new(),
1581        };
1582
1583        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
1584        let normalizer = TemporalNormalizer::new(context);
1585        let mut spans: Vec<(usize, usize)> = Vec::new();
1586        let lower = text.to_ascii_lowercase();
1587
1588        for phrase in STATIC_TEMPORAL_PHRASES {
1589            let mut search_start = 0usize;
1590            while let Some(idx) = lower[search_start..].find(phrase) {
1591                let abs = search_start + idx;
1592                let end = abs + phrase.len();
1593                spans.push((abs, end));
1594                search_start = end;
1595            }
1596        }
1597
1598        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
1599        let regex = NUMERIC_DATE.get_or_init(|| {
1600            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
1601        });
1602        let regex = match regex {
1603            Ok(re) => re,
1604            Err(msg) => {
1605                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
1606                return Vec::new();
1607            }
1608        };
1609        for mat in regex.find_iter(text) {
1610            spans.push((mat.start(), mat.end()));
1611        }
1612
1613        spans.sort_unstable();
1614        spans.dedup();
1615
1616        let mut mentions: Vec<TemporalMention> = Vec::new();
1617        for (start, end) in spans {
1618            if end > text.len() || start >= end {
1619                continue;
1620            }
1621            let raw = &text[start..end];
1622            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
1623            if trimmed.is_empty() {
1624                continue;
1625            }
1626            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
1627            let finish = offset + trimmed.len();
1628            match normalizer.resolve(trimmed) {
1629                Ok(resolution) => {
1630                    mentions.extend(Self::resolution_to_mentions(
1631                        resolution, frame_id, offset, finish,
1632                    ));
1633                }
1634                Err(_) => continue,
1635            }
1636        }
1637
1638        mentions
1639    }
1640
1641    #[cfg(feature = "temporal_track")]
1642    fn resolution_to_mentions(
1643        resolution: TemporalResolution,
1644        frame_id: FrameId,
1645        byte_start: usize,
1646        byte_end: usize,
1647    ) -> Vec<TemporalMention> {
1648        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1649        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1650        let mut results = Vec::new();
1651
1652        let base_flags = Self::flags_from_resolution(&resolution.flags);
1653        match resolution.value {
1654            TemporalResolutionValue::Date(date) => {
1655                let ts = Self::date_to_timestamp(date);
1656                results.push(TemporalMention::new(
1657                    ts,
1658                    frame_id,
1659                    byte_start,
1660                    byte_len,
1661                    TemporalMentionKind::Date,
1662                    resolution.confidence,
1663                    0,
1664                    base_flags,
1665                ));
1666            }
1667            TemporalResolutionValue::DateTime(dt) => {
1668                let ts = dt.unix_timestamp();
1669                let tz_hint = dt.offset().whole_minutes() as i16;
1670                results.push(TemporalMention::new(
1671                    ts,
1672                    frame_id,
1673                    byte_start,
1674                    byte_len,
1675                    TemporalMentionKind::DateTime,
1676                    resolution.confidence,
1677                    tz_hint,
1678                    base_flags,
1679                ));
1680            }
1681            TemporalResolutionValue::DateRange { start, end } => {
1682                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1683                let start_ts = Self::date_to_timestamp(start);
1684                results.push(TemporalMention::new(
1685                    start_ts,
1686                    frame_id,
1687                    byte_start,
1688                    byte_len,
1689                    TemporalMentionKind::RangeStart,
1690                    resolution.confidence,
1691                    0,
1692                    flags,
1693                ));
1694                let end_ts = Self::date_to_timestamp(end);
1695                results.push(TemporalMention::new(
1696                    end_ts,
1697                    frame_id,
1698                    byte_start,
1699                    byte_len,
1700                    TemporalMentionKind::RangeEnd,
1701                    resolution.confidence,
1702                    0,
1703                    flags,
1704                ));
1705            }
1706            TemporalResolutionValue::DateTimeRange { start, end } => {
1707                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1708                results.push(TemporalMention::new(
1709                    start.unix_timestamp(),
1710                    frame_id,
1711                    byte_start,
1712                    byte_len,
1713                    TemporalMentionKind::RangeStart,
1714                    resolution.confidence,
1715                    start.offset().whole_minutes() as i16,
1716                    flags,
1717                ));
1718                results.push(TemporalMention::new(
1719                    end.unix_timestamp(),
1720                    frame_id,
1721                    byte_start,
1722                    byte_len,
1723                    TemporalMentionKind::RangeEnd,
1724                    resolution.confidence,
1725                    end.offset().whole_minutes() as i16,
1726                    flags,
1727                ));
1728            }
1729            TemporalResolutionValue::Month { year, month } => {
1730                let start_date = match Date::from_calendar_date(year, month, 1) {
1731                    Ok(date) => date,
1732                    Err(err) => {
1733                        tracing::warn!(
1734                            target = "memvid::temporal",
1735                            %err,
1736                            year,
1737                            month = month as u8,
1738                            "skipping invalid month resolution"
1739                        );
1740                        // Skip invalid range for this mention only.
1741                        return results;
1742                    }
1743                };
1744                let end_date = match Self::last_day_in_month(year, month) {
1745                    Some(date) => date,
1746                    None => {
1747                        tracing::warn!(
1748                            target = "memvid::temporal",
1749                            year,
1750                            month = month as u8,
1751                            "skipping month resolution with invalid calendar range"
1752                        );
1753                        return results;
1754                    }
1755                };
1756                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1757                results.push(TemporalMention::new(
1758                    Self::date_to_timestamp(start_date),
1759                    frame_id,
1760                    byte_start,
1761                    byte_len,
1762                    TemporalMentionKind::RangeStart,
1763                    resolution.confidence,
1764                    0,
1765                    flags,
1766                ));
1767                results.push(TemporalMention::new(
1768                    Self::date_to_timestamp(end_date),
1769                    frame_id,
1770                    byte_start,
1771                    byte_len,
1772                    TemporalMentionKind::RangeEnd,
1773                    resolution.confidence,
1774                    0,
1775                    flags,
1776                ));
1777            }
1778        }
1779
1780        results
1781    }
1782
1783    #[cfg(feature = "temporal_track")]
1784    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1785        let mut result = TemporalMentionFlags::empty();
1786        if flags
1787            .iter()
1788            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1789        {
1790            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1791        }
1792        if flags
1793            .iter()
1794            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1795        {
1796            result = result.set(TemporalMentionFlags::DERIVED, true);
1797        }
1798        result
1799    }
1800
1801    #[cfg(feature = "temporal_track")]
1802    fn date_to_timestamp(date: Date) -> i64 {
1803        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1804            .assume_offset(UtcOffset::UTC)
1805            .unix_timestamp()
1806    }
1807
1808    #[cfg(feature = "temporal_track")]
1809    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1810        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1811        while let Some(next) = date.next_day() {
1812            if next.month() == month {
1813                date = next;
1814            } else {
1815                break;
1816            }
1817        }
1818        Some(date)
1819    }
1820
1821    #[allow(dead_code)]
1822    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1823        if delta.inserted_frames.is_empty() || !self.lex_enabled {
1824            return Ok(false);
1825        }
1826
1827        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1828            Some(artifact) => artifact,
1829            None => return Ok(false),
1830        };
1831
1832        let segment_id = self.toc.segment_catalog.next_segment_id;
1833        #[cfg(feature = "parallel_segments")]
1834        let span =
1835            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1836
1837        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1838        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1839        #[cfg(feature = "parallel_segments")]
1840        if let Some(span) = span {
1841            Self::decorate_segment_common(&mut descriptor.common, span);
1842        }
1843        #[cfg(feature = "parallel_segments")]
1844        let descriptor_for_manifest = descriptor.clone();
1845        self.toc.segment_catalog.lex_segments.push(descriptor);
1846        #[cfg(feature = "parallel_segments")]
1847        if let Err(err) = self.record_index_segment(
1848            SegmentKind::Lexical,
1849            descriptor_for_manifest.common,
1850            SegmentStats {
1851                doc_count: artifact.doc_count,
1852                vector_count: 0,
1853                time_entries: 0,
1854                bytes_uncompressed: artifact.bytes.len() as u64,
1855                build_micros: 0,
1856            },
1857        ) {
1858            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1859        }
1860        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1861        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1862        Ok(true)
1863    }
1864
1865    #[allow(dead_code)]
1866    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1867        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1868            return Ok(false);
1869        }
1870
1871        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1872            Some(artifact) => artifact,
1873            None => return Ok(false),
1874        };
1875
1876        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1877            if existing_dim != artifact.dimension {
1878                return Err(MemvidError::VecDimensionMismatch {
1879                    expected: existing_dim,
1880                    actual: artifact.dimension as usize,
1881                });
1882            }
1883        }
1884
1885        let segment_id = self.toc.segment_catalog.next_segment_id;
1886        #[cfg(feature = "parallel_segments")]
1887        #[cfg(feature = "parallel_segments")]
1888        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1889
1890        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1891        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1892        #[cfg(feature = "parallel_segments")]
1893        if let Some(span) = span {
1894            Self::decorate_segment_common(&mut descriptor.common, span);
1895        }
1896        #[cfg(feature = "parallel_segments")]
1897        let descriptor_for_manifest = descriptor.clone();
1898        self.toc.segment_catalog.vec_segments.push(descriptor);
1899        #[cfg(feature = "parallel_segments")]
1900        if let Err(err) = self.record_index_segment(
1901            SegmentKind::Vector,
1902            descriptor_for_manifest.common,
1903            SegmentStats {
1904                doc_count: 0,
1905                vector_count: artifact.vector_count,
1906                time_entries: 0,
1907                bytes_uncompressed: artifact.bytes_uncompressed,
1908                build_micros: 0,
1909            },
1910        ) {
1911            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1912        }
1913        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1914        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1915
1916        // Keep the global vec manifest in sync for auto-detection and stats.
1917        if self.toc.indexes.vec.is_none() {
1918            let empty_offset = self.data_end;
1919            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1920                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1921            self.toc.indexes.vec = Some(VecIndexManifest {
1922                vector_count: 0,
1923                dimension: 0,
1924                bytes_offset: empty_offset,
1925                bytes_length: 0,
1926                checksum: empty_checksum,
1927                compression_mode: self.vec_compression.clone(),
1928            });
1929        }
1930        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1931            if manifest.dimension == 0 {
1932                manifest.dimension = artifact.dimension;
1933            }
1934            if manifest.bytes_length == 0 {
1935                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1936                manifest.compression_mode = artifact.compression.clone();
1937            }
1938        }
1939
1940        self.vec_enabled = true;
1941        Ok(true)
1942    }
1943
1944    #[allow(dead_code)]
1945    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1946        if delta.inserted_time_entries.is_empty() {
1947            return Ok(false);
1948        }
1949
1950        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1951            Some(artifact) => artifact,
1952            None => return Ok(false),
1953        };
1954
1955        let segment_id = self.toc.segment_catalog.next_segment_id;
1956        #[cfg(feature = "parallel_segments")]
1957        #[cfg(feature = "parallel_segments")]
1958        let span = self.segment_span_from_iter(
1959            delta
1960                .inserted_time_entries
1961                .iter()
1962                .map(|entry| entry.frame_id),
1963        );
1964
1965        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1966        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1967        #[cfg(feature = "parallel_segments")]
1968        if let Some(span) = span {
1969            Self::decorate_segment_common(&mut descriptor.common, span);
1970        }
1971        #[cfg(feature = "parallel_segments")]
1972        let descriptor_for_manifest = descriptor.clone();
1973        self.toc.segment_catalog.time_segments.push(descriptor);
1974        #[cfg(feature = "parallel_segments")]
1975        if let Err(err) = self.record_index_segment(
1976            SegmentKind::Time,
1977            descriptor_for_manifest.common,
1978            SegmentStats {
1979                doc_count: 0,
1980                vector_count: 0,
1981                time_entries: artifact.entry_count,
1982                bytes_uncompressed: artifact.bytes.len() as u64,
1983                build_micros: 0,
1984            },
1985        ) {
1986            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1987        }
1988        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1989        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1990        Ok(true)
1991    }
1992
1993    #[cfg(feature = "temporal_track")]
1994    #[allow(dead_code)]
1995    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1996        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1997        {
1998            return Ok(false);
1999        }
2000
2001        debug_assert!(
2002            delta.inserted_temporal_mentions.len() < 1_000_000,
2003            "temporal delta mentions unexpectedly large: {}",
2004            delta.inserted_temporal_mentions.len()
2005        );
2006        debug_assert!(
2007            delta.inserted_temporal_anchors.len() < 1_000_000,
2008            "temporal delta anchors unexpectedly large: {}",
2009            delta.inserted_temporal_anchors.len()
2010        );
2011
2012        let artifact = match self.build_temporal_segment_from_records(
2013            &delta.inserted_temporal_mentions,
2014            &delta.inserted_temporal_anchors,
2015        )? {
2016            Some(artifact) => artifact,
2017            None => return Ok(false),
2018        };
2019
2020        let segment_id = self.toc.segment_catalog.next_segment_id;
2021        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
2022        self.toc
2023            .segment_catalog
2024            .temporal_segments
2025            .push(descriptor.clone());
2026        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2027        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2028
2029        self.toc.temporal_track = Some(TemporalTrackManifest {
2030            bytes_offset: descriptor.common.bytes_offset,
2031            bytes_length: descriptor.common.bytes_length,
2032            entry_count: artifact.entry_count,
2033            anchor_count: artifact.anchor_count,
2034            checksum: artifact.checksum,
2035            flags: artifact.flags,
2036        });
2037
2038        self.clear_temporal_track_cache();
2039
2040        Ok(true)
2041    }
2042
2043    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2044        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2045            frame_id,
2046            reason: "frame id too large",
2047        })?;
2048        let frame = self
2049            .toc
2050            .frames
2051            .get_mut(index)
2052            .ok_or(MemvidError::InvalidFrame {
2053                frame_id,
2054                reason: "supersede target missing",
2055            })?;
2056        frame.status = FrameStatus::Superseded;
2057        frame.superseded_by = Some(successor_id);
2058        self.remove_frame_from_indexes(frame_id)
2059    }
2060
2061    pub(crate) fn rebuild_indexes(
2062        &mut self,
2063        new_vec_docs: &[(FrameId, Vec<f32>)],
2064        inserted_frame_ids: &[FrameId],
2065    ) -> Result<()> {
2066        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2067            return Ok(());
2068        }
2069
2070        let payload_end = self.payload_region_end();
2071        self.data_end = payload_end;
2072        // Don't truncate if footer_offset is higher - there may be replay segments
2073        // or other data written after payload_end that must be preserved.
2074        let safe_truncate_len = self.header.footer_offset.max(payload_end);
2075        if self.file.metadata()?.len() > safe_truncate_len {
2076            self.file.set_len(safe_truncate_len)?;
2077        }
2078        self.file.seek(SeekFrom::Start(payload_end))?;
2079
2080        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
2081        self.toc.segment_catalog.lex_segments.clear();
2082        self.toc.segment_catalog.vec_segments.clear();
2083        self.toc.segment_catalog.time_segments.clear();
2084        #[cfg(feature = "temporal_track")]
2085        self.toc.segment_catalog.temporal_segments.clear();
2086        #[cfg(feature = "parallel_segments")]
2087        self.toc.segment_catalog.index_segments.clear();
2088        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
2089        self.toc.segment_catalog.tantivy_segments.clear();
2090        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
2091        self.toc.indexes.lex_segments.clear();
2092
2093        let mut time_entries: Vec<TimeIndexEntry> = self
2094            .toc
2095            .frames
2096            .iter()
2097            .filter(|frame| {
2098                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2099            })
2100            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2101            .collect();
2102        let (ti_offset, ti_length, ti_checksum) =
2103            time_index_append(&mut self.file, &mut time_entries)?;
2104        self.toc.time_index = Some(TimeIndexManifest {
2105            bytes_offset: ti_offset,
2106            bytes_length: ti_length,
2107            entry_count: time_entries.len() as u64,
2108            checksum: ti_checksum,
2109        });
2110
2111        let mut footer_offset = ti_offset + ti_length;
2112
2113        #[cfg(feature = "temporal_track")]
2114        {
2115            self.toc.temporal_track = None;
2116            self.toc.segment_catalog.temporal_segments.clear();
2117            self.clear_temporal_track_cache();
2118        }
2119
2120        if self.lex_enabled {
2121            #[cfg(feature = "lex")]
2122            {
2123                if self.tantivy_dirty {
2124                    // instant_index was used: frames were added to Tantivy with WAL
2125                    // sequence numbers as IDs, which don't match the actual frame IDs
2126                    // assigned during apply_records(). Must do a full rebuild to fix IDs.
2127                    if let Ok(mut storage) = self.lex_storage.write() {
2128                        storage.clear();
2129                        storage.set_generation(0);
2130                    }
2131                    self.init_tantivy()?;
2132                    if let Some(mut engine) = self.tantivy.take() {
2133                        self.rebuild_tantivy_engine(&mut engine)?;
2134                        self.tantivy = Some(engine);
2135                    } else {
2136                        return Err(MemvidError::InvalidToc {
2137                            reason: "tantivy engine missing during rebuild".into(),
2138                        });
2139                    }
2140                } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2141                    // Incremental path: engine exists (from open() or previous commit),
2142                    // instant_index was NOT used so no wrong IDs. Just add new frames.
2143                    // This is O(batch_size) — the key optimization for bulk ingestion.
2144                    //
2145                    // Collect frames + text first to avoid borrow conflicts with engine.
2146                    let max_payload = crate::memvid::search::max_index_payload();
2147                    let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2148                    for &frame_id in inserted_frame_ids {
2149                        let frame = match self.toc.frames.get(frame_id as usize) {
2150                            Some(f) => f.clone(),
2151                            None => continue,
2152                        };
2153                        if frame.status != FrameStatus::Active {
2154                            continue;
2155                        }
2156                        if let Some(search_text) = frame.search_text.clone() {
2157                            if !search_text.trim().is_empty() {
2158                                prepared_docs.push((frame, search_text));
2159                                continue;
2160                            }
2161                        }
2162                        let mime = frame
2163                            .metadata
2164                            .as_ref()
2165                            .and_then(|m| m.mime.as_deref())
2166                            .unwrap_or("application/octet-stream");
2167                        if !crate::memvid::search::is_text_indexable_mime(mime) {
2168                            continue;
2169                        }
2170                        if frame.payload_length > max_payload {
2171                            continue;
2172                        }
2173                        let text = self.frame_search_text(&frame)?;
2174                        if !text.trim().is_empty() {
2175                            prepared_docs.push((frame, text));
2176                        }
2177                    }
2178                    if let Some(engine) = self.tantivy.as_mut() {
2179                        for (frame, text) in &prepared_docs {
2180                            engine.add_frame(frame, text)?;
2181                        }
2182                        engine.commit()?;
2183                    }
2184                } else {
2185                    // Full rebuild path: no engine or no new frames (e.g., doctor repair).
2186                    // Clear embedded storage to avoid carrying stale segments between rebuilds.
2187                    if let Ok(mut storage) = self.lex_storage.write() {
2188                        storage.clear();
2189                        storage.set_generation(0);
2190                    }
2191                    self.init_tantivy()?;
2192                    if let Some(mut engine) = self.tantivy.take() {
2193                        self.rebuild_tantivy_engine(&mut engine)?;
2194                        self.tantivy = Some(engine);
2195                    } else {
2196                        return Err(MemvidError::InvalidToc {
2197                            reason: "tantivy engine missing during rebuild".into(),
2198                        });
2199                    }
2200                }
2201
2202                // Set lex_enabled to ensure it persists
2203                self.lex_enabled = true;
2204
2205                // Mark Tantivy as dirty so it gets flushed
2206                self.tantivy_dirty = true;
2207
2208                // Position embedded Tantivy segments immediately after the time index.
2209                self.data_end = footer_offset;
2210
2211                // Flush Tantivy segments to file
2212                self.flush_tantivy()?;
2213
2214                // Update footer_offset after Tantivy flush
2215                footer_offset = self.header.footer_offset;
2216
2217                // Restore data_end to payload boundary so future payload writes stay before indexes.
2218                self.data_end = payload_end;
2219            }
2220            #[cfg(not(feature = "lex"))]
2221            {
2222                self.toc.indexes.lex = None;
2223                self.toc.indexes.lex_segments.clear();
2224            }
2225        } else {
2226            // Lex disabled: clear everything
2227            self.toc.indexes.lex = None;
2228            self.toc.indexes.lex_segments.clear();
2229            #[cfg(feature = "lex")]
2230            if let Ok(mut storage) = self.lex_storage.write() {
2231                storage.clear();
2232            }
2233        }
2234
2235        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2236            let vec_offset = footer_offset;
2237            self.file.seek(SeekFrom::Start(vec_offset))?;
2238            self.file.write_all(&artifact.bytes)?;
2239            footer_offset += artifact.bytes.len() as u64;
2240            self.toc.indexes.vec = Some(VecIndexManifest {
2241                vector_count: artifact.vector_count,
2242                dimension: artifact.dimension,
2243                bytes_offset: vec_offset,
2244                bytes_length: artifact.bytes.len() as u64,
2245                checksum: artifact.checksum,
2246                compression_mode: self.vec_compression.clone(),
2247            });
2248            self.vec_index = Some(index);
2249        } else {
2250            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
2251            if !self.vec_enabled {
2252                self.toc.indexes.vec = None;
2253            }
2254            self.vec_index = None;
2255        }
2256
2257        // Persist CLIP index if it has embeddings
2258        if self.clip_enabled {
2259            if let Some(ref clip_index) = self.clip_index {
2260                if !clip_index.is_empty() {
2261                    let artifact = clip_index.encode()?;
2262                    let clip_offset = footer_offset;
2263                    self.file.seek(SeekFrom::Start(clip_offset))?;
2264                    self.file.write_all(&artifact.bytes)?;
2265                    footer_offset += artifact.bytes.len() as u64;
2266                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2267                        bytes_offset: clip_offset,
2268                        bytes_length: artifact.bytes.len() as u64,
2269                        vector_count: artifact.vector_count,
2270                        dimension: artifact.dimension,
2271                        checksum: artifact.checksum,
2272                        model_name: crate::clip::default_model_info().name.to_string(),
2273                    });
2274                    tracing::info!(
2275                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2276                        artifact.vector_count,
2277                        clip_offset
2278                    );
2279                }
2280            }
2281        } else {
2282            self.toc.indexes.clip = None;
2283        }
2284
2285        // Persist memories track if it has cards
2286        if self.memories_track.card_count() > 0 {
2287            let memories_offset = footer_offset;
2288            let memories_bytes = self.memories_track.serialize()?;
2289            let memories_checksum = blake3::hash(&memories_bytes).into();
2290            self.file.seek(SeekFrom::Start(memories_offset))?;
2291            self.file.write_all(&memories_bytes)?;
2292            footer_offset += memories_bytes.len() as u64;
2293
2294            let stats = self.memories_track.stats();
2295            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2296                bytes_offset: memories_offset,
2297                bytes_length: memories_bytes.len() as u64,
2298                card_count: stats.card_count as u64,
2299                entity_count: stats.entity_count as u64,
2300                checksum: memories_checksum,
2301            });
2302        } else {
2303            self.toc.memories_track = None;
2304        }
2305
2306        // Persist logic mesh if it has nodes
2307        if self.logic_mesh.is_empty() {
2308            self.toc.logic_mesh = None;
2309        } else {
2310            let mesh_offset = footer_offset;
2311            let mesh_bytes = self.logic_mesh.serialize()?;
2312            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2313            self.file.seek(SeekFrom::Start(mesh_offset))?;
2314            self.file.write_all(&mesh_bytes)?;
2315            footer_offset += mesh_bytes.len() as u64;
2316
2317            let stats = self.logic_mesh.stats();
2318            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2319                bytes_offset: mesh_offset,
2320                bytes_length: mesh_bytes.len() as u64,
2321                node_count: stats.node_count as u64,
2322                edge_count: stats.edge_count as u64,
2323                checksum: mesh_checksum,
2324            });
2325        }
2326
2327        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
2328        tracing::info!(
2329            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2330            ti_offset,
2331            ti_length,
2332            footer_offset,
2333            self.header.footer_offset
2334        );
2335
2336        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
2337        // This prevents overwriting data like replay segments that were written after index data
2338        self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2339
2340        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
2341        if self.file.metadata()?.len() < self.header.footer_offset {
2342            self.file.set_len(self.header.footer_offset)?;
2343        }
2344
2345        self.rewrite_toc_footer()?;
2346        self.header.toc_checksum = self.toc.toc_checksum;
2347        crate::persist_header(&mut self.file, &self.header)?;
2348
2349        #[cfg(feature = "lex")]
2350        if self.lex_enabled {
2351            if let Some(ref engine) = self.tantivy {
2352                let doc_count = engine.num_docs();
2353                let active_frame_count = self
2354                    .toc
2355                    .frames
2356                    .iter()
2357                    .filter(|f| f.status == FrameStatus::Active)
2358                    .count();
2359
2360                // Count frames that would actually be indexed by rebuild_tantivy_engine
2361                // Uses the same logic: content-type based check + size limit
2362                let text_indexable_count = self
2363                    .toc
2364                    .frames
2365                    .iter()
2366                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2367                    .count();
2368
2369                // Only fail if we have text-indexable frames but none got indexed
2370                // This avoids false positives for binary files (videos, images)
2371                if doc_count == 0 && text_indexable_count > 0 {
2372                    return Err(MemvidError::Doctor {
2373                        reason: format!(
2374                            "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2375                            This indicates a critical failure in the rebuild process."
2376                        ),
2377                    });
2378                }
2379
2380                // Success! Log it
2381                log::info!(
2382                    "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2383                );
2384            }
2385        }
2386
2387        Ok(())
2388    }
2389
2390    /// Persist the memories track to the file without a full rebuild.
2391    ///
2392    /// This is used when the memories track has been modified but no frame
2393    /// changes were made (e.g., after running enrichment).
2394    fn persist_memories_track(&mut self) -> Result<()> {
2395        if self.memories_track.card_count() == 0 {
2396            self.toc.memories_track = None;
2397            return Ok(());
2398        }
2399
2400        // Write after the current footer_offset
2401        let memories_offset = self.header.footer_offset;
2402        let memories_bytes = self.memories_track.serialize()?;
2403        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2404
2405        self.file.seek(SeekFrom::Start(memories_offset))?;
2406        self.file.write_all(&memories_bytes)?;
2407
2408        let stats = self.memories_track.stats();
2409        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2410            bytes_offset: memories_offset,
2411            bytes_length: memories_bytes.len() as u64,
2412            card_count: stats.card_count as u64,
2413            entity_count: stats.entity_count as u64,
2414            checksum: memories_checksum,
2415        });
2416
2417        // Update footer_offset to account for the memories track
2418        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2419
2420        // Ensure the file length covers the memories track
2421        if self.file.metadata()?.len() < self.header.footer_offset {
2422            self.file.set_len(self.header.footer_offset)?;
2423        }
2424
2425        Ok(())
2426    }
2427
2428    /// Persist the CLIP index to the file without a full rebuild.
2429    ///
2430    /// This is used when CLIP embeddings have been added but no full
2431    /// index rebuild is needed (e.g., in parallel segments mode).
2432    fn persist_clip_index(&mut self) -> Result<()> {
2433        if !self.clip_enabled {
2434            self.toc.indexes.clip = None;
2435            return Ok(());
2436        }
2437
2438        let clip_index = match &self.clip_index {
2439            Some(idx) if !idx.is_empty() => idx,
2440            _ => {
2441                self.toc.indexes.clip = None;
2442                return Ok(());
2443            }
2444        };
2445
2446        // Encode the CLIP index
2447        let artifact = clip_index.encode()?;
2448
2449        // Write after the current footer_offset
2450        let clip_offset = self.header.footer_offset;
2451        self.file.seek(SeekFrom::Start(clip_offset))?;
2452        self.file.write_all(&artifact.bytes)?;
2453
2454        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2455            bytes_offset: clip_offset,
2456            bytes_length: artifact.bytes.len() as u64,
2457            vector_count: artifact.vector_count,
2458            dimension: artifact.dimension,
2459            checksum: artifact.checksum,
2460            model_name: crate::clip::default_model_info().name.to_string(),
2461        });
2462
2463        tracing::info!(
2464            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2465            artifact.vector_count,
2466            clip_offset
2467        );
2468
2469        // Update footer_offset to account for the CLIP index
2470        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2471
2472        // Ensure the file length covers the CLIP index
2473        if self.file.metadata()?.len() < self.header.footer_offset {
2474            self.file.set_len(self.header.footer_offset)?;
2475        }
2476
2477        Ok(())
2478    }
2479
2480    /// Persist the Logic-Mesh to the file without a full rebuild.
2481    ///
2482    /// This is used when the Logic-Mesh has been modified but no frame
2483    /// changes were made (e.g., after running NER enrichment).
2484    fn persist_logic_mesh(&mut self) -> Result<()> {
2485        if self.logic_mesh.is_empty() {
2486            self.toc.logic_mesh = None;
2487            return Ok(());
2488        }
2489
2490        // Write after the current footer_offset
2491        let mesh_offset = self.header.footer_offset;
2492        let mesh_bytes = self.logic_mesh.serialize()?;
2493        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2494
2495        self.file.seek(SeekFrom::Start(mesh_offset))?;
2496        self.file.write_all(&mesh_bytes)?;
2497
2498        let stats = self.logic_mesh.stats();
2499        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2500            bytes_offset: mesh_offset,
2501            bytes_length: mesh_bytes.len() as u64,
2502            node_count: stats.node_count as u64,
2503            edge_count: stats.edge_count as u64,
2504            checksum: mesh_checksum,
2505        });
2506
2507        // Update footer_offset to account for the logic mesh
2508        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2509
2510        // Ensure the file length covers the logic mesh
2511        if self.file.metadata()?.len() < self.header.footer_offset {
2512            self.file.set_len(self.header.footer_offset)?;
2513        }
2514
2515        Ok(())
2516    }
2517
2518    /// Persist the sketch track to the file without a full rebuild.
2519    ///
2520    /// This is used when the sketch track has been modified (e.g., after
2521    /// running `sketch build`).
2522    fn persist_sketch_track(&mut self) -> Result<()> {
2523        if self.sketch_track.is_empty() {
2524            self.toc.sketch_track = None;
2525            return Ok(());
2526        }
2527
2528        // Seek to write after the current footer_offset
2529        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2530
2531        // Write the sketch track and get (offset, length, checksum)
2532        let (sketch_offset, sketch_length, sketch_checksum) =
2533            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2534
2535        let stats = self.sketch_track.stats();
2536        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2537            bytes_offset: sketch_offset,
2538            bytes_length: sketch_length,
2539            entry_count: stats.entry_count,
2540            #[allow(clippy::cast_possible_truncation)]
2541            entry_size: stats.variant.entry_size() as u16,
2542            flags: 0,
2543            checksum: sketch_checksum,
2544        });
2545
2546        // Update footer_offset to account for the sketch track
2547        self.header.footer_offset = sketch_offset + sketch_length;
2548
2549        // Ensure the file length covers the sketch track
2550        if self.file.metadata()?.len() < self.header.footer_offset {
2551            self.file.set_len(self.header.footer_offset)?;
2552        }
2553
2554        tracing::debug!(
2555            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2556            stats.entry_count,
2557            sketch_offset
2558        );
2559
2560        Ok(())
2561    }
2562
    /// Replay a lex WAL batch into the embedded lex storage.
    ///
    /// Replaces the stored doc count, checksum, and segments, sets the
    /// stored generation, then persists the updated lex manifest
    /// (TOC/footer + header) via [`Self::persist_lex_manifest`].
    #[cfg(feature = "lex")]
    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
        let LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments,
        } = batch;

        // NOTE(review): a poisoned lock silently skips the storage update
        // (manifest is still persisted below) — confirm this best-effort
        // behavior is intended.
        if let Ok(mut storage) = self.lex_storage.write() {
            storage.replace(doc_count, checksum, segments);
            storage.set_generation(generation);
        }

        self.persist_lex_manifest()
    }
2579
    /// Encode a lex batch as a `WalEntry::Lex` record and append it to the
    /// embedded WAL so the lex state can be replayed after a crash.
    #[cfg(feature = "lex")]
    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
        let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
        self.append_wal_entry(&payload)?;
        Ok(())
    }
2586
    /// Rebuild the lex entries of the TOC from the embedded lex storage,
    /// then rewrite the TOC/footer and persist the header.
    ///
    /// A poisoned storage lock degrades to an empty manifest and no
    /// segments rather than returning an error.
    #[cfg(feature = "lex")]
    fn persist_lex_manifest(&mut self) -> Result<()> {
        let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
            storage.to_manifest()
        } else {
            (None, Vec::new())
        };

        // Update the manifest
        if let Some(storage_manifest) = index_manifest {
            // Old LexIndexArtifact format: set the manifest with actual offset/length
            self.toc.indexes.lex = Some(storage_manifest);
        } else {
            // Tantivy segments OR lex disabled: clear the manifest
            // Stats will check lex_segments instead of manifest
            self.toc.indexes.lex = None;
        }

        self.toc.indexes.lex_segments = segments;

        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2615
    /// Embed a fresh Tantivy snapshot into the file and wire it through
    /// storage, segment catalog, WAL, manifest, and header.
    ///
    /// Segment bytes are written starting at `data_end`; `data_end` itself
    /// is deliberately left unchanged so the next commit's payload writes
    /// overwrite these segments (see inline comments). `footer_offset` only
    /// ever grows (`max`) to protect data written at higher offsets.
    #[cfg(feature = "lex")]
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        // Write each segment back-to-back, recording where it landed.
        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            self.file.flush()?; // Flush segment data to disk
            embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Set footer_offset for TOC writing, but DON'T update data_end
        // data_end stays at end of payloads, so next commit overwrites these segments
        // Use max() to never decrease footer_offset - this preserves replay segments
        // that may have been written at a higher offset
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Register the new segments in the catalog, replacing any previous
        // Tantivy entries wholesale.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // REMOVED: catalog_data_end() check
        // This was causing orphaned Tantivy segments because it would see OLD segments
        // still in the catalog from previous commits, and push footer_offset forward.
        // We want Tantivy segments to overwrite at data_end, so footer_offset should
        // stay at the end of the newly written segments.

        // Mirror the snapshot into embedded lex storage; unlike apply_lex_wal,
        // a poisoned lock here is a hard error.
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Record the batch in the WAL, then persist manifest + header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2699
2700    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2701        let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2702            frame_id,
2703            reason: "frame id too large",
2704        })?;
2705        let frame = self
2706            .toc
2707            .frames
2708            .get_mut(index)
2709            .ok_or(MemvidError::InvalidFrame {
2710                frame_id,
2711                reason: "delete target missing",
2712            })?;
2713        frame.status = FrameStatus::Deleted;
2714        frame.superseded_by = None;
2715        self.remove_frame_from_indexes(frame_id)
2716    }
2717
    /// Remove a frame from every live search index that is currently loaded.
    ///
    /// Absent indexes are simply skipped. The Tantivy engine is marked
    /// dirty so the deletion gets flushed on the next commit; the lex and
    /// vector index removals are in-memory only here.
    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
        #[cfg(feature = "lex")]
        if let Some(engine) = self.tantivy.as_mut() {
            engine.delete_frame(frame_id)?;
            self.tantivy_dirty = true;
        }
        if let Some(index) = self.lex_index.as_mut() {
            index.remove_document(frame_id);
        }
        if let Some(index) = self.vec_index.as_mut() {
            index.remove(frame_id);
        }
        Ok(())
    }
2732
2733    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2734        let Ok(index) = usize::try_from(frame_id) else {
2735            return false;
2736        };
2737        self.toc
2738            .frames
2739            .get(index)
2740            .is_some_and(|frame| frame.status == FrameStatus::Active)
2741    }
2742
    /// Build a `SegmentSpan` covering every frame id yielded by `iter`.
    ///
    /// Returns `None` when the iterator is empty. `frame_start`/`frame_end`
    /// are the min/max ids seen; `page_start`/`page_end` are widened from
    /// each frame's `chunk_index`/`chunk_count` where present. Ids that do
    /// not resolve to a TOC frame still extend the frame range but
    /// contribute nothing to the page range.
    #[cfg(feature = "parallel_segments")]
    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
    where
        I: IntoIterator<Item = FrameId>,
    {
        let mut iter = iter.into_iter();
        let first_id = iter.next()?;
        let first_frame = self.toc.frames.get(first_id as usize);
        let mut min_id = first_id;
        let mut max_id = first_id;
        // Seed the page range from the first frame; a missing chunk_index
        // defaults the seed to 0.
        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
        let mut page_end = first_frame
            .and_then(|frame| frame.chunk_count)
            .map(|count| page_start + count.saturating_sub(1))
            .unwrap_or(page_start);
        for frame_id in iter {
            if frame_id < min_id {
                min_id = frame_id;
            }
            if frame_id > max_id {
                max_id = frame_id;
            }
            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
                if let Some(idx) = frame.chunk_index {
                    page_start = page_start.min(idx);
                    if let Some(count) = frame.chunk_count {
                        // A frame spans [idx, idx + count - 1] pages.
                        let end = idx + count.saturating_sub(1);
                        page_end = page_end.max(end);
                    } else {
                        page_end = page_end.max(idx);
                    }
                }
            }
        }
        // NOTE(review): if the first frame lacks chunk_index, the 0 seed pins
        // page_start at 0 even when every later frame starts higher — confirm
        // this widening-from-zero is intended.
        Some(SegmentSpan {
            frame_start: min_id,
            frame_end: max_id,
            page_start,
            page_end,
            ..SegmentSpan::default()
        })
    }
2785
    /// Attach a span to a segment descriptor and ensure the codec version is
    /// populated (0 is treated as "unset" and bumped to 1).
    #[cfg(feature = "parallel_segments")]
    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
        common.span = Some(span);
        if common.codec_version == 0 {
            common.codec_version = 1;
        }
    }
2793
    /// Register a freshly written index segment in the in-memory segment
    /// catalog and mirror it into the manifest WAL when one is attached.
    ///
    /// Catalog first, WAL second: the entry remains in the in-memory
    /// catalog even if the WAL append fails.
    #[cfg(feature = "parallel_segments")]
    pub(crate) fn record_index_segment(
        &mut self,
        kind: SegmentKind,
        common: SegmentCommon,
        stats: SegmentStats,
    ) -> Result<()> {
        let entry = IndexSegmentRef {
            kind,
            common,
            stats,
        };
        self.toc.segment_catalog.index_segments.push(entry.clone());
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.append_segments(&[entry])?;
        }
        Ok(())
    }
2812
2813    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2814        self.ensure_writable()?;
2815        if self.toc.ticket_ref.issuer == "free-tier" {
2816            return Ok(());
2817        }
2818        match self.tier() {
2819            Tier::Free => Ok(()),
2820            tier => {
2821                if self.toc.ticket_ref.issuer.trim().is_empty() {
2822                    Err(MemvidError::TicketRequired { tier })
2823                } else {
2824                    Ok(())
2825                }
2826            }
2827        }
2828    }
2829
2830    pub(crate) fn tier(&self) -> Tier {
2831        if self.header.wal_size >= WAL_SIZE_LARGE {
2832            Tier::Enterprise
2833        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2834            Tier::Dev
2835        } else {
2836            Tier::Free
2837        }
2838    }
2839
2840    pub(crate) fn capacity_limit(&self) -> u64 {
2841        if self.toc.ticket_ref.capacity_bytes != 0 {
2842            self.toc.ticket_ref.capacity_bytes
2843        } else {
2844            self.tier().capacity_bytes()
2845        }
2846    }
2847
    /// Get current storage capacity in bytes.
    ///
    /// Returns the capacity from the applied ticket, or the default
    /// tier capacity (1 GB for free tier).
    ///
    /// Thin public wrapper over [`Self::capacity_limit`].
    #[must_use]
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2856
    /// Serialize the TOC and write it, followed by a `CommitFooter`, at
    /// `header.footer_offset`, then size the file and fsync.
    ///
    /// The final file length is clamped to at least header + WAL so a
    /// shrinking TOC can never truncate into the WAL region. `sync_all`
    /// at the end makes the footer visible to mmap-based readers.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The footer records the TOC length/hash so readers can locate and
        // validate the TOC from the end of the file.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // The file must always be at least header + WAL size
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Ensure footer is flushed to disk so mmap-based readers can find it
        self.file.sync_all()?;
        Ok(())
    }
2897}
2898
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Plan, build, and append index segments for an ingestion delta using
    /// the parallel segment worker pool.
    ///
    /// Returns `Ok(false)` when there is nothing to publish at any stage
    /// (no chunks, no plans, no worker results); `Ok(true)` after segments
    /// were appended.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Turn the delta's inserted frames into `SegmentChunkPlan`s for the
    /// planner, pairing each frame's text with its precomputed embedding
    /// when one exists in the delta.
    ///
    /// Frames with blank content are skipped. Errors with `InvalidFrame`
    /// when an inserted frame id does not resolve to a TOC frame.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Index embeddings by frame id so each can be moved into its chunk.
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based when the frame carries a chunk index; 0
            // appears to mean "no page information" — confirm with planner.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2970
/// Cheap token-count heuristic for segment planning: the number of
/// whitespace-separated words, floored at 1 so even empty or
/// whitespace-only text counts as a single token.
#[cfg(feature = "parallel_segments")]
fn estimate_tokens(text: &str) -> usize {
    match text.split_whitespace().count() {
        0 => 1,
        words => words,
    }
}
2975
2976impl Memvid {
2977    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2978        let catalog_end = self.catalog_data_end();
2979        if catalog_end <= self.header.footer_offset {
2980            return Ok(false);
2981        }
2982        self.header.footer_offset = catalog_end;
2983        self.rewrite_toc_footer()?;
2984        self.header.toc_checksum = self.toc.toc_checksum;
2985        crate::persist_header(&mut self.file, &self.header)?;
2986        Ok(true)
2987    }
2988}
2989
impl Memvid {
    /// Compact the file: drop tombstoned payloads, rewrite all active payloads
    /// contiguously after the WAL region, and rebuild every index from scratch.
    ///
    /// Commits first so nothing staged in the WAL is lost. All active payloads
    /// are buffered in memory before the rewrite, so reads never interleave
    /// with the overwrites below — assumes the active payload set fits in RAM
    /// (TODO confirm this is acceptable for very large stores).
    ///
    /// # Errors
    /// Propagates commit, file I/O, and index-rebuild failures.
    pub fn vacuum(&mut self) -> Result<()> {
        self.commit()?;

        // Snapshot every active payload up front; the loop below overwrites
        // the data region these payloads currently live in.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Rewrite payloads back-to-back starting immediately after the WAL.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    // Active frame with no stored payload bytes.
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Non-active frames lose their payload region entirely.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // Every on-disk segment/index reference points at pre-vacuum offsets;
        // clear them all and rebuild below.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[], &[])?;
        self.file.sync_all()?;
        Ok(())
    }

    /// Preview how a document would be chunked without actually ingesting it.
    ///
    /// This is useful when you need to compute embeddings for each chunk externally
    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
    /// is too small to be chunked (< 2400 chars after normalization).
    ///
    /// # Example
    /// ```ignore
    /// let chunks = mem.preview_chunks(b"long document text...")?;
    /// if let Some(chunk_texts) = chunks {
    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
    /// } else {
    ///     let embedding = my_embedder.embed_query(text)?;
    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
    /// }
    /// ```
    #[must_use]
    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
        plan_document_chunks(payload).map(|plan| plan.chunks)
    }

    /// Append raw bytes as a document frame.
    ///
    /// Convenience wrapper over the internal ingest pipeline with default
    /// options and no embedding. Returns the WAL sequence number.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }

    /// Append raw bytes with explicit metadata/options.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }

    /// Append bytes and an existing embedding (bypasses on-device embedding).
    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            Some(embedding),
            None,
            PutOptions::default(),
            None,
        )
    }

    /// Append bytes with a caller-supplied embedding and explicit options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }

    /// Ingest a document with pre-computed embeddings for both parent and chunks.
    ///
    /// This is the recommended API for high-accuracy semantic search when chunking
    /// occurs. The caller provides:
    /// - `payload`: The document bytes
    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
    /// - `options`: Standard put options
    ///
    /// The number of chunk embeddings should match the number of chunks that will be
    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }

    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
    ///
    /// Any option left unset inherits the existing frame's value. When
    /// `payload` is `None` the old payload is reused and auto-tagging/date
    /// extraction are disabled (there is nothing new to analyse). When
    /// `embedding` is `None` and vector search is enabled, the existing
    /// frame's embedding is carried over.
    ///
    /// Returns the WAL sequence number of the update entry.
    ///
    /// # Errors
    /// Fails if mutation is not allowed, the frame id is unknown, or the
    /// frame is not active.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Inherit every unset option from the existing frame so an update
        // only changes what the caller explicitly provided.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        // NOTE(review): empty tags/labels/extra_metadata inherit the existing
        // values, so an update cannot clear them to empty — confirm intended.
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // No new payload: reuse the existing frame's bytes and skip content
        // analysis, since the text did not change.
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        // Prefer an explicit embedding; otherwise carry the stored one over
        // when vector search is enabled.
        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None, // No chunk embeddings for update
            options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
        );
        Ok(seq)
    }

    /// Tombstone an active frame by appending a delete record to the WAL.
    ///
    /// The payload is not touched on disk; the tombstone takes effect at the
    /// next commit (which may be triggered here by the WAL checkpoint policy
    /// unless the current batch options disable auto-checkpointing).
    ///
    /// Returns the WAL sequence number of the tombstone entry.
    ///
    /// # Errors
    /// Fails if mutation is not allowed, the frame id is unknown, or the
    /// frame is not active.
    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let frame = self.frame_by_id(frame_id)?;
        if frame.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Tombstone record: empty payload, identity fields copied from the
        // victim frame so the WAL replay can match it. Timestamp falls back
        // to the frame's own timestamp if the clock is before the epoch.
        let mut tombstone = WalEntryData {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(frame.timestamp),
            kind: None,
            track: None,
            payload: Vec::new(),
            embedding: None,
            uri: frame.uri.clone(),
            title: frame.title.clone(),
            canonical_encoding: frame.canonical_encoding,
            canonical_length: frame.canonical_length,
            metadata: None,
            search_text: None,
            tags: Vec::new(),
            labels: Vec::new(),
            extra_metadata: BTreeMap::new(),
            content_dates: Vec::new(),
            chunk_manifest: None,
            role: frame.role,
            parent_sequence: None,
            chunk_index: frame.chunk_index,
            chunk_count: frame.chunk_count,
            op: FrameWalOp::Tombstone,
            target_frame_id: Some(frame_id),
            supersedes_frame_id: None,
            reuse_payload_from: None,
            source_sha256: None,
            source_path: None,
            enrichment_state: crate::types::EnrichmentState::default(),
        };
        tombstone.kind = frame.kind.clone();
        tombstone.track = frame.track.clone();

        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
        let seq = self.append_wal_entry(&payload_bytes)?;
        self.dirty = true;
        // Batch ingestion may suppress the automatic checkpoint so many
        // deletes can share a single commit.
        let suppress_checkpoint = self
            .batch_opts
            .as_ref()
            .is_some_and(|o| o.disable_auto_checkpoint);
        if !suppress_checkpoint && self.wal.should_checkpoint() {
            self.commit()?;
        }
        info!("frame_delete frame_id={frame_id} seq={seq}");
        Ok(seq)
    }
}
3281
3282impl Memvid {
3283    fn put_internal(
3284        &mut self,
3285        payload: Option<&[u8]>,
3286        reuse_frame: Option<Frame>,
3287        embedding: Option<Vec<f32>>,
3288        chunk_embeddings: Option<Vec<Vec<f32>>>,
3289        mut options: PutOptions,
3290        supersedes: Option<FrameId>,
3291    ) -> Result<u64> {
3292        self.ensure_mutation_allowed()?;
3293
3294        // Deduplication: if enabled and we have payload, check if identical content exists
3295        if options.dedup {
3296            if let Some(bytes) = payload {
3297                let content_hash = hash(bytes);
3298                if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3299                    // Found existing frame with same content hash, skip ingestion
3300                    tracing::debug!(
3301                        frame_id = existing_frame.id,
3302                        "dedup: skipping ingestion, identical content already exists"
3303                    );
3304                    // Return existing frame's sequence number (which equals frame_id for committed frames)
3305                    return Ok(existing_frame.id);
3306                }
3307            }
3308        }
3309
3310        if payload.is_some() && reuse_frame.is_some() {
3311            let frame_id = reuse_frame
3312                .as_ref()
3313                .map(|frame| frame.id)
3314                .unwrap_or_default();
3315            return Err(MemvidError::InvalidFrame {
3316                frame_id,
3317                reason: "cannot reuse payload when bytes are provided",
3318            });
3319        }
3320
3321        // If the caller supplies embeddings, enforce a single vector dimension contract
3322        // for the entire memory (fail fast, never silently accept mixed dimensions).
3323        let incoming_dimension = {
3324            let mut dim: Option<u32> = None;
3325
3326            if let Some(ref vector) = embedding {
3327                if !vector.is_empty() {
3328                    #[allow(clippy::cast_possible_truncation)]
3329                    let len = vector.len() as u32;
3330                    dim = Some(len);
3331                }
3332            }
3333
3334            if let Some(ref vectors) = chunk_embeddings {
3335                for vector in vectors {
3336                    if vector.is_empty() {
3337                        continue;
3338                    }
3339                    let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3340                    match dim {
3341                        None => dim = Some(vec_dim),
3342                        Some(existing) if existing == vec_dim => {}
3343                        Some(existing) => {
3344                            return Err(MemvidError::VecDimensionMismatch {
3345                                expected: existing,
3346                                actual: vector.len(),
3347                            });
3348                        }
3349                    }
3350                }
3351            }
3352
3353            dim
3354        };
3355
3356        if let Some(incoming_dimension) = incoming_dimension {
3357            // Embeddings imply vector search should be enabled.
3358            if !self.vec_enabled {
3359                self.enable_vec()?;
3360            }
3361
3362            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3363                if existing_dimension != incoming_dimension {
3364                    return Err(MemvidError::VecDimensionMismatch {
3365                        expected: existing_dimension,
3366                        actual: incoming_dimension as usize,
3367                    });
3368                }
3369            }
3370
3371            // Persist the dimension early for better auto-detection (even before the next commit).
3372            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3373                if manifest.dimension == 0 {
3374                    manifest.dimension = incoming_dimension;
3375                }
3376            }
3377        }
3378
3379        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3380        let payload_tail = self.payload_region_end();
3381        let projected = if let Some(bytes) = payload {
3382            let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3383                prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3384            } else {
3385                prepare_canonical_payload(bytes)?
3386            };
3387            let len = prepared.len();
3388            prepared_payload = Some((prepared, encoding, length));
3389            payload_tail.saturating_add(len as u64)
3390        } else if reuse_frame.is_some() {
3391            payload_tail
3392        } else {
3393            return Err(MemvidError::InvalidFrame {
3394                frame_id: 0,
3395                reason: "payload required for frame insertion",
3396            });
3397        };
3398
3399        let capacity_limit = self.capacity_limit();
3400        if projected > capacity_limit {
3401            let incoming_size = projected.saturating_sub(payload_tail);
3402            return Err(MemvidError::CapacityExceeded {
3403                current: payload_tail,
3404                limit: capacity_limit,
3405                required: incoming_size,
3406            });
3407        }
3408        let timestamp = options.timestamp.take().unwrap_or_else(|| {
3409            SystemTime::now()
3410                .duration_since(UNIX_EPOCH)
3411                .map(|d| d.as_secs() as i64)
3412                .unwrap_or(0)
3413        });
3414
3415        #[allow(unused_assignments)]
3416        let mut reuse_bytes: Option<Vec<u8>> = None;
3417        let payload_for_processing = if let Some(bytes) = payload {
3418            Some(bytes)
3419        } else if let Some(frame) = reuse_frame.as_ref() {
3420            let bytes = self.frame_canonical_bytes(frame)?;
3421            reuse_bytes = Some(bytes);
3422            reuse_bytes.as_deref()
3423        } else {
3424            None
3425        };
3426
3427        // Try to create a chunk plan from raw UTF-8 bytes first
3428        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3429            (Some(bytes), None) => plan_document_chunks(bytes),
3430            _ => None,
3431        };
3432
3433        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
3434        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
3435        // For --no-raw mode, we store only the extracted text and a hash of the original binary
3436        let mut source_sha256: Option<[u8; 32]> = None;
3437        let source_path_value = options.source_path.take();
3438
3439        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3440            if raw_chunk_plan.is_some() {
3441                // UTF-8 text document - chunks contain the text, no parent payload needed
3442                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3443            } else if options.no_raw {
3444                // --no-raw mode: don't store the raw binary, only compute hash
3445                if let Some(bytes) = payload {
3446                    // Compute BLAKE3 hash of original binary for verification
3447                    let hash_result = hash(bytes);
3448                    source_sha256 = Some(*hash_result.as_bytes());
3449                    // Store empty payload - the extracted text is in search_text
3450                    (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3451                } else {
3452                    return Err(MemvidError::InvalidFrame {
3453                        frame_id: 0,
3454                        reason: "payload required for --no-raw mode",
3455                    });
3456                }
3457            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3458                (prepared, encoding, length, None)
3459            } else if let Some(bytes) = payload {
3460                let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3461                    prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3462                } else {
3463                    prepare_canonical_payload(bytes)?
3464                };
3465                (prepared, encoding, length, None)
3466            } else if let Some(frame) = reuse_frame.as_ref() {
3467                (
3468                    Vec::new(),
3469                    frame.canonical_encoding,
3470                    frame.canonical_length,
3471                    Some(frame.id),
3472                )
3473            } else {
3474                return Err(MemvidError::InvalidFrame {
3475                    frame_id: 0,
3476                    reason: "payload required for frame insertion",
3477                });
3478            };
3479
3480        // Track whether we'll create an extracted text chunk plan later
3481        let mut chunk_plan = raw_chunk_plan;
3482
3483        let mut metadata = options.metadata.take();
3484        let mut search_text = options
3485            .search_text
3486            .take()
3487            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3488        let mut tags = std::mem::take(&mut options.tags);
3489        let mut labels = std::mem::take(&mut options.labels);
3490        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3491        let mut content_dates: Vec<String> = Vec::new();
3492
3493        let need_search_text = search_text
3494            .as_ref()
3495            .is_none_or(|text| text.trim().is_empty());
3496        let need_metadata = metadata.is_none();
3497        let run_extractor = need_search_text || need_metadata || options.auto_tag;
3498
3499        let mut extraction_error = None;
3500        let mut is_skim_extraction = false; // Track if extraction was time-limited
3501
3502        let extracted = if run_extractor {
3503            if let Some(bytes) = payload_for_processing {
3504                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3505                let uri_hint = options.uri.as_deref();
3506
3507                // Use time-budgeted extraction for instant indexing with a budget
3508                let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3509
3510                if use_budgeted {
3511                    // Time-budgeted extraction for sub-second ingestion
3512                    let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3513                        options.extraction_budget_ms,
3514                    );
3515                    match crate::extract_budgeted::extract_with_budget(
3516                        bytes, mime_hint, uri_hint, budget,
3517                    ) {
3518                        Ok(result) => {
3519                            is_skim_extraction = result.is_skim();
3520                            if is_skim_extraction {
3521                                tracing::debug!(
3522                                    coverage = result.coverage,
3523                                    elapsed_ms = result.elapsed_ms,
3524                                    sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3525                                    "time-budgeted extraction (skim)"
3526                                );
3527                            }
3528                            // Convert BudgetedExtractionResult to ExtractedDocument
3529                            let doc = crate::extract::ExtractedDocument {
3530                                text: if result.text.is_empty() {
3531                                    None
3532                                } else {
3533                                    Some(result.text)
3534                                },
3535                                metadata: serde_json::json!({
3536                                    "skim": is_skim_extraction,
3537                                    "coverage": result.coverage,
3538                                    "sections_extracted": result.sections_extracted,
3539                                    "sections_total": result.sections_total,
3540                                }),
3541                                mime_type: mime_hint.map(std::string::ToString::to_string),
3542                            };
3543                            Some(doc)
3544                        }
3545                        Err(err) => {
3546                            // Fall back to full extraction on budgeted extraction error
3547                            tracing::warn!(
3548                                ?err,
3549                                "budgeted extraction failed, trying full extraction"
3550                            );
3551                            match extract_via_registry(bytes, mime_hint, uri_hint) {
3552                                Ok(doc) => Some(doc),
3553                                Err(err) => {
3554                                    extraction_error = Some(err);
3555                                    None
3556                                }
3557                            }
3558                        }
3559                    }
3560                } else {
3561                    // Full extraction (no time budget)
3562                    match extract_via_registry(bytes, mime_hint, uri_hint) {
3563                        Ok(doc) => Some(doc),
3564                        Err(err) => {
3565                            extraction_error = Some(err);
3566                            None
3567                        }
3568                    }
3569                }
3570            } else {
3571                None
3572            }
3573        } else {
3574            None
3575        };
3576
3577        if let Some(err) = extraction_error {
3578            return Err(err);
3579        }
3580
3581        if let Some(doc) = &extracted {
3582            if need_search_text {
3583                if let Some(text) = &doc.text {
3584                    if let Some(normalized) =
3585                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3586                    {
3587                        search_text = Some(normalized);
3588                    }
3589                }
3590            }
3591
3592            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
3593            // from extracted text. This ensures large documents like PDFs get fully indexed.
3594            if chunk_plan.is_none() {
3595                if let Some(text) = &doc.text {
3596                    chunk_plan = plan_text_chunks(text);
3597                }
3598            }
3599
3600            if let Some(mime) = doc.mime_type.as_ref() {
3601                if let Some(existing) = &mut metadata {
3602                    if existing.mime.is_none() {
3603                        existing.mime = Some(mime.clone());
3604                    }
3605                } else {
3606                    let mut doc_meta = DocMetadata::default();
3607                    doc_meta.mime = Some(mime.clone());
3608                    metadata = Some(doc_meta);
3609                }
3610            }
3611
3612            // Only add extractous_metadata when auto_tag is enabled
3613            // This allows callers to disable metadata pollution by setting auto_tag=false
3614            if options.auto_tag {
3615                if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3616                {
3617                    extra_metadata
3618                        .entry("extractous_metadata".to_string())
3619                        .or_insert(meta_json);
3620                }
3621            }
3622        }
3623
3624        if options.auto_tag {
3625            if let Some(ref text) = search_text {
3626                if !text.trim().is_empty() {
3627                    let result = AutoTagger.analyse(text, options.extract_dates);
3628                    merge_unique(&mut tags, result.tags);
3629                    merge_unique(&mut labels, result.labels);
3630                    if options.extract_dates && content_dates.is_empty() {
3631                        content_dates = result.content_dates;
3632                    }
3633                }
3634            }
3635        }
3636
3637        if content_dates.is_empty() {
3638            if let Some(frame) = reuse_frame.as_ref() {
3639                content_dates = frame.content_dates.clone();
3640            }
3641        }
3642
3643        let metadata_ref = metadata.as_ref();
3644        let mut search_text = augment_search_text(
3645            search_text,
3646            options.uri.as_deref(),
3647            options.title.as_deref(),
3648            options.track.as_deref(),
3649            &tags,
3650            &labels,
3651            &extra_metadata,
3652            &content_dates,
3653            metadata_ref,
3654        );
3655        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3656        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3657        let mut parent_chunk_count: Option<u32> = None;
3658
3659        let kind_value = options.kind.take();
3660        let track_value = options.track.take();
3661        let uri_value = options.uri.take();
3662        let title_value = options.title.take();
3663        let should_extract_triplets = options.extract_triplets;
3664        // Save references for triplet extraction (after search_text is moved into WAL entry)
3665        let triplet_uri = uri_value.clone();
3666        let triplet_title = title_value.clone();
3667
3668        if let Some(plan) = chunk_plan.as_ref() {
3669            let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3670            parent_chunk_manifest = Some(plan.manifest.clone());
3671            parent_chunk_count = Some(chunk_total);
3672
3673            if let Some(first_chunk) = plan.chunks.first() {
3674                if let Some(normalized) =
3675                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3676                {
3677                    if !normalized.trim().is_empty() {
3678                        search_text = Some(normalized);
3679                    }
3680                }
3681            }
3682
3683            let chunk_tags = tags.clone();
3684            let chunk_labels = labels.clone();
3685            let chunk_metadata = metadata.clone();
3686            let chunk_extra_metadata = extra_metadata.clone();
3687            let chunk_content_dates = content_dates.clone();
3688
3689            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3690                let (chunk_payload, chunk_encoding, chunk_length) =
3691                    prepare_canonical_payload(chunk_text.as_bytes())?;
3692                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3693                    .map(|n| n.text)
3694                    .filter(|text| !text.trim().is_empty());
3695
3696                let chunk_uri = uri_value
3697                    .as_ref()
3698                    .map(|uri| format!("{uri}#page-{}", idx + 1));
3699                let chunk_title = title_value
3700                    .as_ref()
3701                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3702
3703                // Use provided chunk embedding if available, otherwise None
3704                let chunk_embedding = chunk_embeddings
3705                    .as_ref()
3706                    .and_then(|embeddings| embeddings.get(idx).cloned());
3707
3708                chunk_entries.push(WalEntryData {
3709                    timestamp,
3710                    kind: kind_value.clone(),
3711                    track: track_value.clone(),
3712                    payload: chunk_payload,
3713                    embedding: chunk_embedding,
3714                    uri: chunk_uri,
3715                    title: chunk_title,
3716                    canonical_encoding: chunk_encoding,
3717                    canonical_length: chunk_length,
3718                    metadata: chunk_metadata.clone(),
3719                    search_text: chunk_search_text,
3720                    tags: chunk_tags.clone(),
3721                    labels: chunk_labels.clone(),
3722                    extra_metadata: chunk_extra_metadata.clone(),
3723                    content_dates: chunk_content_dates.clone(),
3724                    chunk_manifest: None,
3725                    role: FrameRole::DocumentChunk,
3726                    parent_sequence: None,
3727                    chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3728                    chunk_count: Some(chunk_total),
3729                    op: FrameWalOp::Insert,
3730                    target_frame_id: None,
3731                    supersedes_frame_id: None,
3732                    reuse_payload_from: None,
3733                    source_sha256: None, // Chunks don't have source references
3734                    source_path: None,
3735                    // Chunks are already extracted, so mark as Enriched
3736                    enrichment_state: crate::types::EnrichmentState::Enriched,
3737                });
3738            }
3739        }
3740
3741        let parent_uri = uri_value.clone();
3742        let parent_title = title_value.clone();
3743
3744        // Get parent_sequence from options.parent_id if provided
3745        // We need the WAL sequence of the parent frame to link them
3746        let parent_sequence = if let Some(parent_id) = options.parent_id {
3747            // Look up the parent frame to get its WAL sequence
3748            // Since frame.id corresponds to the array index, we need to find the sequence
3749            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
3750            // This works because sequence numbers are assigned incrementally
3751            usize::try_from(parent_id)
3752                .ok()
3753                .and_then(|idx| self.toc.frames.get(idx))
3754                .map(|_| parent_id + 2) // WAL sequences start at 2
3755        } else {
3756            None
3757        };
3758
3759        // Clone search_text for triplet extraction (before it's moved into WAL entry)
3760        let triplet_text = search_text.clone();
3761
3762        // Capture values needed for instant indexing BEFORE they're moved into entry
3763        #[cfg(feature = "lex")]
3764        let instant_index_tags = if options.instant_index {
3765            tags.clone()
3766        } else {
3767            Vec::new()
3768        };
3769        #[cfg(feature = "lex")]
3770        let instant_index_labels = if options.instant_index {
3771            labels.clone()
3772        } else {
3773            Vec::new()
3774        };
3775
3776        // Determine enrichment state: Searchable if needs background work, Enriched if complete
3777        #[cfg(feature = "lex")]
3778        let needs_enrichment =
3779            options.instant_index && (options.enable_embedding || is_skim_extraction);
3780        #[cfg(feature = "lex")]
3781        let enrichment_state = if needs_enrichment {
3782            crate::types::EnrichmentState::Searchable
3783        } else {
3784            crate::types::EnrichmentState::Enriched
3785        };
3786        #[cfg(not(feature = "lex"))]
3787        let enrichment_state = crate::types::EnrichmentState::Enriched;
3788
3789        let entry = WalEntryData {
3790            timestamp,
3791            kind: kind_value,
3792            track: track_value,
3793            payload: storage_payload,
3794            embedding,
3795            uri: parent_uri,
3796            title: parent_title,
3797            canonical_encoding,
3798            canonical_length,
3799            metadata,
3800            search_text,
3801            tags,
3802            labels,
3803            extra_metadata,
3804            content_dates,
3805            chunk_manifest: parent_chunk_manifest,
3806            role: options.role,
3807            parent_sequence,
3808            chunk_index: None,
3809            chunk_count: parent_chunk_count,
3810            op: FrameWalOp::Insert,
3811            target_frame_id: None,
3812            supersedes_frame_id: supersedes,
3813            reuse_payload_from,
3814            source_sha256,
3815            source_path: source_path_value,
3816            enrichment_state,
3817        };
3818
3819        let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3820        let parent_seq = self.append_wal_entry(&parent_bytes)?;
3821        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3822
3823        // Instant indexing: make frame searchable immediately (<1s) without full commit
3824        // This is Phase 1 of progressive ingestion - frame is searchable but not fully enriched
3825        #[cfg(feature = "lex")]
3826        if options.instant_index && self.tantivy.is_some() {
3827            // Create a minimal frame for indexing
3828            let frame_id = parent_seq as FrameId;
3829
3830            // Use triplet_text which was cloned before entry was created
3831            if let Some(ref text) = triplet_text {
3832                if !text.trim().is_empty() {
3833                    // Create temporary frame for indexing (minimal fields for Tantivy)
3834                    let temp_frame = Frame {
3835                        id: frame_id,
3836                        timestamp,
3837                        anchor_ts: None,
3838                        anchor_source: None,
3839                        kind: options.kind.clone(),
3840                        track: options.track.clone(),
3841                        payload_offset: 0,
3842                        payload_length: 0,
3843                        checksum: [0u8; 32],
3844                        uri: options
3845                            .uri
3846                            .clone()
3847                            .or_else(|| Some(crate::default_uri(frame_id))),
3848                        title: options.title.clone(),
3849                        canonical_encoding: crate::types::CanonicalEncoding::default(),
3850                        canonical_length: None,
3851                        metadata: None, // Not needed for text search
3852                        search_text: triplet_text.clone(),
3853                        tags: instant_index_tags.clone(),
3854                        labels: instant_index_labels.clone(),
3855                        extra_metadata: std::collections::BTreeMap::new(), // Not needed for search
3856                        content_dates: Vec::new(),                         // Not needed for search
3857                        chunk_manifest: None,
3858                        role: options.role,
3859                        parent_id: None,
3860                        chunk_index: None,
3861                        chunk_count: None,
3862                        status: FrameStatus::Active,
3863                        supersedes,
3864                        superseded_by: None,
3865                        source_sha256: None, // Not needed for search
3866                        source_path: None,   // Not needed for search
3867                        enrichment_state: crate::types::EnrichmentState::Searchable,
3868                    };
3869
3870                    // Get mutable reference to engine and index the frame
3871                    if let Some(engine) = self.tantivy.as_mut() {
3872                        engine.add_frame(&temp_frame, text)?;
3873                        engine.soft_commit()?;
3874                        self.tantivy_dirty = true;
3875
3876                        tracing::debug!(
3877                            frame_id = frame_id,
3878                            "instant index: frame searchable immediately"
3879                        );
3880                    }
3881                }
3882            }
3883        }
3884
3885        // Queue frame for background enrichment when using instant index path
3886        // Enrichment includes: embedding generation, full text re-extraction if time-limited
3887        // Note: enrichment_state is already set in the WAL entry, so it will be correct after replay
3888        #[cfg(feature = "lex")]
3889        if needs_enrichment {
3890            let frame_id = parent_seq as FrameId;
3891            self.toc.enrichment_queue.push(frame_id);
3892            tracing::debug!(
3893                frame_id = frame_id,
3894                is_skim = is_skim_extraction,
3895                needs_embedding = options.enable_embedding,
3896                "queued frame for background enrichment"
3897            );
3898        }
3899
3900        for mut chunk_entry in chunk_entries {
3901            chunk_entry.parent_sequence = Some(parent_seq);
3902            let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3903            self.append_wal_entry(&chunk_bytes)?;
3904            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3905        }
3906
3907        self.dirty = true;
3908        let suppress_checkpoint = self
3909            .batch_opts
3910            .as_ref()
3911            .is_some_and(|o| o.disable_auto_checkpoint);
3912        if !suppress_checkpoint && self.wal.should_checkpoint() {
3913            self.commit()?;
3914        }
3915
3916        // Record the put action if a replay session is active
3917        #[cfg(feature = "replay")]
3918        if let Some(input_bytes) = payload {
3919            self.record_put_action(parent_seq, input_bytes);
3920        }
3921
3922        // Extract triplets if enabled (default: true)
3923        // Triplets are stored as MemoryCards with entity/slot/value structure
3924        if should_extract_triplets {
3925            if let Some(ref text) = triplet_text {
3926                if !text.trim().is_empty() {
3927                    let extractor = TripletExtractor::default();
3928                    let frame_id = parent_seq as FrameId;
3929                    let (cards, _stats) = extractor.extract(
3930                        frame_id,
3931                        text,
3932                        triplet_uri.as_deref(),
3933                        triplet_title.as_deref(),
3934                        timestamp,
3935                    );
3936
3937                    if !cards.is_empty() {
3938                        // Add cards to memories track
3939                        let card_ids = self.memories_track.add_cards(cards);
3940
3941                        // Record enrichment for incremental processing
3942                        self.memories_track
3943                            .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3944                    }
3945                }
3946            }
3947        }
3948
3949        Ok(parent_seq)
3950    }
3951}
3952
3953#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3954#[serde(rename_all = "snake_case")]
3955pub(crate) enum FrameWalOp {
3956    Insert,
3957    Tombstone,
3958}
3959
3960impl Default for FrameWalOp {
3961    fn default() -> Self {
3962        Self::Insert
3963    }
3964}
3965
/// A single logical record in the embedded WAL.
///
/// `Frame` carries the data for one ingested frame; `Lex` (only with the
/// `lex` feature) carries a batch of lexical-index updates. Variant order is
/// part of the on-disk format — bincode encodes the discriminant by position —
/// and `decode_wal_entry` falls back to decoding a bare `WalEntryData` when a
/// record predates this enum wrapper.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    Frame(WalEntryData),
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3972
3973fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3974    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3975        return Ok(entry);
3976    }
3977    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3978    Ok(WalEntry::Frame(legacy))
3979}
3980
/// Payload of a `WalEntry::Frame` record: everything needed to materialize a
/// frame (and its index metadata) during WAL replay/commit.
///
/// Field order is part of the on-disk format — bincode encodes struct fields
/// positionally — so new fields must be appended, not inserted. The
/// `#[serde(default)]` attributes mark fields added after the initial format;
/// NOTE(review): bincode is not self-describing, so these defaults presumably
/// matter for other serde formats or older layouts — confirm before relying
/// on them for WAL compatibility (the legacy path is `decode_wal_entry`).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    // Timestamp recorded for the frame (unit/epoch as supplied by the caller).
    pub(crate) timestamp: i64,
    // Optional frame classification (e.g. document kind).
    pub(crate) kind: Option<String>,
    // Optional track name grouping related frames.
    pub(crate) track: Option<String>,
    // Stored payload bytes; may be zstd-compressed per `canonical_encoding`.
    pub(crate) payload: Vec<u8>,
    // Optional embedding vector for semantic search.
    pub(crate) embedding: Option<Vec<f32>>,
    // Optional URI; chunk entries derive theirs as "<parent>#page-N".
    #[serde(default)]
    pub(crate) uri: Option<String>,
    // Optional title; chunk entries derive "<parent> (page N/M)".
    #[serde(default)]
    pub(crate) title: Option<String>,
    // How `payload` is encoded (see `prepare_canonical_payload`).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    // Uncompressed payload length, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    // Extracted document metadata, if any.
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    // Normalized text used for indexing/search (may be size-limited).
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    // User/auto tags attached to the frame.
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    // Labels attached to the frame.
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    // Free-form key/value metadata folded into search text.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    // Date strings detected in the content (temporal tagging).
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    // Chunking manifest; set on parent entries, `None` on chunk entries.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    // Role of this frame (e.g. `FrameRole::DocumentChunk` for chunks).
    #[serde(default)]
    pub(crate) role: FrameRole,
    // WAL sequence of the parent frame; filled in for chunk entries after the
    // parent entry is appended.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    // Zero-based index of this chunk within its parent.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    // Total chunk count for the parent document.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    // Insert vs. tombstone (see `FrameWalOp`).
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    // Frame targeted by a tombstone/update operation, if any.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    // Frame this entry supersedes (versioning), if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    // Reuse the payload bytes of an existing frame instead of `payload`.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// SHA-256 hash of original source file (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Original source file path (set when --no-raw is used).
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Enrichment state for progressive ingestion.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4036
4037pub(crate) fn prepare_canonical_payload(
4038    payload: &[u8],
4039) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4040    prepare_canonical_payload_with_level(payload, 3)
4041}
4042
4043pub(crate) fn prepare_canonical_payload_with_level(
4044    payload: &[u8],
4045    level: i32,
4046) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4047    if level == 0 {
4048        // No compression — store as plain text
4049        return Ok((
4050            payload.to_vec(),
4051            CanonicalEncoding::Plain,
4052            Some(payload.len() as u64),
4053        ));
4054    }
4055    if std::str::from_utf8(payload).is_ok() {
4056        let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4057        Ok((
4058            compressed,
4059            CanonicalEncoding::Zstd,
4060            Some(payload.len() as u64),
4061        ))
4062    } else {
4063        Ok((
4064            payload.to_vec(),
4065            CanonicalEncoding::Plain,
4066            Some(payload.len() as u64),
4067        ))
4068    }
4069}
4070
4071pub(crate) fn augment_search_text(
4072    base: Option<String>,
4073    uri: Option<&str>,
4074    title: Option<&str>,
4075    track: Option<&str>,
4076    tags: &[String],
4077    labels: &[String],
4078    extra_metadata: &BTreeMap<String, String>,
4079    content_dates: &[String],
4080    metadata: Option<&DocMetadata>,
4081) -> Option<String> {
4082    let mut segments: Vec<String> = Vec::new();
4083    if let Some(text) = base {
4084        let trimmed = text.trim();
4085        if !trimmed.is_empty() {
4086            segments.push(trimmed.to_string());
4087        }
4088    }
4089
4090    if let Some(title) = title {
4091        if !title.trim().is_empty() {
4092            segments.push(format!("title: {}", title.trim()));
4093        }
4094    }
4095
4096    if let Some(uri) = uri {
4097        if !uri.trim().is_empty() {
4098            segments.push(format!("uri: {}", uri.trim()));
4099        }
4100    }
4101
4102    if let Some(track) = track {
4103        if !track.trim().is_empty() {
4104            segments.push(format!("track: {}", track.trim()));
4105        }
4106    }
4107
4108    if !tags.is_empty() {
4109        segments.push(format!("tags: {}", tags.join(" ")));
4110    }
4111
4112    if !labels.is_empty() {
4113        segments.push(format!("labels: {}", labels.join(" ")));
4114    }
4115
4116    if !extra_metadata.is_empty() {
4117        for (key, value) in extra_metadata {
4118            if value.trim().is_empty() {
4119                continue;
4120            }
4121            segments.push(format!("{key}: {value}"));
4122        }
4123    }
4124
4125    if !content_dates.is_empty() {
4126        segments.push(format!("dates: {}", content_dates.join(" ")));
4127    }
4128
4129    if let Some(meta) = metadata {
4130        if let Ok(meta_json) = serde_json::to_string(meta) {
4131            segments.push(format!("metadata: {meta_json}"));
4132        }
4133    }
4134
4135    if segments.is_empty() {
4136        None
4137    } else {
4138        Some(segments.join("\n"))
4139    }
4140}
4141
/// Append `additions` to `target`, trimming each value and skipping blanks
/// and anything already present (either in `target` or earlier in
/// `additions`). Existing order of `target` is preserved; new values keep
/// their relative order.
///
/// Note: deduplication compares trimmed additions against `target` entries
/// as-is, so a `target` entry with surrounding whitespace will not block its
/// trimmed twin.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }

    // Seed the dedup set with everything already in the target list.
    let mut seen: BTreeSet<String> = BTreeSet::new();
    seen.extend(target.iter().cloned());

    for addition in additions {
        let trimmed = addition.trim();
        if trimmed.is_empty() {
            continue;
        }
        if !seen.contains(trimmed) {
            seen.insert(trimmed.to_owned());
            target.push(trimmed.to_owned());
        }
    }
}