1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58 PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66 TemporalResolutionValue,
67};
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading payload bytes handed to readers as "magic" for format sniffing.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Per-entry header overhead added to the payload length when computing how
/// much WAL space an append needs.
/// NOTE(review): presumably mirrors the embedded WAL's on-disk entry header — confirm.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Scratch-buffer size (8 MiB) used when shifting file data to grow the WAL region.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;
78
/// IANA time-zone name used by the temporal track.
/// NOTE(review): not referenced in this chunk; presumably the fallback zone for
/// resolving relative temporal phrases — confirm at the use site.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

/// Fixed list of English relative-time phrases (lower-case).
/// NOTE(review): not referenced in this chunk; presumably matched against
/// ingested text to produce temporal mentions — confirm at the use site.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
123struct CommitStaging {
124 atomic: AtomicWriteFile,
125}
126
127impl CommitStaging {
128 fn prepare(path: &Path) -> Result<Self> {
129 let mut options = AtomicWriteFile::options();
130 options.read(true);
131 let atomic = options.open(path)?;
132 Ok(Self { atomic })
133 }
134
135 fn copy_from(&mut self, source: &File) -> Result<()> {
136 let mut reader = source.try_clone()?;
137 reader.seek(SeekFrom::Start(0))?;
138
139 let writer = self.atomic.as_file_mut();
140 writer.set_len(0)?;
141 writer.seek(SeekFrom::Start(0))?;
142 std::io::copy(&mut reader, writer)?;
143 writer.flush()?;
144 writer.sync_all()?;
145 Ok(())
146 }
147
148 fn clone_file(&self) -> Result<File> {
149 Ok(self.atomic.as_file().try_clone()?)
150 }
151
152 fn commit(self) -> Result<()> {
153 self.atomic.commit().map_err(Into::into)
154 }
155
156 fn discard(self) -> Result<()> {
157 self.atomic.discard().map_err(Into::into)
158 }
159}
160
161#[derive(Debug, Default)]
162struct IngestionDelta {
163 inserted_frames: Vec<FrameId>,
164 inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
165 inserted_time_entries: Vec<TimeIndexEntry>,
166 mutated_frames: bool,
167 #[cfg(feature = "temporal_track")]
168 inserted_temporal_mentions: Vec<TemporalMention>,
169 #[cfg(feature = "temporal_track")]
170 inserted_temporal_anchors: Vec<TemporalAnchor>,
171}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// How much work a commit is asked to perform.
///
/// NOTE(review): the commit routine in this file currently ignores the mode
/// (its parameter is `_mode`), so both variants behave the same today.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Default mode.
    #[default]
    Full,
    /// Incremental mode.
    Incremental,
}

/// Options accepted by `Memvid::commit_with_options`.
///
/// The default value is `CommitMode::Full` with `background` disabled.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// Commit mode forwarded to the commit routine.
    pub mode: CommitMode,
    /// Requests a background commit. Currently ignored: commits always run
    /// synchronously (a debug event is logged when this is set).
    pub background: bool,
}

impl CommitOptions {
    /// Creates options for `mode` with `background` disabled.
    #[must_use]
    pub fn new(mode: CommitMode) -> Self {
        Self {
            mode,
            background: false,
        }
    }

