1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53#[cfg(feature = "lex")]
54use crate::types::TantivySegmentDescriptor;
55use crate::types::{
56 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutOptions,
57 SegmentCommon, TextChunkManifest, Tier,
58};
59#[cfg(feature = "parallel_segments")]
60use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
61#[cfg(feature = "temporal_track")]
62use crate::{
63 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
64 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
65 TemporalResolutionValue,
66};
67use crate::triplet::TripletExtractor;
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading payload bytes sniffed for magic-number detection.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Per-entry header size in the embedded WAL, in bytes.
/// NOTE(review): must stay in sync with the WAL entry encoder in io::wal — confirm.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Copy-buffer size (8 MiB) used when sliding file data to grow the WAL region.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback IANA timezone name used by the temporal track.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";
81
/// Fixed list of natural-language temporal phrases, all lowercase.
/// Presumably matched against frame text during temporal-mention extraction
/// (see `collect_temporal_mentions`) — confirm against the matcher, which may
/// require lowercased input.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staged, atomically-replaceable copy of the archive file used during commit.
///
/// Wraps an [`AtomicWriteFile`] so all commit mutations happen on a temp copy
/// that is only renamed over the real file when the commit succeeds.
struct CommitStaging {
    atomic: AtomicWriteFile,
}
126
127impl CommitStaging {
128 fn prepare(path: &Path) -> Result<Self> {
129 let mut options = AtomicWriteFile::options();
130 options.read(true);
131 let atomic = options.open(path)?;
132 Ok(Self { atomic })
133 }
134
135 fn copy_from(&mut self, source: &File) -> Result<()> {
136 let mut reader = source.try_clone()?;
137 reader.seek(SeekFrom::Start(0))?;
138
139 let writer = self.atomic.as_file_mut();
140 writer.set_len(0)?;
141 writer.seek(SeekFrom::Start(0))?;
142 std::io::copy(&mut reader, writer)?;
143 writer.flush()?;
144 writer.sync_all()?;
145 Ok(())
146 }
147
148 fn clone_file(&self) -> Result<File> {
149 Ok(self.atomic.as_file().try_clone()?)
150 }
151
152 fn commit(self) -> Result<()> {
153 self.atomic.commit().map_err(Into::into)
154 }
155
156 fn discard(self) -> Result<()> {
157 self.atomic.discard().map_err(Into::into)
158 }
159}
160
/// Accumulated side effects of replaying a batch of WAL records.
///
/// Downstream commit code inspects this to decide which indexes need to be
/// rebuilt or appended to.
#[derive(Debug, Default)]
struct IngestionDelta {
    /// Frame ids appended to the TOC during replay.
    inserted_frames: Vec<FrameId>,
    /// Embeddings carried by inserted frames (frame id, vector).
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    /// Time-index entries for newly inserted document frames.
    inserted_time_entries: Vec<TimeIndexEntry>,
    /// True when existing frames were changed in place (e.g. tombstoned).
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// Strategy requested for flushing pending state to disk.
///
/// NOTE(review): `commit_from_records` currently ignores its `_mode`
/// parameter, so both variants take the same path — confirm before relying
/// on a behavioral difference.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Rebuild and persist everything (the default, used by `commit()`).
    #[default]
    Full,
    /// Intended to publish only what changed since the last commit.
    Incremental,
}
201
/// Options controlling a single commit call.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// Requested commit strategy (defaults to `CommitMode::Full`).
    pub mode: CommitMode,
    /// Request background execution. Currently advisory only: the commit
    /// path logs and runs synchronously regardless.
    pub background: bool,
}
207
208impl CommitOptions {
209 pub fn new(mode: CommitMode) -> Self {
210 Self {
211 mode,
212 background: false,
213 }
214 }
215
216 pub fn background(mut self, background: bool) -> Self {
217 self.background = background;
218 self
219 }
220}
221
222fn default_reader_registry() -> &'static ReaderRegistry {
223 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
224 REGISTRY.get_or_init(ReaderRegistry::default)
225}
226
227fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
228 if detect_pdf_magic(magic) {
229 return Some(DocumentFormat::Pdf);
230 }
231
232 let mime = mime?.trim().to_ascii_lowercase();
233 match mime.as_str() {
234 "application/pdf" => Some(DocumentFormat::Pdf),
235 "text/plain" => Some(DocumentFormat::PlainText),
236 "text/markdown" => Some(DocumentFormat::Markdown),
237 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
238 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
239 Some(DocumentFormat::Docx)
240 }
241 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
242 Some(DocumentFormat::Xlsx)
243 }
244 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
245 Some(DocumentFormat::Pptx)
246 }
247 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
248 _ => None,
249 }
250}
251
/// True when `magic` holds a PDF signature: an optional UTF-8 BOM, then any
/// run of ASCII whitespace, then the literal `%PDF`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else { return false };
    if bytes.is_empty() {
        return false;
    }
    // Strip a single leading BOM, if present.
    let without_bom = bytes
        .strip_prefix(&[0xEF, 0xBB, 0xBF][..])
        .unwrap_or(bytes);
    // Skip any leading ASCII whitespace before the signature.
    let start = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[start..].starts_with(b"%PDF")
}
269
/// Extract a document from raw `bytes` via the shared [`ReaderRegistry`].
///
/// A reader is selected from the MIME hint plus up to [`MAGIC_SNIFF_BYTES`]
/// sniffed magic bytes. If no reader matches, or the selected reader fails,
/// extraction falls back to [`PassthroughReader`] and the reason is recorded
/// as a diagnostics warning on the output.
#[instrument(
    target = "memvid::extract",
    skip_all,
    fields(mime = mime_hint, uri = uri)
)]
fn extract_via_registry(
    bytes: &[u8],
    mime_hint: Option<&str>,
    uri: Option<&str>,
) -> Result<ExtractedDocument> {
    let registry = default_reader_registry();
    // `get(..N)` yields None when the buffer is shorter than the sniff
    // window, so very short payloads produce no magic hint at all.
    let magic = bytes
        .get(..MAGIC_SNIFF_BYTES)
        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic))
        .with_uri(uri)
        .with_magic(magic);

    // `Some(reason)` means the fallback path below must run; a successful
    // reader extraction returns early from inside the match.
    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
        let start = Instant::now();
        match reader.extract(bytes, &hint) {
            Ok(output) => {
                return Ok(finalize_reader_output(output, start));
            }
            Err(err) => {
                tracing::error!(
                    target = "memvid::extract",
                    reader = reader.name(),
                    error = %err,
                    "reader failed; falling back"
                );
                Some(format!("reader {} failed: {err}", reader.name()))
            }
        }
    } else {
        tracing::debug!(
            target = "memvid::extract",
            format = hint.format.map(|f| f.label()),
            "no reader matched; using default extractor"
        );
        Some("no registered reader matched; using default extractor".to_string())
    };

    // Fallback: passthrough extraction, with the failure reason attached.
    let start = Instant::now();
    let mut output = PassthroughReader.extract(bytes, &hint)?;
    if let Some(reason) = fallback_reason {
        output.diagnostics.track_warning(reason);
    }
    Ok(finalize_reader_output(output, start))
}
320
321fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
322 let elapsed = start.elapsed();
323 let ReaderOutput {
324 document,
325 reader_name,
326 diagnostics,
327 } = output;
328 log_reader_result(&reader_name, &diagnostics, elapsed);
329 document
330}
331
/// Emit a structured tracing event summarizing one reader extraction.
///
/// Prefers the reader-reported `duration_ms` over the locally measured
/// `elapsed`. Logs at WARN (plus one event per individual warning) when the
/// reader produced warnings or fell back; otherwise logs at INFO.
///
/// NOTE: the local variable names double as tracing field names
/// (`duration_ms`, `warnings`, `pages`), so renaming them would change the
/// emitted log schema.
fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
    let duration_ms = diagnostics
        .duration_ms
        .unwrap_or_else(|| elapsed.as_millis() as u64);
    let warnings = diagnostics.warnings.len();
    let pages = diagnostics.pages_processed;

    if warnings > 0 || diagnostics.fallback {
        tracing::warn!(
            target = "memvid::extract",
            reader,
            duration_ms,
            pages,
            warnings,
            fallback = diagnostics.fallback,
            "extraction completed with warnings"
        );
        for warning in diagnostics.warnings.iter() {
            tracing::warn!(target = "memvid::extract", reader, warning = %warning);
        }
    } else {
        tracing::info!(
            target = "memvid::extract",
            reader,
            duration_ms,
            pages,
            "extraction completed"
        );
    }
}
362
363impl Memvid {
    /// Run `op` against a staged copy of the archive so a failed commit can
    /// never corrupt the real file.
    ///
    /// Flow: sync the live file, copy it into a [`CommitStaging`] temp file,
    /// repoint `self.file`/`self.wal` at the staged copy, snapshot every
    /// commit-mutable field, then run `op`. On success the staging file is
    /// atomically committed over the destination and fresh handles are
    /// reopened; on any failure the snapshot is restored.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Swap live handles for staging ones; keep originals for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so each outcome arm can either restore or drop.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Staging file is now the destination: drop stale
                        // handles and reopen against the real path.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Atomic rename failed: roll back everything `op`
                        // may have mutated.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err.into())
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staging file (best effort) and
                // restore the pre-commit state.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
445
446 pub(crate) fn catalog_data_end(&self) -> u64 {
447 let mut max_end = self.header.wal_offset + self.header.wal_size;
448
449 for descriptor in &self.toc.segment_catalog.lex_segments {
450 if descriptor.common.bytes_length == 0 {
451 continue;
452 }
453 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
454 }
455
456 for descriptor in &self.toc.segment_catalog.vec_segments {
457 if descriptor.common.bytes_length == 0 {
458 continue;
459 }
460 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
461 }
462
463 for descriptor in &self.toc.segment_catalog.time_segments {
464 if descriptor.common.bytes_length == 0 {
465 continue;
466 }
467 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
468 }
469
470 #[cfg(feature = "temporal_track")]
471 for descriptor in &self.toc.segment_catalog.temporal_segments {
472 if descriptor.common.bytes_length == 0 {
473 continue;
474 }
475 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
476 }
477
478 #[cfg(feature = "lex")]
479 for descriptor in &self.toc.segment_catalog.tantivy_segments {
480 if descriptor.common.bytes_length == 0 {
481 continue;
482 }
483 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
484 }
485
486 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
487 if manifest.bytes_length != 0 {
488 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
489 }
490 }
491
492 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
493 if manifest.bytes_length != 0 {
494 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
495 }
496 }
497
498 if let Some(manifest) = self.toc.time_index.as_ref() {
499 if manifest.bytes_length != 0 {
500 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
501 }
502 }
503
504 #[cfg(feature = "temporal_track")]
505 if let Some(track) = self.toc.temporal_track.as_ref() {
506 if track.bytes_length != 0 {
507 max_end = max_end.max(track.bytes_offset + track.bytes_length);
508 }
509 }
510
511 max_end
512 }
513
514 fn payload_region_end(&self) -> u64 {
515 let wal_region_end = self.header.wal_offset + self.header.wal_size;
516 let frames_with_payload: Vec<_> = self
517 .toc
518 .frames
519 .iter()
520 .filter(|frame| frame.payload_length != 0)
521 .collect();
522
523 tracing::info!(
524 "payload_region_end: found {} frames with payloads out of {} total frames, wal_region_end={}",
525 frames_with_payload.len(),
526 self.toc.frames.len(),
527 wal_region_end
528 );
529
530 for (idx, frame) in frames_with_payload.iter().enumerate().take(3) {
531 tracing::info!(
532 " frame[{}]: id={} offset={} length={} status={:?}",
533 idx,
534 frame.id,
535 frame.payload_offset,
536 frame.payload_length,
537 frame.status
538 );
539 }
540
541 let result = frames_with_payload
542 .iter()
543 .fold(wal_region_end, |max_end, frame| {
544 match frame.payload_offset.checked_add(frame.payload_length) {
545 Some(end) => max_end.max(end),
546 None => max_end,
547 }
548 });
549
550 tracing::info!("payload_region_end: returning {}", result);
551 result
552 }
553
    /// Append `payload` to the embedded WAL, growing the WAL region on demand.
    ///
    /// Retries in a loop: when the WAL reports it is too small for the entry
    /// (matched by the exact `CheckpointFailed` reason string), the region is
    /// grown to hold header + payload — and at least one byte beyond the
    /// current size — before trying again. Any other error is propagated.
    ///
    /// NOTE(review): matching on the error's reason string is fragile; it
    /// must stay byte-identical to the message produced by `EmbeddedWal`.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry" =>
                {
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
570
571 fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
572 let mut new_size = self.header.wal_size;
573 let mut target = required_entry_size;
574 if target == 0 {
575 target = self.header.wal_size;
576 }
577 while new_size <= target {
578 new_size = new_size
579 .checked_mul(2)
580 .ok_or_else(|| MemvidError::CheckpointFailed {
581 reason: "wal_size overflow".into(),
582 })?;
583 }
584 let delta = new_size - self.header.wal_size;
585 if delta == 0 {
586 return Ok(());
587 }
588
589 self.shift_data_for_wal_growth(delta)?;
590 self.header.wal_size = new_size;
591 self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
592 self.data_end = self.data_end.saturating_add(delta);
593 self.adjust_offsets_after_wal_growth(delta);
594
595 let catalog_end = self.catalog_data_end();
596 self.header.footer_offset = catalog_end
597 .max(self.header.footer_offset)
598 .max(self.data_end);
599
600 self.rewrite_toc_footer()?;
601 self.header.toc_checksum = self.toc.toc_checksum;
602 crate::persist_header(&mut self.file, &self.header)?;
603 self.file.sync_all()?;
604 self.wal = EmbeddedWal::open(&self.file, &self.header)?;
605 Ok(())
606 }
607
    /// Slide everything stored after the WAL region forward by `delta` bytes
    /// to make room for a larger WAL, then zero-fill the vacated gap.
    ///
    /// The copy walks backwards in `WAL_SHIFT_BUFFER_SIZE` chunks — from the
    /// end of the file toward `data_start` — so overlapping source and
    /// destination ranges never clobber bytes that still need copying.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        // Backward copy: move the tail chunk first.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the freshly exposed bytes between old and new data start.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
