1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58 PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66 TemporalResolutionValue,
67};
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading bytes sniffed for magic-number based format detection.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Assumed on-disk header size per embedded-WAL entry, used when estimating
/// how much WAL space an append needs — TODO confirm against the WAL format.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Chunk size (8 MiB) for shifting file data when the WAL region grows.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback timezone used when resolving temporal phrases.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";
81
/// Fixed list of natural-language temporal phrases recognised when the
/// `temporal_track` feature is enabled.
/// NOTE(review): matching semantics (case sensitivity, word boundaries) live
/// in the consumer of this list, not here — confirm there before editing.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
123struct CommitStaging {
124 atomic: AtomicWriteFile,
125}
126
127impl CommitStaging {
128 fn prepare(path: &Path) -> Result<Self> {
129 let mut options = AtomicWriteFile::options();
130 options.read(true);
131 let atomic = options.open(path)?;
132 Ok(Self { atomic })
133 }
134
135 fn copy_from(&mut self, source: &File) -> Result<()> {
136 let mut reader = source.try_clone()?;
137 reader.seek(SeekFrom::Start(0))?;
138
139 let writer = self.atomic.as_file_mut();
140 writer.set_len(0)?;
141 writer.seek(SeekFrom::Start(0))?;
142 std::io::copy(&mut reader, writer)?;
143 writer.flush()?;
144 writer.sync_all()?;
145 Ok(())
146 }
147
148 fn clone_file(&self) -> Result<File> {
149 Ok(self.atomic.as_file().try_clone()?)
150 }
151
152 fn commit(self) -> Result<()> {
153 self.atomic.commit().map_err(Into::into)
154 }
155
156 fn discard(self) -> Result<()> {
157 self.atomic.discard().map_err(Into::into)
158 }
159}
160
161#[derive(Debug, Default)]
162struct IngestionDelta {
163 inserted_frames: Vec<FrameId>,
164 inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
165 inserted_time_entries: Vec<TimeIndexEntry>,
166 mutated_frames: bool,
167 #[cfg(feature = "temporal_track")]
168 inserted_temporal_mentions: Vec<TemporalMention>,
169 #[cfg(feature = "temporal_track")]
170 inserted_temporal_anchors: Vec<TemporalAnchor>,
171}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// How a commit was requested to run.
/// NOTE(review): `commit_from_records` currently ignores the mode (its
/// parameter is `_mode`), so `Full` and `Incremental` behave identically.
// Idiom fix: the hand-written `impl Default` is replaced by deriving
// `Default` with the `#[default]` variant attribute (stable since 1.62).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Default commit strategy.
    #[default]
    Full,
    Incremental,
}
201
202#[derive(Clone, Copy, Debug, Default)]
203pub struct CommitOptions {
204 pub mode: CommitMode,
205 pub background: bool,
206}
207
208impl CommitOptions {
209 #[must_use]
210 pub fn new(mode: CommitMode) -> Self {
211 Self {
212 mode,
213 background: false,
214 }
215 }
216
217 #[must_use]
218 pub fn background(mut self, background: bool) -> Self {
219 self.background = background;
220 self
221 }
222}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226 REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230 mime: Option<&str>,
231 magic: Option<&[u8]>,
232 uri: Option<&str>,
233) -> Option<DocumentFormat> {
234 if detect_pdf_magic(magic) {
236 return Some(DocumentFormat::Pdf);
237 }
238
239 if let Some(magic_bytes) = magic {
242 if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243 if let Some(format) = infer_format_from_extension(uri) {
245 return Some(format);
246 }
247 }
248 }
249
250 if let Some(mime_str) = mime {
252 let mime_lower = mime_str.trim().to_ascii_lowercase();
253 let format = match mime_lower.as_str() {
254 "application/pdf" => Some(DocumentFormat::Pdf),
255 "text/plain" => Some(DocumentFormat::PlainText),
256 "text/markdown" => Some(DocumentFormat::Markdown),
257 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259 Some(DocumentFormat::Docx)
260 }
261 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262 Some(DocumentFormat::Xlsx)
263 }
264 "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266 Some(DocumentFormat::Pptx)
267 }
268 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269 _ => None,
270 };
271 if format.is_some() {
272 return format;
273 }
274 }
275
276 infer_format_from_extension(uri)
278}
279
280fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282 let uri = uri?;
283 let path = std::path::Path::new(uri);
284 let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285 match ext.as_str() {
286 "pdf" => Some(DocumentFormat::Pdf),
287 "docx" => Some(DocumentFormat::Docx),
288 "xlsx" => Some(DocumentFormat::Xlsx),
289 "xls" => Some(DocumentFormat::Xls),
290 "pptx" => Some(DocumentFormat::Pptx),
291 "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292 | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293 | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294 | "sql" => Some(DocumentFormat::PlainText),
295 "md" | "markdown" => Some(DocumentFormat::Markdown),
296 "html" | "htm" => Some(DocumentFormat::Html),
297 _ => None,
298 }
299}
300
/// Returns true when `magic` looks like the start of a PDF: an optional
/// UTF-8 BOM, then optional ASCII whitespace, then the `%PDF` signature.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    // Drop a UTF-8 byte-order mark, if present.
    let bytes = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    // Skip any leading ASCII whitespace before the signature.
    let body_start = bytes
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(bytes.len());
    bytes[body_start..].starts_with(b"%PDF")
}
318
/// Extracts a document from `bytes`: sniffs magic bytes, builds a
/// [`ReaderHint`], and dispatches to the first matching registered reader.
/// If no reader matches, or the matched reader errors, falls back to
/// [`PassthroughReader`] and records the reason as a diagnostic warning.
#[instrument(
    target = "memvid::extract",
    skip_all,
    fields(mime = mime_hint, uri = uri)
)]
fn extract_via_registry(
    bytes: &[u8],
    mime_hint: Option<&str>,
    uri: Option<&str>,
) -> Result<ExtractedDocument> {
    let registry = default_reader_registry();
    // First MAGIC_SNIFF_BYTES bytes for format sniffing.
    // NOTE(review): inputs shorter than MAGIC_SNIFF_BYTES make `get(..16)`
    // return None, so magic-based detection is skipped entirely for tiny
    // inputs — confirm this is intentional.
    let magic = bytes
        .get(..MAGIC_SNIFF_BYTES)
        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
        .with_uri(uri)
        .with_magic(magic);

    // A matched reader that succeeds returns early; otherwise remember why
    // we fell back so it can be surfaced in the output diagnostics.
    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
        let start = Instant::now();
        match reader.extract(bytes, &hint) {
            Ok(output) => {
                return Ok(finalize_reader_output(output, start));
            }
            Err(err) => {
                tracing::error!(
                    target = "memvid::extract",
                    reader = reader.name(),
                    error = %err,
                    "reader failed; falling back"
                );
                Some(format!("reader {} failed: {err}", reader.name()))
            }
        }
    } else {
        tracing::debug!(
            target = "memvid::extract",
            format = hint.format.map(super::super::reader::DocumentFormat::label),
            "no reader matched; using default extractor"
        );
        Some("no registered reader matched; using default extractor".to_string())
    };

    // Fallback path: passthrough extraction plus a warning explaining why.
    let start = Instant::now();
    let mut output = PassthroughReader.extract(bytes, &hint)?;
    if let Some(reason) = fallback_reason {
        output.diagnostics.track_warning(reason);
    }
    Ok(finalize_reader_output(output, start))
}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371 let elapsed = start.elapsed();
372 let ReaderOutput {
373 document,
374 reader_name,
375 diagnostics,
376 } = output;
377 log_reader_result(&reader_name, &diagnostics, elapsed);
378 document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382 let duration_ms = diagnostics
383 .duration_ms
384 .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385 let warnings = diagnostics.warnings.len();
386 let pages = diagnostics.pages_processed;
387
388 if warnings > 0 || diagnostics.fallback {
389 tracing::warn!(
390 target = "memvid::extract",
391 reader,
392 duration_ms,
393 pages,
394 warnings,
395 fallback = diagnostics.fallback,
396 "extraction completed with warnings"
397 );
398 for warning in &diagnostics.warnings {
399 tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400 }
401 } else {
402 tracing::info!(
403 target = "memvid::extract",
404 reader,
405 duration_ms,
406 pages,
407 "extraction completed"
408 );
409 }
410}
411
412impl Memvid {
    /// Runs `op` against a staged copy of the archive file. On success the
    /// staged file is atomically renamed over the original; on any failure
    /// every piece of in-memory state captured beforehand (file handle, WAL,
    /// header, TOC, offsets, dirty flags) is rolled back so `self` matches
    /// the untouched on-disk original.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        // Make sure the on-disk file is current before copying it.
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Point `self` at the staged copy; keep the originals for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so each branch can either restore or drop them.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Published: drop old handles and reopen the file at
                        // its destination path (the staged handle's path is
                        // gone after the atomic rename).
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Atomic publish failed: restore all captured state.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err)
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staged file (best effort) and
                // roll back all captured state.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
