// memvid_core/memvid/lifecycle.rs

//! Lifecycle management for creating and opening `.mv2` memories.
//!
//! Responsibilities:
//! - Enforce single-file invariant (no sidecars) and take OS locks.
//! - Bootstrap headers, internal WAL, and TOC on create, and recover them on open.
//! - Validate TOC/footer layout, recover the latest valid footer when needed.
//! - Wire up index state (lex/vector/time) without mutating payload bytes.
8
9use std::convert::TryInto;
10use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom};
12use std::panic;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15
16use crate::constants::{MAGIC, SPEC_VERSION, WAL_OFFSET, WAL_SIZE_TINY};
17use crate::error::{MemvidError, Result};
18use crate::footer::{FooterSlice, find_last_valid_footer};
19use crate::io::header::HeaderCodec;
20#[cfg(feature = "parallel_segments")]
21use crate::io::manifest_wal::ManifestWal;
22use crate::io::wal::EmbeddedWal;
23use crate::lock::{FileLock, LockMode};
24#[cfg(feature = "lex")]
25use crate::search::{EmbeddedLexStorage, TantivyEngine};
26#[cfg(feature = "temporal_track")]
27use crate::types::FrameId;
28#[cfg(feature = "parallel_segments")]
29use crate::types::IndexSegmentRef;
30use crate::types::{
31    FrameStatus, Header, IndexManifests, LogicMesh, MemoriesTrack, PutManyOpts, SchemaRegistry,
32    SegmentCatalog, SketchTrack, TicketRef, Tier, Toc, VectorCompression,
33};
34#[cfg(feature = "temporal_track")]
35use crate::{TemporalTrack, temporal_track_read};
36use crate::{lex::LexIndex, vec::VecIndex};
37use blake3::Hasher;
38use memmap2::Mmap;
39
/// Default time to wait for the OS lock before giving up (milliseconds).
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 250;
/// Default interval between lock heartbeat refreshes (milliseconds).
const DEFAULT_HEARTBEAT_MS: u64 = 2_000;
/// Default grace period before a peer's lock is considered stale (milliseconds).
const DEFAULT_STALE_GRACE_MS: u64 = 10_000;
43
44/// Primary handle for interacting with a `.mv2` memory file.
45///
46/// Holds the file descriptor, lock, header, TOC, and in-memory index state. Mutations
47/// append to the embedded WAL and are materialized at commit time to keep the layout deterministic.
/// Primary handle for interacting with a `.mv2` memory file.
///
/// Holds the file descriptor, lock, header, TOC, and in-memory index state. Mutations
/// append to the embedded WAL and are materialized at commit time to keep the layout deterministic.
pub struct Memvid {
    /// Open descriptor for the backing `.mv2` file.
    pub(crate) file: File,
    /// Filesystem path the handle was opened from.
    pub(crate) path: PathBuf,
    /// OS-level lock held for the lifetime of this handle.
    pub(crate) lock: FileLock,
    /// True when opened with a shared lock / snapshot; mutations are rejected.
    pub(crate) read_only: bool,
    /// Fixed header at the start of the file (WAL geometry, footer offset, TOC checksum).
    pub(crate) header: Header,
    /// In-memory table of contents (frames, index manifests, tracks).
    pub(crate) toc: Toc,
    /// Embedded write-ahead log that stages mutations before commit.
    pub(crate) wal: EmbeddedWal,
    /// Number of frame inserts appended to WAL but not yet materialized into `toc.frames`.
    ///
    /// This lets frontends predict stable frame IDs before an explicit commit.
    pub(crate) pending_frame_inserts: u64,
    /// End of the materialized data region; new segments are appended here.
    pub(crate) data_end: u64,
    /// Cached end of the payload region (max of payload_offset + payload_length across all frames).
    /// Updated incrementally on frame insert to avoid O(n) scans.
    pub(crate) cached_payload_end: u64,
    /// Commit generation detected from the file tail (0 for a fresh file).
    pub(crate) generation: u64,
    /// Tunable lock acquisition / heartbeat parameters.
    pub(crate) lock_settings: LockSettings,
    /// Whether the lexical (full-text) index is active for this memory.
    pub(crate) lex_enabled: bool,
    /// Loaded lexical index, if any.
    pub(crate) lex_index: Option<LexIndex>,
    #[cfg(feature = "lex")]
    #[allow(dead_code)]
    pub(crate) lex_storage: Arc<RwLock<EmbeddedLexStorage>>,
    /// Whether the vector index is active for this memory.
    pub(crate) vec_enabled: bool,
    /// Compression mode applied to stored vectors.
    pub(crate) vec_compression: VectorCompression,
    /// Loaded vector index, if any.
    pub(crate) vec_index: Option<VecIndex>,
    /// CLIP visual embeddings index (separate from vec due to different dimensions)
    pub(crate) clip_enabled: bool,
    pub(crate) clip_index: Option<crate::clip::ClipIndex>,
    /// True once any uncommitted mutation has been staged.
    pub(crate) dirty: bool,
    #[cfg(feature = "lex")]
    /// Tantivy engine backing the lexical search, when initialized.
    pub(crate) tantivy: Option<TantivyEngine>,
    #[cfg(feature = "lex")]
    /// True when the tantivy index has pending changes to flush.
    pub(crate) tantivy_dirty: bool,
    #[cfg(feature = "temporal_track")]
    /// Lazily-loaded temporal track cache (see `ensure_temporal_track_loaded`).
    pub(crate) temporal_track: Option<TemporalTrack>,
    #[cfg(feature = "parallel_segments")]
    /// Sidecar manifest WAL for parallel segment builds (absent on read-only snapshots).
    pub(crate) manifest_wal: Option<ManifestWal>,
    /// In-memory track for structured memory cards.
    pub(crate) memories_track: MemoriesTrack,
    /// In-memory Logic-Mesh graph for entity-relationship traversal.
    pub(crate) logic_mesh: LogicMesh,
    /// In-memory sketch track for fast candidate generation.
    pub(crate) sketch_track: SketchTrack,
    /// Schema registry for predicate validation.
    pub(crate) schema_registry: SchemaRegistry,
    /// Whether to enforce strict schema validation on card insert.
    pub(crate) schema_strict: bool,
    /// Active batch mode options (set by `begin_batch`, cleared by `end_batch`).
    pub(crate) batch_opts: Option<PutManyOpts>,
    /// Active replay session being recorded (if any).
    #[cfg(feature = "replay")]
    pub(crate) active_session: Option<crate::replay::ActiveSession>,
    /// Completed sessions stored in memory (until persisted to file).
    #[cfg(feature = "replay")]
    pub(crate) completed_sessions: Vec<crate::replay::ReplaySession>,
}
105
/// Controls read-only open behaviour for `.mv2` memories.
#[derive(Debug, Clone, Copy, Default)]
pub struct OpenReadOptions {
    /// When true, fall back to a full exclusive open so recovery/repair may run
    /// (this can rewrite the header and TOC on disk).
    pub allow_repair: bool,
}
111
/// Tunable parameters for OS file-lock acquisition and liveness tracking.
#[derive(Debug, Clone)]
pub struct LockSettings {
    /// How long to wait for the lock before failing (milliseconds).
    pub timeout_ms: u64,
    /// Interval between heartbeat refreshes while the lock is held (milliseconds).
    pub heartbeat_ms: u64,
    /// Grace period before a silent peer lock is treated as stale (milliseconds).
    pub stale_grace_ms: u64,
    /// Force-break locks that appear stale instead of waiting them out.
    pub force_stale: bool,
    /// Optional command string — presumably recorded with the lock for
    /// diagnostics; confirm against the lock module.
    pub command: Option<String>,
}
120
121impl Default for LockSettings {
122    fn default() -> Self {
123        Self {
124            timeout_ms: DEFAULT_LOCK_TIMEOUT_MS,
125            heartbeat_ms: DEFAULT_HEARTBEAT_MS,
126            stale_grace_ms: DEFAULT_STALE_GRACE_MS,
127            force_stale: false,
128            command: None,
129        }
130    }
131}
132
133impl Memvid {
134    /// Create a new, empty `.mv2` file with an embedded WAL and empty TOC.
135    /// The file is locked exclusively for the lifetime of the handle.
136    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
137        let path_ref = path.as_ref();
138        ensure_single_file(path_ref)?;
139
140        OpenOptions::new()
141            .read(true)
142            .write(true)
143            .create(true)
144            .truncate(true)
145            .open(path_ref)?;
146        let (mut file, lock) = FileLock::open_and_lock(path_ref)?;
147
148        let header = Header {
149            magic: MAGIC,
150            version: SPEC_VERSION,
151            footer_offset: WAL_OFFSET + WAL_SIZE_TINY,
152            wal_offset: WAL_OFFSET,
153            wal_size: WAL_SIZE_TINY,
154            wal_checkpoint_pos: 0,
155            wal_sequence: 0,
156            toc_checksum: [0u8; 32],
157        };
158
159        let mut toc = empty_toc();
160        // If lex feature is enabled, set the catalog flag immediately
161        #[cfg(feature = "lex")]
162        {
163            toc.segment_catalog.lex_enabled = true;
164        }
165        file.set_len(header.footer_offset)?;
166        HeaderCodec::write(&mut file, &header)?;
167
168        let wal = EmbeddedWal::open(&file, &header)?;
169        let data_end = header.footer_offset;
170        #[cfg(feature = "lex")]
171        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::new()));
172        #[cfg(feature = "parallel_segments")]
173        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
174        #[cfg(feature = "parallel_segments")]
175        let manifest_wal_entries = manifest_wal.replay()?;
176
177        // No frames yet, so payload region ends at WAL boundary
178        let cached_payload_end = header.wal_offset + header.wal_size;
179
180        let mut memvid = Self {
181            file,
182            path: path_ref.to_path_buf(),
183            lock,
184            read_only: false,
185            header,
186            toc,
187            wal,
188            pending_frame_inserts: 0,
189            data_end,
190            cached_payload_end,
191            generation: 0,
192            lock_settings: LockSettings::default(),
193            lex_enabled: cfg!(feature = "lex"), // Enable by default if feature is enabled
194            lex_index: None,
195            #[cfg(feature = "lex")]
196            lex_storage,
197            vec_enabled: cfg!(feature = "vec"), // Enable by default if feature is enabled
198            vec_compression: VectorCompression::None,
199            vec_index: None,
200            clip_enabled: cfg!(feature = "clip"), // Enable by default if feature is enabled
201            clip_index: None,
202            dirty: false,
203            #[cfg(feature = "lex")]
204            tantivy: None,
205            #[cfg(feature = "lex")]
206            tantivy_dirty: false,
207            #[cfg(feature = "temporal_track")]
208            temporal_track: None,
209            #[cfg(feature = "parallel_segments")]
210            manifest_wal: Some(manifest_wal),
211            memories_track: MemoriesTrack::new(),
212            logic_mesh: LogicMesh::new(),
213            sketch_track: SketchTrack::default(),
214            schema_registry: SchemaRegistry::new(),
215            schema_strict: false,
216            batch_opts: None,
217            #[cfg(feature = "replay")]
218            active_session: None,
219            #[cfg(feature = "replay")]
220            completed_sessions: Vec::new(),
221        };
222
223        #[cfg(feature = "lex")]
224        memvid.init_tantivy()?;
225
226        #[cfg(feature = "parallel_segments")]
227        memvid.load_manifest_segments(manifest_wal_entries);
228
229        memvid.bootstrap_segment_catalog();
230
231        // Create empty manifests for enabled indexes so they persist across open/close
232        let empty_offset = memvid.data_end;
233        let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
234                                \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
235
236        #[cfg(feature = "lex")]
237        if memvid.lex_enabled && memvid.toc.indexes.lex.is_none() {
238            memvid.toc.indexes.lex = Some(crate::types::LexIndexManifest {
239                doc_count: 0,
240                generation: 0,
241                bytes_offset: empty_offset,
242                bytes_length: 0,
243                checksum: empty_checksum,
244            });
245        }
246
247        #[cfg(feature = "vec")]
248        if memvid.vec_enabled && memvid.toc.indexes.vec.is_none() {
249            memvid.toc.indexes.vec = Some(crate::types::VecIndexManifest {
250                vector_count: 0,
251                dimension: 0,
252                bytes_offset: empty_offset,
253                bytes_length: 0,
254                checksum: empty_checksum,
255                compression_mode: memvid.vec_compression.clone(),
256            });
257        }
258
259        memvid.rewrite_toc_footer()?;
260        memvid.header.toc_checksum = memvid.toc.toc_checksum;
261        crate::persist_header(&mut memvid.file, &memvid.header)?;
262        memvid.file.sync_all()?;
263        Ok(memvid)
264    }
265
    /// Borrow the current lock tuning parameters.
    #[must_use]
    pub fn lock_settings(&self) -> &LockSettings {
        &self.lock_settings
    }