639
640 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
641 if delta == 0 {
642 return;
643 }
644
645 for frame in &mut self.toc.frames {
646 if frame.payload_offset != 0 {
647 frame.payload_offset += delta;
648 }
649 }
650
651 for segment in &mut self.toc.segments {
652 if segment.bytes_offset != 0 {
653 segment.bytes_offset += delta;
654 }
655 }
656
657 if let Some(lex) = self.toc.indexes.lex.as_mut() {
658 if lex.bytes_offset != 0 {
659 lex.bytes_offset += delta;
660 }
661 }
662 for manifest in &mut self.toc.indexes.lex_segments {
663 if manifest.bytes_offset != 0 {
664 manifest.bytes_offset += delta;
665 }
666 }
667 if let Some(vec) = self.toc.indexes.vec.as_mut() {
668 if vec.bytes_offset != 0 {
669 vec.bytes_offset += delta;
670 }
671 }
672 if let Some(time_index) = self.toc.time_index.as_mut() {
673 if time_index.bytes_offset != 0 {
674 time_index.bytes_offset += delta;
675 }
676 }
677 #[cfg(feature = "temporal_track")]
678 if let Some(track) = self.toc.temporal_track.as_mut() {
679 if track.bytes_offset != 0 {
680 track.bytes_offset += delta;
681 }
682 }
683
684 let catalog = &mut self.toc.segment_catalog;
685 for descriptor in &mut catalog.lex_segments {
686 if descriptor.common.bytes_offset != 0 {
687 descriptor.common.bytes_offset += delta;
688 }
689 }
690 for descriptor in &mut catalog.vec_segments {
691 if descriptor.common.bytes_offset != 0 {
692 descriptor.common.bytes_offset += delta;
693 }
694 }
695 for descriptor in &mut catalog.time_segments {
696 if descriptor.common.bytes_offset != 0 {
697 descriptor.common.bytes_offset += delta;
698 }
699 }
700 #[cfg(feature = "temporal_track")]
701 for descriptor in &mut catalog.temporal_segments {
702 if descriptor.common.bytes_offset != 0 {
703 descriptor.common.bytes_offset += delta;
704 }
705 }
706 for descriptor in &mut catalog.tantivy_segments {
707 if descriptor.common.bytes_offset != 0 {
708 descriptor.common.bytes_offset += delta;
709 }
710 }
711
712 #[cfg(feature = "lex")]
713 if let Ok(mut storage) = self.lex_storage.write() {
714 storage.adjust_offsets(delta);
715 }
716 }
717 pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
718 self.ensure_writable()?;
719 if options.background {
720 tracing::debug!("commit background flag ignored; running synchronously");
721 }
722 let mode = options.mode;
723 let records = self.wal.pending_records()?;
724 if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
725 return Ok(());
726 }
727 self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
728 }
729
    /// Flush all pending WAL records and dirty state using the default
    /// full-commit strategy.
    ///
    /// NOTE(review): `ensure_writable` is called here and again at the top of
    /// `commit_with_options`; presumably idempotent — confirm before removing
    /// either call.
    pub fn commit(&mut self) -> Result<()> {
        self.ensure_writable()?;
        self.commit_with_options(CommitOptions::new(CommitMode::Full))
    }
734
    /// Apply pending WAL `records` and persist every resulting artifact:
    /// indexes, auxiliary tracks, TOC footer, WAL checkpoint and header.
    ///
    /// `_mode` is currently unused — full and incremental commits take the
    /// same path here.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // CLIP vectors accumulate outside the WAL, so they can require
        // persistence even when the record delta is empty.
        let clip_needs_persist = self.clip_index.as_ref().map_or(false, |idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings)?;
            indexes_rebuilt = true;
        }

        // The `!indexes_rebuilt` guards below suggest rebuild_indexes also
        // persists these artifacts — confirm against its implementation.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        // Re-assert the checksum after record_checkpoint mutates the header;
        // presumably checkpointing can touch this field — confirm before
        // simplifying.
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
797
798 #[cfg(feature = "parallel_segments")]
799 pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
800 self.ensure_writable()?;
801 if !self.dirty && !self.tantivy_index_pending() {
802 return Ok(());
803 }
804 let opts = opts.clone();
805 self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
806 }
807
    /// Perform a commit using the parallel segment pipeline (must run inside
    /// `with_staging_lock`).
    ///
    /// Replays pending WAL records, then tries `publish_parallel_delta`.
    /// When the parallel path is used, the tantivy index and the time index
    /// are rebuilt from scratch from the full frame list; otherwise it falls
    /// back to the ordinary `rebuild_indexes` path. Finishes with the same
    /// footer/checkpoint/header persistence sequence as a normal commit.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // May decline (returns false) — e.g. presumably for small deltas;
            // confirm against publish_parallel_delta.
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                // The parallel path bypasses the incremental tantivy updates,
                // so the whole lex index is rebuilt from the current frames.
                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: rebuilding Tantivy index, data_end={}, footer_offset={}, frames={}",
                        self.data_end,
                        self.header.footer_offset,
                        self.toc.frames.len()
                    );

                    // Drop all previously published tantivy state first.
                    self.toc.segment_catalog.tantivy_segments.clear();
                    self.toc.indexes.lex_segments.clear();

                    if let Ok(mut storage) = self.lex_storage.write() {
                        storage.clear();
                        storage.set_generation(0);
                    }

                    self.init_tantivy()?;
                    // take()/put-back dance so rebuild can borrow self.
                    if let Some(mut engine) = self.tantivy.take() {
                        let rebuilt = self.rebuild_tantivy_engine(&mut engine)?;
                        let num_docs = engine.num_docs();
                        tracing::info!(
                            "parallel_commit: Tantivy rebuilt={}, num_docs={}",
                            rebuilt,
                            num_docs
                        );
                        self.tantivy = Some(engine);
                        self.tantivy_dirty = true;
                    } else {
                        tracing::warn!(
                            "parallel_commit: Tantivy engine is None after init_tantivy"
                        );
                    }
                }

                // Rebuild the time index from every active document frame
                // and append it at the current data end.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Parallel path declined: fall back to the standard rebuild.
                self.rebuild_indexes(&delta.inserted_embeddings)?;
                indexes_rebuilt = true;
            }
        }

        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Same persistence tail as commit_from_records: footer, checkpoint,
        // header, fsync, then clear the manifest WAL and dirty flags.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