494
495 pub(crate) fn catalog_data_end(&self) -> u64 {
496 let mut max_end = self.header.wal_offset + self.header.wal_size;
497
498 for descriptor in &self.toc.segment_catalog.lex_segments {
499 if descriptor.common.bytes_length == 0 {
500 continue;
501 }
502 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
503 }
504
505 for descriptor in &self.toc.segment_catalog.vec_segments {
506 if descriptor.common.bytes_length == 0 {
507 continue;
508 }
509 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
510 }
511
512 for descriptor in &self.toc.segment_catalog.time_segments {
513 if descriptor.common.bytes_length == 0 {
514 continue;
515 }
516 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
517 }
518
519 #[cfg(feature = "temporal_track")]
520 for descriptor in &self.toc.segment_catalog.temporal_segments {
521 if descriptor.common.bytes_length == 0 {
522 continue;
523 }
524 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
525 }
526
527 #[cfg(feature = "lex")]
528 for descriptor in &self.toc.segment_catalog.tantivy_segments {
529 if descriptor.common.bytes_length == 0 {
530 continue;
531 }
532 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
533 }
534
535 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
536 if manifest.bytes_length != 0 {
537 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
538 }
539 }
540
541 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
542 if manifest.bytes_length != 0 {
543 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
544 }
545 }
546
547 if let Some(manifest) = self.toc.time_index.as_ref() {
548 if manifest.bytes_length != 0 {
549 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
550 }
551 }
552
553 #[cfg(feature = "temporal_track")]
554 if let Some(track) = self.toc.temporal_track.as_ref() {
555 if track.bytes_length != 0 {
556 max_end = max_end.max(track.bytes_offset + track.bytes_length);
557 }
558 }
559
560 max_end
561 }
562
    /// End offset of the payload region, read from a cached field
    /// (maintained elsewhere; returned here without recomputation).
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
566
    /// Appends `payload` to the embedded WAL, transparently growing the WAL
    /// region and retrying when the region is full or too small for the
    /// entry. Returns the entry's sequence number on success.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // Capacity failures are identified by their exact reason
                // strings; any other error propagates unchanged.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry"
                        || reason == "embedded WAL region full" =>
                {
                    // Need room for header + payload; also require strictly
                    // more than the current size so growth makes progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
586
    /// Doubles the embedded WAL region until it exceeds
    /// `required_entry_size`, shifting all payload data forward and fixing
    /// up every recorded offset, then persists the updated TOC/footer and
    /// header before reopening the WAL over the enlarged region.
    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
        let mut new_size = self.header.wal_size;
        let mut target = required_entry_size;
        if target == 0 {
            target = self.header.wal_size;
        }
        // Double until strictly larger than the target.
        // NOTE(review): if `header.wal_size` were ever 0 this loop could not
        // terminate (0 * 2 == 0); presumably the header guarantees a
        // non-zero WAL size — confirm.
        while new_size <= target {
            new_size = new_size
                .checked_mul(2)
                .ok_or_else(|| MemvidError::CheckpointFailed {
                    reason: "wal_size overflow".into(),
                })?;
        }
        let delta = new_size - self.header.wal_size;
        if delta == 0 {
            return Ok(());
        }

        // Physically move everything after the WAL, then update bookkeeping.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = new_size;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        // The footer must sit past everything the catalog references.
        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // Reopen the WAL handle over the resized region.
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
623
    /// Slides every byte after the current WAL region forward by `delta`,
    /// copying chunks from the end backwards so overlapping source and
    /// destination ranges are never clobbered, then zero-fills the vacated
    /// gap (the WAL's new space). Only moves bytes; offset bookkeeping is
    /// the caller's job.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        // Extend the file first so the shifted bytes have room.
        self.file.set_len(original_len + delta)?;

        // Copy backwards, one buffer-sized chunk at a time.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the `delta` bytes opened up at the old data start.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
658
659 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
660 if delta == 0 {
661 return;
662 }
663
664 for frame in &mut self.toc.frames {
665 if frame.payload_offset != 0 {
666 frame.payload_offset += delta;
667 }
668 }
669
670 for segment in &mut self.toc.segments {
671 if segment.bytes_offset != 0 {
672 segment.bytes_offset += delta;
673 }
674 }
675
676 if let Some(lex) = self.toc.indexes.lex.as_mut() {
677 if lex.bytes_offset != 0 {
678 lex.bytes_offset += delta;
679 }
680 }
681 for manifest in &mut self.toc.indexes.lex_segments {
682 if manifest.bytes_offset != 0 {
683 manifest.bytes_offset += delta;
684 }
685 }
686 if let Some(vec) = self.toc.indexes.vec.as_mut() {
687 if vec.bytes_offset != 0 {
688 vec.bytes_offset += delta;
689 }
690 }
691 if let Some(time_index) = self.toc.time_index.as_mut() {
692 if time_index.bytes_offset != 0 {
693 time_index.bytes_offset += delta;
694 }
695 }
696 #[cfg(feature = "temporal_track")]
697 if let Some(track) = self.toc.temporal_track.as_mut() {
698 if track.bytes_offset != 0 {
699 track.bytes_offset += delta;
700 }
701 }
702
703 let catalog = &mut self.toc.segment_catalog;
704 for descriptor in &mut catalog.lex_segments {
705 if descriptor.common.bytes_offset != 0 {
706 descriptor.common.bytes_offset += delta;
707 }
708 }
709 for descriptor in &mut catalog.vec_segments {
710 if descriptor.common.bytes_offset != 0 {
711 descriptor.common.bytes_offset += delta;
712 }
713 }
714 for descriptor in &mut catalog.time_segments {
715 if descriptor.common.bytes_offset != 0 {
716 descriptor.common.bytes_offset += delta;
717 }
718 }
719 #[cfg(feature = "temporal_track")]
720 for descriptor in &mut catalog.temporal_segments {
721 if descriptor.common.bytes_offset != 0 {
722 descriptor.common.bytes_offset += delta;
723 }
724 }
725 for descriptor in &mut catalog.tantivy_segments {
726 if descriptor.common.bytes_offset != 0 {
727 descriptor.common.bytes_offset += delta;
728 }
729 }
730
731 #[cfg(feature = "lex")]
732 if let Ok(mut storage) = self.lex_storage.write() {
733 storage.adjust_offsets(delta);
734 }
735 }
    /// Commits pending WAL records and dirty in-memory state through the
    /// atomic staging file. The `background` flag is currently a no-op: the
    /// commit always runs synchronously.
    pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
        self.ensure_writable()?;
        if options.background {
            tracing::debug!("commit background flag ignored; running synchronously");
        }
        let mode = options.mode;
        let records = self.wal.pending_records()?;
        // Fast path: no WAL records, no dirty state, no pending lex index.
        if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
    }
748
    /// Full commit with default options.
    /// NOTE(review): `ensure_writable` is checked again inside
    /// `commit_with_options`; the duplicate call here is harmless.
    pub fn commit(&mut self) -> Result<()> {
        self.ensure_writable()?;
        self.commit_with_options(CommitOptions::new(CommitMode::Full))
    }
753
    /// Enters batch ingestion mode: optionally pre-sizes the WAL region and
    /// propagates the batch's `skip_sync` preference to the WAL. Paired
    /// with `end_batch`.
    pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
        if opts.wal_pre_size_bytes > 0 {
            self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
        }
        self.wal.set_skip_sync(opts.skip_sync);
        self.batch_opts = Some(opts);
        Ok(())
    }
772
    /// Pre-sizes the WAL region to at least `min_bytes`, rounded up to the
    /// next power of two; used by batch mode to avoid repeated incremental
    /// grows. No-op when the WAL is already large enough.
    /// NOTE(review): the grow/bookkeeping/persist sequence below duplicates
    /// the tail of `grow_wal_region`; consider extracting a shared helper.
    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
        if min_bytes <= self.header.wal_size {
            return Ok(());
        }
        let target = min_bytes.next_power_of_two();
        let delta = target.saturating_sub(self.header.wal_size);
        if delta == 0 {
            return Ok(());
        }

        tracing::info!(
            current_wal = self.header.wal_size,
            target_wal = target,
            delta,
            "pre-sizing WAL for batch mode"
        );

        // Physically move data, then fix up all recorded offsets.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = target;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        // The footer must sit past everything the catalog references.
        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        // Reopen the WAL handle over the resized region.
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
817
    /// Leaves batch mode: flushes buffered WAL writes, restores per-append
    /// sync behaviour, and clears the stored batch options.
    pub fn end_batch(&mut self) -> Result<()> {
        self.wal.flush()?;
        self.wal.set_skip_sync(false);
        self.batch_opts = None;
        Ok(())
    }
829
    /// Applies pending WAL records but drops all secondary indexes instead
    /// of rebuilding them; callers are expected to run `finalize_indexes`
    /// afterwards.
    /// NOTE(review): unlike `commit_with_options`, this path does not go
    /// through the atomic staging file — confirm that is intentional.
    pub fn commit_skip_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        let records = self.wal.pending_records()?;
        if records.is_empty() && !self.dirty {
            return Ok(());
        }
        self.commit_skip_indexes_inner(records)
    }
846
    /// Applies `records`, then clears every index and track manifest from
    /// the TOC (leaving frames/payloads intact) and persists TOC, WAL
    /// checkpoint, and header in that order.
    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        // Temporarily detach the Tantivy engine so record application does
        // not index into it; restore it (and clear the dirty flag) after.
        #[cfg(feature = "lex")]
        let tantivy_backup = self.tantivy.take();

        let result = self.apply_records(records);

        #[cfg(feature = "lex")]
        {
            self.tantivy = tantivy_backup;
            self.tantivy_dirty = false;
        }

        // Propagate any apply error only after the engine is restored.
        let _delta = result?;

        self.header.footer_offset = self.data_end;

        // Drop every index/track manifest; payload data is untouched.
        self.toc.time_index = None;
        self.toc.indexes.lex_segments.clear();
        if let Some(vec) = self.toc.indexes.vec.as_mut() {
            vec.bytes_offset = 0;
            vec.bytes_length = 0;
            vec.vector_count = 0;
        }
        self.toc.indexes.lex = None;
        self.toc.indexes.clip = None;
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        self.toc.memories_track = None;
        self.toc.logic_mesh = None;
        self.toc.sketch_track = None;

        // Persist order: TOC/footer, WAL checkpoint, then header.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
904
    /// Rebuilds all secondary indexes from the TOC's current frames (no
    /// ingestion delta) and persists the refreshed TOC/footer and header.
    /// Intended as the follow-up to `commit_skip_indexes`.
    pub fn finalize_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        // Empty slices: rebuild purely from existing TOC state.
        self.rebuild_indexes(&[], &[])?;
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        Ok(())
    }
919
    /// Core commit path: applies WAL records, rebuilds or flushes whichever
    /// indexes/tracks the resulting delta touched, then persists TOC,
    /// WAL checkpoint, and header.
    /// NOTE(review): `_mode` is currently unused — `Full` and `Incremental`
    /// commits behave identically here.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // CLIP vectors accumulate outside the WAL delta, so check separately.
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
            indexes_rebuilt = true;
        }

        // When nothing was rebuilt, flush/persist whichever auxiliary
        // structures still hold unpersisted state.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Sketches are persisted whenever non-empty, rebuilt or not.
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Persist order: TOC/footer, WAL checkpoint, then header.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
987
    /// Parallel-segment commit entry point: stages the file and delegates to
    /// `commit_parallel_inner` with the given build options.
    #[cfg(feature = "parallel_segments")]
    pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
        self.ensure_writable()?;
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        // Clone so the closure can own the options across the staging swap.
        let opts = opts.clone();
        self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
    }
997
    /// Parallel-segment commit body (runs inside the staging lock):
    /// applies WAL records, publishes the delta via the parallel segment
    /// path when possible (falling back to a full index rebuild otherwise),
    /// incrementally updates Tantivy and the time index, persists auxiliary
    /// tracks, and finally checkpoints the WAL and header.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // Returns false when the parallel path declined the delta.
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    // An engine that already existed indexed these frames at
                    // put time; only a freshly created one needs backfill.
                    let tantivy_was_present = self.tantivy.is_some();
                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    if tantivy_was_present {
                        tracing::info!(
                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
                        );
                    } else {
                        let max_payload = crate::memvid::search::max_index_payload();
                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                        // Gather indexable text for each newly inserted frame.
                        for frame_id in &delta.inserted_frames {
                            let frame = match self.toc.frames.get(*frame_id as usize) {
                                Some(f) => f.clone(),
                                None => continue,
                            };

                            // Explicit search text wins when non-blank.
                            let explicit_text = frame.search_text.clone();
                            if let Some(ref search_text) = explicit_text {
                                if !search_text.trim().is_empty() {
                                    prepared_docs.push((frame, search_text.clone()));
                                    continue;
                                }
                            }

                            let mime = frame
                                .metadata
                                .as_ref()
                                .and_then(|m| m.mime.as_deref())
                                .unwrap_or("application/octet-stream");

                            if !crate::memvid::search::is_text_indexable_mime(mime) {
                                continue;
                            }

                            // Oversized payloads are not text-indexed.
                            if frame.payload_length > max_payload {
                                continue;
                            }

                            let text = self.frame_search_text(&frame)?;
                            if !text.trim().is_empty() {
                                prepared_docs.push((frame, text));
                            }
                        }

                        if let Some(ref mut engine) = self.tantivy {
                            for (frame, text) in &prepared_docs {
                                engine.add_frame(frame, text)?;
                            }

                            if !prepared_docs.is_empty() {
                                engine.commit()?;
                                self.tantivy_dirty = true;
                            }

                            tracing::info!(
                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                                prepared_docs.len(),
                                engine.num_docs()
                            );
                        } else {
                            tracing::warn!(
                                "parallel_commit: Tantivy engine is None after init_tantivy"
                            );
                        }
                    } }

                // Rebuild the time index over all active document frames and
                // append it at the current data end.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Parallel path declined: fall back to a full rebuild.
                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
                indexes_rebuilt = true;
            }
        }

        #[cfg(feature = "lex")]
        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Persist order: TOC/footer, WAL checkpoint, then header.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1198
