// memvid_core/memvid/mutation.rs

1//! Frame mutation and ingestion routines for `Memvid`.
2//!
3//! Owns the ingest pipeline: bytes/documents → extraction → chunking → metadata/temporal tags
4//! → WAL entries → manifest/index updates. This module keeps mutations crash-safe and
5//! deterministic; no bytes touch the data region until the embedded WAL is flushed during commit.
6//!
7//! The long-term structure will split into ingestion/chunking/WAL staging modules. For now
8//! everything lives here, grouped by section so the pipeline is easy to scan.
9
10use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35    builder::BuildOpts,
36    planner::{SegmentChunkPlan, SegmentPlanner},
37    workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48    DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49    ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53#[cfg(feature = "lex")]
54use crate::types::TantivySegmentDescriptor;
55use crate::types::{
56    CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutOptions,
57    SegmentCommon, TextChunkManifest, Tier,
58};
59#[cfg(feature = "parallel_segments")]
60use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
61#[cfg(feature = "temporal_track")]
62use crate::{
63    AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
64    TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
65    TemporalResolutionValue,
66};
67use crate::triplet::TripletExtractor;
68use crate::{
69    DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70    TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading bytes sampled for magic-number format detection.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Fixed size, in bytes, of each entry header in the embedded WAL.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Chunk size (8 MiB) used when sliding the data region to grow the WAL.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback timezone identifier for temporal normalization.
/// NOTE(review): presumably applied when a mention carries no zone info —
/// confirm against the normalizer's usage.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

/// Literal temporal phrases matched verbatim during temporal tagging
/// (relative dates, weekday names, and a few colloquial expressions).
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staged copy of the memvid file backed by an atomic temp file: mutations
/// run against the copy, and `commit()` publishes it over the original in a
/// single rename, while `discard()` throws it away.
struct CommitStaging {
    /// Temp-file handle; renamed over the target path on commit.
    atomic: AtomicWriteFile,
}
126
127impl CommitStaging {
128    fn prepare(path: &Path) -> Result<Self> {
129        let mut options = AtomicWriteFile::options();
130        options.read(true);
131        let atomic = options.open(path)?;
132        Ok(Self { atomic })
133    }
134
135    fn copy_from(&mut self, source: &File) -> Result<()> {
136        let mut reader = source.try_clone()?;
137        reader.seek(SeekFrom::Start(0))?;
138
139        let writer = self.atomic.as_file_mut();
140        writer.set_len(0)?;
141        writer.seek(SeekFrom::Start(0))?;
142        std::io::copy(&mut reader, writer)?;
143        writer.flush()?;
144        writer.sync_all()?;
145        Ok(())
146    }
147
148    fn clone_file(&self) -> Result<File> {
149        Ok(self.atomic.as_file().try_clone()?)
150    }
151
152    fn commit(self) -> Result<()> {
153        self.atomic.commit().map_err(Into::into)
154    }
155
156    fn discard(self) -> Result<()> {
157        self.atomic.discard().map_err(Into::into)
158    }
159}
160
/// Accumulated effects of applying a batch of WAL records during commit;
/// consulted afterwards to decide whether indexes must be rebuilt.
#[derive(Debug, Default)]
struct IngestionDelta {
    /// Frames newly appended by this batch.
    inserted_frames: Vec<FrameId>,
    /// Embedding vectors awaiting vector-index insertion, keyed by frame.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    /// Timeline entries awaiting time-index insertion.
    inserted_time_entries: Vec<TimeIndexEntry>,
    /// True when existing frames were mutated in place even though nothing
    /// new was inserted. NOTE(review): confirm which record kinds set this.
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174    fn is_empty(&self) -> bool {
175        #[allow(unused_mut)]
176        let mut empty = self.inserted_frames.is_empty()
177            && self.inserted_embeddings.is_empty()
178            && self.inserted_time_entries.is_empty()
179            && !self.mutated_frames;
180        #[cfg(feature = "temporal_track")]
181        {
182            empty = empty
183                && self.inserted_temporal_mentions.is_empty()
184                && self.inserted_temporal_anchors.is_empty();
185        }
186        empty
187    }
188}
189
/// Strategy for persisting pending state during a commit.
///
/// `Full` is the default. The manual `impl Default` has been replaced with
/// `#[derive(Default)]` + `#[default]`, which is equivalent and idiomatic on
/// the Rust toolchain this crate already requires (it uses `OnceLock`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Rewrite all derived state (indexes, TOC) from scratch.
    #[default]
    Full,
    /// Apply only the pending delta on top of existing derived state.
    Incremental,
}
201
/// Options controlling how a commit is executed.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// Full vs. incremental commit strategy (see [`CommitMode`]).
    pub mode: CommitMode,
    /// Request an asynchronous commit. Currently ignored: commits always run
    /// synchronously (see `commit_with_options`).
    pub background: bool,
}
207
208impl CommitOptions {
209    pub fn new(mode: CommitMode) -> Self {
210        Self {
211            mode,
212            background: false,
213        }
214    }
215
216    pub fn background(mut self, background: bool) -> Self {
217        self.background = background;
218        self
219    }
220}
221
222fn default_reader_registry() -> &'static ReaderRegistry {
223    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
224    REGISTRY.get_or_init(ReaderRegistry::default)
225}
226
227fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
228    if detect_pdf_magic(magic) {
229        return Some(DocumentFormat::Pdf);
230    }
231
232    let mime = mime?.trim().to_ascii_lowercase();
233    match mime.as_str() {
234        "application/pdf" => Some(DocumentFormat::Pdf),
235        "text/plain" => Some(DocumentFormat::PlainText),
236        "text/markdown" => Some(DocumentFormat::Markdown),
237        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
238        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
239            Some(DocumentFormat::Docx)
240        }
241        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
242            Some(DocumentFormat::Xlsx)
243        }
244        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
245            Some(DocumentFormat::Pptx)
246        }
247        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
248        _ => None,
249    }
250}
251
/// True when `magic` looks like the start of a PDF.
///
/// Deliberately lenient: a UTF-8 BOM and any leading ASCII whitespace are
/// skipped before checking for the `%PDF` signature.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let bytes = match magic {
        Some(b) if !b.is_empty() => b,
        _ => return false,
    };
    // Drop a UTF-8 BOM if present, then any run of ASCII whitespace.
    let after_bom = bytes.strip_prefix(b"\xEF\xBB\xBF").unwrap_or(bytes);
    let start = after_bom
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(after_bom.len());
    after_bom[start..].starts_with(b"%PDF")
}
269
270#[instrument(
271    target = "memvid::extract",
272    skip_all,
273    fields(mime = mime_hint, uri = uri)
274)]
275fn extract_via_registry(
276    bytes: &[u8],
277    mime_hint: Option<&str>,
278    uri: Option<&str>,
279) -> Result<ExtractedDocument> {
280    let registry = default_reader_registry();
281    let magic = bytes
282        .get(..MAGIC_SNIFF_BYTES)
283        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
284    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic))
285        .with_uri(uri)
286        .with_magic(magic);
287
288    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
289        let start = Instant::now();
290        match reader.extract(bytes, &hint) {
291            Ok(output) => {
292                return Ok(finalize_reader_output(output, start));
293            }
294            Err(err) => {
295                tracing::error!(
296                    target = "memvid::extract",
297                    reader = reader.name(),
298                    error = %err,
299                    "reader failed; falling back"
300                );
301                Some(format!("reader {} failed: {err}", reader.name()))
302            }
303        }
304    } else {
305        tracing::debug!(
306            target = "memvid::extract",
307            format = hint.format.map(|f| f.label()),
308            "no reader matched; using default extractor"
309        );
310        Some("no registered reader matched; using default extractor".to_string())
311    };
312
313    let start = Instant::now();
314    let mut output = PassthroughReader.extract(bytes, &hint)?;
315    if let Some(reason) = fallback_reason {
316        output.diagnostics.track_warning(reason);
317    }
318    Ok(finalize_reader_output(output, start))
319}
320
321fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
322    let elapsed = start.elapsed();
323    let ReaderOutput {
324        document,
325        reader_name,
326        diagnostics,
327    } = output;
328    log_reader_result(&reader_name, &diagnostics, elapsed);
329    document
330}
331
332fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
333    let duration_ms = diagnostics
334        .duration_ms
335        .unwrap_or_else(|| elapsed.as_millis() as u64);
336    let warnings = diagnostics.warnings.len();
337    let pages = diagnostics.pages_processed;
338
339    if warnings > 0 || diagnostics.fallback {
340        tracing::warn!(
341            target = "memvid::extract",
342            reader,
343            duration_ms,
344            pages,
345            warnings,
346            fallback = diagnostics.fallback,
347            "extraction completed with warnings"
348        );
349        for warning in diagnostics.warnings.iter() {
350            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
351        }
352    } else {
353        tracing::info!(
354            target = "memvid::extract",
355            reader,
356            duration_ms,
357            pages,
358            "extraction completed"
359        );
360    }
361}
362
363impl Memvid {
364    // -- Public ingestion entrypoints ---------------------------------------------------------
365
    /// Run `op` against a staged copy of the memvid file so a failed mutation
    /// can never corrupt the on-disk original.
    ///
    /// Flow: fsync the live file → copy it into an atomic temp file → point
    /// `self.file`/`self.wal` at the copy → run `op` → on success, atomically
    /// rename the copy over the original path and reopen it; on any failure,
    /// restore every snapshotted field so `self` again matches the untouched
    /// original file.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Swap the staging handle/WAL into `self`, keeping the originals
        // plus a snapshot of all mutable commit state for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Options let the success path drop the old handles before reopening,
        // while failure paths can move them back into `self`.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Release old handles before reopening the renamed
                        // file at the destination path.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Rename failed: original file is intact on disk, so
                        // restore every snapshotted field and surface the error.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err.into())
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staged copy (best effort) and roll
                // back the in-memory state to match the original file.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
445
446    pub(crate) fn catalog_data_end(&self) -> u64 {
447        let mut max_end = self.header.wal_offset + self.header.wal_size;
448
449        for descriptor in &self.toc.segment_catalog.lex_segments {
450            if descriptor.common.bytes_length == 0 {
451                continue;
452            }
453            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
454        }
455
456        for descriptor in &self.toc.segment_catalog.vec_segments {
457            if descriptor.common.bytes_length == 0 {
458                continue;
459            }
460            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
461        }
462
463        for descriptor in &self.toc.segment_catalog.time_segments {
464            if descriptor.common.bytes_length == 0 {
465                continue;
466            }
467            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
468        }
469
470        #[cfg(feature = "temporal_track")]
471        for descriptor in &self.toc.segment_catalog.temporal_segments {
472            if descriptor.common.bytes_length == 0 {
473                continue;
474            }
475            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
476        }
477
478        #[cfg(feature = "lex")]
479        for descriptor in &self.toc.segment_catalog.tantivy_segments {
480            if descriptor.common.bytes_length == 0 {
481                continue;
482            }
483            max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
484        }
485
486        if let Some(manifest) = self.toc.indexes.lex.as_ref() {
487            if manifest.bytes_length != 0 {
488                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
489            }
490        }
491
492        if let Some(manifest) = self.toc.indexes.vec.as_ref() {
493            if manifest.bytes_length != 0 {
494                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
495            }
496        }
497
498        if let Some(manifest) = self.toc.time_index.as_ref() {
499            if manifest.bytes_length != 0 {
500                max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
501            }
502        }
503
504        #[cfg(feature = "temporal_track")]
505        if let Some(track) = self.toc.temporal_track.as_ref() {
506            if track.bytes_length != 0 {
507                max_end = max_end.max(track.bytes_offset + track.bytes_length);
508            }
509        }
510
511        max_end
512    }
513
514    fn payload_region_end(&self) -> u64 {
515        let wal_region_end = self.header.wal_offset + self.header.wal_size;
516        let frames_with_payload: Vec<_> = self
517            .toc
518            .frames
519            .iter()
520            .filter(|frame| frame.payload_length != 0)
521            .collect();
522
523        tracing::info!(
524            "payload_region_end: found {} frames with payloads out of {} total frames, wal_region_end={}",
525            frames_with_payload.len(),
526            self.toc.frames.len(),
527            wal_region_end
528        );
529
530        for (idx, frame) in frames_with_payload.iter().enumerate().take(3) {
531            tracing::info!(
532                "  frame[{}]: id={} offset={} length={} status={:?}",
533                idx,
534                frame.id,
535                frame.payload_offset,
536                frame.payload_length,
537                frame.status
538            );
539        }
540
541        let result = frames_with_payload
542            .iter()
543            .fold(wal_region_end, |max_end, frame| {
544                match frame.payload_offset.checked_add(frame.payload_length) {
545                    Some(end) => max_end.max(end),
546                    None => max_end,
547                }
548            });
549
550        tracing::info!("payload_region_end: returning {}", result);
551        result
552    }
553
    /// Append `payload` to the embedded WAL, growing the WAL region on demand.
    ///
    /// Retries in a loop: when the WAL reports it is too small for the entry,
    /// the region is grown to fit and the append is attempted again. All
    /// other errors propagate immediately. Returns the entry's sequence number.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // NOTE(review): stringly-typed error detection — this literal
                // must stay in sync with the exact reason text produced by
                // `append_entry`; a dedicated error variant would be sturdier.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry" =>
                {
                    // Grow to at least header + payload, and strictly larger
                    // than the current region so the retry makes progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
570
    /// Double the embedded WAL region until it exceeds `required_entry_size`,
    /// shifting the data region to make room and fixing up all recorded
    /// offsets, the TOC/footer, and the persisted header afterwards.
    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
        let mut new_size = self.header.wal_size;
        let mut target = required_entry_size;
        if target == 0 {
            target = self.header.wal_size;
        }
        // Grow by doubling until strictly larger than the target.
        // NOTE(review): assumes wal_size > 0 — a zero-sized WAL would loop
        // forever on 0 * 2 here; confirm headers always carry a non-zero WAL.
        while new_size <= target {
            new_size = new_size
                .checked_mul(2)
                .ok_or_else(|| MemvidError::CheckpointFailed {
                    reason: "wal_size overflow".into(),
                })?;
        }
        let delta = new_size - self.header.wal_size;
        if delta == 0 {
            return Ok(());
        }

        // Physically slide the data region, then update every stored offset
        // to match the new layout.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = new_size;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        // The footer must land past all shifted catalog data.
        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // Reopen so the WAL handle sees the enlarged region.
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
607
    /// Grow the file by `delta` bytes and slide everything after the WAL
    /// region forward by `delta`, zero-filling the vacated gap.
    ///
    /// Copies back-to-front in fixed-size chunks so the overlapping source
    /// and destination ranges cannot corrupt each other.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        // Move the file tail upward, starting from the end so each write
        // lands on bytes that have already been read.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the freed bytes so the enlarged WAL region starts clean.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
639
640    fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
641        if delta == 0 {
642            return;
643        }
644
645        for frame in &mut self.toc.frames {
646            if frame.payload_offset != 0 {
647                frame.payload_offset += delta;
648            }
649        }
650
651        for segment in &mut self.toc.segments {
652            if segment.bytes_offset != 0 {
653                segment.bytes_offset += delta;
654            }
655        }
656
657        if let Some(lex) = self.toc.indexes.lex.as_mut() {
658            if lex.bytes_offset != 0 {
659                lex.bytes_offset += delta;
660            }
661        }
662        for manifest in &mut self.toc.indexes.lex_segments {
663            if manifest.bytes_offset != 0 {
664                manifest.bytes_offset += delta;
665            }
666        }
667        if let Some(vec) = self.toc.indexes.vec.as_mut() {
668            if vec.bytes_offset != 0 {
669                vec.bytes_offset += delta;
670            }
671        }
672        if let Some(time_index) = self.toc.time_index.as_mut() {
673            if time_index.bytes_offset != 0 {
674                time_index.bytes_offset += delta;
675            }
676        }
677        #[cfg(feature = "temporal_track")]
678        if let Some(track) = self.toc.temporal_track.as_mut() {
679            if track.bytes_offset != 0 {
680                track.bytes_offset += delta;
681            }
682        }
683
684        let catalog = &mut self.toc.segment_catalog;
685        for descriptor in &mut catalog.lex_segments {
686            if descriptor.common.bytes_offset != 0 {
687                descriptor.common.bytes_offset += delta;
688            }
689        }
690        for descriptor in &mut catalog.vec_segments {
691            if descriptor.common.bytes_offset != 0 {
692                descriptor.common.bytes_offset += delta;
693            }
694        }
695        for descriptor in &mut catalog.time_segments {
696            if descriptor.common.bytes_offset != 0 {
697                descriptor.common.bytes_offset += delta;
698            }
699        }
700        #[cfg(feature = "temporal_track")]
701        for descriptor in &mut catalog.temporal_segments {
702            if descriptor.common.bytes_offset != 0 {
703                descriptor.common.bytes_offset += delta;
704            }
705        }
706        for descriptor in &mut catalog.tantivy_segments {
707            if descriptor.common.bytes_offset != 0 {
708                descriptor.common.bytes_offset += delta;
709            }
710        }
711
712        #[cfg(feature = "lex")]
713        if let Ok(mut storage) = self.lex_storage.write() {
714            storage.adjust_offsets(delta);
715        }
716    }
717    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
718        self.ensure_writable()?;
719        if options.background {
720            tracing::debug!("commit background flag ignored; running synchronously");
721        }
722        let mode = options.mode;
723        let records = self.wal.pending_records()?;
724        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
725            return Ok(());
726        }
727        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
728    }
729
730    pub fn commit(&mut self) -> Result<()> {
731        self.ensure_writable()?;
732        self.commit_with_options(CommitOptions::new(CommitMode::Full))
733    }
734
    /// Apply pending WAL `records` to in-memory state, rebuild or persist any
    /// affected indexes, and write out the TOC, footer, and header.
    ///
    /// Runs under `with_staging_lock`, so all writes here target the staged
    /// copy of the file. `_mode` is currently unused: full and incremental
    /// commits take the same path.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // Check if CLIP index has pending embeddings that need to be persisted
        let clip_needs_persist = self.clip_index.as_ref().map_or(false, |idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            // rebuild_indexes also persists the CLIP index, memories track,
            // and logic mesh, so the explicit persists below are skipped when
            // this path ran.
            self.rebuild_indexes(&delta.inserted_embeddings)?;
            indexes_rebuilt = true;
        }

        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        // Persist CLIP index if it has embeddings and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        // Persist memories track if it has cards and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        // Persist logic mesh if it has nodes and wasn't already persisted by rebuild_indexes
        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // flush_tantivy() and rebuild_indexes() have already set footer_offset correctly.
        // DO NOT overwrite it with catalog_data_end() as that would include orphaned segments.

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // record_checkpoint takes &mut header; the checksum is re-applied
        // afterwards so the persisted header matches the TOC just written.
        // NOTE(review): confirm which header fields record_checkpoint updates.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