270
    /// Mutably borrow the lock tuning parameters so callers can adjust
    /// timeouts/heartbeats before lock-sensitive operations.
    pub fn lock_settings_mut(&mut self) -> &mut LockSettings {
        &mut self.lock_settings
    }
274
    /// Set the vector compression mode for this memory.
    /// Must be called before ingesting documents with embeddings
    /// (already-stored vectors are not re-encoded by this setter).
    pub fn set_vector_compression(&mut self, compression: VectorCompression) {
        self.vec_compression = compression;
    }
280
    /// Get the current vector compression mode.
    #[must_use]
    pub fn vector_compression(&self) -> &VectorCompression {
        &self.vec_compression
    }
286
    /// Predict the next frame ID that would be assigned to a new insert.
    ///
    /// Frame IDs are dense indices into `toc.frames`. When a memory is mutable, inserts are first
    /// appended to the embedded WAL and only materialized into `toc.frames` on commit. This helper
    /// lets frontends allocate stable frame IDs before an explicit commit.
    #[must_use]
    pub fn next_frame_id(&self) -> u64 {
        // Saturating add: avoids overflow near u64::MAX rather than wrapping.
        (self.toc.frames.len() as u64).saturating_add(self.pending_frame_inserts)
    }
296
    /// Returns the total number of frames in the memory.
    ///
    /// This includes all frames regardless of status (active, deleted, etc.).
    /// Note: WAL-pending inserts are not counted; see `next_frame_id`.
    #[must_use]
    pub fn frame_count(&self) -> usize {
        self.toc.frames.len()
    }