1199 pub(crate) fn recover_wal(&mut self) -> Result<()> {
1200 let records = self.wal.records_after(self.header.wal_sequence)?;
1201 if records.is_empty() {
1202 if self.tantivy_index_pending() {
1203 self.flush_tantivy()?;
1204 }
1205 return Ok(());
1206 }
1207 let delta = self.apply_records(records)?;
1208 if !delta.is_empty() {
1209 tracing::debug!(
1210 inserted_frames = delta.inserted_frames.len(),
1211 inserted_embeddings = delta.inserted_embeddings.len(),
1212 inserted_time_entries = delta.inserted_time_entries.len(),
1213 "recover applied delta"
1214 );
1215 self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1216 } else if self.tantivy_index_pending() {
1217 self.flush_tantivy()?;
1218 }
1219 self.wal.record_checkpoint(&mut self.header)?;
1220 crate::persist_header(&mut self.file, &self.header)?;
1221 if !delta.is_empty() {
1222 } else if self.tantivy_index_pending() {
1224 self.flush_tantivy()?;
1225 crate::persist_header(&mut self.file, &self.header)?;
1226 }
1227 self.file.sync_all()?;
1228 self.pending_frame_inserts = 0;
1229 self.dirty = false;
1230 Ok(())
1231 }
1232
    /// Replays a batch of WAL records into the in-memory state.
    ///
    /// Frame payloads carried inline are appended to the data region starting
    /// at `self.data_end`; frames reusing another frame's payload copy only
    /// bookkeeping. Returns an `IngestionDelta` listing inserted frames,
    /// embeddings, time-index entries, and (with `temporal_track`) anchors
    /// and mentions so callers can rebuild or extend indexes afterwards.
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // Next byte to write inline payloads at; advanced per payload.
        let mut data_cursor = self.data_end;
        // WAL sequence -> assigned frame id, used so chunk records can
        // resolve a parent document inserted earlier in this same batch.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        // Lexical batches are applied immediately and do not
                        // contribute to the returned delta.
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // Frame ids are dense: next id is the current count.
                        let frame_id = self.toc.frames.len() as u64;

                        // Resolve where this frame's payload lives: either
                        // shared with an existing frame, or appended inline.
                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            // Reuse: the entry must not also carry bytes.
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source_idx = usize::try_from(source_id).map_err(|_| {
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "frame id too large for memory",
                                }
                            })?;
                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            // Inline: append the payload bytes at the cursor.
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            // Canonical length is the decoded size for zstd
                            // payloads; decode once if it wasn't recorded.
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    if let Some(len) = entry.canonical_length {
                                        len
                                    } else {
                                        let decoded = crate::decode_canonical_bytes(
                                            &entry.payload,
                                            CanonicalEncoding::Zstd,
                                            frame_id,
                                        )?;
                                        decoded.len() as u64
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            // Anchors only exist with the temporal_track
                            // feature; otherwise the fields stay None.
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                            source_sha256: entry.source_sha256,
                            source_path: entry.source_path.clone(),
                            enrichment_state: entry.enrichment_state,
                        };

                        // Resolve the chunk's parent: first by WAL sequence,
                        // then (for document chunks) by scanning this batch's
                        // inserted frames, newest first, for a document that
                        // carries a chunk manifest.
                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            } else {
                                if entry.role == FrameRole::DocumentChunk {
                                    for &candidate_id in delta.inserted_frames.iter().rev() {
                                        if let Ok(idx) = usize::try_from(candidate_id) {
                                            if let Some(candidate) = self.toc.frames.get(idx) {
                                                if candidate.role == FrameRole::Document
                                                    && candidate.chunk_manifest.is_some()
                                                {
                                                    frame.parent_id = Some(candidate_id);
                                                    tracing::debug!(
                                                        chunk_frame_id = frame_id,
                                                        parent_frame_id = candidate_id,
                                                        parent_seq = parent_seq,
                                                        "resolved chunk parent via fallback"
                                                    );
                                                    break;
                                                }
                                            }
                                        }
                                    }
                                }
                                if frame.parent_id.is_none() {
                                    tracing::warn!(
                                        chunk_frame_id = frame_id,
                                        parent_seq = parent_seq,
                                        "chunk has parent_sequence but parent not found in batch"
                                    );
                                }
                            }
                        }

                        // Text fed to the lexical index: explicit non-blank
                        // search_text wins, otherwise the frame's content.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;

                            if !text.trim().is_empty() {
                                // Keep a small text sketch alongside the
                                // lexical index entry for this frame.
                                let entry = crate::types::generate_sketch(
                                    frame_id,
                                    text,
                                    crate::types::SketchVariant::Small,
                                    None,
                                );
                                self.sketch_track.insert(entry);
                            }
                        }

                        if let Some(embedding) = entry.embedding.take() {
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only top-level documents get time-index entries and
                        // temporal records; chunks are covered by the parent.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            self.data_end = self.data_end.max(data_cursor);
        }

        // Second pass: document chunks still lacking a parent fall back to
        // the nearest earlier active document carrying a chunk manifest.
        let orphan_resolutions: Vec<(u64, u64)> = delta
            .inserted_frames
            .iter()
            .filter_map(|&frame_id| {
                let idx = usize::try_from(frame_id).ok()?;
                let frame = self.toc.frames.get(idx)?;
                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
                    return None;
                }
                for candidate_id in (0..frame_id).rev() {
                    if let Ok(idx) = usize::try_from(candidate_id) {
                        if let Some(candidate) = self.toc.frames.get(idx) {
                            if candidate.role == FrameRole::Document
                                && candidate.chunk_manifest.is_some()
                                && candidate.status == FrameStatus::Active
                            {
                                return Some((frame_id, candidate_id));
                            }
                        }
                    }
                }
                None
            })
            .collect();

        // Apply the resolutions collected above (separate loop to avoid
        // holding an immutable borrow of frames while mutating them).
        for (chunk_id, parent_id) in orphan_resolutions {
            if let Ok(idx) = usize::try_from(chunk_id) {
                if let Some(frame) = self.toc.frames.get_mut(idx) {
                    frame.parent_id = Some(parent_id);
                    tracing::debug!(
                        chunk_frame_id = chunk_id,
                        parent_frame_id = parent_id,
                        "resolved orphan chunk parent in second pass"
                    );
                }
            }
        }

        Ok(delta)
    }
1561
1562 #[cfg(feature = "temporal_track")]
1563 fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1564 (timestamp, AnchorSource::FrameTimestamp)
1565 }
1566
    #[cfg(feature = "temporal_track")]
    /// Finds temporal expressions in `text` and resolves them into
    /// `TemporalMention`s anchored at `anchor_ts`.
    ///
    /// Candidate spans come from two sources: case-insensitive matches of
    /// `STATIC_TEMPORAL_PHRASES` and a lazily-compiled `d/m/yy[yy]`-style
    /// numeric-date regex. Spans are deduped, trimmed of surrounding
    /// punctuation, then handed to `TemporalNormalizer`; spans that fail to
    /// resolve are skipped. Returns an empty vec when the text is
    /// absent/blank, the anchor is not a valid unix timestamp, or the regex
    /// failed to compile.
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        // Bail out when the anchor is not a representable timestamp.
        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        let mut spans: Vec<(usize, usize)> = Vec::new();
        // ASCII lowercasing preserves byte offsets, so spans found in `lower`
        // index correctly into `text`.
        let lower = text.to_ascii_lowercase();

        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                search_start = end;
            }
        }

        // Compile the numeric-date regex once per process; a compile failure
        // is cached and reported on every call instead of panicking.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            // Defensive bounds check before slicing `text`.
            if end > text.len() || start >= end {
                continue;
            }
            let raw = &text[start..end];
            // Strip surrounding quotes/punctuation before normalizing.
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            // Re-base the mention offsets onto the trimmed slice.
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                Err(_) => continue,
            }
        }

        mentions
    }
1640
    #[cfg(feature = "temporal_track")]
    /// Expands one normalizer resolution into concrete `TemporalMention`
    /// records for the text span `[byte_start, byte_end)`.
    ///
    /// Point values (`Date`, `DateTime`) yield a single mention; ranges and
    /// whole months yield a `RangeStart`/`RangeEnd` pair flagged with
    /// `HAS_RANGE`. Month resolutions with an invalid calendar date are
    /// logged and skipped.
    fn resolution_to_mentions(
        resolution: TemporalResolution,
        frame_id: FrameId,
        byte_start: usize,
        byte_end: usize,
    ) -> Vec<TemporalMention> {
        // Clamp offsets to u32 for the on-disk mention representation.
        let byte_len = byte_end.saturating_sub(byte_start) as u32;
        let byte_start = byte_start.min(u32::MAX as usize) as u32;
        let mut results = Vec::new();

        let base_flags = Self::flags_from_resolution(&resolution.flags);
        match resolution.value {
            TemporalResolutionValue::Date(date) => {
                let ts = Self::date_to_timestamp(date);
                results.push(TemporalMention::new(
                    ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::Date,
                    resolution.confidence,
                    0,
                    base_flags,
                ));
            }
            TemporalResolutionValue::DateTime(dt) => {
                let ts = dt.unix_timestamp();
                // Preserve the source UTC offset (minutes) as a tz hint.
                let tz_hint = dt.offset().whole_minutes() as i16;
                results.push(TemporalMention::new(
                    ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::DateTime,
                    resolution.confidence,
                    tz_hint,
                    base_flags,
                ));
            }
            TemporalResolutionValue::DateRange { start, end } => {
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                let start_ts = Self::date_to_timestamp(start);
                results.push(TemporalMention::new(
                    start_ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    0,
                    flags,
                ));
                let end_ts = Self::date_to_timestamp(end);
                results.push(TemporalMention::new(
                    end_ts,
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    0,
                    flags,
                ));
            }
            TemporalResolutionValue::DateTimeRange { start, end } => {
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                results.push(TemporalMention::new(
                    start.unix_timestamp(),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    start.offset().whole_minutes() as i16,
                    flags,
                ));
                results.push(TemporalMention::new(
                    end.unix_timestamp(),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    end.offset().whole_minutes() as i16,
                    flags,
                ));
            }
            TemporalResolutionValue::Month { year, month } => {
                // A month resolves to the range [first day, last day].
                let start_date = match Date::from_calendar_date(year, month, 1) {
                    Ok(date) => date,
                    Err(err) => {
                        tracing::warn!(
                            target = "memvid::temporal",
                            %err,
                            year,
                            month = month as u8,
                            "skipping invalid month resolution"
                        );
                        return results;
                    }
                };
                let end_date = match Self::last_day_in_month(year, month) {
                    Some(date) => date,
                    None => {
                        tracing::warn!(
                            target = "memvid::temporal",
                            year,
                            month = month as u8,
                            "skipping month resolution with invalid calendar range"
                        );
                        return results;
                    }
                };
                let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
                results.push(TemporalMention::new(
                    Self::date_to_timestamp(start_date),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeStart,
                    resolution.confidence,
                    0,
                    flags,
                ));
                results.push(TemporalMention::new(
                    Self::date_to_timestamp(end_date),
                    frame_id,
                    byte_start,
                    byte_len,
                    TemporalMentionKind::RangeEnd,
                    resolution.confidence,
                    0,
                    flags,
                ));
            }
        }

        results
    }
1782
1783 #[cfg(feature = "temporal_track")]
1784 fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1785 let mut result = TemporalMentionFlags::empty();
1786 if flags
1787 .iter()
1788 .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1789 {
1790 result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1791 }
1792 if flags
1793 .iter()
1794 .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1795 {
1796 result = result.set(TemporalMentionFlags::DERIVED, true);
1797 }
1798 result
1799 }
1800
1801 #[cfg(feature = "temporal_track")]
1802 fn date_to_timestamp(date: Date) -> i64 {
1803 PrimitiveDateTime::new(date, Time::MIDNIGHT)
1804 .assume_offset(UtcOffset::UTC)
1805 .unix_timestamp()
1806 }
1807
1808 #[cfg(feature = "temporal_track")]
1809 fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1810 let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1811 while let Some(next) = date.next_day() {
1812 if next.month() == month {
1813 date = next;
1814 } else {
1815 break;
1816 }
1817 }
1818 Some(date)
1819 }
1820
    #[allow(dead_code)]
    /// Builds a lexical index segment from the frames inserted by `delta`,
    /// appends it to the file, and registers it in the segment catalog.
    ///
    /// Returns `Ok(false)` when there is nothing to publish (no inserted
    /// frames, lexical indexing disabled, or no segment artifact produced).
    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_frames.is_empty() || !self.lex_enabled {
            return Ok(false);
        }

        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        let segment_id = self.toc.segment_catalog.next_segment_id;
        #[cfg(feature = "parallel_segments")]
        let span =
            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));

        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
        #[cfg(feature = "parallel_segments")]
        if let Some(span) = span {
            Self::decorate_segment_common(&mut descriptor.common, span);
        }
        #[cfg(feature = "parallel_segments")]
        let descriptor_for_manifest = descriptor.clone();
        self.toc.segment_catalog.lex_segments.push(descriptor);
        // A manifest WAL failure is non-fatal: the segment is already in the
        // TOC catalog; only the incremental manifest goes stale.
        #[cfg(feature = "parallel_segments")]
        if let Err(err) = self.record_index_segment(
            SegmentKind::Lexical,
            descriptor_for_manifest.common,
            SegmentStats {
                doc_count: artifact.doc_count,
                vector_count: 0,
                time_entries: 0,
                bytes_uncompressed: artifact.bytes.len() as u64,
                build_micros: 0,
            },
        ) {
            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
        }
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
        Ok(true)
    }