797
798    #[cfg(feature = "parallel_segments")]
799    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
800        self.ensure_writable()?;
801        if !self.dirty && !self.tantivy_index_pending() {
802            return Ok(());
803        }
804        let opts = opts.clone();
805        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
806    }
807
808    #[cfg(feature = "parallel_segments")]
809    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
810        if !self.dirty && !self.tantivy_index_pending() {
811            return Ok(());
812        }
813        let records = self.wal.pending_records()?;
814        let delta = self.apply_records(records)?;
815        self.generation = self.generation.wrapping_add(1);
816        let mut indexes_rebuilt = false;
817        if !delta.is_empty() {
818            tracing::info!(
819                inserted_frames = delta.inserted_frames.len(),
820                inserted_embeddings = delta.inserted_embeddings.len(),
821                inserted_time_entries = delta.inserted_time_entries.len(),
822                "parallel commit applied delta"
823            );
824            // Try to use parallel segment builder first
825            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
826            tracing::info!(
827                "parallel_commit: used_parallel={}, lex_enabled={}",
828                used_parallel,
829                self.lex_enabled
830            );
831            if used_parallel {
832                // Segments were written at data_end; update footer_offset so
833                // rewrite_toc_footer places the TOC after the new segment data
834                self.header.footer_offset = self.data_end;
835                indexes_rebuilt = true;
836
837                // BUGFIX: Parallel segments writes lex_segments (FST format) but search
838                // uses tantivy_segments. We need to also build the Tantivy index so search works.
839                #[cfg(feature = "lex")]
840                if self.lex_enabled {
841                    tracing::info!(
842                        "parallel_commit: rebuilding Tantivy index, data_end={}, footer_offset={}, frames={}",
843                        self.data_end,
844                        self.header.footer_offset,
845                        self.toc.frames.len()
846                    );
847
848                    // Clear stale tantivy segments before rebuilding
849                    self.toc.segment_catalog.tantivy_segments.clear();
850                    self.toc.indexes.lex_segments.clear();
851
852                    // Clear embedded storage to avoid stale data
853                    if let Ok(mut storage) = self.lex_storage.write() {
854                        storage.clear();
855                        storage.set_generation(0);
856                    }
857
858                    // Initialize Tantivy engine and rebuild from all frames
859                    self.init_tantivy()?;
860                    if let Some(mut engine) = self.tantivy.take() {
861                        let rebuilt = self.rebuild_tantivy_engine(&mut engine)?;
862                        let num_docs = engine.num_docs();
863                        tracing::info!(
864                            "parallel_commit: Tantivy rebuilt={}, num_docs={}",
865                            rebuilt,
866                            num_docs
867                        );
868                        self.tantivy = Some(engine);
869                        self.tantivy_dirty = true;
870                    } else {
871                        tracing::warn!(
872                            "parallel_commit: Tantivy engine is None after init_tantivy"
873                        );
874                    }
875                }
876
877                // BUGFIX: Parallel segments does not update the global time_index manifest.
878                // We need to rebuild it so timeline queries work correctly.
879                // First, seek to data_end to ensure we write at the correct position.
880                self.file.seek(SeekFrom::Start(self.data_end))?;
881                let mut time_entries: Vec<TimeIndexEntry> = self
882                    .toc
883                    .frames
884                    .iter()
885                    .filter(|frame| {
886                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
887                    })
888                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
889                    .collect();
890                let (ti_offset, ti_length, ti_checksum) =
891                    time_index_append(&mut self.file, &mut time_entries)?;
892                self.toc.time_index = Some(TimeIndexManifest {
893                    bytes_offset: ti_offset,
894                    bytes_length: ti_length,
895                    entry_count: time_entries.len() as u64,
896                    checksum: ti_checksum,
897                });
898                // Update data_end to account for the newly written time index
899                self.data_end = ti_offset + ti_length;
900                self.header.footer_offset = self.data_end;
901                tracing::info!(
902                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
903                    ti_offset,
904                    ti_length,
905                    time_entries.len()
906                );
907            } else {
908                // Fall back to sequential rebuild if no segments were generated
909                self.rebuild_indexes(&delta.inserted_embeddings)?;
910                indexes_rebuilt = true;
911            }
912        }
913
914        // Flush Tantivy index if dirty (from parallel path or pending updates)
915        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
916            self.flush_tantivy()?;
917        }
918
919        // Persist CLIP index if it has embeddings
920        if self.clip_enabled {
921            if let Some(ref clip_index) = self.clip_index {
922                if !clip_index.is_empty() {
923                    self.persist_clip_index()?;
924                }
925            }
926        }
927
928        // Persist memories track if it has cards
929        if self.memories_track.card_count() > 0 {
930            self.persist_memories_track()?;
931        }
932
933        // Persist logic mesh if it has nodes
934        if !self.logic_mesh.is_empty() {
935            self.persist_logic_mesh()?;
936        }
937
938        // flush_tantivy() has already set footer_offset correctly
939        // DO NOT overwrite with catalog_data_end()
940        self.rewrite_toc_footer()?;
941        self.header.toc_checksum = self.toc.toc_checksum;
942        self.wal.record_checkpoint(&mut self.header)?;
943        self.header.toc_checksum = self.toc.toc_checksum;
944        crate::persist_header(&mut self.file, &self.header)?;
945        self.file.sync_all()?;
946        if let Some(wal) = self.manifest_wal.as_mut() {
947            wal.flush()?;
948            wal.truncate()?;
949        }
950        self.pending_frame_inserts = 0;
951        self.dirty = false;
952        Ok(())
953    }
954
    /// Replays WAL records appended after the last checkpoint
    /// (`header.wal_sequence`) to restore in-memory state after a crash or
    /// unclean shutdown.
    ///
    /// When records exist they are applied via `apply_records`; if frames or
    /// embeddings were inserted, the derived indexes are rebuilt. The WAL is
    /// then checkpointed and the header persisted before syncing the file.
    pub(crate) fn recover_wal(&mut self) -> Result<()> {
        let records = self.wal.records_after(self.header.wal_sequence)?;
        if records.is_empty() {
            // Nothing to replay, but a pending Tantivy index may still need
            // flushing so search state is not left stale.
            if self.tantivy_index_pending() {
                self.flush_tantivy()?;
            }
            return Ok(());
        }
        let delta = self.apply_records(records)?;
        if !delta.is_empty() {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "recover applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings)?;
        } else if self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }
        self.wal.record_checkpoint(&mut self.header)?;
        crate::persist_header(&mut self.file, &self.header)?;
        if !delta.is_empty() {
            // rebuild_indexes already flushed Tantivy, so nothing further to do.
        } else if self.tantivy_index_pending() {
            // NOTE(review): the pre-checkpoint branch above already flushed in
            // this exact case — presumably flush_tantivy() clears the pending
            // flag, which would make this second flush unreachable; confirm.
            self.flush_tantivy()?;
            crate::persist_header(&mut self.file, &self.header)?;
        }
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
988
    /// Applies decoded WAL records to the TOC and data region, returning an
    /// `IngestionDelta` describing the frames, embeddings, and time entries
    /// that were inserted (plus whether any frames were tombstoned).
    ///
    /// Payload bytes are appended starting at `self.data_end`; index rebuilds
    /// are intentionally deferred to the caller (commit/recover) so they run
    /// once per batch rather than per record.
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // BUGFIX: Use data_end instead of payload_region_end to avoid overwriting
        // vec/lex/time segments that were written after the payload region.
        // payload_region_end() only considers frame payloads, but data_end tracks
        // all data including index segments.
        let mut data_cursor = self.data_end;
        // Maps a record's WAL sequence to the frame id it produced, so later
        // entries in the same batch can resolve `parent_sequence` links.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        // NOTE(review): `records` is non-empty here (early return above), so
        // this guard is redundant but harmless.
        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                // Lexical batches are applied immediately; only frame entries
                // flow through the insert/tombstone logic below.
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // Frame ids are dense: the next id equals the current count.
                        let frame_id = self.toc.frames.len() as u64;

                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            // Reused payloads alias an existing frame's bytes;
                            // they must not carry inline payload data.
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source = self.toc.frames.get(source_id as usize).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            // Fresh payload: append at the cursor and record its
                            // checksum and on-disk / canonical lengths.
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            // For zstd payloads the canonical (decoded) length is
                            // required; decode once if the entry did not carry it.
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    match entry.canonical_length {
                                        Some(len) => len,
                                        None => {
                                            let decoded = crate::decode_canonical_bytes(
                                                &entry.payload,
                                                CanonicalEncoding::Zstd,
                                                frame_id,
                                            )?;
                                            decoded.len() as u64
                                        }
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        // Fall back to a synthesized URI, and infer a title from
                        // it when none was supplied.
                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        // Anchor fields are populated only when the
                        // temporal_track feature is compiled in.
                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                        };

                        // Resolve parent links recorded as WAL sequences to the
                        // frame ids assigned earlier in this batch (if present).
                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            }
                        }

                        // Pick the text to index: explicit non-empty search_text
                        // wins; otherwise fall back to the frame's decoded content.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(ref text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;
                        }

                        // Embeddings are collected into the delta; the caller
                        // rebuilds the vector index once per batch.
                        if let Some(embedding) = entry.embedding.take() {
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only document frames feed the time index and the
                        // temporal anchor/mention tracks.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        // Link supersession: mark the predecessor as superseded
                        // by the frame being inserted.
                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            // Advance data_end monotonically past everything appended above.
            self.data_end = self.data_end.max(data_cursor);
        }

        // Index rebuild now happens once per commit (Option A) instead of incremental append.
        // See commit_from_records() for where rebuild_indexes() is invoked.
        Ok(delta)
    }
