Skip to main content

memvid_core/memvid/
lifecycle.rs

1//! Lifecycle management for creating and opening `.mv2` memories.
2//!
3//! Responsibilities:
4//! - Enforce single-file invariant (no sidecars) and take OS locks.
5//! - Bootstrap headers, internal WAL, and TOC on create, and recover them on open.
6//! - Validate TOC/footer layout, recover the latest valid footer when needed.
7//! - Wire up index state (lex/vector/time) without mutating payload bytes.
8
9use std::convert::TryInto;
10use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom};
12use std::panic;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15
16use crate::constants::{MAGIC, SPEC_VERSION, WAL_OFFSET, WAL_SIZE_TINY};
17use crate::error::{MemvidError, Result};
18use crate::footer::{FooterSlice, find_last_valid_footer};
19use crate::io::header::HeaderCodec;
20#[cfg(feature = "parallel_segments")]
21use crate::io::manifest_wal::ManifestWal;
22use crate::io::wal::EmbeddedWal;
23use crate::lock::{FileLock, LockMode};
24#[cfg(feature = "lex")]
25use crate::search::{EmbeddedLexStorage, TantivyEngine};
26#[cfg(feature = "temporal_track")]
27use crate::types::FrameId;
28#[cfg(feature = "parallel_segments")]
29use crate::types::IndexSegmentRef;
30use crate::types::{
31    FrameStatus, Header, IndexManifests, LogicMesh, MemoriesTrack, SchemaRegistry, SegmentCatalog,
32    SketchTrack, TicketRef, Tier, Toc, VectorCompression,
33};
34#[cfg(feature = "temporal_track")]
35use crate::{TemporalTrack, temporal_track_read};
36use crate::{lex::LexIndex, vec::VecIndex};
37use blake3::Hasher;
38use memmap2::Mmap;
39
/// Default for [`LockSettings::timeout_ms`], in milliseconds.
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 250;
/// Default for [`LockSettings::heartbeat_ms`], in milliseconds.
const DEFAULT_HEARTBEAT_MS: u64 = 2_000;
/// Default for [`LockSettings::stale_grace_ms`], in milliseconds.
const DEFAULT_STALE_GRACE_MS: u64 = 10_000;
43
/// Primary handle for interacting with a `.mv2` memory file.
///
/// Holds the file descriptor, lock, header, TOC, and in-memory index state. Mutations
/// append to the embedded WAL and are materialized at commit time to keep the layout deterministic.
///
/// Handles are obtained through [`Memvid::create`], [`Memvid::open`], or the
/// `open_read_only*` constructors.
pub struct Memvid {
    /// Handle to the underlying `.mv2` file; header, WAL, index, and track bytes are read and
    /// written through it.
    pub(crate) file: File,
    /// Path the handle was created or opened from.
    pub(crate) path: PathBuf,
    /// OS file lock held for the lifetime of the handle (shared for read-only snapshots).
    pub(crate) lock: FileLock,
    /// `true` when the handle holds only a shared lock and is treated as read-only.
    pub(crate) read_only: bool,
    /// In-memory copy of the on-disk header (magic, version, WAL layout, footer offset,
    /// TOC checksum).
    pub(crate) header: Header,
    /// Table of contents: frames, index manifests, segment catalog, track manifests, and
    /// memory binding.
    pub(crate) toc: Toc,
    /// Embedded write-ahead log, opened from the header's WAL offset/size fields.
    pub(crate) wal: EmbeddedWal,
    /// Number of frame inserts appended to WAL but not yet materialized into `toc.frames`.
    ///
    /// This lets frontends predict stable frame IDs before an explicit commit.
    pub(crate) pending_frame_inserts: u64,
    /// End of the data region: `header.footer_offset` for a freshly created file, derived
    /// from the TOC and header via `compute_data_end` on open.
    pub(crate) data_end: u64,
    /// Generation counter detected from the file on open (`0` for a freshly created file).
    pub(crate) generation: u64,
    /// Lock timing parameters; initialised to `LockSettings::default()`.
    pub(crate) lock_settings: LockSettings,
    /// Whether a lexical index is enabled for this memory.
    pub(crate) lex_enabled: bool,
    /// Loaded lexical index, if any.
    pub(crate) lex_index: Option<LexIndex>,
    /// Embedded storage for lexical index bytes, built from the TOC's lex manifest and
    /// lex segments.
    #[cfg(feature = "lex")]
    // NOTE(review): `dead_code` is suppressed without explanation; confirm whether this
    // field is still read anywhere or is only kept alive for shared ownership.
    #[allow(dead_code)]
    pub(crate) lex_storage: Arc<RwLock<EmbeddedLexStorage>>,
    /// Whether vector search is enabled (a vec manifest or vec segments exist in the TOC,
    /// or the `vec` feature is on for a new file).
    pub(crate) vec_enabled: bool,
    /// Compression mode for vector embeddings; see `set_vector_compression`.
    pub(crate) vec_compression: VectorCompression,
    /// Loaded vector index, if any.
    pub(crate) vec_index: Option<VecIndex>,
    /// CLIP visual embeddings index (separate from vec due to different dimensions)
    pub(crate) clip_enabled: bool,
    /// Loaded CLIP index, if any.
    pub(crate) clip_index: Option<crate::clip::ClipIndex>,
    /// Set by mutators (e.g. `bind_memory`) when in-memory state has changed; presumably
    /// consumed by commit to decide whether a rewrite is needed.
    pub(crate) dirty: bool,
    /// Tantivy lexical engine, initialised by `init_tantivy` during create/open.
    #[cfg(feature = "lex")]
    pub(crate) tantivy: Option<TantivyEngine>,
    /// Presumably set when the Tantivy engine has changes not yet persisted.
    #[cfg(feature = "lex")]
    pub(crate) tantivy_dirty: bool,
    /// Lazily loaded temporal track; `None` until `ensure_temporal_track_loaded` loads it.
    #[cfg(feature = "temporal_track")]
    pub(crate) temporal_track: Option<TemporalTrack>,
    /// Manifest WAL for parallel index segments; `None` on read-only snapshots.
    #[cfg(feature = "parallel_segments")]
    pub(crate) manifest_wal: Option<ManifestWal>,
    /// In-memory track for structured memory cards.
    pub(crate) memories_track: MemoriesTrack,
    /// In-memory Logic-Mesh graph for entity-relationship traversal.
    pub(crate) logic_mesh: LogicMesh,
    /// In-memory sketch track for fast candidate generation.
    pub(crate) sketch_track: SketchTrack,
    /// Schema registry for predicate validation.
    pub(crate) schema_registry: SchemaRegistry,
    /// Whether to enforce strict schema validation on card insert.
    pub(crate) schema_strict: bool,
    /// Active replay session being recorded (if any).
    #[cfg(feature = "replay")]
    pub(crate) active_session: Option<crate::replay::ActiveSession>,
    /// Completed sessions stored in memory (until persisted to file).
    #[cfg(feature = "replay")]
    pub(crate) completed_sessions: Vec<crate::replay::ReplaySession>,
}
100
/// Controls read-only open behaviour for `.mv2` memories.
///
/// Passed to [`Memvid::open_read_only_with_options`]. The default (`allow_repair: false`)
/// opens a snapshot under a shared lock.
#[derive(Debug, Clone, Copy, Default)]
pub struct OpenReadOptions {
    /// When `true`, open through [`Memvid::open`] instead of the snapshot path. That takes
    /// exclusive access and may write recovered header/TOC state back to the file.
    pub allow_repair: bool,
}
106
/// Lock timing and stale-lock handling parameters for a [`Memvid`] handle.
///
/// Defaults come from the `DEFAULT_*` constants in this module.
/// NOTE(review): this file only sets defaults; the field semantics below are inferred from
/// names and should be confirmed against the lock implementation.
#[derive(Debug, Clone)]
pub struct LockSettings {
    /// Presumably the maximum time, in milliseconds, to wait when acquiring the lock.
    pub timeout_ms: u64,
    /// Presumably the interval, in milliseconds, between lock heartbeats.
    pub heartbeat_ms: u64,
    /// Presumably how long, in milliseconds, a lock may go without a heartbeat before it is
    /// considered stale.
    pub stale_grace_ms: u64,
    /// Presumably forces takeover of a lock deemed stale; `false` by default.
    pub force_stale: bool,
    /// Optional command label, presumably recorded with the lock for diagnostics; `None` by
    /// default.
    pub command: Option<String>,
}
115
116impl Default for LockSettings {
117    fn default() -> Self {
118        Self {
119            timeout_ms: DEFAULT_LOCK_TIMEOUT_MS,
120            heartbeat_ms: DEFAULT_HEARTBEAT_MS,
121            stale_grace_ms: DEFAULT_STALE_GRACE_MS,
122            force_stale: false,
123            command: None,
124        }
125    }
126}
127
128impl Memvid {
129    /// Create a new, empty `.mv2` file with an embedded WAL and empty TOC.
130    /// The file is locked exclusively for the lifetime of the handle.
131    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
132        let path_ref = path.as_ref();
133        ensure_single_file(path_ref)?;
134
135        OpenOptions::new()
136            .read(true)
137            .write(true)
138            .create(true)
139            .truncate(true)
140            .open(path_ref)?;
141        let (mut file, lock) = FileLock::open_and_lock(path_ref)?;
142
143        let header = Header {
144            magic: MAGIC,
145            version: SPEC_VERSION,
146            footer_offset: WAL_OFFSET + WAL_SIZE_TINY,
147            wal_offset: WAL_OFFSET,
148            wal_size: WAL_SIZE_TINY,
149            wal_checkpoint_pos: 0,
150            wal_sequence: 0,
151            toc_checksum: [0u8; 32],
152        };
153
154        let mut toc = empty_toc();
155        // If lex feature is enabled, set the catalog flag immediately
156        #[cfg(feature = "lex")]
157        {
158            toc.segment_catalog.lex_enabled = true;
159        }
160        file.set_len(header.footer_offset)?;
161        HeaderCodec::write(&mut file, &header)?;
162
163        let wal = EmbeddedWal::open(&file, &header)?;
164        let data_end = header.footer_offset;
165        #[cfg(feature = "lex")]
166        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::new()));
167        #[cfg(feature = "parallel_segments")]
168        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
169        #[cfg(feature = "parallel_segments")]
170        let manifest_wal_entries = manifest_wal.replay()?;
171
172        let mut memvid = Self {
173            file,
174            path: path_ref.to_path_buf(),
175            lock,
176            read_only: false,
177            header,
178            toc,
179            wal,
180            pending_frame_inserts: 0,
181            data_end,
182            generation: 0,
183            lock_settings: LockSettings::default(),
184            lex_enabled: cfg!(feature = "lex"), // Enable by default if feature is enabled
185            lex_index: None,
186            #[cfg(feature = "lex")]
187            lex_storage,
188            vec_enabled: cfg!(feature = "vec"), // Enable by default if feature is enabled
189            vec_compression: VectorCompression::None,
190            vec_index: None,
191            clip_enabled: cfg!(feature = "clip"), // Enable by default if feature is enabled
192            clip_index: None,
193            dirty: false,
194            #[cfg(feature = "lex")]
195            tantivy: None,
196            #[cfg(feature = "lex")]
197            tantivy_dirty: false,
198            #[cfg(feature = "temporal_track")]
199            temporal_track: None,
200            #[cfg(feature = "parallel_segments")]
201            manifest_wal: Some(manifest_wal),
202            memories_track: MemoriesTrack::new(),
203            logic_mesh: LogicMesh::new(),
204            sketch_track: SketchTrack::default(),
205            schema_registry: SchemaRegistry::new(),
206            schema_strict: false,
207            #[cfg(feature = "replay")]
208            active_session: None,
209            #[cfg(feature = "replay")]
210            completed_sessions: Vec::new(),
211        };
212
213        #[cfg(feature = "lex")]
214        memvid.init_tantivy()?;
215
216        #[cfg(feature = "parallel_segments")]
217        memvid.load_manifest_segments(manifest_wal_entries);
218
219        memvid.bootstrap_segment_catalog();
220
221        // Create empty manifests for enabled indexes so they persist across open/close
222        let empty_offset = memvid.data_end;
223        let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
224                                \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
225
226        #[cfg(feature = "lex")]
227        if memvid.lex_enabled && memvid.toc.indexes.lex.is_none() {
228            memvid.toc.indexes.lex = Some(crate::types::LexIndexManifest {
229                doc_count: 0,
230                generation: 0,
231                bytes_offset: empty_offset,
232                bytes_length: 0,
233                checksum: empty_checksum,
234            });
235        }
236
237        #[cfg(feature = "vec")]
238        if memvid.vec_enabled && memvid.toc.indexes.vec.is_none() {
239            memvid.toc.indexes.vec = Some(crate::types::VecIndexManifest {
240                vector_count: 0,
241                dimension: 0,
242                bytes_offset: empty_offset,
243                bytes_length: 0,
244                checksum: empty_checksum,
245                compression_mode: memvid.vec_compression.clone(),
246            });
247        }
248
249        memvid.rewrite_toc_footer()?;
250        memvid.header.toc_checksum = memvid.toc.toc_checksum;
251        crate::persist_header(&mut memvid.file, &memvid.header)?;
252        memvid.file.sync_all()?;
253        Ok(memvid)
254    }
255
256    #[must_use]
257    pub fn lock_settings(&self) -> &LockSettings {
258        &self.lock_settings
259    }
260
261    pub fn lock_settings_mut(&mut self) -> &mut LockSettings {
262        &mut self.lock_settings
263    }
264
265    /// Set the vector compression mode for this memory
266    /// Must be called before ingesting documents with embeddings
267    pub fn set_vector_compression(&mut self, compression: VectorCompression) {
268        self.vec_compression = compression;
269    }
270
271    /// Get the current vector compression mode
272    #[must_use]
273    pub fn vector_compression(&self) -> &VectorCompression {
274        &self.vec_compression
275    }
276
277    /// Predict the next frame ID that would be assigned to a new insert.
278    ///
279    /// Frame IDs are dense indices into `toc.frames`. When a memory is mutable, inserts are first
280    /// appended to the embedded WAL and only materialized into `toc.frames` on commit. This helper
281    /// lets frontends allocate stable frame IDs before an explicit commit.
282    #[must_use]
283    pub fn next_frame_id(&self) -> u64 {
284        (self.toc.frames.len() as u64).saturating_add(self.pending_frame_inserts)
285    }
286
287    /// Returns the total number of frames in the memory.
288    ///
289    /// This includes all frames regardless of status (active, deleted, etc.).
290    #[must_use]
291    pub fn frame_count(&self) -> usize {
292        self.toc.frames.len()
293    }
294
295    fn open_locked(mut file: File, lock: FileLock, path_ref: &Path) -> Result<Self> {
296        // Fast-path detection for encrypted capsules (.mv2e).
297        // This avoids confusing "invalid header" errors and provides an actionable hint.
298        let mut magic = [0u8; 4];
299        let is_mv2e = file.read_exact(&mut magic).is_ok() && magic == *b"MV2E";
300        file.seek(SeekFrom::Start(0))?;
301        if is_mv2e {
302            return Err(MemvidError::EncryptedFile {
303                path: path_ref.to_path_buf(),
304                hint: format!("Run: memvid unlock {}", path_ref.display()),
305            });
306        }
307
308        let mut header = HeaderCodec::read(&mut file)?;
309        let toc = match read_toc(&mut file, &header) {
310            Ok(toc) => toc,
311            Err(err @ (MemvidError::Decode(_) | MemvidError::InvalidToc { .. })) => {
312                tracing::info!("toc decode failed ({}); attempting recovery", err);
313                let (toc, recovered_offset) = recover_toc(&mut file, Some(header.footer_offset))?;
314                if recovered_offset != header.footer_offset
315                    || header.toc_checksum != toc.toc_checksum
316                {
317                    header.footer_offset = recovered_offset;
318                    header.toc_checksum = toc.toc_checksum;
319                    crate::persist_header(&mut file, &header)?;
320                }
321                toc
322            }
323            Err(err) => return Err(err),
324        };
325        let checksum_result = toc.verify_checksum();
326
327        // Validate segment integrity early to catch corruption before loading indexes
328        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
329        if let Err(e) = validate_segment_integrity(&toc, &header, file_len) {
330            tracing::warn!("Segment integrity validation failed: {}", e);
331            // Don't fail file open - let doctor handle it
332            // This is just an early warning system
333        }
334        ensure_non_overlapping_frames(&toc, file_len)?;
335
336        let wal = EmbeddedWal::open(&file, &header)?;
337        #[cfg(feature = "lex")]
338        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
339            toc.indexes.lex.as_ref(),
340            &toc.indexes.lex_segments,
341        )));
342        #[cfg(feature = "parallel_segments")]
343        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
344        #[cfg(feature = "parallel_segments")]
345        let manifest_wal_entries = manifest_wal.replay()?;
346
347        let generation = detect_generation(&file)?.unwrap_or(0);
348        let read_only = lock.mode() == LockMode::Shared;
349
350        let mut memvid = Self {
351            file,
352            path: path_ref.to_path_buf(),
353            lock,
354            read_only,
355            header,
356            toc,
357            wal,
358            pending_frame_inserts: 0,
359            data_end: 0,
360            generation,
361            lock_settings: LockSettings::default(),
362            lex_enabled: false,
363            lex_index: None,
364            #[cfg(feature = "lex")]
365            lex_storage,
366            vec_enabled: false,
367            vec_compression: VectorCompression::None,
368            vec_index: None,
369            clip_enabled: false,
370            clip_index: None,
371            dirty: false,
372            #[cfg(feature = "lex")]
373            tantivy: None,
374            #[cfg(feature = "lex")]
375            tantivy_dirty: false,
376            #[cfg(feature = "temporal_track")]
377            temporal_track: None,
378            #[cfg(feature = "parallel_segments")]
379            manifest_wal: Some(manifest_wal),
380            memories_track: MemoriesTrack::new(),
381            logic_mesh: LogicMesh::new(),
382            sketch_track: SketchTrack::default(),
383            schema_registry: SchemaRegistry::new(),
384            schema_strict: false,
385            #[cfg(feature = "replay")]
386            active_session: None,
387            #[cfg(feature = "replay")]
388            completed_sessions: Vec::new(),
389        };
390        memvid.data_end = compute_data_end(&memvid.toc, &memvid.header);
391        // Use consolidated helper for lex_enabled check
392        memvid.lex_enabled = has_lex_index(&memvid.toc);
393        if memvid.lex_enabled {
394            memvid.load_lex_index_from_manifest()?;
395        }
396        #[cfg(feature = "lex")]
397        {
398            memvid.init_tantivy()?;
399        }
400        memvid.vec_enabled =
401            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
402        if memvid.vec_enabled {
403            memvid.load_vec_index_from_manifest()?;
404        }
405        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
406        if memvid.clip_enabled {
407            memvid.load_clip_index_from_manifest()?;
408        }
409        memvid.recover_wal()?;
410        #[cfg(feature = "parallel_segments")]
411        memvid.load_manifest_segments(manifest_wal_entries);
412        memvid.bootstrap_segment_catalog();
413        #[cfg(feature = "temporal_track")]
414        memvid.ensure_temporal_track_loaded()?;
415        memvid.load_memories_track()?;
416        memvid.load_logic_mesh()?;
417        memvid.load_sketch_track()?;
418        if checksum_result.is_err() {
419            memvid.toc.verify_checksum()?;
420            if memvid.toc.toc_checksum != memvid.header.toc_checksum {
421                memvid.header.toc_checksum = memvid.toc.toc_checksum;
422                crate::persist_header(&mut memvid.file, &memvid.header)?;
423                memvid.file.sync_all()?;
424            }
425        }
426        Ok(memvid)
427    }
428
429    /// Open an existing `.mv2` with exclusive access, performing recovery if needed.
430    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
431        let path_ref = path.as_ref();
432        ensure_single_file(path_ref)?;
433
434        let (file, lock) = FileLock::open_and_lock(path_ref)?;
435        Self::open_locked(file, lock, path_ref)
436    }
437
438    pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
439        Self::open_read_only_with_options(path, OpenReadOptions::default())
440    }
441
442    pub fn open_read_only_with_options<P: AsRef<Path>>(
443        path: P,
444        options: OpenReadOptions,
445    ) -> Result<Self> {
446        let path_ref = path.as_ref();
447        ensure_single_file(path_ref)?;
448
449        if options.allow_repair {
450            return Self::open(path_ref);
451        }
452
453        Self::open_read_only_snapshot(path_ref)
454    }
455
456    fn open_read_only_snapshot(path_ref: &Path) -> Result<Self> {
457        let mut file = OpenOptions::new().read(true).write(true).open(path_ref)?;
458        let TailSnapshot {
459            toc,
460            footer_offset,
461            data_end,
462            generation,
463        } = load_tail_snapshot(&file)?;
464
465        let mut header = HeaderCodec::read(&mut file)?;
466        header.footer_offset = footer_offset;
467        header.toc_checksum = toc.toc_checksum;
468
469        let lock = FileLock::acquire_with_mode(&file, LockMode::Shared)?;
470        let wal = EmbeddedWal::open_read_only(&file, &header)?;
471
472        #[cfg(feature = "lex")]
473        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
474            toc.indexes.lex.as_ref(),
475            &toc.indexes.lex_segments,
476        )));
477
478        let mut memvid = Self {
479            file,
480            path: path_ref.to_path_buf(),
481            lock,
482            read_only: true,
483            header,
484            toc,
485            wal,
486            pending_frame_inserts: 0,
487            data_end,
488            generation,
489            lock_settings: LockSettings::default(),
490            lex_enabled: false,
491            lex_index: None,
492            #[cfg(feature = "lex")]
493            lex_storage,
494            vec_enabled: false,
495            vec_compression: VectorCompression::None,
496            vec_index: None,
497            clip_enabled: false,
498            clip_index: None,
499            dirty: false,
500            #[cfg(feature = "lex")]
501            tantivy: None,
502            #[cfg(feature = "lex")]
503            tantivy_dirty: false,
504            #[cfg(feature = "temporal_track")]
505            temporal_track: None,
506            #[cfg(feature = "parallel_segments")]
507            manifest_wal: None,
508            memories_track: MemoriesTrack::new(),
509            logic_mesh: LogicMesh::new(),
510            sketch_track: SketchTrack::default(),
511            schema_registry: SchemaRegistry::new(),
512            schema_strict: false,
513            #[cfg(feature = "replay")]
514            active_session: None,
515            #[cfg(feature = "replay")]
516            completed_sessions: Vec::new(),
517        };
518
519        // Use consolidated helper for lex_enabled check
520        memvid.lex_enabled = has_lex_index(&memvid.toc);
521        if memvid.lex_enabled {
522            memvid.load_lex_index_from_manifest()?;
523        }
524        #[cfg(feature = "lex")]
525        memvid.init_tantivy()?;
526
527        memvid.vec_enabled =
528            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
529        if memvid.vec_enabled {
530            memvid.load_vec_index_from_manifest()?;
531        }
532        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
533        if memvid.clip_enabled {
534            memvid.load_clip_index_from_manifest()?;
535        }
536        // Load memories track, Logic-Mesh, and sketch track if present
537        memvid.load_memories_track()?;
538        memvid.load_logic_mesh()?;
539        memvid.load_sketch_track()?;
540
541        memvid.bootstrap_segment_catalog();
542        #[cfg(feature = "temporal_track")]
543        memvid.ensure_temporal_track_loaded()?;
544
545        Ok(memvid)
546    }
547
548    pub(crate) fn try_open<P: AsRef<Path>>(path: P) -> Result<Self> {
549        let path_ref = path.as_ref();
550        ensure_single_file(path_ref)?;
551
552        let file = OpenOptions::new().read(true).write(true).open(path_ref)?;
553        let lock = match FileLock::try_acquire(&file, path_ref)? {
554            Some(lock) => lock,
555            None => {
556                return Err(MemvidError::Lock(
557                    "exclusive access unavailable for doctor".to_string(),
558                ));
559            }
560        };
561        Self::open_locked(file, lock, path_ref)
562    }
563
564    fn bootstrap_segment_catalog(&mut self) {
565        let catalog = &mut self.toc.segment_catalog;
566        if catalog.version == 0 {
567            catalog.version = 1;
568        }
569        if catalog.next_segment_id == 0 {
570            let mut max_id = 0u64;
571            for descriptor in &catalog.lex_segments {
572                max_id = max_id.max(descriptor.common.segment_id);
573            }
574            for descriptor in &catalog.vec_segments {
575                max_id = max_id.max(descriptor.common.segment_id);
576            }
577            for descriptor in &catalog.time_segments {
578                max_id = max_id.max(descriptor.common.segment_id);
579            }
580            #[cfg(feature = "temporal_track")]
581            for descriptor in &catalog.temporal_segments {
582                max_id = max_id.max(descriptor.common.segment_id);
583            }
584            #[cfg(feature = "parallel_segments")]
585            for descriptor in &catalog.index_segments {
586                max_id = max_id.max(descriptor.common.segment_id);
587            }
588            if max_id > 0 {
589                catalog.next_segment_id = max_id.saturating_add(1);
590            }
591        }
592    }
593
594    #[cfg(feature = "parallel_segments")]
595    fn load_manifest_segments(&mut self, entries: Vec<IndexSegmentRef>) {
596        if entries.is_empty() {
597            return;
598        }
599        for entry in entries {
600            let duplicate = self
601                .toc
602                .segment_catalog
603                .index_segments
604                .iter()
605                .any(|existing| existing.common.segment_id == entry.common.segment_id);
606            if !duplicate {
607                self.toc.segment_catalog.index_segments.push(entry);
608            }
609        }
610    }
611
612    /// Load the memories track from the manifest if present.
613    fn load_memories_track(&mut self) -> Result<()> {
614        let manifest = match &self.toc.memories_track {
615            Some(m) => m,
616            None => return Ok(()),
617        };
618
619        // Read the compressed data from the file
620        if manifest.bytes_length > crate::MAX_INDEX_BYTES {
621            return Err(MemvidError::InvalidToc {
622                reason: "memories track exceeds safety limit".into(),
623            });
624        }
625        // Safe: guarded by MAX_INDEX_BYTES check above
626        #[allow(clippy::cast_possible_truncation)]
627        let mut buf = vec![0u8; manifest.bytes_length as usize];
628        self.file
629            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
630        self.file.read_exact(&mut buf)?;
631
632        // Verify checksum
633        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
634        if actual_checksum != manifest.checksum {
635            return Err(MemvidError::InvalidToc {
636                reason: "memories track checksum mismatch".into(),
637            });
638        }
639
640        // Deserialize the memories track
641        self.memories_track = MemoriesTrack::deserialize(&buf)?;
642
643        Ok(())
644    }
645
646    /// Load the Logic-Mesh from the manifest if present.
647    fn load_logic_mesh(&mut self) -> Result<()> {
648        let manifest = match &self.toc.logic_mesh {
649            Some(m) => m,
650            None => return Ok(()),
651        };
652
653        // Read the serialized data from the file
654        if manifest.bytes_length > crate::MAX_INDEX_BYTES {
655            return Err(MemvidError::InvalidToc {
656                reason: "logic mesh exceeds safety limit".into(),
657            });
658        }
659        // Safe: guarded by MAX_INDEX_BYTES check above
660        #[allow(clippy::cast_possible_truncation)]
661        let mut buf = vec![0u8; manifest.bytes_length as usize];
662        self.file
663            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
664        self.file.read_exact(&mut buf)?;
665
666        // Verify checksum
667        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
668        if actual_checksum != manifest.checksum {
669            return Err(MemvidError::InvalidToc {
670                reason: "logic mesh checksum mismatch".into(),
671            });
672        }
673
674        // Deserialize the logic mesh
675        self.logic_mesh = LogicMesh::deserialize(&buf)?;
676
677        Ok(())
678    }
679
680    /// Load the sketch track from the manifest if present.
681    fn load_sketch_track(&mut self) -> Result<()> {
682        let manifest = match &self.toc.sketch_track {
683            Some(m) => m.clone(),
684            None => return Ok(()),
685        };
686
687        // Read and deserialize the sketch track (read_sketch_track handles seeking and checksum)
688        self.sketch_track = crate::types::read_sketch_track(
689            &mut self.file,
690            manifest.bytes_offset,
691            manifest.bytes_length,
692        )?;
693
694        Ok(())
695    }
696
697    #[cfg(feature = "temporal_track")]
698    pub(crate) fn ensure_temporal_track_loaded(&mut self) -> Result<()> {
699        if self.temporal_track.is_some() {
700            return Ok(());
701        }
702        let manifest = match &self.toc.temporal_track {
703            Some(manifest) => manifest.clone(),
704            None => return Ok(()),
705        };
706        if manifest.bytes_length == 0 {
707            return Ok(());
708        }
709        let file_len = self.file.metadata()?.len();
710        let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) else {
711            return Ok(());
712        };
713        if end > file_len {
714            return Ok(());
715        }
716        match temporal_track_read(&mut self.file, manifest.bytes_offset, manifest.bytes_length) {
717            Ok(track) => self.temporal_track = Some(track),
718            Err(MemvidError::InvalidTemporalTrack { .. }) => {
719                return Ok(());
720            }
721            Err(err) => return Err(err),
722        }
723        Ok(())
724    }
725
726    #[cfg(feature = "temporal_track")]
727    pub(crate) fn temporal_track_ref(&mut self) -> Result<Option<&TemporalTrack>> {
728        self.ensure_temporal_track_loaded()?;
729        Ok(self.temporal_track.as_ref())
730    }
731
732    #[cfg(feature = "temporal_track")]
733    pub(crate) fn temporal_anchor_timestamp(&mut self, frame_id: FrameId) -> Result<Option<i64>> {
734        self.ensure_temporal_track_loaded()?;
735        let Some(track) = self.temporal_track.as_ref() else {
736            return Ok(None);
737        };
738        if !track.capabilities().has_anchors {
739            return Ok(None);
740        }
741        Ok(track
742            .anchor_for_frame(frame_id)
743            .map(|anchor| anchor.anchor_ts))
744    }
745
746    #[cfg(feature = "temporal_track")]
747    pub(crate) fn clear_temporal_track_cache(&mut self) {
748        self.temporal_track = None;
749    }
750
751    #[cfg(feature = "temporal_track")]
752    pub(crate) fn effective_temporal_timestamp(
753        &mut self,
754        frame_id: FrameId,
755        fallback: i64,
756    ) -> Result<i64> {
757        Ok(self
758            .temporal_anchor_timestamp(frame_id)?
759            .unwrap_or(fallback))
760    }
761
762    #[cfg(not(feature = "temporal_track"))]
763    pub(crate) fn effective_temporal_timestamp(
764        &mut self,
765        _frame_id: crate::types::FrameId,
766        fallback: i64,
767    ) -> Result<i64> {
768        Ok(fallback)
769    }
770
771    /// Get current memory binding information.
772    ///
773    /// Returns the binding if this file is bound to a dashboard memory,
774    /// or None if unbound.
775    #[must_use]
776    pub fn get_memory_binding(&self) -> Option<&crate::types::MemoryBinding> {
777        self.toc.memory_binding.as_ref()
778    }
779
780    /// Bind this file to a dashboard memory.
781    ///
782    /// This stores the binding in the TOC and applies a temporary ticket for initial binding.
783    /// The caller should follow up with `apply_signed_ticket` for cryptographic verification.
784    ///
785    /// # Errors
786    ///
787    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
788    #[allow(deprecated)]
789    pub fn bind_memory(
790        &mut self,
791        binding: crate::types::MemoryBinding,
792        ticket: crate::types::Ticket,
793    ) -> Result<()> {
794        // Check existing binding
795        if let Some(existing) = self.get_memory_binding() {
796            if existing.memory_id != binding.memory_id {
797                return Err(MemvidError::MemoryAlreadyBound {
798                    existing_memory_id: existing.memory_id,
799                    existing_memory_name: existing.memory_name.clone(),
800                    bound_at: existing.bound_at.to_rfc3339(),
801                });
802            }
803        }
804
805        // Apply ticket for capacity
806        self.apply_ticket(ticket)?;
807
808        // Store binding in TOC
809        self.toc.memory_binding = Some(binding);
810        self.dirty = true;
811
812        Ok(())
813    }
814
815    /// Set only the memory binding without applying a ticket.
816    ///
817    /// This is used when the caller will immediately follow up with `apply_signed_ticket`
818    /// to apply the cryptographically verified ticket. This avoids the sequence number
819    /// conflict that occurs when using `bind_memory` with a temporary ticket.
820    ///
821    /// # Errors
822    ///
823    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
824    pub fn set_memory_binding_only(&mut self, binding: crate::types::MemoryBinding) -> Result<()> {
825        self.ensure_writable()?;
826
827        // Check existing binding
828        if let Some(existing) = self.get_memory_binding() {
829            if existing.memory_id != binding.memory_id {
830                return Err(MemvidError::MemoryAlreadyBound {
831                    existing_memory_id: existing.memory_id,
832                    existing_memory_name: existing.memory_name.clone(),
833                    bound_at: existing.bound_at.to_rfc3339(),
834                });
835            }
836        }
837
838        // Store binding in TOC (without applying a ticket)
839        self.toc.memory_binding = Some(binding);
840        self.dirty = true;
841
842        Ok(())
843    }
844
845    /// Unbind this file from its dashboard memory.
846    ///
847    /// This clears the binding and reverts to free tier capacity (1 GB).
848    pub fn unbind_memory(&mut self) -> Result<()> {
849        self.toc.memory_binding = None;
850        // Revert to free tier
851        self.toc.ticket_ref = crate::types::TicketRef {
852            issuer: "free-tier".into(),
853            seq_no: 1,
854            expires_in_secs: 0,
855            capacity_bytes: crate::types::Tier::Free.capacity_bytes(),
856            verified: false,
857        };
858        self.dirty = true;
859        Ok(())
860    }
861}
862
863pub(crate) fn read_toc(file: &mut File, header: &Header) -> Result<Toc> {
864    use crate::footer::{CommitFooter, FOOTER_SIZE};
865
866    let len = file.metadata()?.len();
867    if len < header.footer_offset {
868        return Err(MemvidError::InvalidToc {
869            reason: "footer offset beyond file length".into(),
870        });
871    }
872
873    // Read the entire region from footer_offset to EOF (includes TOC + footer)
874    file.seek(SeekFrom::Start(header.footer_offset))?;
875    // Safe: total_size bounded by file length, and we check MAX_INDEX_BYTES before reading
876    #[allow(clippy::cast_possible_truncation)]
877    let total_size = (len - header.footer_offset) as usize;
878    if total_size as u64 > crate::MAX_INDEX_BYTES {
879        return Err(MemvidError::InvalidToc {
880            reason: "toc region exceeds safety limit".into(),
881        });
882    }
883
884    if total_size < FOOTER_SIZE {
885        return Err(MemvidError::InvalidToc {
886            reason: "region too small to contain footer".into(),
887        });
888    }
889
890    let mut buf = Vec::with_capacity(total_size);
891    file.read_to_end(&mut buf)?;
892
893    // Parse the footer (last FOOTER_SIZE bytes)
894    let footer_start = buf.len() - FOOTER_SIZE;
895    let footer_bytes = &buf[footer_start..];
896    let footer = CommitFooter::decode(footer_bytes).ok_or(MemvidError::InvalidToc {
897        reason: "failed to decode commit footer".into(),
898    })?;
899
900    // Extract only the TOC bytes (excluding the footer)
901    let toc_bytes = &buf[..footer_start];
902    #[allow(clippy::cast_possible_truncation)]
903    if toc_bytes.len() != footer.toc_len as usize {
904        return Err(MemvidError::InvalidToc {
905            reason: "toc length mismatch".into(),
906        });
907    }
908    if !footer.hash_matches(toc_bytes) {
909        return Err(MemvidError::InvalidToc {
910            reason: "commit footer toc hash mismatch".into(),
911        });
912    }
913
914    verify_toc_prefix(toc_bytes)?;
915    let toc = Toc::decode(toc_bytes)?;
916    Ok(toc)
917}
918
919fn verify_toc_prefix(bytes: &[u8]) -> Result<()> {
920    const MAX_SEGMENTS: u64 = 1_000_000;
921    const MAX_FRAMES: u64 = 1_000_000;
922    const MIN_SEGMENT_META_BYTES: u64 = 32;
923    const MIN_FRAME_BYTES: u64 = 64;
924    // TOC trailer layout (little-endian):
925    // [toc_version:u64][segments_len:u64][frames_len:u64]...
926    let read_u64 = |range: std::ops::Range<usize>, context: &str| -> Result<u64> {
927        let slice = bytes.get(range).ok_or_else(|| MemvidError::InvalidToc {
928            reason: context.to_string().into(),
929        })?;
930        let array: [u8; 8] = slice.try_into().map_err(|_| MemvidError::InvalidToc {
931            reason: context.to_string().into(),
932        })?;
933        Ok(u64::from_le_bytes(array))
934    };
935
936    if bytes.len() < 24 {
937        return Err(MemvidError::InvalidToc {
938            reason: "toc trailer too small".into(),
939        });
940    }
941    let toc_version = read_u64(0..8, "toc version missing or truncated")?;
942    if toc_version > 32 {
943        return Err(MemvidError::InvalidToc {
944            reason: "toc version unreasonable".into(),
945        });
946    }
947    let segments_len = read_u64(8..16, "segment count missing or truncated")?;
948    if segments_len > MAX_SEGMENTS {
949        return Err(MemvidError::InvalidToc {
950            reason: "segment count unreasonable".into(),
951        });
952    }
953    let frames_len = read_u64(16..24, "frame count missing or truncated")?;
954    if frames_len > MAX_FRAMES {
955        return Err(MemvidError::InvalidToc {
956            reason: "frame count unreasonable".into(),
957        });
958    }
959    let required = segments_len
960        .saturating_mul(MIN_SEGMENT_META_BYTES)
961        .saturating_add(frames_len.saturating_mul(MIN_FRAME_BYTES));
962    if required > bytes.len() as u64 {
963        return Err(MemvidError::InvalidToc {
964            reason: "toc payload inconsistent with counts".into(),
965        });
966    }
967    Ok(())
968}
969
970/// Ensure frame payloads do not overlap each other or exceed file boundary.
971///
972/// Frames in the TOC are ordered by `frame_id`, not by `payload_offset`, so we must
973/// sort by `payload_offset` before checking for overlaps.
974///
975/// Note: Frames with `payload_length` == 0 are "virtual" frames (e.g., document
976/// frames that reference chunks) and are skipped from this check.
977fn ensure_non_overlapping_frames(toc: &Toc, file_len: u64) -> Result<()> {
978    // Collect active frames with actual payloads and sort by payload_offset
979    let mut frames_by_offset: Vec<_> = toc
980        .frames
981        .iter()
982        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
983        .collect();
984    frames_by_offset.sort_by_key(|f| f.payload_offset);
985
986    let mut previous_end = 0u64;
987    for frame in frames_by_offset {
988        let end = frame
989            .payload_offset
990            .checked_add(frame.payload_length)
991            .ok_or_else(|| MemvidError::InvalidToc {
992                reason: "frame payload offsets overflow".into(),
993            })?;
994        if end > file_len {
995            return Err(MemvidError::InvalidToc {
996                reason: "frame payload exceeds file length".into(),
997            });
998        }
999        if frame.payload_offset < previous_end {
1000            return Err(MemvidError::InvalidToc {
1001                reason: format!(
1002                    "frame {} payload overlaps with previous frame (offset {} < previous end {})",
1003                    frame.id, frame.payload_offset, previous_end
1004                )
1005                .into(),
1006            });
1007        }
1008        previous_end = end;
1009    }
1010    Ok(())
1011}
1012
1013pub(crate) fn recover_toc(file: &mut File, hint: Option<u64>) -> Result<(Toc, u64)> {
1014    let len = file.metadata()?.len();
1015    // Safety: we only create a read-only mapping over stable file bytes.
1016    let mmap = unsafe { Mmap::map(&*file)? };
1017    tracing::debug!(file_len = len, "attempting toc recovery");
1018
1019    // First, try to find a valid footer which includes validated TOC bytes
1020    if let Some(footer_slice) = find_last_valid_footer(&mmap) {
1021        tracing::debug!(
1022            footer_offset = footer_slice.footer_offset,
1023            toc_offset = footer_slice.toc_offset,
1024            toc_len = footer_slice.toc_bytes.len(),
1025            "found valid footer during recovery"
1026        );
1027        // The footer has already validated the TOC hash, so we can directly decode it
1028        match Toc::decode(footer_slice.toc_bytes) {
1029            Ok(toc) => {
1030                return Ok((toc, footer_slice.toc_offset as u64));
1031            }
1032            Err(err) => {
1033                tracing::warn!(
1034                    error = %err,
1035                    "footer-validated TOC failed to decode, falling back to scan"
1036                );
1037            }
1038        }
1039    }
1040
1041    // If we have a header-provided hint (`footer_offset`) but the commit footer itself is corrupted,
1042    // we can often still recover because the TOC bytes are intact. In that case, assume the TOC
1043    // spans from `hint` up to the final fixed-size commit footer and decode it best-effort.
1044    if let Some(hint_offset) = hint {
1045        use crate::footer::FOOTER_SIZE;
1046
1047        // Safe: file successfully mmapped so length fits in usize
1048        #[allow(clippy::cast_possible_truncation)]
1049        let start = (hint_offset.min(len)) as usize;
1050        if mmap.len().saturating_sub(start) >= FOOTER_SIZE {
1051            let toc_end = mmap.len().saturating_sub(FOOTER_SIZE);
1052            if toc_end > start {
1053                let toc_bytes = &mmap[start..toc_end];
1054                if verify_toc_prefix(toc_bytes).is_ok() {
1055                    let attempt = panic::catch_unwind(|| Toc::decode(toc_bytes));
1056                    if let Ok(Ok(toc)) = attempt {
1057                        tracing::debug!(
1058                            recovered_offset = hint_offset,
1059                            recovered_frames = toc.frames.len(),
1060                            "recovered toc from hinted offset without validated footer"
1061                        );
1062                        return Ok((toc, hint_offset));
1063                    }
1064                }
1065            }
1066        }
1067    }
1068
1069    // Fallback to manual scan if footer-based recovery failed
1070    let mut ranges = Vec::new();
1071    if let Some(hint_offset) = hint {
1072        // Safe: file successfully mmapped so length fits in usize
1073        #[allow(clippy::cast_possible_truncation)]
1074        let hint_idx = hint_offset.min(len) as usize;
1075        ranges.push((hint_idx, mmap.len()));
1076        if hint_idx > 0 {
1077            ranges.push((0, hint_idx));
1078        }
1079    } else {
1080        ranges.push((0, mmap.len()));
1081    }
1082
1083    for (start, end) in ranges {
1084        if let Some(found) = scan_range_for_toc(&mmap, start, end) {
1085            return Ok(found);
1086        }
1087    }
1088
1089    Err(MemvidError::InvalidToc {
1090        reason: "unable to recover table of contents from file trailer".into(),
1091    })
1092}
1093
1094fn scan_range_for_toc(data: &[u8], start: usize, end: usize) -> Option<(Toc, u64)> {
1095    if start >= end || end > data.len() {
1096        return None;
1097    }
1098    const MAX_TOC_BYTES: usize = 64 * 1024 * 1024;
1099    const ZERO_CHECKSUM: [u8; 32] = [0u8; 32];
1100
1101    // We only ever consider offsets where the candidate TOC slice would be <= MAX_TOC_BYTES,
1102    // otherwise the loop devolves into iterating over the entire file for large memories.
1103    let min_offset = data.len().saturating_sub(MAX_TOC_BYTES);
1104    let scan_start = start.max(min_offset);
1105
1106    for offset in (scan_start..end).rev() {
1107        let slice = &data[offset..];
1108        if slice.len() < 16 {
1109            continue;
1110        }
1111        debug_assert!(slice.len() <= MAX_TOC_BYTES);
1112
1113        // No footer found - try old format with checksum
1114        if slice.len() < ZERO_CHECKSUM.len() {
1115            continue;
1116        }
1117        let (body, stored_checksum) = slice.split_at(slice.len() - ZERO_CHECKSUM.len());
1118        let mut hasher = Hasher::new();
1119        hasher.update(body);
1120        hasher.update(&ZERO_CHECKSUM);
1121        if hasher.finalize().as_bytes() != stored_checksum {
1122            continue;
1123        }
1124        if verify_toc_prefix(slice).is_err() {
1125            continue;
1126        }
1127        let attempt = panic::catch_unwind(|| Toc::decode(slice));
1128        if let Ok(Ok(toc)) = attempt {
1129            let recovered_offset = offset as u64;
1130            tracing::debug!(
1131                recovered_offset,
1132                recovered_frames = toc.frames.len(),
1133                "recovered toc via scan"
1134            );
1135            return Some((toc, recovered_offset));
1136        }
1137    }
1138    None
1139}
1140
1141pub(crate) fn prepare_toc_bytes(toc: &mut Toc) -> Result<Vec<u8>> {
1142    toc.toc_checksum = [0u8; 32];
1143    let bytes = toc.encode()?;
1144    let checksum = Toc::calculate_checksum(&bytes);
1145    toc.toc_checksum = checksum;
1146    toc.encode()
1147}
1148
1149pub(crate) fn empty_toc() -> Toc {
1150    Toc {
1151        toc_version: 0,
1152        segments: Vec::new(),
1153        frames: Vec::new(),
1154        indexes: IndexManifests::default(),
1155        time_index: None,
1156        temporal_track: None,
1157        memories_track: None,
1158        logic_mesh: None,
1159        sketch_track: None,
1160        segment_catalog: SegmentCatalog::default(),
1161        ticket_ref: TicketRef {
1162            issuer: "free-tier".into(),
1163            seq_no: 1,
1164            expires_in_secs: 0,
1165            capacity_bytes: Tier::Free.capacity_bytes(),
1166            verified: false,
1167        },
1168        memory_binding: None,
1169        replay_manifest: None,
1170        enrichment_queue: crate::types::EnrichmentQueueManifest::default(),
1171        merkle_root: [0u8; 32],
1172        toc_checksum: [0u8; 32],
1173    }
1174}
1175
1176pub(crate) fn compute_data_end(toc: &Toc, header: &Header) -> u64 {
1177    // `data_end` tracks the end of all data bytes that should not be overwritten by appends:
1178    // - frame payloads
1179    // - embedded indexes / metadata segments referenced by the TOC
1180    // - the current footer boundary (TOC offset), since callers may safely overwrite old TOCs
1181    //
1182    // Keeping this conservative prevents WAL replay / appends from corrupting embedded segments.
1183    let wal_region_end = header.wal_offset.saturating_add(header.wal_size);
1184    let mut max_end = wal_region_end.max(header.footer_offset);
1185
1186    // Frame payloads (active only).
1187    for frame in toc
1188        .frames
1189        .iter()
1190        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
1191    {
1192        if let Some(end) = frame.payload_offset.checked_add(frame.payload_length) {
1193            max_end = max_end.max(end);
1194        }
1195    }
1196
1197    // Segment catalog entries.
1198    let catalog = &toc.segment_catalog;
1199    for seg in &catalog.lex_segments {
1200        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1201            max_end = max_end.max(end);
1202        }
1203    }
1204    for seg in &catalog.vec_segments {
1205        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1206            max_end = max_end.max(end);
1207        }
1208    }
1209    for seg in &catalog.time_segments {
1210        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1211            max_end = max_end.max(end);
1212        }
1213    }
1214    #[cfg(feature = "temporal_track")]
1215    for seg in &catalog.temporal_segments {
1216        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1217            max_end = max_end.max(end);
1218        }
1219    }
1220    #[cfg(feature = "lex")]
1221    for seg in &catalog.tantivy_segments {
1222        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1223            max_end = max_end.max(end);
1224        }
1225    }
1226    #[cfg(feature = "parallel_segments")]
1227    for seg in &catalog.index_segments {
1228        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1229            max_end = max_end.max(end);
1230        }
1231    }
1232
1233    // Global manifests (non-segment storage paths).
1234    if let Some(manifest) = toc.indexes.lex.as_ref() {
1235        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1236            max_end = max_end.max(end);
1237        }
1238    }
1239    if let Some(manifest) = toc.indexes.vec.as_ref() {
1240        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1241            max_end = max_end.max(end);
1242        }
1243    }
1244    if let Some(manifest) = toc.indexes.clip.as_ref() {
1245        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1246            max_end = max_end.max(end);
1247        }
1248    }
1249    if let Some(manifest) = toc.time_index.as_ref() {
1250        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1251            max_end = max_end.max(end);
1252        }
1253    }
1254    #[cfg(feature = "temporal_track")]
1255    if let Some(track) = toc.temporal_track.as_ref() {
1256        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1257            max_end = max_end.max(end);
1258        }
1259    }
1260    if let Some(track) = toc.memories_track.as_ref() {
1261        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1262            max_end = max_end.max(end);
1263        }
1264    }
1265    if let Some(mesh) = toc.logic_mesh.as_ref() {
1266        if let Some(end) = mesh.bytes_offset.checked_add(mesh.bytes_length) {
1267            max_end = max_end.max(end);
1268        }
1269    }
1270    if let Some(track) = toc.sketch_track.as_ref() {
1271        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1272            max_end = max_end.max(end);
1273        }
1274    }
1275    #[cfg(feature = "replay")]
1276    if let Some(manifest) = toc.replay_manifest.as_ref() {
1277        if let Some(end) = manifest.segment_offset.checked_add(manifest.segment_size) {
1278            max_end = max_end.max(end);
1279        }
1280    }
1281
1282    tracing::debug!(
1283        wal_region_end,
1284        footer_offset = header.footer_offset,
1285        computed_data_end = max_end,
1286        "compute_data_end"
1287    );
1288
1289    max_end
1290}
1291
/// TOC state recovered from the file tail by `load_tail_snapshot`.
struct TailSnapshot {
    /// TOC decoded from the bytes guarded by the last valid commit footer; its checksum has
    /// been verified.
    toc: Toc,
    /// Absolute file offset derived from `FooterSlice::footer_offset`, rebased from the
    /// footer search window onto the whole file.
    footer_offset: u64,
    /// Offset treated as the end of committed data. `load_tail_snapshot` sets it equal to
    /// `footer_offset`, not to the TOC offset.
    data_end: u64,
    /// Generation number recorded in the commit footer.
    generation: u64,
}
1298
1299fn locate_footer_window(mmap: &[u8]) -> Option<(FooterSlice<'_>, usize)> {
1300    const MAX_SEARCH_SIZE: usize = 16 * 1024 * 1024;
1301    if mmap.is_empty() {
1302        return None;
1303    }
1304    let mut window = MAX_SEARCH_SIZE.min(mmap.len());
1305    loop {
1306        let start = mmap.len() - window;
1307        if let Some(slice) = find_last_valid_footer(&mmap[start..]) {
1308            return Some((slice, start));
1309        }
1310        if window == mmap.len() {
1311            break;
1312        }
1313        window = (window * 2).min(mmap.len());
1314    }
1315    None
1316}
1317
1318fn load_tail_snapshot(file: &File) -> Result<TailSnapshot> {
1319    // Safety: we only create a read-only mapping over the stable file bytes.
1320    let mmap = unsafe { Mmap::map(file)? };
1321
1322    let (slice, offset_adjustment) =
1323        locate_footer_window(&mmap).ok_or_else(|| MemvidError::InvalidToc {
1324            reason: "no valid commit footer found".into(),
1325        })?;
1326    let toc = Toc::decode(slice.toc_bytes)?;
1327    toc.verify_checksum()?;
1328
1329    Ok(TailSnapshot {
1330        toc,
1331        footer_offset: slice.footer_offset as u64 + offset_adjustment as u64,
1332        // Using toc_offset causes stale data_end that moves footer backwards on next commit
1333        data_end: slice.footer_offset as u64 + offset_adjustment as u64,
1334        generation: slice.footer.generation,
1335    })
1336}
1337
1338fn detect_generation(file: &File) -> Result<Option<u64>> {
1339    // Safety: read-only mapping for footer inspection.
1340    let mmap = unsafe { Mmap::map(file)? };
1341
1342    Ok(locate_footer_window(&mmap).map(|(slice, _)| slice.footer.generation))
1343}
1344
1345pub(crate) fn ensure_single_file(path: &Path) -> Result<()> {
1346    if let Some(parent) = path.parent() {
1347        let name = path
1348            .file_name()
1349            .and_then(|n| n.to_str())
1350            .unwrap_or_default();
1351        let forbidden = ["-wal", "-shm", "-lock", "-journal"];
1352        for suffix in forbidden {
1353            let candidate = parent.join(format!("{name}{suffix}"));
1354            if candidate.exists() {
1355                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1356            }
1357        }
1358        let hidden_forbidden = [".wal", ".shm", ".lock", ".journal"];
1359        for suffix in hidden_forbidden {
1360            let candidate = parent.join(format!(".{name}{suffix}"));
1361            if candidate.exists() {
1362                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1363            }
1364        }
1365    }
1366    Ok(())
1367}
1368
/// Path of the manifest WAL associated with `path`.
///
/// The existing extension is replaced, e.g. `mem.mv2` becomes `mem.manifest.wal`.
#[cfg(feature = "parallel_segments")]
fn manifest_wal_path(path: &Path) -> PathBuf {
    path.with_extension("manifest.wal")
}
1375
/// Best-effort removal of the manifest WAL associated with `path`.
///
/// A missing WAL is a no-op, and removal failures are deliberately ignored.
#[cfg(feature = "parallel_segments")]
pub(crate) fn cleanup_manifest_wal_public(path: &Path) {
    let wal_path = manifest_wal_path(path);
    if !wal_path.exists() {
        return;
    }
    let _ = std::fs::remove_file(&wal_path);
}
1383
1384/// Single source of truth: does this TOC have a lexical index?
1385/// Checks all possible locations: old manifest, `lex_segments`, and `tantivy_segments`.
1386pub(crate) fn has_lex_index(toc: &Toc) -> bool {
1387    toc.segment_catalog.lex_enabled
1388        || toc.indexes.lex.is_some()
1389        || !toc.indexes.lex_segments.is_empty()
1390        || !toc.segment_catalog.tantivy_segments.is_empty()
1391}
1392
/// Single source of truth: expected document count for the lex index.
///
/// Sources are tried in order, and the first positive count wins:
/// 1. `doc_count` from the legacy `toc.indexes.lex` manifest.
/// 2. `lex_storage`, which reflects the `lex_segments`.
///
/// Returns `None` when neither yields a positive count, e.g. Tantivy segments without a
/// manifest, whose count is only knowable by loading the index. The caller decides what to
/// do then (`init_tantivy` should trust that the segments exist).
#[cfg(feature = "lex")]
pub(crate) fn lex_doc_count(
    toc: &Toc,
    lex_storage: &crate::search::EmbeddedLexStorage,
) -> Option<u64> {
    let positive = |count: u64| (count > 0).then_some(count);

    toc.indexes
        .lex
        .as_ref()
        .and_then(|manifest| positive(manifest.doc_count))
        .or_else(|| positive(lex_storage.doc_count()))
}
1418
1419/// Validates segment integrity on file open to catch corruption early.
1420/// This helps doctor by detecting issues before they cause problems.
1421#[allow(dead_code)]
1422fn validate_segment_integrity(toc: &Toc, header: &Header, file_len: u64) -> Result<()> {
1423    let data_limit = header.footer_offset;
1424
1425    // Validate replay segment (if present). Replay is stored AT the footer boundary,
1426    // and footer_offset is moved forward after writing. So we only check against file_len,
1427    // not against footer_offset (which would be after the replay segment).
1428    #[cfg(feature = "replay")]
1429    if let Some(manifest) = toc.replay_manifest.as_ref() {
1430        if manifest.segment_size != 0 {
1431            let end = manifest
1432                .segment_offset
1433                .checked_add(manifest.segment_size)
1434                .ok_or_else(|| MemvidError::Doctor {
1435                    reason: format!(
1436                        "Replay segment offset overflow: {} + {}",
1437                        manifest.segment_offset, manifest.segment_size
1438                    ),
1439                })?;
1440
1441            // Only check against file_len - replay segments sit at the footer boundary
1442            // and footer_offset is updated to point after them
1443            if end > file_len {
1444                return Err(MemvidError::Doctor {
1445                    reason: format!(
1446                        "Replay segment out of bounds: offset={}, length={}, end={}, file_len={}",
1447                        manifest.segment_offset, manifest.segment_size, end, file_len
1448                    ),
1449                });
1450            }
1451        }
1452    }
1453
1454    // Validate Tantivy segments
1455    for (idx, seg) in toc.segment_catalog.tantivy_segments.iter().enumerate() {
1456        let offset = seg.common.bytes_offset;
1457        let length = seg.common.bytes_length;
1458
1459        if length == 0 {
1460            continue; // Empty segments are okay
1461        }
1462
1463        let end = offset
1464            .checked_add(length)
1465            .ok_or_else(|| MemvidError::Doctor {
1466                reason: format!("Tantivy segment {idx} offset overflow: {offset} + {length}"),
1467            })?;
1468
1469        if end > file_len || end > data_limit {
1470            return Err(MemvidError::Doctor {
1471                reason: format!(
1472                    "Tantivy segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
1473                ),
1474            });
1475        }
1476    }
1477
1478    // Validate time index segments
1479    for (idx, seg) in toc.segment_catalog.time_segments.iter().enumerate() {
1480        let offset = seg.common.bytes_offset;
1481        let length = seg.common.bytes_length;
1482
1483        if length == 0 {
1484            continue;
1485        }
1486
1487        let end = offset
1488            .checked_add(length)
1489            .ok_or_else(|| MemvidError::Doctor {
1490                reason: format!("Time segment {idx} offset overflow: {offset} + {length}"),
1491            })?;
1492
1493        if end > file_len || end > data_limit {
1494            return Err(MemvidError::Doctor {
1495                reason: format!(
1496                    "Time segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
1497                ),
1498            });
1499        }
1500    }
1501
1502    // Validate vec segments
1503    for (idx, seg) in toc.segment_catalog.vec_segments.iter().enumerate() {
1504        let offset = seg.common.bytes_offset;
1505        let length = seg.common.bytes_length;
1506
1507        if length == 0 {
1508            continue;
1509        }
1510
1511        let end = offset
1512            .checked_add(length)
1513            .ok_or_else(|| MemvidError::Doctor {
1514                reason: format!("Vec segment {idx} offset overflow: {offset} + {length}"),
1515            })?;
1516
1517        if end > file_len || end > data_limit {
1518            return Err(MemvidError::Doctor {
1519                reason: format!(
1520                    "Vec segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
1521                ),
1522            });
1523        }
1524    }
1525
1526    log::debug!("✓ Segment integrity validation passed");
1527    Ok(())
1528}
1529
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn toc_prefix_underflow_surfaces_reason() {
        let err = verify_toc_prefix(&[0u8; 8]).expect_err("should reject short toc prefix");
        match err {
            MemvidError::InvalidToc { reason } => {
                assert!(
                    reason.contains("trailer too small"),
                    "unexpected reason: {reason}"
                );
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn toc_prefix_accepts_zero_counts() {
        // Version 0, no segments, no frames: the minimal well-formed prefix.
        verify_toc_prefix(&[0u8; 24]).expect("zero-count prefix should be accepted");
    }

    #[test]
    fn toc_prefix_rejects_unreasonable_version() {
        let mut bytes = [0u8; 24];
        bytes[..8].copy_from_slice(&33u64.to_le_bytes());
        let err = verify_toc_prefix(&bytes).expect_err("version 33 exceeds the limit");
        match err {
            MemvidError::InvalidToc { reason } => {
                assert!(
                    reason.contains("version unreasonable"),
                    "unexpected reason: {reason}"
                );
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn toc_prefix_rejects_counts_larger_than_payload() {
        // One segment implies at least 32 bytes, but only the 24-byte prefix is present.
        let mut bytes = [0u8; 24];
        bytes[8..16].copy_from_slice(&1u64.to_le_bytes());
        let err = verify_toc_prefix(&bytes).expect_err("counts exceed available bytes");
        match err {
            MemvidError::InvalidToc { reason } => {
                assert!(
                    reason.contains("inconsistent with counts"),
                    "unexpected reason: {reason}"
                );
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn ensure_single_file_blocks_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        std::fs::write(dir.path().join("mem.mv2-wal"), b"junk").expect("sidecar");
        let result = Memvid::create(&path);
        assert!(matches!(
            result,
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }

    #[test]
    fn ensure_single_file_blocks_hidden_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        std::fs::write(dir.path().join(".mem.mv2.lock"), b"junk").expect("sidecar");
        assert!(matches!(
            ensure_single_file(&path),
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }

    #[test]
    fn ensure_single_file_allows_clean_directory() {
        let dir = tempdir().expect("tmp");
        assert!(ensure_single_file(&dir.path().join("mem.mv2")).is_ok());
    }
}
1556            result,
1557            Err(MemvidError::AuxiliaryFileDetected { .. })
1558        ));
1559    }
1560}