1864
1865 #[allow(dead_code)]
1866 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1867 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1868 return Ok(false);
1869 }
1870
1871 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1872 Some(artifact) => artifact,
1873 None => return Ok(false),
1874 };
1875
1876 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1877 if existing_dim != artifact.dimension {
1878 return Err(MemvidError::VecDimensionMismatch {
1879 expected: existing_dim,
1880 actual: artifact.dimension as usize,
1881 });
1882 }
1883 }
1884
1885 let segment_id = self.toc.segment_catalog.next_segment_id;
1886 #[cfg(feature = "parallel_segments")]
1887 #[cfg(feature = "parallel_segments")]
1888 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1889
1890 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1891 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1892 #[cfg(feature = "parallel_segments")]
1893 if let Some(span) = span {
1894 Self::decorate_segment_common(&mut descriptor.common, span);
1895 }
1896 #[cfg(feature = "parallel_segments")]
1897 let descriptor_for_manifest = descriptor.clone();
1898 self.toc.segment_catalog.vec_segments.push(descriptor);
1899 #[cfg(feature = "parallel_segments")]
1900 if let Err(err) = self.record_index_segment(
1901 SegmentKind::Vector,
1902 descriptor_for_manifest.common,
1903 SegmentStats {
1904 doc_count: 0,
1905 vector_count: artifact.vector_count,
1906 time_entries: 0,
1907 bytes_uncompressed: artifact.bytes_uncompressed,
1908 build_micros: 0,
1909 },
1910 ) {
1911 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1912 }
1913 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1914 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1915
1916 if self.toc.indexes.vec.is_none() {
1918 let empty_offset = self.data_end;
1919 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1920 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1921 self.toc.indexes.vec = Some(VecIndexManifest {
1922 vector_count: 0,
1923 dimension: 0,
1924 bytes_offset: empty_offset,
1925 bytes_length: 0,
1926 checksum: empty_checksum,
1927 compression_mode: self.vec_compression.clone(),
1928 model: self.vec_model.clone(),
1929 });
1930 }
1931 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1932 if manifest.dimension == 0 {
1933 manifest.dimension = artifact.dimension;
1934 }
1935 if manifest.bytes_length == 0 {
1936 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1937 manifest.compression_mode = artifact.compression.clone();
1938 }
1939 }
1940
1941 self.vec_enabled = true;
1942 Ok(true)
1943 }
1944
1945 #[allow(dead_code)]
1946 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1947 if delta.inserted_time_entries.is_empty() {
1948 return Ok(false);
1949 }
1950
1951 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1952 Some(artifact) => artifact,
1953 None => return Ok(false),
1954 };
1955
1956 let segment_id = self.toc.segment_catalog.next_segment_id;
1957 #[cfg(feature = "parallel_segments")]
1958 #[cfg(feature = "parallel_segments")]
1959 let span = self.segment_span_from_iter(
1960 delta
1961 .inserted_time_entries
1962 .iter()
1963 .map(|entry| entry.frame_id),
1964 );
1965
1966 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1967 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1968 #[cfg(feature = "parallel_segments")]
1969 if let Some(span) = span {
1970 Self::decorate_segment_common(&mut descriptor.common, span);
1971 }
1972 #[cfg(feature = "parallel_segments")]
1973 let descriptor_for_manifest = descriptor.clone();
1974 self.toc.segment_catalog.time_segments.push(descriptor);
1975 #[cfg(feature = "parallel_segments")]
1976 if let Err(err) = self.record_index_segment(
1977 SegmentKind::Time,
1978 descriptor_for_manifest.common,
1979 SegmentStats {
1980 doc_count: 0,
1981 vector_count: 0,
1982 time_entries: artifact.entry_count,
1983 bytes_uncompressed: artifact.bytes.len() as u64,
1984 build_micros: 0,
1985 },
1986 ) {
1987 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1988 }
1989 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1990 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1991 Ok(true)
1992 }
1993
    #[cfg(feature = "temporal_track")]
    #[allow(dead_code)]
    /// Builds a temporal-track segment from the mentions and anchors in
    /// `delta`, appends it to the file, points the TOC's temporal manifest at
    /// the freshly written bytes, and drops any cached track state.
    ///
    /// Returns `Ok(false)` when the delta carries no temporal records or no
    /// segment artifact was produced.
    fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
        {
            return Ok(false);
        }

        // Sanity bounds: deltas this large suggest a bug upstream.
        debug_assert!(
            delta.inserted_temporal_mentions.len() < 1_000_000,
            "temporal delta mentions unexpectedly large: {}",
            delta.inserted_temporal_mentions.len()
        );
        debug_assert!(
            delta.inserted_temporal_anchors.len() < 1_000_000,
            "temporal delta anchors unexpectedly large: {}",
            delta.inserted_temporal_anchors.len()
        );

        let artifact = match self.build_temporal_segment_from_records(
            &delta.inserted_temporal_mentions,
            &delta.inserted_temporal_anchors,
        )? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        let segment_id = self.toc.segment_catalog.next_segment_id;
        let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
        self.toc
            .segment_catalog
            .temporal_segments
            .push(descriptor.clone());
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);

        // The manifest mirrors the descriptor so readers can map the track
        // without consulting the segment catalog.
        self.toc.temporal_track = Some(TemporalTrackManifest {
            bytes_offset: descriptor.common.bytes_offset,
            bytes_length: descriptor.common.bytes_length,
            entry_count: artifact.entry_count,
            anchor_count: artifact.anchor_count,
            checksum: artifact.checksum,
            flags: artifact.flags,
        });

        // Cached views of the previous track contents are now stale.
        self.clear_temporal_track_cache();

        Ok(true)
    }