1217
1218    #[cfg(feature = "temporal_track")]
1219    fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1220        (timestamp, AnchorSource::FrameTimestamp)
1221    }
1222
    /// Scans `text` for temporal expressions and resolves each one against
    /// the frame's anchor timestamp, producing zero or more mentions.
    ///
    /// Two detectors feed the candidate span list: the fixed phrase table
    /// `STATIC_TEMPORAL_PHRASES` (matched case-insensitively) and a numeric
    /// `d/m/y`-style date regex. Each span is trimmed of surrounding
    /// punctuation and resolved via `TemporalNormalizer`; spans that fail to
    /// resolve are silently skipped.
    #[cfg(feature = "temporal_track")]
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        // An invalid anchor timestamp means phrases can't be resolved at all.
        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        let mut spans: Vec<(usize, usize)> = Vec::new();
        // ASCII lowercasing preserves byte offsets, so indices found in
        // `lower` are valid indices into `text`.
        let lower = text.to_ascii_lowercase();

        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                // Resume after this match to catch repeated occurrences.
                search_start = end;
            }
        }

        // Compiled once per process; stored as Result so a bad pattern is
        // logged rather than panicking.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        // Deterministic order; exact duplicates removed (overlaps are kept).
        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            // Defensive bounds check before slicing `text`.
            if end > text.len() || start >= end {
                continue;
            }
            let raw = &text[start..end];
            // Strip surrounding quotes/punctuation before normalization.
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            // Re-derive the trimmed slice's byte offsets within `text`.
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                Err(_) => continue,
            }
        }

        mentions
    }
1296
1297    #[cfg(feature = "temporal_track")]
1298    fn resolution_to_mentions(
1299        resolution: TemporalResolution,
1300        frame_id: FrameId,
1301        byte_start: usize,
1302        byte_end: usize,
1303    ) -> Vec<TemporalMention> {
1304        let byte_len = byte_end.saturating_sub(byte_start) as u32;
1305        let byte_start = byte_start.min(u32::MAX as usize) as u32;
1306        let mut results = Vec::new();
1307
1308        let base_flags = Self::flags_from_resolution(&resolution.flags);
1309        match resolution.value {
1310            TemporalResolutionValue::Date(date) => {
1311                let ts = Self::date_to_timestamp(date);
1312                results.push(TemporalMention::new(
1313                    ts,
1314                    frame_id,
1315                    byte_start,
1316                    byte_len,
1317                    TemporalMentionKind::Date,
1318                    resolution.confidence,
1319                    0,
1320                    base_flags,
1321                ));
1322            }
1323            TemporalResolutionValue::DateTime(dt) => {
1324                let ts = dt.unix_timestamp();
1325                let tz_hint = dt.offset().whole_minutes() as i16;
1326                results.push(TemporalMention::new(
1327                    ts,
1328                    frame_id,
1329                    byte_start,
1330                    byte_len,
1331                    TemporalMentionKind::DateTime,
1332                    resolution.confidence,
1333                    tz_hint,
1334                    base_flags,
1335                ));
1336            }
1337            TemporalResolutionValue::DateRange { start, end } => {
1338                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1339                let start_ts = Self::date_to_timestamp(start);
1340                results.push(TemporalMention::new(
1341                    start_ts,
1342                    frame_id,
1343                    byte_start,
1344                    byte_len,
1345                    TemporalMentionKind::RangeStart,
1346                    resolution.confidence,
1347                    0,
1348                    flags,
1349                ));
1350                let end_ts = Self::date_to_timestamp(end);
1351                results.push(TemporalMention::new(
1352                    end_ts,
1353                    frame_id,
1354                    byte_start,
1355                    byte_len,
1356                    TemporalMentionKind::RangeEnd,
1357                    resolution.confidence,
1358                    0,
1359                    flags,
1360                ));
1361            }
1362            TemporalResolutionValue::DateTimeRange { start, end } => {
1363                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1364                results.push(TemporalMention::new(
1365                    start.unix_timestamp(),
1366                    frame_id,
1367                    byte_start,
1368                    byte_len,
1369                    TemporalMentionKind::RangeStart,
1370                    resolution.confidence,
1371                    start.offset().whole_minutes() as i16,
1372                    flags,
1373                ));
1374                results.push(TemporalMention::new(
1375                    end.unix_timestamp(),
1376                    frame_id,
1377                    byte_start,
1378                    byte_len,
1379                    TemporalMentionKind::RangeEnd,
1380                    resolution.confidence,
1381                    end.offset().whole_minutes() as i16,
1382                    flags,
1383                ));
1384            }
1385            TemporalResolutionValue::Month { year, month } => {
1386                let start_date = match Date::from_calendar_date(year, month, 1) {
1387                    Ok(date) => date,
1388                    Err(err) => {
1389                        tracing::warn!(
1390                            target = "memvid::temporal",
1391                            %err,
1392                            year,
1393                            month = month as u8,
1394                            "skipping invalid month resolution"
1395                        );
1396                        // Skip invalid range for this mention only.
1397                        return results;
1398                    }
1399                };
1400                let end_date = match Self::last_day_in_month(year, month) {
1401                    Some(date) => date,
1402                    None => {
1403                        tracing::warn!(
1404                            target = "memvid::temporal",
1405                            year,
1406                            month = month as u8,
1407                            "skipping month resolution with invalid calendar range"
1408                        );
1409                        return results;
1410                    }
1411                };
1412                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1413                results.push(TemporalMention::new(
1414                    Self::date_to_timestamp(start_date),
1415                    frame_id,
1416                    byte_start,
1417                    byte_len,
1418                    TemporalMentionKind::RangeStart,
1419                    resolution.confidence,
1420                    0,
1421                    flags,
1422                ));
1423                results.push(TemporalMention::new(
1424                    Self::date_to_timestamp(end_date),
1425                    frame_id,
1426                    byte_start,
1427                    byte_len,
1428                    TemporalMentionKind::RangeEnd,
1429                    resolution.confidence,
1430                    0,
1431                    flags,
1432                ));
1433            }
1434        }
1435
1436        results
1437    }
1438
1439    #[cfg(feature = "temporal_track")]
1440    fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1441        let mut result = TemporalMentionFlags::empty();
1442        if flags
1443            .iter()
1444            .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1445        {
1446            result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1447        }
1448        if flags
1449            .iter()
1450            .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1451        {
1452            result = result.set(TemporalMentionFlags::DERIVED, true);
1453        }
1454        result
1455    }
1456
1457    #[cfg(feature = "temporal_track")]
1458    fn date_to_timestamp(date: Date) -> i64 {
1459        PrimitiveDateTime::new(date, Time::MIDNIGHT)
1460            .assume_offset(UtcOffset::UTC)
1461            .unix_timestamp()
1462    }
1463
1464    #[cfg(feature = "temporal_track")]
1465    fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1466        let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1467        while let Some(next) = date.next_day() {
1468            if next.month() == month {
1469                date = next;
1470            } else {
1471                break;
1472            }
1473        }
1474        Some(date)
1475    }
1476
    /// Builds and appends a lexical index segment covering the frames
    /// inserted by `delta`, then registers it in the segment catalog.
    ///
    /// Returns `Ok(false)` when there is nothing to publish: no inserted
    /// frames, lex disabled, or no segment artifact could be built.
    #[allow(dead_code)]
    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_frames.is_empty() || !self.lex_enabled {
            return Ok(false);
        }

        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        // Reserve the next catalog id; bumped at the end of the publish.
        let segment_id = self.toc.segment_catalog.next_segment_id;
        // The frame-id span records which frames this segment covers.
        #[cfg(feature = "parallel_segments")]
        let span =
            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));

        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
        #[cfg(feature = "parallel_segments")]
        if let Some(span) = span {
            Self::decorate_segment_common(&mut descriptor.common, span);
        }
        #[cfg(feature = "parallel_segments")]
        let descriptor_for_manifest = descriptor.clone();
        self.toc.segment_catalog.lex_segments.push(descriptor);
        // Manifest WAL append is best-effort: on failure we log a warning and
        // continue with the publish.
        #[cfg(feature = "parallel_segments")]
        if let Err(err) = self.record_index_segment(
            SegmentKind::Lexical,
            descriptor_for_manifest.common,
            SegmentStats {
                doc_count: artifact.doc_count,
                vector_count: 0,
                time_entries: 0,
                bytes_uncompressed: artifact.bytes.len() as u64,
                build_micros: 0,
            },
        ) {
            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
        }
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
        Ok(true)
    }
