memvid_core/memvid/lifecycle.rs

//! Lifecycle management for creating and opening `.mv2` memories.
//!
//! Responsibilities:
//! - Enforce single-file invariant (no sidecars) and take OS locks.
//! - Bootstrap headers, internal WAL, and TOC on create, and recover them on open.
//! - Validate TOC/footer layout, recover the latest valid footer when needed.
//! - Wire up index state (lex/vector/time) without mutating payload bytes.
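//!
//! A minimal lifecycle sketch (hypothetical file name; `ignore`d because it touches the
//! filesystem and assumes `Memvid` is re-exported at the crate root):
//!
//! ```ignore
//! // Creating takes an exclusive OS lock for the lifetime of the handle.
//! let mem = Memvid::create("notes.mv2")?;
//! assert_eq!(mem.frame_count(), 0);
//! drop(mem); // releases the lock
//!
//! // Re-open the same file read-only under a shared lock.
//! let snapshot = Memvid::open_read_only("notes.mv2")?;
//! let _frames = snapshot.frame_count();
//! ```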

use std::convert::TryInto;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom};
use std::panic;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use crate::constants::{MAGIC, SPEC_VERSION, WAL_OFFSET, WAL_SIZE_TINY};
use crate::error::{MemvidError, Result};
use crate::footer::{FooterSlice, find_last_valid_footer};
use crate::io::header::HeaderCodec;
#[cfg(feature = "parallel_segments")]
use crate::io::manifest_wal::ManifestWal;
use crate::io::wal::EmbeddedWal;
use crate::lock::{FileLock, LockMode};
#[cfg(feature = "lex")]
use crate::search::{EmbeddedLexStorage, TantivyEngine};
#[cfg(feature = "temporal_track")]
use crate::types::FrameId;
#[cfg(feature = "parallel_segments")]
use crate::types::IndexSegmentRef;
use crate::types::{
    FrameStatus, Header, IndexManifests, LogicMesh, MemoriesTrack, PutManyOpts, SchemaRegistry,
    SegmentCatalog, SketchTrack, TicketRef, Tier, Toc, VectorCompression,
};
#[cfg(feature = "temporal_track")]
use crate::{TemporalTrack, temporal_track_read};
use crate::{lex::LexIndex, vec::VecIndex};
use blake3::Hasher;
use memmap2::Mmap;

const DEFAULT_LOCK_TIMEOUT_MS: u64 = 250;
const DEFAULT_HEARTBEAT_MS: u64 = 2_000;
const DEFAULT_STALE_GRACE_MS: u64 = 10_000;

/// Primary handle for interacting with a `.mv2` memory file.
///
/// Holds the file descriptor, lock, header, TOC, and in-memory index state. Mutations
/// append to the embedded WAL and are materialized at commit time to keep the layout deterministic.
pub struct Memvid {
    pub(crate) file: File,
    pub(crate) path: PathBuf,
    pub(crate) lock: FileLock,
    pub(crate) read_only: bool,
    pub(crate) header: Header,
    pub(crate) toc: Toc,
    pub(crate) wal: EmbeddedWal,
    /// Number of frame inserts appended to WAL but not yet materialized into `toc.frames`.
    ///
    /// This lets frontends predict stable frame IDs before an explicit commit.
    pub(crate) pending_frame_inserts: u64,
    pub(crate) data_end: u64,
    /// Cached end of the payload region (max of payload_offset + payload_length across all frames).
    /// Updated incrementally on frame insert to avoid O(n) scans.
    pub(crate) cached_payload_end: u64,
    pub(crate) generation: u64,
    pub(crate) lock_settings: LockSettings,
    pub(crate) lex_enabled: bool,
    pub(crate) lex_index: Option<LexIndex>,
    #[cfg(feature = "lex")]
    #[allow(dead_code)]
    pub(crate) lex_storage: Arc<RwLock<EmbeddedLexStorage>>,
    pub(crate) vec_enabled: bool,
    pub(crate) vec_compression: VectorCompression,
    pub(crate) vec_model: Option<String>,
    pub(crate) vec_index: Option<VecIndex>,
    /// CLIP visual embeddings index (separate from vec due to different dimensions)
    pub(crate) clip_enabled: bool,
    pub(crate) clip_index: Option<crate::clip::ClipIndex>,
    pub(crate) dirty: bool,
    #[cfg(feature = "lex")]
    pub(crate) tantivy: Option<TantivyEngine>,
    #[cfg(feature = "lex")]
    pub(crate) tantivy_dirty: bool,
    #[cfg(feature = "temporal_track")]
    pub(crate) temporal_track: Option<TemporalTrack>,
    #[cfg(feature = "parallel_segments")]
    pub(crate) manifest_wal: Option<ManifestWal>,
    /// In-memory track for structured memory cards.
    pub(crate) memories_track: MemoriesTrack,
    /// In-memory Logic-Mesh graph for entity-relationship traversal.
    pub(crate) logic_mesh: LogicMesh,
    /// In-memory sketch track for fast candidate generation.
    pub(crate) sketch_track: SketchTrack,
    /// Schema registry for predicate validation.
    pub(crate) schema_registry: SchemaRegistry,
    /// Whether to enforce strict schema validation on card insert.
    pub(crate) schema_strict: bool,
    /// Active batch mode options (set by `begin_batch`, cleared by `end_batch`).
    pub(crate) batch_opts: Option<PutManyOpts>,
    /// Active replay session being recorded (if any).
    #[cfg(feature = "replay")]
    pub(crate) active_session: Option<crate::replay::ActiveSession>,
    /// Completed sessions stored in memory (until persisted to file).
    #[cfg(feature = "replay")]
    pub(crate) completed_sessions: Vec<crate::replay::ReplaySession>,
}

/// Controls read-only open behaviour for `.mv2` memories.
#[derive(Debug, Clone, Copy, Default)]
pub struct OpenReadOptions {
    pub allow_repair: bool,
}

#[derive(Debug, Clone)]
pub struct LockSettings {
    pub timeout_ms: u64,
    pub heartbeat_ms: u64,
    pub stale_grace_ms: u64,
    pub force_stale: bool,
    pub command: Option<String>,
}

impl Default for LockSettings {
    fn default() -> Self {
        Self {
            timeout_ms: DEFAULT_LOCK_TIMEOUT_MS,
            heartbeat_ms: DEFAULT_HEARTBEAT_MS,
            stale_grace_ms: DEFAULT_STALE_GRACE_MS,
            force_stale: false,
            command: None,
        }
    }
}

impl Memvid {
    /// Create a new, empty `.mv2` file with an embedded WAL and empty TOC.
    /// The file is locked exclusively for the lifetime of the handle.
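    ///
    /// A minimal sketch (hypothetical path; `ignore`d because it writes to disk):
    /// ```ignore
    /// let mem = Memvid::create("example.mv2")?;
    /// assert_eq!(mem.frame_count(), 0);
    /// assert_eq!(mem.next_frame_id(), 0);
    /// ```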
    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path_ref = path.as_ref();
        ensure_single_file(path_ref)?;

        OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path_ref)?;
        let (mut file, lock) = FileLock::open_and_lock(path_ref)?;

        let header = Header {
            magic: MAGIC,
            version: SPEC_VERSION,
            footer_offset: WAL_OFFSET + WAL_SIZE_TINY,
            wal_offset: WAL_OFFSET,
            wal_size: WAL_SIZE_TINY,
            wal_checkpoint_pos: 0,
            wal_sequence: 0,
            toc_checksum: [0u8; 32],
        };

        let mut toc = empty_toc();
        // If lex feature is enabled, set the catalog flag immediately
        #[cfg(feature = "lex")]
        {
            toc.segment_catalog.lex_enabled = true;
        }
        file.set_len(header.footer_offset)?;
        HeaderCodec::write(&mut file, &header)?;

        let wal = EmbeddedWal::open(&file, &header)?;
        let data_end = header.footer_offset;
        #[cfg(feature = "lex")]
        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::new()));
        #[cfg(feature = "parallel_segments")]
        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
        #[cfg(feature = "parallel_segments")]
        let manifest_wal_entries = manifest_wal.replay()?;

        // No frames yet, so payload region ends at WAL boundary
        let cached_payload_end = header.wal_offset + header.wal_size;

        let mut memvid = Self {
            file,
            path: path_ref.to_path_buf(),
            lock,
            read_only: false,
            header,
            toc,
            wal,
            pending_frame_inserts: 0,
            data_end,
            cached_payload_end,
            generation: 0,
            lock_settings: LockSettings::default(),
            lex_enabled: cfg!(feature = "lex"), // Enable by default if feature is enabled
            lex_index: None,
            #[cfg(feature = "lex")]
            lex_storage,
            vec_enabled: cfg!(feature = "vec"), // Enable by default if feature is enabled
            vec_compression: VectorCompression::None,
            vec_model: None,
            vec_index: None,
            clip_enabled: cfg!(feature = "clip"), // Enable by default if feature is enabled
            clip_index: None,
            dirty: false,
            #[cfg(feature = "lex")]
            tantivy: None,
            #[cfg(feature = "lex")]
            tantivy_dirty: false,
            #[cfg(feature = "temporal_track")]
            temporal_track: None,
            #[cfg(feature = "parallel_segments")]
            manifest_wal: Some(manifest_wal),
            memories_track: MemoriesTrack::new(),
            logic_mesh: LogicMesh::new(),
            sketch_track: SketchTrack::default(),
            schema_registry: SchemaRegistry::new(),
            schema_strict: false,
            batch_opts: None,
            #[cfg(feature = "replay")]
            active_session: None,
            #[cfg(feature = "replay")]
            completed_sessions: Vec::new(),
        };

        #[cfg(feature = "lex")]
        memvid.init_tantivy()?;

        #[cfg(feature = "parallel_segments")]
        memvid.load_manifest_segments(manifest_wal_entries);

        memvid.bootstrap_segment_catalog();

        // Create empty manifests for enabled indexes so they persist across open/close
        let empty_offset = memvid.data_end;
        let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
                                \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";

        #[cfg(feature = "lex")]
        if memvid.lex_enabled && memvid.toc.indexes.lex.is_none() {
            memvid.toc.indexes.lex = Some(crate::types::LexIndexManifest {
                doc_count: 0,
                generation: 0,
                bytes_offset: empty_offset,
                bytes_length: 0,
                checksum: empty_checksum,
            });
        }

        #[cfg(feature = "vec")]
        if memvid.vec_enabled && memvid.toc.indexes.vec.is_none() {
            memvid.toc.indexes.vec = Some(crate::types::VecIndexManifest {
                vector_count: 0,
                dimension: 0,
                bytes_offset: empty_offset,
                bytes_length: 0,
                checksum: empty_checksum,
                compression_mode: memvid.vec_compression.clone(),
                model: memvid.vec_model.clone(),
            });
        }

        memvid.rewrite_toc_footer()?;
        memvid.header.toc_checksum = memvid.toc.toc_checksum;
        crate::persist_header(&mut memvid.file, &memvid.header)?;
        memvid.file.sync_all()?;
        Ok(memvid)
    }

    #[must_use]
    pub fn lock_settings(&self) -> &LockSettings {
        &self.lock_settings
    }

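    /// Mutable access to this handle's [`LockSettings`].
    ///
    /// A minimal tuning sketch (values are illustrative, not recommendations):
    /// ```ignore
    /// let mut mem = Memvid::open("example.mv2")?;
    /// mem.lock_settings_mut().timeout_ms = 5_000;
    /// mem.lock_settings_mut().heartbeat_ms = 1_000;
    /// ```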
    pub fn lock_settings_mut(&mut self) -> &mut LockSettings {
        &mut self.lock_settings
    }

    /// Set the vector compression mode for this memory.
    /// Must be called before ingesting documents with embeddings.
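    ///
    /// A minimal sketch (hypothetical path; `None` is the only compression variant this module
    /// itself references):
    /// ```ignore
    /// let mut mem = Memvid::create("example.mv2")?;
    /// mem.set_vector_compression(VectorCompression::None);
    /// assert!(matches!(mem.vector_compression(), VectorCompression::None));
    /// ```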
    pub fn set_vector_compression(&mut self, compression: VectorCompression) {
        self.vec_compression = compression;
    }

    /// Get the current vector compression mode
    #[must_use]
    pub fn vector_compression(&self) -> &VectorCompression {
        &self.vec_compression
    }

    /// Predict the next frame ID that would be assigned to a new insert.
    ///
    /// Frame IDs are dense indices into `toc.frames`. When a memory is mutable, inserts are first
    /// appended to the embedded WAL and only materialized into `toc.frames` on commit. This helper
    /// lets frontends allocate stable frame IDs before an explicit commit.
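    ///
    /// A small sketch (hypothetical path; `ignore`d because it writes to disk):
    /// ```ignore
    /// let mem = Memvid::create("example.mv2")?;
    /// // With no frames and no pending WAL inserts, the prediction equals the frame count.
    /// assert_eq!(mem.next_frame_id(), mem.frame_count() as u64);
    /// ```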
    #[must_use]
    pub fn next_frame_id(&self) -> u64 {
        (self.toc.frames.len() as u64).saturating_add(self.pending_frame_inserts)
    }

    /// Returns the total number of frames in the memory.
    ///
    /// This includes all frames regardless of status (active, deleted, etc.).
    #[must_use]
    pub fn frame_count(&self) -> usize {
        self.toc.frames.len()
    }

    fn open_locked(mut file: File, lock: FileLock, path_ref: &Path) -> Result<Self> {
        // Fast-path detection for encrypted capsules (.mv2e).
        // This avoids confusing "invalid header" errors and provides an actionable hint.
        let mut magic = [0u8; 4];
        let is_mv2e = file.read_exact(&mut magic).is_ok() && magic == *b"MV2E";
        file.seek(SeekFrom::Start(0))?;
        if is_mv2e {
            return Err(MemvidError::EncryptedFile {
                path: path_ref.to_path_buf(),
                hint: format!("Run: memvid unlock {}", path_ref.display()),
            });
        }

        let mut header = HeaderCodec::read(&mut file)?;
        let toc = match read_toc(&mut file, &header) {
            Ok(toc) => toc,
            Err(err @ (MemvidError::Decode(_) | MemvidError::InvalidToc { .. })) => {
                tracing::info!("toc decode failed ({}); attempting recovery", err);
                let (toc, recovered_offset) = recover_toc(&mut file, Some(header.footer_offset))?;
                if recovered_offset != header.footer_offset
                    || header.toc_checksum != toc.toc_checksum
                {
                    header.footer_offset = recovered_offset;
                    header.toc_checksum = toc.toc_checksum;
                    crate::persist_header(&mut file, &header)?;
                }
                toc
            }
            Err(err) => return Err(err),
        };
        let checksum_result = toc.verify_checksum();

        // Validate segment integrity early to catch corruption before loading indexes
        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
        if let Err(e) = validate_segment_integrity(&toc, &header, file_len) {
            tracing::warn!("Segment integrity validation failed: {}", e);
            // Don't fail file open - let doctor handle it
            // This is just an early warning system
        }
        ensure_non_overlapping_frames(&toc, file_len)?;

        let wal = EmbeddedWal::open(&file, &header)?;
        #[cfg(feature = "lex")]
        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
            toc.indexes.lex.as_ref(),
            &toc.indexes.lex_segments,
        )));
        #[cfg(feature = "parallel_segments")]
        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
        #[cfg(feature = "parallel_segments")]
        let manifest_wal_entries = manifest_wal.replay()?;

        let generation = detect_generation(&file)?.unwrap_or(0);
        let read_only = lock.mode() == LockMode::Shared;

        let mut memvid = Self {
            file,
            path: path_ref.to_path_buf(),
            lock,
            read_only,
            header,
            toc,
            wal,
            pending_frame_inserts: 0,
            data_end: 0,
            cached_payload_end: 0,
            generation,
            lock_settings: LockSettings::default(),
            lex_enabled: false,
            lex_index: None,
            #[cfg(feature = "lex")]
            lex_storage,
            vec_enabled: false,
            vec_compression: VectorCompression::None,
            vec_model: None,
            vec_index: None,
            clip_enabled: false,
            clip_index: None,
            dirty: false,
            #[cfg(feature = "lex")]
            tantivy: None,
            #[cfg(feature = "lex")]
            tantivy_dirty: false,
            #[cfg(feature = "temporal_track")]
            temporal_track: None,
            #[cfg(feature = "parallel_segments")]
            manifest_wal: Some(manifest_wal),
            memories_track: MemoriesTrack::new(),
            logic_mesh: LogicMesh::new(),
            sketch_track: SketchTrack::default(),
            schema_registry: SchemaRegistry::new(),
            schema_strict: false,
            batch_opts: None,
            #[cfg(feature = "replay")]
            active_session: None,
            #[cfg(feature = "replay")]
            completed_sessions: Vec::new(),
        };
        memvid.data_end = compute_data_end(&memvid.toc, &memvid.header);
        // One-time O(n) scan to initialize cached_payload_end from existing frames
        memvid.cached_payload_end = compute_payload_region_end(&memvid.toc, &memvid.header);
        // Use consolidated helper for lex_enabled check
        memvid.lex_enabled = has_lex_index(&memvid.toc);
        if memvid.lex_enabled {
            memvid.load_lex_index_from_manifest()?;
        }
        #[cfg(feature = "lex")]
        {
            memvid.init_tantivy()?;
        }
        memvid.vec_enabled =
            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
        if memvid.vec_enabled {
            memvid.load_vec_index_from_manifest()?;
        }
        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
        if memvid.clip_enabled {
            memvid.load_clip_index_from_manifest()?;
        }
        memvid.recover_wal()?;
        #[cfg(feature = "parallel_segments")]
        memvid.load_manifest_segments(manifest_wal_entries);
        memvid.bootstrap_segment_catalog();
        #[cfg(feature = "temporal_track")]
        memvid.ensure_temporal_track_loaded()?;
        memvid.load_memories_track()?;
        memvid.load_logic_mesh()?;
        memvid.load_sketch_track()?;
        if checksum_result.is_err() {
            memvid.toc.verify_checksum()?;
            if memvid.toc.toc_checksum != memvid.header.toc_checksum {
                memvid.header.toc_checksum = memvid.toc.toc_checksum;
                crate::persist_header(&mut memvid.file, &memvid.header)?;
                memvid.file.sync_all()?;
            }
        }
        Ok(memvid)
    }

    /// Open an existing `.mv2` with exclusive access, performing recovery if needed.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path_ref = path.as_ref();
        ensure_single_file(path_ref)?;

        let (file, lock) = FileLock::open_and_lock(path_ref)?;
        Self::open_locked(file, lock, path_ref)
    }

    pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_read_only_with_options(path, OpenReadOptions::default())
    }

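    /// Open read-only with explicit options.
    ///
    /// When `allow_repair` is set this falls back to a full exclusive open so recovery can
    /// rewrite on-disk state; otherwise a shared-lock snapshot of the latest valid footer is used.
    ///
    /// A minimal sketch (hypothetical path; `ignore`d because it touches the filesystem):
    /// ```ignore
    /// let opts = OpenReadOptions { allow_repair: false };
    /// let snapshot = Memvid::open_read_only_with_options("example.mv2", opts)?;
    /// ```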
    pub fn open_read_only_with_options<P: AsRef<Path>>(
        path: P,
        options: OpenReadOptions,
    ) -> Result<Self> {
        let path_ref = path.as_ref();
        ensure_single_file(path_ref)?;

        if options.allow_repair {
            return Self::open(path_ref);
        }

        Self::open_read_only_snapshot(path_ref)
    }

    fn open_read_only_snapshot(path_ref: &Path) -> Result<Self> {
        let mut file = OpenOptions::new().read(true).write(true).open(path_ref)?;
        let TailSnapshot {
            toc,
            footer_offset,
            data_end,
            generation,
        } = load_tail_snapshot(&file)?;

        let mut header = HeaderCodec::read(&mut file)?;
        header.footer_offset = footer_offset;
        header.toc_checksum = toc.toc_checksum;

        let lock = FileLock::acquire_with_mode(&file, LockMode::Shared)?;
        let wal = EmbeddedWal::open_read_only(&file, &header)?;

        #[cfg(feature = "lex")]
        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
            toc.indexes.lex.as_ref(),
            &toc.indexes.lex_segments,
        )));

        let cached_payload_end = compute_payload_region_end(&toc, &header);

        let mut memvid = Self {
            file,
            path: path_ref.to_path_buf(),
            lock,
            read_only: true,
            header,
            toc,
            wal,
            pending_frame_inserts: 0,
            data_end,
            cached_payload_end,
            generation,
            lock_settings: LockSettings::default(),
            lex_enabled: false,
            lex_index: None,
            #[cfg(feature = "lex")]
            lex_storage,
            vec_enabled: false,
            vec_compression: VectorCompression::None,
            vec_model: None,
            vec_index: None,
            clip_enabled: false,
            clip_index: None,
            dirty: false,
            #[cfg(feature = "lex")]
            tantivy: None,
            #[cfg(feature = "lex")]
            tantivy_dirty: false,
            #[cfg(feature = "temporal_track")]
            temporal_track: None,
            #[cfg(feature = "parallel_segments")]
            manifest_wal: None,
            memories_track: MemoriesTrack::new(),
            logic_mesh: LogicMesh::new(),
            sketch_track: SketchTrack::default(),
            schema_registry: SchemaRegistry::new(),
            schema_strict: false,
            batch_opts: None,
            #[cfg(feature = "replay")]
            active_session: None,
            #[cfg(feature = "replay")]
            completed_sessions: Vec::new(),
        };

        // Use consolidated helper for lex_enabled check
        memvid.lex_enabled = has_lex_index(&memvid.toc);
        if memvid.lex_enabled {
            memvid.load_lex_index_from_manifest()?;
        }
        #[cfg(feature = "lex")]
        memvid.init_tantivy()?;

        memvid.vec_enabled =
            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
        if memvid.vec_enabled {
            memvid.load_vec_index_from_manifest()?;
        }
        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
        if memvid.clip_enabled {
            memvid.load_clip_index_from_manifest()?;
        }
        // Load memories track, Logic-Mesh, and sketch track if present
        memvid.load_memories_track()?;
        memvid.load_logic_mesh()?;
        memvid.load_sketch_track()?;

        memvid.bootstrap_segment_catalog();
        #[cfg(feature = "temporal_track")]
        memvid.ensure_temporal_track_loaded()?;

        Ok(memvid)
    }

    pub(crate) fn try_open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path_ref = path.as_ref();
        ensure_single_file(path_ref)?;

        let file = OpenOptions::new().read(true).write(true).open(path_ref)?;
        let lock = match FileLock::try_acquire(&file, path_ref)? {
            Some(lock) => lock,
            None => {
                return Err(MemvidError::Lock(
                    "exclusive access unavailable for doctor".to_string(),
                ));
            }
        };
        Self::open_locked(file, lock, path_ref)
    }

    fn bootstrap_segment_catalog(&mut self) {
        let catalog = &mut self.toc.segment_catalog;
        if catalog.version == 0 {
            catalog.version = 1;
        }
        if catalog.next_segment_id == 0 {
            let mut max_id = 0u64;
            for descriptor in &catalog.lex_segments {
                max_id = max_id.max(descriptor.common.segment_id);
            }
            for descriptor in &catalog.vec_segments {
                max_id = max_id.max(descriptor.common.segment_id);
            }
            for descriptor in &catalog.time_segments {
                max_id = max_id.max(descriptor.common.segment_id);
            }
            #[cfg(feature = "temporal_track")]
            for descriptor in &catalog.temporal_segments {
                max_id = max_id.max(descriptor.common.segment_id);
            }
            #[cfg(feature = "parallel_segments")]
            for descriptor in &catalog.index_segments {
                max_id = max_id.max(descriptor.common.segment_id);
            }
            if max_id > 0 {
                catalog.next_segment_id = max_id.saturating_add(1);
            }
        }
    }

    #[cfg(feature = "parallel_segments")]
    fn load_manifest_segments(&mut self, entries: Vec<IndexSegmentRef>) {
        if entries.is_empty() {
            return;
        }
        for entry in entries {
            let duplicate = self
                .toc
                .segment_catalog
                .index_segments
                .iter()
                .any(|existing| existing.common.segment_id == entry.common.segment_id);
            if !duplicate {
                self.toc.segment_catalog.index_segments.push(entry);
            }
        }
    }

    /// Load the memories track from the manifest if present.
    fn load_memories_track(&mut self) -> Result<()> {
        let manifest = match &self.toc.memories_track {
            Some(m) => m,
            None => return Ok(()),
        };

        // Read the compressed data from the file
        if manifest.bytes_length > crate::MAX_INDEX_BYTES {
            return Err(MemvidError::InvalidToc {
                reason: "memories track exceeds safety limit".into(),
            });
        }
        // Safe: guarded by MAX_INDEX_BYTES check above
        #[allow(clippy::cast_possible_truncation)]
        let mut buf = vec![0u8; manifest.bytes_length as usize];
        self.file
            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
        self.file.read_exact(&mut buf)?;

        // Verify checksum
        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
        if actual_checksum != manifest.checksum {
            return Err(MemvidError::InvalidToc {
                reason: "memories track checksum mismatch".into(),
            });
        }

        // Deserialize the memories track
        self.memories_track = MemoriesTrack::deserialize(&buf)?;

        Ok(())
    }

    /// Load the Logic-Mesh from the manifest if present.
    fn load_logic_mesh(&mut self) -> Result<()> {
        let manifest = match &self.toc.logic_mesh {
            Some(m) => m,
            None => return Ok(()),
        };

        // Read the serialized data from the file
        if manifest.bytes_length > crate::MAX_INDEX_BYTES {
            return Err(MemvidError::InvalidToc {
                reason: "logic mesh exceeds safety limit".into(),
            });
        }
        // Safe: guarded by MAX_INDEX_BYTES check above
        #[allow(clippy::cast_possible_truncation)]
        let mut buf = vec![0u8; manifest.bytes_length as usize];
        self.file
            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
        self.file.read_exact(&mut buf)?;

        // Verify checksum
        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
        if actual_checksum != manifest.checksum {
            return Err(MemvidError::InvalidToc {
                reason: "logic mesh checksum mismatch".into(),
            });
        }

        // Deserialize the logic mesh
        self.logic_mesh = LogicMesh::deserialize(&buf)?;

        Ok(())
    }

    /// Load the sketch track from the manifest if present.
    fn load_sketch_track(&mut self) -> Result<()> {
        let manifest = match &self.toc.sketch_track {
            Some(m) => m.clone(),
            None => return Ok(()),
        };

        // Read and deserialize the sketch track (read_sketch_track handles seeking and checksum)
        self.sketch_track = crate::types::read_sketch_track(
            &mut self.file,
            manifest.bytes_offset,
            manifest.bytes_length,
        )?;

        Ok(())
    }

    #[cfg(feature = "temporal_track")]
    pub(crate) fn ensure_temporal_track_loaded(&mut self) -> Result<()> {
        if self.temporal_track.is_some() {
            return Ok(());
        }
        let manifest = match &self.toc.temporal_track {
            Some(manifest) => manifest.clone(),
            None => return Ok(()),
        };
        if manifest.bytes_length == 0 {
            return Ok(());
        }
        let file_len = self.file.metadata()?.len();
        let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) else {
            return Ok(());
        };
        if end > file_len {
            return Ok(());
        }
        match temporal_track_read(&mut self.file, manifest.bytes_offset, manifest.bytes_length) {
            Ok(track) => self.temporal_track = Some(track),
            Err(MemvidError::InvalidTemporalTrack { .. }) => {
                return Ok(());
            }
            Err(err) => return Err(err),
        }
        Ok(())
    }

    #[cfg(feature = "temporal_track")]
    pub(crate) fn temporal_track_ref(&mut self) -> Result<Option<&TemporalTrack>> {
        self.ensure_temporal_track_loaded()?;
        Ok(self.temporal_track.as_ref())
    }

    #[cfg(feature = "temporal_track")]
    pub(crate) fn temporal_anchor_timestamp(&mut self, frame_id: FrameId) -> Result<Option<i64>> {
        self.ensure_temporal_track_loaded()?;
        let Some(track) = self.temporal_track.as_ref() else {
            return Ok(None);
        };
        if !track.capabilities().has_anchors {
            return Ok(None);
        }
        Ok(track
            .anchor_for_frame(frame_id)
            .map(|anchor| anchor.anchor_ts))
    }

    #[cfg(feature = "temporal_track")]
    pub(crate) fn clear_temporal_track_cache(&mut self) {
        self.temporal_track = None;
    }

    #[cfg(feature = "temporal_track")]
    pub(crate) fn effective_temporal_timestamp(
        &mut self,
        frame_id: FrameId,
        fallback: i64,
    ) -> Result<i64> {
        Ok(self
            .temporal_anchor_timestamp(frame_id)?
            .unwrap_or(fallback))
    }

    #[cfg(not(feature = "temporal_track"))]
    pub(crate) fn effective_temporal_timestamp(
        &mut self,
        _frame_id: crate::types::FrameId,
        fallback: i64,
    ) -> Result<i64> {
        Ok(fallback)
    }

    /// Get current memory binding information.
    ///
    /// Returns the binding if this file is bound to a dashboard memory,
    /// or None if unbound.
    #[must_use]
    pub fn get_memory_binding(&self) -> Option<&crate::types::MemoryBinding> {
        self.toc.memory_binding.as_ref()
    }

    /// Bind this file to a dashboard memory.
    ///
    /// This stores the binding in the TOC and applies a temporary ticket for initial binding.
    /// The caller should follow up with `apply_signed_ticket` for cryptographic verification.
    ///
    /// # Errors
    ///
    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
    #[allow(deprecated)]
    pub fn bind_memory(
        &mut self,
        binding: crate::types::MemoryBinding,
        ticket: crate::types::Ticket,
    ) -> Result<()> {
        // Check existing binding
        if let Some(existing) = self.get_memory_binding() {
            if existing.memory_id != binding.memory_id {
                return Err(MemvidError::MemoryAlreadyBound {
                    existing_memory_id: existing.memory_id,
                    existing_memory_name: existing.memory_name.clone(),
                    bound_at: existing.bound_at.to_rfc3339(),
                });
            }
        }

        // Apply ticket for capacity
        self.apply_ticket(ticket)?;

        // Store binding in TOC
        self.toc.memory_binding = Some(binding);
        self.dirty = true;

        Ok(())
    }

    /// Set only the memory binding without applying a ticket.
    ///
    /// This is used when the caller will immediately follow up with `apply_signed_ticket`
    /// to apply the cryptographically verified ticket. This avoids the sequence number
    /// conflict that occurs when using `bind_memory` with a temporary ticket.
    ///
    /// # Errors
    ///
    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
    pub fn set_memory_binding_only(&mut self, binding: crate::types::MemoryBinding) -> Result<()> {
        self.ensure_writable()?;

        // Check existing binding
        if let Some(existing) = self.get_memory_binding() {
            if existing.memory_id != binding.memory_id {
                return Err(MemvidError::MemoryAlreadyBound {
                    existing_memory_id: existing.memory_id,
                    existing_memory_name: existing.memory_name.clone(),
                    bound_at: existing.bound_at.to_rfc3339(),
                });
            }
        }

        // Store binding in TOC (without applying a ticket)
        self.toc.memory_binding = Some(binding);
        self.dirty = true;

        Ok(())
    }

    /// Unbind this file from its dashboard memory.
    ///
    /// This clears the binding and reverts to free tier capacity (1 GB).
    pub fn unbind_memory(&mut self) -> Result<()> {
        self.toc.memory_binding = None;
        // Revert to free tier
        self.toc.ticket_ref = crate::types::TicketRef {
            issuer: "free-tier".into(),
            seq_no: 1,
            expires_in_secs: 0,
            capacity_bytes: crate::types::Tier::Free.capacity_bytes(),
            verified: false,
        };
        self.dirty = true;
        Ok(())
    }
}

pub(crate) fn read_toc(file: &mut File, header: &Header) -> Result<Toc> {
    use crate::footer::{CommitFooter, FOOTER_SIZE};

    let len = file.metadata()?.len();
    if len < header.footer_offset {
        return Err(MemvidError::InvalidToc {
            reason: "footer offset beyond file length".into(),
        });
    }

    // Read the entire region from footer_offset to EOF (includes TOC + footer)
    file.seek(SeekFrom::Start(header.footer_offset))?;
    // Safe: total_size bounded by file length, and we check MAX_INDEX_BYTES before reading
    #[allow(clippy::cast_possible_truncation)]
    let total_size = (len - header.footer_offset) as usize;
    if total_size as u64 > crate::MAX_INDEX_BYTES {
        return Err(MemvidError::InvalidToc {
            reason: "toc region exceeds safety limit".into(),
        });
    }

    if total_size < FOOTER_SIZE {
        return Err(MemvidError::InvalidToc {
            reason: "region too small to contain footer".into(),
        });
    }

    let mut buf = Vec::with_capacity(total_size);
    file.read_to_end(&mut buf)?;

    // Parse the footer (last FOOTER_SIZE bytes)
    let footer_start = buf.len() - FOOTER_SIZE;
    let footer_bytes = &buf[footer_start..];
    let footer = CommitFooter::decode(footer_bytes).ok_or(MemvidError::InvalidToc {
        reason: "failed to decode commit footer".into(),
    })?;

    // Extract only the TOC bytes (excluding the footer)
    let toc_bytes = &buf[..footer_start];
    #[allow(clippy::cast_possible_truncation)]
    if toc_bytes.len() != footer.toc_len as usize {
        return Err(MemvidError::InvalidToc {
            reason: "toc length mismatch".into(),
        });
    }
    if !footer.hash_matches(toc_bytes) {
        return Err(MemvidError::InvalidToc {
            reason: "commit footer toc hash mismatch".into(),
        });
    }

    verify_toc_prefix(toc_bytes)?;
    let toc = Toc::decode(toc_bytes)?;
    Ok(toc)
}

fn verify_toc_prefix(bytes: &[u8]) -> Result<()> {
    const MAX_SEGMENTS: u64 = 1_000_000;
    const MAX_FRAMES: u64 = 1_000_000;
    const MIN_SEGMENT_META_BYTES: u64 = 32;
    const MIN_FRAME_BYTES: u64 = 64;
    // The TOC region (stored in the file trailer) begins with three little-endian u64s:
    // [toc_version:u64][segments_len:u64][frames_len:u64]...
    let read_u64 = |range: std::ops::Range<usize>, context: &str| -> Result<u64> {
        let slice = bytes.get(range).ok_or_else(|| MemvidError::InvalidToc {
            reason: context.to_string().into(),
        })?;
        let array: [u8; 8] = slice.try_into().map_err(|_| MemvidError::InvalidToc {
            reason: context.to_string().into(),
        })?;
        Ok(u64::from_le_bytes(array))
    };

    if bytes.len() < 24 {
        return Err(MemvidError::InvalidToc {
            reason: "toc trailer too small".into(),
        });
    }
    let toc_version = read_u64(0..8, "toc version missing or truncated")?;
    if toc_version > 32 {
        return Err(MemvidError::InvalidToc {
            reason: "toc version unreasonable".into(),
        });
    }
    let segments_len = read_u64(8..16, "segment count missing or truncated")?;
    if segments_len > MAX_SEGMENTS {
        return Err(MemvidError::InvalidToc {
            reason: "segment count unreasonable".into(),
        });
    }
    let frames_len = read_u64(16..24, "frame count missing or truncated")?;
    if frames_len > MAX_FRAMES {
        return Err(MemvidError::InvalidToc {
            reason: "frame count unreasonable".into(),
        });
    }
    let required = segments_len
        .saturating_mul(MIN_SEGMENT_META_BYTES)
        .saturating_add(frames_len.saturating_mul(MIN_FRAME_BYTES));
    if required > bytes.len() as u64 {
        return Err(MemvidError::InvalidToc {
            reason: "toc payload inconsistent with counts".into(),
        });
    }
    Ok(())
}

/// Ensure frame payloads do not overlap each other or exceed file boundary.
///
/// Frames in the TOC are ordered by `frame_id`, not by `payload_offset`, so we must
/// sort by `payload_offset` before checking for overlaps.
///
/// Note: Frames with `payload_length` == 0 are "virtual" frames (e.g., document
/// frames that reference chunks) and are skipped from this check.
fn ensure_non_overlapping_frames(toc: &Toc, file_len: u64) -> Result<()> {
    // Collect active frames with actual payloads and sort by payload_offset
    let mut frames_by_offset: Vec<_> = toc
        .frames
        .iter()
        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
        .collect();
    frames_by_offset.sort_by_key(|f| f.payload_offset);

    let mut previous_end = 0u64;
    for frame in frames_by_offset {
        let end = frame
            .payload_offset
            .checked_add(frame.payload_length)
            .ok_or_else(|| MemvidError::InvalidToc {
                reason: "frame payload offsets overflow".into(),
            })?;
        if end > file_len {
            return Err(MemvidError::InvalidToc {
                reason: "frame payload exceeds file length".into(),
            });
        }
        if frame.payload_offset < previous_end {
            return Err(MemvidError::InvalidToc {
                reason: format!(
                    "frame {} payload overlaps with previous frame (offset {} < previous end {})",
                    frame.id, frame.payload_offset, previous_end
                )
                .into(),
            });
        }
        previous_end = end;
    }
    Ok(())
}

pub(crate) fn recover_toc(file: &mut File, hint: Option<u64>) -> Result<(Toc, u64)> {
    let len = file.metadata()?.len();
    // Safety: we only create a read-only mapping over stable file bytes.
    let mmap = unsafe { Mmap::map(&*file)? };
    tracing::debug!(file_len = len, "attempting toc recovery");

    // First, try to find a valid footer which includes validated TOC bytes
    if let Some(footer_slice) = find_last_valid_footer(&mmap) {
        tracing::debug!(
            footer_offset = footer_slice.footer_offset,
            toc_offset = footer_slice.toc_offset,
            toc_len = footer_slice.toc_bytes.len(),
            "found valid footer during recovery"
        );
        // The footer has already validated the TOC hash, so we can directly decode it
        match Toc::decode(footer_slice.toc_bytes) {
            Ok(toc) => {
                return Ok((toc, footer_slice.toc_offset as u64));
            }
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "footer-validated TOC failed to decode, falling back to scan"
                );
            }
        }
    }

    // If we have a header-provided hint (`footer_offset`) but the commit footer itself is corrupted,
    // we can often still recover because the TOC bytes are intact. In that case, assume the TOC
    // spans from `hint` up to the final fixed-size commit footer and decode it best-effort.
    if let Some(hint_offset) = hint {
        use crate::footer::FOOTER_SIZE;

        // Safe: file successfully mmapped so length fits in usize
        #[allow(clippy::cast_possible_truncation)]
        let start = (hint_offset.min(len)) as usize;
        if mmap.len().saturating_sub(start) >= FOOTER_SIZE {
            let toc_end = mmap.len().saturating_sub(FOOTER_SIZE);
            if toc_end > start {
                let toc_bytes = &mmap[start..toc_end];
                if verify_toc_prefix(toc_bytes).is_ok() {
                    let attempt = panic::catch_unwind(|| Toc::decode(toc_bytes));
                    if let Ok(Ok(toc)) = attempt {
                        tracing::debug!(
                            recovered_offset = hint_offset,
                            recovered_frames = toc.frames.len(),
                            "recovered toc from hinted offset without validated footer"
                        );
                        return Ok((toc, hint_offset));
                    }
                }
            }
        }
    }

    // Fallback to manual scan if footer-based recovery failed
    let mut ranges = Vec::new();
    if let Some(hint_offset) = hint {
        // Safe: file successfully mmapped so length fits in usize
        #[allow(clippy::cast_possible_truncation)]
        let hint_idx = hint_offset.min(len) as usize;
        ranges.push((hint_idx, mmap.len()));
        if hint_idx > 0 {
            ranges.push((0, hint_idx));
        }
    } else {
        ranges.push((0, mmap.len()));
    }

    for (start, end) in ranges {
        if let Some(found) = scan_range_for_toc(&mmap, start, end) {
            return Ok(found);
        }
    }

    Err(MemvidError::InvalidToc {
        reason: "unable to recover table of contents from file trailer".into(),
    })
}

fn scan_range_for_toc(data: &[u8], start: usize, end: usize) -> Option<(Toc, u64)> {
    if start >= end || end > data.len() {
        return None;
    }
    const MAX_TOC_BYTES: usize = 64 * 1024 * 1024;
    const ZERO_CHECKSUM: [u8; 32] = [0u8; 32];

    // We only ever consider offsets where the candidate TOC slice would be <= MAX_TOC_BYTES,
    // otherwise the loop devolves into iterating over the entire file for large memories.
    let min_offset = data.len().saturating_sub(MAX_TOC_BYTES);
    let scan_start = start.max(min_offset);

    for offset in (scan_start..end).rev() {
        let slice = &data[offset..];
        if slice.len() < 16 {
            continue;
        }
        debug_assert!(slice.len() <= MAX_TOC_BYTES);

        // No validated footer for this offset; try the old format with a trailing checksum
        if slice.len() < ZERO_CHECKSUM.len() {
            continue;
        }
        let (body, stored_checksum) = slice.split_at(slice.len() - ZERO_CHECKSUM.len());
        let mut hasher = Hasher::new();
        hasher.update(body);
        hasher.update(&ZERO_CHECKSUM);
        if hasher.finalize().as_bytes() != stored_checksum {
            continue;
        }
        if verify_toc_prefix(slice).is_err() {
            continue;
        }
        let attempt = panic::catch_unwind(|| Toc::decode(slice));
        if let Ok(Ok(toc)) = attempt {
            let recovered_offset = offset as u64;
            tracing::debug!(
                recovered_offset,
                recovered_frames = toc.frames.len(),
                "recovered toc via scan"
            );
            return Some((toc, recovered_offset));
        }
    }
    None
}

pub(crate) fn prepare_toc_bytes(toc: &mut Toc) -> Result<Vec<u8>> {
    toc.toc_checksum = [0u8; 32];
    let bytes = toc.encode()?;
    let checksum = Toc::calculate_checksum(&bytes);
    toc.toc_checksum = checksum;
    toc.encode()
}

pub(crate) fn empty_toc() -> Toc {
    Toc {
        toc_version: 0,
        segments: Vec::new(),
        frames: Vec::new(),
        indexes: IndexManifests::default(),
        time_index: None,
        temporal_track: None,
        memories_track: None,
        logic_mesh: None,
        sketch_track: None,
        segment_catalog: SegmentCatalog::default(),
        ticket_ref: TicketRef {
            issuer: "free-tier".into(),
            seq_no: 1,
            expires_in_secs: 0,
            capacity_bytes: Tier::Free.capacity_bytes(),
            verified: false,
        },
        memory_binding: None,
        replay_manifest: None,
        enrichment_queue: crate::types::EnrichmentQueueManifest::default(),
        merkle_root: [0u8; 32],
        toc_checksum: [0u8; 32],
    }
}

/// Compute the end of the payload region from frame payloads only.
/// Used once at open time to seed `cached_payload_end`.
pub(crate) fn compute_payload_region_end(toc: &Toc, header: &Header) -> u64 {
    let wal_region_end = header.wal_offset.saturating_add(header.wal_size);
    let mut max_end = wal_region_end;
    for frame in &toc.frames {
        if frame.payload_length != 0 {
            if let Some(end) = frame.payload_offset.checked_add(frame.payload_length) {
                max_end = max_end.max(end);
            }
        }
    }
    max_end
}

pub(crate) fn compute_data_end(toc: &Toc, header: &Header) -> u64 {
    // `data_end` tracks the end of all data bytes that should not be overwritten by appends:
    // - frame payloads
    // - embedded indexes / metadata segments referenced by the TOC
    // - the current footer boundary (TOC offset), since callers may safely overwrite old TOCs
    //
    // Keeping this conservative prevents WAL replay / appends from corrupting embedded segments.
    let wal_region_end = header.wal_offset.saturating_add(header.wal_size);
    let mut max_end = wal_region_end.max(header.footer_offset);

    // Frame payloads (active only).
    for frame in toc
        .frames
        .iter()
        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
    {
        if let Some(end) = frame.payload_offset.checked_add(frame.payload_length) {
            max_end = max_end.max(end);
        }
    }

    // Segment catalog entries.
    let catalog = &toc.segment_catalog;
    for seg in &catalog.lex_segments {
        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    for seg in &catalog.vec_segments {
        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    for seg in &catalog.time_segments {
        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    #[cfg(feature = "temporal_track")]
    for seg in &catalog.temporal_segments {
        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    #[cfg(feature = "lex")]
    for seg in &catalog.tantivy_segments {
        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    #[cfg(feature = "parallel_segments")]
    for seg in &catalog.index_segments {
        if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
            max_end = max_end.max(end);
        }
    }

    // Global manifests (non-segment storage paths).
    if let Some(manifest) = toc.indexes.lex.as_ref() {
        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    if let Some(manifest) = toc.indexes.vec.as_ref() {
        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    if let Some(manifest) = toc.indexes.clip.as_ref() {
        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    if let Some(manifest) = toc.time_index.as_ref() {
        if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    #[cfg(feature = "temporal_track")]
    if let Some(track) = toc.temporal_track.as_ref() {
        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    if let Some(track) = toc.memories_track.as_ref() {
        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    if let Some(mesh) = toc.logic_mesh.as_ref() {
        if let Some(end) = mesh.bytes_offset.checked_add(mesh.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    if let Some(track) = toc.sketch_track.as_ref() {
        if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
            max_end = max_end.max(end);
        }
    }
    #[cfg(feature = "replay")]
    if let Some(manifest) = toc.replay_manifest.as_ref() {
        if let Some(end) = manifest.segment_offset.checked_add(manifest.segment_size) {
            max_end = max_end.max(end);
        }
    }

    tracing::debug!(
        wal_region_end,
        footer_offset = header.footer_offset,
        computed_data_end = max_end,
        "compute_data_end"
    );

    max_end
}

struct TailSnapshot {
    toc: Toc,
    footer_offset: u64,
    data_end: u64,
    generation: u64,
}

fn locate_footer_window(mmap: &[u8]) -> Option<(FooterSlice<'_>, usize)> {
    const MAX_SEARCH_SIZE: usize = 16 * 1024 * 1024;
    if mmap.is_empty() {
        return None;
    }
    let mut window = MAX_SEARCH_SIZE.min(mmap.len());
    loop {
        let start = mmap.len() - window;
        if let Some(slice) = find_last_valid_footer(&mmap[start..]) {
            return Some((slice, start));
        }
        if window == mmap.len() {
            break;
        }
        window = (window * 2).min(mmap.len());
    }
    None
}

fn load_tail_snapshot(file: &File) -> Result<TailSnapshot> {
    // Safety: we only create a read-only mapping over the stable file bytes.
    let mmap = unsafe { Mmap::map(file)? };

    let (slice, offset_adjustment) =
        locate_footer_window(&mmap).ok_or_else(|| MemvidError::InvalidToc {
            reason: "no valid commit footer found".into(),
        })?;
    let toc = Toc::decode(slice.toc_bytes)?;
    toc.verify_checksum()?;

    Ok(TailSnapshot {
        toc,
        footer_offset: slice.footer_offset as u64 + offset_adjustment as u64,
        // Use footer_offset here as well: deriving data_end from toc_offset goes stale and
        // would move the footer backwards on the next commit.
        data_end: slice.footer_offset as u64 + offset_adjustment as u64,
        generation: slice.footer.generation,
    })
}

fn detect_generation(file: &File) -> Result<Option<u64>> {
    // Safety: read-only mapping for footer inspection.
    let mmap = unsafe { Mmap::map(file)? };

    Ok(locate_footer_window(&mmap).map(|(slice, _)| slice.footer.generation))
}

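/// Reject opens/creates when sidecar files (`<name>-wal`, `-shm`, `-lock`, `-journal`, or their
/// hidden `.`-prefixed variants) sit next to the memory file, preserving the single-file invariant.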
pub(crate) fn ensure_single_file(path: &Path) -> Result<()> {
    if let Some(parent) = path.parent() {
        let name = path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or_default();
        let forbidden = ["-wal", "-shm", "-lock", "-journal"];
        for suffix in forbidden {
            let candidate = parent.join(format!("{name}{suffix}"));
            if candidate.exists() {
                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
            }
        }
        let hidden_forbidden = [".wal", ".shm", ".lock", ".journal"];
        for suffix in hidden_forbidden {
            let candidate = parent.join(format!(".{name}{suffix}"));
            if candidate.exists() {
                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
            }
        }
    }
    Ok(())
}

#[cfg(feature = "parallel_segments")]
fn manifest_wal_path(path: &Path) -> PathBuf {
    let mut wal_path = path.to_path_buf();
    wal_path.set_extension("manifest.wal");
    wal_path
}

#[cfg(feature = "parallel_segments")]
pub(crate) fn cleanup_manifest_wal_public(path: &Path) {
    let wal_path = manifest_wal_path(path);
    if wal_path.exists() {
        let _ = std::fs::remove_file(&wal_path);
    }
}

/// Single source of truth: does this TOC have a lexical index?
/// Checks the catalog's `lex_enabled` flag plus every storage location: the old manifest,
/// `lex_segments`, and `tantivy_segments`.
pub(crate) fn has_lex_index(toc: &Toc) -> bool {
    toc.segment_catalog.lex_enabled
        || toc.indexes.lex.is_some()
        || !toc.indexes.lex_segments.is_empty()
        || !toc.segment_catalog.tantivy_segments.is_empty()
}

/// Single source of truth: expected document count for lex index.
/// Returns None if we can't determine (e.g., Tantivy segments without manifest).
#[cfg(feature = "lex")]
pub(crate) fn lex_doc_count(
    toc: &Toc,
    lex_storage: &crate::search::EmbeddedLexStorage,
) -> Option<u64> {
    // First try old manifest
    if let Some(manifest) = &toc.indexes.lex {
        if manifest.doc_count > 0 {
            return Some(manifest.doc_count);
        }
    }

    // Then try lex_storage (contains info from lex_segments)
    let storage_count = lex_storage.doc_count();
    if storage_count > 0 {
        return Some(storage_count);
    }

    // For Tantivy files with segments but no manifest/storage doc_count,
    // we can't know doc count without loading the index.
    // Return None and let caller decide (init_tantivy should trust segments exist)
    None
}

/// Validates segment integrity on file open to catch corruption early.
/// This helps doctor by detecting issues before they cause problems.
#[allow(dead_code)]
fn validate_segment_integrity(toc: &Toc, header: &Header, file_len: u64) -> Result<()> {
    let data_limit = header.footer_offset;

    // Validate replay segment (if present). Replay is stored AT the footer boundary,
    // and footer_offset is moved forward after writing. So we only check against file_len,
    // not against footer_offset (which would be after the replay segment).
    #[cfg(feature = "replay")]
    if let Some(manifest) = toc.replay_manifest.as_ref() {
        if manifest.segment_size != 0 {
            let end = manifest
                .segment_offset
                .checked_add(manifest.segment_size)
                .ok_or_else(|| MemvidError::Doctor {
                    reason: format!(
                        "Replay segment offset overflow: {} + {}",
                        manifest.segment_offset, manifest.segment_size
                    ),
                })?;

            // Only check against file_len - replay segments sit at the footer boundary
            // and footer_offset is updated to point after them
            if end > file_len {
                return Err(MemvidError::Doctor {
                    reason: format!(
                        "Replay segment out of bounds: offset={}, length={}, end={}, file_len={}",
                        manifest.segment_offset, manifest.segment_size, end, file_len
                    ),
                });
            }
        }
    }

    // Validate Tantivy segments
    for (idx, seg) in toc.segment_catalog.tantivy_segments.iter().enumerate() {
        let offset = seg.common.bytes_offset;
        let length = seg.common.bytes_length;

        if length == 0 {
            continue; // Empty segments are okay
        }

        let end = offset
            .checked_add(length)
            .ok_or_else(|| MemvidError::Doctor {
                reason: format!("Tantivy segment {idx} offset overflow: {offset} + {length}"),
            })?;

        if end > file_len || end > data_limit {
            return Err(MemvidError::Doctor {
                reason: format!(
                    "Tantivy segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
                ),
            });
        }
    }

    // Validate time index segments
    for (idx, seg) in toc.segment_catalog.time_segments.iter().enumerate() {
        let offset = seg.common.bytes_offset;
        let length = seg.common.bytes_length;

        if length == 0 {
            continue;
        }

        let end = offset
            .checked_add(length)
            .ok_or_else(|| MemvidError::Doctor {
                reason: format!("Time segment {idx} offset overflow: {offset} + {length}"),
            })?;

        if end > file_len || end > data_limit {
            return Err(MemvidError::Doctor {
                reason: format!(
                    "Time segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
                ),
            });
        }
    }

    // Validate vec segments
    for (idx, seg) in toc.segment_catalog.vec_segments.iter().enumerate() {
        let offset = seg.common.bytes_offset;
        let length = seg.common.bytes_length;

        if length == 0 {
            continue;
        }

        let end = offset
            .checked_add(length)
            .ok_or_else(|| MemvidError::Doctor {
                reason: format!("Vec segment {idx} offset overflow: {offset} + {length}"),
            })?;

        if end > file_len || end > data_limit {
            return Err(MemvidError::Doctor {
                reason: format!(
                    "Vec segment {idx} out of bounds: offset={offset}, length={length}, end={end}, file_len={file_len}, data_limit={data_limit}"
                ),
            });
        }
    }

    log::debug!("✓ Segment integrity validation passed");
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn toc_prefix_underflow_surfaces_reason() {
        let err = verify_toc_prefix(&[0u8; 8]).expect_err("should reject short toc prefix");
        match err {
            MemvidError::InvalidToc { reason } => {
                assert!(
                    reason.contains("trailer too small"),
                    "unexpected reason: {reason}"
                );
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn ensure_single_file_blocks_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        std::fs::write(dir.path().join("mem.mv2-wal"), b"junk").expect("sidecar");
        let result = Memvid::create(&path);
        assert!(matches!(
            result,
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }
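
    // A hedged sketch of the open-time helpers: with an empty TOC and the header layout used by
    // `create`, both payload-end helpers should fall back to the WAL boundary.
    #[test]
    fn empty_toc_payload_end_is_wal_boundary() {
        let header = Header {
            magic: MAGIC,
            version: SPEC_VERSION,
            footer_offset: WAL_OFFSET + WAL_SIZE_TINY,
            wal_offset: WAL_OFFSET,
            wal_size: WAL_SIZE_TINY,
            wal_checkpoint_pos: 0,
            wal_sequence: 0,
            toc_checksum: [0u8; 32],
        };
        let toc = empty_toc();
        assert_eq!(
            compute_payload_region_end(&toc, &header),
            WAL_OFFSET + WAL_SIZE_TINY
        );
        // `data_end` is clamped to at least the footer offset, which here equals the WAL end.
        assert_eq!(compute_data_end(&toc, &header), WAL_OFFSET + WAL_SIZE_TINY);
    }

    // Hidden dot-prefixed sidecars (e.g. `.mem.mv2.wal`) should be rejected like visible ones.
    #[test]
    fn ensure_single_file_blocks_hidden_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        std::fs::write(dir.path().join(".mem.mv2.wal"), b"junk").expect("sidecar");
        let result = Memvid::create(&path);
        assert!(matches!(
            result,
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }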
}