954
    /// Crash recovery: replay any WAL records written after the last
    /// checkpoint, rebuild indexes if they changed anything, then checkpoint
    /// and persist the header.
    pub(crate) fn recover_wal(&mut self) -> Result<()> {
        let records = self.wal.records_after(self.header.wal_sequence)?;
        if records.is_empty() {
            // Nothing to replay, but an in-memory tantivy index may still
            // need flushing before we return.
            if self.tantivy_index_pending() {
                self.flush_tantivy()?;
            }
            return Ok(());
        }
        let delta = self.apply_records(records)?;
        if !delta.is_empty() {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "recover applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings)?;
        } else if self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }
        self.wal.record_checkpoint(&mut self.header)?;
        crate::persist_header(&mut self.file, &self.header)?;
        // NOTE(review): this second empty-delta flush looks redundant with
        // the one above — it only fires if the first flush left the index
        // pending again. Confirm whether this branch is still needed.
        if !delta.is_empty() {
        } else if self.tantivy_index_pending() {
            self.flush_tantivy()?;
            crate::persist_header(&mut self.file, &self.header)?;
        }
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
987 }
988
    /// Replay a batch of WAL `records` against the in-memory TOC.
    ///
    /// Frame-insert records append their payload bytes at `self.data_end`
    /// (or reuse an existing frame's payload region), materialize a new
    /// [`Frame`] in the TOC, and feed text into the tantivy engine when the
    /// `lex` feature is active. Tombstone records mark their target deleted.
    /// Everything downstream index rebuilds need is collected into the
    /// returned [`IngestionDelta`].
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // New payloads are appended sequentially starting at the data end.
        let mut data_cursor = self.data_end;
        // Maps WAL sequence numbers to the frame ids they produced so later
        // records can resolve `parent_sequence` references within the batch.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        // NOTE(review): redundant with the early return above — records is
        // never empty here.
        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        // Lex batches are applied immediately and do not
                        // contribute to the ingestion delta.
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // Frame ids are dense: the next id is the TOC length.
                        let frame_id = self.toc.frames.len() as u64;

                        // Either borrow the payload region of an existing
                        // frame (`reuse_payload_from`) or append the inline
                        // payload bytes at the cursor.
                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source = self.toc.frames.get(source_id as usize).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            // Canonical length is the decoded size: for zstd
                            // payloads without a recorded length we must
                            // decompress once to measure it.
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    match entry.canonical_length {
                                        Some(len) => len,
                                        None => {
                                            let decoded = crate::decode_canonical_bytes(
                                                &entry.payload,
                                                CanonicalEncoding::Zstd,
                                                frame_id,
                                            )?;
                                            decoded.len() as u64
                                        }
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            // Anchor fields only exist with temporal_track;
                            // without it they stay None.
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                        };

                        // Resolve intra-batch parent references (silently
                        // skipped when the parent sequence is unknown).
                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            }
                        }

                        // Text to index: explicit non-blank search_text wins,
                        // otherwise the frame's decoded content.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(ref text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;
                        }

                        if let Some(embedding) = entry.embedding.take() {
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only document frames feed the time index and the
                        // temporal track.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            // Reused payloads never advance the cursor, so take the max.
            self.data_end = self.data_end.max(data_cursor);
        }

        Ok(delta)
    }
1217
1218 #[cfg(feature = "temporal_track")]
1219 fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1220 (timestamp, AnchorSource::FrameTimestamp)
1221 }
1222
    /// Scan `text` for temporal expressions and resolve each candidate into
    /// `TemporalMention` records, interpreted relative to `anchor_ts`.
    ///
    /// Candidate spans come from two sources: a fixed phrase list matched
    /// case-insensitively, and a numeric `\d/\d/\d`-style date regex.
    /// Candidates that fail to resolve are skipped silently. Returns an empty
    /// vector for blank text or an unrepresentable anchor timestamp.
    #[cfg(feature = "temporal_track")]
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        // (start, end) byte offsets of candidate spans within `text`.
        let mut spans: Vec<(usize, usize)> = Vec::new();
        // ASCII lowercasing never changes byte length, so offsets found in
        // `lower` are valid byte indices into `text` as well.
        let lower = text.to_ascii_lowercase();

        // Source 1: fixed phrase list, non-overlapping forward scan.
        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                search_start = end;
            }
        }

        // Source 2: numeric dates like 12/31/2024. The compiled regex (or its
        // construction error message) is cached process-wide.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            // Defensive bounds check; phrase/regex spans should already be valid.
            if end > text.len() || start >= end {
                continue;
            }
            let raw = &text[start..end];
            // Strip surrounding quotes/punctuation, then re-anchor the byte
            // offsets to the trimmed slice's position within `text`.
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                // Unresolvable candidates are expected noise; skip quietly.
                Err(_) => continue,
            }
        }

        mentions
    }