1520
1521    #[allow(dead_code)]
1522    fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1523        if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1524            return Ok(false);
1525        }
1526
1527        let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1528            Some(artifact) => artifact,
1529            None => return Ok(false),
1530        };
1531
1532        if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1533            if existing_dim != artifact.dimension {
1534                return Err(MemvidError::VecDimensionMismatch {
1535                    expected: existing_dim,
1536                    actual: artifact.dimension as usize,
1537                });
1538            }
1539        }
1540
1541        let segment_id = self.toc.segment_catalog.next_segment_id;
1542        #[cfg(feature = "parallel_segments")]
1543        #[cfg(feature = "parallel_segments")]
1544        let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1545
1546        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1547        let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1548        #[cfg(feature = "parallel_segments")]
1549        if let Some(span) = span {
1550            Self::decorate_segment_common(&mut descriptor.common, span);
1551        }
1552        #[cfg(feature = "parallel_segments")]
1553        let descriptor_for_manifest = descriptor.clone();
1554        self.toc.segment_catalog.vec_segments.push(descriptor);
1555        #[cfg(feature = "parallel_segments")]
1556        if let Err(err) = self.record_index_segment(
1557            SegmentKind::Vector,
1558            descriptor_for_manifest.common,
1559            SegmentStats {
1560                doc_count: 0,
1561                vector_count: artifact.vector_count,
1562                time_entries: 0,
1563                bytes_uncompressed: artifact.bytes_uncompressed,
1564                build_micros: 0,
1565            },
1566        ) {
1567            tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1568        }
1569        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1570        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1571
1572        // Keep the global vec manifest in sync for auto-detection and stats.
1573        if self.toc.indexes.vec.is_none() {
1574            let empty_offset = self.data_end;
1575            let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1576                                    \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1577            self.toc.indexes.vec = Some(VecIndexManifest {
1578                vector_count: 0,
1579                dimension: 0,
1580                bytes_offset: empty_offset,
1581                bytes_length: 0,
1582                checksum: empty_checksum,
1583                compression_mode: self.vec_compression.clone(),
1584            });
1585        }
1586        if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1587            if manifest.dimension == 0 {
1588                manifest.dimension = artifact.dimension;
1589            }
1590            if manifest.bytes_length == 0 {
1591                manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1592                manifest.compression_mode = artifact.compression.clone();
1593            }
1594        }
1595
1596        self.vec_enabled = true;
1597        Ok(true)
1598    }
1599
1600    #[allow(dead_code)]
1601    fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1602        if delta.inserted_time_entries.is_empty() {
1603            return Ok(false);
1604        }
1605
1606        let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1607            Some(artifact) => artifact,
1608            None => return Ok(false),
1609        };
1610
1611        let segment_id = self.toc.segment_catalog.next_segment_id;
1612        #[cfg(feature = "parallel_segments")]
1613        #[cfg(feature = "parallel_segments")]
1614        let span = self.segment_span_from_iter(
1615            delta
1616                .inserted_time_entries
1617                .iter()
1618                .map(|entry| entry.frame_id),
1619        );
1620
1621        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1622        let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1623        #[cfg(feature = "parallel_segments")]
1624        if let Some(span) = span {
1625            Self::decorate_segment_common(&mut descriptor.common, span);
1626        }
1627        #[cfg(feature = "parallel_segments")]
1628        let descriptor_for_manifest = descriptor.clone();
1629        self.toc.segment_catalog.time_segments.push(descriptor);
1630        #[cfg(feature = "parallel_segments")]
1631        if let Err(err) = self.record_index_segment(
1632            SegmentKind::Time,
1633            descriptor_for_manifest.common,
1634            SegmentStats {
1635                doc_count: 0,
1636                vector_count: 0,
1637                time_entries: artifact.entry_count,
1638                bytes_uncompressed: artifact.bytes.len() as u64,
1639                build_micros: 0,
1640            },
1641        ) {
1642            tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1643        }
1644        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1645        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1646        Ok(true)
1647    }
1648
1649    #[cfg(feature = "temporal_track")]
1650    #[allow(dead_code)]
1651    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1652        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1653        {
1654            return Ok(false);
1655        }
1656
1657        debug_assert!(
1658            delta.inserted_temporal_mentions.len() < 1_000_000,
1659            "temporal delta mentions unexpectedly large: {}",
1660            delta.inserted_temporal_mentions.len()
1661        );
1662        debug_assert!(
1663            delta.inserted_temporal_anchors.len() < 1_000_000,
1664            "temporal delta anchors unexpectedly large: {}",
1665            delta.inserted_temporal_anchors.len()
1666        );
1667
1668        let artifact = match self.build_temporal_segment_from_records(
1669            &delta.inserted_temporal_mentions,
1670            &delta.inserted_temporal_anchors,
1671        )? {
1672            Some(artifact) => artifact,
1673            None => return Ok(false),
1674        };
1675
1676        let segment_id = self.toc.segment_catalog.next_segment_id;
1677        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
1678        self.toc
1679            .segment_catalog
1680            .temporal_segments
1681            .push(descriptor.clone());
1682        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1683        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1684
1685        self.toc.temporal_track = Some(TemporalTrackManifest {
1686            bytes_offset: descriptor.common.bytes_offset,
1687            bytes_length: descriptor.common.bytes_length,
1688            entry_count: artifact.entry_count,
1689            anchor_count: artifact.anchor_count,
1690            checksum: artifact.checksum,
1691            flags: artifact.flags,
1692        });
1693
1694        self.clear_temporal_track_cache();
1695
1696        Ok(true)
1697    }
1698
1699    fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
1700        let frame =
1701            self.toc
1702                .frames
1703                .get_mut(frame_id as usize)
1704                .ok_or(MemvidError::InvalidFrame {
1705                    frame_id,
1706                    reason: "supersede target missing",
1707                })?;
1708        frame.status = FrameStatus::Superseded;
1709        frame.superseded_by = Some(successor_id);
1710        self.remove_frame_from_indexes(frame_id)
1711    }
1712
    /// Full rebuild of every derived index from the frame table.
    ///
    /// Pipeline: truncate stale index bytes after the payload region, rebuild
    /// the time index, rebuild the Tantivy lex index (when enabled), rebuild
    /// the vec index from `new_vec_docs`, then re-persist the CLIP index,
    /// memories track, and logic mesh. Finishes by rewriting the TOC footer
    /// and header so the file is consistent on disk.
    ///
    /// `new_vec_docs` carries `(frame_id, embedding)` pairs for the vec index
    /// builder. This path is invoked from doctor/compaction flows; the
    /// ordering of writes and of `data_end`/`footer_offset` updates is
    /// deliberate and must not be reordered.
    pub(crate) fn rebuild_indexes(&mut self, new_vec_docs: &[(FrameId, Vec<f32>)]) -> Result<()> {
        // Empty store with no indexes enabled: nothing to rebuild.
        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
            return Ok(());
        }

        // Reset the write cursor to the end of the payload region; everything
        // after it (old indexes) will be regenerated.
        let payload_end = self.payload_region_end();
        self.data_end = payload_end;
        // Don't truncate if footer_offset is higher - there may be replay segments
        // or other data written after payload_end that must be preserved.
        let safe_truncate_len = self.header.footer_offset.max(payload_end);
        if self.file.metadata()?.len() > safe_truncate_len {
            self.file.set_len(safe_truncate_len)?;
        }
        self.file.seek(SeekFrom::Start(payload_end))?;

        // Clear legacy per-segment catalogs; full rebuild emits fresh manifests.
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        self.toc.segment_catalog.temporal_segments.clear();
        #[cfg(feature = "parallel_segments")]
        self.toc.segment_catalog.index_segments.clear();
        // Drop any stale Tantivy manifests so offsets are rebuilt fresh.
        self.toc.segment_catalog.tantivy_segments.clear();
        // Drop any stale embedded lex manifest entries before rebuilding Tantivy.
        self.toc.indexes.lex_segments.clear();

        // Time index: one entry per active document frame, written first,
        // directly after the payload region.
        let mut time_entries: Vec<TimeIndexEntry> = self
            .toc
            .frames
            .iter()
            .filter(|frame| {
                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
            })
            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
            .collect();
        let (ti_offset, ti_length, ti_checksum) =
            time_index_append(&mut self.file, &mut time_entries)?;
        self.toc.time_index = Some(TimeIndexManifest {
            bytes_offset: ti_offset,
            bytes_length: ti_length,
            entry_count: time_entries.len() as u64,
            checksum: ti_checksum,
        });

        // Running cursor for subsequent index writes; becomes the new footer.
        let mut footer_offset = ti_offset + ti_length;

        #[cfg(feature = "temporal_track")]
        {
            // The temporal track is not regenerated here; clear its manifest
            // and cache so nothing points at truncated bytes.
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
            self.clear_temporal_track_cache();
        }

        if self.lex_enabled {
            #[cfg(feature = "lex")]
            {
                // Clear embedded storage to avoid carrying stale segments between rebuilds.
                if let Ok(mut storage) = self.lex_storage.write() {
                    storage.clear();
                    storage.set_generation(0);
                }

                // Initialize Tantivy engine (loads existing segments if any)
                self.init_tantivy()?;

                // Temporarily take the engine so it can be rebuilt while the
                // rebuild helper borrows &mut self, then put it back.
                if let Some(mut engine) = self.tantivy.take() {
                    self.rebuild_tantivy_engine(&mut engine)?;
                    self.tantivy = Some(engine);
                } else {
                    return Err(MemvidError::InvalidToc {
                        reason: "tantivy engine missing during doctor rebuild".into(),
                    });
                }

                // Set lex_enabled to ensure it persists
                self.lex_enabled = true;

                // Mark Tantivy as dirty so it gets flushed
                self.tantivy_dirty = true;

                // Position embedded Tantivy segments immediately after the time index.
                self.data_end = footer_offset;

                // Flush Tantivy segments to file
                self.flush_tantivy()?;

                // Update footer_offset after Tantivy flush
                footer_offset = self.header.footer_offset;

                // Restore data_end to payload boundary so future payload writes stay before indexes.
                self.data_end = payload_end;
            }
            #[cfg(not(feature = "lex"))]
            {
                self.toc.indexes.lex = None;
                self.toc.indexes.lex_segments.clear();
            }
        } else {
            // Lex disabled: clear everything
            self.toc.indexes.lex = None;
            self.toc.indexes.lex_segments.clear();
            #[cfg(feature = "lex")]
            if let Ok(mut storage) = self.lex_storage.write() {
                storage.clear();
            }
        }

        // Vec index: a single monolithic artifact written after the lex bytes.
        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
            let vec_offset = footer_offset;
            self.file.seek(SeekFrom::Start(vec_offset))?;
            self.file.write_all(&artifact.bytes)?;
            footer_offset += artifact.bytes.len() as u64;
            self.toc.indexes.vec = Some(VecIndexManifest {
                vector_count: artifact.vector_count,
                dimension: artifact.dimension,
                bytes_offset: vec_offset,
                bytes_length: artifact.bytes.len() as u64,
                checksum: artifact.checksum,
                compression_mode: self.vec_compression.clone(),
            });
            self.vec_index = Some(index);
        } else {
            // Only clear manifest if vec is disabled, keep empty placeholder if enabled
            if !self.vec_enabled {
                self.toc.indexes.vec = None;
            }
            self.vec_index = None;
        }

        // Persist CLIP index if it has embeddings
        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    let artifact = clip_index.encode()?;
                    let clip_offset = footer_offset;
                    self.file.seek(SeekFrom::Start(clip_offset))?;
                    self.file.write_all(&artifact.bytes)?;
                    footer_offset += artifact.bytes.len() as u64;
                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
                        bytes_offset: clip_offset,
                        bytes_length: artifact.bytes.len() as u64,
                        vector_count: artifact.vector_count,
                        dimension: artifact.dimension,
                        checksum: artifact.checksum,
                        model_name: crate::clip::default_model_info().name.to_string(),
                    });
                    tracing::info!(
                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
                        artifact.vector_count,
                        clip_offset
                    );
                }
            }
        } else {
            self.toc.indexes.clip = None;
        }

        // Persist memories track if it has cards
        if self.memories_track.card_count() > 0 {
            let memories_offset = footer_offset;
            let memories_bytes = self.memories_track.serialize()?;
            let memories_checksum = blake3::hash(&memories_bytes).into();
            self.file.seek(SeekFrom::Start(memories_offset))?;
            self.file.write_all(&memories_bytes)?;
            footer_offset += memories_bytes.len() as u64;

            let stats = self.memories_track.stats();
            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
                bytes_offset: memories_offset,
                bytes_length: memories_bytes.len() as u64,
                card_count: stats.card_count as u64,
                entity_count: stats.entity_count as u64,
                checksum: memories_checksum,
            });
        } else {
            self.toc.memories_track = None;
        }

        // Persist logic mesh if it has nodes
        if !self.logic_mesh.is_empty() {
            let mesh_offset = footer_offset;
            let mesh_bytes = self.logic_mesh.serialize()?;
            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
            self.file.seek(SeekFrom::Start(mesh_offset))?;
            self.file.write_all(&mesh_bytes)?;
            footer_offset += mesh_bytes.len() as u64;

            let stats = self.logic_mesh.stats();
            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
                bytes_offset: mesh_offset,
                bytes_length: mesh_bytes.len() as u64,
                node_count: stats.node_count as u64,
                edge_count: stats.edge_count as u64,
                checksum: mesh_checksum,
            });
        } else {
            self.toc.logic_mesh = None;
        }

        // This fires on every full rebuild (doctor/compaction); keep it informational to avoid noisy WARNs.
        tracing::info!(
            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
            ti_offset,
            ti_length,
            footer_offset,
            self.header.footer_offset
        );

        // Use max() to preserve any higher footer_offset (e.g., from replay segment)
        // This prevents overwriting data like replay segments that were written after index data
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Ensure the file length covers rebuilt indexes to avoid out-of-bounds manifests.
        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        // Persist the new TOC, then mirror its checksum into the header so
        // both agree on disk.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;

        // Post-rebuild verification: a lex rebuild that indexed zero docs
        // while text-indexable frames exist indicates a broken rebuild.
        #[cfg(feature = "lex")]
        if self.lex_enabled {
            if let Some(ref engine) = self.tantivy {
                let doc_count = engine.num_docs();
                let active_frame_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| f.status == FrameStatus::Active)
                    .count();

                // Count frames that would actually be indexed by rebuild_tantivy_engine
                // Uses the same logic: content-type based check + size limit
                let text_indexable_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
                    .count();

                // Only fail if we have text-indexable frames but none got indexed
                // This avoids false positives for binary files (videos, images)
                if doc_count == 0 && text_indexable_count > 0 {
                    return Err(MemvidError::Doctor {
                        reason: format!(
                            "Lex index rebuild failed: 0 documents indexed from {} text-indexable frames. \
                            This indicates a critical failure in the rebuild process.",
                            text_indexable_count
                        ),
                    });
                }

                // Success! Log it
                log::info!(
                    "✓ Doctor lex index rebuild succeeded: {} docs from {} frames ({} text-indexable)",
                    doc_count,
                    active_frame_count,
                    text_indexable_count
                );
            }
        }

        Ok(())
    }