304
    /// Shared open path for `open`/`try_open`: reads and validates the header
    /// and TOC (recovering the TOC from the last valid footer if decoding
    /// fails), replays the WAL, and loads all enabled index/track state.
    ///
    /// The handle becomes read-only iff the supplied lock is shared.
    fn open_locked(mut file: File, lock: FileLock, path_ref: &Path) -> Result<Self> {
        // Fast-path detection for encrypted capsules (.mv2e).
        // This avoids confusing "invalid header" errors and provides an actionable hint.
        let mut magic = [0u8; 4];
        let is_mv2e = file.read_exact(&mut magic).is_ok() && magic == *b"MV2E";
        file.seek(SeekFrom::Start(0))?;
        if is_mv2e {
            return Err(MemvidError::EncryptedFile {
                path: path_ref.to_path_buf(),
                hint: format!("Run: memvid unlock {}", path_ref.display()),
            });
        }

        let mut header = HeaderCodec::read(&mut file)?;
        let toc = match read_toc(&mut file, &header) {
            Ok(toc) => toc,
            // Only decode-class failures trigger recovery; anything else aborts.
            Err(err @ (MemvidError::Decode(_) | MemvidError::InvalidToc { .. })) => {
                tracing::info!("toc decode failed ({}); attempting recovery", err);
                let (toc, recovered_offset) = recover_toc(&mut file, Some(header.footer_offset))?;
                // Re-point the header at the recovered footer if it moved or
                // its checksum changed, and persist the correction.
                if recovered_offset != header.footer_offset
                    || header.toc_checksum != toc.toc_checksum
                {
                    header.footer_offset = recovered_offset;
                    header.toc_checksum = toc.toc_checksum;
                    crate::persist_header(&mut file, &header)?;
                }
                toc
            }
            Err(err) => return Err(err),
        };
        // Remember the verification outcome; re-checked after WAL replay below.
        let checksum_result = toc.verify_checksum();

        // Validate segment integrity early to catch corruption before loading indexes
        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
        if let Err(e) = validate_segment_integrity(&toc, &header, file_len) {
            tracing::warn!("Segment integrity validation failed: {}", e);
            // Don't fail file open - let doctor handle it
            // This is just an early warning system
        }
        ensure_non_overlapping_frames(&toc, file_len)?;

        let wal = EmbeddedWal::open(&file, &header)?;
        #[cfg(feature = "lex")]
        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
            toc.indexes.lex.as_ref(),
            &toc.indexes.lex_segments,
        )));
        #[cfg(feature = "parallel_segments")]
        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
        #[cfg(feature = "parallel_segments")]
        let manifest_wal_entries = manifest_wal.replay()?;

        let generation = detect_generation(&file)?.unwrap_or(0);
        let read_only = lock.mode() == LockMode::Shared;

        let mut memvid = Self {
            file,
            path: path_ref.to_path_buf(),
            lock,
            read_only,
            header,
            toc,
            wal,
            pending_frame_inserts: 0,
            // data_end / cached_payload_end are placeholders, computed just below.
            data_end: 0,
            cached_payload_end: 0,
            generation,
            lock_settings: LockSettings::default(),
            lex_enabled: false,
            lex_index: None,
            #[cfg(feature = "lex")]
            lex_storage,
            vec_enabled: false,
            vec_compression: VectorCompression::None,
            vec_index: None,
            clip_enabled: false,
            clip_index: None,
            dirty: false,
            #[cfg(feature = "lex")]
            tantivy: None,
            #[cfg(feature = "lex")]
            tantivy_dirty: false,
            #[cfg(feature = "temporal_track")]
            temporal_track: None,
            #[cfg(feature = "parallel_segments")]
            manifest_wal: Some(manifest_wal),
            memories_track: MemoriesTrack::new(),
            logic_mesh: LogicMesh::new(),
            sketch_track: SketchTrack::default(),
            schema_registry: SchemaRegistry::new(),
            schema_strict: false,
            batch_opts: None,
            #[cfg(feature = "replay")]
            active_session: None,
            #[cfg(feature = "replay")]
            completed_sessions: Vec::new(),
        };
        memvid.data_end = compute_data_end(&memvid.toc, &memvid.header);
        // One-time O(n) scan to initialize cached_payload_end from existing frames
        memvid.cached_payload_end = compute_payload_region_end(&memvid.toc, &memvid.header);
        // Use consolidated helper for lex_enabled check
        memvid.lex_enabled = has_lex_index(&memvid.toc);
        if memvid.lex_enabled {
            memvid.load_lex_index_from_manifest()?;
        }
        #[cfg(feature = "lex")]
        {
            memvid.init_tantivy()?;
        }
        memvid.vec_enabled =
            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
        if memvid.vec_enabled {
            memvid.load_vec_index_from_manifest()?;
        }
        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
        if memvid.clip_enabled {
            memvid.load_clip_index_from_manifest()?;
        }
        memvid.recover_wal()?;
        #[cfg(feature = "parallel_segments")]
        memvid.load_manifest_segments(manifest_wal_entries);
        memvid.bootstrap_segment_catalog();
        #[cfg(feature = "temporal_track")]
        memvid.ensure_temporal_track_loaded()?;
        memvid.load_memories_track()?;
        memvid.load_logic_mesh()?;
        memvid.load_sketch_track()?;
        if checksum_result.is_err() {
            // The original checksum failed; WAL recovery may have fixed the TOC.
            // Re-verify (propagating a persistent failure) and re-sync the header.
            memvid.toc.verify_checksum()?;
            if memvid.toc.toc_checksum != memvid.header.toc_checksum {
                memvid.header.toc_checksum = memvid.toc.toc_checksum;
                crate::persist_header(&mut memvid.file, &memvid.header)?;
                memvid.file.sync_all()?;
            }
        }
        Ok(memvid)
    }
442
    /// Open an existing `.mv2` with exclusive access, performing recovery if needed.
    ///
    /// # Errors
    ///
    /// Fails on sidecar violations, lock acquisition failure, or unrecoverable corruption.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path_ref = path.as_ref();
        ensure_single_file(path_ref)?;

        let (file, lock) = FileLock::open_and_lock(path_ref)?;
        Self::open_locked(file, lock, path_ref)
    }
451
    /// Open with shared (read-only) access using default options (no repair).
    pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_read_only_with_options(path, OpenReadOptions::default())
    }
455
    /// Open read-only. With `allow_repair` set, falls back to a full exclusive
    /// open, which may rewrite the header/TOC during recovery.
    pub fn open_read_only_with_options<P: AsRef<Path>>(
        path: P,
        options: OpenReadOptions,
    ) -> Result<Self> {
        let path_ref = path.as_ref();
        ensure_single_file(path_ref)?;

        // Repair needs write access, so delegate to the exclusive open path.
        if options.allow_repair {
            return Self::open(path_ref);
        }

        Self::open_read_only_snapshot(path_ref)
    }
469
    /// Open a read-only snapshot anchored at the file's last valid tail
    /// (footer/TOC), without WAL recovery or on-disk header repair.
    ///
    /// NOTE(review): the file is opened with `.write(true)` even though the
    /// handle is read-only — presumably required by `FileLock`/`EmbeddedWal`;
    /// confirm before changing.
    fn open_read_only_snapshot(path_ref: &Path) -> Result<Self> {
        let mut file = OpenOptions::new().read(true).write(true).open(path_ref)?;
        let TailSnapshot {
            toc,
            footer_offset,
            data_end,
            generation,
        } = load_tail_snapshot(&file)?;

        let mut header = HeaderCodec::read(&mut file)?;
        // Overlay the snapshot's tail metadata in memory only; the on-disk
        // header is never rewritten on this path.
        header.footer_offset = footer_offset;
        header.toc_checksum = toc.toc_checksum;

        let lock = FileLock::acquire_with_mode(&file, LockMode::Shared)?;
        let wal = EmbeddedWal::open_read_only(&file, &header)?;

        #[cfg(feature = "lex")]
        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
            toc.indexes.lex.as_ref(),
            &toc.indexes.lex_segments,
        )));

        let cached_payload_end = compute_payload_region_end(&toc, &header);

        let mut memvid = Self {
            file,
            path: path_ref.to_path_buf(),
            lock,
            read_only: true,
            header,
            toc,
            wal,
            pending_frame_inserts: 0,
            data_end,
            cached_payload_end,
            generation,
            lock_settings: LockSettings::default(),
            lex_enabled: false,
            lex_index: None,
            #[cfg(feature = "lex")]
            lex_storage,
            vec_enabled: false,
            vec_compression: VectorCompression::None,
            vec_index: None,
            clip_enabled: false,
            clip_index: None,
            dirty: false,
            #[cfg(feature = "lex")]
            tantivy: None,
            #[cfg(feature = "lex")]
            tantivy_dirty: false,
            #[cfg(feature = "temporal_track")]
            temporal_track: None,
            // No manifest WAL on snapshots: this path must not create sidecars.
            #[cfg(feature = "parallel_segments")]
            manifest_wal: None,
            memories_track: MemoriesTrack::new(),
            logic_mesh: LogicMesh::new(),
            sketch_track: SketchTrack::default(),
            schema_registry: SchemaRegistry::new(),
            schema_strict: false,
            batch_opts: None,
            #[cfg(feature = "replay")]
            active_session: None,
            #[cfg(feature = "replay")]
            completed_sessions: Vec::new(),
        };

        // Use consolidated helper for lex_enabled check
        memvid.lex_enabled = has_lex_index(&memvid.toc);
        if memvid.lex_enabled {
            memvid.load_lex_index_from_manifest()?;
        }
        #[cfg(feature = "lex")]
        memvid.init_tantivy()?;

        memvid.vec_enabled =
            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
        if memvid.vec_enabled {
            memvid.load_vec_index_from_manifest()?;
        }
        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
        if memvid.clip_enabled {
            memvid.load_clip_index_from_manifest()?;
        }
        // Load memories track, Logic-Mesh, and sketch track if present
        memvid.load_memories_track()?;
        memvid.load_logic_mesh()?;
        memvid.load_sketch_track()?;

        memvid.bootstrap_segment_catalog();
        #[cfg(feature = "temporal_track")]
        memvid.ensure_temporal_track_loaded()?;

        Ok(memvid)
    }