2043
2044 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2045 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2046 frame_id,
2047 reason: "frame id too large",
2048 })?;
2049 let frame = self
2050 .toc
2051 .frames
2052 .get_mut(index)
2053 .ok_or(MemvidError::InvalidFrame {
2054 frame_id,
2055 reason: "supersede target missing",
2056 })?;
2057 frame.status = FrameStatus::Superseded;
2058 frame.superseded_by = Some(successor_id);
2059 self.remove_frame_from_indexes(frame_id)
2060 }
2061
2062 pub(crate) fn rebuild_indexes(
2063 &mut self,
2064 new_vec_docs: &[(FrameId, Vec<f32>)],
2065 inserted_frame_ids: &[FrameId],
2066 ) -> Result<()> {
2067 if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2068 return Ok(());
2069 }
2070
2071 let payload_end = self.payload_region_end();
2072 self.data_end = payload_end;
2073 let safe_truncate_len = self.header.footer_offset.max(payload_end);
2076 if self.file.metadata()?.len() > safe_truncate_len {
2077 self.file.set_len(safe_truncate_len)?;
2078 }
2079 self.file.seek(SeekFrom::Start(payload_end))?;
2080
2081 self.toc.segment_catalog.lex_segments.clear();
2083 self.toc.segment_catalog.vec_segments.clear();
2084 self.toc.segment_catalog.time_segments.clear();
2085 #[cfg(feature = "temporal_track")]
2086 self.toc.segment_catalog.temporal_segments.clear();
2087 #[cfg(feature = "parallel_segments")]
2088 self.toc.segment_catalog.index_segments.clear();
2089 self.toc.segment_catalog.tantivy_segments.clear();
2091 self.toc.indexes.lex_segments.clear();
2093
2094 let mut time_entries: Vec<TimeIndexEntry> = self
2095 .toc
2096 .frames
2097 .iter()
2098 .filter(|frame| {
2099 frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2100 })
2101 .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2102 .collect();
2103 let (ti_offset, ti_length, ti_checksum) =
2104 time_index_append(&mut self.file, &mut time_entries)?;
2105 self.toc.time_index = Some(TimeIndexManifest {
2106 bytes_offset: ti_offset,
2107 bytes_length: ti_length,
2108 entry_count: time_entries.len() as u64,
2109 checksum: ti_checksum,
2110 });
2111
2112 let mut footer_offset = ti_offset + ti_length;
2113
2114 #[cfg(feature = "temporal_track")]
2115 {
2116 self.toc.temporal_track = None;
2117 self.toc.segment_catalog.temporal_segments.clear();
2118 self.clear_temporal_track_cache();
2119 }
2120
2121 if self.lex_enabled {
2122 #[cfg(feature = "lex")]
2123 {
2124 if self.tantivy_dirty {
2125 if let Ok(mut storage) = self.lex_storage.write() {
2129 storage.clear();
2130 storage.set_generation(0);
2131 }
2132 self.init_tantivy()?;
2133 if let Some(mut engine) = self.tantivy.take() {
2134 self.rebuild_tantivy_engine(&mut engine)?;
2135 self.tantivy = Some(engine);
2136 } else {
2137 return Err(MemvidError::InvalidToc {
2138 reason: "tantivy engine missing during rebuild".into(),
2139 });
2140 }
2141 } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2142 let max_payload = crate::memvid::search::max_index_payload();
2148 let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2149 for &frame_id in inserted_frame_ids {
2150 let Ok(idx) = usize::try_from(frame_id) else {
2151 continue;
2152 };
2153 let frame = match self.toc.frames.get(idx) {
2154 Some(f) => f.clone(),
2155 None => continue,
2156 };
2157 if frame.status != FrameStatus::Active {
2158 continue;
2159 }
2160 if let Some(search_text) = frame.search_text.clone() {
2161 if !search_text.trim().is_empty() {
2162 prepared_docs.push((frame, search_text));
2163 continue;
2164 }
2165 }
2166 let mime = frame
2167 .metadata
2168 .as_ref()
2169 .and_then(|m| m.mime.as_deref())
2170 .unwrap_or("application/octet-stream");
2171 if !crate::memvid::search::is_text_indexable_mime(mime) {
2172 continue;
2173 }
2174 if frame.payload_length > max_payload {
2175 continue;
2176 }
2177 let text = self.frame_search_text(&frame)?;
2178 if !text.trim().is_empty() {
2179 prepared_docs.push((frame, text));
2180 }
2181 }
2182 if let Some(engine) = self.tantivy.as_mut() {
2183 for (frame, text) in &prepared_docs {
2184 engine.add_frame(frame, text)?;
2185 }
2186 engine.commit()?;
2187 }
2188 } else {
2189 if let Ok(mut storage) = self.lex_storage.write() {
2192 storage.clear();
2193 storage.set_generation(0);
2194 }
2195 self.init_tantivy()?;
2196 if let Some(mut engine) = self.tantivy.take() {
2197 self.rebuild_tantivy_engine(&mut engine)?;
2198 self.tantivy = Some(engine);
2199 } else {
2200 return Err(MemvidError::InvalidToc {
2201 reason: "tantivy engine missing during rebuild".into(),
2202 });
2203 }
2204 }
2205
2206 self.lex_enabled = true;
2208
2209 self.tantivy_dirty = true;
2211
2212 self.data_end = footer_offset;
2214
2215 self.flush_tantivy()?;
2217
2218 footer_offset = self.header.footer_offset;
2220
2221 self.data_end = payload_end;
2223 }
2224 #[cfg(not(feature = "lex"))]
2225 {
2226 self.toc.indexes.lex = None;
2227 self.toc.indexes.lex_segments.clear();
2228 }
2229 } else {
2230 self.toc.indexes.lex = None;
2232 self.toc.indexes.lex_segments.clear();
2233 #[cfg(feature = "lex")]
2234 if let Ok(mut storage) = self.lex_storage.write() {
2235 storage.clear();
2236 }
2237 }
2238
2239 if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2240 let vec_offset = footer_offset;
2241 self.file.seek(SeekFrom::Start(vec_offset))?;
2242 self.file.write_all(&artifact.bytes)?;
2243 footer_offset += artifact.bytes.len() as u64;
2244 self.toc.indexes.vec = Some(VecIndexManifest {
2245 vector_count: artifact.vector_count,
2246 dimension: artifact.dimension,
2247 bytes_offset: vec_offset,
2248 bytes_length: artifact.bytes.len() as u64,
2249 checksum: artifact.checksum,
2250 compression_mode: self.vec_compression.clone(),
2251 model: self.vec_model.clone(),
2252 });
2253 self.vec_index = Some(index);
2254 } else {
2255 if !self.vec_enabled {
2257 self.toc.indexes.vec = None;
2258 }
2259 self.vec_index = None;
2260 }
2261
2262 if self.clip_enabled {
2264 if let Some(ref clip_index) = self.clip_index {
2265 if !clip_index.is_empty() {
2266 let artifact = clip_index.encode()?;
2267 let clip_offset = footer_offset;
2268 self.file.seek(SeekFrom::Start(clip_offset))?;
2269 self.file.write_all(&artifact.bytes)?;
2270 footer_offset += artifact.bytes.len() as u64;
2271 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2272 bytes_offset: clip_offset,
2273 bytes_length: artifact.bytes.len() as u64,
2274 vector_count: artifact.vector_count,
2275 dimension: artifact.dimension,
2276 checksum: artifact.checksum,
2277 model_name: crate::clip::default_model_info().name.to_string(),
2278 });
2279 tracing::info!(
2280 "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2281 artifact.vector_count,
2282 clip_offset
2283 );
2284 }
2285 }
2286 } else {
2287 self.toc.indexes.clip = None;
2288 }
2289
2290 if self.memories_track.card_count() > 0 {
2292 let memories_offset = footer_offset;
2293 let memories_bytes = self.memories_track.serialize()?;
2294 let memories_checksum = blake3::hash(&memories_bytes).into();
2295 self.file.seek(SeekFrom::Start(memories_offset))?;
2296 self.file.write_all(&memories_bytes)?;
2297 footer_offset += memories_bytes.len() as u64;
2298
2299 let stats = self.memories_track.stats();
2300 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2301 bytes_offset: memories_offset,
2302 bytes_length: memories_bytes.len() as u64,
2303 card_count: stats.card_count as u64,
2304 entity_count: stats.entity_count as u64,
2305 checksum: memories_checksum,
2306 });
2307 } else {
2308 self.toc.memories_track = None;
2309 }
2310
2311 if self.logic_mesh.is_empty() {
2313 self.toc.logic_mesh = None;
2314 } else {
2315 let mesh_offset = footer_offset;
2316 let mesh_bytes = self.logic_mesh.serialize()?;
2317 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2318 self.file.seek(SeekFrom::Start(mesh_offset))?;
2319 self.file.write_all(&mesh_bytes)?;
2320 footer_offset += mesh_bytes.len() as u64;
2321
2322 let stats = self.logic_mesh.stats();
2323 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2324 bytes_offset: mesh_offset,
2325 bytes_length: mesh_bytes.len() as u64,
2326 node_count: stats.node_count as u64,
2327 edge_count: stats.edge_count as u64,
2328 checksum: mesh_checksum,
2329 });
2330 }
2331
2332 tracing::info!(
2334 "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2335 ti_offset,
2336 ti_length,
2337 footer_offset,
2338 self.header.footer_offset
2339 );
2340
2341 self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2344
2345 if self.file.metadata()?.len() < self.header.footer_offset {
2347 self.file.set_len(self.header.footer_offset)?;
2348 }
2349
2350 self.rewrite_toc_footer()?;
2351 self.header.toc_checksum = self.toc.toc_checksum;
2352 crate::persist_header(&mut self.file, &self.header)?;
2353
2354 #[cfg(feature = "lex")]
2355 if self.lex_enabled {
2356 if let Some(ref engine) = self.tantivy {
2357 let doc_count = engine.num_docs();
2358 let active_frame_count = self
2359 .toc
2360 .frames
2361 .iter()
2362 .filter(|f| f.status == FrameStatus::Active)
2363 .count();
2364
2365 let text_indexable_count = self
2368 .toc
2369 .frames
2370 .iter()
2371 .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2372 .count();
2373
2374 if doc_count == 0 && text_indexable_count > 0 {
2377 return Err(MemvidError::Doctor {
2378 reason: format!(
2379 "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2380 This indicates a critical failure in the rebuild process."
2381 ),
2382 });
2383 }
2384
2385 log::info!(
2387 "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2388 );
2389 }
2390 }
2391
2392 Ok(())
2393 }
2394
2395 fn persist_memories_track(&mut self) -> Result<()> {
2400 if self.memories_track.card_count() == 0 {
2401 self.toc.memories_track = None;
2402 return Ok(());
2403 }
2404
2405 let memories_offset = self.header.footer_offset;
2407 let memories_bytes = self.memories_track.serialize()?;
2408 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2409
2410 self.file.seek(SeekFrom::Start(memories_offset))?;
2411 self.file.write_all(&memories_bytes)?;
2412
2413 let stats = self.memories_track.stats();
2414 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2415 bytes_offset: memories_offset,
2416 bytes_length: memories_bytes.len() as u64,
2417 card_count: stats.card_count as u64,
2418 entity_count: stats.entity_count as u64,
2419 checksum: memories_checksum,
2420 });
2421
2422 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2424
2425 if self.file.metadata()?.len() < self.header.footer_offset {
2427 self.file.set_len(self.header.footer_offset)?;
2428 }
2429
2430 Ok(())
2431 }
2432
2433 fn persist_clip_index(&mut self) -> Result<()> {
2438 if !self.clip_enabled {
2439 self.toc.indexes.clip = None;
2440 return Ok(());
2441 }
2442
2443 let clip_index = match &self.clip_index {
2444 Some(idx) if !idx.is_empty() => idx,
2445 _ => {
2446 self.toc.indexes.clip = None;
2447 return Ok(());
2448 }
2449 };
2450
2451 let artifact = clip_index.encode()?;
2453
2454 let clip_offset = self.header.footer_offset;
2456 self.file.seek(SeekFrom::Start(clip_offset))?;
2457 self.file.write_all(&artifact.bytes)?;
2458
2459 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2460 bytes_offset: clip_offset,
2461 bytes_length: artifact.bytes.len() as u64,
2462 vector_count: artifact.vector_count,
2463 dimension: artifact.dimension,
2464 checksum: artifact.checksum,
2465 model_name: crate::clip::default_model_info().name.to_string(),
2466 });
2467
2468 tracing::info!(
2469 "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2470 artifact.vector_count,
2471 clip_offset
2472 );
2473
2474 self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2476
2477 if self.file.metadata()?.len() < self.header.footer_offset {
2479 self.file.set_len(self.header.footer_offset)?;
2480 }
2481
2482 Ok(())
2483 }
2484
2485 fn persist_logic_mesh(&mut self) -> Result<()> {
2490 if self.logic_mesh.is_empty() {
2491 self.toc.logic_mesh = None;
2492 return Ok(());
2493 }
2494
2495 let mesh_offset = self.header.footer_offset;
2497 let mesh_bytes = self.logic_mesh.serialize()?;
2498 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2499
2500 self.file.seek(SeekFrom::Start(mesh_offset))?;
2501 self.file.write_all(&mesh_bytes)?;
2502
2503 let stats = self.logic_mesh.stats();
2504 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2505 bytes_offset: mesh_offset,
2506 bytes_length: mesh_bytes.len() as u64,
2507 node_count: stats.node_count as u64,
2508 edge_count: stats.edge_count as u64,
2509 checksum: mesh_checksum,
2510 });
2511
2512 self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2514
2515 if self.file.metadata()?.len() < self.header.footer_offset {
2517 self.file.set_len(self.header.footer_offset)?;
2518 }
2519
2520 Ok(())
2521 }
2522
    /// Persist the sketch track at the current footer offset and record its
    /// manifest in the TOC; an empty track clears the manifest instead.
    fn persist_sketch_track(&mut self) -> Result<()> {
        if self.sketch_track.is_empty() {
            self.toc.sketch_track = None;
            return Ok(());
        }

        // Position the cursor at the footer; `write_sketch_track` performs the
        // actual write and reports the offset/length/checksum it produced.
        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;

        let (sketch_offset, sketch_length, sketch_checksum) =
            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;

        let stats = self.sketch_track.stats();
        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
            bytes_offset: sketch_offset,
            bytes_length: sketch_length,
            entry_count: stats.entry_count,
            // entry_size is expected to fit in u16; truncation allowed by design.
            #[allow(clippy::cast_possible_truncation)]
            entry_size: stats.variant.entry_size() as u16,
            flags: 0,
            checksum: sketch_checksum,
        });

        // Advance the footer past the track and grow the file if needed.
        self.header.footer_offset = sketch_offset + sketch_length;

        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        tracing::debug!(
            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
            stats.entry_count,
            sketch_offset
        );

        Ok(())
    }
2567
2568 #[cfg(feature = "lex")]
2569 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2570 let LexWalBatch {
2571 generation,
2572 doc_count,
2573 checksum,
2574 segments,
2575 } = batch;
2576
2577 if let Ok(mut storage) = self.lex_storage.write() {
2578 storage.replace(doc_count, checksum, segments);
2579 storage.set_generation(generation);
2580 }
2581
2582 self.persist_lex_manifest()
2583 }
2584
2585 #[cfg(feature = "lex")]
2586 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2587 let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2588 self.append_wal_entry(&payload)?;
2589 Ok(())
2590 }
2591
2592 #[cfg(feature = "lex")]
2593 fn persist_lex_manifest(&mut self) -> Result<()> {
2594 let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2595 storage.to_manifest()
2596 } else {
2597 (None, Vec::new())
2598 };
2599
2600 if let Some(storage_manifest) = index_manifest {
2602 self.toc.indexes.lex = Some(storage_manifest);
2604 } else {
2605 self.toc.indexes.lex = None;
2608 }
2609
2610 self.toc.indexes.lex_segments = segments;
2611
2612 self.rewrite_toc_footer()?;
2616 self.header.toc_checksum = self.toc.toc_checksum;
2617 crate::persist_header(&mut self.file, &self.header)?;
2618 Ok(())
2619 }
2620
    /// Embed a fresh tantivy snapshot into the container file: write each
    /// segment's bytes after `data_end`, update the segment catalog, swap the
    /// new segment set into the lex storage, and append a matching WAL batch.
    ///
    /// NOTE(review): ordering is deliberate here — bytes are written before
    /// the catalog/storage/WAL are updated so the manifest never points at
    /// unwritten data.
    #[cfg(feature = "lex")]
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // New segments are appended starting at the end of frame payloads.
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            // Offset recorded BEFORE advancing — it is the segment's start.
            self.file.flush()?; embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Never move the footer backwards past regions written elsewhere.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Mirror the embedded segments into the segment catalog, assigning
        // monotonically increasing segment ids.
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // Swap the new segment set into storage; unlike other call sites,
        // a poisoned lock here is a hard error.
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Record the same snapshot in the WAL, then persist manifest/header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2704
2705 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2706 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2707 frame_id,
2708 reason: "frame id too large",
2709 })?;
2710 let frame = self
2711 .toc
2712 .frames
2713 .get_mut(index)
2714 .ok_or(MemvidError::InvalidFrame {
2715 frame_id,
2716 reason: "delete target missing",
2717 })?;
2718 frame.status = FrameStatus::Deleted;
2719 frame.superseded_by = None;
2720 self.remove_frame_from_indexes(frame_id)
2721 }
2722
    /// Remove a frame from every live in-memory index (tantivy, lex, vec).
    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
        #[cfg(feature = "lex")]
        if let Some(engine) = self.tantivy.as_mut() {
            engine.delete_frame(frame_id)?;
            // Flag that the tantivy engine now has unsynced changes.
            self.tantivy_dirty = true;
        }
        if let Some(index) = self.lex_index.as_mut() {
            index.remove_document(frame_id);
        }
        if let Some(index) = self.vec_index.as_mut() {
            index.remove(frame_id);
        }
        Ok(())
    }
