1use std::convert::TryInto;
10use std::fs::{File, OpenOptions};
11use std::io::{Read, Seek, SeekFrom};
12use std::panic;
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, RwLock};
15
16use crate::constants::{MAGIC, SPEC_VERSION, WAL_OFFSET, WAL_SIZE_MEDIUM};
17use crate::error::{MemvidError, Result};
18use crate::footer::{FooterSlice, find_last_valid_footer};
19use crate::io::header::HeaderCodec;
20#[cfg(feature = "parallel_segments")]
21use crate::io::manifest_wal::ManifestWal;
22use crate::io::wal::EmbeddedWal;
23use crate::lock::{FileLock, LockMode};
24#[cfg(feature = "lex")]
25use crate::search::{EmbeddedLexStorage, TantivyEngine};
26#[cfg(feature = "temporal_track")]
27use crate::types::FrameId;
28#[cfg(feature = "parallel_segments")]
29use crate::types::IndexSegmentRef;
30use crate::types::{
31 FrameStatus, Header, IndexManifests, LogicMesh, MemoriesTrack, SegmentCatalog, TicketRef, Tier,
32 Toc, VectorCompression,
33};
34#[cfg(feature = "temporal_track")]
35use crate::{TemporalTrack, temporal_track_read};
36use crate::{lex::LexIndex, vec::VecIndex};
37use blake3::Hasher;
38use memmap2::Mmap;
39
/// Default value of [`LockSettings::timeout_ms`].
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 250;
/// Default value of [`LockSettings::heartbeat_ms`].
const DEFAULT_HEARTBEAT_MS: u64 = 2_000;
/// Default value of [`LockSettings::stale_grace_ms`].
const DEFAULT_STALE_GRACE_MS: u64 = 10_000;
43
/// Handle to an open memory file.
///
/// Owns the file handle and the [`FileLock`] taken on it, the in-memory header
/// and table of contents, the embedded WAL, and whichever lex/vec/clip indexes
/// and auxiliary tracks were loaded from the TOC.
pub struct Memvid {
    /// Underlying file handle.
    pub(crate) file: File,
    /// Path the handle was created or opened from.
    pub(crate) path: PathBuf,
    /// Lock held on the file for the lifetime of this handle.
    pub(crate) lock: FileLock,
    /// `true` when the lock is held in `LockMode::Shared` (read-only snapshot
    /// opens always set it).
    pub(crate) read_only: bool,
    /// In-memory copy of the file header.
    pub(crate) header: Header,
    /// In-memory table of contents.
    pub(crate) toc: Toc,
    /// Embedded write-ahead log.
    pub(crate) wal: EmbeddedWal,
    /// End of the data region (see `compute_data_end` / `load_tail_snapshot`).
    pub(crate) data_end: u64,
    /// Generation taken from the last valid commit footer; 0 for new files.
    pub(crate) generation: u64,
    /// Lock configuration for this handle.
    pub(crate) lock_settings: LockSettings,
    /// Whether lexical indexing is enabled for this handle.
    pub(crate) lex_enabled: bool,
    /// Loaded lexical index, if any.
    pub(crate) lex_index: Option<LexIndex>,
    #[cfg(feature = "lex")]
    #[allow(dead_code)]
    pub(crate) lex_storage: Arc<RwLock<EmbeddedLexStorage>>,
    /// Whether vector indexing is enabled for this handle.
    pub(crate) vec_enabled: bool,
    /// Compression mode recorded for vector data.
    pub(crate) vec_compression: VectorCompression,
    /// Loaded vector index, if any.
    pub(crate) vec_index: Option<VecIndex>,
    /// Whether a CLIP index is enabled for this handle.
    pub(crate) clip_enabled: bool,
    /// Loaded CLIP index, if any.
    pub(crate) clip_index: Option<crate::clip::ClipIndex>,
    /// Set when in-memory state (e.g. the memory binding) has changed.
    pub(crate) dirty: bool,
    #[cfg(feature = "lex")]
    pub(crate) tantivy: Option<TantivyEngine>,
    #[cfg(feature = "lex")]
    pub(crate) tantivy_dirty: bool,
    /// Lazily loaded temporal track (see `ensure_temporal_track_loaded`).
    #[cfg(feature = "temporal_track")]
    pub(crate) temporal_track: Option<TemporalTrack>,
    /// Manifest WAL; `None` for read-only snapshot handles.
    #[cfg(feature = "parallel_segments")]
    pub(crate) manifest_wal: Option<ManifestWal>,
    /// Memories track loaded from the TOC (empty when the TOC has none).
    pub(crate) memories_track: MemoriesTrack,
    /// Logic mesh loaded from the TOC (empty when the TOC has none).
    pub(crate) logic_mesh: LogicMesh,
}
84
85#[derive(Debug, Clone, Copy)]
87pub struct OpenReadOptions {
88 pub allow_repair: bool,
89}
90
91#[derive(Debug, Clone)]
92pub struct LockSettings {
93 pub timeout_ms: u64,
94 pub heartbeat_ms: u64,
95 pub stale_grace_ms: u64,
96 pub force_stale: bool,
97 pub command: Option<String>,
98}
99
100impl Default for LockSettings {
101 fn default() -> Self {
102 Self {
103 timeout_ms: DEFAULT_LOCK_TIMEOUT_MS,
104 heartbeat_ms: DEFAULT_HEARTBEAT_MS,
105 stale_grace_ms: DEFAULT_STALE_GRACE_MS,
106 force_stale: false,
107 command: None,
108 }
109 }
110}
111
112impl Default for OpenReadOptions {
113 fn default() -> Self {
114 Self {
115 allow_repair: false,
116 }
117 }
118}
119
120impl Memvid {
/// Creates a new memory file at `path`, truncating any existing file there.
///
/// Fails with `AuxiliaryFileDetected` if sidecar files exist next to `path`
/// (see `ensure_single_file`). The new file is sized so it ends right after
/// the embedded WAL region. It gets a fresh header and an empty TOC, plus empty
/// lex/vec index manifests when those features are enabled. The TOC/footer is
/// then written, the header's TOC checksum updated, and the file synced.
///
/// # Errors
/// Propagates I/O, locking, WAL, tantivy-initialisation and encoding errors
/// from the steps above.
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
    let path_ref = path.as_ref();
    ensure_single_file(path_ref)?;

    // Create or truncate the file. This handle is dropped immediately; the file
    // is reopened through `FileLock::open_and_lock` on the next line.
    OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path_ref)?;
    let (mut file, lock) = FileLock::open_and_lock(path_ref)?;

    // The footer initially sits immediately after the embedded WAL region.
    let header = Header {
        magic: MAGIC,
        version: SPEC_VERSION,
        footer_offset: WAL_OFFSET + WAL_SIZE_MEDIUM,
        wal_offset: WAL_OFFSET,
        wal_size: WAL_SIZE_MEDIUM,
        wal_checkpoint_pos: 0,
        wal_sequence: 0,
        toc_checksum: [0u8; 32],
    };

    // NOTE(review): `toc` is only mutated when the `lex` feature is enabled, so
    // builds without it will warn about an unneeded `mut`.
    let mut toc = empty_toc();
    #[cfg(feature = "lex")]
    {
        toc.segment_catalog.lex_enabled = true;
    }
    file.set_len(header.footer_offset)?;
    HeaderCodec::write(&mut file, &header)?;

    let wal = EmbeddedWal::open(&file, &header)?;
    // Nothing has been appended yet, so the data region ends at the footer.
    let data_end = header.footer_offset;
    #[cfg(feature = "lex")]
    let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::new()));
    #[cfg(feature = "parallel_segments")]
    let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
    #[cfg(feature = "parallel_segments")]
    let manifest_wal_entries = manifest_wal.replay()?;

    let mut memvid = Self {
        file,
        path: path_ref.to_path_buf(),
        lock,
        read_only: false,
        header,
        toc,
        wal,
        data_end,
        generation: 0,
        lock_settings: LockSettings::default(),
        lex_enabled: cfg!(feature = "lex"),
        lex_index: None,
        #[cfg(feature = "lex")]
        lex_storage,
        vec_enabled: cfg!(feature = "vec"),
        vec_compression: VectorCompression::None,
        vec_index: None,
        clip_enabled: cfg!(feature = "clip"),
        clip_index: None,
        dirty: false,
        #[cfg(feature = "lex")]
        tantivy: None,
        #[cfg(feature = "lex")]
        tantivy_dirty: false,
        #[cfg(feature = "temporal_track")]
        temporal_track: None,
        #[cfg(feature = "parallel_segments")]
        manifest_wal: Some(manifest_wal),
        memories_track: MemoriesTrack::new(),
        logic_mesh: LogicMesh::new(),
    };

    #[cfg(feature = "lex")]
    memvid.init_tantivy()?;

    // Segments recorded in a pre-existing manifest WAL are merged into the catalog.
    #[cfg(feature = "parallel_segments")]
    memvid.load_manifest_segments(manifest_wal_entries);

    memvid.bootstrap_segment_catalog();

    // Empty index manifests point at the (still empty) end of the data region.
    let empty_offset = memvid.data_end;
    // NOTE(review): these 32 bytes are SHA-256 of the empty input, whereas the rest
    // of this file checksums with blake3 (whose empty-input digest differs).
    // Confirm this is the value the lex/vec loaders expect for an empty region.
    let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
        \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";

    #[cfg(feature = "lex")]
    if memvid.lex_enabled && memvid.toc.indexes.lex.is_none() {
        memvid.toc.indexes.lex = Some(crate::types::LexIndexManifest {
            doc_count: 0,
            generation: 0,
            bytes_offset: empty_offset,
            bytes_length: 0,
            checksum: empty_checksum,
        });
    }

    #[cfg(feature = "vec")]
    if memvid.vec_enabled && memvid.toc.indexes.vec.is_none() {
        memvid.toc.indexes.vec = Some(crate::types::VecIndexManifest {
            vector_count: 0,
            dimension: 0,
            bytes_offset: empty_offset,
            bytes_length: 0,
            checksum: empty_checksum,
            compression_mode: memvid.vec_compression.clone(),
        });
    }

    // Write the TOC/footer first, then record its checksum in the header.
    memvid.rewrite_toc_footer()?;
    memvid.header.toc_checksum = memvid.toc.toc_checksum;
    crate::persist_header(&mut memvid.file, &memvid.header)?;
    memvid.file.sync_all()?;
    Ok(memvid)
}
239
240 pub fn lock_settings(&self) -> &LockSettings {
241 &self.lock_settings
242 }
243
244 pub fn lock_settings_mut(&mut self) -> &mut LockSettings {
245 &mut self.lock_settings
246 }
247
248 pub fn set_vector_compression(&mut self, compression: VectorCompression) {
251 self.vec_compression = compression;
252 }
253
254 pub fn vector_compression(&self) -> &VectorCompression {
256 &self.vec_compression
257 }
258
/// Finishes opening a file whose handle and lock have already been acquired.
///
/// Steps, in order:
/// 1. Read the header and the TOC it points at. If the TOC fails with
///    `Decode`/`InvalidToc`, recover it with `recover_toc`. When the recovered
///    offset or checksum differs from the header, persist the corrected header.
/// 2. Check segment bounds (logged only) and frame payload layout (fatal).
/// 3. Build the handle, compute `data_end`, and load the lex/vec/clip indexes
///    the TOC advertises.
/// 4. Replay the WAL, merge manifest-WAL segments (`parallel_segments`),
///    bootstrap the segment catalog, and load the auxiliary tracks.
/// 5. If the TOC checksum failed in step 1, re-verify it. If it now verifies
///    and differs from the header, persist and sync the header.
///
/// `read_only` mirrors whether `lock` is held in `LockMode::Shared`.
fn open_locked(mut file: File, lock: FileLock, path_ref: &Path) -> Result<Self> {
    let mut header = HeaderCodec::read(&mut file)?;
    let toc = match read_toc(&mut file, &header) {
        Ok(toc) => toc,
        // Only decode/structure failures trigger recovery; other errors (e.g. I/O)
        // are returned as-is.
        Err(err @ MemvidError::Decode(_)) | Err(err @ MemvidError::InvalidToc { .. }) => {
            tracing::info!("toc decode failed ({}); attempting recovery", err);
            let (toc, recovered_offset) = recover_toc(&mut file, Some(header.footer_offset))?;
            if recovered_offset != header.footer_offset
                || header.toc_checksum != toc.toc_checksum
            {
                header.footer_offset = recovered_offset;
                header.toc_checksum = toc.toc_checksum;
                crate::persist_header(&mut file, &header)?;
            }
            toc
        }
        Err(err) => return Err(err),
    };
    // Deferred: a checksum failure is re-checked at the end, after WAL recovery.
    let checksum_result = toc.verify_checksum();

    // NOTE(review): a metadata failure is treated as a zero-length file, which makes
    // `ensure_non_overlapping_frames` reject any active frame with a misleading
    // "exceeds file length" error; consider propagating the error instead.
    let file_len = file.metadata().map(|m| m.len()).unwrap_or(0);
    // Segment bound problems are logged but do not prevent opening.
    if let Err(e) = validate_segment_integrity(&toc, &header, file_len) {
        tracing::warn!("Segment integrity validation failed: {}", e);
    }
    ensure_non_overlapping_frames(&toc, file_len)?;

    let wal = EmbeddedWal::open(&file, &header)?;
    #[cfg(feature = "lex")]
    let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
        toc.indexes.lex.as_ref(),
        &toc.indexes.lex_segments,
    )));
    #[cfg(feature = "parallel_segments")]
    let manifest_wal = ManifestWal::open(manifest_wal_path(path_ref))?;
    #[cfg(feature = "parallel_segments")]
    let manifest_wal_entries = manifest_wal.replay()?;

    let generation = detect_generation(&file)?.unwrap_or(0);
    let read_only = lock.mode() == LockMode::Shared;

    // Index flags start disabled and are switched on below from the TOC contents.
    let mut memvid = Self {
        file,
        path: path_ref.to_path_buf(),
        lock,
        read_only,
        header,
        toc,
        wal,
        data_end: 0,
        generation,
        lock_settings: LockSettings::default(),
        lex_enabled: false,
        lex_index: None,
        #[cfg(feature = "lex")]
        lex_storage,
        vec_enabled: false,
        vec_compression: VectorCompression::None,
        vec_index: None,
        clip_enabled: false,
        clip_index: None,
        dirty: false,
        #[cfg(feature = "lex")]
        tantivy: None,
        #[cfg(feature = "lex")]
        tantivy_dirty: false,
        #[cfg(feature = "temporal_track")]
        temporal_track: None,
        #[cfg(feature = "parallel_segments")]
        manifest_wal: Some(manifest_wal),
        memories_track: MemoriesTrack::new(),
        logic_mesh: LogicMesh::new(),
    };
    memvid.data_end = compute_data_end(&memvid.toc, &memvid.header);
    memvid.lex_enabled = has_lex_index(&memvid.toc);
    if memvid.lex_enabled {
        memvid.load_lex_index_from_manifest()?;
    }
    #[cfg(feature = "lex")]
    {
        memvid.init_tantivy()?;
    }
    memvid.vec_enabled =
        memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
    if memvid.vec_enabled {
        memvid.load_vec_index_from_manifest()?;
    }
    memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
    if memvid.clip_enabled {
        memvid.load_clip_index_from_manifest()?;
    }
    memvid.recover_wal()?;
    #[cfg(feature = "parallel_segments")]
    memvid.load_manifest_segments(manifest_wal_entries);
    memvid.bootstrap_segment_catalog();
    #[cfg(feature = "temporal_track")]
    memvid.ensure_temporal_track_loaded()?;
    memvid.load_memories_track()?;
    memvid.load_logic_mesh()?;
    // If the checksum failed on load, the TOC must verify now (presumably after
    // WAL recovery rewrote it — TODO confirm). If it does, persist the new checksum.
    if checksum_result.is_err() {
        memvid.toc.verify_checksum()?;
        if memvid.toc.toc_checksum != memvid.header.toc_checksum {
            memvid.header.toc_checksum = memvid.toc.toc_checksum;
            crate::persist_header(&mut memvid.file, &memvid.header)?;
            memvid.file.sync_all()?;
        }
    }
    Ok(memvid)
}
371
372 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
374 let path_ref = path.as_ref();
375 ensure_single_file(path_ref)?;
376
377 let (file, lock) = FileLock::open_and_lock(path_ref)?;
378 Self::open_locked(file, lock, path_ref)
379 }
380
381 pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
382 Self::open_read_only_with_options(path, OpenReadOptions::default())
383 }
384
385 pub fn open_read_only_with_options<P: AsRef<Path>>(
386 path: P,
387 options: OpenReadOptions,
388 ) -> Result<Self> {
389 let path_ref = path.as_ref();
390 ensure_single_file(path_ref)?;
391
392 if options.allow_repair {
393 return Self::open(path_ref);
394 }
395
396 Self::open_read_only_snapshot(path_ref)
397 }
398
399 fn open_read_only_snapshot(path_ref: &Path) -> Result<Self> {
400 let mut file = OpenOptions::new().read(true).write(true).open(path_ref)?;
401 let TailSnapshot {
402 toc,
403 footer_offset,
404 data_end,
405 generation,
406 } = load_tail_snapshot(&file)?;
407
408 let mut header = HeaderCodec::read(&mut file)?;
409 header.footer_offset = footer_offset;
410 header.toc_checksum = toc.toc_checksum;
411
412 let lock = FileLock::acquire_with_mode(&file, LockMode::Shared)?;
413 let wal = EmbeddedWal::open_read_only(&file, &header)?;
414
415 #[cfg(feature = "lex")]
416 let lex_storage = Arc::new(RwLock::new(EmbeddedLexStorage::from_manifest(
417 toc.indexes.lex.as_ref(),
418 &toc.indexes.lex_segments,
419 )));
420
421 let mut memvid = Self {
422 file,
423 path: path_ref.to_path_buf(),
424 lock,
425 read_only: true,
426 header,
427 toc,
428 wal,
429 data_end,
430 generation,
431 lock_settings: LockSettings::default(),
432 lex_enabled: false,
433 lex_index: None,
434 #[cfg(feature = "lex")]
435 lex_storage,
436 vec_enabled: false,
437 vec_compression: VectorCompression::None,
438 vec_index: None,
439 clip_enabled: false,
440 clip_index: None,
441 dirty: false,
442 #[cfg(feature = "lex")]
443 tantivy: None,
444 #[cfg(feature = "lex")]
445 tantivy_dirty: false,
446 #[cfg(feature = "temporal_track")]
447 temporal_track: None,
448 #[cfg(feature = "parallel_segments")]
449 manifest_wal: None,
450 memories_track: MemoriesTrack::new(),
451 logic_mesh: LogicMesh::new(),
452 };
453
454 memvid.lex_enabled = has_lex_index(&memvid.toc);
456 if memvid.lex_enabled {
457 memvid.load_lex_index_from_manifest()?;
458 }
459 #[cfg(feature = "lex")]
460 memvid.init_tantivy()?;
461
462 memvid.vec_enabled =
463 memvid.toc.indexes.vec.is_some() || !memvid.toc.segment_catalog.vec_segments.is_empty();
464 if memvid.vec_enabled {
465 memvid.load_vec_index_from_manifest()?;
466 }
467 memvid.clip_enabled = memvid.toc.indexes.clip.is_some();
468 if memvid.clip_enabled {
469 memvid.load_clip_index_from_manifest()?;
470 }
471 memvid.load_logic_mesh()?;
473
474 memvid.bootstrap_segment_catalog();
475 #[cfg(feature = "temporal_track")]
476 memvid.ensure_temporal_track_loaded()?;
477
478 Ok(memvid)
479 }
480
481 pub(crate) fn try_open<P: AsRef<Path>>(path: P) -> Result<Self> {
482 let path_ref = path.as_ref();
483 ensure_single_file(path_ref)?;
484
485 let file = OpenOptions::new().read(true).write(true).open(path_ref)?;
486 let lock = match FileLock::try_acquire(&file, path_ref)? {
487 Some(lock) => lock,
488 None => {
489 return Err(MemvidError::Lock(
490 "exclusive access unavailable for doctor".to_string(),
491 ));
492 }
493 };
494 Self::open_locked(file, lock, path_ref)
495 }
496
497 fn bootstrap_segment_catalog(&mut self) {
498 let catalog = &mut self.toc.segment_catalog;
499 if catalog.version == 0 {
500 catalog.version = 1;
501 }
502 if catalog.next_segment_id == 0 {
503 let mut max_id = 0u64;
504 for descriptor in &catalog.lex_segments {
505 max_id = max_id.max(descriptor.common.segment_id);
506 }
507 for descriptor in &catalog.vec_segments {
508 max_id = max_id.max(descriptor.common.segment_id);
509 }
510 for descriptor in &catalog.time_segments {
511 max_id = max_id.max(descriptor.common.segment_id);
512 }
513 #[cfg(feature = "temporal_track")]
514 for descriptor in &catalog.temporal_segments {
515 max_id = max_id.max(descriptor.common.segment_id);
516 }
517 #[cfg(feature = "parallel_segments")]
518 for descriptor in &catalog.index_segments {
519 max_id = max_id.max(descriptor.common.segment_id);
520 }
521 if max_id > 0 {
522 catalog.next_segment_id = max_id.saturating_add(1);
523 }
524 }
525 }
526
/// Appends manifest-WAL segment references to the catalog.
///
/// An entry is skipped if its `segment_id` is already present, including ids
/// added earlier in the same call.
#[cfg(feature = "parallel_segments")]
fn load_manifest_segments(&mut self, entries: Vec<IndexSegmentRef>) {
    let known = &mut self.toc.segment_catalog.index_segments;
    for entry in entries {
        let already_present = known
            .iter()
            .any(|existing| existing.common.segment_id == entry.common.segment_id);
        if !already_present {
            known.push(entry);
        }
    }
}
544
545 fn load_memories_track(&mut self) -> Result<()> {
547 let manifest = match &self.toc.memories_track {
548 Some(m) => m,
549 None => return Ok(()),
550 };
551
552 let mut buf = vec![0u8; manifest.bytes_length as usize];
554 self.file
555 .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
556 self.file.read_exact(&mut buf)?;
557
558 let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
560 if actual_checksum != manifest.checksum {
561 return Err(MemvidError::InvalidToc {
562 reason: "memories track checksum mismatch".into(),
563 });
564 }
565
566 self.memories_track = MemoriesTrack::deserialize(&buf)?;
568
569 Ok(())
570 }
571
572 fn load_logic_mesh(&mut self) -> Result<()> {
574 let manifest = match &self.toc.logic_mesh {
575 Some(m) => m,
576 None => return Ok(()),
577 };
578
579 let mut buf = vec![0u8; manifest.bytes_length as usize];
581 self.file
582 .seek(std::io::SeekFrom::Start(manifest.bytes_offset))?;
583 self.file.read_exact(&mut buf)?;
584
585 let actual_checksum: [u8; 32] = blake3::hash(&buf).into();
587 if actual_checksum != manifest.checksum {
588 return Err(MemvidError::InvalidToc {
589 reason: "logic mesh checksum mismatch".into(),
590 });
591 }
592
593 self.logic_mesh = LogicMesh::deserialize(&buf)?;
595
596 Ok(())
597 }
598
/// Loads the temporal track from the TOC manifest on first use.
///
/// This is best-effort and returns `Ok(())` without loading when:
/// - the track is already cached,
/// - there is no manifest or it is empty,
/// - the recorded range does not fit in the file,
/// - decoding reports `InvalidTemporalTrack`.
///
/// Other read errors are propagated.
#[cfg(feature = "temporal_track")]
pub(crate) fn ensure_temporal_track_loaded(&mut self) -> Result<()> {
    if self.temporal_track.is_some() {
        return Ok(());
    }
    let Some(manifest) = self.toc.temporal_track.clone() else {
        return Ok(());
    };
    if manifest.bytes_length == 0 {
        return Ok(());
    }

    let file_len = self.file.metadata()?.len();
    let within_file = matches!(
        manifest.bytes_offset.checked_add(manifest.bytes_length),
        Some(end) if end <= file_len
    );
    if !within_file {
        return Ok(());
    }

    match temporal_track_read(&mut self.file, manifest.bytes_offset, manifest.bytes_length) {
        Ok(track) => {
            self.temporal_track = Some(track);
            Ok(())
        }
        Err(MemvidError::InvalidTemporalTrack { .. }) => Ok(()),
        Err(err) => Err(err),
    }
}
627
/// Returns the cached temporal track, loading it first if needed.
#[cfg(feature = "temporal_track")]
pub(crate) fn temporal_track_ref(&mut self) -> Result<Option<&TemporalTrack>> {
    self.ensure_temporal_track_loaded()?;
    let cached = self.temporal_track.as_ref();
    Ok(cached)
}
633
/// Returns the anchor timestamp recorded for `frame_id`, if any.
///
/// Yields `None` when there is no temporal track, when the track does not
/// advertise anchors, or when the frame has no anchor.
#[cfg(feature = "temporal_track")]
pub(crate) fn temporal_anchor_timestamp(&mut self, frame_id: FrameId) -> Result<Option<i64>> {
    self.ensure_temporal_track_loaded()?;
    let timestamp = self
        .temporal_track
        .as_ref()
        .filter(|track| track.capabilities().has_anchors)
        .and_then(|track| track.anchor_for_frame(frame_id))
        .map(|anchor| anchor.anchor_ts);
    Ok(timestamp)
}
647
/// Drops the cached temporal track so the next access reloads it.
#[cfg(feature = "temporal_track")]
pub(crate) fn clear_temporal_track_cache(&mut self) {
    drop(self.temporal_track.take());
}
652
/// Returns the frame's temporal anchor timestamp, or `fallback` when it has none.
#[cfg(feature = "temporal_track")]
pub(crate) fn effective_temporal_timestamp(
    &mut self,
    frame_id: FrameId,
    fallback: i64,
) -> Result<i64> {
    let anchored = self.temporal_anchor_timestamp(frame_id)?;
    Ok(anchored.unwrap_or(fallback))
}
663
664 #[cfg(not(feature = "temporal_track"))]
665 pub(crate) fn effective_temporal_timestamp(
666 &mut self,
667 _frame_id: crate::types::FrameId,
668 fallback: i64,
669 ) -> Result<i64> {
670 Ok(fallback)
671 }
672
673 pub fn get_memory_binding(&self) -> Option<&crate::types::MemoryBinding> {
678 self.toc.memory_binding.as_ref()
679 }
680
681 pub fn bind_memory(
690 &mut self,
691 binding: crate::types::MemoryBinding,
692 ticket: crate::types::Ticket,
693 ) -> Result<()> {
694 if let Some(existing) = self.get_memory_binding() {
696 if existing.memory_id != binding.memory_id {
697 return Err(MemvidError::MemoryAlreadyBound {
698 existing_memory_id: existing.memory_id,
699 existing_memory_name: existing.memory_name.clone(),
700 bound_at: existing.bound_at.to_rfc3339(),
701 });
702 }
703 }
704
705 self.apply_ticket(ticket)?;
707
708 self.toc.memory_binding = Some(binding);
710 self.dirty = true;
711
712 Ok(())
713 }
714
715 pub fn unbind_memory(&mut self) -> Result<()> {
719 self.toc.memory_binding = None;
720 self.toc.ticket_ref = crate::types::TicketRef {
722 issuer: "free-tier".into(),
723 seq_no: 1,
724 expires_in_secs: 0,
725 capacity_bytes: crate::types::Tier::Free.capacity_bytes(),
726 };
727 self.dirty = true;
728 Ok(())
729 }
730}
731
732pub(crate) fn read_toc(file: &mut File, header: &Header) -> Result<Toc> {
733 use crate::footer::{CommitFooter, FOOTER_SIZE};
734
735 let len = file.metadata()?.len();
736 if len < header.footer_offset {
737 return Err(MemvidError::InvalidToc {
738 reason: "footer offset beyond file length".into(),
739 });
740 }
741
742 file.seek(SeekFrom::Start(header.footer_offset))?;
744 let total_size = (len - header.footer_offset) as usize;
745
746 if total_size < FOOTER_SIZE {
747 return Err(MemvidError::InvalidToc {
748 reason: "region too small to contain footer".into(),
749 });
750 }
751
752 let mut buf = Vec::with_capacity(total_size);
753 file.read_to_end(&mut buf)?;
754
755 let footer_start = buf.len() - FOOTER_SIZE;
757 let footer_bytes = &buf[footer_start..];
758 let footer = CommitFooter::decode(footer_bytes).ok_or(MemvidError::InvalidToc {
759 reason: "failed to decode commit footer".into(),
760 })?;
761
762 let toc_bytes = &buf[..footer_start];
764 if toc_bytes.len() != footer.toc_len as usize {
765 return Err(MemvidError::InvalidToc {
766 reason: "toc length mismatch".into(),
767 });
768 }
769
770 verify_toc_prefix(toc_bytes)?;
771 let toc = Toc::decode(toc_bytes)?;
772 Ok(toc)
773}
774
775fn verify_toc_prefix(bytes: &[u8]) -> Result<()> {
776 const MAX_SEGMENTS: u64 = 1_000_000;
777 const MAX_FRAMES: u64 = 1_000_000;
778 const MIN_SEGMENT_META_BYTES: u64 = 32;
779 const MIN_FRAME_BYTES: u64 = 64;
780 let read_u64 = |range: std::ops::Range<usize>, context: &str| -> Result<u64> {
783 let slice = bytes.get(range).ok_or_else(|| MemvidError::InvalidToc {
784 reason: context.to_string().into(),
785 })?;
786 let array: [u8; 8] = slice.try_into().map_err(|_| MemvidError::InvalidToc {
787 reason: context.to_string().into(),
788 })?;
789 Ok(u64::from_le_bytes(array))
790 };
791
792 if bytes.len() < 24 {
793 return Err(MemvidError::InvalidToc {
794 reason: "toc trailer too small".into(),
795 });
796 }
797 let toc_version = read_u64(0..8, "toc version missing or truncated")?;
798 if toc_version > 32 {
799 return Err(MemvidError::InvalidToc {
800 reason: "toc version unreasonable".into(),
801 });
802 }
803 let segments_len = read_u64(8..16, "segment count missing or truncated")?;
804 if segments_len > MAX_SEGMENTS {
805 return Err(MemvidError::InvalidToc {
806 reason: "segment count unreasonable".into(),
807 });
808 }
809 let frames_len = read_u64(16..24, "frame count missing or truncated")?;
810 if frames_len > MAX_FRAMES {
811 return Err(MemvidError::InvalidToc {
812 reason: "frame count unreasonable".into(),
813 });
814 }
815 let required = segments_len
816 .saturating_mul(MIN_SEGMENT_META_BYTES)
817 .saturating_add(frames_len.saturating_mul(MIN_FRAME_BYTES));
818 if required > bytes.len() as u64 {
819 return Err(MemvidError::InvalidToc {
820 reason: "toc payload inconsistent with counts".into(),
821 });
822 }
823 Ok(())
824}
825
826fn ensure_non_overlapping_frames(toc: &Toc, file_len: u64) -> Result<()> {
828 let mut previous_end = 0u64;
829 for frame in toc
830 .frames
831 .iter()
832 .filter(|f| f.status == FrameStatus::Active)
833 {
834 let end = frame
835 .payload_offset
836 .checked_add(frame.payload_length)
837 .ok_or_else(|| MemvidError::InvalidToc {
838 reason: "frame payload offsets overflow".into(),
839 })?;
840 if end > file_len {
841 return Err(MemvidError::InvalidToc {
842 reason: "frame payload exceeds file length".into(),
843 });
844 }
845 if frame.payload_offset < previous_end {
846 return Err(MemvidError::InvalidToc {
847 reason: "frame payloads overlap or are out of order".into(),
848 });
849 }
850 previous_end = end;
851 }
852 Ok(())
853}
854
855pub(crate) fn recover_toc(file: &mut File, hint: Option<u64>) -> Result<(Toc, u64)> {
856 use crate::footer::find_last_valid_footer;
857
858 let len = file.metadata()?.len();
859 file.seek(SeekFrom::Start(0))?;
860 let mut data = Vec::new();
861 file.read_to_end(&mut data)?;
862 tracing::debug!(file_len = len, "attempting full scan toc recovery");
863
864 if let Some(footer_slice) = find_last_valid_footer(&data) {
866 tracing::debug!(
867 footer_offset = footer_slice.footer_offset,
868 toc_offset = footer_slice.toc_offset,
869 toc_len = footer_slice.toc_bytes.len(),
870 "found valid footer during recovery"
871 );
872 match Toc::decode(footer_slice.toc_bytes) {
874 Ok(toc) => {
875 return Ok((toc, footer_slice.toc_offset as u64));
876 }
877 Err(err) => {
878 tracing::warn!(
879 error = %err,
880 "footer-validated TOC failed to decode, falling back to scan"
881 );
882 }
883 }
884 }
885
886 let mut ranges = Vec::new();
888 if let Some(hint_offset) = hint {
889 let hint_idx = hint_offset.min(len) as usize;
890 ranges.push((hint_idx, data.len()));
891 if hint_idx > 0 {
892 ranges.push((0, hint_idx));
893 }
894 } else {
895 ranges.push((0, data.len()));
896 }
897
898 for (start, end) in ranges {
899 if let Some(found) = scan_range_for_toc(&data, start, end) {
900 return Ok(found);
901 }
902 }
903
904 Err(MemvidError::InvalidToc {
905 reason: "unable to recover table of contents from file trailer".into(),
906 })
907}
908
/// Brute-force search for a decodable TOC starting at some offset in
/// `[start, end)` of `data`, trying offsets from `end - 1` down to `start`.
///
/// For each candidate offset, the suffix `data[offset..]` is tried two ways:
/// 1. If it contains a valid commit footer, the TOC bytes that footer frames
///    are prefix-checked and decoded.
/// 2. Otherwise the whole suffix is treated as a TOC whose final 32 bytes are a
///    blake3 checksum over (body ++ 32 zero bytes).
///
/// Decoding runs under `catch_unwind`, so a panicking decoder on garbage input
/// is treated as "not a TOC". Returns the TOC and the absolute offset of its
/// first byte, or `None`.
///
/// Suffixes longer than 64 MiB are skipped, so only offsets within the last
/// 64 MiB of `data` can ever match.
///
/// NOTE(review): `find_last_valid_footer` is called on every candidate suffix,
/// so this may be quadratic in the scanned size; the local `use` duplicates the
/// module-level import.
fn scan_range_for_toc(data: &[u8], start: usize, end: usize) -> Option<(Toc, u64)> {
    use crate::footer::find_last_valid_footer;

    if start >= end || end > data.len() {
        return None;
    }
    const MAX_TOC_BYTES: usize = 64 * 1024 * 1024;
    const ZERO_CHECKSUM: [u8; 32] = [0u8; 32];
    for offset in (start..end).rev() {
        let slice = &data[offset..];
        if slice.len() < 16 {
            continue;
        }
        if slice.len() > MAX_TOC_BYTES {
            continue;
        }

        // Strategy 1: a commit footer inside the suffix frames the TOC bytes.
        if let Some(footer_slice) = find_last_valid_footer(slice) {
            if verify_toc_prefix(footer_slice.toc_bytes).is_ok() {
                let attempt = panic::catch_unwind(|| Toc::decode(footer_slice.toc_bytes));
                if let Ok(Ok(toc)) = attempt {
                    // `toc_offset` is relative to the suffix; make it absolute.
                    let recovered_offset = (offset + footer_slice.toc_offset) as u64;
                    tracing::debug!(
                        recovered_offset,
                        recovered_frames = toc.frames.len(),
                        "recovered toc via scan with footer"
                    );
                    return Some((toc, recovered_offset));
                }
            }
            // A footer was found but its TOC is unusable: skip the checksum fallback.
            continue;
        }

        // Strategy 2: the suffix itself is a TOC ending in its own blake3 checksum,
        // computed with the checksum field zeroed.
        if slice.len() < ZERO_CHECKSUM.len() {
            continue;
        }
        let (body, stored_checksum) = slice.split_at(slice.len() - ZERO_CHECKSUM.len());
        let mut hasher = Hasher::new();
        hasher.update(body);
        hasher.update(&ZERO_CHECKSUM);
        if hasher.finalize().as_bytes() != stored_checksum {
            continue;
        }
        if verify_toc_prefix(slice).is_err() {
            continue;
        }
        let attempt = panic::catch_unwind(|| Toc::decode(slice));
        if let Ok(Ok(toc)) = attempt {
            let recovered_offset = offset as u64;
            tracing::debug!(
                recovered_offset,
                recovered_frames = toc.frames.len(),
                "recovered toc via scan"
            );
            return Some((toc, recovered_offset));
        }
    }
    None
}
972
973pub(crate) fn prepare_toc_bytes(toc: &mut Toc) -> Result<Vec<u8>> {
974 toc.toc_checksum = [0u8; 32];
975 let bytes = toc.encode()?;
976 let checksum = Toc::calculate_checksum(&bytes);
977 toc.toc_checksum = checksum;
978 toc.encode()
979}
980
981pub(crate) fn empty_toc() -> Toc {
982 Toc {
983 toc_version: 0,
984 segments: Vec::new(),
985 frames: Vec::new(),
986 indexes: IndexManifests::default(),
987 time_index: None,
988 temporal_track: None,
989 memories_track: None,
990 logic_mesh: None,
991 segment_catalog: SegmentCatalog::default(),
992 ticket_ref: TicketRef {
993 issuer: "free-tier".into(),
994 seq_no: 1,
995 expires_in_secs: 0,
996 capacity_bytes: Tier::Free.capacity_bytes(),
997 },
998 memory_binding: None,
999 merkle_root: [0u8; 32],
1000 toc_checksum: [0u8; 32],
1001 }
1002}
1003
1004pub(crate) fn compute_data_end(toc: &Toc, header: &Header) -> u64 {
1005 let wal_region_end = header.wal_offset + header.wal_size;
1007
1008 let active_frames_with_payload: Vec<_> = toc
1009 .frames
1010 .iter()
1011 .filter(|f| f.status == FrameStatus::Active && f.payload_length > 0)
1012 .collect();
1013
1014 tracing::info!(
1015 "compute_data_end: found {} active frames with payloads out of {} total frames",
1016 active_frames_with_payload.len(),
1017 toc.frames.len()
1018 );
1019
1020 for (idx, frame) in active_frames_with_payload.iter().enumerate().take(5) {
1021 tracing::info!(
1022 " frame[{}]: id={} offset={} length={} end={}",
1023 idx,
1024 frame.id,
1025 frame.payload_offset,
1026 frame.payload_length,
1027 frame.payload_offset + frame.payload_length
1028 );
1029 }
1030
1031 let payload_end = active_frames_with_payload
1032 .iter()
1033 .filter_map(|f| f.payload_offset.checked_add(f.payload_length))
1034 .max()
1035 .unwrap_or(wal_region_end);
1036
1037 let catalog = &toc.segment_catalog;
1039
1040 let lex_segment_end = catalog
1041 .lex_segments
1042 .iter()
1043 .filter_map(|s| s.common.bytes_offset.checked_add(s.common.bytes_length))
1044 .max()
1045 .unwrap_or(0);
1046
1047 let vec_segment_end = catalog
1048 .vec_segments
1049 .iter()
1050 .filter_map(|s| s.common.bytes_offset.checked_add(s.common.bytes_length))
1051 .max()
1052 .unwrap_or(0);
1053
1054 let time_segment_end = catalog
1055 .time_segments
1056 .iter()
1057 .filter_map(|s| s.common.bytes_offset.checked_add(s.common.bytes_length))
1058 .max()
1059 .unwrap_or(0);
1060
1061 let segment_end = lex_segment_end.max(vec_segment_end).max(time_segment_end);
1062
1063 let data_boundary = payload_end.max(segment_end).max(wal_region_end);
1064
1065 tracing::info!(
1066 "compute_data_end: wal_region_end={} payload_end={} segment_end={} footer_offset={} returning {}",
1067 wal_region_end,
1068 payload_end,
1069 segment_end,
1070 header.footer_offset,
1071 data_boundary
1072 );
1073
1074 data_boundary
1075}
1076
/// Result of `load_tail_snapshot`: the TOC of the last valid commit plus the
/// positions and generation taken from its footer.
struct TailSnapshot {
    /// Decoded, checksum-verified TOC.
    toc: Toc,
    /// Absolute file offset of the commit footer (see `load_tail_snapshot`).
    footer_offset: u64,
    /// Set to the same value as `footer_offset` by `load_tail_snapshot`.
    data_end: u64,
    /// Generation recorded in the commit footer.
    generation: u64,
}
1083
1084fn locate_footer_window(mmap: &[u8]) -> Option<(FooterSlice<'_>, usize)> {
1085 const MAX_SEARCH_SIZE: usize = 16 * 1024 * 1024;
1086 if mmap.is_empty() {
1087 return None;
1088 }
1089 let mut window = MAX_SEARCH_SIZE.min(mmap.len());
1090 loop {
1091 let start = mmap.len() - window;
1092 if let Some(slice) = find_last_valid_footer(&mmap[start..]) {
1093 return Some((slice, start));
1094 }
1095 if window == mmap.len() {
1096 break;
1097 }
1098 window = (window * 2).min(mmap.len());
1099 }
1100 None
1101}
1102
/// Finds the last valid commit footer near the end of `file` and decodes and
/// checksum-verifies the TOC it frames.
///
/// `footer_offset` and `data_end` are both set to the footer's absolute
/// position: `slice.footer_offset` plus the window start from
/// `locate_footer_window`. `generation` is copied from the footer.
///
/// # Errors
/// `InvalidToc` when no valid footer is found, plus TOC decode/checksum errors.
///
/// NOTE(review): `read_toc` and `recover_toc` treat `header.footer_offset` as the
/// start of the TOC bytes (`toc_offset`), whereas this uses the footer's own
/// position. Confirm which `FooterSlice` field the read-only header should carry.
fn load_tail_snapshot(file: &File) -> Result<TailSnapshot> {
    // SAFETY: NOTE(review) — `Mmap::map` is unsafe because mapped memory is UB to
    // read if the file is modified or truncated concurrently. The mapping only
    // lives for this call, but `open_read_only_snapshot` calls this before taking
    // its shared lock; confirm no writer can run concurrently.
    let mmap = unsafe { Mmap::map(file)? };

    let (slice, offset_adjustment) =
        locate_footer_window(&mmap).ok_or_else(|| MemvidError::InvalidToc {
            reason: "no valid commit footer found".into(),
        })?;
    let toc = Toc::decode(slice.toc_bytes)?;
    toc.verify_checksum()?;

    Ok(TailSnapshot {
        toc,
        footer_offset: slice.footer_offset as u64 + offset_adjustment as u64,
        data_end: slice.footer_offset as u64 + offset_adjustment as u64,
        generation: slice.footer.generation,
    })
}
1122
/// Returns the generation recorded in the last valid commit footer of `file`,
/// or `None` when no valid footer is found.
///
/// # Errors
/// Propagates the error from memory-mapping the file.
fn detect_generation(file: &File) -> Result<Option<u64>> {
    // SAFETY: NOTE(review) — mapped contents must not change while the map is alive.
    // The map is dropped before returning; confirm callers hold a lock that
    // excludes concurrent writers (`open_locked` calls this after locking).
    let mmap = unsafe { Mmap::map(file)? };

    Ok(locate_footer_window(&mmap).map(|(slice, _)| slice.footer.generation))
}
1129
1130pub(crate) fn ensure_single_file(path: &Path) -> Result<()> {
1131 if let Some(parent) = path.parent() {
1132 let name = path
1133 .file_name()
1134 .and_then(|n| n.to_str())
1135 .unwrap_or_default();
1136 let forbidden = ["-wal", "-shm", "-lock", "-journal"];
1137 for suffix in forbidden {
1138 let candidate = parent.join(format!("{name}{suffix}"));
1139 if candidate.exists() {
1140 return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1141 }
1142 }
1143 let hidden_forbidden = [".wal", ".shm", ".lock", ".journal"];
1144 for suffix in hidden_forbidden {
1145 let candidate = parent.join(format!(".{name}{suffix}"));
1146 if candidate.exists() {
1147 return Err(MemvidError::AuxiliaryFileDetected { path: candidate });
1148 }
1149 }
1150 }
1151 Ok(())
1152}
1153
/// Path of the manifest WAL for `path`: its extension is replaced with
/// `manifest.wal` (e.g. `mem.mv2` → `mem.manifest.wal`).
#[cfg(feature = "parallel_segments")]
fn manifest_wal_path(path: &Path) -> PathBuf {
    path.with_extension("manifest.wal")
}
1160
/// Deletes the manifest WAL belonging to `path`, if one exists.
///
/// Best-effort: a failure to remove the file is ignored.
#[cfg(feature = "parallel_segments")]
pub(crate) fn cleanup_manifest_wal_public(path: &Path) {
    let target = manifest_wal_path(path);
    if !target.exists() {
        return;
    }
    let _ = std::fs::remove_file(target);
}
1168
1169pub(crate) fn has_lex_index(toc: &Toc) -> bool {
1172 toc.segment_catalog.lex_enabled
1173 || toc.indexes.lex.is_some()
1174 || !toc.indexes.lex_segments.is_empty()
1175 || !toc.segment_catalog.tantivy_segments.is_empty()
1176}
1177
/// Returns the number of lexically indexed documents, if known.
///
/// Prefers a non-zero `doc_count` from the TOC's lex manifest, then a non-zero
/// count reported by `lex_storage`. Otherwise returns `None`.
#[cfg(feature = "lex")]
pub(crate) fn lex_doc_count(
    toc: &Toc,
    lex_storage: &crate::search::EmbeddedLexStorage,
) -> Option<u64> {
    let nonzero = |count: u64| (count > 0).then_some(count);
    toc.indexes
        .lex
        .as_ref()
        .and_then(|manifest| nonzero(manifest.doc_count))
        .or_else(|| nonzero(lex_storage.doc_count()))
}
1203
1204#[allow(dead_code)]
1207fn validate_segment_integrity(toc: &Toc, header: &Header, file_len: u64) -> Result<()> {
1208 let data_limit = header.footer_offset;
1209
1210 for (idx, seg) in toc.segment_catalog.tantivy_segments.iter().enumerate() {
1212 let offset = seg.common.bytes_offset;
1213 let length = seg.common.bytes_length;
1214
1215 if length == 0 {
1216 continue; }
1218
1219 let end = offset
1220 .checked_add(length)
1221 .ok_or_else(|| MemvidError::Doctor {
1222 reason: format!(
1223 "Tantivy segment {} offset overflow: {} + {}",
1224 idx, offset, length
1225 ),
1226 })?;
1227
1228 if end > file_len || end > data_limit {
1229 return Err(MemvidError::Doctor {
1230 reason: format!(
1231 "Tantivy segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1232 idx, offset, length, end, file_len, data_limit
1233 ),
1234 });
1235 }
1236 }
1237
1238 for (idx, seg) in toc.segment_catalog.time_segments.iter().enumerate() {
1240 let offset = seg.common.bytes_offset;
1241 let length = seg.common.bytes_length;
1242
1243 if length == 0 {
1244 continue;
1245 }
1246
1247 let end = offset
1248 .checked_add(length)
1249 .ok_or_else(|| MemvidError::Doctor {
1250 reason: format!(
1251 "Time segment {} offset overflow: {} + {}",
1252 idx, offset, length
1253 ),
1254 })?;
1255
1256 if end > file_len || end > data_limit {
1257 return Err(MemvidError::Doctor {
1258 reason: format!(
1259 "Time segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1260 idx, offset, length, end, file_len, data_limit
1261 ),
1262 });
1263 }
1264 }
1265
1266 for (idx, seg) in toc.segment_catalog.vec_segments.iter().enumerate() {
1268 let offset = seg.common.bytes_offset;
1269 let length = seg.common.bytes_length;
1270
1271 if length == 0 {
1272 continue;
1273 }
1274
1275 let end = offset
1276 .checked_add(length)
1277 .ok_or_else(|| MemvidError::Doctor {
1278 reason: format!(
1279 "Vec segment {} offset overflow: {} + {}",
1280 idx, offset, length
1281 ),
1282 })?;
1283
1284 if end > file_len || end > data_limit {
1285 return Err(MemvidError::Doctor {
1286 reason: format!(
1287 "Vec segment {} out of bounds: offset={}, length={}, end={}, file_len={}, data_limit={}",
1288 idx, offset, length, end, file_len, data_limit
1289 ),
1290 });
1291 }
1292 }
1293
1294 log::debug!("✓ Segment integrity validation passed");
1295 Ok(())
1296}
1297
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A prefix shorter than the 24-byte header must be rejected with a
    /// descriptive `InvalidToc` reason.
    #[test]
    fn toc_prefix_underflow_surfaces_reason() {
        let short_prefix = [0u8; 8];
        match verify_toc_prefix(&short_prefix) {
            Err(MemvidError::InvalidToc { reason }) => assert!(
                reason.contains("trailer too small"),
                "unexpected reason: {reason}"
            ),
            Err(other) => panic!("unexpected error: {other:?}"),
            Ok(()) => panic!("should reject short toc prefix"),
        }
    }

    /// `create` must refuse to run when a `-wal` sidecar sits next to the target.
    #[test]
    fn ensure_single_file_blocks_sidecars() {
        let workspace = tempdir().expect("tmp");
        let memory_path = workspace.path().join("mem.mv2");
        let sidecar_path = workspace.path().join("mem.mv2-wal");
        std::fs::write(&sidecar_path, b"junk").expect("sidecar");

        let outcome = Memvid::create(&memory_path);
        assert!(matches!(
            outcome,
            Err(MemvidError::AuxiliaryFileDetected { .. })
        ));
    }
}
1328}