1980
1981    /// Persist the memories track to the file without a full rebuild.
1982    ///
1983    /// This is used when the memories track has been modified but no frame
1984    /// changes were made (e.g., after running enrichment).
1985    fn persist_memories_track(&mut self) -> Result<()> {
1986        if self.memories_track.card_count() == 0 {
1987            self.toc.memories_track = None;
1988            return Ok(());
1989        }
1990
1991        // Write after the current footer_offset
1992        let memories_offset = self.header.footer_offset;
1993        let memories_bytes = self.memories_track.serialize()?;
1994        let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
1995
1996        self.file.seek(SeekFrom::Start(memories_offset))?;
1997        self.file.write_all(&memories_bytes)?;
1998
1999        let stats = self.memories_track.stats();
2000        self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2001            bytes_offset: memories_offset,
2002            bytes_length: memories_bytes.len() as u64,
2003            card_count: stats.card_count as u64,
2004            entity_count: stats.entity_count as u64,
2005            checksum: memories_checksum,
2006        });
2007
2008        // Update footer_offset to account for the memories track
2009        self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2010
2011        // Ensure the file length covers the memories track
2012        if self.file.metadata()?.len() < self.header.footer_offset {
2013            self.file.set_len(self.header.footer_offset)?;
2014        }
2015
2016        Ok(())
2017    }
2018
2019    /// Persist the CLIP index to the file without a full rebuild.
2020    ///
2021    /// This is used when CLIP embeddings have been added but no full
2022    /// index rebuild is needed (e.g., in parallel segments mode).
2023    fn persist_clip_index(&mut self) -> Result<()> {
2024        if !self.clip_enabled {
2025            self.toc.indexes.clip = None;
2026            return Ok(());
2027        }
2028
2029        let clip_index = match &self.clip_index {
2030            Some(idx) if !idx.is_empty() => idx,
2031            _ => {
2032                self.toc.indexes.clip = None;
2033                return Ok(());
2034            }
2035        };
2036
2037        // Encode the CLIP index
2038        let artifact = clip_index.encode()?;
2039
2040        // Write after the current footer_offset
2041        let clip_offset = self.header.footer_offset;
2042        self.file.seek(SeekFrom::Start(clip_offset))?;
2043        self.file.write_all(&artifact.bytes)?;
2044
2045        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2046            bytes_offset: clip_offset,
2047            bytes_length: artifact.bytes.len() as u64,
2048            vector_count: artifact.vector_count,
2049            dimension: artifact.dimension,
2050            checksum: artifact.checksum,
2051            model_name: crate::clip::default_model_info().name.to_string(),
2052        });
2053
2054        tracing::info!(
2055            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2056            artifact.vector_count,
2057            clip_offset
2058        );
2059
2060        // Update footer_offset to account for the CLIP index
2061        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2062
2063        // Ensure the file length covers the CLIP index
2064        if self.file.metadata()?.len() < self.header.footer_offset {
2065            self.file.set_len(self.header.footer_offset)?;
2066        }
2067
2068        Ok(())
2069    }
2070
2071    /// Persist the Logic-Mesh to the file without a full rebuild.
2072    ///
2073    /// This is used when the Logic-Mesh has been modified but no frame
2074    /// changes were made (e.g., after running NER enrichment).
2075    fn persist_logic_mesh(&mut self) -> Result<()> {
2076        if self.logic_mesh.is_empty() {
2077            self.toc.logic_mesh = None;
2078            return Ok(());
2079        }
2080
2081        // Write after the current footer_offset
2082        let mesh_offset = self.header.footer_offset;
2083        let mesh_bytes = self.logic_mesh.serialize()?;
2084        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2085
2086        self.file.seek(SeekFrom::Start(mesh_offset))?;
2087        self.file.write_all(&mesh_bytes)?;
2088
2089        let stats = self.logic_mesh.stats();
2090        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2091            bytes_offset: mesh_offset,
2092            bytes_length: mesh_bytes.len() as u64,
2093            node_count: stats.node_count as u64,
2094            edge_count: stats.edge_count as u64,
2095            checksum: mesh_checksum,
2096        });
2097
2098        // Update footer_offset to account for the logic mesh
2099        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2100
2101        // Ensure the file length covers the logic mesh
2102        if self.file.metadata()?.len() < self.header.footer_offset {
2103            self.file.set_len(self.header.footer_offset)?;
2104        }
2105
2106        Ok(())
2107    }
2108
2109    #[cfg(feature = "lex")]
2110    fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2111        let LexWalBatch {
2112            generation,
2113            doc_count,
2114            checksum,
2115            segments,
2116        } = batch;
2117
2118        if let Some(mut storage) = self.lex_storage.write().ok() {
2119            storage.replace(doc_count, checksum, segments);
2120            storage.set_generation(generation);
2121        }
2122
2123        let result = self.persist_lex_manifest();
2124        result
2125    }
2126
2127    #[cfg(feature = "lex")]
2128    fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2129        let payload = encode_to_vec(&WalEntry::Lex(batch.clone()), wal_config())?;
2130        self.append_wal_entry(&payload)?;
2131        Ok(())
2132    }
2133
2134    #[cfg(feature = "lex")]
2135    fn persist_lex_manifest(&mut self) -> Result<()> {
2136        let (index_manifest, segments) = if let Some(storage) = self.lex_storage.read().ok() {
2137            storage.to_manifest()
2138        } else {
2139            (None, Vec::new())
2140        };
2141
2142        // Update the manifest
2143        if let Some(storage_manifest) = index_manifest {
2144            // Old LexIndexArtifact format: set the manifest with actual offset/length
2145            self.toc.indexes.lex = Some(storage_manifest);
2146        } else {
2147            // Tantivy segments OR lex disabled: clear the manifest
2148            // Stats will check lex_segments instead of manifest
2149            self.toc.indexes.lex = None;
2150        }
2151
2152        self.toc.indexes.lex_segments = segments;
2153
2154        // footer_offset is already correctly set by flush_tantivy() earlier in this function.
2155        // DO NOT call catalog_data_end() as it would include orphaned Tantivy segments.
2156
2157        self.rewrite_toc_footer()?;
2158        self.header.toc_checksum = self.toc.toc_checksum;
2159        crate::persist_header(&mut self.file, &self.header)?;
2160        Ok(())
2161    }
2162
    /// Write a fresh Tantivy snapshot's segments into the file at `data_end`,
    /// update the segment catalog and embedded lex storage, append a lex WAL
    /// batch, and persist the resulting manifest.
    ///
    /// Segments are deliberately written starting at `data_end` (not appended
    /// after the footer) so the next commit overwrites them instead of leaking
    /// old copies; `footer_offset` only ever moves forward (via `max`) to
    /// protect replay segments written at higher offsets.
    #[cfg(feature = "lex")]
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // Start writing at the end of the payload region.
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        // Write each segment's bytes back-to-back, recording its file span.
        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            self.file.flush()?; // Flush segment data to disk
            embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Set footer_offset for TOC writing, but DON'T update data_end
        // data_end stays at end of payloads, so next commit overwrites these segments
        // Use max() to never decrease footer_offset - this preserves replay segments
        // that may have been written at a higher offset
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Assign fresh catalog ids and descriptors for the embedded segments.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            // Replace (not extend) so stale descriptors never survive a snapshot.
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // REMOVED: catalog_data_end() check
        // This was causing orphaned Tantivy segments because it would see OLD segments
        // still in the catalog from previous commits, and push footer_offset forward.
        // We want Tantivy segments to overwrite at data_end, so footer_offset should
        // stay at the end of the newly written segments.

        // Swap the snapshot into embedded storage; a poisoned lock is a hard
        // error here because the WAL batch below must carry the generation.
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Record the new snapshot in the WAL, then persist manifest + header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2246
2247    fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2248        let frame =
2249            self.toc
2250                .frames
2251                .get_mut(frame_id as usize)
2252                .ok_or(MemvidError::InvalidFrame {
2253                    frame_id,
2254                    reason: "delete target missing",
2255                })?;
2256        frame.status = FrameStatus::Deleted;
2257        frame.superseded_by = None;
2258        self.remove_frame_from_indexes(frame_id)
2259    }
2260
2261    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2262        #[cfg(feature = "lex")]
2263        if let Some(engine) = self.tantivy.as_mut() {
2264            engine.delete_frame(frame_id)?;
2265            self.tantivy_dirty = true;
2266        }
2267        if let Some(index) = self.lex_index.as_mut() {
2268            index.remove_document(frame_id);
2269        }
2270        if let Some(index) = self.vec_index.as_mut() {
2271            index.remove(frame_id);
2272        }
2273        Ok(())
2274    }
2275
2276    pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2277        self.toc
2278            .frames
2279            .get(frame_id as usize)
2280            .map_or(false, |frame| frame.status == FrameStatus::Active)
2281    }
2282
    /// Compute the frame-id range and page range covered by the given frame ids.
    ///
    /// Returns `None` for an empty iterator. Page bounds come from each
    /// frame's `chunk_index`/`chunk_count` where present; frames with no
    /// chunk metadata leave the running bounds untouched (and the first frame
    /// defaults to page 0 when it has no `chunk_index`).
    ///
    /// NOTE(review): unknown ids (not present in `toc.frames`) still widen
    /// the frame range but contribute nothing to the page range — presumably
    /// intentional for speculative ids; confirm with callers.
    #[cfg(feature = "parallel_segments")]
    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
    where
        I: IntoIterator<Item = FrameId>,
    {
        let mut iter = iter.into_iter();
        // Empty input → no span.
        let first_id = iter.next()?;
        let first_frame = self.toc.frames.get(first_id as usize);
        let mut min_id = first_id;
        let mut max_id = first_id;
        // Seed the page bounds from the first frame's chunk metadata.
        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
        let mut page_end = first_frame
            .and_then(|frame| frame.chunk_count)
            .map(|count| page_start + count.saturating_sub(1))
            .unwrap_or(page_start);
        for frame_id in iter {
            if frame_id < min_id {
                min_id = frame_id;
            }
            if frame_id > max_id {
                max_id = frame_id;
            }
            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
                if let Some(idx) = frame.chunk_index {
                    page_start = page_start.min(idx);
                    if let Some(count) = frame.chunk_count {
                        // chunk_count is inclusive of the first chunk, hence -1.
                        let end = idx + count.saturating_sub(1);
                        page_end = page_end.max(end);
                    } else {
                        // No count recorded: the single chunk at `idx` bounds it.
                        page_end = page_end.max(idx);
                    }
                }
            }
        }
        // TODO(parallel_segments): capture token offsets once planner surfaces them.
        Some(SegmentSpan {
            frame_start: min_id,
            frame_end: max_id,
            page_start,
            page_end,
            ..SegmentSpan::default()
        })
    }