565
566    pub(crate) fn try_open<P: AsRef<Path>>(path: P) -> Result<Self> {
567        let path_ref = path.as_ref();
568        ensure_single_file(path_ref)?;
569
570        let file = OpenOptions::new().read(true).write(true).open(path_ref)?;
571        let lock = match FileLock::try_acquire(&file, path_ref)? {
572            Some(lock) => lock,
573            None => {
574                return Err(MemvidError::Lock(
575                    "exclusive access unavailable for doctor".to_string(),
576                ));
577            }
578        };
579        Self::open_locked(file, lock, path_ref)
580    }
581
582    fn bootstrap_segment_catalog(&mut self) {
583        let catalog = &mut self.toc.segment_catalog;
584        if catalog.version == 0 {
585            catalog.version = 1;
586        }
587        if catalog.next_segment_id == 0 {
588            let mut max_id = 0u64;
589            for descriptor in &catalog.lex_segments {
590                max_id = max_id.max(descriptor.common.segment_id);
591            }
592            for descriptor in &catalog.vec_segments {
593                max_id = max_id.max(descriptor.common.segment_id);
594            }
595            for descriptor in &catalog.time_segments {
596                max_id = max_id.max(descriptor.common.segment_id);
597            }
598            #[cfg(feature = "temporal_track")]
599            for descriptor in &catalog.temporal_segments {
600                max_id = max_id.max(descriptor.common.segment_id);
601            }
602            #[cfg(feature = "parallel_segments")]
603            for descriptor in &catalog.index_segments {
604                max_id = max_id.max(descriptor.common.segment_id);
605            }
606            if max_id > 0 {
607                catalog.next_segment_id = max_id.saturating_add(1);
608            }
609        }
610    }
611
612    #[cfg(feature = "parallel_segments")]
613    fn load_manifest_segments(&mut self, entries: Vec<IndexSegmentRef>) {
614        if entries.is_empty() {
615            return;
616        }
617        for entry in entries {
618            let duplicate = self
619                .toc
620                .segment_catalog
621                .index_segments
622                .iter()
623                .any(|existing| existing.common.segment_id == entry.common.segment_id);
624            if !duplicate {
625                self.toc.segment_catalog.index_segments.push(entry);
626            }
627        }
628    }
629
630    /// Load the memories track from the manifest if present.
631    fn load_memories_track(&mut self) -> Result<()> {
632        let manifest = match &self.toc.memories_track {
633            Some(m) => m,
634            None => return Ok(()),
635        };
636
637        // Read the compressed data from the file
638        if manifest.bytes_length > crate::MAX_INDEX_BYTES {
639            return Err(MemvidError::InvalidToc {
640                reason: "memories track exceeds safety limit".into(),
641            });
642        }
643        // Safe: guarded by MAX_INDEX_BYTES check above
644        #[allow(clippy::cast_possible_truncation)]
645        let mut buf = vec![0u8; manifest.bytes_length as usize];
646        self.file
647            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
648        self.file.read_exact(&mut buf)?;
649
650        // Verify checksum
651        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
652        if actual_checksum != manifest.checksum {
653            return Err(MemvidError::InvalidToc {
654                reason: "memories track checksum mismatch".into(),
655            });
656        }
657
658        // Deserialize the memories track
659        self.memories_track = MemoriesTrack::deserialize(&buf)?;
660
661        Ok(())
662    }
663
664    /// Load the Logic-Mesh from the manifest if present.
665    fn load_logic_mesh(&mut self) -> Result<()> {
666        let manifest = match &self.toc.logic_mesh {
667            Some(m) => m,
668            None => return Ok(()),
669        };
670
671        // Read the serialized data from the file
672        if manifest.bytes_length > crate::MAX_INDEX_BYTES {
673            return Err(MemvidError::InvalidToc {
674                reason: "logic mesh exceeds safety limit".into(),
675            });
676        }
677        // Safe: guarded by MAX_INDEX_BYTES check above
678        #[allow(clippy::cast_possible_truncation)]
679        let mut buf = vec![0u8; manifest.bytes_length as usize];
680        self.file
681            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
682        self.file.read_exact(&mut buf)?;
683
684        // Verify checksum
685        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
686        if actual_checksum != manifest.checksum {
687            return Err(MemvidError::InvalidToc {
688                reason: "logic mesh checksum mismatch".into(),
689            });
690        }
691
692        // Deserialize the logic mesh
693        self.logic_mesh = LogicMesh::deserialize(&buf)?;
694
695        Ok(())
696    }
697
698    /// Load the sketch track from the manifest if present.
699    fn load_sketch_track(&mut self) -> Result<()> {
700        let manifest = match &self.toc.sketch_track {
701            Some(m) => m.clone(),
702            None => return Ok(()),
703        };
704
705        // Read and deserialize the sketch track (read_sketch_track handles seeking and checksum)
706        self.sketch_track = crate::types::read_sketch_track(
707            &mut self.file,
708            manifest.bytes_offset,
709            manifest.bytes_length,
710        )?;
711
712        Ok(())
713    }
714
    /// Lazily load the temporal track into the in-memory cache.
    ///
    /// Best-effort by design: a missing manifest, zero-length region, region
    /// extending past EOF, or an invalid track all leave the cache empty and
    /// return `Ok(())`; only I/O and non-temporal errors propagate.
    #[cfg(feature = "temporal_track")]
    pub(crate) fn ensure_temporal_track_loaded(&mut self) -> Result<()> {
        if self.temporal_track.is_some() {
            return Ok(());
        }
        let manifest = match &self.toc.temporal_track {
            Some(manifest) => manifest.clone(),
            None => return Ok(()),
        };
        if manifest.bytes_length == 0 {
            return Ok(());
        }
        // Skip (rather than fail) if the recorded region overflows or runs past EOF.
        let file_len = self.file.metadata()?.len();
        let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) else {
            return Ok(());
        };
        if end > file_len {
            return Ok(());
        }
        match temporal_track_read(&mut self.file, manifest.bytes_offset, manifest.bytes_length) {
            Ok(track) => self.temporal_track = Some(track),
            // An unreadable track is treated as absent, not fatal.
            Err(MemvidError::InvalidTemporalTrack { .. }) => {
                return Ok(());
            }
            Err(err) => return Err(err),
        }
        Ok(())
    }
743
    /// Borrow the temporal track, loading it on first use (see
    /// `ensure_temporal_track_loaded` for the silent-skip semantics).
    #[cfg(feature = "temporal_track")]
    pub(crate) fn temporal_track_ref(&mut self) -> Result<Option<&TemporalTrack>> {
        self.ensure_temporal_track_loaded()?;
        Ok(self.temporal_track.as_ref())
    }
749
750    #[cfg(feature = "temporal_track")]
751    pub(crate) fn temporal_anchor_timestamp(&mut self, frame_id: FrameId) -> Result<Option<i64>> {
752        self.ensure_temporal_track_loaded()?;
753        let Some(track) = self.temporal_track.as_ref() else {
754            return Ok(None);
755        };
756        if !track.capabilities().has_anchors {
757            return Ok(None);
758        }
759        Ok(track
760            .anchor_for_frame(frame_id)
761            .map(|anchor| anchor.anchor_ts))
762    }
763
    /// Drop the cached temporal track so the next access reloads it from disk.
    #[cfg(feature = "temporal_track")]
    pub(crate) fn clear_temporal_track_cache(&mut self) {
        self.temporal_track = None;
    }