2737
2738 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2739 let Ok(index) = usize::try_from(frame_id) else {
2740 return false;
2741 };
2742 self.toc
2743 .frames
2744 .get(index)
2745 .is_some_and(|frame| frame.status == FrameStatus::Active)
2746 }
2747
    /// Compute the covering span (frame id range + page range) of a set of
    /// frame ids. Returns `None` for an empty iterator.
    ///
    /// Page bounds come from each frame's `chunk_index`/`chunk_count`;
    /// frames without chunk info fall back to the running bounds.
    #[cfg(feature = "parallel_segments")]
    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
    where
        I: IntoIterator<Item = FrameId>,
    {
        let mut iter = iter.into_iter();
        // Seed min/max and page bounds from the first frame.
        let first_id = iter.next()?;
        let first_frame = self.toc.frames.get(first_id as usize);
        let mut min_id = first_id;
        let mut max_id = first_id;
        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
        let mut page_end = first_frame
            .and_then(|frame| frame.chunk_count)
            .map(|count| page_start + count.saturating_sub(1))
            .unwrap_or(page_start);
        for frame_id in iter {
            if frame_id < min_id {
                min_id = frame_id;
            }
            if frame_id > max_id {
                max_id = frame_id;
            }
            // Widen the page range using this frame's chunk info, if any.
            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
                if let Some(idx) = frame.chunk_index {
                    page_start = page_start.min(idx);
                    if let Some(count) = frame.chunk_count {
                        let end = idx + count.saturating_sub(1);
                        page_end = page_end.max(end);
                    } else {
                        page_end = page_end.max(idx);
                    }
                }
            }
        }
        Some(SegmentSpan {
            frame_start: min_id,
            frame_end: max_id,
            page_start,
            page_end,
            ..SegmentSpan::default()
        })
    }
2790
2791 #[cfg(feature = "parallel_segments")]
2792 pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2793 common.span = Some(span);
2794 if common.codec_version == 0 {
2795 common.codec_version = 1;
2796 }
2797 }
2798
2799 #[cfg(feature = "parallel_segments")]
2800 pub(crate) fn record_index_segment(
2801 &mut self,
2802 kind: SegmentKind,
2803 common: SegmentCommon,
2804 stats: SegmentStats,
2805 ) -> Result<()> {
2806 let entry = IndexSegmentRef {
2807 kind,
2808 common,
2809 stats,
2810 };
2811 self.toc.segment_catalog.index_segments.push(entry.clone());
2812 if let Some(wal) = self.manifest_wal.as_mut() {
2813 wal.append_segments(&[entry])?;
2814 }
2815 Ok(())
2816 }
2817
2818 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2819 self.ensure_writable()?;
2820 if self.toc.ticket_ref.issuer == "free-tier" {
2821 return Ok(());
2822 }
2823 match self.tier() {
2824 Tier::Free => Ok(()),
2825 tier => {
2826 if self.toc.ticket_ref.issuer.trim().is_empty() {
2827 Err(MemvidError::TicketRequired { tier })
2828 } else {
2829 Ok(())
2830 }
2831 }
2832 }
2833 }
2834
2835 pub(crate) fn tier(&self) -> Tier {
2836 if self.header.wal_size >= WAL_SIZE_LARGE {
2837 Tier::Enterprise
2838 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2839 Tier::Dev
2840 } else {
2841 Tier::Free
2842 }
2843 }
2844
2845 pub(crate) fn capacity_limit(&self) -> u64 {
2846 if self.toc.ticket_ref.capacity_bytes != 0 {
2847 self.toc.ticket_ref.capacity_bytes
2848 } else {
2849 self.tier().capacity_bytes()
2850 }
2851 }
2852
    /// Public accessor for the effective capacity limit in bytes
    /// (see `capacity_limit`).
    #[must_use]
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2861
    /// Serialize the TOC at `header.footer_offset`, append the commit footer,
    /// then size the file so it covers both the footer and the WAL region.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        // `prepare_toc_bytes` may update the TOC (it takes &mut) before
        // returning the serialized form.
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // Commit footer records length + hash of the TOC and the generation,
        // so readers can validate what they load.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // Final file length: end of TOC+footer, but never shorter than the
        // end of the WAL region.
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        self.file.sync_all()?;
        Ok(())
    }
2902}
2903
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Build and append index segments for an ingestion delta using the
    /// parallel worker pool. Returns `Ok(false)` when there was nothing to
    /// do at any stage (no chunks, no plans, no results).
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Turn a delta's inserted frames into per-frame chunk plans, attaching
    /// the frame's embedding when the delta carries one. Frames whose text
    /// content is blank are skipped.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Move embeddings into a map keyed by frame id so each one can be
        // handed off (removed) to its matching chunk plan below.
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based when the frame has chunk info, 0 otherwise.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2975
2976#[cfg(feature = "parallel_segments")]
/// Cheap token-count estimate: whitespace-separated word count, clamped to a
/// minimum of one so even empty text accounts for a token.
fn estimate_tokens(text: &str) -> usize {
    let words = text.split_whitespace().count();
    if words == 0 { 1 } else { words }
}
2980
2981impl Memvid {
2982 pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
2983 let catalog_end = self.catalog_data_end();
2984 if catalog_end <= self.header.footer_offset {
2985 return Ok(false);
2986 }
2987 self.header.footer_offset = catalog_end;
2988 self.rewrite_toc_footer()?;
2989 self.header.toc_checksum = self.toc.toc_checksum;
2990 crate::persist_header(&mut self.file, &self.header)?;
2991 Ok(true)
2992 }
2993}
2994
2995impl Memvid {
    /// Compact the container: commit pending state, rewrite all active frame
    /// payloads contiguously after the WAL region, clear every segment
    /// catalog, and rebuild the indexes from scratch.
    pub fn vacuum(&mut self) -> Result<()> {
        // Flush pending WAL/TOC state before relocating payloads.
        self.commit()?;

        // Read every active frame's payload into memory first; the rewrite
        // below overwrites the payload region in place.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Payload region starts immediately after the WAL; write active
        // payloads back-to-back and patch each frame's offset/length.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    // Active frame with no stored payload: zero out the span.
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Inactive frames lose their payload span entirely.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // Invalidate every persisted segment/index reference; they all point
        // at pre-compaction offsets.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Drop the in-memory tantivy engine; rebuild_indexes recreates state.
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[], &[])?;
        self.file.sync_all()?;
        Ok(())
    }
3063
3064 #[must_use]
3082 pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
3083 plan_document_chunks(payload).map(|plan| plan.chunks)
3084 }
3085
    /// Insert `payload` as a new frame with default options; returns the
    /// identifier produced by `put_internal`.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }
3090
    /// Insert `payload` as a new frame using the caller-supplied `options`.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }
3095
3096 pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
3098 self.put_internal(
3099 Some(payload),
3100 None,
3101 Some(embedding),
3102 None,
3103 PutOptions::default(),
3104 None,
3105 )
3106 }
3107
    /// Insert `payload` with a precomputed frame-level embedding and
    /// explicit put options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }
3116
    /// Insert `payload` with an optional parent-level embedding plus one
    /// embedding per chunk; all are forwarded to `put_internal`.
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }
3145
    /// Replace or re-stamp an existing active frame.
    ///
    /// Any option the caller leaves unset inherits the existing frame's value
    /// (timestamp, track, kind, uri, title, metadata, search text, tags,
    /// labels, extra metadata). When `payload` is `None` the old payload is
    /// reused and auto-tagging/date extraction are disabled for the update.
    ///
    /// # Errors
    /// Fails when mutation is not allowed, the frame id is unknown, or the
    /// frame is not active.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Inherit every unset option from the existing frame.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // No new payload: reuse the old one and skip content-derived
        // enrichment (auto-tagging, date extraction).
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        // Explicit embedding wins; otherwise carry over the stored one when
        // the vec index is enabled.
        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None, options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
        );
        Ok(seq)
    }
3226
    /// Delete an active frame by appending a tombstone entry to the WAL.
    ///
    /// The tombstone preserves identifying fields (uri, title, encoding,
    /// role, chunk info) from the frame it deletes. A checkpoint commit runs
    /// afterwards when the WAL asks for one, unless batch options suppress
    /// auto-checkpointing. Returns the WAL sequence of the tombstone.
    ///
    /// # Errors
    /// Fails when mutation is not allowed, the frame id is unknown, or the
    /// frame is not active.
    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let frame = self.frame_by_id(frame_id)?;
        if frame.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Tombstone entry: empty payload, no indexes, current wall-clock
        // timestamp (falling back to the frame's own timestamp on clock
        // error before the UNIX epoch).
        let mut tombstone = WalEntryData {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(frame.timestamp),
            kind: None,
            track: None,
            payload: Vec::new(),
            embedding: None,
            uri: frame.uri.clone(),
            title: frame.title.clone(),
            canonical_encoding: frame.canonical_encoding,
            canonical_length: frame.canonical_length,
            metadata: None,
            search_text: None,
            tags: Vec::new(),
            labels: Vec::new(),
            extra_metadata: BTreeMap::new(),
            content_dates: Vec::new(),
            chunk_manifest: None,
            role: frame.role,
            parent_sequence: None,
            chunk_index: frame.chunk_index,
            chunk_count: frame.chunk_count,
            op: FrameWalOp::Tombstone,
            target_frame_id: Some(frame_id),
            supersedes_frame_id: None,
            reuse_payload_from: None,
            source_sha256: None,
            source_path: None,
            enrichment_state: crate::types::EnrichmentState::default(),
        };
        // Carry over kind/track after construction.
        tombstone.kind = frame.kind.clone();
        tombstone.track = frame.track.clone();

        let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
        let seq = self.append_wal_entry(&payload_bytes)?;
        self.dirty = true;
        // Batch mode may disable the automatic checkpoint commit.
        let suppress_checkpoint = self
            .batch_opts
            .as_ref()
            .is_some_and(|o| o.disable_auto_checkpoint);
        if !suppress_checkpoint && self.wal.should_checkpoint() {
            self.commit()?;
        }
        info!("frame_delete frame_id={frame_id} seq={seq}");
        Ok(seq)
    }