1296
1297 #[cfg(feature = "temporal_track")]
1298 fn resolution_to_mentions(
1299 resolution: TemporalResolution,
1300 frame_id: FrameId,
1301 byte_start: usize,
1302 byte_end: usize,
1303 ) -> Vec<TemporalMention> {
1304 let byte_len = byte_end.saturating_sub(byte_start) as u32;
1305 let byte_start = byte_start.min(u32::MAX as usize) as u32;
1306 let mut results = Vec::new();
1307
1308 let base_flags = Self::flags_from_resolution(&resolution.flags);
1309 match resolution.value {
1310 TemporalResolutionValue::Date(date) => {
1311 let ts = Self::date_to_timestamp(date);
1312 results.push(TemporalMention::new(
1313 ts,
1314 frame_id,
1315 byte_start,
1316 byte_len,
1317 TemporalMentionKind::Date,
1318 resolution.confidence,
1319 0,
1320 base_flags,
1321 ));
1322 }
1323 TemporalResolutionValue::DateTime(dt) => {
1324 let ts = dt.unix_timestamp();
1325 let tz_hint = dt.offset().whole_minutes() as i16;
1326 results.push(TemporalMention::new(
1327 ts,
1328 frame_id,
1329 byte_start,
1330 byte_len,
1331 TemporalMentionKind::DateTime,
1332 resolution.confidence,
1333 tz_hint,
1334 base_flags,
1335 ));
1336 }
1337 TemporalResolutionValue::DateRange { start, end } => {
1338 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1339 let start_ts = Self::date_to_timestamp(start);
1340 results.push(TemporalMention::new(
1341 start_ts,
1342 frame_id,
1343 byte_start,
1344 byte_len,
1345 TemporalMentionKind::RangeStart,
1346 resolution.confidence,
1347 0,
1348 flags,
1349 ));
1350 let end_ts = Self::date_to_timestamp(end);
1351 results.push(TemporalMention::new(
1352 end_ts,
1353 frame_id,
1354 byte_start,
1355 byte_len,
1356 TemporalMentionKind::RangeEnd,
1357 resolution.confidence,
1358 0,
1359 flags,
1360 ));
1361 }
1362 TemporalResolutionValue::DateTimeRange { start, end } => {
1363 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1364 results.push(TemporalMention::new(
1365 start.unix_timestamp(),
1366 frame_id,
1367 byte_start,
1368 byte_len,
1369 TemporalMentionKind::RangeStart,
1370 resolution.confidence,
1371 start.offset().whole_minutes() as i16,
1372 flags,
1373 ));
1374 results.push(TemporalMention::new(
1375 end.unix_timestamp(),
1376 frame_id,
1377 byte_start,
1378 byte_len,
1379 TemporalMentionKind::RangeEnd,
1380 resolution.confidence,
1381 end.offset().whole_minutes() as i16,
1382 flags,
1383 ));
1384 }
1385 TemporalResolutionValue::Month { year, month } => {
1386 let start_date = match Date::from_calendar_date(year, month, 1) {
1387 Ok(date) => date,
1388 Err(err) => {
1389 tracing::warn!(
1390 target = "memvid::temporal",
1391 %err,
1392 year,
1393 month = month as u8,
1394 "skipping invalid month resolution"
1395 );
1396 return results;
1398 }
1399 };
1400 let end_date = match Self::last_day_in_month(year, month) {
1401 Some(date) => date,
1402 None => {
1403 tracing::warn!(
1404 target = "memvid::temporal",
1405 year,
1406 month = month as u8,
1407 "skipping month resolution with invalid calendar range"
1408 );
1409 return results;
1410 }
1411 };
1412 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1413 results.push(TemporalMention::new(
1414 Self::date_to_timestamp(start_date),
1415 frame_id,
1416 byte_start,
1417 byte_len,
1418 TemporalMentionKind::RangeStart,
1419 resolution.confidence,
1420 0,
1421 flags,
1422 ));
1423 results.push(TemporalMention::new(
1424 Self::date_to_timestamp(end_date),
1425 frame_id,
1426 byte_start,
1427 byte_len,
1428 TemporalMentionKind::RangeEnd,
1429 resolution.confidence,
1430 0,
1431 flags,
1432 ));
1433 }
1434 }
1435
1436 results
1437 }
1438
1439 #[cfg(feature = "temporal_track")]
1440 fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1441 let mut result = TemporalMentionFlags::empty();
1442 if flags
1443 .iter()
1444 .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1445 {
1446 result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1447 }
1448 if flags
1449 .iter()
1450 .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1451 {
1452 result = result.set(TemporalMentionFlags::DERIVED, true);
1453 }
1454 result
1455 }
1456
1457 #[cfg(feature = "temporal_track")]
1458 fn date_to_timestamp(date: Date) -> i64 {
1459 PrimitiveDateTime::new(date, Time::MIDNIGHT)
1460 .assume_offset(UtcOffset::UTC)
1461 .unix_timestamp()
1462 }
1463
1464 #[cfg(feature = "temporal_track")]
1465 fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1466 let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1467 while let Some(next) = date.next_day() {
1468 if next.month() == month {
1469 date = next;
1470 } else {
1471 break;
1472 }
1473 }
1474 Some(date)
1475 }
1476
1477 #[allow(dead_code)]
1478 fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1479 if delta.inserted_frames.is_empty() || !self.lex_enabled {
1480 return Ok(false);
1481 }
1482
1483 let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1484 Some(artifact) => artifact,
1485 None => return Ok(false),
1486 };
1487
1488 let segment_id = self.toc.segment_catalog.next_segment_id;
1489 #[cfg(feature = "parallel_segments")]
1490 let span =
1491 self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1492
1493 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1494 let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1495 #[cfg(feature = "parallel_segments")]
1496 if let Some(span) = span {
1497 Self::decorate_segment_common(&mut descriptor.common, span);
1498 }
1499 #[cfg(feature = "parallel_segments")]
1500 let descriptor_for_manifest = descriptor.clone();
1501 self.toc.segment_catalog.lex_segments.push(descriptor);
1502 #[cfg(feature = "parallel_segments")]
1503 if let Err(err) = self.record_index_segment(
1504 SegmentKind::Lexical,
1505 descriptor_for_manifest.common,
1506 SegmentStats {
1507 doc_count: artifact.doc_count,
1508 vector_count: 0,
1509 time_entries: 0,
1510 bytes_uncompressed: artifact.bytes.len() as u64,
1511 build_micros: 0,
1512 },
1513 ) {
1514 tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1515 }
1516 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1517 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1518 Ok(true)
1519 }
1520
1521 #[allow(dead_code)]
1522 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1523 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1524 return Ok(false);
1525 }
1526
1527 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1528 Some(artifact) => artifact,
1529 None => return Ok(false),
1530 };
1531
1532 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1533 if existing_dim != artifact.dimension {
1534 return Err(MemvidError::VecDimensionMismatch {
1535 expected: existing_dim,
1536 actual: artifact.dimension as usize,
1537 });
1538 }
1539 }
1540
1541 let segment_id = self.toc.segment_catalog.next_segment_id;
1542 #[cfg(feature = "parallel_segments")]
1543 #[cfg(feature = "parallel_segments")]
1544 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1545
1546 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1547 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1548 #[cfg(feature = "parallel_segments")]
1549 if let Some(span) = span {
1550 Self::decorate_segment_common(&mut descriptor.common, span);
1551 }
1552 #[cfg(feature = "parallel_segments")]
1553 let descriptor_for_manifest = descriptor.clone();
1554 self.toc.segment_catalog.vec_segments.push(descriptor);
1555 #[cfg(feature = "parallel_segments")]
1556 if let Err(err) = self.record_index_segment(
1557 SegmentKind::Vector,
1558 descriptor_for_manifest.common,
1559 SegmentStats {
1560 doc_count: 0,
1561 vector_count: artifact.vector_count,
1562 time_entries: 0,
1563 bytes_uncompressed: artifact.bytes_uncompressed,
1564 build_micros: 0,
1565 },
1566 ) {
1567 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1568 }
1569 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1570 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1571
1572 if self.toc.indexes.vec.is_none() {
1574 let empty_offset = self.data_end;
1575 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1576 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1577 self.toc.indexes.vec = Some(VecIndexManifest {
1578 vector_count: 0,
1579 dimension: 0,
1580 bytes_offset: empty_offset,
1581 bytes_length: 0,
1582 checksum: empty_checksum,
1583 compression_mode: self.vec_compression.clone(),
1584 });
1585 }
1586 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1587 if manifest.dimension == 0 {
1588 manifest.dimension = artifact.dimension;
1589 }
1590 if manifest.bytes_length == 0 {
1591 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1592 manifest.compression_mode = artifact.compression.clone();
1593 }
1594 }
1595
1596 self.vec_enabled = true;
1597 Ok(true)
1598 }
1599
1600 #[allow(dead_code)]
1601 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1602 if delta.inserted_time_entries.is_empty() {
1603 return Ok(false);
1604 }
1605
1606 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1607 Some(artifact) => artifact,
1608 None => return Ok(false),
1609 };
1610
1611 let segment_id = self.toc.segment_catalog.next_segment_id;
1612 #[cfg(feature = "parallel_segments")]
1613 #[cfg(feature = "parallel_segments")]
1614 let span = self.segment_span_from_iter(
1615 delta
1616 .inserted_time_entries
1617 .iter()
1618 .map(|entry| entry.frame_id),
1619 );
1620
1621 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1622 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1623 #[cfg(feature = "parallel_segments")]
1624 if let Some(span) = span {
1625 Self::decorate_segment_common(&mut descriptor.common, span);
1626 }
1627 #[cfg(feature = "parallel_segments")]
1628 let descriptor_for_manifest = descriptor.clone();
1629 self.toc.segment_catalog.time_segments.push(descriptor);
1630 #[cfg(feature = "parallel_segments")]
1631 if let Err(err) = self.record_index_segment(
1632 SegmentKind::Time,
1633 descriptor_for_manifest.common,
1634 SegmentStats {
1635 doc_count: 0,
1636 vector_count: 0,
1637 time_entries: artifact.entry_count,
1638 bytes_uncompressed: artifact.bytes.len() as u64,
1639 build_micros: 0,
1640 },
1641 ) {
1642 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1643 }
1644 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1645 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1646 Ok(true)
1647 }
1648
1649 #[cfg(feature = "temporal_track")]
1650 #[allow(dead_code)]
1651 fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1652 if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1653 {
1654 return Ok(false);
1655 }
1656
1657 debug_assert!(
1658 delta.inserted_temporal_mentions.len() < 1_000_000,
1659 "temporal delta mentions unexpectedly large: {}",
1660 delta.inserted_temporal_mentions.len()
1661 );
1662 debug_assert!(
1663 delta.inserted_temporal_anchors.len() < 1_000_000,
1664 "temporal delta anchors unexpectedly large: {}",
1665 delta.inserted_temporal_anchors.len()
1666 );
1667
1668 let artifact = match self.build_temporal_segment_from_records(
1669 &delta.inserted_temporal_mentions,
1670 &delta.inserted_temporal_anchors,
1671 )? {
1672 Some(artifact) => artifact,
1673 None => return Ok(false),
1674 };
1675
1676 let segment_id = self.toc.segment_catalog.next_segment_id;
1677 let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
1678 self.toc
1679 .segment_catalog
1680 .temporal_segments
1681 .push(descriptor.clone());
1682 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1683 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1684
1685 self.toc.temporal_track = Some(TemporalTrackManifest {
1686 bytes_offset: descriptor.common.bytes_offset,
1687 bytes_length: descriptor.common.bytes_length,
1688 entry_count: artifact.entry_count,
1689 anchor_count: artifact.anchor_count,
1690 checksum: artifact.checksum,
1691 flags: artifact.flags,
1692 });
1693
1694 self.clear_temporal_track_cache();
1695
1696 Ok(true)
1697 }
1698
1699 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
1700 let frame =
1701 self.toc
1702 .frames
1703 .get_mut(frame_id as usize)
1704 .ok_or(MemvidError::InvalidFrame {
1705 frame_id,
1706 reason: "supersede target missing",
1707 })?;
1708 frame.status = FrameStatus::Superseded;
1709 frame.superseded_by = Some(successor_id);
1710 self.remove_frame_from_indexes(frame_id)
1711 }
1712
    /// Rebuild every derived index — time index, lex/tantivy, vec, CLIP,
    /// memories track, and logic mesh — from the authoritative frame table,
    /// rewriting the file region between the payload end and the footer.
    ///
    /// `new_vec_docs` supplies (frame, embedding) pairs to fold into the
    /// rebuilt vector index via `build_vec_artifact`.
    ///
    /// # Errors
    /// Propagates I/O failures; fails with `MemvidError::InvalidToc` when the
    /// tantivy engine is missing, and `MemvidError::Doctor` when the lex
    /// rebuild indexed zero documents despite text-indexable frames existing.
    pub(crate) fn rebuild_indexes(&mut self, new_vec_docs: &[(FrameId, Vec<f32>)]) -> Result<()> {
        // Nothing to rebuild on a completely empty store.
        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
            return Ok(());
        }

        // Reset the write cursor to the end of the payload region and trim
        // stale index bytes beyond the larger of footer/payload end.
        let payload_end = self.payload_region_end();
        self.data_end = payload_end;
        let safe_truncate_len = self.header.footer_offset.max(payload_end);
        if self.file.metadata()?.len() > safe_truncate_len {
            self.file.set_len(safe_truncate_len)?;
        }
        self.file.seek(SeekFrom::Start(payload_end))?;

        // Drop all catalogued segments; they are regenerated below.
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        self.toc.segment_catalog.temporal_segments.clear();
        #[cfg(feature = "parallel_segments")]
        self.toc.segment_catalog.index_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        self.toc.indexes.lex_segments.clear();

        // Phase 1: rebuild the time index from active Document frames only.
        let mut time_entries: Vec<TimeIndexEntry> = self
            .toc
            .frames
            .iter()
            .filter(|frame| {
                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
            })
            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
            .collect();
        let (ti_offset, ti_length, ti_checksum) =
            time_index_append(&mut self.file, &mut time_entries)?;
        self.toc.time_index = Some(TimeIndexManifest {
            bytes_offset: ti_offset,
            bytes_length: ti_length,
            entry_count: time_entries.len() as u64,
            checksum: ti_checksum,
        });

        // Remaining sections are appended sequentially after the time index.
        let mut footer_offset = ti_offset + ti_length;

        // The temporal track is invalidated wholesale; it is not re-derived
        // here (presumably rebuilt lazily or by a later ingest — confirm).
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
            self.clear_temporal_track_cache();
        }

        // Phase 2: rebuild the lexical (tantivy) index, or clear its
        // manifests when lex is disabled or compiled out.
        if self.lex_enabled {
            #[cfg(feature = "lex")]
            {
                // Start from empty embedded storage at generation 0.
                if let Ok(mut storage) = self.lex_storage.write() {
                    storage.clear();
                    storage.set_generation(0);
                }

                self.init_tantivy()?;

                // Temporarily take the engine so the rebuild can borrow self.
                if let Some(mut engine) = self.tantivy.take() {
                    self.rebuild_tantivy_engine(&mut engine)?;
                    self.tantivy = Some(engine);
                } else {
                    return Err(MemvidError::InvalidToc {
                        reason: "tantivy engine missing during doctor rebuild".into(),
                    });
                }

                self.lex_enabled = true;

                self.tantivy_dirty = true;

                // flush_tantivy() writes at data_end and advances
                // header.footer_offset; point data_end at the current tail,
                // flush, then restore it to the payload end.
                self.data_end = footer_offset;

                self.flush_tantivy()?;

                footer_offset = self.header.footer_offset;

                self.data_end = payload_end;
            }
            #[cfg(not(feature = "lex"))]
            {
                self.toc.indexes.lex = None;
                self.toc.indexes.lex_segments.clear();
            }
        } else {
            self.toc.indexes.lex = None;
            self.toc.indexes.lex_segments.clear();
            #[cfg(feature = "lex")]
            if let Ok(mut storage) = self.lex_storage.write() {
                storage.clear();
            }
        }

        // Phase 3: rebuild and append the vector index.
        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
            let vec_offset = footer_offset;
            self.file.seek(SeekFrom::Start(vec_offset))?;
            self.file.write_all(&artifact.bytes)?;
            footer_offset += artifact.bytes.len() as u64;
            self.toc.indexes.vec = Some(VecIndexManifest {
                vector_count: artifact.vector_count,
                dimension: artifact.dimension,
                bytes_offset: vec_offset,
                bytes_length: artifact.bytes.len() as u64,
                checksum: artifact.checksum,
                compression_mode: self.vec_compression.clone(),
            });
            self.vec_index = Some(index);
        } else {
            // No artifact: keep a pre-existing manifest while vec indexing is
            // still enabled, otherwise clear it.
            if !self.vec_enabled {
                self.toc.indexes.vec = None;
            }
            self.vec_index = None;
        }

        // Phase 4: persist the in-memory CLIP index, if enabled and non-empty.
        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    let artifact = clip_index.encode()?;
                    let clip_offset = footer_offset;
                    self.file.seek(SeekFrom::Start(clip_offset))?;
                    self.file.write_all(&artifact.bytes)?;
                    footer_offset += artifact.bytes.len() as u64;
                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
                        bytes_offset: clip_offset,
                        bytes_length: artifact.bytes.len() as u64,
                        vector_count: artifact.vector_count,
                        dimension: artifact.dimension,
                        checksum: artifact.checksum,
                        model_name: crate::clip::default_model_info().name.to_string(),
                    });
                    tracing::info!(
                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
                        artifact.vector_count,
                        clip_offset
                    );
                }
            }
        } else {
            self.toc.indexes.clip = None;
        }

        // Phase 5: persist the memories track, if any cards exist.
        if self.memories_track.card_count() > 0 {
            let memories_offset = footer_offset;
            let memories_bytes = self.memories_track.serialize()?;
            let memories_checksum = blake3::hash(&memories_bytes).into();
            self.file.seek(SeekFrom::Start(memories_offset))?;
            self.file.write_all(&memories_bytes)?;
            footer_offset += memories_bytes.len() as u64;

            let stats = self.memories_track.stats();
            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
                bytes_offset: memories_offset,
                bytes_length: memories_bytes.len() as u64,
                card_count: stats.card_count as u64,
                entity_count: stats.entity_count as u64,
                checksum: memories_checksum,
            });
        } else {
            self.toc.memories_track = None;
        }

        // Phase 6: persist the logic mesh, if non-empty.
        if !self.logic_mesh.is_empty() {
            let mesh_offset = footer_offset;
            let mesh_bytes = self.logic_mesh.serialize()?;
            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
            self.file.seek(SeekFrom::Start(mesh_offset))?;
            self.file.write_all(&mesh_bytes)?;
            footer_offset += mesh_bytes.len() as u64;

            let stats = self.logic_mesh.stats();
            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
                bytes_offset: mesh_offset,
                bytes_length: mesh_bytes.len() as u64,
                node_count: stats.node_count as u64,
                edge_count: stats.edge_count as u64,
                checksum: mesh_checksum,
            });
        } else {
            self.toc.logic_mesh = None;
        }

        tracing::info!(
            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
            ti_offset,
            ti_length,
            footer_offset,
            self.header.footer_offset
        );

        // The footer only moves forward: max() protects regions (e.g. the
        // tantivy flush output) already accounted for in the header.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Grow the file if the footer now points beyond its physical end.
        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        // Phase 7: rewrite TOC + footer and persist the updated header.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;

        // Post-rebuild sanity check: a lex rebuild that indexed nothing while
        // text-indexable frames exist is treated as a hard failure.
        #[cfg(feature = "lex")]
        if self.lex_enabled {
            if let Some(ref engine) = self.tantivy {
                let doc_count = engine.num_docs();
                let active_frame_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| f.status == FrameStatus::Active)
                    .count();

                let text_indexable_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
                    .count();

                if doc_count == 0 && text_indexable_count > 0 {
                    return Err(MemvidError::Doctor {
                        reason: format!(
                            "Lex index rebuild failed: 0 documents indexed from {} text-indexable frames. \
                             This indicates a critical failure in the rebuild process.",
                            text_indexable_count
                        ),
                    });
                }

                log::info!(
                    "✓ Doctor lex index rebuild succeeded: {} docs from {} frames ({} text-indexable)",
                    doc_count,
                    active_frame_count,
                    text_indexable_count
                );
            }
        }

        Ok(())
    }