768
    /// Timestamp for `frame_id`: the temporal anchor when one exists,
    /// otherwise `fallback`.
    #[cfg(feature = "temporal_track")]
    pub(crate) fn effective_temporal_timestamp(
        &mut self,
        frame_id: FrameId,
        fallback: i64,
    ) -> Result<i64> {
        Ok(self
            .temporal_anchor_timestamp(frame_id)?
            .unwrap_or(fallback))
    }
779
    /// Without the `temporal_track` feature there are no anchors, so the
    /// fallback timestamp is always used. `&mut self` keeps the signature
    /// identical to the feature-enabled variant.
    #[cfg(not(feature = "temporal_track"))]
    pub(crate) fn effective_temporal_timestamp(
        &mut self,
        _frame_id: crate::types::FrameId,
        fallback: i64,
    ) -> Result<i64> {
        Ok(fallback)
    }
788
    /// Get current memory binding information.
    ///
    /// Returns the binding if this file is bound to a dashboard memory,
    /// or None if unbound.
    #[must_use]
    pub fn get_memory_binding(&self) -> Option<&crate::types::MemoryBinding> {
        self.toc.memory_binding.as_ref()
    }
797
798    /// Bind this file to a dashboard memory.
799    ///
800    /// This stores the binding in the TOC and applies a temporary ticket for initial binding.
801    /// The caller should follow up with `apply_signed_ticket` for cryptographic verification.
802    ///
803    /// # Errors
804    ///
805    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
806    #[allow(deprecated)]
807    pub fn bind_memory(
808        &mut self,
809        binding: crate::types::MemoryBinding,
810        ticket: crate::types::Ticket,
811    ) -> Result<()> {
812        // Check existing binding
813        if let Some(existing) = self.get_memory_binding() {
814            if existing.memory_id != binding.memory_id {
815                return Err(MemvidError::MemoryAlreadyBound {
816                    existing_memory_id: existing.memory_id,
817                    existing_memory_name: existing.memory_name.clone(),
818                    bound_at: existing.bound_at.to_rfc3339(),
819                });
820            }
821        }
822
823        // Apply ticket for capacity
824        self.apply_ticket(ticket)?;
825
826        // Store binding in TOC
827        self.toc.memory_binding = Some(binding);
828        self.dirty = true;
829
830        Ok(())
831    }
832
833    /// Set only the memory binding without applying a ticket.
834    ///
835    /// This is used when the caller will immediately follow up with `apply_signed_ticket`
836    /// to apply the cryptographically verified ticket. This avoids the sequence number
837    /// conflict that occurs when using `bind_memory` with a temporary ticket.
838    ///
839    /// # Errors
840    ///
841    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
842    pub fn set_memory_binding_only(&mut self, binding: crate::types::MemoryBinding) -> Result<()> {
843        self.ensure_writable()?;
844
845        // Check existing binding
846        if let Some(existing) = self.get_memory_binding() {
847            if existing.memory_id != binding.memory_id {
848                return Err(MemvidError::MemoryAlreadyBound {
849                    existing_memory_id: existing.memory_id,
850                    existing_memory_name: existing.memory_name.clone(),
851                    bound_at: existing.bound_at.to_rfc3339(),
852                });
853            }
854        }
855
856        // Store binding in TOC (without applying a ticket)
857        self.toc.memory_binding = Some(binding);
858        self.dirty = true;
859
860        Ok(())
861    }
862
863    /// Unbind this file from its dashboard memory.
864    ///
865    /// This clears the binding and reverts to free tier capacity (1 GB).
866    pub fn unbind_memory(&mut self) -> Result<()> {
867        self.toc.memory_binding = None;
868        // Revert to free tier
869        self.toc.ticket_ref = crate::types::TicketRef {
870            issuer: "free-tier".into(),
871            seq_no: 1,
872            expires_in_secs: 0,
873            capacity_bytes: crate::types::Tier::Free.capacity_bytes(),
874            verified: false,
875        };
876        self.dirty = true;
877        Ok(())
878    }
879}
880
881pub(crate) fn read_toc(file: &mut File, header: &Header) -> Result<Toc> {
882    use crate::footer::{CommitFooter, FOOTER_SIZE};
883
884    let len = file.metadata()?.len();
885    if len < header.footer_offset {
886        return Err(MemvidError::InvalidToc {
887            reason: "footer offset beyond file length".into(),
888        });
889    }
890
891    // Read the entire region from footer_offset to EOF (includes TOC + footer)
892    file.seek(SeekFrom::Start(header.footer_offset))?;
893    // Safe: total_size bounded by file length, and we check MAX_INDEX_BYTES before reading
894    #[allow(clippy::cast_possible_truncation)]
895    let total_size = (len - header.footer_offset) as usize;
896    if total_size as u64 > crate::MAX_INDEX_BYTES {
897        return Err(MemvidError::InvalidToc {
898            reason: "toc region exceeds safety limit".into(),
899        });
900    }
901
902    if total_size < FOOTER_SIZE {
903        return Err(MemvidError::InvalidToc {
904            reason: "region too small to contain footer".into(),
905        });
906    }
907
908    let mut buf = Vec::with_capacity(total_size);
909    file.read_to_end(&mut buf)?;
910
911    // Parse the footer (last FOOTER_SIZE bytes)
912    let footer_start = buf.len() - FOOTER_SIZE;
913    let footer_bytes = &buf[footer_start..];
914    let footer = CommitFooter::decode(footer_bytes).ok_or(MemvidError::InvalidToc {
915        reason: "failed to decode commit footer".into(),
916    })?;
917
918    // Extract only the TOC bytes (excluding the footer)
919    let toc_bytes = &buf[..footer_start];
920    #[allow(clippy::cast_possible_truncation)]
921    if toc_bytes.len() != footer.toc_len as usize {
922        return Err(MemvidError::InvalidToc {
923            reason: "toc length mismatch".into(),
924        });
925    }
926    if !footer.hash_matches(toc_bytes) {
927        return Err(MemvidError::InvalidToc {
928            reason: "commit footer toc hash mismatch".into(),
929        });
930    }
931
932    verify_toc_prefix(toc_bytes)?;
933    let toc = Toc::decode(toc_bytes)?;
934    Ok(toc)
935}
936
937fn verify_toc_prefix(bytes: &[u8]) -> Result<()> {
938    const MAX_SEGMENTS: u64 = 1_000_000;
939    const MAX_FRAMES: u64 = 1_000_000;
940    const MIN_SEGMENT_META_BYTES: u64 = 32;
941    const MIN_FRAME_BYTES: u64 = 64;
942    // TOC trailer layout (little-endian):
943    // [toc_version:u64][segments_len:u64][frames_len:u64]...
944    let read_u64 = |range: std::ops::Range<usize>, context: &str| -> Result<u64> {
945        let slice = bytes.get(range).ok_or_else(|| MemvidError::InvalidToc {
946            reason: context.to_string().into(),
947        })?;
948        let array: [u8; 8] = slice.try_into().map_err(|_| MemvidError::InvalidToc {
949            reason: context.to_string().into(),
950        })?;
951        Ok(u64::from_le_bytes(array))
952    };
953
954    if bytes.len() < 24 {
955        return Err(MemvidError::InvalidToc {
956            reason: "toc trailer too small".into(),
957        });
958    }
959    let toc_version = read_u64(0..8, "toc version missing or truncated")?;
960    if toc_version > 32 {
961        return Err(MemvidError::InvalidToc {
962            reason: "toc version unreasonable".into(),
963        });
964    }
965    let segments_len = read_u64(8..16, "segment count missing or truncated")?;
966    if segments_len > MAX_SEGMENTS {
967        return Err(MemvidError::InvalidToc {
968            reason: "segment count unreasonable".into(),
969        });
970    }
971    let frames_len = read_u64(16..24, "frame count missing or truncated")?;
972    if frames_len > MAX_FRAMES {
973        return Err(MemvidError::InvalidToc {
974            reason: "frame count unreasonable".into(),
975        });
976    }
977    let required = segments_len
978        .saturating_mul(MIN_SEGMENT_META_BYTES)
979        .saturating_add(frames_len.saturating_mul(MIN_FRAME_BYTES));
980    if required > bytes.len() as u64 {
981        return Err(MemvidError::InvalidToc {
982            reason: "toc payload inconsistent with counts".into(),
983        });
984    }
985    Ok(())
986}
987
988/// Ensure frame payloads do not overlap each other or exceed file boundary.
989///
990/// Frames in the TOC are ordered by `frame_id`, not by `payload_offset`, so we must
991/// sort by `payload_offset` before checking for overlaps.
992///
993/// Note: Frames with `payload_length` == 0 are "virtual" frames (e.g., document
994/// frames that reference chunks) and are skipped from this check.
995fn ensure_non_overlapping_frames(toc: &Toc, file_len: u64) -> Result<()> {
996    // Collect active frames with actual payloads and sort by payload_offset
997    let mut frames_by_offset: Vec<_> = toc
998        .frames
999        .iter()
1000        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
1001        .collect();
1002    frames_by_offset.sort_by_key(|f| f.payload_offset);
1003
1004    let mut previous_end = 0u64;
1005    for frame in frames_by_offset {
1006        let end = frame
1007            .payload_offset
1008            .checked_add(frame.payload_length)
1009            .ok_or_else(|| MemvidError::InvalidToc {
1010                reason: "frame payload offsets overflow".into(),
1011            })?;
1012        if end > file_len {
1013            return Err(MemvidError::InvalidToc {
1014                reason: "frame payload exceeds file length".into(),
1015            });
1016        }
1017        if frame.payload_offset < previous_end {
1018            return Err(MemvidError::InvalidToc {
1019                reason: format!(
1020                    "frame {} payload overlaps with previous frame (offset {} < previous end {})",
1021                    frame.id, frame.payload_offset, previous_end
1022                )
1023                .into(),
1024            });
1025        }
1026        previous_end = end;
1027    }
1028    Ok(())
1029}
1030
/// Best-effort recovery of the TOC and its offset from a damaged file.
///
/// Strategies, tried in order:
/// 1. Locate the last hash-validated commit footer and decode the TOC it covers.
/// 2. If the footer is corrupt but the header supplied a `hint` offset, decode
///    the bytes between the hint and the fixed-size footer directly.
/// 3. Fall back to a backwards scan for checksum-validated TOC candidates.
///
/// Returns the decoded TOC together with its byte offset in the file.
pub(crate) fn recover_toc(file: &mut File, hint: Option<u64>) -> Result<(Toc, u64)> {
    let len = file.metadata()?.len();
    // Safety: we only create a read-only mapping over stable file bytes.
    let mmap = unsafe { Mmap::map(&*file)? };
    tracing::debug!(file_len = len, "attempting toc recovery");

    // First, try to find a valid footer which includes validated TOC bytes
    if let Some(footer_slice) = find_last_valid_footer(&mmap) {
        tracing::debug!(
            footer_offset = footer_slice.footer_offset,
            toc_offset = footer_slice.toc_offset,
            toc_len = footer_slice.toc_bytes.len(),
            "found valid footer during recovery"
        );
        // The footer has already validated the TOC hash, so we can directly decode it
        match Toc::decode(footer_slice.toc_bytes) {
            Ok(toc) => {
                return Ok((toc, footer_slice.toc_offset as u64));
            }
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "footer-validated TOC failed to decode, falling back to scan"
                );
            }
        }
    }

    // If we have a header-provided hint (`footer_offset`) but the commit footer itself is corrupted,
    // we can often still recover because the TOC bytes are intact. In that case, assume the TOC
    // spans from `hint` up to the final fixed-size commit footer and decode it best-effort.
    if let Some(hint_offset) = hint {
        use crate::footer::FOOTER_SIZE;

        // Safe: file successfully mmapped so length fits in usize
        #[allow(clippy::cast_possible_truncation)]
        let start = (hint_offset.min(len)) as usize;
        if mmap.len().saturating_sub(start) >= FOOTER_SIZE {
            let toc_end = mmap.len().saturating_sub(FOOTER_SIZE);
            if toc_end > start {
                let toc_bytes = &mmap[start..toc_end];
                if verify_toc_prefix(toc_bytes).is_ok() {
                    // Decode may panic on adversarial bytes; contain it.
                    let attempt = panic::catch_unwind(|| Toc::decode(toc_bytes));
                    if let Ok(Ok(toc)) = attempt {
                        tracing::debug!(
                            recovered_offset = hint_offset,
                            recovered_frames = toc.frames.len(),
                            "recovered toc from hinted offset without validated footer"
                        );
                        return Ok((toc, hint_offset));
                    }
                }
            }
        }
    }

    // Fallback to manual scan if footer-based recovery failed.
    // Scan the hinted region first (most likely location), then the rest.
    let mut ranges = Vec::new();
    if let Some(hint_offset) = hint {
        // Safe: file successfully mmapped so length fits in usize
        #[allow(clippy::cast_possible_truncation)]
        let hint_idx = hint_offset.min(len) as usize;
        ranges.push((hint_idx, mmap.len()));
        if hint_idx > 0 {
            ranges.push((0, hint_idx));
        }
    } else {
        ranges.push((0, mmap.len()));
    }

    for (start, end) in ranges {
        if let Some(found) = scan_range_for_toc(&mmap, start, end) {
            return Ok(found);
        }
    }

    Err(MemvidError::InvalidToc {
        reason: "unable to recover table of contents from file trailer".into(),
    })
}
1111
/// Scan `data[start..end]` backwards for a checksum-validated legacy TOC.
///
/// Only offsets within the last `MAX_TOC_BYTES` of the buffer are considered,
/// keeping the reverse scan bounded for large memories. Returns the decoded
/// TOC and its absolute byte offset on success, `None` if nothing validates.
fn scan_range_for_toc(data: &[u8], start: usize, end: usize) -> Option<(Toc, u64)> {
    if start >= end || end > data.len() {
        return None;
    }
    const MAX_TOC_BYTES: usize = 64 * 1024 * 1024;
    const ZERO_CHECKSUM: [u8; 32] = [0u8; 32];

    // We only ever consider offsets where the candidate TOC slice would be <= MAX_TOC_BYTES,
    // otherwise the loop devolves into iterating over the entire file for large memories.
    let min_offset = data.len().saturating_sub(MAX_TOC_BYTES);
    let scan_start = start.max(min_offset);

    // Walk backwards so the newest (right-most) valid candidate wins.
    for offset in (scan_start..end).rev() {
        let slice = &data[offset..];
        if slice.len() < 16 {
            continue;
        }
        debug_assert!(slice.len() <= MAX_TOC_BYTES);

        // No footer found - try old format with checksum
        if slice.len() < ZERO_CHECKSUM.len() {
            continue;
        }
        // The stored checksum is the hash of the TOC bytes with the trailing
        // checksum field replaced by zeros, so hash body + zeroed tail here.
        let (body, stored_checksum) = slice.split_at(slice.len() - ZERO_CHECKSUM.len());
        let mut hasher = Hasher::new();
        hasher.update(body);
        hasher.update(&ZERO_CHECKSUM);
        if hasher.finalize().as_bytes() != stored_checksum {
            continue;
        }
        if verify_toc_prefix(slice).is_err() {
            continue;
        }
        // Decode may panic on adversarial bytes; contain it.
        let attempt = panic::catch_unwind(|| Toc::decode(slice));
        if let Ok(Ok(toc)) = attempt {
            let recovered_offset = offset as u64;
            tracing::debug!(
                recovered_offset,
                recovered_frames = toc.frames.len(),
                "recovered toc via scan"
            );
            return Some((toc, recovered_offset));
        }
    }
    None
}
1158
1159pub(crate) fn prepare_toc_bytes(toc: &mut Toc) -> Result<Vec<u8>> {
1160    toc.toc_checksum = [0u8; 32];
1161    let bytes = toc.encode()?;
1162    let checksum = Toc::calculate_checksum(&bytes);
1163    toc.toc_checksum = checksum;
1164    toc.encode()
1165}
1166
1167pub(crate) fn empty_toc() -> Toc {
1168    Toc {
1169        toc_version: 0,
1170        segments: Vec::new(),
1171        frames: Vec::new(),
1172        indexes: IndexManifests::default(),
1173        time_index: None,
1174        temporal_track: None,
1175        memories_track: None,
1176        logic_mesh: None,
1177        sketch_track: None,
1178        segment_catalog: SegmentCatalog::default(),
1179        ticket_ref: TicketRef {
1180            issuer: "free-tier".into(),
1181            seq_no: 1,
1182            expires_in_secs: 0,
1183            capacity_bytes: Tier::Free.capacity_bytes(),
1184            verified: false,
1185        },
1186        memory_binding: None,
1187        replay_manifest: None,
1188        enrichment_queue: crate::types::EnrichmentQueueManifest::default(),
1189        merkle_root: [0u8; 32],
1190        toc_checksum: [0u8; 32],
1191    }
1192}
1193
1194/// Compute the end of the payload region from frame payloads only.
1195/// Used once at open time to seed `cached_payload_end`.
1196pub(crate) fn compute_payload_region_end(toc: &Toc, header: &Header) -> u64 {
1197    let wal_region_end = header.wal_offset.saturating_add(header.wal_size);
1198    let mut max_end = wal_region_end;
1199    for frame in &toc.frames {
1200        if frame.payload_length != 0 {
1201            if let Some(end) = frame.payload_offset.checked_add(frame.payload_length) {
1202                max_end = max_end.max(end);
1203            }
1204        }
1205    }
1206    max_end
1207}
1208
1209pub(crate) fn compute_data_end(toc: &Toc, header: &Header) -> u64 {
1210    // `data_end` tracks the end of all data bytes that should not be overwritten by appends:
1211    // - frame payloads
1212    // - embedded indexes / metadata segments referenced by the TOC
1213    // - the current footer boundary (TOC offset), since callers may safely overwrite old TOCs
1214    //
1215    // Keeping this conservative prevents WAL replay / appends from corrupting embedded segments.
1216    let wal_region_end = header.wal_offset.saturating_add(header.wal_size);
1217    let mut max_end = wal_region_end.max(header.footer_offset);
1218
1219    // Frame payloads (active only).
1220    for frame in toc
1221        .frames
1222        .iter()
1223        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
1224    {
1225        if let Some(end) = frame.payload_offset.checked_add(frame.payload_length) {
1226            max_end = max_end.max(end);
1227        }
1228    }
1229
1230    // Segment catalog entries.
1231    let catalog = &toc.segment_catalog;
1232    for seg in &catalog.lex_segments {
1233        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1234            max_end = max_end.max(end);
1235        }
1236    }
1237    for seg in &catalog.vec_segments {
1238        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1239            max_end = max_end.max(end);
1240        }
1241    }
1242    for seg in &catalog.time_segments {
1243        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1244            max_end = max_end.max(end);
1245        }
1246    }
1247    #[cfg(feature = "temporal_track")]
1248    for seg in &catalog.temporal_segments {
1249        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1250            max_end = max_end.max(end);
1251        }
1252    }
1253    #[cfg(feature = "lex")]
1254    for seg in &catalog.tantivy_segments {
1255        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1256            max_end = max_end.max(end);
1257        }
1258    }
1259    #[cfg(feature = "parallel_segments")]
1260    for seg in &catalog.index_segments {
1261        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1262            max_end = max_end.max(end);
1263        }
1264    }
1265
1266    // Global manifests (non-segment storage paths).
1267    if let Some(manifest) = toc.indexes.lex.as_ref() {
1268        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1269            max_end = max_end.max(end);
1270        }
1271    }
1272    if let Some(manifest) = toc.indexes.vec.as_ref() {
1273        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1274            max_end = max_end.max(end);
1275        }
1276    }
1277    if let Some(manifest) = toc.indexes.clip.as_ref() {
1278        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1279            max_end = max_end.max(end);
1280        }
1281    }
1282    if let Some(manifest) = toc.time_index.as_ref() {
1283        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1284            max_end = max_end.max(end);
1285        }
1286    }
1287    #[cfg(feature = "temporal_track")]
1288    if let Some(track) = toc.temporal_track.as_ref() {
1289        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1290            max_end = max_end.max(end);
1291        }
1292    }
1293    if let Some(track) = toc.memories_track.as_ref() {
1294        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1295            max_end = max_end.max(end);
1296        }
1297    }
1298    if let Some(mesh) = toc.logic_mesh.as_ref() {
1299        if let Some(end) = mesh.bytes_offset.checked_add(mesh.bytes_length) {
1300            max_end = max_end.max(end);
1301        }
1302    }
1303    if let Some(track) = toc.sketch_track.as_ref() {
1304        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1305            max_end = max_end.max(end);
1306        }
1307    }
1308    #[cfg(feature = "replay")]
1309    if let Some(manifest) = toc.replay_manifest.as_ref() {
1310        if let Some(end) = manifest.segment_offset.checked_add(manifest.segment_size) {
1311            max_end = max_end.max(end);
1312        }
1313    }
1314
1315    tracing::debug!(
1316        wal_region_end,
1317        footer_offset = header.footer_offset,
1318        computed_data_end = max_end,
1319        "compute_data_end"
1320    );
1321
1322    max_end
1323}
1324
/// Snapshot of the committed tail state recovered from the newest valid footer.
struct TailSnapshot {
    /// TOC decoded from the footer-validated bytes.
    toc: Toc,
    /// Absolute offset of the commit footer within the file.
    footer_offset: u64,
    /// End of data that must not be overwritten; set to the footer offset (see
    /// `load_tail_snapshot` for why the TOC offset is not used).
    data_end: u64,
    /// Commit generation recorded in the footer.
    generation: u64,
}
1331
1332fn locate_footer_window(mmap: &[u8]) -> Option<(FooterSlice<'_>, usize)> {
1333    const MAX_SEARCH_SIZE: usize = 16 * 1024 * 1024;
1334    if mmap.is_empty() {
1335        return None;
1336    }
1337    let mut window = MAX_SEARCH_SIZE.min(mmap.len());
1338    loop {
1339        let start = mmap.len() - window;
1340        if let Some(slice) = find_last_valid_footer(&mmap[start..]) {
1341            return Some((slice, start));
1342        }
1343        if window == mmap.len() {
1344            break;
1345        }
1346        window = (window * 2).min(mmap.len());
1347    }
1348    None
1349}
1350
/// Load the newest committed TOC snapshot from the tail of the file.
///
/// Fails with `InvalidToc` when no valid commit footer exists, or propagates
/// decode / checksum errors from the TOC itself.
fn load_tail_snapshot(file: &File) -> Result<TailSnapshot> {
    // Safety: we only create a read-only mapping over the stable file bytes.
    let mmap = unsafe { Mmap::map(file)? };

    // `offset_adjustment` converts the window-relative footer offset back to
    // an absolute file offset.
    let (slice, offset_adjustment) =
        locate_footer_window(&mmap).ok_or_else(|| MemvidError::InvalidToc {
            reason: "no valid commit footer found".into(),
        })?;
    let toc = Toc::decode(slice.toc_bytes)?;
    toc.verify_checksum()?;

    Ok(TailSnapshot {
        toc,
        footer_offset: slice.footer_offset as u64 + offset_adjustment as u64,
        // Using toc_offset causes stale data_end that moves footer backwards on next commit
        data_end: slice.footer_offset as u64 + offset_adjustment as u64,
        generation: slice.footer.generation,
    })
}
1370
1371fn detect_generation(file: &File) -> Result<Option<u64>> {
1372    // Safety: read-only mapping for footer inspection.
1373    let mmap = unsafe { Mmap::map(file)? };
1374
1375    Ok(locate_footer_window(&mmap).map(|(slice, _)| slice.footer.generation))
1376}
1377
1378pub(crate) fn ensure_single_file(path: &Path) -> Result<()> {
1379    if let Some(parent) = path.parent() {
1380        let name = path
1381            .file_name()
1382            .and_then(|n| n.to_str())
1383            .unwrap_or_default();
1384        let forbidden = ["-wal", "-shm", "-lock", "-journal"];
1385        for suffix in forbidden {
1386            let candidate = parent.join(format!("{name}{suffix}"));
1387            if candidate.exists() {
1388                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1389            }
1390        }
1391        let hidden_forbidden = [".wal", ".shm", ".lock", ".journal"];
1392        for suffix in hidden_forbidden {
1393            let candidate = parent.join(format!(".{name}{suffix}"));
1394            if candidate.exists() {
1395                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1396            }
1397        }
1398    }
1399    Ok(())
1400}
1401
1402#[cfg(feature = "parallel_segments")]
// Derive the manifest WAL sidecar path for `path`.
//
// NOTE(review): `set_extension` REPLACES the existing extension, so
// "mem.mv2" maps to "mem.manifest.wal" (not "mem.mv2.manifest.wal") —
// confirm this matches what the writer side produces.
fn manifest_wal_path(path: &Path) -> PathBuf {
    let mut candidate = path.to_owned();
    candidate.set_extension("manifest.wal");
    candidate
}
1408
1409#[cfg(feature = "parallel_segments")]
1410pub(crate) fn cleanup_manifest_wal_public(path: &Path) {
1411    let wal_path = manifest_wal_path(path);
1412    if wal_path.exists() {
1413        let _ = std::fs::remove_file(&wal_path);
1414    }
1415}
1416
1417/// Single source of truth: does this TOC have a lexical index?
1418/// Checks all possible locations: old manifest, `lex_segments`, and `tantivy_segments`.
1419pub(crate) fn has_lex_index(toc: &Toc) -> bool {
1420    toc.segment_catalog.lex_enabled
1421        || toc.indexes.lex.is_some()
1422        || !toc.indexes.lex_segments.is_empty()
1423        || !toc.segment_catalog.tantivy_segments.is_empty()
1424}
1425
/// Single source of truth: expected document count for lex index.
/// Returns None if we can't determine (e.g., Tantivy segments without manifest).
#[cfg(feature = "lex")]
pub(crate) fn lex_doc_count(
    toc: &Toc,
    lex_storage: &crate::search::EmbeddedLexStorage,
) -> Option<u64> {
    // Prefer the legacy manifest when it records a positive count.
    let manifest_count = toc
        .indexes
        .lex
        .as_ref()
        .map(|manifest| manifest.doc_count)
        .filter(|&count| count > 0);
    if manifest_count.is_some() {
        return manifest_count;
    }

    // Otherwise fall back to the embedded storage (populated from lex_segments).
    // A zero count means we genuinely cannot tell: Tantivy files can have
    // segments without any recorded count, and the caller must decide
    // (init_tantivy should trust that segments exist).
    match lex_storage.doc_count() {
        0 => None,
        count => Some(count),
    }
}
1451
1452/// Validates segment integrity on file open to catch corruption early.
1453/// This helps doctor by detecting issues before they cause problems.
1454#[allow(dead_code)]
1455fn validate_segment_integrity(toc: &Toc, header: &Header, file_len: u64) -> Result<()> {
1456    let data_limit = header.footer_offset;
1457
1458    // Validate replay segment (if present). Replay is stored AT the footer boundary,
1459    // and footer_offset is moved forward after writing. So we only check against file_len,
1460    // not against footer_offset (which would be after the replay segment).
1461    #[cfg(feature = "replay")]
1462    if let Some(manifest) = toc.replay_manifest.as_ref() {
1463        if manifest.segment_size != 0 {
1464            let end = manifest
1465                .segment_offset
1466                .checked_add(manifest.segment_size)
1467                .ok_or_else(|| MemvidError::Doctor {
1468                    reason: format!(
1469                        "Replay segment offset overflow: {} + {}",
1470                        manifest.segment_offset, manifest.segment_size
1471                    ),
1472                })?;
1473
1474            // Only check against file_len - replay segments sit at the footer boundary
1475            // and footer_offset is updated to point after them
1476            if end > file_len {
1477                return Err(MemvidError::Doctor {
1478                    reason: format!(
1479                        "Replay segment out of bounds: offset={}, length={}, end={}, file_len={}",
1480                        manifest.segment_offset, manifest.segment_size, end, file_len
1481                    ),
1482                });
1483            }
1484        }
1485    }
1486
1487    // Validate Tantivy segments
1488    for (idx, seg) in toc.segment_catalog.tantivy_segments.iter().enumerate() {
1489        let offset = seg.common.bytes_offset;
1490        let length = seg.common.bytes_length;
1491
1492        if length == 0 {
1493            continue; // Empty segments are okay
1494        }
1495
1496        let end = offset
1497            .checked_add(length)
1498            .ok_or_else(|| MemvidError::Doctor {
1499                reason: format!("Tantivy segment {idx} offset overflow: {offset} + {length}"),
1500            })?;
1501
1502        if end > file_len || end > data_limit {
1503            return Err(MemvidError::Doctor {
1504                reason: format!(
1505                    "Tantivy segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
1506                ),
1507            });
1508        }
1509    }
1510
1511    // Validate time index segments
1512    for (idx, seg) in toc.segment_catalog.time_segments.iter().enumerate() {
1513        let offset = seg.common.bytes_offset;
1514        let length = seg.common.bytes_length;
1515
1516        if length == 0 {
1517            continue;
1518        }
1519
1520        let end = offset
1521            .checked_add(length)
1522            .ok_or_else(|| MemvidError::Doctor {
1523                reason: format!("Time segment {idx} offset overflow: {offset} + {length}"),
1524            })?;
1525
1526        if end > file_len || end > data_limit {
1527            return Err(MemvidError::Doctor {
1528                reason: format!(
1529                    "Time segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
1530                ),
1531            });
1532        }
1533    }
1534
1535    // Validate vec segments
1536    for (idx, seg) in toc.segment_catalog.vec_segments.iter().enumerate() {
1537        let offset = seg.common.bytes_offset;
1538        let length = seg.common.bytes_length;
1539
1540        if length == 0 {
1541            continue;
1542        }
1543
1544        let end = offset
1545            .checked_add(length)
1546            .ok_or_else(|| MemvidError::Doctor {
1547                reason: format!("Vec segment {idx} offset overflow: {offset} + {length}"),
1548            })?;
1549
1550        if end > file_len || end > data_limit {
1551            return Err(MemvidError::Doctor {
1552                reason: format!(
1553                    "Vec segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
1554                ),
1555            });
1556        }
1557    }
1558
1559    log::debug!("✓ Segment integrity validation passed");
1560    Ok(())
1561}
1562
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // A TOC prefix shorter than the fixed 24-byte trailer must surface the
    // "trailer too small" reason rather than panicking or decoding garbage.
    #[test]
    fn toc_prefix_underflow_surfaces_reason() {
        let err = verify_toc_prefix(&[0u8; 8]).expect_err("should reject short toc prefix");
        match err {
            MemvidError::InvalidToc { reason } => {
                assert!(
                    reason.contains("trailer too small"),
                    "unexpected reason: {reason}"
                );
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    // Creating a memory next to a forbidden sidecar (here "<name>-wal") must
    // fail with AuxiliaryFileDetected to preserve the single-file invariant.
    #[test]
    fn ensure_single_file_blocks_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        std::fs::write(dir.path().join("mem.mv2-wal"), b"junk").expect("sidecar");
        let result = Memvid::create(&path);
        assert!(matches!(
            result,
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }
}