    /// Returns these options with the `background` flag replaced.
    #[must_use]
    pub fn background(mut self, background: bool) -> Self {
        self.background = background;
        self
    }
}
222}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226 REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230 mime: Option<&str>,
231 magic: Option<&[u8]>,
232 uri: Option<&str>,
233) -> Option<DocumentFormat> {
234 if detect_pdf_magic(magic) {
236 return Some(DocumentFormat::Pdf);
237 }
238
239 if let Some(magic_bytes) = magic {
242 if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243 if let Some(format) = infer_format_from_extension(uri) {
245 return Some(format);
246 }
247 }
248 }
249
250 if let Some(mime_str) = mime {
252 let mime_lower = mime_str.trim().to_ascii_lowercase();
253 let format = match mime_lower.as_str() {
254 "application/pdf" => Some(DocumentFormat::Pdf),
255 "text/plain" => Some(DocumentFormat::PlainText),
256 "text/markdown" => Some(DocumentFormat::Markdown),
257 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259 Some(DocumentFormat::Docx)
260 }
261 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262 Some(DocumentFormat::Xlsx)
263 }
264 "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266 Some(DocumentFormat::Pptx)
267 }
268 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269 _ => None,
270 };
271 if format.is_some() {
272 return format;
273 }
274 }
275
276 infer_format_from_extension(uri)
278}
279
280fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282 let uri = uri?;
283 let path = std::path::Path::new(uri);
284 let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285 match ext.as_str() {
286 "pdf" => Some(DocumentFormat::Pdf),
287 "docx" => Some(DocumentFormat::Docx),
288 "xlsx" => Some(DocumentFormat::Xlsx),
289 "xls" => Some(DocumentFormat::Xls),
290 "pptx" => Some(DocumentFormat::Pptx),
291 "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292 | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293 | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294 | "sql" => Some(DocumentFormat::PlainText),
295 "md" | "markdown" => Some(DocumentFormat::Markdown),
296 "html" | "htm" => Some(DocumentFormat::Html),
297 _ => None,
298 }
299}
300
/// Returns `true` when `magic` begins with the `%PDF` signature.
///
/// A single leading UTF-8 byte-order mark (`EF BB BF`) is skipped first,
/// followed by any ASCII whitespace. `None` or an empty slice yields `false`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic.filter(|b| !b.is_empty()) else {
        return false;
    };
    let after_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF][..]).unwrap_or(bytes);
    let content_start = after_bom
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(after_bom.len());
    after_bom[content_start..].starts_with(b"%PDF")
}
318
319#[instrument(
320 target = "memvid::extract",
321 skip_all,
322 fields(mime = mime_hint, uri = uri)
323)]
324fn extract_via_registry(
325 bytes: &[u8],
326 mime_hint: Option<&str>,
327 uri: Option<&str>,
328) -> Result<ExtractedDocument> {
329 let registry = default_reader_registry();
330 let magic = bytes
331 .get(..MAGIC_SNIFF_BYTES)
332 .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
333 let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
334 .with_uri(uri)
335 .with_magic(magic);
336
337 let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
338 let start = Instant::now();
339 match reader.extract(bytes, &hint) {
340 Ok(output) => {
341 return Ok(finalize_reader_output(output, start));
342 }
343 Err(err) => {
344 tracing::error!(
345 target = "memvid::extract",
346 reader = reader.name(),
347 error = %err,
348 "reader failed; falling back"
349 );
350 Some(format!("reader {} failed: {err}", reader.name()))
351 }
352 }
353 } else {
354 tracing::debug!(
355 target = "memvid::extract",
356 format = hint.format.map(super::super::reader::DocumentFormat::label),
357 "no reader matched; using default extractor"
358 );
359 Some("no registered reader matched; using default extractor".to_string())
360 };
361
362 let start = Instant::now();
363 let mut output = PassthroughReader.extract(bytes, &hint)?;
364 if let Some(reason) = fallback_reason {
365 output.diagnostics.track_warning(reason);
366 }
367 Ok(finalize_reader_output(output, start))
368}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371 let elapsed = start.elapsed();
372 let ReaderOutput {
373 document,
374 reader_name,
375 diagnostics,
376 } = output;
377 log_reader_result(&reader_name, &diagnostics, elapsed);
378 document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382 let duration_ms = diagnostics
383 .duration_ms
384 .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385 let warnings = diagnostics.warnings.len();
386 let pages = diagnostics.pages_processed;
387
388 if warnings > 0 || diagnostics.fallback {
389 tracing::warn!(
390 target = "memvid::extract",
391 reader,
392 duration_ms,
393 pages,
394 warnings,
395 fallback = diagnostics.fallback,
396 "extraction completed with warnings"
397 );
398 for warning in &diagnostics.warnings {
399 tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400 }
401 } else {
402 tracing::info!(
403 target = "memvid::extract",
404 reader,
405 duration_ms,
406 pages,
407 "extraction completed"
408 );
409 }
410}
411
412impl Memvid {
413 fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
416 where
417 F: FnOnce(&mut Self) -> Result<()>,
418 {
419 self.file.sync_all()?;
420 let mut staging = CommitStaging::prepare(self.path())?;
421 staging.copy_from(&self.file)?;
422
423 let staging_handle = staging.clone_file()?;
424 let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
425 let original_file = std::mem::replace(&mut self.file, staging_handle);
426 let original_wal = std::mem::replace(&mut self.wal, new_wal);
427 let original_header = self.header.clone();
428 let original_toc = self.toc.clone();
429 let original_data_end = self.data_end;
430 let original_generation = self.generation;
431 let original_dirty = self.dirty;
432 let original_lex_enabled = self.lex_enabled;
433 #[cfg(feature = "lex")]
434 let original_tantivy_dirty = self.tantivy_dirty;
435
436 let destination_path = self.path().to_path_buf();
437 let mut original_file = Some(original_file);
438 let mut original_wal = Some(original_wal);
439
440 match op(self) {
441 Ok(()) => {
442 self.file.sync_all()?;
443 match staging.commit() {
444 Ok(()) => {
445 drop(original_file.take());
446 drop(original_wal.take());
447 self.file = OpenOptions::new()
448 .read(true)
449 .write(true)
450 .open(&destination_path)?;
451 self.wal = EmbeddedWal::open(&self.file, &self.header)?;
452 Ok(())
453 }
454 Err(commit_err) => {
455 if let Some(file) = original_file.take() {
456 self.file = file;
457 }
458 if let Some(wal) = original_wal.take() {
459 self.wal = wal;
460 }
461 self.header = original_header;
462 self.toc = original_toc;
463 self.data_end = original_data_end;
464 self.generation = original_generation;
465 self.dirty = original_dirty;
466 self.lex_enabled = original_lex_enabled;
467 #[cfg(feature = "lex")]
468 {
469 self.tantivy_dirty = original_tantivy_dirty;
470 }
471 Err(commit_err)
472 }
473 }
474 }
475 Err(err) => {
476 let _ = staging.discard();
477 if let Some(file) = original_file.take() {
478 self.file = file;
479 }
480 if let Some(wal) = original_wal.take() {
481 self.wal = wal;
482 }
483 self.header = original_header;
484 self.toc = original_toc;
485 self.data_end = original_data_end;
486 self.generation = original_generation;
487 self.dirty = original_dirty;
488 self.lex_enabled = original_lex_enabled;
489 #[cfg(feature = "lex")]
490 {
491 self.tantivy_dirty = original_tantivy_dirty;
492 }
493 Err(err)
494 }
495 }
496 }
497
498 pub(crate) fn catalog_data_end(&self) -> u64 {
499 let mut max_end = self.header.wal_offset + self.header.wal_size;
500
501 for descriptor in &self.toc.segment_catalog.lex_segments {
502 if descriptor.common.bytes_length == 0 {
503 continue;
504 }
505 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
506 }
507
508 for descriptor in &self.toc.segment_catalog.vec_segments {
509 if descriptor.common.bytes_length == 0 {
510 continue;
511 }
512 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
513 }
514
515 for descriptor in &self.toc.segment_catalog.time_segments {
516 if descriptor.common.bytes_length == 0 {
517 continue;
518 }
519 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
520 }
521
522 #[cfg(feature = "temporal_track")]
523 for descriptor in &self.toc.segment_catalog.temporal_segments {
524 if descriptor.common.bytes_length == 0 {
525 continue;
526 }
527 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
528 }
529
530 #[cfg(feature = "lex")]
531 for descriptor in &self.toc.segment_catalog.tantivy_segments {
532 if descriptor.common.bytes_length == 0 {
533 continue;
534 }
535 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
536 }
537
538 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
539 if manifest.bytes_length != 0 {
540 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
541 }
542 }
543
544 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
545 if manifest.bytes_length != 0 {
546 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
547 }
548 }
549
550 if let Some(manifest) = self.toc.time_index.as_ref() {
551 if manifest.bytes_length != 0 {
552 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
553 }
554 }
555
556 #[cfg(feature = "temporal_track")]
557 if let Some(track) = self.toc.temporal_track.as_ref() {
558 if track.bytes_length != 0 {
559 max_end = max_end.max(track.bytes_offset + track.bytes_length);
560 }
561 }
562
563 max_end
564 }
565
566 fn payload_region_end(&self) -> u64 {
567 self.cached_payload_end
568 }
569
570 fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
571 loop {
572 match self.wal.append_entry(payload) {
573 Ok(seq) => return Ok(seq),
574 Err(MemvidError::CheckpointFailed { reason })
575 if reason == "embedded WAL region too small for entry"
576 || reason == "embedded WAL region full" =>
577 {
578 let required = WAL_ENTRY_HEADER_SIZE
581 .saturating_add(payload.len() as u64)
582 .max(self.header.wal_size + 1);
583 self.grow_wal_region(required)?;
584 }
585 Err(err) => return Err(err),
586 }
587 }
588 }
589
590 fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
591 let mut new_size = self.header.wal_size;
592 let mut target = required_entry_size;
593 if target == 0 {
594 target = self.header.wal_size;
595 }
596 while new_size <= target {
597 new_size = new_size
598 .checked_mul(2)
599 .ok_or_else(|| MemvidError::CheckpointFailed {
600 reason: "wal_size overflow".into(),
601 })?;
602 }
603 let delta = new_size - self.header.wal_size;
604 if delta == 0 {
605 return Ok(());
606 }
607
608 self.shift_data_for_wal_growth(delta)?;
609 self.header.wal_size = new_size;
610 self.apply_wal_growth_offsets(delta);
611
612 let catalog_end = self.catalog_data_end();
613 self.header.footer_offset = catalog_end
614 .max(self.header.footer_offset)
615 .max(self.data_end);
616
617 self.rewrite_toc_footer()?;
618 self.header.toc_checksum = self.toc.toc_checksum;
619 crate::persist_header(&mut self.file, &self.header)?;
620 self.file.sync_all()?;
621 self.wal = EmbeddedWal::open(&self.file, &self.header)?;
622 Ok(())
623 }
624
625 fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
626 if delta == 0 {
627 return Ok(());
628 }
629 let original_len = self.file.metadata()?.len();
630 let data_start = self.header.wal_offset + self.header.wal_size;
631 self.file.set_len(original_len + delta)?;
632
633 let mut remaining = original_len.saturating_sub(data_start);
634 let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
635 while remaining > 0 {
636 let chunk = min(remaining, buffer.len() as u64);
637 let src = data_start + remaining - chunk;
638 self.file.seek(SeekFrom::Start(src))?;
639 #[allow(clippy::cast_possible_truncation)]
640 self.file.read_exact(&mut buffer[..chunk as usize])?;
641 let dst = src + delta;
642 self.file.seek(SeekFrom::Start(dst))?;
643 #[allow(clippy::cast_possible_truncation)]
644 self.file.write_all(&buffer[..chunk as usize])?;
645 remaining -= chunk;
646 }
647
648 self.file.seek(SeekFrom::Start(data_start))?;
649 let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
650 let mut remaining = delta;
651 while remaining > 0 {
652 let write = min(remaining, zero_buf.len() as u64);
653 #[allow(clippy::cast_possible_truncation)]
654 self.file.write_all(&zero_buf[..write as usize])?;
655 remaining -= write;
656 }
657 Ok(())
658 }
659
660 fn apply_wal_growth_offsets(&mut self, delta: u64) {
661 if delta == 0 {
662 return;
663 }
664
665 self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
666 self.data_end = self.data_end.saturating_add(delta);
667 self.cached_payload_end = self.cached_payload_end.saturating_add(delta);
675 self.adjust_offsets_after_wal_growth(delta);
676 }
677
678 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
679 if delta == 0 {
680 return;
681 }
682
683 for frame in &mut self.toc.frames {
684 if frame.payload_offset != 0 {
685 frame.payload_offset += delta;
686 }
687 }
688
689 for segment in &mut self.toc.segments {
690 if segment.bytes_offset != 0 {
691 segment.bytes_offset += delta;
692 }
693 }
694
695 if let Some(lex) = self.toc.indexes.lex.as_mut() {
696 if lex.bytes_offset != 0 {
697 lex.bytes_offset += delta;
698 }
699 }
700 for manifest in &mut self.toc.indexes.lex_segments {
701 if manifest.bytes_offset != 0 {
702 manifest.bytes_offset += delta;
703 }
704 }
705 if let Some(vec) = self.toc.indexes.vec.as_mut() {
706 if vec.bytes_offset != 0 {
707 vec.bytes_offset += delta;
708 }
709 }
710 if let Some(time_index) = self.toc.time_index.as_mut() {
711 if time_index.bytes_offset != 0 {
712 time_index.bytes_offset += delta;
713 }
714 }
715 #[cfg(feature = "temporal_track")]
716 if let Some(track) = self.toc.temporal_track.as_mut() {
717 if track.bytes_offset != 0 {
718 track.bytes_offset += delta;
719 }
720 }
721
722 let catalog = &mut self.toc.segment_catalog;
723 for descriptor in &mut catalog.lex_segments {
724 if descriptor.common.bytes_offset != 0 {
725 descriptor.common.bytes_offset += delta;
726 }
727 }
728 for descriptor in &mut catalog.vec_segments {
729 if descriptor.common.bytes_offset != 0 {
730 descriptor.common.bytes_offset += delta;
731 }
732 }
733 for descriptor in &mut catalog.time_segments {
734 if descriptor.common.bytes_offset != 0 {
735 descriptor.common.bytes_offset += delta;
736 }
737 }
738 #[cfg(feature = "temporal_track")]
739 for descriptor in &mut catalog.temporal_segments {
740 if descriptor.common.bytes_offset != 0 {
741 descriptor.common.bytes_offset += delta;
742 }
743 }
744 for descriptor in &mut catalog.tantivy_segments {
745 if descriptor.common.bytes_offset != 0 {
746 descriptor.common.bytes_offset += delta;
747 }
748 }
749
750 #[cfg(feature = "lex")]
751 if let Ok(mut storage) = self.lex_storage.write() {
752 storage.adjust_offsets(delta);
753 }
754 }
755 pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
756 self.ensure_writable()?;
757 if options.background {
758 tracing::debug!("commit background flag ignored; running synchronously");
759 }
760 let mode = options.mode;
761 let records = self.wal.pending_records()?;
762 if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
763 return Ok(());
764 }
765 self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
766 }
767
768 pub fn commit(&mut self) -> Result<()> {
769 self.ensure_writable()?;
770 self.commit_with_options(CommitOptions::new(CommitMode::Full))
771 }
772
773 pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
784 if opts.wal_pre_size_bytes > 0 {
785 self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
786 }
787 self.wal.set_skip_sync(opts.skip_sync);
788 self.batch_opts = Some(opts);
789 Ok(())
790 }
791
792 fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
801 if min_bytes <= self.header.wal_size {
802 return Ok(());
803 }
804 let target = min_bytes.next_power_of_two();
806 let delta = target.saturating_sub(self.header.wal_size);
807 if delta == 0 {
808 return Ok(());
809 }
810
811 tracing::info!(
812 current_wal = self.header.wal_size,
813 target_wal = target,
814 delta,
815 "pre-sizing WAL for batch mode"
816 );
817
818 self.shift_data_for_wal_growth(delta)?;
819 self.header.wal_size = target;
820 self.apply_wal_growth_offsets(delta);
821
822 let catalog_end = self.catalog_data_end();
823 self.header.footer_offset = catalog_end
824 .max(self.header.footer_offset)
825 .max(self.data_end);
826
827 self.rewrite_toc_footer()?;
828 self.header.toc_checksum = self.toc.toc_checksum;
829 crate::persist_header(&mut self.file, &self.header)?;
830 self.file.sync_all()?;
831 self.wal = EmbeddedWal::open(&self.file, &self.header)?;
832 Ok(())
833 }
834
835 pub fn end_batch(&mut self) -> Result<()> {
840 self.wal.flush()?;
842 self.wal.set_skip_sync(false);
843 self.batch_opts = None;
844 Ok(())
845 }
846
847 pub fn commit_skip_indexes(&mut self) -> Result<()> {
856 self.ensure_writable()?;
857 let records = self.wal.pending_records()?;
858 if records.is_empty() && !self.dirty {
859 return Ok(());
860 }
861 self.commit_skip_indexes_inner(records)
862 }
863
    /// Applies `records` and checkpoints the WAL without building any index.
    ///
    /// Steps:
    /// 1. Bump `generation`.
    /// 2. Apply the records with the Tantivy engine temporarily detached
    ///    (`lex` feature).
    /// 3. Place the footer at `data_end` and drop every index manifest and
    ///    segment-catalog entry from the TOC: time index, lex/vec/clip
    ///    indexes, temporal track, memories track, logic mesh, sketch track.
    /// 4. Rewrite the TOC/footer, record a WAL checkpoint, persist and sync the
    ///    header.
    ///
    /// NOTE(review): this path writes to the live file (its caller does not use
    /// `with_staging_lock`), and `generation` stays incremented if
    /// `apply_records` fails — confirm both are intended.
    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        // Detach the Tantivy engine while applying records (presumably so
        // `apply_records` skips Tantivy indexing).
        #[cfg(feature = "lex")]
        let tantivy_backup = self.tantivy.take();

        let result = self.apply_records(records);

        // Reattach the engine before propagating any error from `apply_records`.
        #[cfg(feature = "lex")]
        {
            self.tantivy = tantivy_backup;
            self.tantivy_dirty = false;
        }

        let _delta = result?;

        // No index payloads are written on this path, so the footer follows the data directly.
        self.header.footer_offset = self.data_end;

        // Drop all index metadata from the TOC.
        self.toc.time_index = None;
        self.toc.indexes.lex_segments.clear();
        if let Some(vec) = self.toc.indexes.vec.as_mut() {
            // The vec manifest itself is kept but marked as holding no data.
            vec.bytes_offset = 0;
            vec.bytes_length = 0;
            vec.vector_count = 0;
        }
        self.toc.indexes.lex = None;
        self.toc.indexes.clip = None;
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        self.toc.memories_track = None;
        self.toc.logic_mesh = None;
        self.toc.sketch_track = None;

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // `record_checkpoint` takes `&mut self.header`; the TOC checksum is re-applied afterwards.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
921
922 pub fn finalize_indexes(&mut self) -> Result<()> {
928 self.ensure_writable()?;
929 self.rebuild_indexes(&[], &[])?;
930 self.rewrite_toc_footer()?;
931 self.header.toc_checksum = self.toc.toc_checksum;
932 crate::persist_header(&mut self.file, &self.header)?;
933 self.file.sync_all()?;
934 Ok(())
935 }
936
    /// Applies `records` and publishes the result.
    ///
    /// Steps:
    /// 1. Bump `generation` and apply the records.
    /// 2. Rebuild indexes for the delta, or persist pending Tantivy, CLIP,
    ///    memories-track and logic-mesh state when no rebuild happened.
    /// 3. Persist the sketch track.
    /// 4. Rewrite the TOC/footer, record a WAL checkpoint, persist and sync the
    ///    header.
    /// 5. With `parallel_segments`, flush and truncate the manifest WAL.
    ///
    /// `_mode` is currently unused, so full and incremental commits take the
    /// same path. The caller visible in this file (`commit_with_options`) runs
    /// this inside `with_staging_lock`.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // A non-empty CLIP index forces a rebuild even when the WAL delta is empty.
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
            indexes_rebuilt = true;
        }

        // The `!indexes_rebuilt` guards below presumably exist because
        // `rebuild_indexes` already persists these structures — confirm.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        // NOTE(review): a non-empty CLIP index already forces `indexes_rebuilt`
        // above, so this branch appears unreachable.
        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // The sketch track is persisted regardless of whether indexes were rebuilt.
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // `record_checkpoint` takes `&mut self.header`; the TOC checksum is re-applied afterwards.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1004
1005 #[cfg(feature = "parallel_segments")]
1006 pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
1007 self.ensure_writable()?;
1008 if !self.dirty && !self.tantivy_index_pending() {
1009 return Ok(());
1010 }
1011 let opts = opts.clone();
1012 self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
1013 }
1014
    /// Parallel-segment counterpart of `commit_from_records`.
    ///
    /// Reads the pending WAL records, applies them, and tries to publish the
    /// delta via `publish_parallel_delta`.
    ///
    /// When the parallel path was used:
    /// - with `lex` enabled, Tantivy is updated incrementally, unless the
    ///   engine was already present;
    /// - the time index is rebuilt from all active document frames and
    ///   appended at `data_end`.
    ///
    /// Otherwise indexes are rebuilt with `rebuild_indexes`. Afterwards:
    /// 1. Persist the auxiliary tracks.
    /// 2. Rewrite the TOC/footer and record a WAL checkpoint.
    /// 3. Persist and sync the header.
    /// 4. Flush and truncate the manifest WAL, if present.
    ///
    /// NOTE(review): unlike `commit_from_records`, `generation` is bumped
    /// *after* `apply_records` here, and CLIP/memories/logic-mesh are
    /// persisted even when `rebuild_indexes` ran — confirm both are intended.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    // Captured before `init_tantivy` so an existing engine's
                    // documents (indexed during put, per the log below) are not re-added.
                    let tantivy_was_present = self.tantivy.is_some();
                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    if tantivy_was_present {
                        tracing::info!(
                            "parallel_commit: skipping Tantivy indexing (already indexed during put)"
                        );
                    } else {
                        let max_payload = crate::memvid::search::max_index_payload();
                        let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                        // Collect (frame, text) pairs for each newly inserted frame.
                        for frame_id in &delta.inserted_frames {
                            let frame = match self.toc.frames.get(*frame_id as usize) {
                                Some(f) => f.clone(),
                                None => continue,
                            };

                            // Non-blank explicit search text takes precedence over payload text.
                            let explicit_text = frame.search_text.clone();
                            if let Some(ref search_text) = explicit_text {
                                if !search_text.trim().is_empty() {
                                    prepared_docs.push((frame, search_text.clone()));
                                    continue;
                                }
                            }

                            let mime = frame
                                .metadata
                                .as_ref()
                                .and_then(|m| m.mime.as_deref())
                                .unwrap_or("application/octet-stream");

                            // Skip non-text MIME types and payloads above the index size limit.
                            if !crate::memvid::search::is_text_indexable_mime(mime) {
                                continue;
                            }

                            if frame.payload_length > max_payload {
                                continue;
                            }

                            let text = self.frame_search_text(&frame)?;
                            if !text.trim().is_empty() {
                                prepared_docs.push((frame, text));
                            }
                        }

                        if let Some(ref mut engine) = self.tantivy {
                            for (frame, text) in &prepared_docs {
                                engine.add_frame(frame, text)?;
                            }

                            // Mark dirty so the flush after this block persists the new docs.
                            if !prepared_docs.is_empty() {
                                engine.commit()?;
                                self.tantivy_dirty = true;
                            }

                            tracing::info!(
                                "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                                prepared_docs.len(),
                                engine.num_docs()
                            );
                        } else {
                            tracing::warn!(
                                "parallel_commit: Tantivy engine is None after init_tantivy"
                            );
                        }
                    }
                }

                // Rebuild the time index from every active document frame and
                // append it at the current end of data.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
                indexes_rebuilt = true;
            }
        }

        #[cfg(feature = "lex")]
        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        // `record_checkpoint` takes `&mut self.header`; the TOC checksum is re-applied afterwards.
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1215
1216 pub(crate) fn recover_wal(&mut self) -> Result<()> {
1217 let records = self.wal.records_after(self.header.wal_sequence)?;
1218 if records.is_empty() {
1219 if self.tantivy_index_pending() {
1220 self.flush_tantivy()?;
1221 }
1222 return Ok(());
1223 }
1224 let delta = self.apply_records(records)?;
1225 if !delta.is_empty() {
1226 tracing::debug!(
1227 inserted_frames = delta.inserted_frames.len(),
1228 inserted_embeddings = delta.inserted_embeddings.len(),
1229 inserted_time_entries = delta.inserted_time_entries.len(),
1230 "recover applied delta"
1231 );
1232 self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1233 } else if self.tantivy_index_pending() {
1234 self.flush_tantivy()?;
1235 }
1236 self.wal.record_checkpoint(&mut self.header)?;
1237 crate::persist_header(&mut self.file, &self.header)?;
1238 if !delta.is_empty() {
1239 } else if self.tantivy_index_pending() {
1241 self.flush_tantivy()?;
1242 crate::persist_header(&mut self.file, &self.header)?;
1243 }
1244 self.file.sync_all()?;
1245 self.pending_frame_inserts = 0;
1246 self.dirty = false;
1247 Ok(())
1248 }
1249
1250 fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1251 let mut delta = IngestionDelta::default();
1252 if records.is_empty() {
1253 return Ok(delta);
1254 }
1255
1256 let mut data_cursor = self.data_end;
1261 let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1262
1263 if !records.is_empty() {
1264 self.file.seek(SeekFrom::Start(data_cursor))?;
1265 for record in records {
1266 let mut entry = match decode_wal_entry(&record.payload)? {
1267 WalEntry::Frame(entry) => entry,
1268 #[cfg(feature = "lex")]
1269 WalEntry::Lex(batch) => {
1270 self.apply_lex_wal(batch)?;
1271 continue;
1272 }
1273 };
1274
1275 match entry.op {
1276 FrameWalOp::Insert => {
1277 let frame_id = self.toc.frames.len() as u64;
1278
1279 let (
1280 payload_offset,
1281 payload_length,
1282 checksum_bytes,
1283 canonical_length_value,
1284 ) = if let Some(source_id) = entry.reuse_payload_from {
1285 if !entry.payload.is_empty() {
1286 return Err(MemvidError::InvalidFrame {
1287 frame_id: source_id,
1288 reason: "reused payload entry contained inline bytes",
1289 });
1290 }
1291 let source_idx = usize::try_from(source_id).map_err(|_| {
1292 MemvidError::InvalidFrame {
1293 frame_id: source_id,
1294 reason: "frame id too large for memory",
1295 }
1296 })?;
1297 let source = self.toc.frames.get(source_idx).cloned().ok_or(
1298 MemvidError::InvalidFrame {
1299 frame_id: source_id,
1300 reason: "reused payload source missing",
1301 },
1302 )?;
1303 (
1304 source.payload_offset,
1305 source.payload_length,
1306 source.checksum,
1307 entry
1308 .canonical_length
1309 .or(source.canonical_length)
1310 .unwrap_or(source.payload_length),
1311 )
1312 } else {
1313 self.file.seek(SeekFrom::Start(data_cursor))?;
1314 self.file.write_all(&entry.payload)?;
1315 let checksum = hash(&entry.payload);
1316 let payload_length = entry.payload.len() as u64;
1317 let canonical_length =
1318 if entry.canonical_encoding == CanonicalEncoding::Zstd {
1319 if let Some(len) = entry.canonical_length {
1320 len
1321 } else {
1322 let decoded = crate::decode_canonical_bytes(
1323 &entry.payload,
1324 CanonicalEncoding::Zstd,
1325 frame_id,
1326 )?;
1327 decoded.len() as u64
1328 }
1329 } else {
1330 entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1331 };
1332 let payload_offset = data_cursor;
1333 data_cursor += payload_length;
1334 self.cached_payload_end = self.cached_payload_end.max(data_cursor);
1336 (
1337 payload_offset,
1338 payload_length,
1339 *checksum.as_bytes(),
1340 canonical_length,
1341 )
1342 };
1343
1344 let uri = entry
1345 .uri
1346 .clone()
1347 .unwrap_or_else(|| crate::default_uri(frame_id));
1348 let title = entry
1349 .title
1350 .clone()
1351 .or_else(|| crate::infer_title_from_uri(&uri));
1352
1353 #[cfg(feature = "temporal_track")]
1354 let (anchor_ts, anchor_source) =
1355 self.determine_temporal_anchor(entry.timestamp);
1356
1357 let mut frame = Frame {
1358 id: frame_id,
1359 timestamp: entry.timestamp,
1360 anchor_ts: {
1361 #[cfg(feature = "temporal_track")]
1362 {
1363 Some(anchor_ts)
1364 }
1365 #[cfg(not(feature = "temporal_track"))]
1366 {
1367 None
1368 }
1369 },
1370 anchor_source: {
1371 #[cfg(feature = "temporal_track")]
1372 {
1373 Some(anchor_source)
1374 }
1375 #[cfg(not(feature = "temporal_track"))]
1376 {
1377 None
1378 }
1379 },
1380 kind: entry.kind.clone(),
1381 track: entry.track.clone(),
1382 payload_offset,
1383 payload_length,
1384 checksum: checksum_bytes,
1385 uri: Some(uri),
1386 title,
1387 canonical_encoding: entry.canonical_encoding,
1388 canonical_length: Some(canonical_length_value),
1389 metadata: entry.metadata.clone(),
1390 search_text: entry.search_text.clone(),
1391 tags: entry.tags.clone(),
1392 labels: entry.labels.clone(),
1393 extra_metadata: entry.extra_metadata.clone(),
1394 content_dates: entry.content_dates.clone(),
1395 chunk_manifest: entry.chunk_manifest.clone(),
1396 role: entry.role,
1397 parent_id: None,
1398 chunk_index: entry.chunk_index,
1399 chunk_count: entry.chunk_count,
1400 status: FrameStatus::Active,
1401 supersedes: entry.supersedes_frame_id,
1402 superseded_by: None,
1403 source_sha256: entry.source_sha256,
1404 source_path: entry.source_path.clone(),
1405 enrichment_state: entry.enrichment_state,
1406 };
1407
1408 if let Some(parent_seq) = entry.parent_sequence {
1409 if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1410 frame.parent_id = Some(*parent_frame_id);
1411 } else {
1412 if entry.role == FrameRole::DocumentChunk {
1418 for &candidate_id in delta.inserted_frames.iter().rev() {
1420 if let Ok(idx) = usize::try_from(candidate_id) {
1421 if let Some(candidate) = self.toc.frames.get(idx) {
1422 if candidate.role == FrameRole::Document
1423 && candidate.chunk_manifest.is_some()
1424 {
1425 frame.parent_id = Some(candidate_id);
1427 tracing::debug!(
1428 chunk_frame_id = frame_id,
1429 parent_frame_id = candidate_id,
1430 parent_seq = parent_seq,
1431 "resolved chunk parent via fallback"
1432 );
1433 break;
1434 }
1435 }
1436 }
1437 }
1438 }
1439 if frame.parent_id.is_none() {
1440 tracing::warn!(
1441 chunk_frame_id = frame_id,
1442 parent_seq = parent_seq,
1443 "chunk has parent_sequence but parent not found in batch"
1444 );
1445 }
1446 }
1447 }
1448
1449 #[cfg(feature = "lex")]
1450 let index_text = if self.tantivy.is_some() {
1451 if let Some(text) = entry.search_text.clone() {
1452 if text.trim().is_empty() {
1453 None
1454 } else {
1455 Some(text)
1456 }
1457 } else {
1458 Some(self.frame_content(&frame)?)
1459 }
1460 } else {
1461 None
1462 };
1463 #[cfg(feature = "lex")]
1464 if let (Some(engine), Some(text)) =
1465 (self.tantivy.as_mut(), index_text.as_ref())
1466 {
1467 engine.add_frame(&frame, text)?;
1468 self.tantivy_dirty = true;
1469
1470 if !text.trim().is_empty() {
1473 let entry = crate::types::generate_sketch(
1474 frame_id,
1475 text,
1476 crate::types::SketchVariant::Small,
1477 None,
1478 );
1479 self.sketch_track.insert(entry);
1480 }
1481 }
1482
1483 if let Some(embedding) = entry.embedding.take() {
1484 delta
1485 .inserted_embeddings
1486 .push((frame_id, embedding.clone()));
1487 }
1488
1489 if entry.role == FrameRole::Document {
1490 delta
1491 .inserted_time_entries
1492 .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1493 #[cfg(feature = "temporal_track")]
1494 {
1495 delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1496 frame_id,
1497 anchor_ts,
1498 anchor_source,
1499 ));
1500 delta.inserted_temporal_mentions.extend(
1501 Self::collect_temporal_mentions(
1502 entry.search_text.as_deref(),
1503 frame_id,
1504 anchor_ts,
1505 ),
1506 );
1507 }
1508 }
1509
1510 if let Some(predecessor) = frame.supersedes {
1511 self.mark_frame_superseded(predecessor, frame_id)?;
1512 }
1513
1514 self.toc.frames.push(frame);
1515 delta.inserted_frames.push(frame_id);
1516 sequence_to_frame.insert(record.sequence, frame_id);
1517 }
1518 FrameWalOp::Tombstone => {
1519 let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1520 frame_id: 0,
1521 reason: "tombstone missing frame reference",
1522 })?;
1523 self.mark_frame_deleted(target)?;
1524 delta.mutated_frames = true;
1525 }
1526 }
1527 }
1528 self.data_end = self.data_end.max(data_cursor);
1529 }
1530
1531 let orphan_resolutions: Vec<(u64, u64)> = delta
1535 .inserted_frames
1536 .iter()
1537 .filter_map(|&frame_id| {
1538 let idx = usize::try_from(frame_id).ok()?;
1539 let frame = self.toc.frames.get(idx)?;
1540 if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1541 return None;
1542 }
1543 for candidate_id in (0..frame_id).rev() {
1545 if let Ok(idx) = usize::try_from(candidate_id) {
1546 if let Some(candidate) = self.toc.frames.get(idx) {
1547 if candidate.role == FrameRole::Document
1548 && candidate.chunk_manifest.is_some()
1549 && candidate.status == FrameStatus::Active
1550 {
1551 return Some((frame_id, candidate_id));
1552 }
1553 }
1554 }
1555 }
1556 None
1557 })
1558 .collect();
1559
1560 for (chunk_id, parent_id) in orphan_resolutions {
1562 if let Ok(idx) = usize::try_from(chunk_id) {
1563 if let Some(frame) = self.toc.frames.get_mut(idx) {
1564 frame.parent_id = Some(parent_id);
1565 tracing::debug!(
1566 chunk_frame_id = chunk_id,
1567 parent_frame_id = parent_id,
1568 "resolved orphan chunk parent in second pass"
1569 );
1570 }
1571 }
1572 }
1573
1574 Ok(delta)
1577 }
1578
/// Chooses the temporal anchor for a frame.
///
/// The anchor is always the frame's own `timestamp`, tagged as
/// `AnchorSource::FrameTimestamp`; `self` is currently unused.
#[cfg(feature = "temporal_track")]
fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
    let source = AnchorSource::FrameTimestamp;
    (timestamp, source)
}
1583
/// Extracts resolved temporal mentions from `text`, relative to `anchor_ts`.
///
/// Candidate spans come from two sources:
/// - occurrences of `STATIC_TEMPORAL_PHRASES` in an ASCII-lowercased copy of the text;
/// - numeric dates matching `d/m/y`-style digits.
///
/// Spans are sorted and de-duplicated, then stripped of surrounding quote and punctuation
/// characters, then resolved with a `TemporalNormalizer` built from the anchor. Spans that
/// fail to resolve are skipped.
///
/// Returns an empty vector in these cases:
/// - `text` is missing or blank;
/// - `anchor_ts` is not a valid unix timestamp;
/// - the numeric-date regex failed to compile (logged at error level).
#[cfg(feature = "temporal_track")]
fn collect_temporal_mentions(
    text: Option<&str>,
    frame_id: FrameId,
    anchor_ts: i64,
) -> Vec<TemporalMention> {
    const TRIM_CHARS: &[char] = &['"', '\'', '.', ',', ';'];

    let Some(text) = text.filter(|value| !value.trim().is_empty()) else {
        return Vec::new();
    };
    let Ok(anchor) = OffsetDateTime::from_unix_timestamp(anchor_ts) else {
        return Vec::new();
    };
    let normalizer =
        TemporalNormalizer::new(TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string()));

    // ASCII lowercasing never changes byte lengths, so offsets found in `lowered`
    // are valid byte offsets into `text` as well.
    let lowered = text.to_ascii_lowercase();
    let mut spans: Vec<(usize, usize)> = Vec::new();
    for phrase in STATIC_TEMPORAL_PHRASES {
        let mut cursor = 0usize;
        while let Some(found) = lowered[cursor..].find(phrase) {
            let begin = cursor + found;
            cursor = begin + phrase.len();
            spans.push((begin, cursor));
        }
    }

    static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
    let numeric_date = match NUMERIC_DATE.get_or_init(|| {
        Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
    }) {
        Ok(re) => re,
        Err(msg) => {
            tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
            return Vec::new();
        }
    };
    spans.extend(numeric_date.find_iter(text).map(|m| (m.start(), m.end())));

    spans.sort_unstable();
    spans.dedup();

    let mut mentions: Vec<TemporalMention> = Vec::new();
    for (start, end) in spans {
        if start >= end || end > text.len() {
            continue;
        }
        let raw = &text[start..end];
        let core = raw.trim_matches(TRIM_CHARS);
        if core.is_empty() {
            continue;
        }
        let offset = raw.find(core).map_or(start, |idx| start + idx);
        if let Ok(resolution) = normalizer.resolve(core) {
            mentions.extend(Self::resolution_to_mentions(
                resolution,
                frame_id,
                offset,
                offset + core.len(),
            ));
        }
    }
    mentions
}
1657
/// Converts one `TemporalResolution` into `TemporalMention` records.
///
/// Output per resolution kind:
/// - Single dates and date-times yield one mention.
/// - Ranges (date, date-time, and whole months) yield a `RangeStart`/`RangeEnd` pair
///   flagged with `HAS_RANGE`.
/// - Dates are converted at UTC midnight.
/// - Date-times carry their UTC offset (in minutes) as the timezone hint.
///
/// `byte_start`/`byte_end` are byte offsets of the mention in the source text. Both the
/// start and the length saturate at `u32::MAX` instead of being truncated by an `as` cast.
///
/// An invalid month resolution is logged and yields no mentions.
#[cfg(feature = "temporal_track")]
fn resolution_to_mentions(
    resolution: TemporalResolution,
    frame_id: FrameId,
    byte_start: usize,
    byte_end: usize,
) -> Vec<TemporalMention> {
    let byte_len = u32::try_from(byte_end.saturating_sub(byte_start)).unwrap_or(u32::MAX);
    let byte_start = u32::try_from(byte_start).unwrap_or(u32::MAX);
    let confidence = resolution.confidence;
    let base_flags = Self::flags_from_resolution(&resolution.flags);

    // Every mention shares frame, span and confidence; only these four values vary.
    let mention = |timestamp: i64,
                   kind: TemporalMentionKind,
                   tz_hint: i16,
                   flags: TemporalMentionFlags| {
        TemporalMention::new(
            timestamp, frame_id, byte_start, byte_len, kind, confidence, tz_hint, flags,
        )
    };

    match resolution.value {
        TemporalResolutionValue::Date(date) => vec![mention(
            Self::date_to_timestamp(date),
            TemporalMentionKind::Date,
            0,
            base_flags,
        )],
        TemporalResolutionValue::DateTime(dt) => vec![mention(
            dt.unix_timestamp(),
            TemporalMentionKind::DateTime,
            dt.offset().whole_minutes() as i16,
            base_flags,
        )],
        TemporalResolutionValue::DateRange { start, end } => {
            let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
            vec![
                mention(
                    Self::date_to_timestamp(start),
                    TemporalMentionKind::RangeStart,
                    0,
                    flags,
                ),
                mention(
                    Self::date_to_timestamp(end),
                    TemporalMentionKind::RangeEnd,
                    0,
                    flags,
                ),
            ]
        }
        TemporalResolutionValue::DateTimeRange { start, end } => {
            let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
            vec![
                mention(
                    start.unix_timestamp(),
                    TemporalMentionKind::RangeStart,
                    start.offset().whole_minutes() as i16,
                    flags,
                ),
                mention(
                    end.unix_timestamp(),
                    TemporalMentionKind::RangeEnd,
                    end.offset().whole_minutes() as i16,
                    flags,
                ),
            ]
        }
        TemporalResolutionValue::Month { year, month } => {
            let start_date = match Date::from_calendar_date(year, month, 1) {
                Ok(date) => date,
                Err(err) => {
                    tracing::warn!(
                        target = "memvid::temporal",
                        %err,
                        year,
                        month = month as u8,
                        "skipping invalid month resolution"
                    );
                    return Vec::new();
                }
            };
            let Some(end_date) = Self::last_day_in_month(year, month) else {
                tracing::warn!(
                    target = "memvid::temporal",
                    year,
                    month = month as u8,
                    "skipping month resolution with invalid calendar range"
                );
                return Vec::new();
            };
            let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
            vec![
                mention(
                    Self::date_to_timestamp(start_date),
                    TemporalMentionKind::RangeStart,
                    0,
                    flags,
                ),
                mention(
                    Self::date_to_timestamp(end_date),
                    TemporalMentionKind::RangeEnd,
                    0,
                    flags,
                ),
            ]
        }
    }
}
1799
/// Maps normalizer resolution flags onto mention flags.
///
/// - `Ambiguous` sets `AMBIGUOUS`.
/// - `Relative` sets `DERIVED`.
/// - All other flags are ignored.
#[cfg(feature = "temporal_track")]
fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
    let is_ambiguous = flags
        .iter()
        .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous));
    let is_relative = flags
        .iter()
        .any(|flag| matches!(flag, TemporalResolutionFlag::Relative));

    let mut mention_flags = TemporalMentionFlags::empty();
    if is_ambiguous {
        mention_flags = mention_flags.set(TemporalMentionFlags::AMBIGUOUS, true);
    }
    if is_relative {
        mention_flags = mention_flags.set(TemporalMentionFlags::DERIVED, true);
    }
    mention_flags
}
1817
/// Returns the unix timestamp (seconds) of `date` at 00:00:00 UTC.
#[cfg(feature = "temporal_track")]
fn date_to_timestamp(date: Date) -> i64 {
    let start_of_day = PrimitiveDateTime::new(date, Time::MIDNIGHT);
    let in_utc = start_of_day.assume_offset(UtcOffset::UTC);
    in_utc.unix_timestamp()
}
1824
/// Returns the last calendar day of `month` in `year`.
///
/// Returns `None` only when the first day of that month is not a valid `Date`.
/// The walk also stops at the end of the supported date range.
#[cfg(feature = "temporal_track")]
fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
    let first = Date::from_calendar_date(year, month, 1).ok()?;
    // Walk forward day by day while still inside `month`; the last such day wins.
    std::iter::successors(Some(first), |day| day.next_day())
        .take_while(|day| day.month() == month)
        .last()
}
1837
1838 #[allow(dead_code)]
1839 fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1840 if delta.inserted_frames.is_empty() || !self.lex_enabled {
1841 return Ok(false);
1842 }
1843
1844 let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1845 Some(artifact) => artifact,
1846 None => return Ok(false),
1847 };
1848
1849 let segment_id = self.toc.segment_catalog.next_segment_id;
1850 #[cfg(feature = "parallel_segments")]
1851 let span =
1852 self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1853
1854 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1855 let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1856 #[cfg(feature = "parallel_segments")]
1857 if let Some(span) = span {
1858 Self::decorate_segment_common(&mut descriptor.common, span);
1859 }
1860 #[cfg(feature = "parallel_segments")]
1861 let descriptor_for_manifest = descriptor.clone();
1862 self.toc.segment_catalog.lex_segments.push(descriptor);
1863 #[cfg(feature = "parallel_segments")]
1864 if let Err(err) = self.record_index_segment(
1865 SegmentKind::Lexical,
1866 descriptor_for_manifest.common,
1867 SegmentStats {
1868 doc_count: artifact.doc_count,
1869 vector_count: 0,
1870 time_entries: 0,
1871 bytes_uncompressed: artifact.bytes.len() as u64,
1872 build_micros: 0,
1873 },
1874 ) {
1875 tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1876 }
1877 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1878 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1879 Ok(true)
1880 }
1881
1882 #[allow(dead_code)]
1883 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1884 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1885 return Ok(false);
1886 }
1887
1888 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1889 Some(artifact) => artifact,
1890 None => return Ok(false),
1891 };
1892
1893 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1894 if existing_dim != artifact.dimension {
1895 return Err(MemvidError::VecDimensionMismatch {
1896 expected: existing_dim,
1897 actual: artifact.dimension as usize,
1898 });
1899 }
1900 }
1901
1902 let segment_id = self.toc.segment_catalog.next_segment_id;
1903 #[cfg(feature = "parallel_segments")]
1904 #[cfg(feature = "parallel_segments")]
1905 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1906
1907 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1908 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1909 #[cfg(feature = "parallel_segments")]
1910 if let Some(span) = span {
1911 Self::decorate_segment_common(&mut descriptor.common, span);
1912 }
1913 #[cfg(feature = "parallel_segments")]
1914 let descriptor_for_manifest = descriptor.clone();
1915 self.toc.segment_catalog.vec_segments.push(descriptor);
1916 #[cfg(feature = "parallel_segments")]
1917 if let Err(err) = self.record_index_segment(
1918 SegmentKind::Vector,
1919 descriptor_for_manifest.common,
1920 SegmentStats {
1921 doc_count: 0,
1922 vector_count: artifact.vector_count,
1923 time_entries: 0,
1924 bytes_uncompressed: artifact.bytes_uncompressed,
1925 build_micros: 0,
1926 },
1927 ) {
1928 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1929 }
1930 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1931 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1932
1933 if self.toc.indexes.vec.is_none() {
1935 let empty_offset = self.data_end;
1936 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1937 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1938 self.toc.indexes.vec = Some(VecIndexManifest {
1939 vector_count: 0,
1940 dimension: 0,
1941 bytes_offset: empty_offset,
1942 bytes_length: 0,
1943 checksum: empty_checksum,
1944 compression_mode: self.vec_compression.clone(),
1945 model: self.vec_model.clone(),
1946 });
1947 }
1948 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1949 if manifest.dimension == 0 {
1950 manifest.dimension = artifact.dimension;
1951 }
1952 if manifest.bytes_length == 0 {
1953 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1954 manifest.compression_mode = artifact.compression.clone();
1955 }
1956 }
1957
1958 self.vec_enabled = true;
1959 Ok(true)
1960 }
1961
1962 #[allow(dead_code)]
1963 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1964 if delta.inserted_time_entries.is_empty() {
1965 return Ok(false);
1966 }
1967
1968 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1969 Some(artifact) => artifact,
1970 None => return Ok(false),
1971 };
1972
1973 let segment_id = self.toc.segment_catalog.next_segment_id;
1974 #[cfg(feature = "parallel_segments")]
1975 #[cfg(feature = "parallel_segments")]
1976 let span = self.segment_span_from_iter(
1977 delta
1978 .inserted_time_entries
1979 .iter()
1980 .map(|entry| entry.frame_id),
1981 );
1982
1983 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1984 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1985 #[cfg(feature = "parallel_segments")]
1986 if let Some(span) = span {
1987 Self::decorate_segment_common(&mut descriptor.common, span);
1988 }
1989 #[cfg(feature = "parallel_segments")]
1990 let descriptor_for_manifest = descriptor.clone();
1991 self.toc.segment_catalog.time_segments.push(descriptor);
1992 #[cfg(feature = "parallel_segments")]
1993 if let Err(err) = self.record_index_segment(
1994 SegmentKind::Time,
1995 descriptor_for_manifest.common,
1996 SegmentStats {
1997 doc_count: 0,
1998 vector_count: 0,
1999 time_entries: artifact.entry_count,
2000 bytes_uncompressed: artifact.bytes.len() as u64,
2001 build_micros: 0,
2002 },
2003 ) {
2004 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
2005 }
2006 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2007 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2008 Ok(true)
2009 }
2010
/// Builds a temporal segment from the mentions and anchors in `delta`, appends it, and
/// points `toc.temporal_track` at it.
///
/// Returns `Ok(false)` when the delta carries neither mentions nor anchors, or when the
/// builder produced no artifact. On success the temporal track cache is cleared.
///
/// # Errors
/// Propagates build or append failures.
#[cfg(feature = "temporal_track")]
#[allow(dead_code)]
fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
    if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
    {
        return Ok(false);
    }

    debug_assert!(
        delta.inserted_temporal_mentions.len() < 1_000_000,
        "temporal delta mentions unexpectedly large: {}",
        delta.inserted_temporal_mentions.len()
    );
    debug_assert!(
        delta.inserted_temporal_anchors.len() < 1_000_000,
        "temporal delta anchors unexpectedly large: {}",
        delta.inserted_temporal_anchors.len()
    );

    let Some(artifact) = self.build_temporal_segment_from_records(
        &delta.inserted_temporal_mentions,
        &delta.inserted_temporal_anchors,
    )?
    else {
        return Ok(false);
    };

    let segment_id = self.toc.segment_catalog.next_segment_id;
    let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
    // Capture the segment location before moving the descriptor into the catalog,
    // so the descriptor does not need to be cloned.
    let bytes_offset = descriptor.common.bytes_offset;
    let bytes_length = descriptor.common.bytes_length;
    self.toc.segment_catalog.temporal_segments.push(descriptor);
    self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
    self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);

    self.toc.temporal_track = Some(TemporalTrackManifest {
        bytes_offset,
        bytes_length,
        entry_count: artifact.entry_count,
        anchor_count: artifact.anchor_count,
        checksum: artifact.checksum,
        flags: artifact.flags,
    });

    self.clear_temporal_track_cache();

    Ok(true)
}
2060
2061 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2062 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2063 frame_id,
2064 reason: "frame id too large",
2065 })?;
2066 let frame = self
2067 .toc
2068 .frames
2069 .get_mut(index)
2070 .ok_or(MemvidError::InvalidFrame {
2071 frame_id,
2072 reason: "supersede target missing",
2073 })?;
2074 frame.status = FrameStatus::Superseded;
2075 frame.superseded_by = Some(successor_id);
2076 self.remove_frame_from_indexes(frame_id)
2077 }
2078
2079 pub(crate) fn rebuild_indexes(
2080 &mut self,
2081 new_vec_docs: &[(FrameId, Vec<f32>)],
2082 inserted_frame_ids: &[FrameId],
2083 ) -> Result<()> {
2084 if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2085 return Ok(());
2086 }
2087
2088 let payload_end = self.payload_region_end();
2089 self.data_end = payload_end;
2090 let safe_truncate_len = self.header.footer_offset.max(payload_end);
2093 if self.file.metadata()?.len() > safe_truncate_len {
2094 self.file.set_len(safe_truncate_len)?;
2095 }
2096 self.file.seek(SeekFrom::Start(payload_end))?;
2097
2098 self.toc.segment_catalog.lex_segments.clear();
2100 self.toc.segment_catalog.vec_segments.clear();
2101 self.toc.segment_catalog.time_segments.clear();
2102 #[cfg(feature = "temporal_track")]
2103 self.toc.segment_catalog.temporal_segments.clear();
2104 #[cfg(feature = "parallel_segments")]
2105 self.toc.segment_catalog.index_segments.clear();
2106 self.toc.segment_catalog.tantivy_segments.clear();
2108 self.toc.indexes.lex_segments.clear();
2110
2111 let mut time_entries: Vec<TimeIndexEntry> = self
2112 .toc
2113 .frames
2114 .iter()
2115 .filter(|frame| {
2116 frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2117 })
2118 .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2119 .collect();
2120 let (ti_offset, ti_length, ti_checksum) =
2121 time_index_append(&mut self.file, &mut time_entries)?;
2122 self.toc.time_index = Some(TimeIndexManifest {
2123 bytes_offset: ti_offset,
2124 bytes_length: ti_length,
2125 entry_count: time_entries.len() as u64,
2126 checksum: ti_checksum,
2127 });
2128
2129 let mut footer_offset = ti_offset + ti_length;
2130
2131 #[cfg(feature = "temporal_track")]
2132 {
2133 self.toc.temporal_track = None;
2134 self.toc.segment_catalog.temporal_segments.clear();
2135 self.clear_temporal_track_cache();
2136 }
2137
2138 if self.lex_enabled {
2139 #[cfg(feature = "lex")]
2140 {
2141 if self.tantivy_dirty {
2142 if let Ok(mut storage) = self.lex_storage.write() {
2146 storage.clear();
2147 storage.set_generation(0);
2148 }
2149 self.init_tantivy()?;
2150 if let Some(mut engine) = self.tantivy.take() {
2151 self.rebuild_tantivy_engine(&mut engine)?;
2152 self.tantivy = Some(engine);
2153 } else {
2154 return Err(MemvidError::InvalidToc {
2155 reason: "tantivy engine missing during rebuild".into(),
2156 });
2157 }
2158 } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2159 let max_payload = crate::memvid::search::max_index_payload();
2165 let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2166 for &frame_id in inserted_frame_ids {
2167 let Ok(idx) = usize::try_from(frame_id) else {
2168 continue;
2169 };
2170 let frame = match self.toc.frames.get(idx) {
2171 Some(f) => f.clone(),
2172 None => continue,
2173 };
2174 if frame.status != FrameStatus::Active {
2175 continue;
2176 }
2177 if let Some(search_text) = frame.search_text.clone() {
2178 if !search_text.trim().is_empty() {
2179 prepared_docs.push((frame, search_text));
2180 continue;
2181 }
2182 }
2183 let mime = frame
2184 .metadata
2185 .as_ref()
2186 .and_then(|m| m.mime.as_deref())
2187 .unwrap_or("application/octet-stream");
2188 if !crate::memvid::search::is_text_indexable_mime(mime) {
2189 continue;
2190 }
2191 if frame.payload_length > max_payload {
2192 continue;
2193 }
2194 let text = self.frame_search_text(&frame)?;
2195 if !text.trim().is_empty() {
2196 prepared_docs.push((frame, text));
2197 }
2198 }
2199 if let Some(engine) = self.tantivy.as_mut() {
2200 for (frame, text) in &prepared_docs {
2201 engine.add_frame(frame, text)?;
2202 }
2203 engine.commit()?;
2204 }
2205 } else {
2206 if let Ok(mut storage) = self.lex_storage.write() {
2209 storage.clear();
2210 storage.set_generation(0);
2211 }
2212 self.init_tantivy()?;
2213 if let Some(mut engine) = self.tantivy.take() {
2214 self.rebuild_tantivy_engine(&mut engine)?;
2215 self.tantivy = Some(engine);
2216 } else {
2217 return Err(MemvidError::InvalidToc {
2218 reason: "tantivy engine missing during rebuild".into(),
2219 });
2220 }
2221 }
2222
2223 self.lex_enabled = true;
2225
2226 self.tantivy_dirty = true;
2228
2229 self.data_end = footer_offset;
2231
2232 self.flush_tantivy()?;
2234
2235 footer_offset = self.header.footer_offset;
2237
2238 self.data_end = payload_end;
2240 }
2241 #[cfg(not(feature = "lex"))]
2242 {
2243 self.toc.indexes.lex = None;
2244 self.toc.indexes.lex_segments.clear();
2245 }
2246 } else {
2247 self.toc.indexes.lex = None;
2249 self.toc.indexes.lex_segments.clear();
2250 #[cfg(feature = "lex")]
2251 if let Ok(mut storage) = self.lex_storage.write() {
2252 storage.clear();
2253 }
2254 }
2255
2256 if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2257 let vec_offset = footer_offset;
2258 self.file.seek(SeekFrom::Start(vec_offset))?;
2259 self.file.write_all(&artifact.bytes)?;
2260 footer_offset += artifact.bytes.len() as u64;
2261 self.toc.indexes.vec = Some(VecIndexManifest {
2262 vector_count: artifact.vector_count,
2263 dimension: artifact.dimension,
2264 bytes_offset: vec_offset,
2265 bytes_length: artifact.bytes.len() as u64,
2266 checksum: artifact.checksum,
2267 compression_mode: self.vec_compression.clone(),
2268 model: self.vec_model.clone(),
2269 });
2270 self.vec_index = Some(index);
2271 } else {
2272 if !self.vec_enabled {
2274 self.toc.indexes.vec = None;
2275 }
2276 self.vec_index = None;
2277 }
2278
2279 if self.clip_enabled {
2281 if let Some(ref clip_index) = self.clip_index {
2282 if !clip_index.is_empty() {
2283 let artifact = clip_index.encode()?;
2284 let clip_offset = footer_offset;
2285 self.file.seek(SeekFrom::Start(clip_offset))?;
2286 self.file.write_all(&artifact.bytes)?;
2287 footer_offset += artifact.bytes.len() as u64;
2288 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2289 bytes_offset: clip_offset,
2290 bytes_length: artifact.bytes.len() as u64,
2291 vector_count: artifact.vector_count,
2292 dimension: artifact.dimension,
2293 checksum: artifact.checksum,
2294 model_name: crate::clip::default_model_info().name.to_string(),
2295 });
2296 tracing::info!(
2297 "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2298 artifact.vector_count,
2299 clip_offset
2300 );
2301 }
2302 }
2303 } else {
2304 self.toc.indexes.clip = None;
2305 }
2306
2307 if self.memories_track.card_count() > 0 {
2309 let memories_offset = footer_offset;
2310 let memories_bytes = self.memories_track.serialize()?;
2311 let memories_checksum = blake3::hash(&memories_bytes).into();
2312 self.file.seek(SeekFrom::Start(memories_offset))?;
2313 self.file.write_all(&memories_bytes)?;
2314 footer_offset += memories_bytes.len() as u64;
2315
2316 let stats = self.memories_track.stats();
2317 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2318 bytes_offset: memories_offset,
2319 bytes_length: memories_bytes.len() as u64,
2320 card_count: stats.card_count as u64,
2321 entity_count: stats.entity_count as u64,
2322 checksum: memories_checksum,
2323 });
2324 } else {
2325 self.toc.memories_track = None;
2326 }
2327
2328 if self.logic_mesh.is_empty() {
2330 self.toc.logic_mesh = None;
2331 } else {
2332 let mesh_offset = footer_offset;
2333 let mesh_bytes = self.logic_mesh.serialize()?;
2334 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2335 self.file.seek(SeekFrom::Start(mesh_offset))?;
2336 self.file.write_all(&mesh_bytes)?;
2337 footer_offset += mesh_bytes.len() as u64;
2338
2339 let stats = self.logic_mesh.stats();
2340 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2341 bytes_offset: mesh_offset,
2342 bytes_length: mesh_bytes.len() as u64,
2343 node_count: stats.node_count as u64,
2344 edge_count: stats.edge_count as u64,
2345 checksum: mesh_checksum,
2346 });
2347 }
2348
2349 tracing::info!(
2351 "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2352 ti_offset,
2353 ti_length,
2354 footer_offset,
2355 self.header.footer_offset
2356 );
2357
2358 self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2361
2362 if self.file.metadata()?.len() < self.header.footer_offset {
2364 self.file.set_len(self.header.footer_offset)?;
2365 }
2366
2367 self.rewrite_toc_footer()?;
2368 self.header.toc_checksum = self.toc.toc_checksum;
2369 crate::persist_header(&mut self.file, &self.header)?;
2370
2371 #[cfg(feature = "lex")]
2372 if self.lex_enabled {
2373 if let Some(ref engine) = self.tantivy {
2374 let doc_count = engine.num_docs();
2375 let active_frame_count = self
2376 .toc
2377 .frames
2378 .iter()
2379 .filter(|f| f.status == FrameStatus::Active)
2380 .count();
2381
2382 let text_indexable_count = self
2385 .toc
2386 .frames
2387 .iter()
2388 .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2389 .count();
2390
2391 if doc_count == 0 && text_indexable_count > 0 {
2394 return Err(MemvidError::Doctor {
2395 reason: format!(
2396 "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2397 This indicates a critical failure in the rebuild process."
2398 ),
2399 });
2400 }
2401
2402 log::info!(
2404 "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2405 );
2406 }
2407 }
2408
2409 Ok(())
2410 }
2411
2412 fn persist_memories_track(&mut self) -> Result<()> {
2417 if self.memories_track.card_count() == 0 {
2418 self.toc.memories_track = None;
2419 return Ok(());
2420 }
2421
2422 let memories_offset = self.header.footer_offset;
2424 let memories_bytes = self.memories_track.serialize()?;
2425 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2426
2427 self.file.seek(SeekFrom::Start(memories_offset))?;
2428 self.file.write_all(&memories_bytes)?;
2429
2430 let stats = self.memories_track.stats();
2431 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2432 bytes_offset: memories_offset,
2433 bytes_length: memories_bytes.len() as u64,
2434 card_count: stats.card_count as u64,
2435 entity_count: stats.entity_count as u64,
2436 checksum: memories_checksum,
2437 });
2438
2439 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2441
2442 if self.file.metadata()?.len() < self.header.footer_offset {
2444 self.file.set_len(self.header.footer_offset)?;
2445 }
2446
2447 Ok(())
2448 }
2449
2450 fn persist_clip_index(&mut self) -> Result<()> {
2455 if !self.clip_enabled {
2456 self.toc.indexes.clip = None;
2457 return Ok(());
2458 }
2459
2460 let clip_index = match &self.clip_index {
2461 Some(idx) if !idx.is_empty() => idx,
2462 _ => {
2463 self.toc.indexes.clip = None;
2464 return Ok(());
2465 }
2466 };
2467
2468 let artifact = clip_index.encode()?;
2470
2471 let clip_offset = self.header.footer_offset;
2473 self.file.seek(SeekFrom::Start(clip_offset))?;
2474 self.file.write_all(&artifact.bytes)?;
2475
2476 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2477 bytes_offset: clip_offset,
2478 bytes_length: artifact.bytes.len() as u64,
2479 vector_count: artifact.vector_count,
2480 dimension: artifact.dimension,
2481 checksum: artifact.checksum,
2482 model_name: crate::clip::default_model_info().name.to_string(),
2483 });
2484
2485 tracing::info!(
2486 "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2487 artifact.vector_count,
2488 clip_offset
2489 );
2490
2491 self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2493
2494 if self.file.metadata()?.len() < self.header.footer_offset {
2496 self.file.set_len(self.header.footer_offset)?;
2497 }
2498
2499 Ok(())
2500 }
2501
2502 fn persist_logic_mesh(&mut self) -> Result<()> {
2507 if self.logic_mesh.is_empty() {
2508 self.toc.logic_mesh = None;
2509 return Ok(());
2510 }
2511
2512 let mesh_offset = self.header.footer_offset;
2514 let mesh_bytes = self.logic_mesh.serialize()?;
2515 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2516
2517 self.file.seek(SeekFrom::Start(mesh_offset))?;
2518 self.file.write_all(&mesh_bytes)?;
2519
2520 let stats = self.logic_mesh.stats();
2521 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2522 bytes_offset: mesh_offset,
2523 bytes_length: mesh_bytes.len() as u64,
2524 node_count: stats.node_count as u64,
2525 edge_count: stats.edge_count as u64,
2526 checksum: mesh_checksum,
2527 });
2528
2529 self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2531
2532 if self.file.metadata()?.len() < self.header.footer_offset {
2534 self.file.set_len(self.header.footer_offset)?;
2535 }
2536
2537 Ok(())
2538 }
2539
2540 fn persist_sketch_track(&mut self) -> Result<()> {
2545 if self.sketch_track.is_empty() {
2546 self.toc.sketch_track = None;
2547 return Ok(());
2548 }
2549
2550 self.file.seek(SeekFrom::Start(self.header.footer_offset))?;
2552
2553 let (sketch_offset, sketch_length, sketch_checksum) =
2555 crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;
2556
2557 let stats = self.sketch_track.stats();
2558 self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
2559 bytes_offset: sketch_offset,
2560 bytes_length: sketch_length,
2561 entry_count: stats.entry_count,
2562 #[allow(clippy::cast_possible_truncation)]
2563 entry_size: stats.variant.entry_size() as u16,
2564 flags: 0,
2565 checksum: sketch_checksum,
2566 });
2567
2568 self.header.footer_offset = sketch_offset + sketch_length;
2570
2571 if self.file.metadata()?.len() < self.header.footer_offset {
2573 self.file.set_len(self.header.footer_offset)?;
2574 }
2575
2576 tracing::debug!(
2577 "persist_sketch_track: persisted sketch track with {} entries at offset {}",
2578 stats.entry_count,
2579 sketch_offset
2580 );
2581
2582 Ok(())
2583 }
2584
2585 #[cfg(feature = "lex")]
2586 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2587 let LexWalBatch {
2588 generation,
2589 doc_count,
2590 checksum,
2591 segments,
2592 } = batch;
2593
2594 if let Ok(mut storage) = self.lex_storage.write() {
2595 storage.replace(doc_count, checksum, segments);
2596 storage.set_generation(generation);
2597 }
2598
2599 self.persist_lex_manifest()
2600 }
2601
2602 #[cfg(feature = "lex")]
2603 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2604 let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2605 self.append_wal_entry(&payload)?;
2606 Ok(())
2607 }
2608
2609 #[cfg(feature = "lex")]
2610 fn persist_lex_manifest(&mut self) -> Result<()> {
2611 let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2612 storage.to_manifest()
2613 } else {
2614 (None, Vec::new())
2615 };
2616
2617 if let Some(storage_manifest) = index_manifest {
2619 self.toc.indexes.lex = Some(storage_manifest);
2621 } else {
2622 self.toc.indexes.lex = None;
2625 }
2626
2627 self.toc.indexes.lex_segments = segments;
2628
2629 self.rewrite_toc_footer()?;
2633 self.header.toc_checksum = self.toc.toc_checksum;
2634 crate::persist_header(&mut self.file, &self.header)?;
2635 Ok(())
2636 }
2637
2638 #[cfg(feature = "lex")]
2639 pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
2640 let TantivySnapshot {
2641 doc_count,
2642 checksum,
2643 segments,
2644 } = snapshot;
2645
2646 let mut footer_offset = self.data_end;
2647 self.file.seek(SeekFrom::Start(footer_offset))?;
2648
2649 let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
2650 for segment in segments {
2651 let bytes_length = segment.bytes.len() as u64;
2652 self.file.write_all(&segment.bytes)?;
2653 self.file.flush()?; embedded_segments.push(EmbeddedLexSegment {
2655 path: segment.path,
2656 bytes_offset: footer_offset,
2657 bytes_length,
2658 checksum: segment.checksum,
2659 });
2660 footer_offset += bytes_length;
2661 }
2662 self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2667
2668 let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
2669 let mut catalog_segments: Vec<TantivySegmentDescriptor> =
2670 Vec::with_capacity(embedded_segments.len());
2671 for segment in &embedded_segments {
2672 let descriptor = TantivySegmentDescriptor::from_common(
2673 SegmentCommon::new(
2674 next_segment_id,
2675 segment.bytes_offset,
2676 segment.bytes_length,
2677 segment.checksum,
2678 ),
2679 segment.path.clone(),
2680 );
2681 catalog_segments.push(descriptor);
2682 next_segment_id = next_segment_id.saturating_add(1);
2683 }
2684 if catalog_segments.is_empty() {
2685 self.toc.segment_catalog.tantivy_segments.clear();
2686 } else {
2687 self.toc.segment_catalog.tantivy_segments = catalog_segments;
2688 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2689 }
2690 self.toc.segment_catalog.next_segment_id = next_segment_id;
2691
2692 let generation = self
2699 .lex_storage
2700 .write()
2701 .map_err(|_| MemvidError::Tantivy {
2702 reason: "embedded lex storage lock poisoned".into(),
2703 })
2704 .map(|mut storage| {
2705 storage.replace(doc_count, checksum, embedded_segments.clone());
2706 storage.generation()
2707 })?;
2708
2709 let batch = LexWalBatch {
2710 generation,
2711 doc_count,
2712 checksum,
2713 segments: embedded_segments.clone(),
2714 };
2715 self.append_lex_batch(&batch)?;
2716 self.persist_lex_manifest()?;
2717 self.header.toc_checksum = self.toc.toc_checksum;
2718 crate::persist_header(&mut self.file, &self.header)?;
2719 Ok(())
2720 }
2721
2722 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2723 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2724 frame_id,
2725 reason: "frame id too large",
2726 })?;
2727 let frame = self
2728 .toc
2729 .frames
2730 .get_mut(index)
2731 .ok_or(MemvidError::InvalidFrame {
2732 frame_id,
2733 reason: "delete target missing",
2734 })?;
2735 frame.status = FrameStatus::Deleted;
2736 frame.superseded_by = None;
2737 self.remove_frame_from_indexes(frame_id)
2738 }
2739
2740 fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
2741 #[cfg(feature = "lex")]
2742 if let Some(engine) = self.tantivy.as_mut() {
2743 engine.delete_frame(frame_id)?;
2744 self.tantivy_dirty = true;
2745 }
2746 if let Some(index) = self.lex_index.as_mut() {
2747 index.remove_document(frame_id);
2748 }
2749 if let Some(index) = self.vec_index.as_mut() {
2750 index.remove(frame_id);
2751 }
2752 Ok(())
2753 }
2754
2755 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2756 let Ok(index) = usize::try_from(frame_id) else {
2757 return false;
2758 };
2759 self.toc
2760 .frames
2761 .get(index)
2762 .is_some_and(|frame| frame.status == FrameStatus::Active)
2763 }
2764
2765 #[cfg(feature = "parallel_segments")]
2766 fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2767 where
2768 I: IntoIterator<Item = FrameId>,
2769 {
2770 let mut iter = iter.into_iter();
2771 let first_id = iter.next()?;
2772 let first_frame = self.toc.frames.get(first_id as usize);
2773 let mut min_id = first_id;
2774 let mut max_id = first_id;
2775 let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2776 let mut page_end = first_frame
2777 .and_then(|frame| frame.chunk_count)
2778 .map(|count| page_start + count.saturating_sub(1))
2779 .unwrap_or(page_start);
2780 for frame_id in iter {
2781 if frame_id < min_id {
2782 min_id = frame_id;
2783 }
2784 if frame_id > max_id {
2785 max_id = frame_id;
2786 }
2787 if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2788 if let Some(idx) = frame.chunk_index {
2789 page_start = page_start.min(idx);
2790 if let Some(count) = frame.chunk_count {
2791 let end = idx + count.saturating_sub(1);
2792 page_end = page_end.max(end);
2793 } else {
2794 page_end = page_end.max(idx);
2795 }
2796 }
2797 }
2798 }
2799 Some(SegmentSpan {
2800 frame_start: min_id,
2801 frame_end: max_id,
2802 page_start,
2803 page_end,
2804 ..SegmentSpan::default()
2805 })
2806 }
2807
2808 #[cfg(feature = "parallel_segments")]
2809 pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2810 common.span = Some(span);
2811 if common.codec_version == 0 {
2812 common.codec_version = 1;
2813 }
2814 }
2815
2816 #[cfg(feature = "parallel_segments")]
2817 pub(crate) fn record_index_segment(
2818 &mut self,
2819 kind: SegmentKind,
2820 common: SegmentCommon,
2821 stats: SegmentStats,
2822 ) -> Result<()> {
2823 let entry = IndexSegmentRef {
2824 kind,
2825 common,
2826 stats,
2827 };
2828 self.toc.segment_catalog.index_segments.push(entry.clone());
2829 if let Some(wal) = self.manifest_wal.as_mut() {
2830 wal.append_segments(&[entry])?;
2831 }
2832 Ok(())
2833 }
2834
2835 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2836 self.ensure_writable()?;
2837 if self.toc.ticket_ref.issuer == "free-tier" {
2838 return Ok(());
2839 }
2840 match self.tier() {
2841 Tier::Free => Ok(()),
2842 tier => {
2843 if self.toc.ticket_ref.issuer.trim().is_empty() {
2844 Err(MemvidError::TicketRequired { tier })
2845 } else {
2846 Ok(())
2847 }
2848 }
2849 }
2850 }
2851
2852 pub(crate) fn tier(&self) -> Tier {
2853 if self.header.wal_size >= WAL_SIZE_LARGE {
2854 Tier::Enterprise
2855 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2856 Tier::Dev
2857 } else {
2858 Tier::Free
2859 }
2860 }
2861
2862 pub(crate) fn capacity_limit(&self) -> u64 {
2863 if self.toc.ticket_ref.capacity_bytes != 0 {
2864 self.toc.ticket_ref.capacity_bytes
2865 } else {
2866 self.tier().capacity_bytes()
2867 }
2868 }
2869
2870 #[must_use]
2875 pub fn get_capacity(&self) -> u64 {
2876 self.capacity_limit()
2877 }
2878
2879 pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
2880 tracing::info!(
2881 vec_segments = self.toc.segment_catalog.vec_segments.len(),
2882 lex_segments = self.toc.segment_catalog.lex_segments.len(),
2883 time_segments = self.toc.segment_catalog.time_segments.len(),
2884 footer_offset = self.header.footer_offset,
2885 data_end = self.data_end,
2886 "rewrite_toc_footer: about to serialize TOC"
2887 );
2888 let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
2889 let footer_offset = self.header.footer_offset;
2890 self.file.seek(SeekFrom::Start(footer_offset))?;
2891 self.file.write_all(&toc_bytes)?;
2892 let footer = CommitFooter {
2893 toc_len: toc_bytes.len() as u64,
2894 toc_hash: *hash(&toc_bytes).as_bytes(),
2895 generation: self.generation,
2896 };
2897 let encoded_footer = footer.encode();
2898 self.file.write_all(&encoded_footer)?;
2899
2900 let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
2902 let min_len = self.header.wal_offset + self.header.wal_size;
2903 let final_len = new_len.max(min_len);
2904
2905 if new_len < min_len {
2906 tracing::warn!(
2907 file.new_len = new_len,
2908 file.min_len = min_len,
2909 file.final_len = final_len,
2910 "truncation would cut into WAL region, clamping to min_len"
2911 );
2912 }
2913
2914 self.file.set_len(final_len)?;
2915 self.file.sync_all()?;
2917 Ok(())
2918 }
2919}
2920
#[cfg(feature = "parallel_segments")]
impl Memvid {
    /// Builds and appends index segments for the frames inserted in `delta`
    /// using the parallel segment pipeline (planner + worker pool).
    ///
    /// Returns `Ok(false)` when there is nothing to publish: no non-blank
    /// frame text, no plans, or no worker results. Returns `Ok(true)` once
    /// `append_parallel_segments` has run.
    fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
        let chunks = self.collect_segment_chunks(delta)?;
        if chunks.is_empty() {
            return Ok(false);
        }
        let planner = SegmentPlanner::new(opts.clone());
        let plans = planner.plan_from_chunks(chunks);
        if plans.is_empty() {
            return Ok(false);
        }
        let worker_pool = SegmentWorkerPool::new(opts);
        let results = worker_pool.execute(plans)?;
        if results.is_empty() {
            return Ok(false);
        }
        self.append_parallel_segments(results)?;
        Ok(true)
    }

    /// Turns each inserted frame in `delta` into a `SegmentChunkPlan`.
    ///
    /// Frames whose content is blank (after trimming) are skipped. The
    /// matching embedding from `delta.inserted_embeddings` is attached when
    /// present. Chunked frames map to the single 1-based page
    /// `chunk_index + 1`; unchunked frames use page 0.
    ///
    /// # Errors
    /// `InvalidFrame` if an inserted id is out of range. Also propagates
    /// errors from `frame_content`.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        // Dumps every frame id on each ingest. Kept at debug level so large
        // batches do not flood info-level logs.
        tracing::debug!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // A chunk occupies exactly one page, so start and end coincide.
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            let page_end = page_start;
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
}
2992
2993#[cfg(feature = "parallel_segments")]
/// Cheap token estimate: the number of whitespace-separated words, floored at
/// one. Empty or whitespace-only text still counts as a single token.
fn estimate_tokens(text: &str) -> usize {
    match text.split_whitespace().count() {
        0 => 1,
        words => words,
    }
}
2997
2998impl Memvid {
2999 pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
3000 let catalog_end = self.catalog_data_end();
3001 if catalog_end <= self.header.footer_offset {
3002 return Ok(false);
3003 }
3004 self.header.footer_offset = catalog_end;
3005 self.rewrite_toc_footer()?;
3006 self.header.toc_checksum = self.toc.toc_checksum;
3007 crate::persist_header(&mut self.file, &self.header)?;
3008 Ok(true)
3009 }
3010}
3011
3012impl Memvid {
3013 pub fn vacuum(&mut self) -> Result<()> {
3014 self.commit()?;
3015
3016 let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
3017 let frames: Vec<Frame> = self
3018 .toc
3019 .frames
3020 .iter()
3021 .filter(|frame| frame.status == FrameStatus::Active)
3022 .cloned()
3023 .collect();
3024 for frame in frames {
3025 let bytes = self.read_frame_payload_bytes(&frame)?;
3026 active_payloads.insert(frame.id, bytes);
3027 }
3028
3029 let mut cursor = self.header.wal_offset + self.header.wal_size;
3030 self.file.seek(SeekFrom::Start(cursor))?;
3031 for frame in &mut self.toc.frames {
3032 if frame.status == FrameStatus::Active {
3033 if let Some(bytes) = active_payloads.get(&frame.id) {
3034 self.file.write_all(bytes)?;
3035 frame.payload_offset = cursor;
3036 frame.payload_length = bytes.len() as u64;
3037 cursor += bytes.len() as u64;
3038 } else {
3039 frame.payload_offset = 0;
3040 frame.payload_length = 0;
3041 }
3042 } else {
3043 frame.payload_offset = 0;
3044 frame.payload_length = 0;
3045 }
3046 }
3047
3048 self.data_end = cursor;
3049
3050 self.toc.segments.clear();
3051 self.toc.indexes.lex_segments.clear();
3052 self.toc.segment_catalog.lex_segments.clear();
3053 self.toc.segment_catalog.vec_segments.clear();
3054 self.toc.segment_catalog.time_segments.clear();
3055 #[cfg(feature = "temporal_track")]
3056 {
3057 self.toc.temporal_track = None;
3058 self.toc.segment_catalog.temporal_segments.clear();
3059 }
3060 #[cfg(feature = "lex")]
3061 {
3062 self.toc.segment_catalog.tantivy_segments.clear();
3063 }
3064 #[cfg(feature = "parallel_segments")]
3065 {
3066 self.toc.segment_catalog.index_segments.clear();
3067 }
3068
3069 #[cfg(feature = "lex")]
3071 {
3072 self.tantivy = None;
3073 self.tantivy_dirty = false;
3074 }
3075
3076 self.rebuild_indexes(&[], &[])?;
3077 self.file.sync_all()?;
3078 Ok(())
3079 }
3080
3081 #[must_use]
3099 pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
3100 plan_document_chunks(payload).map(|plan| plan.chunks)
3101 }
3102
3103 pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
3105 self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
3106 }
3107
3108 pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
3110 self.put_internal(Some(payload), None, None, None, options, None)
3111 }
3112
3113 pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
3115 self.put_internal(
3116 Some(payload),
3117 None,
3118 Some(embedding),
3119 None,
3120 PutOptions::default(),
3121 None,
3122 )
3123 }
3124
3125 pub fn put_with_embedding_and_options(
3126 &mut self,
3127 payload: &[u8],
3128 embedding: Vec<f32>,
3129 options: PutOptions,
3130 ) -> Result<u64> {
3131 self.put_internal(Some(payload), None, Some(embedding), None, options, None)
3132 }
3133
3134 pub fn put_with_chunk_embeddings(
3147 &mut self,
3148 payload: &[u8],
3149 parent_embedding: Option<Vec<f32>>,
3150 chunk_embeddings: Vec<Vec<f32>>,
3151 options: PutOptions,
3152 ) -> Result<u64> {
3153 self.put_internal(
3154 Some(payload),
3155 None,
3156 parent_embedding,
3157 Some(chunk_embeddings),
3158 options,
3159 None,
3160 )
3161 }
3162
3163 pub fn update_frame(
3165 &mut self,
3166 frame_id: FrameId,
3167 payload: Option<Vec<u8>>,
3168 mut options: PutOptions,
3169 embedding: Option<Vec<f32>>,
3170 ) -> Result<u64> {
3171 self.ensure_mutation_allowed()?;
3172 let existing = self.frame_by_id(frame_id)?;
3173 if existing.status != FrameStatus::Active {
3174 return Err(MemvidError::InvalidFrame {
3175 frame_id,
3176 reason: "frame is not active",
3177 });
3178 }
3179
3180 if options.timestamp.is_none() {
3181 options.timestamp = Some(existing.timestamp);
3182 }
3183 if options.track.is_none() {
3184 options.track = existing.track.clone();
3185 }
3186 if options.kind.is_none() {
3187 options.kind = existing.kind.clone();
3188 }
3189 if options.uri.is_none() {
3190 options.uri = existing.uri.clone();
3191 }
3192 if options.title.is_none() {
3193 options.title = existing.title.clone();
3194 }
3195 if options.metadata.is_none() {
3196 options.metadata = existing.metadata.clone();
3197 }
3198 if options.search_text.is_none() {
3199 options.search_text = existing.search_text.clone();
3200 }
3201 if options.tags.is_empty() {
3202 options.tags = existing.tags.clone();
3203 }
3204 if options.labels.is_empty() {
3205 options.labels = existing.labels.clone();
3206 }
3207 if options.extra_metadata.is_empty() {
3208 options.extra_metadata = existing.extra_metadata.clone();
3209 }
3210
3211 let reuse_frame = if payload.is_none() {
3212 options.auto_tag = false;
3213 options.extract_dates = false;
3214 Some(existing.clone())
3215 } else {
3216 None
3217 };
3218
3219 let effective_embedding = if let Some(explicit) = embedding {
3220 Some(explicit)
3221 } else if self.vec_enabled {
3222 self.frame_embedding(frame_id)?
3223 } else {
3224 None
3225 };
3226
3227 let payload_slice = payload.as_deref();
3228 let reuse_flag = reuse_frame.is_some();
3229 let replace_flag = payload_slice.is_some();
3230 let seq = self.put_internal(
3231 payload_slice,
3232 reuse_frame,
3233 effective_embedding,
3234 None, options,
3236 Some(frame_id),
3237 )?;
3238 info!(
3239 "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
3240 );
3241 Ok(seq)
3242 }
3243
3244 pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
3245 self.ensure_mutation_allowed()?;
3246 let frame = self.frame_by_id(frame_id)?;
3247 if frame.status != FrameStatus::Active {
3248 return Err(MemvidError::InvalidFrame {
3249 frame_id,
3250 reason: "frame is not active",
3251 });
3252 }
3253
3254 let mut tombstone = WalEntryData {
3255 timestamp: SystemTime::now()
3256 .duration_since(UNIX_EPOCH)
3257 .map(|d| d.as_secs() as i64)
3258 .unwrap_or(frame.timestamp),
3259 kind: None,
3260 track: None,
3261 payload: Vec::new(),
3262 embedding: None,
3263 uri: frame.uri.clone(),
3264 title: frame.title.clone(),
3265 canonical_encoding: frame.canonical_encoding,
3266 canonical_length: frame.canonical_length,
3267 metadata: None,
3268 search_text: None,
3269 tags: Vec::new(),
3270 labels: Vec::new(),
3271 extra_metadata: BTreeMap::new(),
3272 content_dates: Vec::new(),
3273 chunk_manifest: None,
3274 role: frame.role,
3275 parent_sequence: None,
3276 chunk_index: frame.chunk_index,
3277 chunk_count: frame.chunk_count,
3278 op: FrameWalOp::Tombstone,
3279 target_frame_id: Some(frame_id),
3280 supersedes_frame_id: None,
3281 reuse_payload_from: None,
3282 source_sha256: None,
3283 source_path: None,
3284 enrichment_state: crate::types::EnrichmentState::default(),
3285 };
3286 tombstone.kind = frame.kind.clone();
3287 tombstone.track = frame.track.clone();
3288
3289 let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
3290 let seq = self.append_wal_entry(&payload_bytes)?;
3291 self.dirty = true;
3292 let suppress_checkpoint = self
3293 .batch_opts
3294 .as_ref()
3295 .is_some_and(|o| o.disable_auto_checkpoint);
3296 if !suppress_checkpoint && self.wal.should_checkpoint() {
3297 self.commit()?;
3298 }
3299 info!("frame_delete frame_id={frame_id} seq={seq}");
3300 Ok(seq)
3301 }
3302}
3303
3304impl Memvid {
3305 fn put_internal(
3306 &mut self,
3307 payload: Option<&[u8]>,
3308 reuse_frame: Option<Frame>,
3309 embedding: Option<Vec<f32>>,
3310 chunk_embeddings: Option<Vec<Vec<f32>>>,
3311 mut options: PutOptions,
3312 supersedes: Option<FrameId>,
3313 ) -> Result<u64> {
3314 self.ensure_mutation_allowed()?;
3315
3316 if options.dedup {
3318 if let Some(bytes) = payload {
3319 let content_hash = hash(bytes);
3320 if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3321 tracing::debug!(
3323 frame_id = existing_frame.id,
3324 "dedup: skipping ingestion, identical content already exists"
3325 );
3326 return Ok(existing_frame.id);
3328 }
3329 }
3330 }
3331
3332 if payload.is_some() && reuse_frame.is_some() {
3333 let frame_id = reuse_frame
3334 .as_ref()
3335 .map(|frame| frame.id)
3336 .unwrap_or_default();
3337 return Err(MemvidError::InvalidFrame {
3338 frame_id,
3339 reason: "cannot reuse payload when bytes are provided",
3340 });
3341 }
3342
3343 let incoming_dimension = {
3346 let mut dim: Option<u32> = None;
3347
3348 if let Some(ref vector) = embedding {
3349 if !vector.is_empty() {
3350 #[allow(clippy::cast_possible_truncation)]
3351 let len = vector.len() as u32;
3352 dim = Some(len);
3353 }
3354 }
3355
3356 if let Some(ref vectors) = chunk_embeddings {
3357 for vector in vectors {
3358 if vector.is_empty() {
3359 continue;
3360 }
3361 let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3362 match dim {
3363 None => dim = Some(vec_dim),
3364 Some(existing) if existing == vec_dim => {}
3365 Some(existing) => {
3366 return Err(MemvidError::VecDimensionMismatch {
3367 expected: existing,
3368 actual: vector.len(),
3369 });
3370 }
3371 }
3372 }
3373 }
3374
3375 dim
3376 };
3377
3378 if let Some(incoming_dimension) = incoming_dimension {
3379 if !self.vec_enabled {
3381 self.enable_vec()?;
3382 }
3383
3384 if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3385 if existing_dimension != incoming_dimension {
3386 return Err(MemvidError::VecDimensionMismatch {
3387 expected: existing_dimension,
3388 actual: incoming_dimension as usize,
3389 });
3390 }
3391 }
3392
3393 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3395 if manifest.dimension == 0 {
3396 manifest.dimension = incoming_dimension;
3397 }
3398 }
3399 }
3400
3401 let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3402 let payload_tail = self.payload_region_end();
3403 let projected = if let Some(bytes) = payload {
3404 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3405 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3406 } else {
3407 prepare_canonical_payload(bytes)?
3408 };
3409 let len = prepared.len();
3410 prepared_payload = Some((prepared, encoding, length));
3411 payload_tail.saturating_add(len as u64)
3412 } else if reuse_frame.is_some() {
3413 payload_tail
3414 } else {
3415 return Err(MemvidError::InvalidFrame {
3416 frame_id: 0,
3417 reason: "payload required for frame insertion",
3418 });
3419 };
3420
3421 let capacity_limit = self.capacity_limit();
3422 if projected > capacity_limit {
3423 let incoming_size = projected.saturating_sub(payload_tail);
3424 return Err(MemvidError::CapacityExceeded {
3425 current: payload_tail,
3426 limit: capacity_limit,
3427 required: incoming_size,
3428 });
3429 }
3430 let timestamp = options.timestamp.take().unwrap_or_else(|| {
3431 SystemTime::now()
3432 .duration_since(UNIX_EPOCH)
3433 .map(|d| d.as_secs() as i64)
3434 .unwrap_or(0)
3435 });
3436
3437 #[allow(unused_assignments)]
3438 let mut reuse_bytes: Option<Vec<u8>> = None;
3439 let payload_for_processing = if let Some(bytes) = payload {
3440 Some(bytes)
3441 } else if let Some(frame) = reuse_frame.as_ref() {
3442 let bytes = self.frame_canonical_bytes(frame)?;
3443 reuse_bytes = Some(bytes);
3444 reuse_bytes.as_deref()
3445 } else {
3446 None
3447 };
3448
3449 let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3451 (Some(bytes), None) => plan_document_chunks(bytes),
3452 _ => None,
3453 };
3454
3455 let mut source_sha256: Option<[u8; 32]> = None;
3459 let source_path_value = options.source_path.take();
3460
3461 let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3462 if raw_chunk_plan.is_some() {
3463 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3465 } else if options.no_raw {
3466 if let Some(bytes) = payload {
3468 let hash_result = hash(bytes);
3470 source_sha256 = Some(*hash_result.as_bytes());
3471 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3473 } else {
3474 return Err(MemvidError::InvalidFrame {
3475 frame_id: 0,
3476 reason: "payload required for --no-raw mode",
3477 });
3478 }
3479 } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3480 (prepared, encoding, length, None)
3481 } else if let Some(bytes) = payload {
3482 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3483 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3484 } else {
3485 prepare_canonical_payload(bytes)?
3486 };
3487 (prepared, encoding, length, None)
3488 } else if let Some(frame) = reuse_frame.as_ref() {
3489 (
3490 Vec::new(),
3491 frame.canonical_encoding,
3492 frame.canonical_length,
3493 Some(frame.id),
3494 )
3495 } else {
3496 return Err(MemvidError::InvalidFrame {
3497 frame_id: 0,
3498 reason: "payload required for frame insertion",
3499 });
3500 };
3501
3502 let mut chunk_plan = raw_chunk_plan;
3504
3505 let mut metadata = options.metadata.take();
3506 let mut search_text = options
3507 .search_text
3508 .take()
3509 .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3510 let mut tags = std::mem::take(&mut options.tags);
3511 let mut labels = std::mem::take(&mut options.labels);
3512 let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3513 let mut content_dates: Vec<String> = Vec::new();
3514
3515 let need_search_text = search_text
3516 .as_ref()
3517 .is_none_or(|text| text.trim().is_empty());
3518 let need_metadata = metadata.is_none();
3519 let run_extractor = need_search_text || need_metadata || options.auto_tag;
3520
3521 let mut extraction_error = None;
3522 let mut is_skim_extraction = false; let extracted = if run_extractor {
3525 if let Some(bytes) = payload_for_processing {
3526 let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3527 let uri_hint = options.uri.as_deref();
3528
3529 let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3531
3532 if use_budgeted {
3533 let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3535 options.extraction_budget_ms,
3536 );
3537 match crate::extract_budgeted::extract_with_budget(
3538 bytes, mime_hint, uri_hint, budget,
3539 ) {
3540 Ok(result) => {
3541 is_skim_extraction = result.is_skim();
3542 if is_skim_extraction {
3543 tracing::debug!(
3544 coverage = result.coverage,
3545 elapsed_ms = result.elapsed_ms,
3546 sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3547 "time-budgeted extraction (skim)"
3548 );
3549 }
3550 let doc = crate::extract::ExtractedDocument {
3552 text: if result.text.is_empty() {
3553 None
3554 } else {
3555 Some(result.text)
3556 },
3557 metadata: serde_json::json!({
3558 "skim": is_skim_extraction,
3559 "coverage": result.coverage,
3560 "sections_extracted": result.sections_extracted,
3561 "sections_total": result.sections_total,
3562 }),
3563 mime_type: mime_hint.map(std::string::ToString::to_string),
3564 };
3565 Some(doc)
3566 }
3567 Err(err) => {
3568 tracing::warn!(
3570 ?err,
3571 "budgeted extraction failed, trying full extraction"
3572 );
3573 match extract_via_registry(bytes, mime_hint, uri_hint) {
3574 Ok(doc) => Some(doc),
3575 Err(err) => {
3576 extraction_error = Some(err);
3577 None
3578 }
3579 }
3580 }
3581 }
3582 } else {
3583 match extract_via_registry(bytes, mime_hint, uri_hint) {
3585 Ok(doc) => Some(doc),
3586 Err(err) => {
3587 extraction_error = Some(err);
3588 None
3589 }
3590 }
3591 }
3592 } else {
3593 None
3594 }
3595 } else {
3596 None
3597 };
3598
3599 if let Some(err) = extraction_error {
3600 return Err(err);
3601 }
3602
3603 if let Some(doc) = &extracted {
3604 if need_search_text {
3605 if let Some(text) = &doc.text {
3606 if let Some(normalized) =
3607 normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3608 {
3609 search_text = Some(normalized);
3610 }
3611 }
3612 }
3613
3614 if chunk_plan.is_none() {
3617 if let Some(text) = &doc.text {
3618 chunk_plan = plan_text_chunks(text);
3619 }
3620 }
3621
3622 if let Some(mime) = doc.mime_type.as_ref() {
3623 if let Some(existing) = &mut metadata {
3624 if existing.mime.is_none() {
3625 existing.mime = Some(mime.clone());
3626 }
3627 } else {
3628 let mut doc_meta = DocMetadata::default();
3629 doc_meta.mime = Some(mime.clone());
3630 metadata = Some(doc_meta);
3631 }
3632 }
3633
3634 if options.auto_tag {
3637 if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3638 {
3639 extra_metadata
3640 .entry("extractous_metadata".to_string())
3641 .or_insert(meta_json);
3642 }
3643 }
3644 }
3645
3646 if options.auto_tag {
3647 if let Some(ref text) = search_text {
3648 if !text.trim().is_empty() {
3649 let result = AutoTagger.analyse(text, options.extract_dates);
3650 merge_unique(&mut tags, result.tags);
3651 merge_unique(&mut labels, result.labels);
3652 if options.extract_dates && content_dates.is_empty() {
3653 content_dates = result.content_dates;
3654 }
3655 }
3656 }
3657 }
3658
3659 if content_dates.is_empty() {
3660 if let Some(frame) = reuse_frame.as_ref() {
3661 content_dates = frame.content_dates.clone();
3662 }
3663 }
3664
3665 let metadata_ref = metadata.as_ref();
3666 let mut search_text = augment_search_text(
3667 search_text,
3668 options.uri.as_deref(),
3669 options.title.as_deref(),
3670 options.track.as_deref(),
3671 &tags,
3672 &labels,
3673 &extra_metadata,
3674 &content_dates,
3675 metadata_ref,
3676 );
3677 let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3678 let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3679 let mut parent_chunk_count: Option<u32> = None;
3680
3681 let kind_value = options.kind.take();
3682 let track_value = options.track.take();
3683 let uri_value = options.uri.take();
3684 let title_value = options.title.take();
3685 let should_extract_triplets = options.extract_triplets;
3686 let triplet_uri = uri_value.clone();
3688 let triplet_title = title_value.clone();
3689
3690 if let Some(plan) = chunk_plan.as_ref() {
3691 let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3692 parent_chunk_manifest = Some(plan.manifest.clone());
3693 parent_chunk_count = Some(chunk_total);
3694
3695 if let Some(first_chunk) = plan.chunks.first() {
3696 if let Some(normalized) =
3697 normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3698 {
3699 if !normalized.trim().is_empty() {
3700 search_text = Some(normalized);
3701 }
3702 }
3703 }
3704
3705 let chunk_tags = tags.clone();
3706 let chunk_labels = labels.clone();
3707 let chunk_metadata = metadata.clone();
3708 let chunk_extra_metadata = extra_metadata.clone();
3709 let chunk_content_dates = content_dates.clone();
3710
3711 for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3712 let (chunk_payload, chunk_encoding, chunk_length) =
3713 prepare_canonical_payload(chunk_text.as_bytes())?;
3714 let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3715 .map(|n| n.text)
3716 .filter(|text| !text.trim().is_empty());
3717
3718 let chunk_uri = uri_value
3719 .as_ref()
3720 .map(|uri| format!("{uri}#page-{}", idx + 1));
3721 let chunk_title = title_value
3722 .as_ref()
3723 .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3724
3725 let chunk_embedding = chunk_embeddings
3727 .as_ref()
3728 .and_then(|embeddings| embeddings.get(idx).cloned());
3729
3730 chunk_entries.push(WalEntryData {
3731 timestamp,
3732 kind: kind_value.clone(),
3733 track: track_value.clone(),
3734 payload: chunk_payload,
3735 embedding: chunk_embedding,
3736 uri: chunk_uri,
3737 title: chunk_title,
3738 canonical_encoding: chunk_encoding,
3739 canonical_length: chunk_length,
3740 metadata: chunk_metadata.clone(),
3741 search_text: chunk_search_text,
3742 tags: chunk_tags.clone(),
3743 labels: chunk_labels.clone(),
3744 extra_metadata: chunk_extra_metadata.clone(),
3745 content_dates: chunk_content_dates.clone(),
3746 chunk_manifest: None,
3747 role: FrameRole::DocumentChunk,
3748 parent_sequence: None,
3749 chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3750 chunk_count: Some(chunk_total),
3751 op: FrameWalOp::Insert,
3752 target_frame_id: None,
3753 supersedes_frame_id: None,
3754 reuse_payload_from: None,
3755 source_sha256: None, source_path: None,
3757 enrichment_state: crate::types::EnrichmentState::Enriched,
3759 });
3760 }
3761 }
3762
3763 let parent_uri = uri_value.clone();
3764 let parent_title = title_value.clone();
3765
3766 let parent_sequence = if let Some(parent_id) = options.parent_id {
3769 usize::try_from(parent_id)
3774 .ok()
3775 .and_then(|idx| self.toc.frames.get(idx))
3776 .map(|_| parent_id + 2) } else {
3778 None
3779 };
3780
3781 let triplet_text = search_text.clone();
3783
3784 #[cfg(feature = "lex")]
3786 let instant_index_tags = if options.instant_index {
3787 tags.clone()
3788 } else {
3789 Vec::new()
3790 };
3791 #[cfg(feature = "lex")]
3792 let instant_index_labels = if options.instant_index {
3793 labels.clone()
3794 } else {
3795 Vec::new()
3796 };
3797
3798 #[cfg(feature = "lex")]
3800 let needs_enrichment =
3801 options.instant_index && (options.enable_embedding || is_skim_extraction);
3802 #[cfg(feature = "lex")]
3803 let enrichment_state = if needs_enrichment {
3804 crate::types::EnrichmentState::Searchable
3805 } else {
3806 crate::types::EnrichmentState::Enriched
3807 };
3808 #[cfg(not(feature = "lex"))]
3809 let enrichment_state = crate::types::EnrichmentState::Enriched;
3810
3811 let entry = WalEntryData {
3812 timestamp,
3813 kind: kind_value,
3814 track: track_value,
3815 payload: storage_payload,
3816 embedding,
3817 uri: parent_uri,
3818 title: parent_title,
3819 canonical_encoding,
3820 canonical_length,
3821 metadata,
3822 search_text,
3823 tags,
3824 labels,
3825 extra_metadata,
3826 content_dates,
3827 chunk_manifest: parent_chunk_manifest,
3828 role: options.role,
3829 parent_sequence,
3830 chunk_index: None,
3831 chunk_count: parent_chunk_count,
3832 op: FrameWalOp::Insert,
3833 target_frame_id: None,
3834 supersedes_frame_id: supersedes,
3835 reuse_payload_from,
3836 source_sha256,
3837 source_path: source_path_value,
3838 enrichment_state,
3839 };
3840
3841 let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3842 let parent_seq = self.append_wal_entry(&parent_bytes)?;
3843 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3844
3845 #[cfg(feature = "lex")]
3848 if options.instant_index && self.tantivy.is_some() {
3849 let frame_id = parent_seq as FrameId;
3851
3852 if let Some(ref text) = triplet_text {
3854 if !text.trim().is_empty() {
3855 let temp_frame = Frame {
3857 id: frame_id,
3858 timestamp,
3859 anchor_ts: None,
3860 anchor_source: None,
3861 kind: options.kind.clone(),
3862 track: options.track.clone(),
3863 payload_offset: 0,
3864 payload_length: 0,
3865 checksum: [0u8; 32],
3866 uri: options
3867 .uri
3868 .clone()
3869 .or_else(|| Some(crate::default_uri(frame_id))),
3870 title: options.title.clone(),
3871 canonical_encoding: crate::types::CanonicalEncoding::default(),
3872 canonical_length: None,
3873 metadata: None, search_text: triplet_text.clone(),
3875 tags: instant_index_tags.clone(),
3876 labels: instant_index_labels.clone(),
3877 extra_metadata: std::collections::BTreeMap::new(), content_dates: Vec::new(), chunk_manifest: None,
3880 role: options.role,
3881 parent_id: None,
3882 chunk_index: None,
3883 chunk_count: None,
3884 status: FrameStatus::Active,
3885 supersedes,
3886 superseded_by: None,
3887 source_sha256: None, source_path: None, enrichment_state: crate::types::EnrichmentState::Searchable,
3890 };
3891
3892 if let Some(engine) = self.tantivy.as_mut() {
3894 engine.add_frame(&temp_frame, text)?;
3895 engine.soft_commit()?;
3896 self.tantivy_dirty = true;
3897
3898 tracing::debug!(
3899 frame_id = frame_id,
3900 "instant index: frame searchable immediately"
3901 );
3902 }
3903 }
3904 }
3905 }
3906
3907 #[cfg(feature = "lex")]
3911 if needs_enrichment {
3912 let frame_id = parent_seq as FrameId;
3913 self.toc.enrichment_queue.push(frame_id);
3914 tracing::debug!(
3915 frame_id = frame_id,
3916 is_skim = is_skim_extraction,
3917 needs_embedding = options.enable_embedding,
3918 "queued frame for background enrichment"
3919 );
3920 }
3921
3922 for mut chunk_entry in chunk_entries {
3923 chunk_entry.parent_sequence = Some(parent_seq);
3924 let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3925 self.append_wal_entry(&chunk_bytes)?;
3926 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3927 }
3928
3929 self.dirty = true;
3930 let suppress_checkpoint = self
3931 .batch_opts
3932 .as_ref()
3933 .is_some_and(|o| o.disable_auto_checkpoint);
3934 if !suppress_checkpoint && self.wal.should_checkpoint() {
3935 self.commit()?;
3936 }
3937
3938 #[cfg(feature = "replay")]
3940 if let Some(input_bytes) = payload {
3941 self.record_put_action(parent_seq, input_bytes);
3942 }
3943
3944 if should_extract_triplets {
3947 if let Some(ref text) = triplet_text {
3948 if !text.trim().is_empty() {
3949 let extractor = TripletExtractor::default();
3950 let frame_id = parent_seq as FrameId;
3951 let (cards, _stats) = extractor.extract(
3952 frame_id,
3953 text,
3954 triplet_uri.as_deref(),
3955 triplet_title.as_deref(),
3956 timestamp,
3957 );
3958
3959 if !cards.is_empty() {
3960 let card_ids = self.memories_track.add_cards(cards);
3962
3963 self.memories_track
3965 .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3966 }
3967 }
3968 }
3969 }
3970
3971 Ok(parent_seq)
3972 }
3973}
3974
3975#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3976#[serde(rename_all = "snake_case")]
3977pub(crate) enum FrameWalOp {
3978 Insert,
3979 Tombstone,
3980}
3981
3982impl Default for FrameWalOp {
3983 fn default() -> Self {
3984 Self::Insert
3985 }
3986}
3987
/// One record in the embedded WAL, encoded with bincode (`encode_to_vec(.., wal_config())`).
///
/// NOTE(review): bincode encodes the variant index positionally, so variant order is part of
/// the on-disk format — append new variants at the end instead of reordering. Because `Lex` is
/// last, toggling the `lex` feature does not change the index of `Frame`.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame record (insert or tombstone, see `WalEntryData::op`).
    Frame(WalEntryData),
    /// A `LexWalBatch` for the lexical search index; only compiled with the `lex` feature.
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3994
3995fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3996 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3997 return Ok(entry);
3998 }
3999 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
4000 Ok(WalEntry::Frame(legacy))
4001}
4002
/// Contents of a single frame record in the embedded WAL (wrapped in `WalEntry::Frame`).
///
/// The insert path builds one of these for the parent frame and one per document chunk, then
/// encodes it with bincode via `encode_to_vec(WalEntry::Frame(..), wal_config())`.
///
/// NOTE(review): bincode is positional, so field order is part of the on-disk layout — add new
/// fields at the end rather than reordering. It also looks like `#[serde(default)]` does not by
/// itself make missing trailing fields decodable under bincode (the struct is read as a
/// fixed-length sequence); confirm before relying on it for compatibility with older records.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Frame timestamp. When the caller supplies none, the insert path uses the current time as
    /// whole seconds since the Unix epoch (0 if the clock reads before the epoch).
    pub(crate) timestamp: i64,
    /// Optional kind taken from the put options.
    pub(crate) kind: Option<String>,
    /// Optional track taken from the put options.
    pub(crate) track: Option<String>,
    /// Stored payload bytes, encoded as described by `canonical_encoding`. Left empty when no
    /// payload is stored: chunked documents, `--no-raw` mode, or reuse via `reuse_payload_from`.
    pub(crate) payload: Vec<u8>,
    /// Optional embedding vector for this frame.
    pub(crate) embedding: Option<Vec<f32>>,
    /// Frame URI; document chunks get `"{uri}#page-{n}"` (1-based `n`).
    #[serde(default)]
    pub(crate) uri: Option<String>,
    /// Frame title; document chunks get `"{title} (page {n}/{total})"`.
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// Encoding of `payload` (`Plain` or zstd-compressed).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Length in bytes of the original, un-encoded payload. `Some(0)` when nothing is stored;
    /// copied from the source frame when the payload is reused.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    /// Document metadata; the insert path fills in the MIME type from extraction when missing.
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Text used for search indexing, if any.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    /// Tags, possibly extended by auto-tagging.
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    /// Labels, possibly extended by auto-tagging.
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Free-form key/value metadata.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Date strings found in the content by auto-tagging, or inherited from a reused frame.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Chunk-plan manifest; set on the parent entry of a chunked document.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    /// Frame role; document chunks use `FrameRole::DocumentChunk`.
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence number of the parent entry. Chunk entries receive the sequence returned
    /// when their parent was appended.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based position of this chunk within its document (chunks only).
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Number of chunks in the document; set on the parent and on every chunk.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Whether this record inserts or tombstones a frame (defaults to `Insert`).
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    /// Presumably the frame a non-insert op applies to — TODO confirm; the insert path always
    /// leaves it `None`.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame that this insert supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// Frame whose already-stored payload is reused instead of writing `payload`.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// Digest of the source bytes, set in `--no-raw` mode.
    /// NOTE(review): the insert path fills this with `hash(bytes)`, where `hash` is imported
    /// from `blake3` — so despite the name it appears to hold a BLAKE3 digest, not SHA-256.
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Source path taken from the put options.
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// With the `lex` feature, `Searchable` for instant-index frames queued for background
    /// enrichment; otherwise `Enriched`.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4058