2326
2327    #[cfg(feature = "parallel_segments")]
2328    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2329        common.span = Some(span);
2330        if common.codec_version == 0 {
2331            common.codec_version = 1;
2332        }
2333    }
2334
2335    #[cfg(feature = "parallel_segments")]
2336    pub(crate) fn record_index_segment(
2337        &mut self,
2338        kind: SegmentKind,
2339        common: SegmentCommon,
2340        stats: SegmentStats,
2341    ) -> Result<()> {
2342        let entry = IndexSegmentRef {
2343            kind,
2344            common,
2345            stats,
2346        };
2347        self.toc.segment_catalog.index_segments.push(entry.clone());
2348        if let Some(wal) = self.manifest_wal.as_mut() {
2349            wal.append_segments(&[entry])?;
2350        }
2351        Ok(())
2352    }
2353
2354    fn ensure_mutation_allowed(&mut self) -> Result<()> {
2355        self.ensure_writable()?;
2356        if self.toc.ticket_ref.issuer == "free-tier" {
2357            return Ok(());
2358        }
2359        match self.tier() {
2360            Tier::Free => Ok(()),
2361            tier => {
2362                if self.toc.ticket_ref.issuer.trim().is_empty() {
2363                    Err(MemvidError::TicketRequired { tier })
2364                } else {
2365                    Ok(())
2366                }
2367            }
2368        }
2369    }
2370
2371    pub(crate) fn tier(&self) -> Tier {
2372        if self.header.wal_size >= WAL_SIZE_LARGE {
2373            Tier::Enterprise
2374        } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2375            Tier::Dev
2376        } else {
2377            Tier::Free
2378        }
2379    }
2380
2381    pub(crate) fn capacity_limit(&self) -> u64 {
2382        if self.toc.ticket_ref.capacity_bytes != 0 {
2383            self.toc.ticket_ref.capacity_bytes
2384        } else {
2385            self.tier().capacity_bytes()
2386        }
2387    }
2388
    /// Get current storage capacity in bytes.
    ///
    /// Returns the capacity from the applied ticket when one is set (non-zero),
    /// or the default tier capacity (1 GB for free tier) otherwise.
    ///
    /// Public wrapper around the crate-private `capacity_limit`.
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2396
    /// Serialize the TOC at `header.footer_offset`, append the commit footer,
    /// then resize and fsync the file so mmap-based readers can locate the new
    /// footer.
    ///
    /// NOTE(review): the header itself is not persisted here — callers that
    /// moved `footer_offset` or changed the TOC checksum must persist it
    /// separately (see `align_footer_with_catalog`).
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // Fixed footer after the TOC bytes: length + blake3 hash let readers
        // locate and verify the TOC on open.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // The file must always be at least header + WAL size
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Ensure footer is flushed to disk so mmap-based readers can find it
        self.file.sync_all()?;
        Ok(())
    }