1980
1981 fn persist_memories_track(&mut self) -> Result<()> {
1986 if self.memories_track.card_count() == 0 {
1987 self.toc.memories_track = None;
1988 return Ok(());
1989 }
1990
1991 let memories_offset = self.header.footer_offset;
1993 let memories_bytes = self.memories_track.serialize()?;
1994 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
1995
1996 self.file.seek(SeekFrom::Start(memories_offset))?;
1997 self.file.write_all(&memories_bytes)?;
1998
1999 let stats = self.memories_track.stats();
2000 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2001 bytes_offset: memories_offset,
2002 bytes_length: memories_bytes.len() as u64,
2003 card_count: stats.card_count as u64,
2004 entity_count: stats.entity_count as u64,
2005 checksum: memories_checksum,
2006 });
2007
2008 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2010
2011 if self.file.metadata()?.len() < self.header.footer_offset {
2013 self.file.set_len(self.header.footer_offset)?;
2014 }
2015
2016 Ok(())
2017 }
2018
2019 fn persist_clip_index(&mut self) -> Result<()> {
2024 if !self.clip_enabled {
2025 self.toc.indexes.clip = None;
2026 return Ok(());
2027 }
2028
2029 let clip_index = match &self.clip_index {
2030 Some(idx) if !idx.is_empty() => idx,
2031 _ => {
2032 self.toc.indexes.clip = None;
2033 return Ok(());
2034 }
2035 };
2036
2037 let artifact = clip_index.encode()?;
2039
2040 let clip_offset = self.header.footer_offset;
2042 self.file.seek(SeekFrom::Start(clip_offset))?;
2043 self.file.write_all(&artifact.bytes)?;
2044
2045 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2046 bytes_offset: clip_offset,
2047 bytes_length: artifact.bytes.len() as u64,
2048 vector_count: artifact.vector_count,
2049 dimension: artifact.dimension,
2050 checksum: artifact.checksum,
2051 model_name: crate::clip::default_model_info().name.to_string(),
2052 });
2053
2054 tracing::info!(
2055 "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2056 artifact.vector_count,
2057 clip_offset
2058 );
2059
2060 self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2062
2063 if self.file.metadata()?.len() < self.header.footer_offset {
2065 self.file.set_len(self.header.footer_offset)?;
2066 }
2067
2068 Ok(())
2069 }
2070
2071 fn persist_logic_mesh(&mut self) -> Result<()> {
2076 if self.logic_mesh.is_empty() {
2077 self.toc.logic_mesh = None;
2078 return Ok(());
2079 }
2080
2081 let mesh_offset = self.header.footer_offset;
2083 let mesh_bytes = self.logic_mesh.serialize()?;
2084 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2085
2086 self.file.seek(SeekFrom::Start(mesh_offset))?;
2087 self.file.write_all(&mesh_bytes)?;
2088
2089 let stats = self.logic_mesh.stats();
2090 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2091 bytes_offset: mesh_offset,
2092 bytes_length: mesh_bytes.len() as u64,
2093 node_count: stats.node_count as u64,
2094 edge_count: stats.edge_count as u64,
2095 checksum: mesh_checksum,
2096 });
2097
2098 self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2100
2101 if self.file.metadata()?.len() < self.header.footer_offset {
2103 self.file.set_len(self.header.footer_offset)?;
2104 }
2105
2106 Ok(())
2107 }
2108
2109 #[cfg(feature = "lex")]
2110 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2111 let LexWalBatch {
2112 generation,
2113 doc_count,
2114 checksum,
2115 segments,
2116 } = batch;
2117
2118 if let Some(mut storage) = self.lex_storage.write().ok() {
2119 storage.replace(doc_count, checksum, segments);
2120 storage.set_generation(generation);
2121 }
2122
2123 let result = self.persist_lex_manifest();
2124 result
2125 }
2126
2127 #[cfg(feature = "lex")]
2128 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2129 let payload = encode_to_vec(&WalEntry::Lex(batch.clone()), wal_config())?;
2130 self.append_wal_entry(&payload)?;
2131 Ok(())
2132 }
2133
2134 #[cfg(feature = "lex")]
2135 fn persist_lex_manifest(&mut self) -> Result<()> {
2136 let (index_manifest, segments) = if let Some(storage) = self.lex_storage.read().ok() {
2137 storage.to_manifest()
2138 } else {
2139 (None, Vec::new())
2140 };
2141
2142 if let Some(storage_manifest) = index_manifest {
2144 self.toc.indexes.lex = Some(storage_manifest);
2146 } else {
2147 self.toc.indexes.lex = None;
2150 }
2151
2152 self.toc.indexes.lex_segments = segments;
2153
2154 self.rewrite_toc_footer()?;
2158 self.header.toc_checksum = self.toc.toc_checksum;
2159 crate::persist_header(&mut self.file, &self.header)?;
2160 Ok(())
2161 }
2162
    /// Persist a fresh tantivy snapshot into the container file: write each
    /// segment's bytes after `data_end`, rebuild the tantivy segment catalog,
    /// swap the embedded storage contents, append a lex WAL batch, and
    /// persist manifest + header.
    #[cfg(feature = "lex")]
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // Segments are appended sequentially starting at the payload tail.
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            // Flush per segment so recorded offsets refer to bytes that have
            // left the userspace buffer before the manifest is updated.
            self.file.flush()?; embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Footer only moves forward; never shrink past already-claimed regions.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Mirror the embedded segments into the segment catalog, assigning
        // fresh monotonically increasing segment ids.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // Swap the embedded storage contents. Unlike the best-effort paths, a
        // poisoned lock is fatal here because the WAL batch below must carry
        // the storage's post-replace generation.
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Record the batch in the WAL, then persist manifest and header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2246
2247 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2248 let frame =
2249 self.toc
2250 .frames
2251 .get_mut(frame_id as usize)
2252 .ok_or(MemvidError::InvalidFrame {
2253 frame_id,
2254 reason: "delete target missing",
2255 })?;
2256 frame.status = FrameStatus::Deleted;
2257 frame.superseded_by = None;
2258 self.remove_frame_from_indexes(frame_id)
2259 }
2260
2261 fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2262 #[cfg(feature = "lex")]
2263 if let Some(engine) = self.tantivy.as_mut() {
2264 engine.delete_frame(frame_id)?;
2265 self.tantivy_dirty = true;
2266 }
2267 if let Some(index) = self.lex_index.as_mut() {
2268 index.remove_document(frame_id);
2269 }
2270 if let Some(index) = self.vec_index.as_mut() {
2271 index.remove(frame_id);
2272 }
2273 Ok(())
2274 }
2275
2276 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2277 self.toc
2278 .frames
2279 .get(frame_id as usize)
2280 .map_or(false, |frame| frame.status == FrameStatus::Active)
2281 }
2282
    /// Derive the frame-id range and page range covered by the given ids.
    ///
    /// Returns `None` for an empty iterator. Ids missing from the TOC still
    /// widen the frame range but contribute no page information.
    #[cfg(feature = "parallel_segments")]
    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
    where
        I: IntoIterator<Item = FrameId>,
    {
        let mut iter = iter.into_iter();
        let first_id = iter.next()?;
        let first_frame = self.toc.frames.get(first_id as usize);
        let mut min_id = first_id;
        let mut max_id = first_id;
        // Seed the page bounds from the first frame; 0 when it carries no
        // chunk info (NOTE(review): 0 appears to mean "unknown page" — confirm).
        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
        let mut page_end = first_frame
            .and_then(|frame| frame.chunk_count)
            .map(|count| page_start + count.saturating_sub(1))
            .unwrap_or(page_start);
        for frame_id in iter {
            if frame_id < min_id {
                min_id = frame_id;
            }
            if frame_id > max_id {
                max_id = frame_id;
            }
            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
                if let Some(idx) = frame.chunk_index {
                    page_start = page_start.min(idx);
                    if let Some(count) = frame.chunk_count {
                        // Inclusive end: a frame covering `count` chunks ends
                        // at idx + count - 1.
                        let end = idx + count.saturating_sub(1);
                        page_end = page_end.max(end);
                    } else {
                        page_end = page_end.max(idx);
                    }
                }
            }
        }
        Some(SegmentSpan {
            frame_start: min_id,
            frame_end: max_id,
            page_start,
            page_end,
            ..SegmentSpan::default()
        })
    }
