memvid_core/memvid/
lifecycle.rs

1//! Lifecycle management for creating and opening `.mv2` memories.
2//!
3//! Responsibilities:
4//! - Enforce single-file invariant (no sidecars) and take OS locks.
5//! - Bootstrap headers, internal WAL, and TOC on create, and recover them on open.
6//! - Validate TOC/footer layout, recover the latest valid footer when needed.
7//! - Wire up index state (lex/vector/time) without mutating payload bytes.
8
9use std::convert::TryInto;
10use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom};
12use std::panic;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15
16use crate::constants::{MAGIC, SPEC_VERSION, WAL_OFFSET, WAL_SIZE_MEDIUM};
17use crate::error::{MemvidError, Result};
18use crate::footer::{FooterSlice, find_last_valid_footer};
19use crate::io::header::HeaderCodec;
20#[cfg(feature = "parallel_segments")]
21use crate::io::manifest_wal::ManifestWal;
22use crate::io::wal::EmbeddedWal;
23use crate::lock::{FileLock, LockMode};
24#[cfg(feature = "lex")]
25use crate::search::{EmbeddedLexStorage, TantivyEngine};
26#[cfg(feature = "temporal_track")]
27use crate::types::FrameId;
28#[cfg(feature = "parallel_segments")]
29use crate::types::IndexSegmentRef;
30use crate::types::{
31    FrameStatus, Header, IndexManifests, LogicMesh, MemoriesTrack, SegmentCatalog, TicketRef, Tier,
32    Toc, VectorCompression,
33};
34#[cfg(feature = "temporal_track")]
35use crate::{TemporalTrack, temporal_track_read};
36use crate::{lex::LexIndex, vec::VecIndex};
37use blake3::Hasher;
38use memmap2::Mmap;
39
/// Default for [`LockSettings::timeout_ms`] (milliseconds, per the `_MS` suffix).
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 250;
/// Default for [`LockSettings::heartbeat_ms`]: two seconds expressed in milliseconds.
const DEFAULT_HEARTBEAT_MS: u64 = 2 * 1_000;
/// Default for [`LockSettings::stale_grace_ms`]: ten seconds expressed in milliseconds.
const DEFAULT_STALE_GRACE_MS: u64 = 10 * 1_000;
43
/// Primary handle for interacting with a `.mv2` memory file.
///
/// Holds the file descriptor, lock, header, TOC, and in-memory index state. Mutations
/// append to the embedded WAL and are materialized at commit time to keep the layout deterministic.
///
/// NOTE: Rust drops fields in declaration order, so `file` is dropped before `lock`;
/// keep that in mind before reordering the fields below.
pub struct Memvid {
    /// Open handle to the underlying `.mv2` file.
    pub(crate) file: File,
    /// Path the handle was created or opened from.
    pub(crate) path: PathBuf,
    /// Lock held for the lifetime of the handle (shared for the read-only snapshot path).
    pub(crate) lock: FileLock,
    /// `true` when the handle was opened with a shared lock.
    pub(crate) read_only: bool,
    /// Decoded file header (footer offset, WAL geometry, TOC checksum).
    pub(crate) header: Header,
    /// In-memory table of contents.
    pub(crate) toc: Toc,
    /// Embedded write-ahead log.
    pub(crate) wal: EmbeddedWal,
    /// End of the data region: `footer_offset` on create, `compute_data_end` on open.
    pub(crate) data_end: u64,
    /// Generation counter detected on open (0 for a freshly created file).
    pub(crate) generation: u64,
    /// Lock tuning settings for this handle.
    pub(crate) lock_settings: LockSettings,
    /// Whether the lexical index is in use for this handle.
    pub(crate) lex_enabled: bool,
    /// Loaded lexical index, if any.
    pub(crate) lex_index: Option<LexIndex>,
    #[cfg(feature = "lex")]
    // NOTE(review): not read in the visible part of this file; the reason for the
    // `dead_code` allowance is not shown here — confirm before removing it.
    #[allow(dead_code)]
    pub(crate) lex_storage: Arc<RwLock<EmbeddedLexStorage>>,
    /// Whether the vector index is in use for this handle.
    pub(crate) vec_enabled: bool,
    /// Compression mode applied to vector embeddings.
    pub(crate) vec_compression: VectorCompression,
    /// Loaded vector index, if any.
    pub(crate) vec_index: Option<VecIndex>,
    /// CLIP visual embeddings index (separate from vec due to different dimensions)
    pub(crate) clip_enabled: bool,
    /// Loaded CLIP index, if any.
    pub(crate) clip_index: Option<crate::clip::ClipIndex>,
    /// Set when in-memory state (e.g. the TOC binding/ticket) has changed.
    pub(crate) dirty: bool,
    /// Tantivy engine, initialised via `init_tantivy`.
    #[cfg(feature = "lex")]
    pub(crate) tantivy: Option<TantivyEngine>,
    /// Tracks whether the Tantivy engine has unpersisted changes.
    #[cfg(feature = "lex")]
    pub(crate) tantivy_dirty: bool,
    /// Lazily loaded temporal track (see `ensure_temporal_track_loaded`).
    #[cfg(feature = "temporal_track")]
    pub(crate) temporal_track: Option<TemporalTrack>,
    /// Manifest WAL; `None` on the read-only snapshot path.
    #[cfg(feature = "parallel_segments")]
    pub(crate) manifest_wal: Option<ManifestWal>,
    /// In-memory track for structured memory cards.
    pub(crate) memories_track: MemoriesTrack,
    /// In-memory Logic-Mesh graph for entity-relationship traversal.
    pub(crate) logic_mesh: LogicMesh,
}
84
85/// Controls read-only open behaviour for `.mv2` memories.
86#[derive(Debug, Clone, Copy)]
87pub struct OpenReadOptions {
88    pub allow_repair: bool,
89}
90
91#[derive(Debug, Clone)]
92pub struct LockSettings {
93    pub timeout_ms: u64,
94    pub heartbeat_ms: u64,
95    pub stale_grace_ms: u64,
96    pub force_stale: bool,
97    pub command: Option<String>,
98}
99
100impl Default for LockSettings {
101    fn default() -> Self {
102        Self {
103            timeout_ms: DEFAULT_LOCK_TIMEOUT_MS,
104            heartbeat_ms: DEFAULT_HEARTBEAT_MS,
105            stale_grace_ms: DEFAULT_STALE_GRACE_MS,
106            force_stale: false,
107            command: None,
108        }
109    }
110}
111
112impl Default for OpenReadOptions {
113    fn default() -> Self {
114        Self {
115            allow_repair: false,
116        }
117    }
118}
119
120impl Memvid {
121    /// Create a new, empty `.mv2` file with an embedded WAL and empty TOC.
122    /// The file is locked exclusively for the lifetime of the handle.
123    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
124        let path_ref = path.as_ref();
125        ensure_single_file(path_ref)?;
126
127        OpenOptions::new()
128            .read(true)
129            .write(true)
130            .create(true)
131            .truncate(true)
132            .open(path_ref)?;
133        let (mut file, lock) = FileLock::open_and_lock(path_ref)?;
134
135        let header = Header {
136            magic: MAGIC,
137            version: SPEC_VERSION,
138            footer_offset: WAL_OFFSET + WAL_SIZE_MEDIUM,
139            wal_offset: WAL_OFFSET,
140            wal_size: WAL_SIZE_MEDIUM,
141            wal_checkpoint_pos: 0,
142            wal_sequence: 0,
143            toc_checksum: [0u8; 32],
144        };
145
146        let mut toc = empty_toc();
147        // If lex feature is enabled, set the catalog flag immediately
148        #[cfg(feature = "lex")]
149        {
150            toc.segment_catalog.lex_enabled = true;
151        }
152        file.set_len(header.footer_offset)?;
153        HeaderCodec::write(&mut file, &header)?;
154
155        let wal = EmbeddedWal::open(&file, &header)?;
156        let data_end = header.footer_offset;
157        #[cfg(feature = "lex")]
158        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::new()));
159        #[cfg(feature = "parallel_segments")]
160        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
161        #[cfg(feature = "parallel_segments")]
162        let manifest_wal_entries = manifest_wal.replay()?;
163
164        let mut memvid = Self {
165            file,
166            path: path_ref.to_path_buf(),
167            lock,
168            read_only: false,
169            header,
170            toc,
171            wal,
172            data_end,
173            generation: 0,
174            lock_settings: LockSettings::default(),
175            lex_enabled: cfg!(feature = "lex"), // Enable by default if feature is enabled
176            lex_index: None,
177            #[cfg(feature = "lex")]
178            lex_storage,
179            vec_enabled: cfg!(feature = "vec"), // Enable by default if feature is enabled
180            vec_compression: VectorCompression::None,
181            vec_index: None,
182            clip_enabled: cfg!(feature = "clip"), // Enable by default if feature is enabled
183            clip_index: None,
184            dirty: false,
185            #[cfg(feature = "lex")]
186            tantivy: None,
187            #[cfg(feature = "lex")]
188            tantivy_dirty: false,
189            #[cfg(feature = "temporal_track")]
190            temporal_track: None,
191            #[cfg(feature = "parallel_segments")]
192            manifest_wal: Some(manifest_wal),
193            memories_track: MemoriesTrack::new(),
194            logic_mesh: LogicMesh::new(),
195        };
196
197        #[cfg(feature = "lex")]
198        memvid.init_tantivy()?;
199
200        #[cfg(feature = "parallel_segments")]
201        memvid.load_manifest_segments(manifest_wal_entries);
202
203        memvid.bootstrap_segment_catalog();
204
205        // Create empty manifests for enabled indexes so they persist across open/close
206        let empty_offset = memvid.data_end;
207        let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
208                                \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
209
210        #[cfg(feature = "lex")]
211        if memvid.lex_enabled && memvid.toc.indexes.lex.is_none() {
212            memvid.toc.indexes.lex = Some(crate::types::LexIndexManifest {
213                doc_count: 0,
214                generation: 0,
215                bytes_offset: empty_offset,
216                bytes_length: 0,
217                checksum: empty_checksum,
218            });
219        }
220
221        #[cfg(feature = "vec")]
222        if memvid.vec_enabled && memvid.toc.indexes.vec.is_none() {
223            memvid.toc.indexes.vec = Some(crate::types::VecIndexManifest {
224                vector_count: 0,
225                dimension: 0,
226                bytes_offset: empty_offset,
227                bytes_length: 0,
228                checksum: empty_checksum,
229                compression_mode: memvid.vec_compression.clone(),
230            });
231        }
232
233        memvid.rewrite_toc_footer()?;
234        memvid.header.toc_checksum = memvid.toc.toc_checksum;
235        crate::persist_header(&mut memvid.file, &memvid.header)?;
236        memvid.file.sync_all()?;
237        Ok(memvid)
238    }
239
240    pub fn lock_settings(&self) -> &LockSettings {
241        &self.lock_settings
242    }
243
244    pub fn lock_settings_mut(&mut self) -> &mut LockSettings {
245        &mut self.lock_settings
246    }
247
248    /// Set the vector compression mode for this memory
249    /// Must be called before ingesting documents with embeddings
250    pub fn set_vector_compression(&mut self, compression: VectorCompression) {
251        self.vec_compression = compression;
252    }
253
254    /// Get the current vector compression mode
255    pub fn vector_compression(&self) -> &VectorCompression {
256        &self.vec_compression
257    }
258
259    fn open_locked(mut file: File, lock: FileLock, path_ref: &Path) -> Result<Self> {
260        let mut header = HeaderCodec::read(&mut file)?;
261        let toc = match read_toc(&mut file, &header) {
262            Ok(toc) => toc,
263            Err(err @ MemvidError::Decode(_)) | Err(err @ MemvidError::InvalidToc { .. }) => {
264                tracing::info!("toc decode failed ({}); attempting recovery", err);
265                let (toc, recovered_offset) = recover_toc(&mut file, Some(header.footer_offset))?;
266                if recovered_offset != header.footer_offset
267                    || header.toc_checksum != toc.toc_checksum
268                {
269                    header.footer_offset = recovered_offset;
270                    header.toc_checksum = toc.toc_checksum;
271                    crate::persist_header(&mut file, &header)?;
272                }
273                toc
274            }
275            Err(err) => return Err(err),
276        };
277        let checksum_result = toc.verify_checksum();
278
279        // Validate segment integrity early to catch corruption before loading indexes
280        let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
281        if let Err(e) = validate_segment_integrity(&toc, &header, file_len) {
282            tracing::warn!("Segment integrity validation failed: {}", e);
283            // Don't fail file open - let doctor handle it
284            // This is just an early warning system
285        }
286        ensure_non_overlapping_frames(&toc, file_len)?;
287
288        let wal = EmbeddedWal::open(&file, &header)?;
289        #[cfg(feature = "lex")]
290        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
291            toc.indexes.lex.as_ref(),
292            &toc.indexes.lex_segments,
293        )));
294        #[cfg(feature = "parallel_segments")]
295        let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
296        #[cfg(feature = "parallel_segments")]
297        let manifest_wal_entries = manifest_wal.replay()?;
298
299        let generation = detect_generation(&file)?.unwrap_or(0);
300        let read_only = lock.mode() == LockMode::Shared;
301
302        let mut memvid = Self {
303            file,
304            path: path_ref.to_path_buf(),
305            lock,
306            read_only,
307            header,
308            toc,
309            wal,
310            data_end: 0,
311            generation,
312            lock_settings: LockSettings::default(),
313            lex_enabled: false,
314            lex_index: None,
315            #[cfg(feature = "lex")]
316            lex_storage,
317            vec_enabled: false,
318            vec_compression: VectorCompression::None,
319            vec_index: None,
320            clip_enabled: false,
321            clip_index: None,
322            dirty: false,
323            #[cfg(feature = "lex")]
324            tantivy: None,
325            #[cfg(feature = "lex")]
326            tantivy_dirty: false,
327            #[cfg(feature = "temporal_track")]
328            temporal_track: None,
329            #[cfg(feature = "parallel_segments")]
330            manifest_wal: Some(manifest_wal),
331            memories_track: MemoriesTrack::new(),
332            logic_mesh: LogicMesh::new(),
333        };
334        memvid.data_end = compute_data_end(&memvid.toc, &memvid.header);
335        // Use consolidated helper for lex_enabled check
336        memvid.lex_enabled = has_lex_index(&memvid.toc);
337        if memvid.lex_enabled {
338            memvid.load_lex_index_from_manifest()?;
339        }
340        #[cfg(feature = "lex")]
341        {
342            memvid.init_tantivy()?;
343        }
344        memvid.vec_enabled =
345            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
346        if memvid.vec_enabled {
347            memvid.load_vec_index_from_manifest()?;
348        }
349        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
350        if memvid.clip_enabled {
351            memvid.load_clip_index_from_manifest()?;
352        }
353        memvid.recover_wal()?;
354        #[cfg(feature = "parallel_segments")]
355        memvid.load_manifest_segments(manifest_wal_entries);
356        memvid.bootstrap_segment_catalog();
357        #[cfg(feature = "temporal_track")]
358        memvid.ensure_temporal_track_loaded()?;
359        memvid.load_memories_track()?;
360        memvid.load_logic_mesh()?;
361        if checksum_result.is_err() {
362            memvid.toc.verify_checksum()?;
363            if memvid.toc.toc_checksum != memvid.header.toc_checksum {
364                memvid.header.toc_checksum = memvid.toc.toc_checksum;
365                crate::persist_header(&mut memvid.file, &memvid.header)?;
366                memvid.file.sync_all()?;
367            }
368        }
369        Ok(memvid)
370    }
371
372    /// Open an existing `.mv2` with exclusive access, performing recovery if needed.
373    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
374        let path_ref = path.as_ref();
375        ensure_single_file(path_ref)?;
376
377        let (file, lock) = FileLock::open_and_lock(path_ref)?;
378        Self::open_locked(file, lock, path_ref)
379    }
380
381    pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
382        Self::open_read_only_with_options(path, OpenReadOptions::default())
383    }
384
385    pub fn open_read_only_with_options<P: AsRef<Path>>(
386        path: P,
387        options: OpenReadOptions,
388    ) -> Result<Self> {
389        let path_ref = path.as_ref();
390        ensure_single_file(path_ref)?;
391
392        if options.allow_repair {
393            return Self::open(path_ref);
394        }
395
396        Self::open_read_only_snapshot(path_ref)
397    }
398
399    fn open_read_only_snapshot(path_ref: &Path) -> Result<Self> {
400        let mut file = OpenOptions::new().read(true).write(true).open(path_ref)?;
401        let TailSnapshot {
402            toc,
403            footer_offset,
404            data_end,
405            generation,
406        } = load_tail_snapshot(&file)?;
407
408        let mut header = HeaderCodec::read(&mut file)?;
409        header.footer_offset = footer_offset;
410        header.toc_checksum = toc.toc_checksum;
411
412        let lock = FileLock::acquire_with_mode(&file, LockMode::Shared)?;
413        let wal = EmbeddedWal::open_read_only(&file, &header)?;
414
415        #[cfg(feature = "lex")]
416        let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
417            toc.indexes.lex.as_ref(),
418            &toc.indexes.lex_segments,
419        )));
420
421        let mut memvid = Self {
422            file,
423            path: path_ref.to_path_buf(),
424            lock,
425            read_only: true,
426            header,
427            toc,
428            wal,
429            data_end,
430            generation,
431            lock_settings: LockSettings::default(),
432            lex_enabled: false,
433            lex_index: None,
434            #[cfg(feature = "lex")]
435            lex_storage,
436            vec_enabled: false,
437            vec_compression: VectorCompression::None,
438            vec_index: None,
439            clip_enabled: false,
440            clip_index: None,
441            dirty: false,
442            #[cfg(feature = "lex")]
443            tantivy: None,
444            #[cfg(feature = "lex")]
445            tantivy_dirty: false,
446            #[cfg(feature = "temporal_track")]
447            temporal_track: None,
448            #[cfg(feature = "parallel_segments")]
449            manifest_wal: None,
450            memories_track: MemoriesTrack::new(),
451            logic_mesh: LogicMesh::new(),
452        };
453
454        // Use consolidated helper for lex_enabled check
455        memvid.lex_enabled = has_lex_index(&memvid.toc);
456        if memvid.lex_enabled {
457            memvid.load_lex_index_from_manifest()?;
458        }
459        #[cfg(feature = "lex")]
460        memvid.init_tantivy()?;
461
462        memvid.vec_enabled =
463            memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
464        if memvid.vec_enabled {
465            memvid.load_vec_index_from_manifest()?;
466        }
467        memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
468        if memvid.clip_enabled {
469            memvid.load_clip_index_from_manifest()?;
470        }
471        // Load Logic-Mesh if present
472        memvid.load_logic_mesh()?;
473
474        memvid.bootstrap_segment_catalog();
475        #[cfg(feature = "temporal_track")]
476        memvid.ensure_temporal_track_loaded()?;
477
478        Ok(memvid)
479    }
480
481    pub(crate) fn try_open<P: AsRef<Path>>(path: P) -> Result<Self> {
482        let path_ref = path.as_ref();
483        ensure_single_file(path_ref)?;
484
485        let file = OpenOptions::new().read(true).write(true).open(path_ref)?;
486        let lock = match FileLock::try_acquire(&file, path_ref)? {
487            Some(lock) => lock,
488            None => {
489                return Err(MemvidError::Lock(
490                    "exclusive access unavailable for doctor".to_string(),
491                ));
492            }
493        };
494        Self::open_locked(file, lock, path_ref)
495    }
496
497    fn bootstrap_segment_catalog(&mut self) {
498        let catalog = &mut self.toc.segment_catalog;
499        if catalog.version == 0 {
500            catalog.version = 1;
501        }
502        if catalog.next_segment_id == 0 {
503            let mut max_id = 0u64;
504            for descriptor in &catalog.lex_segments {
505                max_id = max_id.max(descriptor.common.segment_id);
506            }
507            for descriptor in &catalog.vec_segments {
508                max_id = max_id.max(descriptor.common.segment_id);
509            }
510            for descriptor in &catalog.time_segments {
511                max_id = max_id.max(descriptor.common.segment_id);
512            }
513            #[cfg(feature = "temporal_track")]
514            for descriptor in &catalog.temporal_segments {
515                max_id = max_id.max(descriptor.common.segment_id);
516            }
517            #[cfg(feature = "parallel_segments")]
518            for descriptor in &catalog.index_segments {
519                max_id = max_id.max(descriptor.common.segment_id);
520            }
521            if max_id > 0 {
522                catalog.next_segment_id = max_id.saturating_add(1);
523            }
524        }
525    }
526
527    #[cfg(feature = "parallel_segments")]
528    fn load_manifest_segments(&mut self, entries: Vec<IndexSegmentRef>) {
529        if entries.is_empty() {
530            return;
531        }
532        for entry in entries {
533            let duplicate = self
534                .toc
535                .segment_catalog
536                .index_segments
537                .iter()
538                .any(|existing| existing.common.segment_id == entry.common.segment_id);
539            if !duplicate {
540                self.toc.segment_catalog.index_segments.push(entry);
541            }
542        }
543    }
544
545    /// Load the memories track from the manifest if present.
546    fn load_memories_track(&mut self) -> Result<()> {
547        let manifest = match &self.toc.memories_track {
548            Some(m) => m,
549            None => return Ok(()),
550        };
551
552        // Read the compressed data from the file
553        let mut buf = vec![0u8; manifest.bytes_length as usize];
554        self.file
555            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
556        self.file.read_exact(&mut buf)?;
557
558        // Verify checksum
559        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
560        if actual_checksum != manifest.checksum {
561            return Err(MemvidError::InvalidToc {
562                reason: "memories track checksum mismatch".into(),
563            });
564        }
565
566        // Deserialize the memories track
567        self.memories_track = MemoriesTrack::deserialize(&buf)?;
568
569        Ok(())
570    }
571
572    /// Load the Logic-Mesh from the manifest if present.
573    fn load_logic_mesh(&mut self) -> Result<()> {
574        let manifest = match &self.toc.logic_mesh {
575            Some(m) => m,
576            None => return Ok(()),
577        };
578
579        // Read the serialized data from the file
580        let mut buf = vec![0u8; manifest.bytes_length as usize];
581        self.file
582            .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
583        self.file.read_exact(&mut buf)?;
584
585        // Verify checksum
586        let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
587        if actual_checksum != manifest.checksum {
588            return Err(MemvidError::InvalidToc {
589                reason: "logic mesh checksum mismatch".into(),
590            });
591        }
592
593        // Deserialize the logic mesh
594        self.logic_mesh = LogicMesh::deserialize(&buf)?;
595
596        Ok(())
597    }
598
599    #[cfg(feature = "temporal_track")]
600    pub(crate) fn ensure_temporal_track_loaded(&mut self) -> Result<()> {
601        if self.temporal_track.is_some() {
602            return Ok(());
603        }
604        let manifest = match &self.toc.temporal_track {
605            Some(manifest) => manifest.clone(),
606            None => return Ok(()),
607        };
608        if manifest.bytes_length == 0 {
609            return Ok(());
610        }
611        let file_len = self.file.metadata()?.len();
612        let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) else {
613            return Ok(());
614        };
615        if end > file_len {
616            return Ok(());
617        }
618        match temporal_track_read(&mut self.file, manifest.bytes_offset, manifest.bytes_length) {
619            Ok(track) => self.temporal_track = Some(track),
620            Err(MemvidError::InvalidTemporalTrack { .. }) => {
621                return Ok(());
622            }
623            Err(err) => return Err(err),
624        }
625        Ok(())
626    }
627
628    #[cfg(feature = "temporal_track")]
629    pub(crate) fn temporal_track_ref(&mut self) -> Result<Option<&TemporalTrack>> {
630        self.ensure_temporal_track_loaded()?;
631        Ok(self.temporal_track.as_ref())
632    }
633
634    #[cfg(feature = "temporal_track")]
635    pub(crate) fn temporal_anchor_timestamp(&mut self, frame_id: FrameId) -> Result<Option<i64>> {
636        self.ensure_temporal_track_loaded()?;
637        let Some(track) = self.temporal_track.as_ref() else {
638            return Ok(None);
639        };
640        if !track.capabilities().has_anchors {
641            return Ok(None);
642        }
643        Ok(track
644            .anchor_for_frame(frame_id)
645            .map(|anchor| anchor.anchor_ts))
646    }
647
648    #[cfg(feature = "temporal_track")]
649    pub(crate) fn clear_temporal_track_cache(&mut self) {
650        self.temporal_track = None;
651    }
652
653    #[cfg(feature = "temporal_track")]
654    pub(crate) fn effective_temporal_timestamp(
655        &mut self,
656        frame_id: FrameId,
657        fallback: i64,
658    ) -> Result<i64> {
659        Ok(self
660            .temporal_anchor_timestamp(frame_id)?
661            .unwrap_or(fallback))
662    }
663
664    #[cfg(not(feature = "temporal_track"))]
665    pub(crate) fn effective_temporal_timestamp(
666        &mut self,
667        _frame_id: crate::types::FrameId,
668        fallback: i64,
669    ) -> Result<i64> {
670        Ok(fallback)
671    }
672
673    /// Get current memory binding information.
674    ///
675    /// Returns the binding if this file is bound to a dashboard memory,
676    /// or None if unbound.
677    pub fn get_memory_binding(&self) -> Option<&crate::types::MemoryBinding> {
678        self.toc.memory_binding.as_ref()
679    }
680
681    /// Bind this file to a dashboard memory.
682    ///
683    /// This stores the binding in the TOC and applies the ticket for capacity.
684    /// The caller (CLI/bindings) is responsible for fetching the ticket from the API.
685    ///
686    /// # Errors
687    ///
688    /// Returns `MemoryAlreadyBound` if this file is already bound to a different memory.
689    pub fn bind_memory(
690        &mut self,
691        binding: crate::types::MemoryBinding,
692        ticket: crate::types::Ticket,
693    ) -> Result<()> {
694        // Check existing binding
695        if let Some(existing) = self.get_memory_binding() {
696            if existing.memory_id != binding.memory_id {
697                return Err(MemvidError::MemoryAlreadyBound {
698                    existing_memory_id: existing.memory_id,
699                    existing_memory_name: existing.memory_name.clone(),
700                    bound_at: existing.bound_at.to_rfc3339(),
701                });
702            }
703        }
704
705        // Apply ticket for capacity
706        self.apply_ticket(ticket)?;
707
708        // Store binding in TOC
709        self.toc.memory_binding = Some(binding);
710        self.dirty = true;
711
712        Ok(())
713    }
714
715    /// Unbind this file from its dashboard memory.
716    ///
717    /// This clears the binding and reverts to free tier capacity (1 GB).
718    pub fn unbind_memory(&mut self) -> Result<()> {
719        self.toc.memory_binding = None;
720        // Revert to free tier
721        self.toc.ticket_ref = crate::types::TicketRef {
722            issuer: "free-tier".into(),
723            seq_no: 1,
724            expires_in_secs: 0,
725            capacity_bytes: crate::types::Tier::Free.capacity_bytes(),
726        };
727        self.dirty = true;
728        Ok(())
729    }
730}
731
732pub(crate) fn read_toc(file: &mut File, header: &Header) -> Result<Toc> {
733    use crate::footer::{CommitFooter, FOOTER_SIZE};
734
735    let len = file.metadata()?.len();
736    if len < header.footer_offset {
737        return Err(MemvidError::InvalidToc {
738            reason: "footer offset beyond file length".into(),
739        });
740    }
741
742    // Read the entire region from footer_offset to EOF (includes TOC + footer)
743    file.seek(SeekFrom::Start(header.footer_offset))?;
744    let total_size = (len - header.footer_offset) as usize;
745
746    if total_size < FOOTER_SIZE {
747        return Err(MemvidError::InvalidToc {
748            reason: "region too small to contain footer".into(),
749        });
750    }
751
752    let mut buf = Vec::with_capacity(total_size);
753    file.read_to_end(&mut buf)?;
754
755    // Parse the footer (last FOOTER_SIZE bytes)
756    let footer_start = buf.len() - FOOTER_SIZE;
757    let footer_bytes = &buf[footer_start..];
758    let footer = CommitFooter::decode(footer_bytes).ok_or(MemvidError::InvalidToc {
759        reason: "failed to decode commit footer".into(),
760    })?;
761
762    // Extract only the TOC bytes (excluding the footer)
763    let toc_bytes = &buf[..footer_start];
764    if toc_bytes.len() != footer.toc_len as usize {
765        return Err(MemvidError::InvalidToc {
766            reason: "toc length mismatch".into(),
767        });
768    }
769
770    verify_toc_prefix(toc_bytes)?;
771    let toc = Toc::decode(toc_bytes)?;
772    Ok(toc)
773}
774
775fn verify_toc_prefix(bytes: &[u8]) -> Result<()> {
776    const MAX_SEGMENTS: u64 = 1_000_000;
777    const MAX_FRAMES: u64 = 1_000_000;
778    const MIN_SEGMENT_META_BYTES: u64 = 32;
779    const MIN_FRAME_BYTES: u64 = 64;
780    // TOC trailer layout (little-endian):
781    // [toc_version:u64][segments_len:u64][frames_len:u64]...
782    let read_u64 = |range: std::ops::Range<usize>, context: &str| -> Result<u64> {
783        let slice = bytes.get(range).ok_or_else(|| MemvidError::InvalidToc {
784            reason: context.to_string().into(),
785        })?;
786        let array: [u8; 8] = slice.try_into().map_err(|_| MemvidError::InvalidToc {
787            reason: context.to_string().into(),
788        })?;
789        Ok(u64::from_le_bytes(array))
790    };
791
792    if bytes.len() < 24 {
793        return Err(MemvidError::InvalidToc {
794            reason: "toc trailer too small".into(),
795        });
796    }
797    let toc_version = read_u64(0..8, "toc version missing or truncated")?;
798    if toc_version > 32 {
799        return Err(MemvidError::InvalidToc {
800            reason: "toc version unreasonable".into(),
801        });
802    }
803    let segments_len = read_u64(8..16, "segment count missing or truncated")?;
804    if segments_len > MAX_SEGMENTS {
805        return Err(MemvidError::InvalidToc {
806            reason: "segment count unreasonable".into(),
807        });
808    }
809    let frames_len = read_u64(16..24, "frame count missing or truncated")?;
810    if frames_len > MAX_FRAMES {
811        return Err(MemvidError::InvalidToc {
812            reason: "frame count unreasonable".into(),
813        });
814    }
815    let required = segments_len
816        .saturating_mul(MIN_SEGMENT_META_BYTES)
817        .saturating_add(frames_len.saturating_mul(MIN_FRAME_BYTES));
818    if required > bytes.len() as u64 {
819        return Err(MemvidError::InvalidToc {
820            reason: "toc payload inconsistent with counts".into(),
821        });
822    }
823    Ok(())
824}
825
826/// Ensure frame payloads are ordered and do not overlap the file boundary.
827fn ensure_non_overlapping_frames(toc: &Toc, file_len: u64) -> Result<()> {
828    let mut previous_end = 0u64;
829    for frame in toc
830        .frames
831        .iter()
832        .filter(|f| f.status == FrameStatus::Active)
833    {
834        let end = frame
835            .payload_offset
836            .checked_add(frame.payload_length)
837            .ok_or_else(|| MemvidError::InvalidToc {
838                reason: "frame payload offsets overflow".into(),
839            })?;
840        if end > file_len {
841            return Err(MemvidError::InvalidToc {
842                reason: "frame payload exceeds file length".into(),
843            });
844        }
845        if frame.payload_offset < previous_end {
846            return Err(MemvidError::InvalidToc {
847                reason: "frame payloads overlap or are out of order".into(),
848            });
849        }
850        previous_end = end;
851    }
852    Ok(())
853}
854
855pub(crate) fn recover_toc(file: &mut File, hint: Option<u64>) -> Result<(Toc, u64)> {
856    use crate::footer::find_last_valid_footer;
857
858    let len = file.metadata()?.len();
859    file.seek(SeekFrom::Start(0))?;
860    let mut data = Vec::new();
861    file.read_to_end(&mut data)?;
862    tracing::debug!(file_len = len, "attempting full scan toc recovery");
863
864    // First, try to find a valid footer which includes validated TOC bytes
865    if let Some(footer_slice) = find_last_valid_footer(&data) {
866        tracing::debug!(
867            footer_offset = footer_slice.footer_offset,
868            toc_offset = footer_slice.toc_offset,
869            toc_len = footer_slice.toc_bytes.len(),
870            "found valid footer during recovery"
871        );
872        // The footer has already validated the TOC hash, so we can directly decode it
873        match Toc::decode(footer_slice.toc_bytes) {
874            Ok(toc) => {
875                return Ok((toc, footer_slice.toc_offset as u64));
876            }
877            Err(err) => {
878                tracing::warn!(
879                    error = %err,
880                    "footer-validated TOC failed to decode, falling back to scan"
881                );
882            }
883        }
884    }
885
886    // Fallback to manual scan if footer-based recovery failed
887    let mut ranges = Vec::new();
888    if let Some(hint_offset) = hint {
889        let hint_idx = hint_offset.min(len) as usize;
890        ranges.push((hint_idx, data.len()));
891        if hint_idx > 0 {
892            ranges.push((0, hint_idx));
893        }
894    } else {
895        ranges.push((0, data.len()));
896    }
897
898    for (start, end) in ranges {
899        if let Some(found) = scan_range_for_toc(&data, start, end) {
900            return Ok(found);
901        }
902    }
903
904    Err(MemvidError::InvalidToc {
905        reason: "unable to recover table of contents from file trailer".into(),
906    })
907}
908
909fn scan_range_for_toc(data: &[u8], start: usize, end: usize) -> Option<(Toc, u64)> {
910    use crate::footer::find_last_valid_footer;
911
912    if start >= end || end > data.len() {
913        return None;
914    }
915    const MAX_TOC_BYTES: usize = 64 * 1024 * 1024;
916    const ZERO_CHECKSUM: [u8; 32] = [0u8; 32];
917    for offset in (start..end).rev() {
918        let slice = &data[offset..];
919        if slice.len() < 16 {
920            continue;
921        }
922        if slice.len() > MAX_TOC_BYTES {
923            continue;
924        }
925
926        // First check if this slice has a footer at the end
927        // If so, use the footer-validated TOC bytes
928        if let Some(footer_slice) = find_last_valid_footer(slice) {
929            // Footer found and validated - use its TOC bytes
930            if verify_toc_prefix(footer_slice.toc_bytes).is_ok() {
931                let attempt = panic::catch_unwind(|| Toc::decode(footer_slice.toc_bytes));
932                if let Ok(Ok(toc)) = attempt {
933                    let recovered_offset = (offset + footer_slice.toc_offset) as u64;
934                    tracing::debug!(
935                        recovered_offset,
936                        recovered_frames = toc.frames.len(),
937                        "recovered toc via scan with footer"
938                    );
939                    return Some((toc, recovered_offset));
940                }
941            }
942            continue;
943        }
944
945        // No footer found - try old format with checksum
946        if slice.len() < ZERO_CHECKSUM.len() {
947            continue;
948        }
949        let (body, stored_checksum) = slice.split_at(slice.len() - ZERO_CHECKSUM.len());
950        let mut hasher = Hasher::new();
951        hasher.update(body);
952        hasher.update(&ZERO_CHECKSUM);
953        if hasher.finalize().as_bytes() != stored_checksum {
954            continue;
955        }
956        if verify_toc_prefix(slice).is_err() {
957            continue;
958        }
959        let attempt = panic::catch_unwind(|| Toc::decode(slice));
960        if let Ok(Ok(toc)) = attempt {
961            let recovered_offset = offset as u64;
962            tracing::debug!(
963                recovered_offset,
964                recovered_frames = toc.frames.len(),
965                "recovered toc via scan"
966            );
967            return Some((toc, recovered_offset));
968        }
969    }
970    None
971}
972
973pub(crate) fn prepare_toc_bytes(toc: &mut Toc) -> Result<Vec<u8>> {
974    toc.toc_checksum = [0u8; 32];
975    let bytes = toc.encode()?;
976    let checksum = Toc::calculate_checksum(&bytes);
977    toc.toc_checksum = checksum;
978    toc.encode()
979}
980
981pub(crate) fn empty_toc() -> Toc {
982    Toc {
983        toc_version: 0,
984        segments: Vec::new(),
985        frames: Vec::new(),
986        indexes: IndexManifests::default(),
987        time_index: None,
988        temporal_track: None,
989        memories_track: None,
990        logic_mesh: None,
991        segment_catalog: SegmentCatalog::default(),
992        ticket_ref: TicketRef {
993            issuer: "free-tier".into(),
994            seq_no: 1,
995            expires_in_secs: 0,
996            capacity_bytes: Tier::Free.capacity_bytes(),
997        },
998        memory_binding: None,
999        merkle_root: [0u8; 32],
1000        toc_checksum: [0u8; 32],
1001    }
1002}
1003
1004pub(crate) fn compute_data_end(toc: &Toc, header: &Header) -> u64 {
1005    // data_end tracks the end of all data bytes (frames + segments)
1006    let wal_region_end = header.wal_offset + header.wal_size;
1007
1008    let active_frames_with_payload: Vec<_> = toc
1009        .frames
1010        .iter()
1011        .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
1012        .collect();
1013
1014    tracing::info!(
1015        "compute_data_end: found {} active frames with payloads out of {} total frames",
1016        active_frames_with_payload.len(),
1017        toc.frames.len()
1018    );
1019
1020    for (idx, frame) in active_frames_with_payload.iter().enumerate().take(5) {
1021        tracing::info!(
1022            "  frame[{}]: id={} offset={} length={} end={}",
1023            idx,
1024            frame.id,
1025            frame.payload_offset,
1026            frame.payload_length,
1027            frame.payload_offset + frame.payload_length
1028        );
1029    }
1030
1031    let payload_end = active_frames_with_payload
1032        .iter()
1033        .filter_map(|f| f.payload_offset.checked_add(f.payload_length))
1034        .max()
1035        .unwrap_or(wal_region_end);
1036
1037    // Also consider segment catalog entries (lex, vec, time segments)
1038    let catalog = &toc.segment_catalog;
1039
1040    let lex_segment_end = catalog
1041        .lex_segments
1042        .iter()
1043        .filter_map(|s| s.common.bytes_offset.checked_add(s.common.bytes_length))
1044        .max()
1045        .unwrap_or(0);
1046
1047    let vec_segment_end = catalog
1048        .vec_segments
1049        .iter()
1050        .filter_map(|s| s.common.bytes_offset.checked_add(s.common.bytes_length))
1051        .max()
1052        .unwrap_or(0);
1053
1054    let time_segment_end = catalog
1055        .time_segments
1056        .iter()
1057        .filter_map(|s| s.common.bytes_offset.checked_add(s.common.bytes_length))
1058        .max()
1059        .unwrap_or(0);
1060
1061    let segment_end = lex_segment_end.max(vec_segment_end).max(time_segment_end);
1062
1063    let data_boundary = payload_end.max(segment_end).max(wal_region_end);
1064
1065    tracing::info!(
1066        "compute_data_end: wal_region_end={} payload_end={} segment_end={} footer_offset={} returning {}",
1067        wal_region_end,
1068        payload_end,
1069        segment_end,
1070        header.footer_offset,
1071        data_boundary
1072    );
1073
1074    data_boundary
1075}
1076
1077struct TailSnapshot {
1078    toc: Toc,
1079    footer_offset: u64,
1080    data_end: u64,
1081    generation: u64,
1082}
1083
1084fn locate_footer_window(mmap: &[u8]) -> Option<(FooterSlice<'_>, usize)> {
1085    const MAX_SEARCH_SIZE: usize = 16 * 1024 * 1024;
1086    if mmap.is_empty() {
1087        return None;
1088    }
1089    let mut window = MAX_SEARCH_SIZE.min(mmap.len());
1090    loop {
1091        let start = mmap.len() - window;
1092        if let Some(slice) = find_last_valid_footer(&mmap[start..]) {
1093            return Some((slice, start));
1094        }
1095        if window == mmap.len() {
1096            break;
1097        }
1098        window = (window * 2).min(mmap.len());
1099    }
1100    None
1101}
1102
1103fn load_tail_snapshot(file: &File) -> Result<TailSnapshot> {
1104    // Safety: we only create a read-only mapping over the stable file bytes.
1105    let mmap = unsafe { Mmap::map(file)? };
1106
1107    let (slice, offset_adjustment) =
1108        locate_footer_window(&mmap).ok_or_else(|| MemvidError::InvalidToc {
1109            reason: "no valid commit footer found".into(),
1110        })?;
1111    let toc = Toc::decode(slice.toc_bytes)?;
1112    toc.verify_checksum()?;
1113
1114    Ok(TailSnapshot {
1115        toc,
1116        footer_offset: slice.footer_offset as u64 + offset_adjustment as u64,
1117        // Using toc_offset causes stale data_end that moves footer backwards on next commit
1118        data_end: slice.footer_offset as u64 + offset_adjustment as u64,
1119        generation: slice.footer.generation,
1120    })
1121}
1122
1123fn detect_generation(file: &File) -> Result<Option<u64>> {
1124    // Safety: read-only mapping for footer inspection.
1125    let mmap = unsafe { Mmap::map(file)? };
1126
1127    Ok(locate_footer_window(&mmap).map(|(slice, _)| slice.footer.generation))
1128}
1129
1130pub(crate) fn ensure_single_file(path: &Path) -> Result<()> {
1131    if let Some(parent) = path.parent() {
1132        let name = path
1133            .file_name()
1134            .and_then(|n| n.to_str())
1135            .unwrap_or_default();
1136        let forbidden = ["-wal", "-shm", "-lock", "-journal"];
1137        for suffix in forbidden {
1138            let candidate = parent.join(format!("{name}{suffix}"));
1139            if candidate.exists() {
1140                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1141            }
1142        }
1143        let hidden_forbidden = [".wal", ".shm", ".lock", ".journal"];
1144        for suffix in hidden_forbidden {
1145            let candidate = parent.join(format!(".{name}{suffix}"));
1146            if candidate.exists() {
1147                return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1148            }
1149        }
1150    }
1151    Ok(())
1152}
1153
/// Path of the manifest WAL associated with `path`.
///
/// The path's extension is replaced with `manifest.wal` (for example, `mem.mv2` becomes
/// `mem.manifest.wal`).
#[cfg(feature = "parallel_segments")]
fn manifest_wal_path(path: &Path) -> PathBuf {
    path.with_extension("manifest.wal")
}
1160
/// Best-effort removal of the manifest WAL associated with `path`.
///
/// Does nothing when that file does not exist. Removal errors are deliberately ignored.
#[cfg(feature = "parallel_segments")]
pub(crate) fn cleanup_manifest_wal_public(path: &Path) {
    let wal_path = manifest_wal_path(path);
    if !wal_path.exists() {
        return;
    }
    // Cleanup is best-effort; a failure here is not reported to the caller.
    let _ = std::fs::remove_file(&wal_path);
}
1168
1169/// Single source of truth: does this TOC have a lexical index?
1170/// Checks all possible locations: old manifest, lex_segments, and tantivy_segments.
1171pub(crate) fn has_lex_index(toc: &Toc) -> bool {
1172    toc.segment_catalog.lex_enabled
1173        || toc.indexes.lex.is_some()
1174        || !toc.indexes.lex_segments.is_empty()
1175        || !toc.segment_catalog.tantivy_segments.is_empty()
1176}
1177
/// Single source of truth: expected document count for the lex index.
///
/// The sources are tried in this order, and the first positive count wins:
/// 1. the legacy lex manifest's `doc_count`;
/// 2. `lex_storage.doc_count()` (populated from lex segments).
///
/// Returns `None` when neither yields a positive count (e.g. Tantivy segments without a
/// manifest). The count is then unknown without loading the index, and the caller decides
/// what to do.
#[cfg(feature = "lex")]
pub(crate) fn lex_doc_count(
    toc: &Toc,
    lex_storage: &crate::search::EmbeddedLexStorage,
) -> Option<u64> {
    let positive = |count: u64| (count > 0).then_some(count);

    toc.indexes
        .lex
        .as_ref()
        .and_then(|manifest| positive(manifest.doc_count))
        // Consulted lazily: storage is only queried when the manifest gave no usable count.
        .or_else(|| positive(lex_storage.doc_count()))
}
1203
1204/// Validates segment integrity on file open to catch corruption early.
1205/// This helps doctor by detecting issues before they cause problems.
1206#[allow(dead_code)]
1207fn validate_segment_integrity(toc: &Toc, header: &Header, file_len: u64) -> Result<()> {
1208    let data_limit = header.footer_offset;
1209
1210    // Validate Tantivy segments
1211    for (idx, seg) in toc.segment_catalog.tantivy_segments.iter().enumerate() {
1212        let offset = seg.common.bytes_offset;
1213        let length = seg.common.bytes_length;
1214
1215        if length == 0 {
1216            continue; // Empty segments are okay
1217        }
1218
1219        let end = offset
1220            .checked_add(length)
1221            .ok_or_else(|| MemvidError::Doctor {
1222                reason: format!(
1223                    "Tantivy segment {} offset overflow: {} + {}",
1224                    idx, offset, length
1225                ),
1226            })?;
1227
1228        if end > file_len || end > data_limit {
1229            return Err(MemvidError::Doctor {
1230                reason: format!(
1231                    "Tantivy segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1232                    idx, offset, length, end, file_len, data_limit
1233                ),
1234            });
1235        }
1236    }
1237
1238    // Validate time index segments
1239    for (idx, seg) in toc.segment_catalog.time_segments.iter().enumerate() {
1240        let offset = seg.common.bytes_offset;
1241        let length = seg.common.bytes_length;
1242
1243        if length == 0 {
1244            continue;
1245        }
1246
1247        let end = offset
1248            .checked_add(length)
1249            .ok_or_else(|| MemvidError::Doctor {
1250                reason: format!(
1251                    "Time segment {} offset overflow: {} + {}",
1252                    idx, offset, length
1253                ),
1254            })?;
1255
1256        if end > file_len || end > data_limit {
1257            return Err(MemvidError::Doctor {
1258                reason: format!(
1259                    "Time segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1260                    idx, offset, length, end, file_len, data_limit
1261                ),
1262            });
1263        }
1264    }
1265
1266    // Validate vec segments
1267    for (idx, seg) in toc.segment_catalog.vec_segments.iter().enumerate() {
1268        let offset = seg.common.bytes_offset;
1269        let length = seg.common.bytes_length;
1270
1271        if length == 0 {
1272            continue;
1273        }
1274
1275        let end = offset
1276            .checked_add(length)
1277            .ok_or_else(|| MemvidError::Doctor {
1278                reason: format!(
1279                    "Vec segment {} offset overflow: {} + {}",
1280                    idx, offset, length
1281                ),
1282            })?;
1283
1284        if end > file_len || end > data_limit {
1285            return Err(MemvidError::Doctor {
1286                reason: format!(
1287                    "Vec segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1288                    idx, offset, length, end, file_len, data_limit
1289                ),
1290            });
1291        }
1292    }
1293
1294    log::debug!("✓ Segment integrity validation passed");
1295    Ok(())
1296}
1297
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A TOC prefix shorter than its fixed 24-byte header must be rejected with a clear reason.
    #[test]
    fn toc_prefix_underflow_surfaces_reason() {
        let err = verify_toc_prefix(&[0u8; 8]).expect_err("should reject short toc prefix");
        match err {
            MemvidError::InvalidToc { reason } => {
                assert!(
                    reason.contains("trailer too small"),
                    "unexpected reason: {reason}"
                );
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    /// A `name-wal` sidecar next to the memory must block creation.
    #[test]
    fn ensure_single_file_blocks_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        std::fs::write(dir.path().join("mem.mv2-wal"), b"junk").expect("sidecar");
        let result = Memvid::create(&path);
        assert!(matches!(
            result,
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }

    /// Hidden `.name.<suffix>` sidecars are rejected too, and the offending path is reported.
    #[test]
    fn ensure_single_file_blocks_hidden_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        let sidecar = dir.path().join(".mem.mv2.lock");
        std::fs::write(&sidecar, b"junk").expect("hidden sidecar");
        match ensure_single_file(&path) {
            Err(MemvidError::AuxiliaryFileDetected {
                path: detected, ..
            }) => assert_eq!(detected, sidecar),
            other => panic!("unexpected result: {other:?}"),
        }
    }

    /// With no sidecars present, the single-file check passes.
    #[test]
    fn ensure_single_file_accepts_clean_directory() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        assert!(ensure_single_file(&path).is_ok());
    }
}