2437}
2438
2439#[cfg(feature = "parallel_segments")]
2440impl Memvid {
2441    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
2442        let chunks = self.collect_segment_chunks(delta)?;
2443        if chunks.is_empty() {
2444            return Ok(false);
2445        }
2446        let planner = SegmentPlanner::new(opts.clone());
2447        let plans = planner.plan_from_chunks(chunks);
2448        if plans.is_empty() {
2449            return Ok(false);
2450        }
2451        let worker_pool = SegmentWorkerPool::new(opts);
2452        let results = worker_pool.execute(plans)?;
2453        if results.is_empty() {
2454            return Ok(false);
2455        }
2456        self.append_parallel_segments(results)?;
2457        Ok(true)
2458    }
2459
2460    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
2461        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
2462            delta.inserted_embeddings.iter().cloned().collect();
2463        tracing::info!(
2464            inserted_frames = ?delta.inserted_frames,
2465            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
2466            "collect_segment_chunks: comparing frame IDs"
2467        );
2468        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
2469        for frame_id in &delta.inserted_frames {
2470            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
2471                MemvidError::InvalidFrame {
2472                    frame_id: *frame_id,
2473                    reason: "frame id out of range while planning segments",
2474                },
2475            )?;
2476            let text = self.frame_content(&frame)?;
2477            if text.trim().is_empty() {
2478                continue;
2479            }
2480            let token_estimate = estimate_tokens(&text);
2481            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
2482            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
2483            let page_start = if frame.chunk_index.is_some() {
2484                chunk_index + 1
2485            } else {
2486                0
2487            };
2488            let page_end = if frame.chunk_index.is_some() {
2489                page_start
2490            } else {
2491                0
2492            };
2493            chunks.push(SegmentChunkPlan {
2494                text,
2495                frame_id: *frame_id,
2496                timestamp: frame.timestamp,
2497                chunk_index,
2498                chunk_count: chunk_count.max(1),
2499                token_estimate,
2500                token_start: 0,
2501                token_end: 0,
2502                page_start,
2503                page_end,
2504                embedding: embedding_map.remove(frame_id),
2505            });
2506        }
2507        Ok(chunks)
2508    }
2509}
2510
2511#[cfg(feature = "parallel_segments")]
/// Cheap token estimate: whitespace-separated word count, clamped so even
/// empty text counts as one token.
fn estimate_tokens(text: &str) -> usize {
    match text.split_whitespace().count() {
        0 => 1,
        words => words,
    }
}
2515
impl Memvid {
    /// Move the footer offset past every byte owned by the segment catalog,
    /// rewriting the TOC/footer and persisting the header when a move was
    /// needed.
    ///
    /// Returns `Ok(true)` when the footer was relocated, `Ok(false)` when the
    /// catalog already fits below the current footer offset.
    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
        let catalog_end = self.catalog_data_end();
        if catalog_end <= self.header.footer_offset {
            return Ok(false);
        }
        self.header.footer_offset = catalog_end;
        self.rewrite_toc_footer()?;
        // Mirror the TOC checksum (presumably refreshed by prepare_toc_bytes
        // during the rewrite — confirm) into the header and persist it so
        // readers agree after reopen.
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(true)
    }
}
2529
2530impl Memvid {
    /// Compact the data region: commit pending state, rewrite only `Active`
    /// frames' payloads contiguously after the embedded WAL, discard every
    /// derived segment, and rebuild the indexes.
    ///
    /// NOTE(review): all live payloads are buffered in memory at once, so peak
    /// memory is proportional to total active payload size — confirm this is
    /// acceptable for very large stores.
    pub fn vacuum(&mut self) -> Result<()> {
        // Flush pending WAL/TOC state so compaction starts from a committed baseline.
        self.commit()?;

        // Snapshot every active frame's payload bytes before the region is
        // overwritten below.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // The payload region begins immediately after the embedded WAL.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    // Defensive: should be unreachable — every active frame was
                    // captured in `active_payloads` above.
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Deleted/superseded frames lose their extent entirely.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // Drop every derived segment reference so nothing points into the
        // rewritten payload region.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Clear in-memory Tantivy state so it doesn't write old segments on next commit
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        // Rebuild index state from scratch and make the compacted file durable.
        self.rebuild_indexes(&[])?;
        self.file.sync_all()?;
        Ok(())
    }
2598
2599    /// Preview how a document would be chunked without actually ingesting it.
2600    ///
2601    /// This is useful when you need to compute embeddings for each chunk externally
2602    /// before calling `put_with_chunk_embeddings()`. Returns `None` if the document
2603    /// is too small to be chunked (< 2400 chars after normalization).
2604    ///
2605    /// # Example
2606    /// ```ignore
2607    /// let chunks = mem.preview_chunks(b"long document text...")?;
2608    /// if let Some(chunk_texts) = chunks {
2609    ///     let embeddings = my_embedder.embed_chunks(&chunk_texts)?;
2610    ///     mem.put_with_chunk_embeddings(payload, None, embeddings, options)?;
2611    /// } else {
2612    ///     let embedding = my_embedder.embed_query(text)?;
2613    ///     mem.put_with_embedding_and_options(payload, embedding, options)?;
2614    /// }
2615    /// ```
2616    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
2617        plan_document_chunks(payload).map(|plan| plan.chunks)
2618    }
2619
2620    /// Append raw bytes as a document frame.
2621    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
2622        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
2623    }
2624
2625    /// Append raw bytes with explicit metadata/options.
2626    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
2627        self.put_internal(Some(payload), None, None, None, options, None)
2628    }
2629
2630    /// Append bytes and an existing embedding (bypasses on-device embedding).
2631    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
2632        self.put_internal(
2633            Some(payload),
2634            None,
2635            Some(embedding),
2636            None,
2637            PutOptions::default(),
2638            None,
2639        )
2640    }
2641
2642    pub fn put_with_embedding_and_options(
2643        &mut self,
2644        payload: &[u8],
2645        embedding: Vec<f32>,
2646        options: PutOptions,
2647    ) -> Result<u64> {
2648        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
2649    }
2650
2651    /// Ingest a document with pre-computed embeddings for both parent and chunks.
2652    ///
2653    /// This is the recommended API for high-accuracy semantic search when chunking
2654    /// occurs. The caller provides:
2655    /// - `payload`: The document bytes
2656    /// - `parent_embedding`: Embedding for the parent document (can be empty Vec if chunks have embeddings)
2657    /// - `chunk_embeddings`: Pre-computed embeddings for each chunk (matched by index)
2658    /// - `options`: Standard put options
2659    ///
2660    /// The number of chunk embeddings should match the number of chunks that will be
2661    /// created by the chunking algorithm. If fewer embeddings are provided than chunks,
2662    /// remaining chunks will have no embedding. If more are provided, extras are ignored.
2663    pub fn put_with_chunk_embeddings(
2664        &mut self,
2665        payload: &[u8],
2666        parent_embedding: Option<Vec<f32>>,
2667        chunk_embeddings: Vec<Vec<f32>>,
2668        options: PutOptions,
2669    ) -> Result<u64> {
2670        self.put_internal(
2671            Some(payload),
2672            None,
2673            parent_embedding,
2674            Some(chunk_embeddings),
2675            options,
2676            None,
2677        )
2678    }
2679
2680    /// Replace an existing frame's payload/metadata, keeping its identity and URI.
2681    pub fn update_frame(
2682        &mut self,
2683        frame_id: FrameId,
2684        payload: Option<Vec<u8>>,
2685        mut options: PutOptions,
2686        embedding: Option<Vec<f32>>,
2687    ) -> Result<u64> {
2688        self.ensure_mutation_allowed()?;
2689        let existing = self.frame_by_id(frame_id)?;
2690        if existing.status != FrameStatus::Active {
2691            return Err(MemvidError::InvalidFrame {
2692                frame_id,
2693                reason: "frame is not active",
2694            });
2695        }
2696
2697        if options.timestamp.is_none() {
2698            options.timestamp = Some(existing.timestamp);
2699        }
2700        if options.track.is_none() {
2701            options.track = existing.track.clone();
2702        }
2703        if options.kind.is_none() {
2704            options.kind = existing.kind.clone();
2705        }
2706        if options.uri.is_none() {
2707            options.uri = existing.uri.clone();
2708        }
2709        if options.title.is_none() {
2710            options.title = existing.title.clone();
2711        }
2712        if options.metadata.is_none() {
2713            options.metadata = existing.metadata.clone();
2714        }
2715        if options.search_text.is_none() {
2716            options.search_text = existing.search_text.clone();
2717        }
2718        if options.tags.is_empty() {
2719            options.tags = existing.tags.clone();
2720        }
2721        if options.labels.is_empty() {
2722            options.labels = existing.labels.clone();
2723        }
2724        if options.extra_metadata.is_empty() {
2725            options.extra_metadata = existing.extra_metadata.clone();
2726        }
2727
2728        let reuse_frame = if payload.is_none() {
2729            options.auto_tag = false;
2730            options.extract_dates = false;
2731            Some(existing.clone())
2732        } else {
2733            None
2734        };
2735
2736        let effective_embedding = if let Some(explicit) = embedding {
2737            Some(explicit)
2738        } else if self.vec_enabled {
2739            self.frame_embedding(frame_id)?
2740        } else {
2741            None
2742        };
2743
2744        let payload_slice = payload.as_deref();
2745        let reuse_flag = reuse_frame.is_some();
2746        let replace_flag = payload_slice.is_some();
2747        let seq = self.put_internal(
2748            payload_slice,
2749            reuse_frame,
2750            effective_embedding,
2751            None, // No chunk embeddings for update
2752            options,
2753            Some(frame_id),
2754        )?;
2755        info!(
2756            "frame_update frame_id={} seq={} reused_payload={} replaced_payload={}",
2757            frame_id, seq, reuse_flag, replace_flag
2758        );
2759        Ok(seq)
2760    }
2761
2762    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
2763        self.ensure_mutation_allowed()?;
2764        let frame = self.frame_by_id(frame_id)?;
2765        if frame.status != FrameStatus::Active {
2766            return Err(MemvidError::InvalidFrame {
2767                frame_id,
2768                reason: "frame is not active",
2769            });
2770        }
2771
2772        let mut tombstone = WalEntryData {
2773            timestamp: SystemTime::now()
2774                .duration_since(UNIX_EPOCH)
2775                .map(|d| d.as_secs() as i64)
2776                .unwrap_or(frame.timestamp),
2777            kind: None,
2778            track: None,
2779            payload: Vec::new(),
2780            embedding: None,
2781            uri: frame.uri.clone(),
2782            title: frame.title.clone(),
2783            canonical_encoding: frame.canonical_encoding,
2784            canonical_length: frame.canonical_length,
2785            metadata: None,
2786            search_text: None,
2787            tags: Vec::new(),
2788            labels: Vec::new(),
2789            extra_metadata: BTreeMap::new(),
2790            content_dates: Vec::new(),
2791            chunk_manifest: None,
2792            role: frame.role,
2793            parent_sequence: None,
2794            chunk_index: frame.chunk_index,
2795            chunk_count: frame.chunk_count,
2796            op: FrameWalOp::Tombstone,
2797            target_frame_id: Some(frame_id),
2798            supersedes_frame_id: None,
2799            reuse_payload_from: None,
2800        };
2801        tombstone.kind = frame.kind.clone();
2802        tombstone.track = frame.track.clone();
2803
2804        let payload_bytes = encode_to_vec(&WalEntry::Frame(tombstone), wal_config())?;
2805        let seq = self.append_wal_entry(&payload_bytes)?;
2806        self.dirty = true;
2807        if self.wal.should_checkpoint() {
2808            self.commit()?;
2809        }
2810        info!("frame_delete frame_id={} seq={}", frame_id, seq);
2811        Ok(seq)
2812    }
2813}
2814
2815impl Memvid {
2816    fn put_internal(
2817        &mut self,
2818        payload: Option<&[u8]>,
2819        reuse_frame: Option<Frame>,
2820        embedding: Option<Vec<f32>>,
2821        chunk_embeddings: Option<Vec<Vec<f32>>>,
2822        mut options: PutOptions,
2823        supersedes: Option<FrameId>,
2824    ) -> Result<u64> {
2825        self.ensure_mutation_allowed()?;
2826        if payload.is_some() && reuse_frame.is_some() {
2827            let frame_id = reuse_frame
2828                .as_ref()
2829                .map(|frame| frame.id)
2830                .unwrap_or_default();
2831            return Err(MemvidError::InvalidFrame {
2832                frame_id,
2833                reason: "cannot reuse payload when bytes are provided",
2834            });
2835        }
2836
2837        // If the caller supplies embeddings, enforce a single vector dimension contract
2838        // for the entire memory (fail fast, never silently accept mixed dimensions).
2839        let incoming_dimension = {
2840            let mut dim: Option<u32> = None;
2841
2842            if let Some(ref vector) = embedding {
2843                if !vector.is_empty() {
2844                    dim = Some(vector.len() as u32);
2845                }
2846            }
2847
2848            if let Some(ref vectors) = chunk_embeddings {
2849                for vector in vectors {
2850                    if vector.is_empty() {
2851                        continue;
2852                    }
2853                    let vec_dim = vector.len() as u32;
2854                    match dim {
2855                        None => dim = Some(vec_dim),
2856                        Some(existing) if existing == vec_dim => {}
2857                        Some(existing) => {
2858                            return Err(MemvidError::VecDimensionMismatch {
2859                                expected: existing,
2860                                actual: vector.len(),
2861                            });
2862                        }
2863                    }
2864                }
2865            }
2866
2867            dim
2868        };
2869
2870        if let Some(incoming_dimension) = incoming_dimension {
2871            // Embeddings imply vector search should be enabled.
2872            if !self.vec_enabled {
2873                self.enable_vec()?;
2874            }
2875
2876            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
2877                if existing_dimension != incoming_dimension {
2878                    return Err(MemvidError::VecDimensionMismatch {
2879                        expected: existing_dimension,
2880                        actual: incoming_dimension as usize,
2881                    });
2882                }
2883            }
2884
2885            // Persist the dimension early for better auto-detection (even before the next commit).
2886            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
2887                if manifest.dimension == 0 {
2888                    manifest.dimension = incoming_dimension;
2889                }
2890            }
2891        }
2892
2893        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
2894        let payload_tail = self.payload_region_end();
2895        let projected = if let Some(bytes) = payload {
2896            let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
2897            let len = prepared.len();
2898            prepared_payload = Some((prepared, encoding, length));
2899            payload_tail.saturating_add(len as u64)
2900        } else if reuse_frame.is_some() {
2901            payload_tail
2902        } else {
2903            return Err(MemvidError::InvalidFrame {
2904                frame_id: 0,
2905                reason: "payload required for frame insertion",
2906            });
2907        };
2908
2909        let capacity_limit = self.capacity_limit();
2910        if projected > capacity_limit {
2911            let incoming_size = projected.saturating_sub(payload_tail);
2912            return Err(MemvidError::CapacityExceeded {
2913                current: payload_tail,
2914                limit: capacity_limit,
2915                required: incoming_size,
2916            });
2917        }
2918        let timestamp = options.timestamp.take().unwrap_or_else(|| {
2919            SystemTime::now()
2920                .duration_since(UNIX_EPOCH)
2921                .map(|d| d.as_secs() as i64)
2922                .unwrap_or(0)
2923        });
2924
2925        let mut _reuse_bytes: Option<Vec<u8>> = None;
2926        let payload_for_processing = if let Some(bytes) = payload {
2927            Some(bytes)
2928        } else if let Some(frame) = reuse_frame.as_ref() {
2929            let bytes = self.frame_canonical_bytes(frame)?;
2930            _reuse_bytes = Some(bytes);
2931            _reuse_bytes.as_deref()
2932        } else {
2933            None
2934        };
2935
2936        // Try to create a chunk plan from raw UTF-8 bytes first
2937        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
2938            (Some(bytes), None) => plan_document_chunks(bytes),
2939            _ => None,
2940        };
2941
2942        // For UTF-8 text chunks, we don't store the parent payload (chunks contain the text)
2943        // For binary documents (PDF, etc.), we store the original payload and create text chunks separately
2944        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
2945            if raw_chunk_plan.is_some() {
2946                // UTF-8 text document - chunks contain the text, no parent payload needed
2947                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
2948            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
2949                (prepared, encoding, length, None)
2950            } else if let Some(bytes) = payload {
2951                let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
2952                (prepared, encoding, length, None)
2953            } else if let Some(frame) = reuse_frame.as_ref() {
2954                (
2955                    Vec::new(),
2956                    frame.canonical_encoding,
2957                    frame.canonical_length,
2958                    Some(frame.id),
2959                )
2960            } else {
2961                return Err(MemvidError::InvalidFrame {
2962                    frame_id: 0,
2963                    reason: "payload required for frame insertion",
2964                });
2965            };
2966
2967        // Track whether we'll create an extracted text chunk plan later
2968        let mut chunk_plan = raw_chunk_plan;
2969
2970        let mut metadata = options.metadata.take();
2971        let mut search_text = options
2972            .search_text
2973            .take()
2974            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
2975        let mut tags = std::mem::take(&mut options.tags);
2976        let mut labels = std::mem::take(&mut options.labels);
2977        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
2978        let mut content_dates: Vec<String> = Vec::new();
2979
2980        let need_search_text = search_text
2981            .as_ref()
2982            .map_or(true, |text| text.trim().is_empty());
2983        let need_metadata = metadata.is_none();
2984        let run_extractor = need_search_text || need_metadata || options.auto_tag;
2985
2986        let mut extraction_error = None;
2987        let extracted = if run_extractor {
2988            if let Some(bytes) = payload_for_processing {
2989                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
2990                let uri_hint = options.uri.as_deref();
2991                match extract_via_registry(bytes, mime_hint, uri_hint) {
2992                    Ok(doc) => Some(doc),
2993                    Err(err) => {
2994                        extraction_error = Some(err);
2995                        None
2996                    }
2997                }
2998            } else {
2999                None
3000            }
3001        } else {
3002            None
3003        };
3004
3005        if let Some(err) = extraction_error {
3006            return Err(err);
3007        }
3008
3009        if let Some(doc) = &extracted {
3010            if need_search_text {
3011                if let Some(text) = &doc.text {
3012                    if let Some(normalized) =
3013                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3014                    {
3015                        search_text = Some(normalized);
3016                    }
3017                }
3018            }
3019
3020            // If we don't have a chunk plan from raw bytes (e.g., PDF), try to create one
3021            // from extracted text. This ensures large documents like PDFs get fully indexed.
3022            if chunk_plan.is_none() {
3023                if let Some(text) = &doc.text {
3024                    chunk_plan = plan_text_chunks(text);
3025                }
3026            }
3027
3028            if let Some(mime) = doc.mime_type.as_ref() {
3029                match &mut metadata {
3030                    Some(existing) => {
3031                        if existing.mime.is_none() {
3032                            existing.mime = Some(mime.clone());
3033                        }
3034                    }
3035                    None => {
3036                        let mut doc_meta = DocMetadata::default();
3037                        doc_meta.mime = Some(mime.clone());
3038                        metadata = Some(doc_meta);
3039                    }
3040                }
3041            }
3042
3043            if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string()) {
3044                extra_metadata
3045                    .entry("extractous_metadata".to_string())
3046                    .or_insert(meta_json);
3047            }
3048        }
3049
3050        if options.auto_tag {
3051            if let Some(ref text) = search_text {
3052                if !text.trim().is_empty() {
3053                    let result = AutoTagger::default().analyse(text, options.extract_dates);
3054                    merge_unique(&mut tags, result.tags);
3055                    merge_unique(&mut labels, result.labels);
3056                    if options.extract_dates && content_dates.is_empty() {
3057                        content_dates = result.content_dates;
3058                    }
3059                }
3060            }
3061        }
3062
3063        if content_dates.is_empty() {
3064            if let Some(frame) = reuse_frame.as_ref() {
3065                content_dates = frame.content_dates.clone();
3066            }
3067        }
3068
3069        let metadata_ref = metadata.as_ref();
3070        let mut search_text = augment_search_text(
3071            search_text,
3072            options.uri.as_deref(),
3073            options.title.as_deref(),
3074            options.track.as_deref(),
3075            &tags,
3076            &labels,
3077            &extra_metadata,
3078            &content_dates,
3079            metadata_ref,
3080        );
3081        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3082        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3083        let mut parent_chunk_count: Option<u32> = None;
3084
3085        let kind_value = options.kind.take();
3086        let track_value = options.track.take();
3087        let uri_value = options.uri.take();
3088        let title_value = options.title.take();
3089        let should_extract_triplets = options.extract_triplets;
3090        // Save references for triplet extraction (after search_text is moved into WAL entry)
3091        let triplet_uri = uri_value.clone();
3092        let triplet_title = title_value.clone();
3093
3094        if let Some(plan) = chunk_plan.as_ref() {
3095            let chunk_total = plan.chunks.len() as u32;
3096            parent_chunk_manifest = Some(plan.manifest.clone());
3097            parent_chunk_count = Some(chunk_total);
3098
3099            if let Some(first_chunk) = plan.chunks.first() {
3100                if let Some(normalized) =
3101                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3102                {
3103                    if !normalized.trim().is_empty() {
3104                        search_text = Some(normalized);
3105                    }
3106                }
3107            }
3108
3109            let chunk_tags = tags.clone();
3110            let chunk_labels = labels.clone();
3111            let chunk_metadata = metadata.clone();
3112            let chunk_extra_metadata = extra_metadata.clone();
3113            let chunk_content_dates = content_dates.clone();
3114
3115            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3116                let (chunk_payload, chunk_encoding, chunk_length) =
3117                    prepare_canonical_payload(chunk_text.as_bytes())?;
3118                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3119                    .map(|n| n.text)
3120                    .filter(|text| !text.trim().is_empty());
3121
3122                let chunk_uri = uri_value
3123                    .as_ref()
3124                    .map(|uri| format!("{uri}#page-{}", idx + 1));
3125                let chunk_title = title_value
3126                    .as_ref()
3127                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3128
3129                // Use provided chunk embedding if available, otherwise None
3130                let chunk_embedding = chunk_embeddings
3131                    .as_ref()
3132                    .and_then(|embeddings| embeddings.get(idx).cloned());
3133
3134                chunk_entries.push(WalEntryData {
3135                    timestamp,
3136                    kind: kind_value.clone(),
3137                    track: track_value.clone(),
3138                    payload: chunk_payload,
3139                    embedding: chunk_embedding,
3140                    uri: chunk_uri,
3141                    title: chunk_title,
3142                    canonical_encoding: chunk_encoding,
3143                    canonical_length: chunk_length,
3144                    metadata: chunk_metadata.clone(),
3145                    search_text: chunk_search_text,
3146                    tags: chunk_tags.clone(),
3147                    labels: chunk_labels.clone(),
3148                    extra_metadata: chunk_extra_metadata.clone(),
3149                    content_dates: chunk_content_dates.clone(),
3150                    chunk_manifest: None,
3151                    role: FrameRole::DocumentChunk,
3152                    parent_sequence: None,
3153                    chunk_index: Some(idx as u32),
3154                    chunk_count: Some(chunk_total),
3155                    op: FrameWalOp::Insert,
3156                    target_frame_id: None,
3157                    supersedes_frame_id: None,
3158                    reuse_payload_from: None,
3159                });
3160            }
3161        }
3162
3163        let parent_uri = uri_value.clone();
3164        let parent_title = title_value.clone();
3165
3166        // Get parent_sequence from options.parent_id if provided
3167        // We need the WAL sequence of the parent frame to link them
3168        let parent_sequence = if let Some(parent_id) = options.parent_id {
3169            // Look up the parent frame to get its WAL sequence
3170            // Since frame.id corresponds to the array index, we need to find the sequence
3171            // For now, we'll use the frame_id + WAL_START_SEQUENCE as an approximation
3172            // This works because sequence numbers are assigned incrementally
3173            self.toc
3174                .frames
3175                .get(parent_id as usize)
3176                .map(|_| parent_id + 2) // WAL sequences start at 2
3177        } else {
3178            None
3179        };
3180
3181        // Clone search_text for triplet extraction (before it's moved into WAL entry)
3182        let triplet_text = search_text.clone();
3183
3184        let entry = WalEntryData {
3185            timestamp,
3186            kind: kind_value,
3187            track: track_value,
3188            payload: storage_payload,
3189            embedding,
3190            uri: parent_uri,
3191            title: parent_title,
3192            canonical_encoding,
3193            canonical_length,
3194            metadata,
3195            search_text,
3196            tags,
3197            labels,
3198            extra_metadata,
3199            content_dates,
3200            chunk_manifest: parent_chunk_manifest,
3201            role: options.role,
3202            parent_sequence,
3203            chunk_index: None,
3204            chunk_count: parent_chunk_count,
3205            op: FrameWalOp::Insert,
3206            target_frame_id: None,
3207            supersedes_frame_id: supersedes,
3208            reuse_payload_from,
3209        };
3210
3211        let parent_bytes = encode_to_vec(&WalEntry::Frame(entry), wal_config())?;
3212        let parent_seq = self.append_wal_entry(&parent_bytes)?;
3213        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3214
3215        for mut chunk_entry in chunk_entries {
3216            chunk_entry.parent_sequence = Some(parent_seq);
3217            let chunk_bytes = encode_to_vec(&WalEntry::Frame(chunk_entry), wal_config())?;
3218            self.append_wal_entry(&chunk_bytes)?;
3219            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3220        }
3221
3222        self.dirty = true;
3223        if self.wal.should_checkpoint() {
3224            self.commit()?;
3225        }
3226
3227        // Record the put action if a replay session is active
3228        #[cfg(feature = "replay")]
3229        if let Some(input_bytes) = payload {
3230            self.record_put_action(parent_seq, input_bytes);
3231        }
3232
3233        // Extract triplets if enabled (default: true)
3234        // Triplets are stored as MemoryCards with entity/slot/value structure
3235        if should_extract_triplets {
3236            if let Some(ref text) = triplet_text {
3237                if !text.trim().is_empty() {
3238                    let extractor = TripletExtractor::default();
3239                    let frame_id = parent_seq as FrameId;
3240                    let (cards, _stats) = extractor.extract(
3241                        frame_id,
3242                        text,
3243                        triplet_uri.as_deref(),
3244                        triplet_title.as_deref(),
3245                        timestamp,
3246                    );
3247
3248                    if !cards.is_empty() {
3249                        // Add cards to memories track
3250                        let card_ids = self.memories_track.add_cards(cards);
3251
3252                        // Record enrichment for incremental processing
3253                        self.memories_track.record_enrichment(
3254                            frame_id,
3255                            "rules",
3256                            "1.0.0",
3257                            card_ids,
3258                        );
3259                    }
3260                }
3261            }
3262        }
3263
3264        Ok(parent_seq)
3265    }
3266}
3267
3268#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3269#[serde(rename_all = "snake_case")]
3270pub(crate) enum FrameWalOp {
3271    Insert,
3272    Tombstone,
3273}
3274
3275impl Default for FrameWalOp {
3276    fn default() -> Self {
3277        Self::Insert
3278    }
3279}
3280
/// Tagged union of record types written to the embedded WAL.
///
/// Older files serialized a bare `WalEntryData` with no tag; the fallback in
/// `decode_wal_entry` keeps those readable.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame mutation (insert/tombstone) staged for the next commit.
    Frame(WalEntryData),
    /// A batch of lexical-index updates (only with the `lex` feature).
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3287
3288fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3289    if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3290        return Ok(entry);
3291    }
3292    let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3293    Ok(WalEntry::Frame(legacy))
3294}
3295
/// Payload of a single frame-level WAL record.
///
/// Fields added after the initial format carry `#[serde(default)]` so entries
/// written by older versions (which lack those fields) still decode.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Ingest timestamp (seconds since epoch at put time).
    pub(crate) timestamp: i64,
    /// Optional caller-supplied kind label.
    pub(crate) kind: Option<String>,
    /// Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    /// Storage bytes, already canonicalised (see `prepare_canonical_payload`).
    pub(crate) payload: Vec<u8>,
    /// Optional embedding vector supplied by the caller.
    pub(crate) embedding: Option<Vec<f32>>,
    /// Source URI; chunk entries get a `#page-N` suffix appended.
    #[serde(default)]
    pub(crate) uri: Option<String>,
    /// Human-readable title; chunk entries get a `(page N/M)` suffix.
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// How `payload` is encoded (e.g. zstd for UTF-8 text, plain otherwise).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Uncompressed length of the original payload in bytes.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    /// Extracted document metadata (mime type etc.).
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Normalized text used for indexing/search.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    /// Tags (caller-provided plus auto-tagger output).
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    /// Labels (caller-provided plus auto-tagger output).
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Free-form key/value metadata (e.g. `extractous_metadata` JSON).
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Dates extracted from the content, as strings.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Chunking manifest, present only on a chunked parent frame.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    /// Role of this frame (document, document chunk, ...).
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence of the parent frame, set on chunk entries after the
    /// parent has been appended.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based index of this chunk within its parent, `None` for parents.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Total chunk count (set on both the parent and each chunk).
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Insert vs. tombstone; defaults to insert for legacy entries.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    /// Frame targeted by a tombstone/update operation, if any.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame this entry logically replaces, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// Frame whose stored payload should be reused instead of `payload`.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
}
3342
3343pub(crate) fn prepare_canonical_payload(
3344    payload: &[u8],
3345) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
3346    if std::str::from_utf8(payload).is_ok() {
3347        let compressed = zstd::encode_all(std::io::Cursor::new(payload), 3)?;
3348        Ok((
3349            compressed,
3350            CanonicalEncoding::Zstd,
3351            Some(payload.len() as u64),
3352        ))
3353    } else {
3354        Ok((
3355            payload.to_vec(),
3356            CanonicalEncoding::Plain,
3357            Some(payload.len() as u64),
3358        ))
3359    }
3360}
3361
3362pub(crate) fn augment_search_text(
3363    base: Option<String>,
3364    uri: Option<&str>,
3365    title: Option<&str>,
3366    track: Option<&str>,
3367    tags: &[String],
3368    labels: &[String],
3369    extra_metadata: &BTreeMap<String, String>,
3370    content_dates: &[String],
3371    metadata: Option<&DocMetadata>,
3372) -> Option<String> {
3373    let mut segments: Vec<String> = Vec::new();
3374    if let Some(text) = base {
3375        let trimmed = text.trim();
3376        if !trimmed.is_empty() {
3377            segments.push(trimmed.to_string());
3378        }
3379    }
3380
3381    if let Some(title) = title {
3382        if !title.trim().is_empty() {
3383            segments.push(format!("title: {}", title.trim()));
3384        }
3385    }
3386
3387    if let Some(uri) = uri {
3388        if !uri.trim().is_empty() {
3389            segments.push(format!("uri: {}", uri.trim()));
3390        }
3391    }
3392
3393    if let Some(track) = track {
3394        if !track.trim().is_empty() {
3395            segments.push(format!("track: {}", track.trim()));
3396        }
3397    }
3398
3399    if !tags.is_empty() {
3400        segments.push(format!("tags: {}", tags.join(" ")));
3401    }
3402
3403    if !labels.is_empty() {
3404        segments.push(format!("labels: {}", labels.join(" ")));
3405    }
3406
3407    if !extra_metadata.is_empty() {
3408        for (key, value) in extra_metadata {
3409            if value.trim().is_empty() {
3410                continue;
3411            }
3412            segments.push(format!("{}: {}", key, value));
3413        }
3414    }
3415
3416    if !content_dates.is_empty() {
3417        segments.push(format!("dates: {}", content_dates.join(" ")));
3418    }
3419
3420    if let Some(meta) = metadata {
3421        if let Ok(meta_json) = serde_json::to_string(meta) {
3422            segments.push(format!("metadata: {}", meta_json));
3423        }
3424    }
3425
3426    if segments.is_empty() {
3427        None
3428    } else {
3429        Some(segments.join("\n"))
3430    }
3431}
3432
/// Append `additions` to `target`, trimming each value and dropping blanks
/// and anything already present (in `target` or earlier in `additions`).
/// Relative order of the surviving additions is preserved.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    // Seed the dedup set with everything already in `target`.
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    let fresh = additions.into_iter().filter_map(|value| {
        let trimmed = value.trim().to_string();
        // `insert` returns false for duplicates; short-circuit skips the
        // set entirely for blank values.
        (!trimmed.is_empty() && seen.insert(trimmed.clone())).then_some(trimmed)
    });
    target.extend(fresh);
}