2326
2327 #[cfg(feature = "parallel_segments")]
2328 pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2329 common.span = Some(span);
2330 if common.codec_version == 0 {
2331 common.codec_version = 1;
2332 }
2333 }
2334
2335 #[cfg(feature = "parallel_segments")]
2336 pub(crate) fn record_index_segment(
2337 &mut self,
2338 kind: SegmentKind,
2339 common: SegmentCommon,
2340 stats: SegmentStats,
2341 ) -> Result<()> {
2342 let entry = IndexSegmentRef {
2343 kind,
2344 common,
2345 stats,
2346 };
2347 self.toc.segment_catalog.index_segments.push(entry.clone());
2348 if let Some(wal) = self.manifest_wal.as_mut() {
2349 wal.append_segments(&[entry])?;
2350 }
2351 Ok(())
2352 }
2353
2354 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2355 self.ensure_writable()?;
2356 if self.toc.ticket_ref.issuer == "free-tier" {
2357 return Ok(());
2358 }
2359 match self.tier() {
2360 Tier::Free => Ok(()),
2361 tier => {
2362 if self.toc.ticket_ref.issuer.trim().is_empty() {
2363 Err(MemvidError::TicketRequired { tier })
2364 } else {
2365 Ok(())
2366 }
2367 }
2368 }
2369 }
2370
2371 pub(crate) fn tier(&self) -> Tier {
2372 if self.header.wal_size >= WAL_SIZE_LARGE {
2373 Tier::Enterprise
2374 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2375 Tier::Dev
2376 } else {
2377 Tier::Free
2378 }
2379 }
2380
2381 pub(crate) fn capacity_limit(&self) -> u64 {
2382 if self.toc.ticket_ref.capacity_bytes != 0 {
2383 self.toc.ticket_ref.capacity_bytes
2384 } else {
2385 self.tier().capacity_bytes()
2386 }
2387 }
2388
    /// Public accessor for the effective capacity limit in bytes
    /// (ticket override when present, otherwise the tier default).
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2396
    /// Serialize the TOC at `header.footer_offset`, append the commit footer,
    /// and resize the file to cover them.
    ///
    /// The file length is never reduced below the end of the WAL region
    /// (`wal_offset + wal_size`); if the new footer would end inside it,
    /// the length is clamped (with a warning) instead.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The footer records the TOC's length and blake3 hash so a reader can
        // locate and verify the TOC from the end of the file.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        self.file.sync_all()?;
        Ok(())
    }
