1use std::convert::TryInto;
10use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom};
12use std::panic;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15
16use crate::constants::{MAGIC, SPEC_VERSION, WAL_OFFSET, WAL_SIZE_MEDIUM};
17use crate::error::{MemvidError, Result};
18use crate::footer::{FooterSlice, find_last_valid_footer};
19use crate::io::header::HeaderCodec;
20#[cfg(feature = "parallel_segments")]
21use crate::io::manifest_wal::ManifestWal;
22use crate::io::wal::EmbeddedWal;
23use crate::lock::{FileLock, LockMode};
24#[cfg(feature = "lex")]
25use crate::search::{EmbeddedLexStorage, TantivyEngine};
26#[cfg(feature = "temporal_track")]
27use crate::types::FrameId;
28#[cfg(feature = "parallel_segments")]
29use crate::types::IndexSegmentRef;
30use crate::types::{
31 FrameStatus, Header, IndexManifests, LogicMesh, MemoriesTrack, SchemaRegistry, SegmentCatalog,
32 TicketRef, Tier, Toc, VectorCompression,
33};
34#[cfg(feature = "temporal_track")]
35use crate::{TemporalTrack, temporal_track_read};
36use crate::{lex::LexIndex, vec::VecIndex};
37use blake3::Hasher;
38use memmap2::Mmap;
39
/// Default for [`LockSettings::timeout_ms`] (milliseconds).
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 250;
/// Default for [`LockSettings::heartbeat_ms`] (two seconds, in milliseconds).
const DEFAULT_HEARTBEAT_MS: u64 = 2 * 1_000;
/// Default for [`LockSettings::stale_grace_ms`] (ten seconds, in milliseconds).
const DEFAULT_STALE_GRACE_MS: u64 = 10 * 1_000;
43
/// An open handle to a memvid file.
///
/// Built by [`Memvid::create`], [`Memvid::open`], [`Memvid::open_read_only`] and
/// friends. The handle bundles the file descriptor, the lock, the decoded header and
/// TOC, and the in-memory indexes/tracks loaded from the manifests the TOC references.
pub struct Memvid {
    /// Handle to the memory file; header, TOC and track reads/writes go through it.
    pub(crate) file: File,
    /// Path the handle was created or opened from.
    pub(crate) path: PathBuf,
    /// Lock acquired at open/create time (presumably released on drop — confirm in
    /// `crate::lock`). In `open_locked` its mode decides `read_only`.
    pub(crate) lock: FileLock,
    /// `true` when the lock is held in `LockMode::Shared`, or when the handle came
    /// from the read-only tail-snapshot path.
    pub(crate) read_only: bool,
    /// In-memory copy of the on-disk header; `footer_offset` / `toc_checksum` are
    /// updated alongside the TOC when it is recovered or rewritten.
    pub(crate) header: Header,
    /// Decoded table of contents (frames, index manifests, tracks, segment catalog).
    pub(crate) toc: Toc,
    /// Embedded WAL opened from the header's WAL region.
    pub(crate) wal: EmbeddedWal,
    /// Frame inserts staged but not yet present in `toc.frames`;
    /// `next_frame_id` adds this to `toc.frames.len()`.
    pub(crate) pending_frame_inserts: u64,
    /// Offset just past the last known data byte: `compute_data_end` for writable
    /// opens, the footer offset for read-only snapshots, `footer_offset` on create.
    pub(crate) data_end: u64,
    /// Commit generation taken from the last valid footer (0 when none is found).
    pub(crate) generation: u64,
    /// Lock tunables; always `LockSettings::default()` at construction.
    pub(crate) lock_settings: LockSettings,
    /// Whether lexical indexing is active (from the `lex` feature on create, from
    /// `has_lex_index` on open).
    pub(crate) lex_enabled: bool,
    /// Lexical index loaded from the manifest when `lex_enabled`.
    pub(crate) lex_index: Option<LexIndex>,
    // NOTE(review): `dead_code` is allowed here, presumably because not every build
    // configuration reads this field — confirm and document the reason.
    #[cfg(feature = "lex")]
    #[allow(dead_code)]
    pub(crate) lex_storage: Arc<RwLock<EmbeddedLexStorage>>,
    /// Whether a vector index manifest or vector segments are present.
    pub(crate) vec_enabled: bool,
    /// Compression mode recorded into newly created vector manifests (see `create`).
    pub(crate) vec_compression: VectorCompression,
    /// Vector index loaded from the manifest when `vec_enabled`.
    pub(crate) vec_index: Option<VecIndex>,
    /// Whether a CLIP index manifest is present.
    pub(crate) clip_enabled: bool,
    /// CLIP index loaded from the manifest when `clip_enabled`.
    pub(crate) clip_index: Option<crate::clip::ClipIndex>,
    /// Set by mutating operations such as `bind_memory` / `unbind_memory`;
    /// presumably cleared when changes are committed.
    pub(crate) dirty: bool,
    /// Tantivy engine initialised by `init_tantivy`.
    #[cfg(feature = "lex")]
    pub(crate) tantivy: Option<TantivyEngine>,
    /// Tantivy change flag (starts `false`).
    #[cfg(feature = "lex")]
    pub(crate) tantivy_dirty: bool,
    /// Lazily loaded temporal track; see `ensure_temporal_track_loaded`.
    #[cfg(feature = "temporal_track")]
    pub(crate) temporal_track: Option<TemporalTrack>,
    /// Sidecar manifest WAL at `manifest_wal_path(path)`; `None` for read-only
    /// snapshots.
    #[cfg(feature = "parallel_segments")]
    pub(crate) manifest_wal: Option<ManifestWal>,
    /// Memories track deserialised from the TOC-referenced region, or empty.
    pub(crate) memories_track: MemoriesTrack,
    /// Logic mesh deserialised from the TOC-referenced region, or empty.
    pub(crate) logic_mesh: LogicMesh,
    /// Schema registry (always starts empty in the constructors shown here).
    pub(crate) schema_registry: SchemaRegistry,
    /// Schema strictness flag (starts `false`).
    pub(crate) schema_strict: bool,
    /// Replay session currently being recorded, if any.
    #[cfg(feature = "replay")]
    pub(crate) active_session: Option<crate::replay::ActiveSession>,
    /// Replay sessions finished during this handle's lifetime.
    #[cfg(feature = "replay")]
    pub(crate) completed_sessions: Vec<crate::replay::ReplaySession>,
}
98
99#[derive(Debug, Clone, Copy)]
101pub struct OpenReadOptions {
102 pub allow_repair: bool,
103}
104
105#[derive(Debug, Clone)]
106pub struct LockSettings {
107 pub timeout_ms: u64,
108 pub heartbeat_ms: u64,
109 pub stale_grace_ms: u64,
110 pub force_stale: bool,
111 pub command: Option<String>,
112}
113
114impl Default for LockSettings {
115 fn default() -> Self {
116 Self {
117 timeout_ms: DEFAULT_LOCK_TIMEOUT_MS,
118 heartbeat_ms: DEFAULT_HEARTBEAT_MS,
119 stale_grace_ms: DEFAULT_STALE_GRACE_MS,
120 force_stale: false,
121 command: None,
122 }
123 }
124}
125
126impl Default for OpenReadOptions {
127 fn default() -> Self {
128 Self {
129 allow_repair: false,
130 }
131 }
132}
133
134impl Memvid {
135 pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
138 let path_ref = path.as_ref();
139 ensure_single_file(path_ref)?;
140
141 OpenOptions::new()
142 .read(true)
143 .write(true)
144 .create(true)
145 .truncate(true)
146 .open(path_ref)?;
147 let (mut file, lock) = FileLock::open_and_lock(path_ref)?;
148
149 let header = Header {
150 magic: MAGIC,
151 version: SPEC_VERSION,
152 footer_offset: WAL_OFFSET + WAL_SIZE_MEDIUM,
153 wal_offset: WAL_OFFSET,
154 wal_size: WAL_SIZE_MEDIUM,
155 wal_checkpoint_pos: 0,
156 wal_sequence: 0,
157 toc_checksum: [0u8; 32],
158 };
159
160 let mut toc = empty_toc();
161 #[cfg(feature = "lex")]
163 {
164 toc.segment_catalog.lex_enabled = true;
165 }
166 file.set_len(header.footer_offset)?;
167 HeaderCodec::write(&mut file, &header)?;
168
169 let wal = EmbeddedWal::open(&file, &header)?;
170 let data_end = header.footer_offset;
171 #[cfg(feature = "lex")]
172 let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::new()));
173 #[cfg(feature = "parallel_segments")]
174 let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
175 #[cfg(feature = "parallel_segments")]
176 let manifest_wal_entries = manifest_wal.replay()?;
177
178 let mut memvid = Self {
179 file,
180 path: path_ref.to_path_buf(),
181 lock,
182 read_only: false,
183 header,
184 toc,
185 wal,
186 pending_frame_inserts: 0,
187 data_end,
188 generation: 0,
189 lock_settings: LockSettings::default(),
190 lex_enabled: cfg!(feature = "lex"), lex_index: None,
192 #[cfg(feature = "lex")]
193 lex_storage,
194 vec_enabled: cfg!(feature = "vec"), vec_compression: VectorCompression::None,
196 vec_index: None,
197 clip_enabled: cfg!(feature = "clip"), clip_index: None,
199 dirty: false,
200 #[cfg(feature = "lex")]
201 tantivy: None,
202 #[cfg(feature = "lex")]
203 tantivy_dirty: false,
204 #[cfg(feature = "temporal_track")]
205 temporal_track: None,
206 #[cfg(feature = "parallel_segments")]
207 manifest_wal: Some(manifest_wal),
208 memories_track: MemoriesTrack::new(),
209 logic_mesh: LogicMesh::new(),
210 schema_registry: SchemaRegistry::new(),
211 schema_strict: false,
212 #[cfg(feature = "replay")]
213 active_session: None,
214 #[cfg(feature = "replay")]
215 completed_sessions: Vec::new(),
216 };
217
218 #[cfg(feature = "lex")]
219 memvid.init_tantivy()?;
220
221 #[cfg(feature = "parallel_segments")]
222 memvid.load_manifest_segments(manifest_wal_entries);
223
224 memvid.bootstrap_segment_catalog();
225
226 let empty_offset = memvid.data_end;
228 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
229 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
230
231 #[cfg(feature = "lex")]
232 if memvid.lex_enabled && memvid.toc.indexes.lex.is_none() {
233 memvid.toc.indexes.lex = Some(crate::types::LexIndexManifest {
234 doc_count: 0,
235 generation: 0,
236 bytes_offset: empty_offset,
237 bytes_length: 0,
238 checksum: empty_checksum,
239 });
240 }
241
242 #[cfg(feature = "vec")]
243 if memvid.vec_enabled && memvid.toc.indexes.vec.is_none() {
244 memvid.toc.indexes.vec = Some(crate::types::VecIndexManifest {
245 vector_count: 0,
246 dimension: 0,
247 bytes_offset: empty_offset,
248 bytes_length: 0,
249 checksum: empty_checksum,
250 compression_mode: memvid.vec_compression.clone(),
251 });
252 }
253
254 memvid.rewrite_toc_footer()?;
255 memvid.header.toc_checksum = memvid.toc.toc_checksum;
256 crate::persist_header(&mut memvid.file, &memvid.header)?;
257 memvid.file.sync_all()?;
258 Ok(memvid)
259 }
260
261 pub fn lock_settings(&self) -> &LockSettings {
262 &self.lock_settings
263 }
264
265 pub fn lock_settings_mut(&mut self) -> &mut LockSettings {
266 &mut self.lock_settings
267 }
268
269 pub fn set_vector_compression(&mut self, compression: VectorCompression) {
272 self.vec_compression = compression;
273 }
274
275 pub fn vector_compression(&self) -> &VectorCompression {
277 &self.vec_compression
278 }
279
280 pub fn next_frame_id(&self) -> u64 {
286 (self.toc.frames.len() as u64).saturating_add(self.pending_frame_inserts)
287 }
288
289 fn open_locked(mut file: File, lock: FileLock, path_ref: &Path) -> Result<Self> {
290 let mut magic = [0u8; 4];
293 let is_mv2e = file.read_exact(&mut magic).is_ok() && magic == *b"MV2E";
294 file.seek(SeekFrom::Start(0))?;
295 if is_mv2e {
296 return Err(MemvidError::EncryptedFile {
297 path: path_ref.to_path_buf(),
298 hint: format!("Run: memvid unlock {}", path_ref.display()),
299 });
300 }
301
302 let mut header = HeaderCodec::read(&mut file)?;
303 let toc = match read_toc(&mut file, &header) {
304 Ok(toc) => toc,
305 Err(err @ MemvidError::Decode(_)) | Err(err @ MemvidError::InvalidToc { .. }) => {
306 tracing::info!("toc decode failed ({}); attempting recovery", err);
307 let (toc, recovered_offset) = recover_toc(&mut file, Some(header.footer_offset))?;
308 if recovered_offset != header.footer_offset
309 || header.toc_checksum != toc.toc_checksum
310 {
311 header.footer_offset = recovered_offset;
312 header.toc_checksum = toc.toc_checksum;
313 crate::persist_header(&mut file, &header)?;
314 }
315 toc
316 }
317 Err(err) => return Err(err),
318 };
319 let checksum_result = toc.verify_checksum();
320
321 let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
323 if let Err(e) = validate_segment_integrity(&toc, &header, file_len) {
324 tracing::warn!("Segment integrity validation failed: {}", e);
325 }
328 ensure_non_overlapping_frames(&toc, file_len)?;
329
330 let wal = EmbeddedWal::open(&file, &header)?;
331 #[cfg(feature = "lex")]
332 let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
333 toc.indexes.lex.as_ref(),
334 &toc.indexes.lex_segments,
335 )));
336 #[cfg(feature = "parallel_segments")]
337 let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
338 #[cfg(feature = "parallel_segments")]
339 let manifest_wal_entries = manifest_wal.replay()?;
340
341 let generation = detect_generation(&file)?.unwrap_or(0);
342 let read_only = lock.mode() == LockMode::Shared;
343
344 let mut memvid = Self {
345 file,
346 path: path_ref.to_path_buf(),
347 lock,
348 read_only,
349 header,
350 toc,
351 wal,
352 pending_frame_inserts: 0,
353 data_end: 0,
354 generation,
355 lock_settings: LockSettings::default(),
356 lex_enabled: false,
357 lex_index: None,
358 #[cfg(feature = "lex")]
359 lex_storage,
360 vec_enabled: false,
361 vec_compression: VectorCompression::None,
362 vec_index: None,
363 clip_enabled: false,
364 clip_index: None,
365 dirty: false,
366 #[cfg(feature = "lex")]
367 tantivy: None,
368 #[cfg(feature = "lex")]
369 tantivy_dirty: false,
370 #[cfg(feature = "temporal_track")]
371 temporal_track: None,
372 #[cfg(feature = "parallel_segments")]
373 manifest_wal: Some(manifest_wal),
374 memories_track: MemoriesTrack::new(),
375 logic_mesh: LogicMesh::new(),
376 schema_registry: SchemaRegistry::new(),
377 schema_strict: false,
378 #[cfg(feature = "replay")]
379 active_session: None,
380 #[cfg(feature = "replay")]
381 completed_sessions: Vec::new(),
382 };
383 memvid.data_end = compute_data_end(&memvid.toc, &memvid.header);
384 memvid.lex_enabled = has_lex_index(&memvid.toc);
386 if memvid.lex_enabled {
387 memvid.load_lex_index_from_manifest()?;
388 }
389 #[cfg(feature = "lex")]
390 {
391 memvid.init_tantivy()?;
392 }
393 memvid.vec_enabled =
394 memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
395 if memvid.vec_enabled {
396 memvid.load_vec_index_from_manifest()?;
397 }
398 memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
399 if memvid.clip_enabled {
400 memvid.load_clip_index_from_manifest()?;
401 }
402 memvid.recover_wal()?;
403 #[cfg(feature = "parallel_segments")]
404 memvid.load_manifest_segments(manifest_wal_entries);
405 memvid.bootstrap_segment_catalog();
406 #[cfg(feature = "temporal_track")]
407 memvid.ensure_temporal_track_loaded()?;
408 memvid.load_memories_track()?;
409 memvid.load_logic_mesh()?;
410 if checksum_result.is_err() {
411 memvid.toc.verify_checksum()?;
412 if memvid.toc.toc_checksum != memvid.header.toc_checksum {
413 memvid.header.toc_checksum = memvid.toc.toc_checksum;
414 crate::persist_header(&mut memvid.file, &memvid.header)?;
415 memvid.file.sync_all()?;
416 }
417 }
418 Ok(memvid)
419 }
420
421 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
423 let path_ref = path.as_ref();
424 ensure_single_file(path_ref)?;
425
426 let (file, lock) = FileLock::open_and_lock(path_ref)?;
427 Self::open_locked(file, lock, path_ref)
428 }
429
430 pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
431 Self::open_read_only_with_options(path, OpenReadOptions::default())
432 }
433
434 pub fn open_read_only_with_options<P: AsRef<Path>>(
435 path: P,
436 options: OpenReadOptions,
437 ) -> Result<Self> {
438 let path_ref = path.as_ref();
439 ensure_single_file(path_ref)?;
440
441 if options.allow_repair {
442 return Self::open(path_ref);
443 }
444
445 Self::open_read_only_snapshot(path_ref)
446 }
447
448 fn open_read_only_snapshot(path_ref: &Path) -> Result<Self> {
449 let mut file = OpenOptions::new().read(true).write(true).open(path_ref)?;
450 let TailSnapshot {
451 toc,
452 footer_offset,
453 data_end,
454 generation,
455 } = load_tail_snapshot(&file)?;
456
457 let mut header = HeaderCodec::read(&mut file)?;
458 header.footer_offset = footer_offset;
459 header.toc_checksum = toc.toc_checksum;
460
461 let lock = FileLock::acquire_with_mode(&file, LockMode::Shared)?;
462 let wal = EmbeddedWal::open_read_only(&file, &header)?;
463
464 #[cfg(feature = "lex")]
465 let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
466 toc.indexes.lex.as_ref(),
467 &toc.indexes.lex_segments,
468 )));
469
470 let mut memvid = Self {
471 file,
472 path: path_ref.to_path_buf(),
473 lock,
474 read_only: true,
475 header,
476 toc,
477 wal,
478 pending_frame_inserts: 0,
479 data_end,
480 generation,
481 lock_settings: LockSettings::default(),
482 lex_enabled: false,
483 lex_index: None,
484 #[cfg(feature = "lex")]
485 lex_storage,
486 vec_enabled: false,
487 vec_compression: VectorCompression::None,
488 vec_index: None,
489 clip_enabled: false,
490 clip_index: None,
491 dirty: false,
492 #[cfg(feature = "lex")]
493 tantivy: None,
494 #[cfg(feature = "lex")]
495 tantivy_dirty: false,
496 #[cfg(feature = "temporal_track")]
497 temporal_track: None,
498 #[cfg(feature = "parallel_segments")]
499 manifest_wal: None,
500 memories_track: MemoriesTrack::new(),
501 logic_mesh: LogicMesh::new(),
502 schema_registry: SchemaRegistry::new(),
503 schema_strict: false,
504 #[cfg(feature = "replay")]
505 active_session: None,
506 #[cfg(feature = "replay")]
507 completed_sessions: Vec::new(),
508 };
509
510 memvid.lex_enabled = has_lex_index(&memvid.toc);
512 if memvid.lex_enabled {
513 memvid.load_lex_index_from_manifest()?;
514 }
515 #[cfg(feature = "lex")]
516 memvid.init_tantivy()?;
517
518 memvid.vec_enabled =
519 memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
520 if memvid.vec_enabled {
521 memvid.load_vec_index_from_manifest()?;
522 }
523 memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
524 if memvid.clip_enabled {
525 memvid.load_clip_index_from_manifest()?;
526 }
527 memvid.load_memories_track()?;
529 memvid.load_logic_mesh()?;
530
531 memvid.bootstrap_segment_catalog();
532 #[cfg(feature = "temporal_track")]
533 memvid.ensure_temporal_track_loaded()?;
534
535 Ok(memvid)
536 }
537
538 pub(crate) fn try_open<P: AsRef<Path>>(path: P) -> Result<Self> {
539 let path_ref = path.as_ref();
540 ensure_single_file(path_ref)?;
541
542 let file = OpenOptions::new().read(true).write(true).open(path_ref)?;
543 let lock = match FileLock::try_acquire(&file, path_ref)? {
544 Some(lock) => lock,
545 None => {
546 return Err(MemvidError::Lock(
547 "exclusive access unavailable for doctor".to_string(),
548 ));
549 }
550 };
551 Self::open_locked(file, lock, path_ref)
552 }
553
554 fn bootstrap_segment_catalog(&mut self) {
555 let catalog = &mut self.toc.segment_catalog;
556 if catalog.version == 0 {
557 catalog.version = 1;
558 }
559 if catalog.next_segment_id == 0 {
560 let mut max_id = 0u64;
561 for descriptor in &catalog.lex_segments {
562 max_id = max_id.max(descriptor.common.segment_id);
563 }
564 for descriptor in &catalog.vec_segments {
565 max_id = max_id.max(descriptor.common.segment_id);
566 }
567 for descriptor in &catalog.time_segments {
568 max_id = max_id.max(descriptor.common.segment_id);
569 }
570 #[cfg(feature = "temporal_track")]
571 for descriptor in &catalog.temporal_segments {
572 max_id = max_id.max(descriptor.common.segment_id);
573 }
574 #[cfg(feature = "parallel_segments")]
575 for descriptor in &catalog.index_segments {
576 max_id = max_id.max(descriptor.common.segment_id);
577 }
578 if max_id > 0 {
579 catalog.next_segment_id = max_id.saturating_add(1);
580 }
581 }
582 }
583
584 #[cfg(feature = "parallel_segments")]
585 fn load_manifest_segments(&mut self, entries: Vec<IndexSegmentRef>) {
586 if entries.is_empty() {
587 return;
588 }
589 for entry in entries {
590 let duplicate = self
591 .toc
592 .segment_catalog
593 .index_segments
594 .iter()
595 .any(|existing| existing.common.segment_id == entry.common.segment_id);
596 if !duplicate {
597 self.toc.segment_catalog.index_segments.push(entry);
598 }
599 }
600 }
601
602 fn load_memories_track(&mut self) -> Result<()> {
604 let manifest = match &self.toc.memories_track {
605 Some(m) => m,
606 None => return Ok(()),
607 };
608
609 let mut buf = vec![0u8; manifest.bytes_length as usize];
611 self.file
612 .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
613 self.file.read_exact(&mut buf)?;
614
615 let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
617 if actual_checksum != manifest.checksum {
618 return Err(MemvidError::InvalidToc {
619 reason: "memories track checksum mismatch".into(),
620 });
621 }
622
623 self.memories_track = MemoriesTrack::deserialize(&buf)?;
625
626 Ok(())
627 }
628
629 fn load_logic_mesh(&mut self) -> Result<()> {
631 let manifest = match &self.toc.logic_mesh {
632 Some(m) => m,
633 None => return Ok(()),
634 };
635
636 let mut buf = vec![0u8; manifest.bytes_length as usize];
638 self.file
639 .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
640 self.file.read_exact(&mut buf)?;
641
642 let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
644 if actual_checksum != manifest.checksum {
645 return Err(MemvidError::InvalidToc {
646 reason: "logic mesh checksum mismatch".into(),
647 });
648 }
649
650 self.logic_mesh = LogicMesh::deserialize(&buf)?;
652
653 Ok(())
654 }
655
656 #[cfg(feature = "temporal_track")]
657 pub(crate) fn ensure_temporal_track_loaded(&mut self) -> Result<()> {
658 if self.temporal_track.is_some() {
659 return Ok(());
660 }
661 let manifest = match &self.toc.temporal_track {
662 Some(manifest) => manifest.clone(),
663 None => return Ok(()),
664 };
665 if manifest.bytes_length == 0 {
666 return Ok(());
667 }
668 let file_len = self.file.metadata()?.len();
669 let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) else {
670 return Ok(());
671 };
672 if end > file_len {
673 return Ok(());
674 }
675 match temporal_track_read(&mut self.file, manifest.bytes_offset, manifest.bytes_length) {
676 Ok(track) => self.temporal_track = Some(track),
677 Err(MemvidError::InvalidTemporalTrack { .. }) => {
678 return Ok(());
679 }
680 Err(err) => return Err(err),
681 }
682 Ok(())
683 }
684
685 #[cfg(feature = "temporal_track")]
686 pub(crate) fn temporal_track_ref(&mut self) -> Result<Option<&TemporalTrack>> {
687 self.ensure_temporal_track_loaded()?;
688 Ok(self.temporal_track.as_ref())
689 }
690
691 #[cfg(feature = "temporal_track")]
692 pub(crate) fn temporal_anchor_timestamp(&mut self, frame_id: FrameId) -> Result<Option<i64>> {
693 self.ensure_temporal_track_loaded()?;
694 let Some(track) = self.temporal_track.as_ref() else {
695 return Ok(None);
696 };
697 if !track.capabilities().has_anchors {
698 return Ok(None);
699 }
700 Ok(track
701 .anchor_for_frame(frame_id)
702 .map(|anchor| anchor.anchor_ts))
703 }
704
705 #[cfg(feature = "temporal_track")]
706 pub(crate) fn clear_temporal_track_cache(&mut self) {
707 self.temporal_track = None;
708 }
709
710 #[cfg(feature = "temporal_track")]
711 pub(crate) fn effective_temporal_timestamp(
712 &mut self,
713 frame_id: FrameId,
714 fallback: i64,
715 ) -> Result<i64> {
716 Ok(self
717 .temporal_anchor_timestamp(frame_id)?
718 .unwrap_or(fallback))
719 }
720
721 #[cfg(not(feature = "temporal_track"))]
722 pub(crate) fn effective_temporal_timestamp(
723 &mut self,
724 _frame_id: crate::types::FrameId,
725 fallback: i64,
726 ) -> Result<i64> {
727 Ok(fallback)
728 }
729
730 pub fn get_memory_binding(&self) -> Option<&crate::types::MemoryBinding> {
735 self.toc.memory_binding.as_ref()
736 }
737
738 pub fn bind_memory(
747 &mut self,
748 binding: crate::types::MemoryBinding,
749 ticket: crate::types::Ticket,
750 ) -> Result<()> {
751 if let Some(existing) = self.get_memory_binding() {
753 if existing.memory_id != binding.memory_id {
754 return Err(MemvidError::MemoryAlreadyBound {
755 existing_memory_id: existing.memory_id,
756 existing_memory_name: existing.memory_name.clone(),
757 bound_at: existing.bound_at.to_rfc3339(),
758 });
759 }
760 }
761
762 self.apply_ticket(ticket)?;
764
765 self.toc.memory_binding = Some(binding);
767 self.dirty = true;
768
769 Ok(())
770 }
771
772 pub fn unbind_memory(&mut self) -> Result<()> {
776 self.toc.memory_binding = None;
777 self.toc.ticket_ref = crate::types::TicketRef {
779 issuer: "free-tier".into(),
780 seq_no: 1,
781 expires_in_secs: 0,
782 capacity_bytes: crate::types::Tier::Free.capacity_bytes(),
783 };
784 self.dirty = true;
785 Ok(())
786 }
787}
788
789pub(crate) fn read_toc(file: &mut File, header: &Header) -> Result<Toc> {
790 use crate::footer::{CommitFooter, FOOTER_SIZE};
791
792 let len = file.metadata()?.len();
793 if len < header.footer_offset {
794 return Err(MemvidError::InvalidToc {
795 reason: "footer offset beyond file length".into(),
796 });
797 }
798
799 file.seek(SeekFrom::Start(header.footer_offset))?;
801 let total_size = (len - header.footer_offset) as usize;
802
803 if total_size < FOOTER_SIZE {
804 return Err(MemvidError::InvalidToc {
805 reason: "region too small to contain footer".into(),
806 });
807 }
808
809 let mut buf = Vec::with_capacity(total_size);
810 file.read_to_end(&mut buf)?;
811
812 let footer_start = buf.len() - FOOTER_SIZE;
814 let footer_bytes = &buf[footer_start..];
815 let footer = CommitFooter::decode(footer_bytes).ok_or(MemvidError::InvalidToc {
816 reason: "failed to decode commit footer".into(),
817 })?;
818
819 let toc_bytes = &buf[..footer_start];
821 if toc_bytes.len() != footer.toc_len as usize {
822 return Err(MemvidError::InvalidToc {
823 reason: "toc length mismatch".into(),
824 });
825 }
826 if !footer.hash_matches(toc_bytes) {
827 return Err(MemvidError::InvalidToc {
828 reason: "commit footer toc hash mismatch".into(),
829 });
830 }
831
832 verify_toc_prefix(toc_bytes)?;
833 let toc = Toc::decode(toc_bytes)?;
834 Ok(toc)
835}
836
837fn verify_toc_prefix(bytes: &[u8]) -> Result<()> {
838 const MAX_SEGMENTS: u64 = 1_000_000;
839 const MAX_FRAMES: u64 = 1_000_000;
840 const MIN_SEGMENT_META_BYTES: u64 = 32;
841 const MIN_FRAME_BYTES: u64 = 64;
842 let read_u64 = |range: std::ops::Range<usize>, context: &str| -> Result<u64> {
845 let slice = bytes.get(range).ok_or_else(|| MemvidError::InvalidToc {
846 reason: context.to_string().into(),
847 })?;
848 let array: [u8; 8] = slice.try_into().map_err(|_| MemvidError::InvalidToc {
849 reason: context.to_string().into(),
850 })?;
851 Ok(u64::from_le_bytes(array))
852 };
853
854 if bytes.len() < 24 {
855 return Err(MemvidError::InvalidToc {
856 reason: "toc trailer too small".into(),
857 });
858 }
859 let toc_version = read_u64(0..8, "toc version missing or truncated")?;
860 if toc_version > 32 {
861 return Err(MemvidError::InvalidToc {
862 reason: "toc version unreasonable".into(),
863 });
864 }
865 let segments_len = read_u64(8..16, "segment count missing or truncated")?;
866 if segments_len > MAX_SEGMENTS {
867 return Err(MemvidError::InvalidToc {
868 reason: "segment count unreasonable".into(),
869 });
870 }
871 let frames_len = read_u64(16..24, "frame count missing or truncated")?;
872 if frames_len > MAX_FRAMES {
873 return Err(MemvidError::InvalidToc {
874 reason: "frame count unreasonable".into(),
875 });
876 }
877 let required = segments_len
878 .saturating_mul(MIN_SEGMENT_META_BYTES)
879 .saturating_add(frames_len.saturating_mul(MIN_FRAME_BYTES));
880 if required > bytes.len() as u64 {
881 return Err(MemvidError::InvalidToc {
882 reason: "toc payload inconsistent with counts".into(),
883 });
884 }
885 Ok(())
886}
887
888fn ensure_non_overlapping_frames(toc: &Toc, file_len: u64) -> Result<()> {
890 let mut previous_end = 0u64;
891 for frame in toc
892 .frames
893 .iter()
894 .filter(|f| f.status == FrameStatus::Active)
895 {
896 let end = frame
897 .payload_offset
898 .checked_add(frame.payload_length)
899 .ok_or_else(|| MemvidError::InvalidToc {
900 reason: "frame payload offsets overflow".into(),
901 })?;
902 if end > file_len {
903 return Err(MemvidError::InvalidToc {
904 reason: "frame payload exceeds file length".into(),
905 });
906 }
907 if frame.payload_offset < previous_end {
908 return Err(MemvidError::InvalidToc {
909 reason: "frame payloads overlap or are out of order".into(),
910 });
911 }
912 previous_end = end;
913 }
914 Ok(())
915}
916
/// Attempts to recover a TOC from a file whose recorded TOC failed to read.
///
/// Returns the decoded TOC together with the offset to store as the header's
/// `footer_offset`. Strategies are tried in this order:
/// 1. `find_last_valid_footer` over the whole file; on success the footer's
///    `toc_offset` is returned.
/// 2. If `hint` is given: treat `[hint, len - FOOTER_SIZE)` as the TOC, accepted if it
///    passes `verify_toc_prefix` and decodes without panicking. No footer/hash check
///    is done on this path.
/// 3. `scan_range_for_toc` from `hint` to EOF, then from 0 to `hint` (or the whole
///    file without a hint). That scan only considers the last 64 MiB of the file.
///
/// # Errors
///
/// `InvalidToc` if no strategy succeeds; I/O and mmap errors propagate.
pub(crate) fn recover_toc(file: &mut File, hint: Option<u64>) -> Result<(Toc, u64)> {
    let len = file.metadata()?.len();
    // SAFETY: NOTE(review) — `Mmap::map` is unsafe because the mapping is undefined
    // behaviour if the file is truncated/modified by another party while mapped. The
    // visible caller (`open_locked`) already holds the file lock; confirm any other
    // caller does too.
    let mmap = unsafe { Mmap::map(&*file)? };
    tracing::debug!(file_len = len, "attempting toc recovery");

    // Strategy 1: last footer whose hash validates.
    if let Some(footer_slice) = find_last_valid_footer(&mmap) {
        tracing::debug!(
            footer_offset = footer_slice.footer_offset,
            toc_offset = footer_slice.toc_offset,
            toc_len = footer_slice.toc_bytes.len(),
            "found valid footer during recovery"
        );
        match Toc::decode(footer_slice.toc_bytes) {
            Ok(toc) => {
                return Ok((toc, footer_slice.toc_offset as u64));
            }
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "footer-validated TOC failed to decode, falling back to scan"
                );
            }
        }
    }

    // Strategy 2: assume the TOC starts at the hinted offset and runs up to the
    // trailing footer slot.
    if let Some(hint_offset) = hint {
        use crate::footer::FOOTER_SIZE;

        let start = (hint_offset.min(len)) as usize;
        if mmap.len().saturating_sub(start) >= FOOTER_SIZE {
            let toc_end = mmap.len().saturating_sub(FOOTER_SIZE);
            if toc_end > start {
                let toc_bytes = &mmap[start..toc_end];
                if verify_toc_prefix(toc_bytes).is_ok() {
                    // Decoding untrusted bytes: a panic inside the decoder is treated
                    // as "not a TOC" rather than aborting recovery.
                    let attempt = panic::catch_unwind(|| Toc::decode(toc_bytes));
                    if let Ok(Ok(toc)) = attempt {
                        tracing::debug!(
                            recovered_offset = hint_offset,
                            recovered_frames = toc.frames.len(),
                            "recovered toc from hinted offset without validated footer"
                        );
                        return Ok((toc, hint_offset));
                    }
                }
            }
        }
    }

    // Strategy 3: brute-force scan, preferring the region after the hint.
    let mut ranges = Vec::new();
    if let Some(hint_offset) = hint {
        let hint_idx = hint_offset.min(len) as usize;
        ranges.push((hint_idx, mmap.len()));
        if hint_idx > 0 {
            ranges.push((0, hint_idx));
        }
    } else {
        ranges.push((0, mmap.len()));
    }

    for (start, end) in ranges {
        if let Some(found) = scan_range_for_toc(&mmap, start, end) {
            return Ok(found);
        }
    }

    Err(MemvidError::InvalidToc {
        reason: "unable to recover table of contents from file trailer".into(),
    })
}
993
994fn scan_range_for_toc(data: &[u8], start: usize, end: usize) -> Option<(Toc, u64)> {
995 if start >= end || end > data.len() {
996 return None;
997 }
998 const MAX_TOC_BYTES: usize = 64 * 1024 * 1024;
999 const ZERO_CHECKSUM: [u8; 32] = [0u8; 32];
1000
1001 let min_offset = data.len().saturating_sub(MAX_TOC_BYTES);
1004 let scan_start = start.max(min_offset);
1005
1006 for offset in (scan_start..end).rev() {
1007 let slice = &data[offset..];
1008 if slice.len() < 16 {
1009 continue;
1010 }
1011 debug_assert!(slice.len() <= MAX_TOC_BYTES);
1012
1013 if slice.len() < ZERO_CHECKSUM.len() {
1015 continue;
1016 }
1017 let (body, stored_checksum) = slice.split_at(slice.len() - ZERO_CHECKSUM.len());
1018 let mut hasher = Hasher::new();
1019 hasher.update(body);
1020 hasher.update(&ZERO_CHECKSUM);
1021 if hasher.finalize().as_bytes() != stored_checksum {
1022 continue;
1023 }
1024 if verify_toc_prefix(slice).is_err() {
1025 continue;
1026 }
1027 let attempt = panic::catch_unwind(|| Toc::decode(slice));
1028 if let Ok(Ok(toc)) = attempt {
1029 let recovered_offset = offset as u64;
1030 tracing::debug!(
1031 recovered_offset,
1032 recovered_frames = toc.frames.len(),
1033 "recovered toc via scan"
1034 );
1035 return Some((toc, recovered_offset));
1036 }
1037 }
1038 None
1039}
1040
1041pub(crate) fn prepare_toc_bytes(toc: &mut Toc) -> Result<Vec<u8>> {
1042 toc.toc_checksum = [0u8; 32];
1043 let bytes = toc.encode()?;
1044 let checksum = Toc::calculate_checksum(&bytes);
1045 toc.toc_checksum = checksum;
1046 toc.encode()
1047}
1048
1049pub(crate) fn empty_toc() -> Toc {
1050 Toc {
1051 toc_version: 0,
1052 segments: Vec::new(),
1053 frames: Vec::new(),
1054 indexes: IndexManifests::default(),
1055 time_index: None,
1056 temporal_track: None,
1057 memories_track: None,
1058 logic_mesh: None,
1059 segment_catalog: SegmentCatalog::default(),
1060 ticket_ref: TicketRef {
1061 issuer: "free-tier".into(),
1062 seq_no: 1,
1063 expires_in_secs: 0,
1064 capacity_bytes: Tier::Free.capacity_bytes(),
1065 },
1066 memory_binding: None,
1067 replay_manifest: None,
1068 merkle_root: [0u8; 32],
1069 toc_checksum: [0u8; 32],
1070 }
1071}
1072
1073pub(crate) fn compute_data_end(toc: &Toc, header: &Header) -> u64 {
1074 let wal_region_end = header.wal_offset.saturating_add(header.wal_size);
1081 let mut max_end = wal_region_end.max(header.footer_offset);
1082
1083 for frame in toc
1085 .frames
1086 .iter()
1087 .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
1088 {
1089 if let Some(end) = frame.payload_offset.checked_add(frame.payload_length) {
1090 max_end = max_end.max(end);
1091 }
1092 }
1093
1094 let catalog = &toc.segment_catalog;
1096 for seg in &catalog.lex_segments {
1097 if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1098 max_end = max_end.max(end);
1099 }
1100 }
1101 for seg in &catalog.vec_segments {
1102 if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1103 max_end = max_end.max(end);
1104 }
1105 }
1106 for seg in &catalog.time_segments {
1107 if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1108 max_end = max_end.max(end);
1109 }
1110 }
1111 #[cfg(feature = "temporal_track")]
1112 for seg in &catalog.temporal_segments {
1113 if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1114 max_end = max_end.max(end);
1115 }
1116 }
1117 #[cfg(feature = "lex")]
1118 for seg in &catalog.tantivy_segments {
1119 if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1120 max_end = max_end.max(end);
1121 }
1122 }
1123 #[cfg(feature = "parallel_segments")]
1124 for seg in &catalog.index_segments {
1125 if let Some(end) = seg.common.bytes_offset.checked_add(seg.common.bytes_length) {
1126 max_end = max_end.max(end);
1127 }
1128 }
1129
1130 if let Some(manifest) = toc.indexes.lex.as_ref() {
1132 if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1133 max_end = max_end.max(end);
1134 }
1135 }
1136 if let Some(manifest) = toc.indexes.vec.as_ref() {
1137 if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1138 max_end = max_end.max(end);
1139 }
1140 }
1141 if let Some(manifest) = toc.indexes.clip.as_ref() {
1142 if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1143 max_end = max_end.max(end);
1144 }
1145 }
1146 if let Some(manifest) = toc.time_index.as_ref() {
1147 if let Some(end) = manifest.bytes_offset.checked_add(manifest.bytes_length) {
1148 max_end = max_end.max(end);
1149 }
1150 }
1151 #[cfg(feature = "temporal_track")]
1152 if let Some(track) = toc.temporal_track.as_ref() {
1153 if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1154 max_end = max_end.max(end);
1155 }
1156 }
1157 if let Some(track) = toc.memories_track.as_ref() {
1158 if let Some(end) = track.bytes_offset.checked_add(track.bytes_length) {
1159 max_end = max_end.max(end);
1160 }
1161 }
1162 if let Some(mesh) = toc.logic_mesh.as_ref() {
1163 if let Some(end) = mesh.bytes_offset.checked_add(mesh.bytes_length) {
1164 max_end = max_end.max(end);
1165 }
1166 }
1167 #[cfg(feature = "replay")]
1168 if let Some(manifest) = toc.replay_manifest.as_ref() {
1169 if let Some(end) = manifest
1170 .segment_offset
1171 .checked_add(manifest.segment_size)
1172 {
1173 max_end = max_end.max(end);
1174 }
1175 }
1176
1177 tracing::debug!(
1178 wal_region_end,
1179 footer_offset = header.footer_offset,
1180 computed_data_end = max_end,
1181 "compute_data_end"
1182 );
1183
1184 max_end
1185}
1186
1187struct TailSnapshot {
1188 toc: Toc,
1189 footer_offset: u64,
1190 data_end: u64,
1191 generation: u64,
1192}
1193
1194fn locate_footer_window(mmap: &[u8]) -> Option<(FooterSlice<'_>, usize)> {
1195 const MAX_SEARCH_SIZE: usize = 16 * 1024 * 1024;
1196 if mmap.is_empty() {
1197 return None;
1198 }
1199 let mut window = MAX_SEARCH_SIZE.min(mmap.len());
1200 loop {
1201 let start = mmap.len() - window;
1202 if let Some(slice) = find_last_valid_footer(&mmap[start..]) {
1203 return Some((slice, start));
1204 }
1205 if window == mmap.len() {
1206 break;
1207 }
1208 window = (window * 2).min(mmap.len());
1209 }
1210 None
1211}
1212
1213fn load_tail_snapshot(file: &File) -> Result<TailSnapshot> {
1214 let mmap = unsafe { Mmap::map(file)? };
1216
1217 let (slice, offset_adjustment) =
1218 locate_footer_window(&mmap).ok_or_else(|| MemvidError::InvalidToc {
1219 reason: "no valid commit footer found".into(),
1220 })?;
1221 let toc = Toc::decode(slice.toc_bytes)?;
1222 toc.verify_checksum()?;
1223
1224 Ok(TailSnapshot {
1225 toc,
1226 footer_offset: slice.footer_offset as u64 + offset_adjustment as u64,
1227 data_end: slice.footer_offset as u64 + offset_adjustment as u64,
1229 generation: slice.footer.generation,
1230 })
1231}
1232
1233fn detect_generation(file: &File) -> Result<Option<u64>> {
1234 let mmap = unsafe { Mmap::map(file)? };
1236
1237 Ok(locate_footer_window(&mmap).map(|(slice, _)| slice.footer.generation))
1238}
1239
1240pub(crate) fn ensure_single_file(path: &Path) -> Result<()> {
1241 if let Some(parent) = path.parent() {
1242 let name = path
1243 .file_name()
1244 .and_then(|n| n.to_str())
1245 .unwrap_or_default();
1246 let forbidden = ["-wal", "-shm", "-lock", "-journal"];
1247 for suffix in forbidden {
1248 let candidate = parent.join(format!("{name}{suffix}"));
1249 if candidate.exists() {
1250 return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1251 }
1252 }
1253 let hidden_forbidden = [".wal", ".shm", ".lock", ".journal"];
1254 for suffix in hidden_forbidden {
1255 let candidate = parent.join(format!(".{name}{suffix}"));
1256 if candidate.exists() {
1257 return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1258 }
1259 }
1260 }
1261 Ok(())
1262}
1263
/// Path of the manifest WAL that accompanies the memory file at `path`.
///
/// The file's extension, if any, is replaced with `manifest.wal`, so
/// `mem.mv2` maps to `mem.manifest.wal`.
#[cfg(feature = "parallel_segments")]
fn manifest_wal_path(path: &Path) -> PathBuf {
    path.with_extension("manifest.wal")
}
1270
/// Removes the manifest WAL next to `path` if one is present.
///
/// This is best-effort: any error from the removal is discarded.
#[cfg(feature = "parallel_segments")]
pub(crate) fn cleanup_manifest_wal_public(path: &Path) {
    let wal_path = manifest_wal_path(path);
    if !wal_path.exists() {
        return;
    }
    let _ = std::fs::remove_file(&wal_path);
}
1278
1279pub(crate) fn has_lex_index(toc: &Toc) -> bool {
1282 toc.segment_catalog.lex_enabled
1283 || toc.indexes.lex.is_some()
1284 || !toc.indexes.lex_segments.is_empty()
1285 || !toc.segment_catalog.tantivy_segments.is_empty()
1286}
1287
/// Best available document count for the lexical index.
///
/// Prefers a non-zero `doc_count` from the TOC's lex manifest. Otherwise it
/// falls back to `lex_storage.doc_count()`, which is only queried when the
/// manifest does not supply a count.
///
/// A zero from either source is treated as "unknown", and `None` is returned
/// when neither source yields a non-zero count.
#[cfg(feature = "lex")]
pub(crate) fn lex_doc_count(
    toc: &Toc,
    lex_storage: &crate::search::EmbeddedLexStorage,
) -> Option<u64> {
    let non_zero = |count: u64| (count > 0).then_some(count);

    toc.indexes
        .lex
        .as_ref()
        .and_then(|manifest| non_zero(manifest.doc_count))
        .or_else(|| non_zero(lex_storage.doc_count()))
}
1313
1314#[allow(dead_code)]
1317fn validate_segment_integrity(toc: &Toc, header: &Header, file_len: u64) -> Result<()> {
1318 let data_limit = header.footer_offset;
1319
1320 #[cfg(feature = "replay")]
1324 if let Some(manifest) = toc.replay_manifest.as_ref() {
1325 if manifest.segment_size != 0 {
1326 let end = manifest
1327 .segment_offset
1328 .checked_add(manifest.segment_size)
1329 .ok_or_else(|| MemvidError::Doctor {
1330 reason: format!(
1331 "Replay segment offset overflow: {} + {}",
1332 manifest.segment_offset, manifest.segment_size
1333 ),
1334 })?;
1335
1336 if end > file_len {
1339 return Err(MemvidError::Doctor {
1340 reason: format!(
1341 "Replay segment out of bounds: offset={}, length={}, end={}, file_len={}",
1342 manifest.segment_offset, manifest.segment_size, end, file_len
1343 ),
1344 });
1345 }
1346 }
1347 }
1348
1349 for (idx, seg) in toc.segment_catalog.tantivy_segments.iter().enumerate() {
1351 let offset = seg.common.bytes_offset;
1352 let length = seg.common.bytes_length;
1353
1354 if length == 0 {
1355 continue; }
1357
1358 let end = offset
1359 .checked_add(length)
1360 .ok_or_else(|| MemvidError::Doctor {
1361 reason: format!(
1362 "Tantivy segment {} offset overflow: {} + {}",
1363 idx, offset, length
1364 ),
1365 })?;
1366
1367 if end > file_len || end > data_limit {
1368 return Err(MemvidError::Doctor {
1369 reason: format!(
1370 "Tantivy segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1371 idx, offset, length, end, file_len, data_limit
1372 ),
1373 });
1374 }
1375 }
1376
1377 for (idx, seg) in toc.segment_catalog.time_segments.iter().enumerate() {
1379 let offset = seg.common.bytes_offset;
1380 let length = seg.common.bytes_length;
1381
1382 if length == 0 {
1383 continue;
1384 }
1385
1386 let end = offset
1387 .checked_add(length)
1388 .ok_or_else(|| MemvidError::Doctor {
1389 reason: format!(
1390 "Time segment {} offset overflow: {} + {}",
1391 idx, offset, length
1392 ),
1393 })?;
1394
1395 if end > file_len || end > data_limit {
1396 return Err(MemvidError::Doctor {
1397 reason: format!(
1398 "Time segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1399 idx, offset, length, end, file_len, data_limit
1400 ),
1401 });
1402 }
1403 }
1404
1405 for (idx, seg) in toc.segment_catalog.vec_segments.iter().enumerate() {
1407 let offset = seg.common.bytes_offset;
1408 let length = seg.common.bytes_length;
1409
1410 if length == 0 {
1411 continue;
1412 }
1413
1414 let end = offset
1415 .checked_add(length)
1416 .ok_or_else(|| MemvidError::Doctor {
1417 reason: format!(
1418 "Vec segment {} offset overflow: {} + {}",
1419 idx, offset, length
1420 ),
1421 })?;
1422
1423 if end > file_len || end > data_limit {
1424 return Err(MemvidError::Doctor {
1425 reason: format!(
1426 "Vec segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1427 idx, offset, length, end, file_len, data_limit
1428 ),
1429 });
1430 }
1431 }
1432
1433 log::debug!("✓ Segment integrity validation passed");
1434 Ok(())
1435}
1436
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// An 8-byte TOC prefix must be rejected as `InvalidToc`, with a reason
    /// that mentions the undersized trailer.
    #[test]
    fn toc_prefix_underflow_surfaces_reason() {
        let short_prefix = [0u8; 8];
        let reason = match verify_toc_prefix(&short_prefix)
            .expect_err("should reject short toc prefix")
        {
            MemvidError::InvalidToc { reason } => reason,
            other => panic!("unexpected error: {other:?}"),
        };
        assert!(
            reason.contains("trailer too small"),
            "unexpected reason: {reason}"
        );
    }

    /// Creating a memory next to an existing `-wal` sidecar must fail with
    /// `AuxiliaryFileDetected`.
    #[test]
    fn ensure_single_file_blocks_sidecars() {
        let dir = tempdir().expect("tmp");
        let path = dir.path().join("mem.mv2");
        let sidecar = dir.path().join("mem.mv2-wal");
        std::fs::write(&sidecar, b"junk").expect("sidecar");

        let outcome = Memvid::create(&path);
        let rejected = matches!(outcome, Err(MemvidError::AuxiliaryFileDetected { .. }));
        assert!(rejected);
    }
}