4059pub(crate) fn prepare_canonical_payload(
4060 payload: &[u8],
4061) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4062 prepare_canonical_payload_with_level(payload, 3)
4063}
4064
4065pub(crate) fn prepare_canonical_payload_with_level(
4066 payload: &[u8],
4067 level: i32,
4068) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4069 if level == 0 {
4070 return Ok((
4072 payload.to_vec(),
4073 CanonicalEncoding::Plain,
4074 Some(payload.len() as u64),
4075 ));
4076 }
4077 if std::str::from_utf8(payload).is_ok() {
4078 let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4079 Ok((
4080 compressed,
4081 CanonicalEncoding::Zstd,
4082 Some(payload.len() as u64),
4083 ))
4084 } else {
4085 Ok((
4086 payload.to_vec(),
4087 CanonicalEncoding::Plain,
4088 Some(payload.len() as u64),
4089 ))
4090 }
4091}
4092
4093pub(crate) fn augment_search_text(
4094 base: Option<String>,
4095 uri: Option<&str>,
4096 title: Option<&str>,
4097 track: Option<&str>,
4098 tags: &[String],
4099 labels: &[String],
4100 extra_metadata: &BTreeMap<String, String>,
4101 content_dates: &[String],
4102 metadata: Option<&DocMetadata>,
4103) -> Option<String> {
4104 let mut segments: Vec<String> = Vec::new();
4105 if let Some(text) = base {
4106 let trimmed = text.trim();
4107 if !trimmed.is_empty() {
4108 segments.push(trimmed.to_string());
4109 }
4110 }
4111
4112 if let Some(title) = title {
4113 if !title.trim().is_empty() {
4114 segments.push(format!("title: {}", title.trim()));
4115 }
4116 }
4117
4118 if let Some(uri) = uri {
4119 if !uri.trim().is_empty() {
4120 segments.push(format!("uri: {}", uri.trim()));
4121 }
4122 }
4123
4124 if let Some(track) = track {
4125 if !track.trim().is_empty() {
4126 segments.push(format!("track: {}", track.trim()));
4127 }
4128 }
4129
4130 if !tags.is_empty() {
4131 segments.push(format!("tags: {}", tags.join(" ")));
4132 }
4133
4134 if !labels.is_empty() {
4135 segments.push(format!("labels: {}", labels.join(" ")));
4136 }
4137
4138 if !extra_metadata.is_empty() {
4139 for (key, value) in extra_metadata {
4140 if value.trim().is_empty() {
4141 continue;
4142 }
4143 segments.push(format!("{key}: {value}"));
4144 }
4145 }
4146
4147 if !content_dates.is_empty() {
4148 segments.push(format!("dates: {}", content_dates.join(" ")));
4149 }
4150
4151 if let Some(meta) = metadata {
4152 if let Ok(meta_json) = serde_json::to_string(meta) {
4153 segments.push(format!("metadata: {meta_json}"));
4154 }
4155 }
4156
4157 if segments.is_empty() {
4158 None
4159 } else {
4160 Some(segments.join("\n"))
4161 }
4162}
4163
/// Appends each trimmed, non-blank entry of `additions` to `target`, skipping duplicates.
///
/// A value is a duplicate if it exactly matches an entry already in `target` or one appended
/// earlier in this call. Existing `target` entries are compared as-is (they are not trimmed).
/// The relative order of `additions` is preserved.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }

    let mut present: BTreeSet<String> = target.iter().cloned().collect();
    let fresh = additions
        .iter()
        .map(|value| value.trim())
        .filter(|trimmed| !trimmed.is_empty())
        // `insert` returns false for values already seen, which filters them out.
        .filter(|trimmed| present.insert((*trimmed).to_owned()))
        .map(str::to_owned);
    target.extend(fresh);
}