2437}
2438
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Build and append index segments for the frames in `delta` using the
    /// parallel worker pool.
    ///
    /// Returns `Ok(false)` when any stage (chunk collection, planning,
    /// execution) produces nothing, `Ok(true)` once segments were appended.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Turn each inserted frame of `delta` into a `SegmentChunkPlan`,
    /// pairing it with its embedding (when one was recorded) and skipping
    /// frames whose content is blank.
    ///
    /// Errors with `InvalidFrame` when an inserted frame id is not in the TOC.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Embeddings arrive as (frame_id, vector) pairs; index them for O(1)
        // pickup while walking the inserted frames.
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages appear to be 1-based (chunk_index + 1), with 0 meaning
            // "no page info" — NOTE(review): confirm against SegmentChunkPlan
            // consumers.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                // remove() hands the vector over without cloning; each frame
                // consumes its embedding at most once.
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2510
/// Cheap token-count proxy: whitespace-separated word count, floored at one
/// so even blank text contributes a non-zero estimate to segment planning.
#[cfg(feature = "parallel_segments")]
fn estimate_tokens(text: &str) -> usize {
    match text.split_whitespace().count() {
        0 => 1,
        words => words,
    }
}
2515
impl Memvid {
    /// Push `footer_offset` forward when the segment catalog extends past it,
    /// then rewrite the TOC/footer and persist the header.
    ///
    /// Returns `Ok(false)` when the footer already clears the catalog (no-op),
    /// `Ok(true)` after a rewrite.
    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
        let catalog_end = self.catalog_data_end();
        if catalog_end <= self.header.footer_offset {
            return Ok(false);
        }
        self.header.footer_offset = catalog_end;
        // Order matters: presumably rewrite_toc_footer refreshes
        // `toc.toc_checksum` (it takes the TOC mutably), so the copy into the
        // header must happen after the rewrite — TODO confirm.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(true)
    }
}
2529
impl Memvid {
    /// Compact the container: keep only active frame payloads, rewritten
    /// contiguously after the WAL region, then drop and rebuild all indexes.
    ///
    /// Buffers every active payload in memory before rewriting, so peak
    /// memory is proportional to total active payload size.
    pub fn vacuum(&mut self) -> Result<()> {
        // Flush any pending WAL state first so the TOC is authoritative.
        self.commit()?;

        // Read all active payloads up front — the rewrite below overwrites
        // the payload region in place.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Rewrite payloads contiguously starting right after the WAL region,
        // updating each frame's offset/length as we go.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    // Active frame with no cached payload (e.g. zero-length):
                    // record an empty location.
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Deleted/superseded frames lose their payload location.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // All index segments referenced old payload offsets; drop every
        // catalog so rebuild_indexes starts from a clean slate.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Discard the in-memory tantivy engine; it will be rebuilt on demand.
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[])?;
        self.file.sync_all()?;
        Ok(())
    }

    /// Preview how a payload would be split into document chunks, without
    /// writing anything. `None` when the payload is not chunkable.
    pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
        plan_document_chunks(payload).map(|plan| plan.chunks)
    }

    /// Insert a payload with default options; returns the WAL sequence number.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }

    /// Insert a payload with explicit options; returns the WAL sequence number.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }

    /// Insert a payload together with a precomputed embedding.
    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            Some(embedding),
            None,
            PutOptions::default(),
            None,
        )
    }

    /// Insert a payload with a precomputed embedding and explicit options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }

    /// Insert a payload with an optional parent-level embedding plus one
    /// embedding per document chunk (matched to chunks by position).
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }

    /// Replace (or re-tag) an active frame.
    ///
    /// With `payload == None` the existing payload is reused and auto-tagging
    /// / date extraction are disabled; otherwise the new payload supersedes
    /// the old frame. Unset options inherit the existing frame's values.
    /// Returns the WAL sequence number of the superseding entry.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Inherit every option the caller left unset from the existing frame.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // No new payload: reuse the existing frame's bytes and skip content
        // analysis, which would only re-derive what the frame already has.
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        // Prefer the caller's embedding; otherwise carry forward the stored
        // one when vector search is enabled.
        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None, options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={} seq={} reused_payload={} replaced_payload={}",
            frame_id, seq, reuse_flag, replace_flag
        );
        Ok(seq)
    }

    /// Delete an active frame by appending a tombstone WAL entry.
    ///
    /// The deletion takes effect at commit/replay time; this method only
    /// records the intent and returns the tombstone's sequence number.
    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let frame = self.frame_by_id(frame_id)?;
        if frame.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // The tombstone carries enough of the original frame (uri, title,
        // role, chunk info) for diagnostics/replay, but no payload.
        let mut tombstone = WalEntryData {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(frame.timestamp),
            kind: None,
            track: None,
            payload: Vec::new(),
            embedding: None,
            uri: frame.uri.clone(),
            title: frame.title.clone(),
            canonical_encoding: frame.canonical_encoding,
            canonical_length: frame.canonical_length,
            metadata: None,
            search_text: None,
            tags: Vec::new(),
            labels: Vec::new(),
            extra_metadata: BTreeMap::new(),
            content_dates: Vec::new(),
            chunk_manifest: None,
            role: frame.role,
            parent_sequence: None,
            chunk_index: frame.chunk_index,
            chunk_count: frame.chunk_count,
            op: FrameWalOp::Tombstone,
            target_frame_id: Some(frame_id),
            supersedes_frame_id: None,
            reuse_payload_from: None,
        };
        tombstone.kind = frame.kind.clone();
        tombstone.track = frame.track.clone();

        let payload_bytes = encode_to_vec(&WalEntry::Frame(tombstone), wal_config())?;
        let seq = self.append_wal_entry(&payload_bytes)?;
        self.dirty = true;
        if self.wal.should_checkpoint() {
            self.commit()?;
        }
        info!("frame_delete frame_id={} seq={}", frame_id, seq);
        Ok(seq)
    }
}
2814
impl Memvid {
    /// Core insert path shared by every `put_*`, `update_frame` and reuse
    /// flow: validates embeddings and capacity, derives search text /
    /// metadata / chunks, and appends one parent WAL entry (plus one entry
    /// per document chunk).
    ///
    /// Exactly one of `payload` / `reuse_frame` must be provided. Returns the
    /// parent entry's WAL sequence number.
    fn put_internal(
        &mut self,
        payload: Option<&[u8]>,
        reuse_frame: Option<Frame>,
        embedding: Option<Vec<f32>>,
        chunk_embeddings: Option<Vec<Vec<f32>>>,
        mut options: PutOptions,
        supersedes: Option<FrameId>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        // Providing bytes AND asking to reuse an old payload is contradictory.
        if payload.is_some() && reuse_frame.is_some() {
            let frame_id = reuse_frame
                .as_ref()
                .map(|frame| frame.id)
                .unwrap_or_default();
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "cannot reuse payload when bytes are provided",
            });
        }

        // Determine the single embedding dimension implied by the parent
        // embedding and all non-empty chunk embeddings; mixed dimensions are
        // rejected up front.
        let incoming_dimension = {
            let mut dim: Option<u32> = None;

            if let Some(ref vector) = embedding {
                if !vector.is_empty() {
                    dim = Some(vector.len() as u32);
                }
            }

            if let Some(ref vectors) = chunk_embeddings {
                for vector in vectors {
                    if vector.is_empty() {
                        continue;
                    }
                    let vec_dim = vector.len() as u32;
                    match dim {
                        None => dim = Some(vec_dim),
                        Some(existing) if existing == vec_dim => {}
                        Some(existing) => {
                            return Err(MemvidError::VecDimensionMismatch {
                                expected: existing,
                                actual: vector.len(),
                            });
                        }
                    }
                }
            }

            dim
        };

        if let Some(incoming_dimension) = incoming_dimension {
            // First embedding ever seen turns the vector index on lazily.
            if !self.vec_enabled {
                self.enable_vec()?;
            }

            // An existing index fixes the dimension; reject mismatches.
            if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
                if existing_dimension != incoming_dimension {
                    return Err(MemvidError::VecDimensionMismatch {
                        expected: existing_dimension,
                        actual: incoming_dimension as usize,
                    });
                }
            }

            // A manifest with dimension 0 is still unset; adopt ours.
            if let Some(manifest) = self.toc.indexes.vec.as_mut() {
                if manifest.dimension == 0 {
                    manifest.dimension = incoming_dimension;
                }
            }
        }

        // Project the payload's on-disk footprint for the capacity check,
        // preparing (compressing) it once and caching the result.
        let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
        let payload_tail = self.payload_region_end();
        let projected = if let Some(bytes) = payload {
            let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
            let len = prepared.len();
            prepared_payload = Some((prepared, encoding, length));
            payload_tail.saturating_add(len as u64)
        } else if reuse_frame.is_some() {
            // Reuse adds no new payload bytes.
            payload_tail
        } else {
            return Err(MemvidError::InvalidFrame {
                frame_id: 0,
                reason: "payload required for frame insertion",
            });
        };

        let capacity_limit = self.capacity_limit();
        if projected > capacity_limit {
            let incoming_size = projected.saturating_sub(payload_tail);
            return Err(MemvidError::CapacityExceeded {
                current: payload_tail,
                limit: capacity_limit,
                required: incoming_size,
            });
        }
        // Wall-clock seconds; falls back to 0 if the clock is before the epoch.
        let timestamp = options.timestamp.take().unwrap_or_else(|| {
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(0)
        });

        // Bytes used for content analysis: the new payload, or the reused
        // frame's canonical bytes (kept alive by `_reuse_bytes`).
        let mut _reuse_bytes: Option<Vec<u8>> = None;
        let payload_for_processing = if let Some(bytes) = payload {
            Some(bytes)
        } else if let Some(frame) = reuse_frame.as_ref() {
            let bytes = self.frame_canonical_bytes(frame)?;
            _reuse_bytes = Some(bytes);
            _reuse_bytes.as_deref()
        } else {
            None
        };

        // Document chunking only applies to fresh payloads, never to reuse.
        let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
            (Some(bytes), None) => plan_document_chunks(bytes),
            _ => None,
        };

        // Decide what the parent WAL entry stores:
        // - chunked documents: empty parent payload (chunks carry the content)
        // - fresh payload: the prepared (possibly compressed) bytes
        // - reuse: nothing, plus a pointer at the source frame
        let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
            if raw_chunk_plan.is_some() {
                (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
            } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
                (prepared, encoding, length, None)
            } else if let Some(bytes) = payload {
                // NOTE(review): prepared_payload is always Some when payload
                // is Some, so this arm looks unreachable — confirm before
                // removing.
                let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
                (prepared, encoding, length, None)
            } else if let Some(frame) = reuse_frame.as_ref() {
                (
                    Vec::new(),
                    frame.canonical_encoding,
                    frame.canonical_length,
                    Some(frame.id),
                )
            } else {
                return Err(MemvidError::InvalidFrame {
                    frame_id: 0,
                    reason: "payload required for frame insertion",
                });
            };

        let mut chunk_plan = raw_chunk_plan;

        let mut metadata = options.metadata.take();
        let mut search_text = options
            .search_text
            .take()
            .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
        let mut tags = std::mem::take(&mut options.tags);
        let mut labels = std::mem::take(&mut options.labels);
        let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
        let mut content_dates: Vec<String> = Vec::new();

        // Run the document extractor only when something is missing (search
        // text, metadata) or auto-tagging asked for it.
        let need_search_text = search_text
            .as_ref()
            .map_or(true, |text| text.trim().is_empty());
        let need_metadata = metadata.is_none();
        let run_extractor = need_search_text || need_metadata || options.auto_tag;

        let mut extraction_error = None;
        let extracted = if run_extractor {
            if let Some(bytes) = payload_for_processing {
                let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
                let uri_hint = options.uri.as_deref();
                match extract_via_registry(bytes, mime_hint, uri_hint) {
                    Ok(doc) => Some(doc),
                    Err(err) => {
                        extraction_error = Some(err);
                        None
                    }
                }
            } else {
                None
            }
        } else {
            None
        };

        if let Some(err) = extraction_error {
            return Err(err);
        }

        if let Some(doc) = &extracted {
            // Extracted text backfills search text when the caller gave none.
            if need_search_text {
                if let Some(text) = &doc.text {
                    if let Some(normalized) =
                        normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
                    {
                        search_text = Some(normalized);
                    }
                }
            }

            // Text-only chunking fallback when document chunking found nothing.
            if chunk_plan.is_none() {
                if let Some(text) = &doc.text {
                    chunk_plan = plan_text_chunks(text);
                }
            }

            // Detected mime type fills the metadata's mime only when unset.
            if let Some(mime) = doc.mime_type.as_ref() {
                match &mut metadata {
                    Some(existing) => {
                        if existing.mime.is_none() {
                            existing.mime = Some(mime.clone());
                        }
                    }
                    None => {
                        let mut doc_meta = DocMetadata::default();
                        doc_meta.mime = Some(mime.clone());
                        metadata = Some(doc_meta);
                    }
                }
            }

            // Raw extractor metadata is stashed as JSON, never overwriting a
            // caller-provided value.
            if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string()) {
                extra_metadata
                    .entry("extractous_metadata".to_string())
                    .or_insert(meta_json);
            }
        }

        if options.auto_tag {
            if let Some(ref text) = search_text {
                if !text.trim().is_empty() {
                    let result = AutoTagger::default().analyse(text, options.extract_dates);
                    merge_unique(&mut tags, result.tags);
                    merge_unique(&mut labels, result.labels);
                    if options.extract_dates && content_dates.is_empty() {
                        content_dates = result.content_dates;
                    }
                }
            }
        }

        // Reuse path carries forward previously extracted dates.
        if content_dates.is_empty() {
            if let Some(frame) = reuse_frame.as_ref() {
                content_dates = frame.content_dates.clone();
            }
        }

        let metadata_ref = metadata.as_ref();
        // Fold uri/title/track/tags/labels/metadata into the searchable text.
        let mut search_text = augment_search_text(
            search_text,
            options.uri.as_deref(),
            options.title.as_deref(),
            options.track.as_deref(),
            &tags,
            &labels,
            &extra_metadata,
            &content_dates,
            metadata_ref,
        );
        let mut chunk_entries: Vec<WalEntryData> = Vec::new();
        let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
        let mut parent_chunk_count: Option<u32> = None;

        let kind_value = options.kind.take();
        let track_value = options.track.take();
        let uri_value = options.uri.take();
        let title_value = options.title.take();
        let should_extract_triplets = options.extract_triplets;
        let triplet_uri = uri_value.clone();
        let triplet_title = title_value.clone();

        // Build one child WAL entry per chunk; the parent keeps the manifest
        // and the first chunk's text as its own search text.
        if let Some(plan) = chunk_plan.as_ref() {
            let chunk_total = plan.chunks.len() as u32;
            parent_chunk_manifest = Some(plan.manifest.clone());
            parent_chunk_count = Some(chunk_total);

            if let Some(first_chunk) = plan.chunks.first() {
                if let Some(normalized) =
                    normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
                {
                    if !normalized.trim().is_empty() {
                        search_text = Some(normalized);
                    }
                }
            }

            // Chunks inherit the parent's tag/label/metadata context.
            let chunk_tags = tags.clone();
            let chunk_labels = labels.clone();
            let chunk_metadata = metadata.clone();
            let chunk_extra_metadata = extra_metadata.clone();
            let chunk_content_dates = content_dates.clone();

            for (idx, chunk_text) in plan.chunks.iter().enumerate() {
                let (chunk_payload, chunk_encoding, chunk_length) =
                    prepare_canonical_payload(chunk_text.as_bytes())?;
                let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
                    .map(|n| n.text)
                    .filter(|text| !text.trim().is_empty());

                // Chunk uri/title are derived from the parent's with a
                // 1-based page suffix.
                let chunk_uri = uri_value
                    .as_ref()
                    .map(|uri| format!("{uri}#page-{}", idx + 1));
                let chunk_title = title_value
                    .as_ref()
                    .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));

                // Positional pairing: chunk i gets embedding i when provided.
                let chunk_embedding = chunk_embeddings
                    .as_ref()
                    .and_then(|embeddings| embeddings.get(idx).cloned());

                chunk_entries.push(WalEntryData {
                    timestamp,
                    kind: kind_value.clone(),
                    track: track_value.clone(),
                    payload: chunk_payload,
                    embedding: chunk_embedding,
                    uri: chunk_uri,
                    title: chunk_title,
                    canonical_encoding: chunk_encoding,
                    canonical_length: chunk_length,
                    metadata: chunk_metadata.clone(),
                    search_text: chunk_search_text,
                    tags: chunk_tags.clone(),
                    labels: chunk_labels.clone(),
                    extra_metadata: chunk_extra_metadata.clone(),
                    content_dates: chunk_content_dates.clone(),
                    chunk_manifest: None,
                    role: FrameRole::DocumentChunk,
                    parent_sequence: None,
                    chunk_index: Some(idx as u32),
                    chunk_count: Some(chunk_total),
                    op: FrameWalOp::Insert,
                    target_frame_id: None,
                    supersedes_frame_id: None,
                    reuse_payload_from: None,
                });
            }
        }

        let parent_uri = uri_value.clone();
        let parent_title = title_value.clone();

        // NOTE(review): mapping an existing parent frame id to `parent_id + 2`
        // looks like a frame-id→WAL-sequence offset — confirm this constant
        // against the replay logic.
        let parent_sequence = if let Some(parent_id) = options.parent_id {
            self.toc
                .frames
                .get(parent_id as usize)
                .map(|_| parent_id + 2) } else {
            None
        };

        let triplet_text = search_text.clone();

        let entry = WalEntryData {
            timestamp,
            kind: kind_value,
            track: track_value,
            payload: storage_payload,
            embedding,
            uri: parent_uri,
            title: parent_title,
            canonical_encoding,
            canonical_length,
            metadata,
            search_text,
            tags,
            labels,
            extra_metadata,
            content_dates,
            chunk_manifest: parent_chunk_manifest,
            role: options.role,
            parent_sequence,
            chunk_index: None,
            chunk_count: parent_chunk_count,
            op: FrameWalOp::Insert,
            target_frame_id: None,
            supersedes_frame_id: supersedes,
            reuse_payload_from,
        };

        // Parent goes first so chunks can reference its sequence number.
        let parent_bytes = encode_to_vec(&WalEntry::Frame(entry), wal_config())?;
        let parent_seq = self.append_wal_entry(&parent_bytes)?;
        self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);

        for mut chunk_entry in chunk_entries {
            chunk_entry.parent_sequence = Some(parent_seq);
            let chunk_bytes = encode_to_vec(&WalEntry::Frame(chunk_entry), wal_config())?;
            self.append_wal_entry(&chunk_bytes)?;
            self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
        }

        self.dirty = true;
        if self.wal.should_checkpoint() {
            self.commit()?;
        }

        #[cfg(feature = "replay")]
        if let Some(input_bytes) = payload {
            self.record_put_action(parent_seq, input_bytes);
        }

        // Optional knowledge-triplet extraction over the final search text;
        // results land in the memories track keyed by the parent frame.
        if should_extract_triplets {
            if let Some(ref text) = triplet_text {
                if !text.trim().is_empty() {
                    let extractor = TripletExtractor::default();
                    let frame_id = parent_seq as FrameId;
                    let (cards, _stats) = extractor.extract(
                        frame_id,
                        text,
                        triplet_uri.as_deref(),
                        triplet_title.as_deref(),
                        timestamp,
                    );

                    if !cards.is_empty() {
                        let card_ids = self.memories_track.add_cards(cards);

                        self.memories_track.record_enrichment(
                            frame_id,
                            "rules",
                            "1.0.0",
                            card_ids,
                        );
                    }
                }
            }
        }

        Ok(parent_seq)
    }
}
3267
/// Operation tag carried by every frame WAL entry.
///
/// Serialized via serde ("insert"/"tombstone"); variant order and names are
/// part of the on-disk WAL format — do not reorder or rename.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum FrameWalOp {
    /// Insert a new frame (possibly superseding an older one).
    Insert,
    /// Mark an existing frame deleted; `target_frame_id` names the victim.
    Tombstone,
}
3274
impl Default for FrameWalOp {
    /// Legacy WAL entries predate the `op` field; a missing value decodes as
    /// a plain insert.
    fn default() -> Self {
        Self::Insert
    }
}
3280
/// One logical record in the embedded WAL.
///
/// Encoded with bincode; older files stored a bare `WalEntryData`, which
/// `decode_wal_entry` still accepts as a fallback. Variant order is part of
/// the on-disk format.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame insert/tombstone.
    Frame(WalEntryData),
    /// A batch of embedded lex (tantivy) segment metadata.
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3287
3288fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3289 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3290 return Ok(entry);
3291 }
3292 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3293 Ok(WalEntry::Frame(legacy))
3294}
3295
/// Payload of a frame WAL record.
///
/// Bincode-encoded; field order is part of the on-disk format, so fields may
/// only be appended (with `#[serde(default)]`), never reordered or removed.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    // Unix seconds for the frame.
    pub(crate) timestamp: i64,
    // Optional frame kind label.
    pub(crate) kind: Option<String>,
    // Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    // Canonical payload bytes (empty for tombstones, reuse, and chunked parents).
    pub(crate) payload: Vec<u8>,
    // Optional embedding vector for the vec index.
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    // How `payload` is encoded (e.g. plain vs zstd).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    // Pre-compression byte length of the canonical payload.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    // Normalized, augmented text used by the search indexes.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    // Free-form key/value metadata (e.g. raw extractor output as JSON).
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    // Dates extracted from the content by auto-tagging.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    // Present on chunked parents; describes the chunk layout.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    #[serde(default)]
    pub(crate) role: FrameRole,
    // WAL sequence of the parent entry for document-chunk children.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    // Insert vs tombstone; defaults to Insert for legacy records.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    // Frame a tombstone deletes.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    // Frame this insert replaces (update path).
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    // Frame whose stored payload this entry reuses instead of carrying bytes.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
}
3342
3343pub(crate) fn prepare_canonical_payload(
3344 payload: &[u8],
3345) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
3346 if std::str::from_utf8(payload).is_ok() {
3347 let compressed = zstd::encode_all(std::io::Cursor::new(payload), 3)?;
3348 Ok((
3349 compressed,
3350 CanonicalEncoding::Zstd,
3351 Some(payload.len() as u64),
3352 ))
3353 } else {
3354 Ok((
3355 payload.to_vec(),
3356 CanonicalEncoding::Plain,
3357 Some(payload.len() as u64),
3358 ))
3359 }
3360}
3361
3362pub(crate) fn augment_search_text(
3363 base: Option<String>,
3364 uri: Option<&str>,
3365 title: Option<&str>,
3366 track: Option<&str>,
3367 tags: &[String],
3368 labels: &[String],
3369 extra_metadata: &BTreeMap<String, String>,
3370 content_dates: &[String],
3371 metadata: Option<&DocMetadata>,
3372) -> Option<String> {
3373 let mut segments: Vec<String> = Vec::new();
3374 if let Some(text) = base {
3375 let trimmed = text.trim();
3376 if !trimmed.is_empty() {
3377 segments.push(trimmed.to_string());
3378 }
3379 }
3380
3381 if let Some(title) = title {
3382 if !title.trim().is_empty() {
3383 segments.push(format!("title: {}", title.trim()));
3384 }
3385 }
3386
3387 if let Some(uri) = uri {
3388 if !uri.trim().is_empty() {
3389 segments.push(format!("uri: {}", uri.trim()));
3390 }
3391 }
3392
3393 if let Some(track) = track {
3394 if !track.trim().is_empty() {
3395 segments.push(format!("track: {}", track.trim()));
3396 }
3397 }
3398
3399 if !tags.is_empty() {
3400 segments.push(format!("tags: {}", tags.join(" ")));
3401 }
3402
3403 if !labels.is_empty() {
3404 segments.push(format!("labels: {}", labels.join(" ")));
3405 }
3406
3407 if !extra_metadata.is_empty() {
3408 for (key, value) in extra_metadata {
3409 if value.trim().is_empty() {
3410 continue;
3411 }
3412 segments.push(format!("{}: {}", key, value));
3413 }
3414 }
3415
3416 if !content_dates.is_empty() {
3417 segments.push(format!("dates: {}", content_dates.join(" ")));
3418 }
3419
3420 if let Some(meta) = metadata {
3421 if let Ok(meta_json) = serde_json::to_string(meta) {
3422 segments.push(format!("metadata: {}", meta_json));
3423 }
3424 }
3425
3426 if segments.is_empty() {
3427 None
3428 } else {
3429 Some(segments.join("\n"))
3430 }
3431}
3432
/// Append trimmed, non-empty values from `additions` to `target`, skipping
/// anything already present (existing entries are compared as-is, additions
/// after trimming). Preserves insertion order.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    // Seed the seen-set with what's already in the target so duplicates of
    // existing entries are rejected too.
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    for addition in additions {
        let trimmed = addition.trim();
        if trimmed.is_empty() {
            continue;
        }
        if seen.insert(trimmed.to_string()) {
            target.push(trimmed.to_string());
        }
    }
}