3285}
3286
3287impl Memvid {
3288 fn put_internal(
3289 &mut self,
3290 payload: Option<&[u8]>,
3291 reuse_frame: Option<Frame>,
3292 embedding: Option<Vec<f32>>,
3293 chunk_embeddings: Option<Vec<Vec<f32>>>,
3294 mut options: PutOptions,
3295 supersedes: Option<FrameId>,
3296 ) -> Result<u64> {
3297 self.ensure_mutation_allowed()?;
3298
3299 if options.dedup {
3301 if let Some(bytes) = payload {
3302 let content_hash = hash(bytes);
3303 if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3304 tracing::debug!(
3306 frame_id = existing_frame.id,
3307 "dedup: skipping ingestion, identical content already exists"
3308 );
3309 return Ok(existing_frame.id);
3311 }
3312 }
3313 }
3314
3315 if payload.is_some() && reuse_frame.is_some() {
3316 let frame_id = reuse_frame
3317 .as_ref()
3318 .map(|frame| frame.id)
3319 .unwrap_or_default();
3320 return Err(MemvidError::InvalidFrame {
3321 frame_id,
3322 reason: "cannot reuse payload when bytes are provided",
3323 });
3324 }
3325
3326 let incoming_dimension = {
3329 let mut dim: Option<u32> = None;
3330
3331 if let Some(ref vector) = embedding {
3332 if !vector.is_empty() {
3333 #[allow(clippy::cast_possible_truncation)]
3334 let len = vector.len() as u32;
3335 dim = Some(len);
3336 }
3337 }
3338
3339 if let Some(ref vectors) = chunk_embeddings {
3340 for vector in vectors {
3341 if vector.is_empty() {
3342 continue;
3343 }
3344 let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3345 match dim {
3346 None => dim = Some(vec_dim),
3347 Some(existing) if existing == vec_dim => {}
3348 Some(existing) => {
3349 return Err(MemvidError::VecDimensionMismatch {
3350 expected: existing,
3351 actual: vector.len(),
3352 });
3353 }
3354 }
3355 }
3356 }
3357
3358 dim
3359 };
3360
3361 if let Some(incoming_dimension) = incoming_dimension {
3362 if !self.vec_enabled {
3364 self.enable_vec()?;
3365 }
3366
3367 if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3368 if existing_dimension != incoming_dimension {
3369 return Err(MemvidError::VecDimensionMismatch {
3370 expected: existing_dimension,
3371 actual: incoming_dimension as usize,
3372 });
3373 }
3374 }
3375
3376 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3378 if manifest.dimension == 0 {
3379 manifest.dimension = incoming_dimension;
3380 }
3381 }
3382 }
3383
3384 let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3385 let payload_tail = self.payload_region_end();
3386 let projected = if let Some(bytes) = payload {
3387 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3388 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3389 } else {
3390 prepare_canonical_payload(bytes)?
3391 };
3392 let len = prepared.len();
3393 prepared_payload = Some((prepared, encoding, length));
3394 payload_tail.saturating_add(len as u64)
3395 } else if reuse_frame.is_some() {
3396 payload_tail
3397 } else {
3398 return Err(MemvidError::InvalidFrame {
3399 frame_id: 0,
3400 reason: "payload required for frame insertion",
3401 });
3402 };
3403
3404 let capacity_limit = self.capacity_limit();
3405 if projected > capacity_limit {
3406 let incoming_size = projected.saturating_sub(payload_tail);
3407 return Err(MemvidError::CapacityExceeded {
3408 current: payload_tail,
3409 limit: capacity_limit,
3410 required: incoming_size,
3411 });
3412 }
3413 let timestamp = options.timestamp.take().unwrap_or_else(|| {
3414 SystemTime::now()
3415 .duration_since(UNIX_EPOCH)
3416 .map(|d| d.as_secs() as i64)
3417 .unwrap_or(0)
3418 });
3419
3420 #[allow(unused_assignments)]
3421 let mut reuse_bytes: Option<Vec<u8>> = None;
3422 let payload_for_processing = if let Some(bytes) = payload {
3423 Some(bytes)
3424 } else if let Some(frame) = reuse_frame.as_ref() {
3425 let bytes = self.frame_canonical_bytes(frame)?;
3426 reuse_bytes = Some(bytes);
3427 reuse_bytes.as_deref()
3428 } else {
3429 None
3430 };
3431
3432 let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3434 (Some(bytes), None) => plan_document_chunks(bytes),
3435 _ => None,
3436 };
3437
3438 let mut source_sha256: Option<[u8; 32]> = None;
3442 let source_path_value = options.source_path.take();
3443
3444 let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3445 if raw_chunk_plan.is_some() {
3446 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3448 } else if options.no_raw {
3449 if let Some(bytes) = payload {
3451 let hash_result = hash(bytes);
3453 source_sha256 = Some(*hash_result.as_bytes());
3454 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3456 } else {
3457 return Err(MemvidError::InvalidFrame {
3458 frame_id: 0,
3459 reason: "payload required for --no-raw mode",
3460 });
3461 }
3462 } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3463 (prepared, encoding, length, None)
3464 } else if let Some(bytes) = payload {
3465 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3466 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3467 } else {
3468 prepare_canonical_payload(bytes)?
3469 };
3470 (prepared, encoding, length, None)
3471 } else if let Some(frame) = reuse_frame.as_ref() {
3472 (
3473 Vec::new(),
3474 frame.canonical_encoding,
3475 frame.canonical_length,
3476 Some(frame.id),
3477 )
3478 } else {
3479 return Err(MemvidError::InvalidFrame {
3480 frame_id: 0,
3481 reason: "payload required for frame insertion",
3482 });
3483 };
3484
3485 let mut chunk_plan = raw_chunk_plan;
3487
3488 let mut metadata = options.metadata.take();
3489 let mut search_text = options
3490 .search_text
3491 .take()
3492 .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3493 let mut tags = std::mem::take(&mut options.tags);
3494 let mut labels = std::mem::take(&mut options.labels);
3495 let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3496 let mut content_dates: Vec<String> = Vec::new();
3497
3498 let need_search_text = search_text
3499 .as_ref()
3500 .is_none_or(|text| text.trim().is_empty());
3501 let need_metadata = metadata.is_none();
3502 let run_extractor = need_search_text || need_metadata || options.auto_tag;
3503
3504 let mut extraction_error = None;
3505 let mut is_skim_extraction = false; let extracted = if run_extractor {
3508 if let Some(bytes) = payload_for_processing {
3509 let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3510 let uri_hint = options.uri.as_deref();
3511
3512 let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3514
3515 if use_budgeted {
3516 let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3518 options.extraction_budget_ms,
3519 );
3520 match crate::extract_budgeted::extract_with_budget(
3521 bytes, mime_hint, uri_hint, budget,
3522 ) {
3523 Ok(result) => {
3524 is_skim_extraction = result.is_skim();
3525 if is_skim_extraction {
3526 tracing::debug!(
3527 coverage = result.coverage,
3528 elapsed_ms = result.elapsed_ms,
3529 sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3530 "time-budgeted extraction (skim)"
3531 );
3532 }
3533 let doc = crate::extract::ExtractedDocument {
3535 text: if result.text.is_empty() {
3536 None
3537 } else {
3538 Some(result.text)
3539 },
3540 metadata: serde_json::json!({
3541 "skim": is_skim_extraction,
3542 "coverage": result.coverage,
3543 "sections_extracted": result.sections_extracted,
3544 "sections_total": result.sections_total,
3545 }),
3546 mime_type: mime_hint.map(std::string::ToString::to_string),
3547 };
3548 Some(doc)
3549 }
3550 Err(err) => {
3551 tracing::warn!(
3553 ?err,
3554 "budgeted extraction failed, trying full extraction"
3555 );
3556 match extract_via_registry(bytes, mime_hint, uri_hint) {
3557 Ok(doc) => Some(doc),
3558 Err(err) => {
3559 extraction_error = Some(err);
3560 None
3561 }
3562 }
3563 }
3564 }
3565 } else {
3566 match extract_via_registry(bytes, mime_hint, uri_hint) {
3568 Ok(doc) => Some(doc),
3569 Err(err) => {
3570 extraction_error = Some(err);
3571 None
3572 }
3573 }
3574 }
3575 } else {
3576 None
3577 }
3578 } else {
3579 None
3580 };
3581
3582 if let Some(err) = extraction_error {
3583 return Err(err);
3584 }
3585
3586 if let Some(doc) = &extracted {
3587 if need_search_text {
3588 if let Some(text) = &doc.text {
3589 if let Some(normalized) =
3590 normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3591 {
3592 search_text = Some(normalized);
3593 }
3594 }
3595 }
3596
3597 if chunk_plan.is_none() {
3600 if let Some(text) = &doc.text {
3601 chunk_plan = plan_text_chunks(text);
3602 }
3603 }
3604
3605 if let Some(mime) = doc.mime_type.as_ref() {
3606 if let Some(existing) = &mut metadata {
3607 if existing.mime.is_none() {
3608 existing.mime = Some(mime.clone());
3609 }
3610 } else {
3611 let mut doc_meta = DocMetadata::default();
3612 doc_meta.mime = Some(mime.clone());
3613 metadata = Some(doc_meta);
3614 }
3615 }
3616
3617 if options.auto_tag {
3620 if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3621 {
3622 extra_metadata
3623 .entry("extractous_metadata".to_string())
3624 .or_insert(meta_json);
3625 }
3626 }
3627 }
3628
3629 if options.auto_tag {
3630 if let Some(ref text) = search_text {
3631 if !text.trim().is_empty() {
3632 let result = AutoTagger.analyse(text, options.extract_dates);
3633 merge_unique(&mut tags, result.tags);
3634 merge_unique(&mut labels, result.labels);
3635 if options.extract_dates && content_dates.is_empty() {
3636 content_dates = result.content_dates;
3637 }
3638 }
3639 }
3640 }
3641
3642 if content_dates.is_empty() {
3643 if let Some(frame) = reuse_frame.as_ref() {
3644 content_dates = frame.content_dates.clone();
3645 }
3646 }
3647
3648 let metadata_ref = metadata.as_ref();
3649 let mut search_text = augment_search_text(
3650 search_text,
3651 options.uri.as_deref(),
3652 options.title.as_deref(),
3653 options.track.as_deref(),
3654 &tags,
3655 &labels,
3656 &extra_metadata,
3657 &content_dates,
3658 metadata_ref,
3659 );
3660 let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3661 let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3662 let mut parent_chunk_count: Option<u32> = None;
3663
3664 let kind_value = options.kind.take();
3665 let track_value = options.track.take();
3666 let uri_value = options.uri.take();
3667 let title_value = options.title.take();
3668 let should_extract_triplets = options.extract_triplets;
3669 let triplet_uri = uri_value.clone();
3671 let triplet_title = title_value.clone();
3672
3673 if let Some(plan) = chunk_plan.as_ref() {
3674 let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3675 parent_chunk_manifest = Some(plan.manifest.clone());
3676 parent_chunk_count = Some(chunk_total);
3677
3678 if let Some(first_chunk) = plan.chunks.first() {
3679 if let Some(normalized) =
3680 normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3681 {
3682 if !normalized.trim().is_empty() {
3683 search_text = Some(normalized);
3684 }
3685 }
3686 }
3687
3688 let chunk_tags = tags.clone();
3689 let chunk_labels = labels.clone();
3690 let chunk_metadata = metadata.clone();
3691 let chunk_extra_metadata = extra_metadata.clone();
3692 let chunk_content_dates = content_dates.clone();
3693
3694 for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3695 let (chunk_payload, chunk_encoding, chunk_length) =
3696 prepare_canonical_payload(chunk_text.as_bytes())?;
3697 let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3698 .map(|n| n.text)
3699 .filter(|text| !text.trim().is_empty());
3700
3701 let chunk_uri = uri_value
3702 .as_ref()
3703 .map(|uri| format!("{uri}#page-{}", idx + 1));
3704 let chunk_title = title_value
3705 .as_ref()
3706 .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3707
3708 let chunk_embedding = chunk_embeddings
3710 .as_ref()
3711 .and_then(|embeddings| embeddings.get(idx).cloned());
3712
3713 chunk_entries.push(WalEntryData {
3714 timestamp,
3715 kind: kind_value.clone(),
3716 track: track_value.clone(),
3717 payload: chunk_payload,
3718 embedding: chunk_embedding,
3719 uri: chunk_uri,
3720 title: chunk_title,
3721 canonical_encoding: chunk_encoding,
3722 canonical_length: chunk_length,
3723 metadata: chunk_metadata.clone(),
3724 search_text: chunk_search_text,
3725 tags: chunk_tags.clone(),
3726 labels: chunk_labels.clone(),
3727 extra_metadata: chunk_extra_metadata.clone(),
3728 content_dates: chunk_content_dates.clone(),
3729 chunk_manifest: None,
3730 role: FrameRole::DocumentChunk,
3731 parent_sequence: None,
3732 chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3733 chunk_count: Some(chunk_total),
3734 op: FrameWalOp::Insert,
3735 target_frame_id: None,
3736 supersedes_frame_id: None,
3737 reuse_payload_from: None,
3738 source_sha256: None, source_path: None,
3740 enrichment_state: crate::types::EnrichmentState::Enriched,
3742 });
3743 }
3744 }
3745
3746 let parent_uri = uri_value.clone();
3747 let parent_title = title_value.clone();
3748
3749 let parent_sequence = if let Some(parent_id) = options.parent_id {
3752 usize::try_from(parent_id)
3757 .ok()
3758 .and_then(|idx| self.toc.frames.get(idx))
3759 .map(|_| parent_id + 2) } else {
3761 None
3762 };
3763
3764 let triplet_text = search_text.clone();
3766
3767 #[cfg(feature = "lex")]
3769 let instant_index_tags = if options.instant_index {
3770 tags.clone()
3771 } else {
3772 Vec::new()
3773 };
3774 #[cfg(feature = "lex")]
3775 let instant_index_labels = if options.instant_index {
3776 labels.clone()
3777 } else {
3778 Vec::new()
3779 };
3780
3781 #[cfg(feature = "lex")]
3783 let needs_enrichment =
3784 options.instant_index && (options.enable_embedding || is_skim_extraction);
3785 #[cfg(feature = "lex")]
3786 let enrichment_state = if needs_enrichment {
3787 crate::types::EnrichmentState::Searchable
3788 } else {
3789 crate::types::EnrichmentState::Enriched
3790 };
3791 #[cfg(not(feature = "lex"))]
3792 let enrichment_state = crate::types::EnrichmentState::Enriched;
3793
3794 let entry = WalEntryData {
3795 timestamp,
3796 kind: kind_value,
3797 track: track_value,
3798 payload: storage_payload,
3799 embedding,
3800 uri: parent_uri,
3801 title: parent_title,
3802 canonical_encoding,
3803 canonical_length,
3804 metadata,
3805 search_text,
3806 tags,
3807 labels,
3808 extra_metadata,
3809 content_dates,
3810 chunk_manifest: parent_chunk_manifest,
3811 role: options.role,
3812 parent_sequence,
3813 chunk_index: None,
3814 chunk_count: parent_chunk_count,
3815 op: FrameWalOp::Insert,
3816 target_frame_id: None,
3817 supersedes_frame_id: supersedes,
3818 reuse_payload_from,
3819 source_sha256,
3820 source_path: source_path_value,
3821 enrichment_state,
3822 };
3823
3824 let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3825 let parent_seq = self.append_wal_entry(&parent_bytes)?;
3826 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3827
3828 #[cfg(feature = "lex")]
3831 if options.instant_index && self.tantivy.is_some() {
3832 let frame_id = parent_seq as FrameId;
3834
3835 if let Some(ref text) = triplet_text {
3837 if !text.trim().is_empty() {
3838 let temp_frame = Frame {
3840 id: frame_id,
3841 timestamp,
3842 anchor_ts: None,
3843 anchor_source: None,
3844 kind: options.kind.clone(),
3845 track: options.track.clone(),
3846 payload_offset: 0,
3847 payload_length: 0,
3848 checksum: [0u8; 32],
3849 uri: options
3850 .uri
3851 .clone()
3852 .or_else(|| Some(crate::default_uri(frame_id))),
3853 title: options.title.clone(),
3854 canonical_encoding: crate::types::CanonicalEncoding::default(),
3855 canonical_length: None,
3856 metadata: None, search_text: triplet_text.clone(),
3858 tags: instant_index_tags.clone(),
3859 labels: instant_index_labels.clone(),
3860 extra_metadata: std::collections::BTreeMap::new(), content_dates: Vec::new(), chunk_manifest: None,
3863 role: options.role,
3864 parent_id: None,
3865 chunk_index: None,
3866 chunk_count: None,
3867 status: FrameStatus::Active,
3868 supersedes,
3869 superseded_by: None,
3870 source_sha256: None, source_path: None, enrichment_state: crate::types::EnrichmentState::Searchable,
3873 };
3874
3875 if let Some(engine) = self.tantivy.as_mut() {
3877 engine.add_frame(&temp_frame, text)?;
3878 engine.soft_commit()?;
3879 self.tantivy_dirty = true;
3880
3881 tracing::debug!(
3882 frame_id = frame_id,
3883 "instant index: frame searchable immediately"
3884 );
3885 }
3886 }
3887 }
3888 }
3889
3890 #[cfg(feature = "lex")]
3894 if needs_enrichment {
3895 let frame_id = parent_seq as FrameId;
3896 self.toc.enrichment_queue.push(frame_id);
3897 tracing::debug!(
3898 frame_id = frame_id,
3899 is_skim = is_skim_extraction,
3900 needs_embedding = options.enable_embedding,
3901 "queued frame for background enrichment"
3902 );
3903 }
3904
3905 for mut chunk_entry in chunk_entries {
3906 chunk_entry.parent_sequence = Some(parent_seq);
3907 let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3908 self.append_wal_entry(&chunk_bytes)?;
3909 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3910 }
3911
3912 self.dirty = true;
3913 let suppress_checkpoint = self
3914 .batch_opts
3915 .as_ref()
3916 .is_some_and(|o| o.disable_auto_checkpoint);
3917 if !suppress_checkpoint && self.wal.should_checkpoint() {
3918 self.commit()?;
3919 }
3920
3921 #[cfg(feature = "replay")]
3923 if let Some(input_bytes) = payload {
3924 self.record_put_action(parent_seq, input_bytes);
3925 }
3926
3927 if should_extract_triplets {
3930 if let Some(ref text) = triplet_text {
3931 if !text.trim().is_empty() {
3932 let extractor = TripletExtractor::default();
3933 let frame_id = parent_seq as FrameId;
3934 let (cards, _stats) = extractor.extract(
3935 frame_id,
3936 text,
3937 triplet_uri.as_deref(),
3938 triplet_title.as_deref(),
3939 timestamp,
3940 );
3941
3942 if !cards.is_empty() {
3943 let card_ids = self.memories_track.add_cards(cards);
3945
3946 self.memories_track
3948 .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3949 }
3950 }
3951 }
3952 }
3953
3954 Ok(parent_seq)
3955 }
3956}
3957
3958#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3959#[serde(rename_all = "snake_case")]
3960pub(crate) enum FrameWalOp {
3961 Insert,
3962 Tombstone,
3963}
3964
3965impl Default for FrameWalOp {
3966 fn default() -> Self {
3967 Self::Insert
3968 }
3969}
3970
/// A single decoded record from the embedded WAL.
///
/// NOTE: the variant order is part of the bincode on-disk encoding
/// (the enum tag is positional) — do not reorder variants.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame insert/tombstone together with all of its metadata.
    Frame(WalEntryData),
    /// A batch of lexical-index WAL operations (only with the `lex` feature).
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3977
3978fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3979 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3980 return Ok(entry);
3981 }
3982 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3983 Ok(WalEntry::Frame(legacy))
3984}
3985
/// Payload of a [`WalEntry::Frame`] record: everything needed to
/// materialize a frame when the WAL is checkpointed.
///
/// Fields annotated `#[serde(default)]` were added after the initial
/// layout; the default keeps older WAL records decodable (see
/// `decode_wal_entry`). Field order is part of the bincode encoding —
/// append new fields at the end, never reorder.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Frame timestamp, seconds since the Unix epoch.
    pub(crate) timestamp: i64,
    /// Optional frame kind label.
    pub(crate) kind: Option<String>,
    /// Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    /// Stored payload bytes (possibly zstd-compressed; see `canonical_encoding`).
    pub(crate) payload: Vec<u8>,
    /// Optional embedding vector for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// How `payload` is encoded (plain vs. zstd).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Original (pre-compression) payload length in bytes, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Normalized text used for lexical search.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Free-form key/value metadata folded into the search text.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Date strings extracted from the content (auto-tagging).
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Chunk manifest, present on a parent frame that was split into chunks.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence of the parent frame, set on document-chunk entries.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based index of this chunk within its parent, if a chunk.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Total chunk count for the parent document, if chunked.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Insert vs. tombstone.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    // NOTE(review): always None on the insert paths visible here;
    // presumably set by tombstone writers — confirm against those call sites.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame this entry supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// Reuse the payload bytes of this existing frame instead of `payload`.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// 32-byte content hash of the original payload in `--no-raw` mode
    /// (computed with blake3 despite the field name — see the `no_raw` path).
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Original source path of the ingested document, when provided.
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Whether the frame still needs background enrichment.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4041
4042pub(crate) fn prepare_canonical_payload(
4043 payload: &[u8],
4044) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4045 prepare_canonical_payload_with_level(payload, 3)
4046}
4047
4048pub(crate) fn prepare_canonical_payload_with_level(
4049 payload: &[u8],
4050 level: i32,
4051) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4052 if level == 0 {
4053 return Ok((
4055 payload.to_vec(),
4056 CanonicalEncoding::Plain,
4057 Some(payload.len() as u64),
4058 ));
4059 }
4060 if std::str::from_utf8(payload).is_ok() {
4061 let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4062 Ok((
4063 compressed,
4064 CanonicalEncoding::Zstd,
4065 Some(payload.len() as u64),
4066 ))
4067 } else {
4068 Ok((
4069 payload.to_vec(),
4070 CanonicalEncoding::Plain,
4071 Some(payload.len() as u64),
4072 ))
4073 }
4074}
4075
4076pub(crate) fn augment_search_text(
4077 base: Option<String>,
4078 uri: Option<&str>,
4079 title: Option<&str>,
4080 track: Option<&str>,
4081 tags: &[String],
4082 labels: &[String],
4083 extra_metadata: &BTreeMap<String, String>,
4084 content_dates: &[String],
4085 metadata: Option<&DocMetadata>,
4086) -> Option<String> {
4087 let mut segments: Vec<String> = Vec::new();
4088 if let Some(text) = base {
4089 let trimmed = text.trim();
4090 if !trimmed.is_empty() {
4091 segments.push(trimmed.to_string());
4092 }
4093 }
4094
4095 if let Some(title) = title {
4096 if !title.trim().is_empty() {
4097 segments.push(format!("title: {}", title.trim()));
4098 }
4099 }
4100
4101 if let Some(uri) = uri {
4102 if !uri.trim().is_empty() {
4103 segments.push(format!("uri: {}", uri.trim()));
4104 }
4105 }
4106
4107 if let Some(track) = track {
4108 if !track.trim().is_empty() {
4109 segments.push(format!("track: {}", track.trim()));
4110 }
4111 }
4112
4113 if !tags.is_empty() {
4114 segments.push(format!("tags: {}", tags.join(" ")));
4115 }
4116
4117 if !labels.is_empty() {
4118 segments.push(format!("labels: {}", labels.join(" ")));
4119 }
4120
4121 if !extra_metadata.is_empty() {
4122 for (key, value) in extra_metadata {
4123 if value.trim().is_empty() {
4124 continue;
4125 }
4126 segments.push(format!("{key}: {value}"));
4127 }
4128 }
4129
4130 if !content_dates.is_empty() {
4131 segments.push(format!("dates: {}", content_dates.join(" ")));
4132 }
4133
4134 if let Some(meta) = metadata {
4135 if let Ok(meta_json) = serde_json::to_string(meta) {
4136 segments.push(format!("metadata: {meta_json}"));
4137 }
4138 }
4139
4140 if segments.is_empty() {
4141 None
4142 } else {
4143 Some(segments.join("\n"))
4144 }
4145}
4146
/// Append each non-empty, trimmed value from `additions` to `target`,
/// skipping values already present in `target` (exact string match) or
/// already added earlier from `additions`. Preserves insertion order.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    // Seed the de-dup set with the existing entries so repeated merges
    // are idempotent.
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    for value in additions {
        let trimmed = value.trim();
        if trimmed.is_empty() || seen.contains(trimmed) {
            continue;
        }
        // Reuse the owned String when trimming was a no-op, avoiding a
        // fresh allocation in the common (already-clean) case.
        let candidate = if trimmed.len() == value.len() {
            value
        } else {
            trimmed.to_string()
        };
        seen.insert(candidate.clone());
        target.push(candidate);
    }
}