1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutManyOpts,
58 PutOptions, SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66 TemporalResolutionValue,
67};
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading payload bytes sniffed for magic-number detection.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Fixed on-disk size of one embedded-WAL entry header, in bytes; used when
/// computing how large the WAL region must grow to fit an entry.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Chunk size (8 MiB) for the copy loops that shift file contents when the
/// WAL region grows.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

// NOTE(review): presumably the fallback timezone for temporal-mention
// resolution when none is configured — confirm against the normalizer.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";

// Fixed phrase list consumed by the temporal-track feature (exact usage is
// outside this view). Entries are matched as written, all lowercase.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staging area for a commit: wraps an `AtomicWriteFile` into which the
/// current archive is copied, mutated, and then atomically renamed over the
/// original (or discarded on failure).
struct CommitStaging {
    // Atomic temp-file handle; `commit()` renames it over the target path.
    atomic: AtomicWriteFile,
}
126
127impl CommitStaging {
128 fn prepare(path: &Path) -> Result<Self> {
129 let mut options = AtomicWriteFile::options();
130 options.read(true);
131 let atomic = options.open(path)?;
132 Ok(Self { atomic })
133 }
134
135 fn copy_from(&mut self, source: &File) -> Result<()> {
136 let mut reader = source.try_clone()?;
137 reader.seek(SeekFrom::Start(0))?;
138
139 let writer = self.atomic.as_file_mut();
140 writer.set_len(0)?;
141 writer.seek(SeekFrom::Start(0))?;
142 std::io::copy(&mut reader, writer)?;
143 writer.flush()?;
144 writer.sync_all()?;
145 Ok(())
146 }
147
148 fn clone_file(&self) -> Result<File> {
149 Ok(self.atomic.as_file().try_clone()?)
150 }
151
152 fn commit(self) -> Result<()> {
153 self.atomic.commit().map_err(Into::into)
154 }
155
156 fn discard(self) -> Result<()> {
157 self.atomic.discard().map_err(Into::into)
158 }
159}
160
/// Accumulated effects of replaying a batch of WAL records; commit paths use
/// it to decide whether indexes must be rebuilt.
#[derive(Debug, Default)]
struct IngestionDelta {
    // Frames newly inserted during replay.
    inserted_frames: Vec<FrameId>,
    // Embeddings to add to the vector index, keyed by owning frame.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    // Entries to append to the time index.
    inserted_time_entries: Vec<TimeIndexEntry>,
    // NOTE(review): presumably set when existing frames are mutated in
    // place (not just inserted) — confirm in `apply_records`.
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// Strategy requested for a commit.
///
/// `Full` is the default. Note the current commit path receives the mode as
/// `_mode` (see `commit_from_records`) and does not branch on it yet.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Full commit (default).
    #[default]
    Full,
    /// Incremental commit.
    Incremental,
}
201
/// Options controlling how a commit runs.
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    /// Commit strategy; defaults to `CommitMode::Full`.
    pub mode: CommitMode,
    /// Requested background execution; currently ignored — commits always
    /// run synchronously (see `commit_with_options`).
    pub background: bool,
}
207
208impl CommitOptions {
209 #[must_use]
210 pub fn new(mode: CommitMode) -> Self {
211 Self {
212 mode,
213 background: false,
214 }
215 }
216
217 #[must_use]
218 pub fn background(mut self, background: bool) -> Self {
219 self.background = background;
220 self
221 }
222}
223
224fn default_reader_registry() -> &'static ReaderRegistry {
225 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
226 REGISTRY.get_or_init(ReaderRegistry::default)
227}
228
229fn infer_document_format(
230 mime: Option<&str>,
231 magic: Option<&[u8]>,
232 uri: Option<&str>,
233) -> Option<DocumentFormat> {
234 if detect_pdf_magic(magic) {
236 return Some(DocumentFormat::Pdf);
237 }
238
239 if let Some(magic_bytes) = magic {
242 if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
243 if let Some(format) = infer_format_from_extension(uri) {
245 return Some(format);
246 }
247 }
248 }
249
250 if let Some(mime_str) = mime {
252 let mime_lower = mime_str.trim().to_ascii_lowercase();
253 let format = match mime_lower.as_str() {
254 "application/pdf" => Some(DocumentFormat::Pdf),
255 "text/plain" => Some(DocumentFormat::PlainText),
256 "text/markdown" => Some(DocumentFormat::Markdown),
257 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
258 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
259 Some(DocumentFormat::Docx)
260 }
261 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
262 Some(DocumentFormat::Xlsx)
263 }
264 "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
265 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
266 Some(DocumentFormat::Pptx)
267 }
268 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
269 _ => None,
270 };
271 if format.is_some() {
272 return format;
273 }
274 }
275
276 infer_format_from_extension(uri)
278}
279
280fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
282 let uri = uri?;
283 let path = std::path::Path::new(uri);
284 let ext = path.extension()?.to_str()?.to_ascii_lowercase();
285 match ext.as_str() {
286 "pdf" => Some(DocumentFormat::Pdf),
287 "docx" => Some(DocumentFormat::Docx),
288 "xlsx" => Some(DocumentFormat::Xlsx),
289 "xls" => Some(DocumentFormat::Xls),
290 "pptx" => Some(DocumentFormat::Pptx),
291 "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
292 | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
293 | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
294 | "sql" => Some(DocumentFormat::PlainText),
295 "md" | "markdown" => Some(DocumentFormat::Markdown),
296 "html" | "htm" => Some(DocumentFormat::Html),
297 _ => None,
298 }
299}
300
/// Returns `true` when `magic` begins with the `%PDF` signature, tolerating
/// an optional UTF-8 BOM followed by ASCII whitespace before it.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    if bytes.is_empty() {
        return false;
    }
    // Strip a UTF-8 BOM, if present, then any leading ASCII whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let first_visible = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[first_visible..].starts_with(b"%PDF")
}
318
/// Extracts a document from `bytes` via the first matching registered
/// reader, falling back to `PassthroughReader` when no reader matches or the
/// matched reader errors. The fallback reason is attached to the output as a
/// diagnostic warning.
#[instrument(
    target = "memvid::extract",
    skip_all,
    fields(mime = mime_hint, uri = uri)
)]
fn extract_via_registry(
    bytes: &[u8],
    mime_hint: Option<&str>,
    uri: Option<&str>,
) -> Result<ExtractedDocument> {
    let registry = default_reader_registry();
    // Sniff the first MAGIC_SNIFF_BYTES; `get(..)` yields None when the
    // payload is shorter than the window, and empty slices are dropped too.
    let magic = bytes
        .get(..MAGIC_SNIFF_BYTES)
        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
        .with_uri(uri)
        .with_magic(magic);

    // Either extract successfully (early return) or record why we fell back.
    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
        let start = Instant::now();
        match reader.extract(bytes, &hint) {
            Ok(output) => {
                return Ok(finalize_reader_output(output, start));
            }
            Err(err) => {
                tracing::error!(
                    target = "memvid::extract",
                    reader = reader.name(),
                    error = %err,
                    "reader failed; falling back"
                );
                Some(format!("reader {} failed: {err}", reader.name()))
            }
        }
    } else {
        tracing::debug!(
            target = "memvid::extract",
            format = hint.format.map(super::super::reader::DocumentFormat::label),
            "no reader matched; using default extractor"
        );
        Some("no registered reader matched; using default extractor".to_string())
    };

    // Fallback path: restart the clock so the passthrough extraction is
    // timed on its own.
    let start = Instant::now();
    let mut output = PassthroughReader.extract(bytes, &hint)?;
    if let Some(reason) = fallback_reason {
        output.diagnostics.track_warning(reason);
    }
    Ok(finalize_reader_output(output, start))
}
369
370fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
371 let elapsed = start.elapsed();
372 let ReaderOutput {
373 document,
374 reader_name,
375 diagnostics,
376 } = output;
377 log_reader_result(&reader_name, &diagnostics, elapsed);
378 document
379}
380
381fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
382 let duration_ms = diagnostics
383 .duration_ms
384 .unwrap_or(elapsed.as_millis().try_into().unwrap_or(u64::MAX));
385 let warnings = diagnostics.warnings.len();
386 let pages = diagnostics.pages_processed;
387
388 if warnings > 0 || diagnostics.fallback {
389 tracing::warn!(
390 target = "memvid::extract",
391 reader,
392 duration_ms,
393 pages,
394 warnings,
395 fallback = diagnostics.fallback,
396 "extraction completed with warnings"
397 );
398 for warning in &diagnostics.warnings {
399 tracing::warn!(target = "memvid::extract", reader, warning = %warning);
400 }
401 } else {
402 tracing::info!(
403 target = "memvid::extract",
404 reader,
405 duration_ms,
406 pages,
407 "extraction completed"
408 );
409 }
410}
411
412impl Memvid {
    /// Runs `op` against a staged copy of the archive and atomically swaps
    /// the copy into place on success.
    ///
    /// On any failure — `op` erroring, or the atomic rename failing — every
    /// piece of in-memory state captured below (file handle, WAL, header,
    /// TOC, data_end, generation, dirty flags) is rolled back so `self`
    /// still describes the untouched original file.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        // Make sure everything is on disk before copying into staging.
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Point the live handles at the staged file; keep the originals so
        // any failure path can restore them.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Options so each path below can move them back at most
        // once (or drop them on success).
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // The staged file has been renamed over the
                        // original; drop stale handles and reopen the file
                        // at its final path.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Rename failed: restore all captured state.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err)
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staged file (best effort) and
                // restore all captured state.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
494
495 pub(crate) fn catalog_data_end(&self) -> u64 {
496 let mut max_end = self.header.wal_offset + self.header.wal_size;
497
498 for descriptor in &self.toc.segment_catalog.lex_segments {
499 if descriptor.common.bytes_length == 0 {
500 continue;
501 }
502 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
503 }
504
505 for descriptor in &self.toc.segment_catalog.vec_segments {
506 if descriptor.common.bytes_length == 0 {
507 continue;
508 }
509 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
510 }
511
512 for descriptor in &self.toc.segment_catalog.time_segments {
513 if descriptor.common.bytes_length == 0 {
514 continue;
515 }
516 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
517 }
518
519 #[cfg(feature = "temporal_track")]
520 for descriptor in &self.toc.segment_catalog.temporal_segments {
521 if descriptor.common.bytes_length == 0 {
522 continue;
523 }
524 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
525 }
526
527 #[cfg(feature = "lex")]
528 for descriptor in &self.toc.segment_catalog.tantivy_segments {
529 if descriptor.common.bytes_length == 0 {
530 continue;
531 }
532 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
533 }
534
535 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
536 if manifest.bytes_length != 0 {
537 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
538 }
539 }
540
541 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
542 if manifest.bytes_length != 0 {
543 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
544 }
545 }
546
547 if let Some(manifest) = self.toc.time_index.as_ref() {
548 if manifest.bytes_length != 0 {
549 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
550 }
551 }
552
553 #[cfg(feature = "temporal_track")]
554 if let Some(track) = self.toc.temporal_track.as_ref() {
555 if track.bytes_length != 0 {
556 max_end = max_end.max(track.bytes_offset + track.bytes_length);
557 }
558 }
559
560 max_end
561 }
562
    /// Returns the cached end offset of the payload region.
    // NOTE(review): returns the cache verbatim — callers rely on whatever
    // code maintains `cached_payload_end` keeping it current.
    fn payload_region_end(&self) -> u64 {
        self.cached_payload_end
    }
566
    /// Appends `payload` as one WAL entry, growing the embedded WAL region
    /// on demand until the entry fits; returns the entry's sequence number.
    fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
        loop {
            match self.wal.append_entry(payload) {
                Ok(seq) => return Ok(seq),
                // NOTE(review): capacity errors are recognized by matching
                // exact reason strings — keep these in sync with the WAL
                // implementation in `io::wal`.
                Err(MemvidError::CheckpointFailed { reason })
                    if reason == "embedded WAL region too small for entry"
                        || reason == "embedded WAL region full" =>
                {
                    // Require at least header + payload, and strictly more
                    // than the current region so growth always makes
                    // progress.
                    let required = WAL_ENTRY_HEADER_SIZE
                        .saturating_add(payload.len() as u64)
                        .max(self.header.wal_size + 1);
                    self.grow_wal_region(required)?;
                }
                Err(err) => return Err(err),
            }
        }
    }
586
    /// Doubles the embedded WAL region until it exceeds
    /// `required_entry_size`, shifting all trailing data forward and
    /// rewriting offsets, TOC, footer, and header to match.
    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
        let mut new_size = self.header.wal_size;
        let mut target = required_entry_size;
        if target == 0 {
            target = self.header.wal_size;
        }
        // NOTE(review): assumes `wal_size > 0`; a zero-sized region would
        // never terminate here (0 * 2 == 0) — confirm callers guarantee a
        // non-zero initial WAL size.
        while new_size <= target {
            new_size = new_size
                .checked_mul(2)
                .ok_or_else(|| MemvidError::CheckpointFailed {
                    reason: "wal_size overflow".into(),
                })?;
        }
        let delta = new_size - self.header.wal_size;
        if delta == 0 {
            return Ok(());
        }

        // Move payload data out of the way, then shift every recorded
        // offset by the same delta.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = new_size;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        // The footer must sit past every catalogued byte.
        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        // Persist TOC + header, then reopen the WAL over the resized region.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
623
    /// Makes room for `delta` extra WAL bytes by moving every byte after the
    /// current WAL region forward by `delta`, then zero-filling the gap.
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        self.file.set_len(original_len + delta)?;

        // Copy backwards (highest chunk first) so a chunk's source bytes
        // are never overwritten before they are read.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the `delta` bytes freed at the start of the shifted region
        // so the enlarged WAL starts out empty.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            #[allow(clippy::cast_possible_truncation)]
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
658
659 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
660 if delta == 0 {
661 return;
662 }
663
664 for frame in &mut self.toc.frames {
665 if frame.payload_offset != 0 {
666 frame.payload_offset += delta;
667 }
668 }
669
670 for segment in &mut self.toc.segments {
671 if segment.bytes_offset != 0 {
672 segment.bytes_offset += delta;
673 }
674 }
675
676 if let Some(lex) = self.toc.indexes.lex.as_mut() {
677 if lex.bytes_offset != 0 {
678 lex.bytes_offset += delta;
679 }
680 }
681 for manifest in &mut self.toc.indexes.lex_segments {
682 if manifest.bytes_offset != 0 {
683 manifest.bytes_offset += delta;
684 }
685 }
686 if let Some(vec) = self.toc.indexes.vec.as_mut() {
687 if vec.bytes_offset != 0 {
688 vec.bytes_offset += delta;
689 }
690 }
691 if let Some(time_index) = self.toc.time_index.as_mut() {
692 if time_index.bytes_offset != 0 {
693 time_index.bytes_offset += delta;
694 }
695 }
696 #[cfg(feature = "temporal_track")]
697 if let Some(track) = self.toc.temporal_track.as_mut() {
698 if track.bytes_offset != 0 {
699 track.bytes_offset += delta;
700 }
701 }
702
703 let catalog = &mut self.toc.segment_catalog;
704 for descriptor in &mut catalog.lex_segments {
705 if descriptor.common.bytes_offset != 0 {
706 descriptor.common.bytes_offset += delta;
707 }
708 }
709 for descriptor in &mut catalog.vec_segments {
710 if descriptor.common.bytes_offset != 0 {
711 descriptor.common.bytes_offset += delta;
712 }
713 }
714 for descriptor in &mut catalog.time_segments {
715 if descriptor.common.bytes_offset != 0 {
716 descriptor.common.bytes_offset += delta;
717 }
718 }
719 #[cfg(feature = "temporal_track")]
720 for descriptor in &mut catalog.temporal_segments {
721 if descriptor.common.bytes_offset != 0 {
722 descriptor.common.bytes_offset += delta;
723 }
724 }
725 for descriptor in &mut catalog.tantivy_segments {
726 if descriptor.common.bytes_offset != 0 {
727 descriptor.common.bytes_offset += delta;
728 }
729 }
730
731 #[cfg(feature = "lex")]
732 if let Ok(mut storage) = self.lex_storage.write() {
733 storage.adjust_offsets(delta);
734 }
735 }
736 pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
737 self.ensure_writable()?;
738 if options.background {
739 tracing::debug!("commit background flag ignored; running synchronously");
740 }
741 let mode = options.mode;
742 let records = self.wal.pending_records()?;
743 if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
744 return Ok(());
745 }
746 self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
747 }
748
749 pub fn commit(&mut self) -> Result<()> {
750 self.ensure_writable()?;
751 self.commit_with_options(CommitOptions::new(CommitMode::Full))
752 }
753
754 pub fn begin_batch(&mut self, opts: PutManyOpts) -> Result<()> {
765 if opts.wal_pre_size_bytes > 0 {
766 self.ensure_wal_capacity(opts.wal_pre_size_bytes)?;
767 }
768 self.wal.set_skip_sync(opts.skip_sync);
769 self.batch_opts = Some(opts);
770 Ok(())
771 }
772
    /// Pre-sizes the embedded WAL region to at least `min_bytes` (rounded up
    /// to the next power of two) ahead of a batch ingest, shifting trailing
    /// data and persisting TOC/footer/header to match.
    fn ensure_wal_capacity(&mut self, min_bytes: u64) -> Result<()> {
        if min_bytes <= self.header.wal_size {
            return Ok(());
        }
        // Round up so repeated batches converge on a stable region size.
        let target = min_bytes.next_power_of_two();
        let delta = target.saturating_sub(self.header.wal_size);
        if delta == 0 {
            return Ok(());
        }

        tracing::info!(
            current_wal = self.header.wal_size,
            target_wal = target,
            delta,
            "pre-sizing WAL for batch mode"
        );

        // Same sequence as `grow_wal_region`: shift payload data, bump all
        // recorded offsets, clamp the footer, persist, reopen the WAL.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = target;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
817
818 pub fn end_batch(&mut self) -> Result<()> {
823 self.wal.flush()?;
825 self.wal.set_skip_sync(false);
826 self.batch_opts = None;
827 Ok(())
828 }
829
830 pub fn commit_skip_indexes(&mut self) -> Result<()> {
839 self.ensure_writable()?;
840 let records = self.wal.pending_records()?;
841 if records.is_empty() && !self.dirty {
842 return Ok(());
843 }
844 self.commit_skip_indexes_inner(records)
845 }
846
    /// Applies `records` and persists frames while deliberately dropping all
    /// index manifests, leaving the archive without usable indexes until
    /// `finalize_indexes` rebuilds them.
    fn commit_skip_indexes_inner(&mut self, records: Vec<WalRecord>) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        // Park the Tantivy engine so replay can't index into it; restored
        // below with the dirty flag cleared.
        #[cfg(feature = "lex")]
        let tantivy_backup = self.tantivy.take();

        let result = self.apply_records(records);

        #[cfg(feature = "lex")]
        {
            self.tantivy = tantivy_backup;
            self.tantivy_dirty = false;
        }

        // Propagate an apply error only after the engine was restored.
        let _delta = result?;

        self.header.footer_offset = self.data_end;

        // Invalidate every index manifest and derived track; this commit
        // intentionally skips index maintenance.
        self.toc.time_index = None;
        self.toc.indexes.lex_segments.clear();
        if let Some(vec) = self.toc.indexes.vec.as_mut() {
            vec.bytes_offset = 0;
            vec.bytes_length = 0;
            vec.vector_count = 0;
        }
        self.toc.indexes.lex = None;
        self.toc.indexes.clip = None;
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        self.toc.memories_track = None;
        self.toc.logic_mesh = None;
        self.toc.sketch_track = None;

        // Persist TOC + footer, checkpoint the WAL (which mutates the
        // header, hence the second checksum assignment), then the header.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
904
    /// Rebuilds all indexes from the current TOC state (no new embeddings or
    /// frames) and persists TOC, footer, and header. Intended to follow one
    /// or more `commit_skip_indexes` calls.
    pub fn finalize_indexes(&mut self) -> Result<()> {
        self.ensure_writable()?;
        // Empty slices: rebuild purely from what is already in the TOC.
        self.rebuild_indexes(&[], &[])?;
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        Ok(())
    }
919
    /// Core commit path: applies WAL records, rebuilds indexes when the
    /// delta (or a populated CLIP index) requires it, persists auxiliary
    /// tracks, and checkpoints the WAL. `_mode` is currently unused.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        // CLIP vectors accumulate outside the WAL, so they can require a
        // rebuild even when the record delta is empty.
        let clip_needs_persist = self.clip_index.as_ref().is_some_and(|idx| !idx.is_empty());

        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
            indexes_rebuilt = true;
        }

        // NOTE(review): the artifacts below are only flushed when no rebuild
        // ran — presumably `rebuild_indexes` covers them; confirm there.
        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Sketch track is persisted whenever non-empty, rebuild or not.
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Persist TOC + footer, checkpoint the WAL (mutates the header),
        // then write the header and sync.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
987
988 #[cfg(feature = "parallel_segments")]
989 pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
990 self.ensure_writable()?;
991 if !self.dirty && !self.tantivy_index_pending() {
992 return Ok(());
993 }
994 let opts = opts.clone();
995 self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
996 }
997
998 #[cfg(feature = "parallel_segments")]
999 fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
1000 if !self.dirty && !self.tantivy_index_pending() {
1001 return Ok(());
1002 }
1003 let records = self.wal.pending_records()?;
1004 let delta = self.apply_records(records)?;
1005 self.generation = self.generation.wrapping_add(1);
1006 let mut indexes_rebuilt = false;
1007 if !delta.is_empty() {
1008 tracing::info!(
1009 inserted_frames = delta.inserted_frames.len(),
1010 inserted_embeddings = delta.inserted_embeddings.len(),
1011 inserted_time_entries = delta.inserted_time_entries.len(),
1012 "parallel commit applied delta"
1013 );
1014 let used_parallel = self.publish_parallel_delta(&delta, opts)?;
1016 tracing::info!(
1017 "parallel_commit: used_parallel={}, lex_enabled={}",
1018 used_parallel,
1019 self.lex_enabled
1020 );
1021 if used_parallel {
1022 self.header.footer_offset = self.data_end;
1025 indexes_rebuilt = true;
1026
1027 #[cfg(feature = "lex")]
1030 if self.lex_enabled {
1031 tracing::info!(
1032 "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
1033 delta.inserted_frames.len(),
1034 self.toc.frames.len()
1035 );
1036
1037 let tantivy_was_present = self.tantivy.is_some();
1041 if self.tantivy.is_none() {
1042 self.init_tantivy()?;
1043 }
1044
1045 if tantivy_was_present {
1047 tracing::info!(
1048 "parallel_commit: skipping Tantivy indexing (already indexed during put)"
1049 );
1050 } else {
1051 let max_payload = crate::memvid::search::max_index_payload();
1053 let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
1054
1055 for frame_id in &delta.inserted_frames {
1056 let frame = match self.toc.frames.get(*frame_id as usize) {
1058 Some(f) => f.clone(),
1059 None => continue,
1060 };
1061
1062 let explicit_text = frame.search_text.clone();
1064 if let Some(ref search_text) = explicit_text {
1065 if !search_text.trim().is_empty() {
1066 prepared_docs.push((frame, search_text.clone()));
1067 continue;
1068 }
1069 }
1070
1071 let mime = frame
1073 .metadata
1074 .as_ref()
1075 .and_then(|m| m.mime.as_deref())
1076 .unwrap_or("application/octet-stream");
1077
1078 if !crate::memvid::search::is_text_indexable_mime(mime) {
1079 continue;
1080 }
1081
1082 if frame.payload_length > max_payload {
1083 continue;
1084 }
1085
1086 let text = self.frame_search_text(&frame)?;
1087 if !text.trim().is_empty() {
1088 prepared_docs.push((frame, text));
1089 }
1090 }
1091
1092 if let Some(ref mut engine) = self.tantivy {
1094 for (frame, text) in &prepared_docs {
1095 engine.add_frame(frame, text)?;
1096 }
1097
1098 if !prepared_docs.is_empty() {
1099 engine.commit()?;
1100 self.tantivy_dirty = true;
1101 }
1102
1103 tracing::info!(
1104 "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
1105 prepared_docs.len(),
1106 engine.num_docs()
1107 );
1108 } else {
1109 tracing::warn!(
1110 "parallel_commit: Tantivy engine is None after init_tantivy"
1111 );
1112 }
1113 } }
1115
1116 self.file.seek(SeekFrom::Start(self.data_end))?;
1119 let mut time_entries: Vec<TimeIndexEntry> = self
1120 .toc
1121 .frames
1122 .iter()
1123 .filter(|frame| {
1124 frame.status == FrameStatus::Active && frame.role == FrameRole::Document
1125 })
1126 .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
1127 .collect();
1128 let (ti_offset, ti_length, ti_checksum) =
1129 time_index_append(&mut self.file, &mut time_entries)?;
1130 self.toc.time_index = Some(TimeIndexManifest {
1131 bytes_offset: ti_offset,
1132 bytes_length: ti_length,
1133 entry_count: time_entries.len() as u64,
1134 checksum: ti_checksum,
1135 });
1136 self.data_end = ti_offset + ti_length;
1138 self.header.footer_offset = self.data_end;
1139 tracing::info!(
1140 "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
1141 ti_offset,
1142 ti_length,
1143 time_entries.len()
1144 );
1145 } else {
1146 self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1148 indexes_rebuilt = true;
1149 }
1150 }
1151
1152 #[cfg(feature = "lex")]
1154 if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
1155 self.flush_tantivy()?;
1156 }
1157
1158 if self.clip_enabled {
1160 if let Some(ref clip_index) = self.clip_index {
1161 if !clip_index.is_empty() {
1162 self.persist_clip_index()?;
1163 }
1164 }
1165 }
1166
1167 if self.memories_track.card_count() > 0 {
1169 self.persist_memories_track()?;
1170 }
1171
1172 if !self.logic_mesh.is_empty() {
1174 self.persist_logic_mesh()?;
1175 }
1176
1177 if !self.sketch_track.is_empty() {
1179 self.persist_sketch_track()?;
1180 }
1181
1182 self.rewrite_toc_footer()?;
1185 self.header.toc_checksum = self.toc.toc_checksum;
1186 self.wal.record_checkpoint(&mut self.header)?;
1187 self.header.toc_checksum = self.toc.toc_checksum;
1188 crate::persist_header(&mut self.file, &self.header)?;
1189 self.file.sync_all()?;
1190 if let Some(wal) = self.manifest_wal.as_mut() {
1191 wal.flush()?;
1192 wal.truncate()?;
1193 }
1194 self.pending_frame_inserts = 0;
1195 self.dirty = false;
1196 Ok(())
1197 }
1198
1199 pub(crate) fn recover_wal(&mut self) -> Result<()> {
1200 let records = self.wal.records_after(self.header.wal_sequence)?;
1201 if records.is_empty() {
1202 if self.tantivy_index_pending() {
1203 self.flush_tantivy()?;
1204 }
1205 return Ok(());
1206 }
1207 let delta = self.apply_records(records)?;
1208 if !delta.is_empty() {
1209 tracing::debug!(
1210 inserted_frames = delta.inserted_frames.len(),
1211 inserted_embeddings = delta.inserted_embeddings.len(),
1212 inserted_time_entries = delta.inserted_time_entries.len(),
1213 "recover applied delta"
1214 );
1215 self.rebuild_indexes(&delta.inserted_embeddings, &delta.inserted_frames)?;
1216 } else if self.tantivy_index_pending() {
1217 self.flush_tantivy()?;
1218 }
1219 self.wal.record_checkpoint(&mut self.header)?;
1220 crate::persist_header(&mut self.file, &self.header)?;
1221 if !delta.is_empty() {
1222 } else if self.tantivy_index_pending() {
1224 self.flush_tantivy()?;
1225 crate::persist_header(&mut self.file, &self.header)?;
1226 }
1227 self.file.sync_all()?;
1228 self.pending_frame_inserts = 0;
1229 self.dirty = false;
1230 Ok(())
1231 }
1232
    /// Replays a batch of WAL records against the in-memory TOC and payload
    /// region, returning an `IngestionDelta` describing every frame,
    /// embedding, and time entry that was inserted or mutated.
    ///
    /// Inline frame payloads are appended starting at `self.data_end`;
    /// `self.data_end` is advanced past all newly written bytes before
    /// returning. Lex WAL batches are applied immediately via `apply_lex_wal`
    /// and contribute nothing to the delta.
    fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
        let mut delta = IngestionDelta::default();
        if records.is_empty() {
            return Ok(delta);
        }

        // Write cursor into the payload region; advanced per inline payload.
        let mut data_cursor = self.data_end;
        // Maps WAL sequence numbers to the frame ids assigned in this batch so
        // chunk records can resolve their parent document in-batch.
        let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();

        // NOTE(review): this guard is always true — the empty case already
        // returned above. Kept to avoid restructuring the replay loop.
        if !records.is_empty() {
            self.file.seek(SeekFrom::Start(data_cursor))?;
            for record in records {
                let mut entry = match decode_wal_entry(&record.payload)? {
                    WalEntry::Frame(entry) => entry,
                    #[cfg(feature = "lex")]
                    WalEntry::Lex(batch) => {
                        // Lexical batches are applied eagerly and skipped here.
                        self.apply_lex_wal(batch)?;
                        continue;
                    }
                };

                match entry.op {
                    FrameWalOp::Insert => {
                        // New frames always take the next sequential id.
                        let frame_id = self.toc.frames.len() as u64;

                        // Either reuse an existing frame's payload bytes or
                        // append the inline payload at the cursor.
                        let (
                            payload_offset,
                            payload_length,
                            checksum_bytes,
                            canonical_length_value,
                        ) = if let Some(source_id) = entry.reuse_payload_from {
                            // A reuse entry must not also carry inline bytes.
                            if !entry.payload.is_empty() {
                                return Err(MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload entry contained inline bytes",
                                });
                            }
                            let source_idx = usize::try_from(source_id).map_err(|_| {
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "frame id too large for memory",
                                }
                            })?;
                            let source = self.toc.frames.get(source_idx).cloned().ok_or(
                                MemvidError::InvalidFrame {
                                    frame_id: source_id,
                                    reason: "reused payload source missing",
                                },
                            )?;
                            (
                                source.payload_offset,
                                source.payload_length,
                                source.checksum,
                                entry
                                    .canonical_length
                                    .or(source.canonical_length)
                                    .unwrap_or(source.payload_length),
                            )
                        } else {
                            // Inline payload: append at the cursor and hash it.
                            self.file.seek(SeekFrom::Start(data_cursor))?;
                            self.file.write_all(&entry.payload)?;
                            let checksum = hash(&entry.payload);
                            let payload_length = entry.payload.len() as u64;
                            // Canonical length: for zstd payloads, decode to
                            // discover the decompressed size if not recorded.
                            let canonical_length =
                                if entry.canonical_encoding == CanonicalEncoding::Zstd {
                                    if let Some(len) = entry.canonical_length {
                                        len
                                    } else {
                                        let decoded = crate::decode_canonical_bytes(
                                            &entry.payload,
                                            CanonicalEncoding::Zstd,
                                            frame_id,
                                        )?;
                                        decoded.len() as u64
                                    }
                                } else {
                                    entry.canonical_length.unwrap_or(entry.payload.len() as u64)
                                };
                            let payload_offset = data_cursor;
                            data_cursor += payload_length;
                            self.cached_payload_end = self.cached_payload_end.max(data_cursor);
                            (
                                payload_offset,
                                payload_length,
                                *checksum.as_bytes(),
                                canonical_length,
                            )
                        };

                        let uri = entry
                            .uri
                            .clone()
                            .unwrap_or_else(|| crate::default_uri(frame_id));
                        let title = entry
                            .title
                            .clone()
                            .or_else(|| crate::infer_title_from_uri(&uri));

                        #[cfg(feature = "temporal_track")]
                        let (anchor_ts, anchor_source) =
                            self.determine_temporal_anchor(entry.timestamp);

                        let mut frame = Frame {
                            id: frame_id,
                            timestamp: entry.timestamp,
                            // Anchor fields are only populated when the
                            // temporal_track feature is compiled in.
                            anchor_ts: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_ts)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            anchor_source: {
                                #[cfg(feature = "temporal_track")]
                                {
                                    Some(anchor_source)
                                }
                                #[cfg(not(feature = "temporal_track"))]
                                {
                                    None
                                }
                            },
                            kind: entry.kind.clone(),
                            track: entry.track.clone(),
                            payload_offset,
                            payload_length,
                            checksum: checksum_bytes,
                            uri: Some(uri),
                            title,
                            canonical_encoding: entry.canonical_encoding,
                            canonical_length: Some(canonical_length_value),
                            metadata: entry.metadata.clone(),
                            search_text: entry.search_text.clone(),
                            tags: entry.tags.clone(),
                            labels: entry.labels.clone(),
                            extra_metadata: entry.extra_metadata.clone(),
                            content_dates: entry.content_dates.clone(),
                            chunk_manifest: entry.chunk_manifest.clone(),
                            role: entry.role,
                            parent_id: None,
                            chunk_index: entry.chunk_index,
                            chunk_count: entry.chunk_count,
                            status: FrameStatus::Active,
                            supersedes: entry.supersedes_frame_id,
                            superseded_by: None,
                            source_sha256: entry.source_sha256,
                            source_path: entry.source_path.clone(),
                            enrichment_state: entry.enrichment_state,
                        };

                        // Resolve the chunk's parent: first by WAL sequence,
                        // then by falling back to the most recent document
                        // frame inserted in this batch that has a manifest.
                        if let Some(parent_seq) = entry.parent_sequence {
                            if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
                                frame.parent_id = Some(*parent_frame_id);
                            } else {
                                if entry.role == FrameRole::DocumentChunk {
                                    for &candidate_id in delta.inserted_frames.iter().rev() {
                                        if let Ok(idx) = usize::try_from(candidate_id) {
                                            if let Some(candidate) = self.toc.frames.get(idx) {
                                                if candidate.role == FrameRole::Document
                                                    && candidate.chunk_manifest.is_some()
                                                {
                                                    frame.parent_id = Some(candidate_id);
                                                    tracing::debug!(
                                                        chunk_frame_id = frame_id,
                                                        parent_frame_id = candidate_id,
                                                        parent_seq = parent_seq,
                                                        "resolved chunk parent via fallback"
                                                    );
                                                    break;
                                                }
                                            }
                                        }
                                    }
                                }
                                if frame.parent_id.is_none() {
                                    tracing::warn!(
                                        chunk_frame_id = frame_id,
                                        parent_seq = parent_seq,
                                        "chunk has parent_sequence but parent not found in batch"
                                    );
                                }
                            }
                        }

                        // Choose the text to feed the lexical index: explicit
                        // non-blank search_text wins, otherwise frame content.
                        #[cfg(feature = "lex")]
                        let index_text = if self.tantivy.is_some() {
                            if let Some(text) = entry.search_text.clone() {
                                if text.trim().is_empty() {
                                    None
                                } else {
                                    Some(text)
                                }
                            } else {
                                Some(self.frame_content(&frame)?)
                            }
                        } else {
                            None
                        };
                        #[cfg(feature = "lex")]
                        if let (Some(engine), Some(text)) =
                            (self.tantivy.as_mut(), index_text.as_ref())
                        {
                            engine.add_frame(&frame, text)?;
                            self.tantivy_dirty = true;

                            // Keep the sketch track in step with the lexical
                            // index for non-blank text.
                            if !text.trim().is_empty() {
                                let entry = crate::types::generate_sketch(
                                    frame_id,
                                    text,
                                    crate::types::SketchVariant::Small,
                                    None,
                                );
                                self.sketch_track.insert(entry);
                            }
                        }

                        if let Some(embedding) = entry.embedding.take() {
                            delta
                                .inserted_embeddings
                                .push((frame_id, embedding.clone()));
                        }

                        // Only top-level documents feed the time index and
                        // temporal track; chunks inherit through their parent.
                        if entry.role == FrameRole::Document {
                            delta
                                .inserted_time_entries
                                .push(TimeIndexEntry::new(entry.timestamp, frame_id));
                            #[cfg(feature = "temporal_track")]
                            {
                                delta.inserted_temporal_anchors.push(TemporalAnchor::new(
                                    frame_id,
                                    anchor_ts,
                                    anchor_source,
                                ));
                                delta.inserted_temporal_mentions.extend(
                                    Self::collect_temporal_mentions(
                                        entry.search_text.as_deref(),
                                        frame_id,
                                        anchor_ts,
                                    ),
                                );
                            }
                        }

                        if let Some(predecessor) = frame.supersedes {
                            self.mark_frame_superseded(predecessor, frame_id)?;
                        }

                        self.toc.frames.push(frame);
                        delta.inserted_frames.push(frame_id);
                        sequence_to_frame.insert(record.sequence, frame_id);
                    }
                    FrameWalOp::Tombstone => {
                        let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
                            frame_id: 0,
                            reason: "tombstone missing frame reference",
                        })?;
                        self.mark_frame_deleted(target)?;
                        delta.mutated_frames = true;
                    }
                }
            }
            self.data_end = self.data_end.max(data_cursor);
        }

        // Second pass: chunks still missing a parent adopt the nearest
        // preceding active document frame that carries a chunk manifest.
        let orphan_resolutions: Vec<(u64, u64)> = delta
            .inserted_frames
            .iter()
            .filter_map(|&frame_id| {
                let idx = usize::try_from(frame_id).ok()?;
                let frame = self.toc.frames.get(idx)?;
                if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
                    return None;
                }
                // Scan backwards from the chunk towards older frames.
                for candidate_id in (0..frame_id).rev() {
                    if let Ok(idx) = usize::try_from(candidate_id) {
                        if let Some(candidate) = self.toc.frames.get(idx) {
                            if candidate.role == FrameRole::Document
                                && candidate.chunk_manifest.is_some()
                                && candidate.status == FrameStatus::Active
                            {
                                return Some((frame_id, candidate_id));
                            }
                        }
                    }
                }
                None
            })
            .collect();

        for (chunk_id, parent_id) in orphan_resolutions {
            if let Ok(idx) = usize::try_from(chunk_id) {
                if let Some(frame) = self.toc.frames.get_mut(idx) {
                    frame.parent_id = Some(parent_id);
                    tracing::debug!(
                        chunk_frame_id = chunk_id,
                        parent_frame_id = parent_id,
                        "resolved orphan chunk parent in second pass"
                    );
                }
            }
        }

        Ok(delta)
    }
1561
1562 #[cfg(feature = "temporal_track")]
1563 fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1564 (timestamp, AnchorSource::FrameTimestamp)
1565 }
1566
    /// Scans `text` for temporal expressions and resolves each into zero or
    /// more `TemporalMention`s, interpreted relative to `anchor_ts`.
    ///
    /// Detection is two-pass: a static phrase list matched case-insensitively,
    /// plus a numeric `\d/\d/\d` date regex. Matched spans are normalized via
    /// `TemporalNormalizer`; spans that fail to resolve are silently skipped.
    #[cfg(feature = "temporal_track")]
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        // Nothing to scan for absent or whitespace-only text.
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        // An unrepresentable anchor timestamp disables mention extraction.
        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        let mut spans: Vec<(usize, usize)> = Vec::new();
        // ASCII lowercasing never changes byte lengths, so offsets found in
        // `lower` are valid indices into `text`.
        let lower = text.to_ascii_lowercase();

        // Pass 1: fixed phrase list. Assumes the phrases are lowercase ASCII
        // so `end` always lands on a char boundary — TODO confirm at the
        // STATIC_TEMPORAL_PHRASES definition.
        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                search_start = end;
            }
        }

        // Pass 2: numeric dates (e.g. 12/31/2024). The compiled regex — or
        // its construction error — is cached for the process lifetime.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        // Deduplicate spans found by both passes.
        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            // Guard against out-of-range or degenerate spans.
            if end > text.len() || start >= end {
                continue;
            }
            let raw = &text[start..end];
            // Strip surrounding quotes/punctuation before normalization; the
            // offset of the trimmed slice within `raw` keeps the recorded
            // byte range aligned with what was actually resolved.
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                Err(_) => continue,
            }
        }

        mentions
    }
1640
1641 #[cfg(feature = "temporal_track")]
1642 fn resolution_to_mentions(
1643 resolution: TemporalResolution,
1644 frame_id: FrameId,
1645 byte_start: usize,
1646 byte_end: usize,
1647 ) -> Vec<TemporalMention> {
1648 let byte_len = byte_end.saturating_sub(byte_start) as u32;
1649 let byte_start = byte_start.min(u32::MAX as usize) as u32;
1650 let mut results = Vec::new();
1651
1652 let base_flags = Self::flags_from_resolution(&resolution.flags);
1653 match resolution.value {
1654 TemporalResolutionValue::Date(date) => {
1655 let ts = Self::date_to_timestamp(date);
1656 results.push(TemporalMention::new(
1657 ts,
1658 frame_id,
1659 byte_start,
1660 byte_len,
1661 TemporalMentionKind::Date,
1662 resolution.confidence,
1663 0,
1664 base_flags,
1665 ));
1666 }
1667 TemporalResolutionValue::DateTime(dt) => {
1668 let ts = dt.unix_timestamp();
1669 let tz_hint = dt.offset().whole_minutes() as i16;
1670 results.push(TemporalMention::new(
1671 ts,
1672 frame_id,
1673 byte_start,
1674 byte_len,
1675 TemporalMentionKind::DateTime,
1676 resolution.confidence,
1677 tz_hint,
1678 base_flags,
1679 ));
1680 }
1681 TemporalResolutionValue::DateRange { start, end } => {
1682 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1683 let start_ts = Self::date_to_timestamp(start);
1684 results.push(TemporalMention::new(
1685 start_ts,
1686 frame_id,
1687 byte_start,
1688 byte_len,
1689 TemporalMentionKind::RangeStart,
1690 resolution.confidence,
1691 0,
1692 flags,
1693 ));
1694 let end_ts = Self::date_to_timestamp(end);
1695 results.push(TemporalMention::new(
1696 end_ts,
1697 frame_id,
1698 byte_start,
1699 byte_len,
1700 TemporalMentionKind::RangeEnd,
1701 resolution.confidence,
1702 0,
1703 flags,
1704 ));
1705 }
1706 TemporalResolutionValue::DateTimeRange { start, end } => {
1707 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1708 results.push(TemporalMention::new(
1709 start.unix_timestamp(),
1710 frame_id,
1711 byte_start,
1712 byte_len,
1713 TemporalMentionKind::RangeStart,
1714 resolution.confidence,
1715 start.offset().whole_minutes() as i16,
1716 flags,
1717 ));
1718 results.push(TemporalMention::new(
1719 end.unix_timestamp(),
1720 frame_id,
1721 byte_start,
1722 byte_len,
1723 TemporalMentionKind::RangeEnd,
1724 resolution.confidence,
1725 end.offset().whole_minutes() as i16,
1726 flags,
1727 ));
1728 }
1729 TemporalResolutionValue::Month { year, month } => {
1730 let start_date = match Date::from_calendar_date(year, month, 1) {
1731 Ok(date) => date,
1732 Err(err) => {
1733 tracing::warn!(
1734 target = "memvid::temporal",
1735 %err,
1736 year,
1737 month = month as u8,
1738 "skipping invalid month resolution"
1739 );
1740 return results;
1742 }
1743 };
1744 let end_date = match Self::last_day_in_month(year, month) {
1745 Some(date) => date,
1746 None => {
1747 tracing::warn!(
1748 target = "memvid::temporal",
1749 year,
1750 month = month as u8,
1751 "skipping month resolution with invalid calendar range"
1752 );
1753 return results;
1754 }
1755 };
1756 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1757 results.push(TemporalMention::new(
1758 Self::date_to_timestamp(start_date),
1759 frame_id,
1760 byte_start,
1761 byte_len,
1762 TemporalMentionKind::RangeStart,
1763 resolution.confidence,
1764 0,
1765 flags,
1766 ));
1767 results.push(TemporalMention::new(
1768 Self::date_to_timestamp(end_date),
1769 frame_id,
1770 byte_start,
1771 byte_len,
1772 TemporalMentionKind::RangeEnd,
1773 resolution.confidence,
1774 0,
1775 flags,
1776 ));
1777 }
1778 }
1779
1780 results
1781 }
1782
1783 #[cfg(feature = "temporal_track")]
1784 fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1785 let mut result = TemporalMentionFlags::empty();
1786 if flags
1787 .iter()
1788 .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1789 {
1790 result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1791 }
1792 if flags
1793 .iter()
1794 .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1795 {
1796 result = result.set(TemporalMentionFlags::DERIVED, true);
1797 }
1798 result
1799 }
1800
1801 #[cfg(feature = "temporal_track")]
1802 fn date_to_timestamp(date: Date) -> i64 {
1803 PrimitiveDateTime::new(date, Time::MIDNIGHT)
1804 .assume_offset(UtcOffset::UTC)
1805 .unix_timestamp()
1806 }
1807
1808 #[cfg(feature = "temporal_track")]
1809 fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1810 let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1811 while let Some(next) = date.next_day() {
1812 if next.month() == month {
1813 date = next;
1814 } else {
1815 break;
1816 }
1817 }
1818 Some(date)
1819 }
1820
    /// Builds a lexical index segment from the frames inserted in `delta`,
    /// appends it to the store, and registers its descriptor in the segment
    /// catalog.
    ///
    /// Returns `Ok(false)` when there is nothing to publish: no inserted
    /// frames, lexical indexing disabled, or the builder produced no artifact.
    #[allow(dead_code)]
    fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
        if delta.inserted_frames.is_empty() || !self.lex_enabled {
            return Ok(false);
        }

        let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
            Some(artifact) => artifact,
            None => return Ok(false),
        };

        let segment_id = self.toc.segment_catalog.next_segment_id;
        #[cfg(feature = "parallel_segments")]
        let span =
            self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));

        // Without parallel_segments the descriptor is never mutated after append.
        #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
        let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
        #[cfg(feature = "parallel_segments")]
        if let Some(span) = span {
            Self::decorate_segment_common(&mut descriptor.common, span);
        }
        #[cfg(feature = "parallel_segments")]
        let descriptor_for_manifest = descriptor.clone();
        self.toc.segment_catalog.lex_segments.push(descriptor);
        #[cfg(feature = "parallel_segments")]
        if let Err(err) = self.record_index_segment(
            SegmentKind::Lexical,
            descriptor_for_manifest.common,
            SegmentStats {
                doc_count: artifact.doc_count,
                vector_count: 0,
                time_entries: 0,
                bytes_uncompressed: artifact.bytes.len() as u64,
                build_micros: 0,
            },
        ) {
            // Manifest WAL failures are non-fatal; the TOC still records the segment.
            tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
        }
        self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
        Ok(true)
    }
1864
1865 #[allow(dead_code)]
1866 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1867 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1868 return Ok(false);
1869 }
1870
1871 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1872 Some(artifact) => artifact,
1873 None => return Ok(false),
1874 };
1875
1876 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1877 if existing_dim != artifact.dimension {
1878 return Err(MemvidError::VecDimensionMismatch {
1879 expected: existing_dim,
1880 actual: artifact.dimension as usize,
1881 });
1882 }
1883 }
1884
1885 let segment_id = self.toc.segment_catalog.next_segment_id;
1886 #[cfg(feature = "parallel_segments")]
1887 #[cfg(feature = "parallel_segments")]
1888 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1889
1890 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1891 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1892 #[cfg(feature = "parallel_segments")]
1893 if let Some(span) = span {
1894 Self::decorate_segment_common(&mut descriptor.common, span);
1895 }
1896 #[cfg(feature = "parallel_segments")]
1897 let descriptor_for_manifest = descriptor.clone();
1898 self.toc.segment_catalog.vec_segments.push(descriptor);
1899 #[cfg(feature = "parallel_segments")]
1900 if let Err(err) = self.record_index_segment(
1901 SegmentKind::Vector,
1902 descriptor_for_manifest.common,
1903 SegmentStats {
1904 doc_count: 0,
1905 vector_count: artifact.vector_count,
1906 time_entries: 0,
1907 bytes_uncompressed: artifact.bytes_uncompressed,
1908 build_micros: 0,
1909 },
1910 ) {
1911 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1912 }
1913 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1914 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1915
1916 if self.toc.indexes.vec.is_none() {
1918 let empty_offset = self.data_end;
1919 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1920 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1921 self.toc.indexes.vec = Some(VecIndexManifest {
1922 vector_count: 0,
1923 dimension: 0,
1924 bytes_offset: empty_offset,
1925 bytes_length: 0,
1926 checksum: empty_checksum,
1927 compression_mode: self.vec_compression.clone(),
1928 });
1929 }
1930 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1931 if manifest.dimension == 0 {
1932 manifest.dimension = artifact.dimension;
1933 }
1934 if manifest.bytes_length == 0 {
1935 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1936 manifest.compression_mode = artifact.compression.clone();
1937 }
1938 }
1939
1940 self.vec_enabled = true;
1941 Ok(true)
1942 }
1943
1944 #[allow(dead_code)]
1945 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1946 if delta.inserted_time_entries.is_empty() {
1947 return Ok(false);
1948 }
1949
1950 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1951 Some(artifact) => artifact,
1952 None => return Ok(false),
1953 };
1954
1955 let segment_id = self.toc.segment_catalog.next_segment_id;
1956 #[cfg(feature = "parallel_segments")]
1957 #[cfg(feature = "parallel_segments")]
1958 let span = self.segment_span_from_iter(
1959 delta
1960 .inserted_time_entries
1961 .iter()
1962 .map(|entry| entry.frame_id),
1963 );
1964
1965 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1966 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1967 #[cfg(feature = "parallel_segments")]
1968 if let Some(span) = span {
1969 Self::decorate_segment_common(&mut descriptor.common, span);
1970 }
1971 #[cfg(feature = "parallel_segments")]
1972 let descriptor_for_manifest = descriptor.clone();
1973 self.toc.segment_catalog.time_segments.push(descriptor);
1974 #[cfg(feature = "parallel_segments")]
1975 if let Err(err) = self.record_index_segment(
1976 SegmentKind::Time,
1977 descriptor_for_manifest.common,
1978 SegmentStats {
1979 doc_count: 0,
1980 vector_count: 0,
1981 time_entries: artifact.entry_count,
1982 bytes_uncompressed: artifact.bytes.len() as u64,
1983 build_micros: 0,
1984 },
1985 ) {
1986 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1987 }
1988 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1989 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1990 Ok(true)
1991 }
1992
1993 #[cfg(feature = "temporal_track")]
1994 #[allow(dead_code)]
1995 fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1996 if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1997 {
1998 return Ok(false);
1999 }
2000
2001 debug_assert!(
2002 delta.inserted_temporal_mentions.len() < 1_000_000,
2003 "temporal delta mentions unexpectedly large: {}",
2004 delta.inserted_temporal_mentions.len()
2005 );
2006 debug_assert!(
2007 delta.inserted_temporal_anchors.len() < 1_000_000,
2008 "temporal delta anchors unexpectedly large: {}",
2009 delta.inserted_temporal_anchors.len()
2010 );
2011
2012 let artifact = match self.build_temporal_segment_from_records(
2013 &delta.inserted_temporal_mentions,
2014 &delta.inserted_temporal_anchors,
2015 )? {
2016 Some(artifact) => artifact,
2017 None => return Ok(false),
2018 };
2019
2020 let segment_id = self.toc.segment_catalog.next_segment_id;
2021 let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
2022 self.toc
2023 .segment_catalog
2024 .temporal_segments
2025 .push(descriptor.clone());
2026 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2027 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
2028
2029 self.toc.temporal_track = Some(TemporalTrackManifest {
2030 bytes_offset: descriptor.common.bytes_offset,
2031 bytes_length: descriptor.common.bytes_length,
2032 entry_count: artifact.entry_count,
2033 anchor_count: artifact.anchor_count,
2034 checksum: artifact.checksum,
2035 flags: artifact.flags,
2036 });
2037
2038 self.clear_temporal_track_cache();
2039
2040 Ok(true)
2041 }
2042
2043 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
2044 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2045 frame_id,
2046 reason: "frame id too large",
2047 })?;
2048 let frame = self
2049 .toc
2050 .frames
2051 .get_mut(index)
2052 .ok_or(MemvidError::InvalidFrame {
2053 frame_id,
2054 reason: "supersede target missing",
2055 })?;
2056 frame.status = FrameStatus::Superseded;
2057 frame.superseded_by = Some(successor_id);
2058 self.remove_frame_from_indexes(frame_id)
2059 }
2060
2061 pub(crate) fn rebuild_indexes(
2062 &mut self,
2063 new_vec_docs: &[(FrameId, Vec<f32>)],
2064 inserted_frame_ids: &[FrameId],
2065 ) -> Result<()> {
2066 if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
2067 return Ok(());
2068 }
2069
2070 let payload_end = self.payload_region_end();
2071 self.data_end = payload_end;
2072 let safe_truncate_len = self.header.footer_offset.max(payload_end);
2075 if self.file.metadata()?.len() > safe_truncate_len {
2076 self.file.set_len(safe_truncate_len)?;
2077 }
2078 self.file.seek(SeekFrom::Start(payload_end))?;
2079
2080 self.toc.segment_catalog.lex_segments.clear();
2082 self.toc.segment_catalog.vec_segments.clear();
2083 self.toc.segment_catalog.time_segments.clear();
2084 #[cfg(feature = "temporal_track")]
2085 self.toc.segment_catalog.temporal_segments.clear();
2086 #[cfg(feature = "parallel_segments")]
2087 self.toc.segment_catalog.index_segments.clear();
2088 self.toc.segment_catalog.tantivy_segments.clear();
2090 self.toc.indexes.lex_segments.clear();
2092
2093 let mut time_entries: Vec<TimeIndexEntry> = self
2094 .toc
2095 .frames
2096 .iter()
2097 .filter(|frame| {
2098 frame.status == FrameStatus::Active && frame.role == FrameRole::Document
2099 })
2100 .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
2101 .collect();
2102 let (ti_offset, ti_length, ti_checksum) =
2103 time_index_append(&mut self.file, &mut time_entries)?;
2104 self.toc.time_index = Some(TimeIndexManifest {
2105 bytes_offset: ti_offset,
2106 bytes_length: ti_length,
2107 entry_count: time_entries.len() as u64,
2108 checksum: ti_checksum,
2109 });
2110
2111 let mut footer_offset = ti_offset + ti_length;
2112
2113 #[cfg(feature = "temporal_track")]
2114 {
2115 self.toc.temporal_track = None;
2116 self.toc.segment_catalog.temporal_segments.clear();
2117 self.clear_temporal_track_cache();
2118 }
2119
2120 if self.lex_enabled {
2121 #[cfg(feature = "lex")]
2122 {
2123 if self.tantivy_dirty {
2124 if let Ok(mut storage) = self.lex_storage.write() {
2128 storage.clear();
2129 storage.set_generation(0);
2130 }
2131 self.init_tantivy()?;
2132 if let Some(mut engine) = self.tantivy.take() {
2133 self.rebuild_tantivy_engine(&mut engine)?;
2134 self.tantivy = Some(engine);
2135 } else {
2136 return Err(MemvidError::InvalidToc {
2137 reason: "tantivy engine missing during rebuild".into(),
2138 });
2139 }
2140 } else if self.tantivy.is_some() && !inserted_frame_ids.is_empty() {
2141 let max_payload = crate::memvid::search::max_index_payload();
2147 let mut prepared_docs: Vec<(Frame, String)> = Vec::new();
2148 for &frame_id in inserted_frame_ids {
2149 let frame = match self.toc.frames.get(frame_id as usize) {
2150 Some(f) => f.clone(),
2151 None => continue,
2152 };
2153 if frame.status != FrameStatus::Active {
2154 continue;
2155 }
2156 if let Some(search_text) = frame.search_text.clone() {
2157 if !search_text.trim().is_empty() {
2158 prepared_docs.push((frame, search_text));
2159 continue;
2160 }
2161 }
2162 let mime = frame
2163 .metadata
2164 .as_ref()
2165 .and_then(|m| m.mime.as_deref())
2166 .unwrap_or("application/octet-stream");
2167 if !crate::memvid::search::is_text_indexable_mime(mime) {
2168 continue;
2169 }
2170 if frame.payload_length > max_payload {
2171 continue;
2172 }
2173 let text = self.frame_search_text(&frame)?;
2174 if !text.trim().is_empty() {
2175 prepared_docs.push((frame, text));
2176 }
2177 }
2178 if let Some(engine) = self.tantivy.as_mut() {
2179 for (frame, text) in &prepared_docs {
2180 engine.add_frame(frame, text)?;
2181 }
2182 engine.commit()?;
2183 }
2184 } else {
2185 if let Ok(mut storage) = self.lex_storage.write() {
2188 storage.clear();
2189 storage.set_generation(0);
2190 }
2191 self.init_tantivy()?;
2192 if let Some(mut engine) = self.tantivy.take() {
2193 self.rebuild_tantivy_engine(&mut engine)?;
2194 self.tantivy = Some(engine);
2195 } else {
2196 return Err(MemvidError::InvalidToc {
2197 reason: "tantivy engine missing during rebuild".into(),
2198 });
2199 }
2200 }
2201
2202 self.lex_enabled = true;
2204
2205 self.tantivy_dirty = true;
2207
2208 self.data_end = footer_offset;
2210
2211 self.flush_tantivy()?;
2213
2214 footer_offset = self.header.footer_offset;
2216
2217 self.data_end = payload_end;
2219 }
2220 #[cfg(not(feature = "lex"))]
2221 {
2222 self.toc.indexes.lex = None;
2223 self.toc.indexes.lex_segments.clear();
2224 }
2225 } else {
2226 self.toc.indexes.lex = None;
2228 self.toc.indexes.lex_segments.clear();
2229 #[cfg(feature = "lex")]
2230 if let Ok(mut storage) = self.lex_storage.write() {
2231 storage.clear();
2232 }
2233 }
2234
2235 if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
2236 let vec_offset = footer_offset;
2237 self.file.seek(SeekFrom::Start(vec_offset))?;
2238 self.file.write_all(&artifact.bytes)?;
2239 footer_offset += artifact.bytes.len() as u64;
2240 self.toc.indexes.vec = Some(VecIndexManifest {
2241 vector_count: artifact.vector_count,
2242 dimension: artifact.dimension,
2243 bytes_offset: vec_offset,
2244 bytes_length: artifact.bytes.len() as u64,
2245 checksum: artifact.checksum,
2246 compression_mode: self.vec_compression.clone(),
2247 });
2248 self.vec_index = Some(index);
2249 } else {
2250 if !self.vec_enabled {
2252 self.toc.indexes.vec = None;
2253 }
2254 self.vec_index = None;
2255 }
2256
2257 if self.clip_enabled {
2259 if let Some(ref clip_index) = self.clip_index {
2260 if !clip_index.is_empty() {
2261 let artifact = clip_index.encode()?;
2262 let clip_offset = footer_offset;
2263 self.file.seek(SeekFrom::Start(clip_offset))?;
2264 self.file.write_all(&artifact.bytes)?;
2265 footer_offset += artifact.bytes.len() as u64;
2266 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2267 bytes_offset: clip_offset,
2268 bytes_length: artifact.bytes.len() as u64,
2269 vector_count: artifact.vector_count,
2270 dimension: artifact.dimension,
2271 checksum: artifact.checksum,
2272 model_name: crate::clip::default_model_info().name.to_string(),
2273 });
2274 tracing::info!(
2275 "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
2276 artifact.vector_count,
2277 clip_offset
2278 );
2279 }
2280 }
2281 } else {
2282 self.toc.indexes.clip = None;
2283 }
2284
2285 if self.memories_track.card_count() > 0 {
2287 let memories_offset = footer_offset;
2288 let memories_bytes = self.memories_track.serialize()?;
2289 let memories_checksum = blake3::hash(&memories_bytes).into();
2290 self.file.seek(SeekFrom::Start(memories_offset))?;
2291 self.file.write_all(&memories_bytes)?;
2292 footer_offset += memories_bytes.len() as u64;
2293
2294 let stats = self.memories_track.stats();
2295 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2296 bytes_offset: memories_offset,
2297 bytes_length: memories_bytes.len() as u64,
2298 card_count: stats.card_count as u64,
2299 entity_count: stats.entity_count as u64,
2300 checksum: memories_checksum,
2301 });
2302 } else {
2303 self.toc.memories_track = None;
2304 }
2305
2306 if self.logic_mesh.is_empty() {
2308 self.toc.logic_mesh = None;
2309 } else {
2310 let mesh_offset = footer_offset;
2311 let mesh_bytes = self.logic_mesh.serialize()?;
2312 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2313 self.file.seek(SeekFrom::Start(mesh_offset))?;
2314 self.file.write_all(&mesh_bytes)?;
2315 footer_offset += mesh_bytes.len() as u64;
2316
2317 let stats = self.logic_mesh.stats();
2318 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2319 bytes_offset: mesh_offset,
2320 bytes_length: mesh_bytes.len() as u64,
2321 node_count: stats.node_count as u64,
2322 edge_count: stats.edge_count as u64,
2323 checksum: mesh_checksum,
2324 });
2325 }
2326
2327 tracing::info!(
2329 "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
2330 ti_offset,
2331 ti_length,
2332 footer_offset,
2333 self.header.footer_offset
2334 );
2335
2336 self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2339
2340 if self.file.metadata()?.len() < self.header.footer_offset {
2342 self.file.set_len(self.header.footer_offset)?;
2343 }
2344
2345 self.rewrite_toc_footer()?;
2346 self.header.toc_checksum = self.toc.toc_checksum;
2347 crate::persist_header(&mut self.file, &self.header)?;
2348
2349 #[cfg(feature = "lex")]
2350 if self.lex_enabled {
2351 if let Some(ref engine) = self.tantivy {
2352 let doc_count = engine.num_docs();
2353 let active_frame_count = self
2354 .toc
2355 .frames
2356 .iter()
2357 .filter(|f| f.status == FrameStatus::Active)
2358 .count();
2359
2360 let text_indexable_count = self
2363 .toc
2364 .frames
2365 .iter()
2366 .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
2367 .count();
2368
2369 if doc_count == 0 && text_indexable_count > 0 {
2372 return Err(MemvidError::Doctor {
2373 reason: format!(
2374 "Lex index rebuild failed: 0 documents indexed from {text_indexable_count} text-indexable frames. \
2375 This indicates a critical failure in the rebuild process."
2376 ),
2377 });
2378 }
2379
2380 log::info!(
2382 "✓ Doctor lex index rebuild succeeded: {doc_count} docs from {active_frame_count} frames ({text_indexable_count} text-indexable)"
2383 );
2384 }
2385 }
2386
2387 Ok(())
2388 }
2389
2390 fn persist_memories_track(&mut self) -> Result<()> {
2395 if self.memories_track.card_count() == 0 {
2396 self.toc.memories_track = None;
2397 return Ok(());
2398 }
2399
2400 let memories_offset = self.header.footer_offset;
2402 let memories_bytes = self.memories_track.serialize()?;
2403 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2404
2405 self.file.seek(SeekFrom::Start(memories_offset))?;
2406 self.file.write_all(&memories_bytes)?;
2407
2408 let stats = self.memories_track.stats();
2409 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2410 bytes_offset: memories_offset,
2411 bytes_length: memories_bytes.len() as u64,
2412 card_count: stats.card_count as u64,
2413 entity_count: stats.entity_count as u64,
2414 checksum: memories_checksum,
2415 });
2416
2417 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2419
2420 if self.file.metadata()?.len() < self.header.footer_offset {
2422 self.file.set_len(self.header.footer_offset)?;
2423 }
2424
2425 Ok(())
2426 }
2427
    /// Persist the in-memory CLIP index into the file's footer region.
    ///
    /// When CLIP is disabled or the index is empty the manifest is cleared and
    /// nothing is written. Otherwise the encoded index is appended at the
    /// current footer offset and the TOC manifest is refreshed.
    fn persist_clip_index(&mut self) -> Result<()> {
        if !self.clip_enabled {
            self.toc.indexes.clip = None;
            return Ok(());
        }

        // Only persist a non-empty index; otherwise drop any stale manifest.
        let clip_index = match &self.clip_index {
            Some(idx) if !idx.is_empty() => idx,
            _ => {
                self.toc.indexes.clip = None;
                return Ok(());
            }
        };

        let artifact = clip_index.encode()?;

        // Append the encoded index bytes at the footer region.
        let clip_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(clip_offset))?;
        self.file.write_all(&artifact.bytes)?;

        self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
            bytes_offset: clip_offset,
            bytes_length: artifact.bytes.len() as u64,
            vector_count: artifact.vector_count,
            dimension: artifact.dimension,
            checksum: artifact.checksum,
            model_name: crate::clip::default_model_info().name.to_string(),
        });

        tracing::info!(
            "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
            artifact.vector_count,
            clip_offset
        );

        // Advance the footer past the new bytes; grow the file if required.
        self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;

        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        Ok(())
    }
2479
2480 fn persist_logic_mesh(&mut self) -> Result<()> {
2485 if self.logic_mesh.is_empty() {
2486 self.toc.logic_mesh = None;
2487 return Ok(());
2488 }
2489
2490 let mesh_offset = self.header.footer_offset;
2492 let mesh_bytes = self.logic_mesh.serialize()?;
2493 let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
2494
2495 self.file.seek(SeekFrom::Start(mesh_offset))?;
2496 self.file.write_all(&mesh_bytes)?;
2497
2498 let stats = self.logic_mesh.stats();
2499 self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
2500 bytes_offset: mesh_offset,
2501 bytes_length: mesh_bytes.len() as u64,
2502 node_count: stats.node_count as u64,
2503 edge_count: stats.edge_count as u64,
2504 checksum: mesh_checksum,
2505 });
2506
2507 self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;
2509
2510 if self.file.metadata()?.len() < self.header.footer_offset {
2512 self.file.set_len(self.header.footer_offset)?;
2513 }
2514
2515 Ok(())
2516 }
2517
    /// Persist the sketch track at the footer offset and record its manifest.
    ///
    /// `write_sketch_track` performs the actual encoding/writing and reports
    /// the on-disk span plus checksum used to build the manifest entry.
    fn persist_sketch_track(&mut self) -> Result<()> {
        if self.sketch_track.is_empty() {
            self.toc.sketch_track = None;
            return Ok(());
        }

        // Position the file cursor; write_sketch_track writes from here.
        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;

        let (sketch_offset, sketch_length, sketch_checksum) =
            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;

        let stats = self.sketch_track.stats();
        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
            bytes_offset: sketch_offset,
            bytes_length: sketch_length,
            entry_count: stats.entry_count,
            #[allow(clippy::cast_possible_truncation)]
            entry_size: stats.variant.entry_size() as u16,
            flags: 0,
            checksum: sketch_checksum,
        });

        self.header.footer_offset = sketch_offset + sketch_length;

        // Grow the file if the footer region now extends past the current length.
        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        tracing::debug!(
            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
            stats.entry_count,
            sketch_offset
        );

        Ok(())
    }
2562
2563 #[cfg(feature = "lex")]
2564 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2565 let LexWalBatch {
2566 generation,
2567 doc_count,
2568 checksum,
2569 segments,
2570 } = batch;
2571
2572 if let Ok(mut storage) = self.lex_storage.write() {
2573 storage.replace(doc_count, checksum, segments);
2574 storage.set_generation(generation);
2575 }
2576
2577 self.persist_lex_manifest()
2578 }
2579
2580 #[cfg(feature = "lex")]
2581 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2582 let payload = encode_to_vec(WalEntry::Lex(batch.clone()), wal_config())?;
2583 self.append_wal_entry(&payload)?;
2584 Ok(())
2585 }
2586
2587 #[cfg(feature = "lex")]
2588 fn persist_lex_manifest(&mut self) -> Result<()> {
2589 let (index_manifest, segments) = if let Ok(storage) = self.lex_storage.read() {
2590 storage.to_manifest()
2591 } else {
2592 (None, Vec::new())
2593 };
2594
2595 if let Some(storage_manifest) = index_manifest {
2597 self.toc.indexes.lex = Some(storage_manifest);
2599 } else {
2600 self.toc.indexes.lex = None;
2603 }
2604
2605 self.toc.indexes.lex_segments = segments;
2606
2607 self.rewrite_toc_footer()?;
2611 self.header.toc_checksum = self.toc.toc_checksum;
2612 crate::persist_header(&mut self.file, &self.header)?;
2613 Ok(())
2614 }
2615
2616 #[cfg(feature = "lex")]
2617 pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
2618 let TantivySnapshot {
2619 doc_count,
2620 checksum,
2621 segments,
2622 } = snapshot;
2623
2624 let mut footer_offset = self.data_end;
2625 self.file.seek(SeekFrom::Start(footer_offset))?;
2626
2627 let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
2628 for segment in segments {
2629 let bytes_length = segment.bytes.len() as u64;
2630 self.file.write_all(&segment.bytes)?;
2631 self.file.flush()?; embedded_segments.push(EmbeddedLexSegment {
2633 path: segment.path,
2634 bytes_offset: footer_offset,
2635 bytes_length,
2636 checksum: segment.checksum,
2637 });
2638 footer_offset += bytes_length;
2639 }
2640 self.header.footer_offset = self.header.footer_offset.max(footer_offset);
2645
2646 let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
2647 let mut catalog_segments: Vec<TantivySegmentDescriptor> =
2648 Vec::with_capacity(embedded_segments.len());
2649 for segment in &embedded_segments {
2650 let descriptor = TantivySegmentDescriptor::from_common(
2651 SegmentCommon::new(
2652 next_segment_id,
2653 segment.bytes_offset,
2654 segment.bytes_length,
2655 segment.checksum,
2656 ),
2657 segment.path.clone(),
2658 );
2659 catalog_segments.push(descriptor);
2660 next_segment_id = next_segment_id.saturating_add(1);
2661 }
2662 if catalog_segments.is_empty() {
2663 self.toc.segment_catalog.tantivy_segments.clear();
2664 } else {
2665 self.toc.segment_catalog.tantivy_segments = catalog_segments;
2666 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
2667 }
2668 self.toc.segment_catalog.next_segment_id = next_segment_id;
2669
2670 let generation = self
2677 .lex_storage
2678 .write()
2679 .map_err(|_| MemvidError::Tantivy {
2680 reason: "embedded lex storage lock poisoned".into(),
2681 })
2682 .map(|mut storage| {
2683 storage.replace(doc_count, checksum, embedded_segments.clone());
2684 storage.generation()
2685 })?;
2686
2687 let batch = LexWalBatch {
2688 generation,
2689 doc_count,
2690 checksum,
2691 segments: embedded_segments.clone(),
2692 };
2693 self.append_lex_batch(&batch)?;
2694 self.persist_lex_manifest()?;
2695 self.header.toc_checksum = self.toc.toc_checksum;
2696 crate::persist_header(&mut self.file, &self.header)?;
2697 Ok(())
2698 }
2699
2700 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2701 let index = usize::try_from(frame_id).map_err(|_| MemvidError::InvalidFrame {
2702 frame_id,
2703 reason: "frame id too large",
2704 })?;
2705 let frame = self
2706 .toc
2707 .frames
2708 .get_mut(index)
2709 .ok_or(MemvidError::InvalidFrame {
2710 frame_id,
2711 reason: "delete target missing",
2712 })?;
2713 frame.status = FrameStatus::Deleted;
2714 frame.superseded_by = None;
2715 self.remove_frame_from_indexes(frame_id)
2716 }
2717
    /// Remove a frame from every live search index.
    ///
    /// Tantivy deletions mark the engine dirty so a later commit refreshes
    /// its snapshot; the lex and vec indexes are updated in place.
    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
        #[cfg(feature = "lex")]
        if let Some(engine) = self.tantivy.as_mut() {
            engine.delete_frame(frame_id)?;
            self.tantivy_dirty = true;
        }
        if let Some(index) = self.lex_index.as_mut() {
            index.remove_document(frame_id);
        }
        if let Some(index) = self.vec_index.as_mut() {
            index.remove(frame_id);
        }
        Ok(())
    }
2732
2733 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2734 let Ok(index) = usize::try_from(frame_id) else {
2735 return false;
2736 };
2737 self.toc
2738 .frames
2739 .get(index)
2740 .is_some_and(|frame| frame.status == FrameStatus::Active)
2741 }
2742
2743 #[cfg(feature = "parallel_segments")]
2744 fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
2745 where
2746 I: IntoIterator<Item = FrameId>,
2747 {
2748 let mut iter = iter.into_iter();
2749 let first_id = iter.next()?;
2750 let first_frame = self.toc.frames.get(first_id as usize);
2751 let mut min_id = first_id;
2752 let mut max_id = first_id;
2753 let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
2754 let mut page_end = first_frame
2755 .and_then(|frame| frame.chunk_count)
2756 .map(|count| page_start + count.saturating_sub(1))
2757 .unwrap_or(page_start);
2758 for frame_id in iter {
2759 if frame_id < min_id {
2760 min_id = frame_id;
2761 }
2762 if frame_id > max_id {
2763 max_id = frame_id;
2764 }
2765 if let Some(frame) = self.toc.frames.get(frame_id as usize) {
2766 if let Some(idx) = frame.chunk_index {
2767 page_start = page_start.min(idx);
2768 if let Some(count) = frame.chunk_count {
2769 let end = idx + count.saturating_sub(1);
2770 page_end = page_end.max(end);
2771 } else {
2772 page_end = page_end.max(idx);
2773 }
2774 }
2775 }
2776 }
2777 Some(SegmentSpan {
2778 frame_start: min_id,
2779 frame_end: max_id,
2780 page_start,
2781 page_end,
2782 ..SegmentSpan::default()
2783 })
2784 }
2785
    /// Attach a span to a segment descriptor and default its codec version.
    #[cfg(feature = "parallel_segments")]
    pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
        common.span = Some(span);
        // 0 means "unset"; normalize it to codec version 1.
        if common.codec_version == 0 {
            common.codec_version = 1;
        }
    }
2793
2794 #[cfg(feature = "parallel_segments")]
2795 pub(crate) fn record_index_segment(
2796 &mut self,
2797 kind: SegmentKind,
2798 common: SegmentCommon,
2799 stats: SegmentStats,
2800 ) -> Result<()> {
2801 let entry = IndexSegmentRef {
2802 kind,
2803 common,
2804 stats,
2805 };
2806 self.toc.segment_catalog.index_segments.push(entry.clone());
2807 if let Some(wal) = self.manifest_wal.as_mut() {
2808 wal.append_segments(&[entry])?;
2809 }
2810 Ok(())
2811 }
2812
2813 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2814 self.ensure_writable()?;
2815 if self.toc.ticket_ref.issuer == "free-tier" {
2816 return Ok(());
2817 }
2818 match self.tier() {
2819 Tier::Free => Ok(()),
2820 tier => {
2821 if self.toc.ticket_ref.issuer.trim().is_empty() {
2822 Err(MemvidError::TicketRequired { tier })
2823 } else {
2824 Ok(())
2825 }
2826 }
2827 }
2828 }
2829
2830 pub(crate) fn tier(&self) -> Tier {
2831 if self.header.wal_size >= WAL_SIZE_LARGE {
2832 Tier::Enterprise
2833 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2834 Tier::Dev
2835 } else {
2836 Tier::Free
2837 }
2838 }
2839
2840 pub(crate) fn capacity_limit(&self) -> u64 {
2841 if self.toc.ticket_ref.capacity_bytes != 0 {
2842 self.toc.ticket_ref.capacity_bytes
2843 } else {
2844 self.tier().capacity_bytes()
2845 }
2846 }
2847
    /// Public accessor for the effective capacity limit in bytes (the ticket's
    /// explicit capacity when set, otherwise the tier default).
    #[must_use]
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2856
    /// Serialize the TOC and its commit footer at `header.footer_offset`,
    /// resize the file to end right after them (never truncating into the WAL
    /// region), and fsync.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The footer records the TOC length/hash so readers can validate it.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // Trim the file to end immediately after the footer, but never shrink
        // it below the end of the WAL region.
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Durability point: flush data and metadata before the header is persisted.
        self.file.sync_all()?;
        Ok(())
    }
2897}
2898
2899#[cfg(feature = "parallel_segments")]
2900impl Memvid {
2901 fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
2902 let chunks = self.collect_segment_chunks(delta)?;
2903 if chunks.is_empty() {
2904 return Ok(false);
2905 }
2906 let planner = SegmentPlanner::new(opts.clone());
2907 let plans = planner.plan_from_chunks(chunks);
2908 if plans.is_empty() {
2909 return Ok(false);
2910 }
2911 let worker_pool = SegmentWorkerPool::new(opts);
2912 let results = worker_pool.execute(plans)?;
2913 if results.is_empty() {
2914 return Ok(false);
2915 }
2916 self.append_parallel_segments(results)?;
2917 Ok(true)
2918 }
2919
2920 fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
2921 let mut embedding_map: HashMap<FrameId, Vec<f32>> =
2922 delta.inserted_embeddings.iter().cloned().collect();
2923 tracing::info!(
2924 inserted_frames = ?delta.inserted_frames,
2925 embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
2926 "collect_segment_chunks: comparing frame IDs"
2927 );
2928 let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
2929 for frame_id in &delta.inserted_frames {
2930 let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
2931 MemvidError::InvalidFrame {
2932 frame_id: *frame_id,
2933 reason: "frame id out of range while planning segments",
2934 },
2935 )?;
2936 let text = self.frame_content(&frame)?;
2937 if text.trim().is_empty() {
2938 continue;
2939 }
2940 let token_estimate = estimate_tokens(&text);
2941 let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
2942 let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
2943 let page_start = if frame.chunk_index.is_some() {
2944 chunk_index + 1
2945 } else {
2946 0
2947 };
2948 let page_end = if frame.chunk_index.is_some() {
2949 page_start
2950 } else {
2951 0
2952 };
2953 chunks.push(SegmentChunkPlan {
2954 text,
2955 frame_id: *frame_id,
2956 timestamp: frame.timestamp,
2957 chunk_index,
2958 chunk_count: chunk_count.max(1),
2959 token_estimate,
2960 token_start: 0,
2961 token_end: 0,
2962 page_start,
2963 page_end,
2964 embedding: embedding_map.remove(frame_id),
2965 });
2966 }
2967 Ok(chunks)
2968 }
2969}
2970
/// Cheap token-count proxy: the whitespace-separated word count, floored at 1
/// so empty or all-blank text still costs one token.
#[cfg(feature = "parallel_segments")]
fn estimate_tokens(text: &str) -> usize {
    match text.split_whitespace().count() {
        0 => 1,
        n => n,
    }
}
2975
2976impl Memvid {
    /// Push `header.footer_offset` forward to cover the catalog's data end.
    ///
    /// Returns `Ok(true)` when the footer moved (the TOC footer and header are
    /// rewritten), `Ok(false)` when the catalog already fits under the footer.
    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
        let catalog_end = self.catalog_data_end();
        if catalog_end <= self.header.footer_offset {
            return Ok(false);
        }
        self.header.footer_offset = catalog_end;
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(true)
    }
2988}
2989
2990impl Memvid {
    /// Compact the file: rewrite only active frame payloads contiguously after
    /// the WAL region, drop all derived segment/index artifacts, and rebuild
    /// the indexes from scratch.
    ///
    /// Runs `commit()` first so the WAL is drained before payloads move.
    ///
    /// # Errors
    /// Propagates I/O and rebuild failures; a failure mid-compaction can leave
    /// the file needing recovery.
    pub fn vacuum(&mut self) -> Result<()> {
        self.commit()?;

        // Snapshot every active payload before the on-disk layout changes.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Rewrite payloads back-to-back starting right after the WAL region,
        // updating each frame's offset/length as it lands.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Non-active frames keep no payload after compaction.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // All derived artifacts are invalidated by the payload move.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        // Rebuild every index from the compacted frames, then flush to disk.
        self.rebuild_indexes(&[], &[])?;
        self.file.sync_all()?;
        Ok(())
    }
3058
3059 #[must_use]
3077 pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
3078 plan_document_chunks(payload).map(|plan| plan.chunks)
3079 }
3080
    /// Ingest raw bytes with default `PutOptions`; returns the value produced
    /// by `put_internal` (sequence / frame identifier).
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }
3085
    /// Ingest raw bytes using the caller-supplied `PutOptions`.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }
3090
3091 pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
3093 self.put_internal(
3094 Some(payload),
3095 None,
3096 Some(embedding),
3097 None,
3098 PutOptions::default(),
3099 None,
3100 )
3101 }
3102
    /// Ingest raw bytes with a precomputed embedding and explicit options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }
3111
3112 pub fn put_with_chunk_embeddings(
3125 &mut self,
3126 payload: &[u8],
3127 parent_embedding: Option<Vec<f32>>,
3128 chunk_embeddings: Vec<Vec<f32>>,
3129 options: PutOptions,
3130 ) -> Result<u64> {
3131 self.put_internal(
3132 Some(payload),
3133 None,
3134 parent_embedding,
3135 Some(chunk_embeddings),
3136 options,
3137 None,
3138 )
3139 }
3140
3141 pub fn update_frame(
3143 &mut self,
3144 frame_id: FrameId,
3145 payload: Option<Vec<u8>>,
3146 mut options: PutOptions,
3147 embedding: Option<Vec<f32>>,
3148 ) -> Result<u64> {
3149 self.ensure_mutation_allowed()?;
3150 let existing = self.frame_by_id(frame_id)?;
3151 if existing.status != FrameStatus::Active {
3152 return Err(MemvidError::InvalidFrame {
3153 frame_id,
3154 reason: "frame is not active",
3155 });
3156 }
3157
3158 if options.timestamp.is_none() {
3159 options.timestamp = Some(existing.timestamp);
3160 }
3161 if options.track.is_none() {
3162 options.track = existing.track.clone();
3163 }
3164 if options.kind.is_none() {
3165 options.kind = existing.kind.clone();
3166 }
3167 if options.uri.is_none() {
3168 options.uri = existing.uri.clone();
3169 }
3170 if options.title.is_none() {
3171 options.title = existing.title.clone();
3172 }
3173 if options.metadata.is_none() {
3174 options.metadata = existing.metadata.clone();
3175 }
3176 if options.search_text.is_none() {
3177 options.search_text = existing.search_text.clone();
3178 }
3179 if options.tags.is_empty() {
3180 options.tags = existing.tags.clone();
3181 }
3182 if options.labels.is_empty() {
3183 options.labels = existing.labels.clone();
3184 }
3185 if options.extra_metadata.is_empty() {
3186 options.extra_metadata = existing.extra_metadata.clone();
3187 }
3188
3189 let reuse_frame = if payload.is_none() {
3190 options.auto_tag = false;
3191 options.extract_dates = false;
3192 Some(existing.clone())
3193 } else {
3194 None
3195 };
3196
3197 let effective_embedding = if let Some(explicit) = embedding {
3198 Some(explicit)
3199 } else if self.vec_enabled {
3200 self.frame_embedding(frame_id)?
3201 } else {
3202 None
3203 };
3204
3205 let payload_slice = payload.as_deref();
3206 let reuse_flag = reuse_frame.is_some();
3207 let replace_flag = payload_slice.is_some();
3208 let seq = self.put_internal(
3209 payload_slice,
3210 reuse_frame,
3211 effective_embedding,
3212 None, options,
3214 Some(frame_id),
3215 )?;
3216 info!(
3217 "frame_update frame_id={frame_id} seq={seq} reused_payload={reuse_flag} replaced_payload={replace_flag}"
3218 );
3219 Ok(seq)
3220 }
3221
3222 pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
3223 self.ensure_mutation_allowed()?;
3224 let frame = self.frame_by_id(frame_id)?;
3225 if frame.status != FrameStatus::Active {
3226 return Err(MemvidError::InvalidFrame {
3227 frame_id,
3228 reason: "frame is not active",
3229 });
3230 }
3231
3232 let mut tombstone = WalEntryData {
3233 timestamp: SystemTime::now()
3234 .duration_since(UNIX_EPOCH)
3235 .map(|d| d.as_secs() as i64)
3236 .unwrap_or(frame.timestamp),
3237 kind: None,
3238 track: None,
3239 payload: Vec::new(),
3240 embedding: None,
3241 uri: frame.uri.clone(),
3242 title: frame.title.clone(),
3243 canonical_encoding: frame.canonical_encoding,
3244 canonical_length: frame.canonical_length,
3245 metadata: None,
3246 search_text: None,
3247 tags: Vec::new(),
3248 labels: Vec::new(),
3249 extra_metadata: BTreeMap::new(),
3250 content_dates: Vec::new(),
3251 chunk_manifest: None,
3252 role: frame.role,
3253 parent_sequence: None,
3254 chunk_index: frame.chunk_index,
3255 chunk_count: frame.chunk_count,
3256 op: FrameWalOp::Tombstone,
3257 target_frame_id: Some(frame_id),
3258 supersedes_frame_id: None,
3259 reuse_payload_from: None,
3260 source_sha256: None,
3261 source_path: None,
3262 enrichment_state: crate::types::EnrichmentState::default(),
3263 };
3264 tombstone.kind = frame.kind.clone();
3265 tombstone.track = frame.track.clone();
3266
3267 let payload_bytes = encode_to_vec(WalEntry::Frame(tombstone), wal_config())?;
3268 let seq = self.append_wal_entry(&payload_bytes)?;
3269 self.dirty = true;
3270 let suppress_checkpoint = self
3271 .batch_opts
3272 .as_ref()
3273 .is_some_and(|o| o.disable_auto_checkpoint);
3274 if !suppress_checkpoint && self.wal.should_checkpoint() {
3275 self.commit()?;
3276 }
3277 info!("frame_delete frame_id={frame_id} seq={seq}");
3278 Ok(seq)
3279 }
3280}
3281
3282impl Memvid {
3283 fn put_internal(
3284 &mut self,
3285 payload: Option<&[u8]>,
3286 reuse_frame: Option<Frame>,
3287 embedding: Option<Vec<f32>>,
3288 chunk_embeddings: Option<Vec<Vec<f32>>>,
3289 mut options: PutOptions,
3290 supersedes: Option<FrameId>,
3291 ) -> Result<u64> {
3292 self.ensure_mutation_allowed()?;
3293
3294 if options.dedup {
3296 if let Some(bytes) = payload {
3297 let content_hash = hash(bytes);
3298 if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3299 tracing::debug!(
3301 frame_id = existing_frame.id,
3302 "dedup: skipping ingestion, identical content already exists"
3303 );
3304 return Ok(existing_frame.id);
3306 }
3307 }
3308 }
3309
3310 if payload.is_some() && reuse_frame.is_some() {
3311 let frame_id = reuse_frame
3312 .as_ref()
3313 .map(|frame| frame.id)
3314 .unwrap_or_default();
3315 return Err(MemvidError::InvalidFrame {
3316 frame_id,
3317 reason: "cannot reuse payload when bytes are provided",
3318 });
3319 }
3320
3321 let incoming_dimension = {
3324 let mut dim: Option<u32> = None;
3325
3326 if let Some(ref vector) = embedding {
3327 if !vector.is_empty() {
3328 #[allow(clippy::cast_possible_truncation)]
3329 let len = vector.len() as u32;
3330 dim = Some(len);
3331 }
3332 }
3333
3334 if let Some(ref vectors) = chunk_embeddings {
3335 for vector in vectors {
3336 if vector.is_empty() {
3337 continue;
3338 }
3339 let vec_dim = u32::try_from(vector.len()).unwrap_or(0);
3340 match dim {
3341 None => dim = Some(vec_dim),
3342 Some(existing) if existing == vec_dim => {}
3343 Some(existing) => {
3344 return Err(MemvidError::VecDimensionMismatch {
3345 expected: existing,
3346 actual: vector.len(),
3347 });
3348 }
3349 }
3350 }
3351 }
3352
3353 dim
3354 };
3355
3356 if let Some(incoming_dimension) = incoming_dimension {
3357 if !self.vec_enabled {
3359 self.enable_vec()?;
3360 }
3361
3362 if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3363 if existing_dimension != incoming_dimension {
3364 return Err(MemvidError::VecDimensionMismatch {
3365 expected: existing_dimension,
3366 actual: incoming_dimension as usize,
3367 });
3368 }
3369 }
3370
3371 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3373 if manifest.dimension == 0 {
3374 manifest.dimension = incoming_dimension;
3375 }
3376 }
3377 }
3378
3379 let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3380 let payload_tail = self.payload_region_end();
3381 let projected = if let Some(bytes) = payload {
3382 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3383 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3384 } else {
3385 prepare_canonical_payload(bytes)?
3386 };
3387 let len = prepared.len();
3388 prepared_payload = Some((prepared, encoding, length));
3389 payload_tail.saturating_add(len as u64)
3390 } else if reuse_frame.is_some() {
3391 payload_tail
3392 } else {
3393 return Err(MemvidError::InvalidFrame {
3394 frame_id: 0,
3395 reason: "payload required for frame insertion",
3396 });
3397 };
3398
3399 let capacity_limit = self.capacity_limit();
3400 if projected > capacity_limit {
3401 let incoming_size = projected.saturating_sub(payload_tail);
3402 return Err(MemvidError::CapacityExceeded {
3403 current: payload_tail,
3404 limit: capacity_limit,
3405 required: incoming_size,
3406 });
3407 }
3408 let timestamp = options.timestamp.take().unwrap_or_else(|| {
3409 SystemTime::now()
3410 .duration_since(UNIX_EPOCH)
3411 .map(|d| d.as_secs() as i64)
3412 .unwrap_or(0)
3413 });
3414
3415 #[allow(unused_assignments)]
3416 let mut reuse_bytes: Option<Vec<u8>> = None;
3417 let payload_for_processing = if let Some(bytes) = payload {
3418 Some(bytes)
3419 } else if let Some(frame) = reuse_frame.as_ref() {
3420 let bytes = self.frame_canonical_bytes(frame)?;
3421 reuse_bytes = Some(bytes);
3422 reuse_bytes.as_deref()
3423 } else {
3424 None
3425 };
3426
3427 let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3429 (Some(bytes), None) => plan_document_chunks(bytes),
3430 _ => None,
3431 };
3432
3433 let mut source_sha256: Option<[u8; 32]> = None;
3437 let source_path_value = options.source_path.take();
3438
3439 let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3440 if raw_chunk_plan.is_some() {
3441 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3443 } else if options.no_raw {
3444 if let Some(bytes) = payload {
3446 let hash_result = hash(bytes);
3448 source_sha256 = Some(*hash_result.as_bytes());
3449 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3451 } else {
3452 return Err(MemvidError::InvalidFrame {
3453 frame_id: 0,
3454 reason: "payload required for --no-raw mode",
3455 });
3456 }
3457 } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3458 (prepared, encoding, length, None)
3459 } else if let Some(bytes) = payload {
3460 let (prepared, encoding, length) = if let Some(ref opts) = self.batch_opts {
3461 prepare_canonical_payload_with_level(bytes, opts.compression_level)?
3462 } else {
3463 prepare_canonical_payload(bytes)?
3464 };
3465 (prepared, encoding, length, None)
3466 } else if let Some(frame) = reuse_frame.as_ref() {
3467 (
3468 Vec::new(),
3469 frame.canonical_encoding,
3470 frame.canonical_length,
3471 Some(frame.id),
3472 )
3473 } else {
3474 return Err(MemvidError::InvalidFrame {
3475 frame_id: 0,
3476 reason: "payload required for frame insertion",
3477 });
3478 };
3479
3480 let mut chunk_plan = raw_chunk_plan;
3482
3483 let mut metadata = options.metadata.take();
3484 let mut search_text = options
3485 .search_text
3486 .take()
3487 .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3488 let mut tags = std::mem::take(&mut options.tags);
3489 let mut labels = std::mem::take(&mut options.labels);
3490 let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3491 let mut content_dates: Vec<String> = Vec::new();
3492
3493 let need_search_text = search_text
3494 .as_ref()
3495 .is_none_or(|text| text.trim().is_empty());
3496 let need_metadata = metadata.is_none();
3497 let run_extractor = need_search_text || need_metadata || options.auto_tag;
3498
3499 let mut extraction_error = None;
3500 let mut is_skim_extraction = false; let extracted = if run_extractor {
3503 if let Some(bytes) = payload_for_processing {
3504 let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3505 let uri_hint = options.uri.as_deref();
3506
3507 let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3509
3510 if use_budgeted {
3511 let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3513 options.extraction_budget_ms,
3514 );
3515 match crate::extract_budgeted::extract_with_budget(
3516 bytes, mime_hint, uri_hint, budget,
3517 ) {
3518 Ok(result) => {
3519 is_skim_extraction = result.is_skim();
3520 if is_skim_extraction {
3521 tracing::debug!(
3522 coverage = result.coverage,
3523 elapsed_ms = result.elapsed_ms,
3524 sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3525 "time-budgeted extraction (skim)"
3526 );
3527 }
3528 let doc = crate::extract::ExtractedDocument {
3530 text: if result.text.is_empty() {
3531 None
3532 } else {
3533 Some(result.text)
3534 },
3535 metadata: serde_json::json!({
3536 "skim": is_skim_extraction,
3537 "coverage": result.coverage,
3538 "sections_extracted": result.sections_extracted,
3539 "sections_total": result.sections_total,
3540 }),
3541 mime_type: mime_hint.map(std::string::ToString::to_string),
3542 };
3543 Some(doc)
3544 }
3545 Err(err) => {
3546 tracing::warn!(
3548 ?err,
3549 "budgeted extraction failed, trying full extraction"
3550 );
3551 match extract_via_registry(bytes, mime_hint, uri_hint) {
3552 Ok(doc) => Some(doc),
3553 Err(err) => {
3554 extraction_error = Some(err);
3555 None
3556 }
3557 }
3558 }
3559 }
3560 } else {
3561 match extract_via_registry(bytes, mime_hint, uri_hint) {
3563 Ok(doc) => Some(doc),
3564 Err(err) => {
3565 extraction_error = Some(err);
3566 None
3567 }
3568 }
3569 }
3570 } else {
3571 None
3572 }
3573 } else {
3574 None
3575 };
3576
3577 if let Some(err) = extraction_error {
3578 return Err(err);
3579 }
3580
3581 if let Some(doc) = &extracted {
3582 if need_search_text {
3583 if let Some(text) = &doc.text {
3584 if let Some(normalized) =
3585 normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3586 {
3587 search_text = Some(normalized);
3588 }
3589 }
3590 }
3591
3592 if chunk_plan.is_none() {
3595 if let Some(text) = &doc.text {
3596 chunk_plan = plan_text_chunks(text);
3597 }
3598 }
3599
3600 if let Some(mime) = doc.mime_type.as_ref() {
3601 if let Some(existing) = &mut metadata {
3602 if existing.mime.is_none() {
3603 existing.mime = Some(mime.clone());
3604 }
3605 } else {
3606 let mut doc_meta = DocMetadata::default();
3607 doc_meta.mime = Some(mime.clone());
3608 metadata = Some(doc_meta);
3609 }
3610 }
3611
3612 if options.auto_tag {
3615 if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string())
3616 {
3617 extra_metadata
3618 .entry("extractous_metadata".to_string())
3619 .or_insert(meta_json);
3620 }
3621 }
3622 }
3623
3624 if options.auto_tag {
3625 if let Some(ref text) = search_text {
3626 if !text.trim().is_empty() {
3627 let result = AutoTagger.analyse(text, options.extract_dates);
3628 merge_unique(&mut tags, result.tags);
3629 merge_unique(&mut labels, result.labels);
3630 if options.extract_dates && content_dates.is_empty() {
3631 content_dates = result.content_dates;
3632 }
3633 }
3634 }
3635 }
3636
3637 if content_dates.is_empty() {
3638 if let Some(frame) = reuse_frame.as_ref() {
3639 content_dates = frame.content_dates.clone();
3640 }
3641 }
3642
3643 let metadata_ref = metadata.as_ref();
3644 let mut search_text = augment_search_text(
3645 search_text,
3646 options.uri.as_deref(),
3647 options.title.as_deref(),
3648 options.track.as_deref(),
3649 &tags,
3650 &labels,
3651 &extra_metadata,
3652 &content_dates,
3653 metadata_ref,
3654 );
3655 let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3656 let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3657 let mut parent_chunk_count: Option<u32> = None;
3658
3659 let kind_value = options.kind.take();
3660 let track_value = options.track.take();
3661 let uri_value = options.uri.take();
3662 let title_value = options.title.take();
3663 let should_extract_triplets = options.extract_triplets;
3664 let triplet_uri = uri_value.clone();
3666 let triplet_title = title_value.clone();
3667
3668 if let Some(plan) = chunk_plan.as_ref() {
3669 let chunk_total = u32::try_from(plan.chunks.len()).unwrap_or(0);
3670 parent_chunk_manifest = Some(plan.manifest.clone());
3671 parent_chunk_count = Some(chunk_total);
3672
3673 if let Some(first_chunk) = plan.chunks.first() {
3674 if let Some(normalized) =
3675 normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3676 {
3677 if !normalized.trim().is_empty() {
3678 search_text = Some(normalized);
3679 }
3680 }
3681 }
3682
3683 let chunk_tags = tags.clone();
3684 let chunk_labels = labels.clone();
3685 let chunk_metadata = metadata.clone();
3686 let chunk_extra_metadata = extra_metadata.clone();
3687 let chunk_content_dates = content_dates.clone();
3688
3689 for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3690 let (chunk_payload, chunk_encoding, chunk_length) =
3691 prepare_canonical_payload(chunk_text.as_bytes())?;
3692 let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3693 .map(|n| n.text)
3694 .filter(|text| !text.trim().is_empty());
3695
3696 let chunk_uri = uri_value
3697 .as_ref()
3698 .map(|uri| format!("{uri}#page-{}", idx + 1));
3699 let chunk_title = title_value
3700 .as_ref()
3701 .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3702
3703 let chunk_embedding = chunk_embeddings
3705 .as_ref()
3706 .and_then(|embeddings| embeddings.get(idx).cloned());
3707
3708 chunk_entries.push(WalEntryData {
3709 timestamp,
3710 kind: kind_value.clone(),
3711 track: track_value.clone(),
3712 payload: chunk_payload,
3713 embedding: chunk_embedding,
3714 uri: chunk_uri,
3715 title: chunk_title,
3716 canonical_encoding: chunk_encoding,
3717 canonical_length: chunk_length,
3718 metadata: chunk_metadata.clone(),
3719 search_text: chunk_search_text,
3720 tags: chunk_tags.clone(),
3721 labels: chunk_labels.clone(),
3722 extra_metadata: chunk_extra_metadata.clone(),
3723 content_dates: chunk_content_dates.clone(),
3724 chunk_manifest: None,
3725 role: FrameRole::DocumentChunk,
3726 parent_sequence: None,
3727 chunk_index: Some(u32::try_from(idx).unwrap_or(0)),
3728 chunk_count: Some(chunk_total),
3729 op: FrameWalOp::Insert,
3730 target_frame_id: None,
3731 supersedes_frame_id: None,
3732 reuse_payload_from: None,
3733 source_sha256: None, source_path: None,
3735 enrichment_state: crate::types::EnrichmentState::Enriched,
3737 });
3738 }
3739 }
3740
3741 let parent_uri = uri_value.clone();
3742 let parent_title = title_value.clone();
3743
3744 let parent_sequence = if let Some(parent_id) = options.parent_id {
3747 usize::try_from(parent_id)
3752 .ok()
3753 .and_then(|idx| self.toc.frames.get(idx))
3754 .map(|_| parent_id + 2) } else {
3756 None
3757 };
3758
3759 let triplet_text = search_text.clone();
3761
3762 #[cfg(feature = "lex")]
3764 let instant_index_tags = if options.instant_index {
3765 tags.clone()
3766 } else {
3767 Vec::new()
3768 };
3769 #[cfg(feature = "lex")]
3770 let instant_index_labels = if options.instant_index {
3771 labels.clone()
3772 } else {
3773 Vec::new()
3774 };
3775
3776 #[cfg(feature = "lex")]
3778 let needs_enrichment =
3779 options.instant_index && (options.enable_embedding || is_skim_extraction);
3780 #[cfg(feature = "lex")]
3781 let enrichment_state = if needs_enrichment {
3782 crate::types::EnrichmentState::Searchable
3783 } else {
3784 crate::types::EnrichmentState::Enriched
3785 };
3786 #[cfg(not(feature = "lex"))]
3787 let enrichment_state = crate::types::EnrichmentState::Enriched;
3788
3789 let entry = WalEntryData {
3790 timestamp,
3791 kind: kind_value,
3792 track: track_value,
3793 payload: storage_payload,
3794 embedding,
3795 uri: parent_uri,
3796 title: parent_title,
3797 canonical_encoding,
3798 canonical_length,
3799 metadata,
3800 search_text,
3801 tags,
3802 labels,
3803 extra_metadata,
3804 content_dates,
3805 chunk_manifest: parent_chunk_manifest,
3806 role: options.role,
3807 parent_sequence,
3808 chunk_index: None,
3809 chunk_count: parent_chunk_count,
3810 op: FrameWalOp::Insert,
3811 target_frame_id: None,
3812 supersedes_frame_id: supersedes,
3813 reuse_payload_from,
3814 source_sha256,
3815 source_path: source_path_value,
3816 enrichment_state,
3817 };
3818
3819 let parent_bytes = encode_to_vec(WalEntry::Frame(entry), wal_config())?;
3820 let parent_seq = self.append_wal_entry(&parent_bytes)?;
3821 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3822
3823 #[cfg(feature = "lex")]
3826 if options.instant_index && self.tantivy.is_some() {
3827 let frame_id = parent_seq as FrameId;
3829
3830 if let Some(ref text) = triplet_text {
3832 if !text.trim().is_empty() {
3833 let temp_frame = Frame {
3835 id: frame_id,
3836 timestamp,
3837 anchor_ts: None,
3838 anchor_source: None,
3839 kind: options.kind.clone(),
3840 track: options.track.clone(),
3841 payload_offset: 0,
3842 payload_length: 0,
3843 checksum: [0u8; 32],
3844 uri: options
3845 .uri
3846 .clone()
3847 .or_else(|| Some(crate::default_uri(frame_id))),
3848 title: options.title.clone(),
3849 canonical_encoding: crate::types::CanonicalEncoding::default(),
3850 canonical_length: None,
3851 metadata: None, search_text: triplet_text.clone(),
3853 tags: instant_index_tags.clone(),
3854 labels: instant_index_labels.clone(),
3855 extra_metadata: std::collections::BTreeMap::new(), content_dates: Vec::new(), chunk_manifest: None,
3858 role: options.role,
3859 parent_id: None,
3860 chunk_index: None,
3861 chunk_count: None,
3862 status: FrameStatus::Active,
3863 supersedes,
3864 superseded_by: None,
3865 source_sha256: None, source_path: None, enrichment_state: crate::types::EnrichmentState::Searchable,
3868 };
3869
3870 if let Some(engine) = self.tantivy.as_mut() {
3872 engine.add_frame(&temp_frame, text)?;
3873 engine.soft_commit()?;
3874 self.tantivy_dirty = true;
3875
3876 tracing::debug!(
3877 frame_id = frame_id,
3878 "instant index: frame searchable immediately"
3879 );
3880 }
3881 }
3882 }
3883 }
3884
3885 #[cfg(feature = "lex")]
3889 if needs_enrichment {
3890 let frame_id = parent_seq as FrameId;
3891 self.toc.enrichment_queue.push(frame_id);
3892 tracing::debug!(
3893 frame_id = frame_id,
3894 is_skim = is_skim_extraction,
3895 needs_embedding = options.enable_embedding,
3896 "queued frame for background enrichment"
3897 );
3898 }
3899
3900 for mut chunk_entry in chunk_entries {
3901 chunk_entry.parent_sequence = Some(parent_seq);
3902 let chunk_bytes = encode_to_vec(WalEntry::Frame(chunk_entry), wal_config())?;
3903 self.append_wal_entry(&chunk_bytes)?;
3904 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3905 }
3906
3907 self.dirty = true;
3908 let suppress_checkpoint = self
3909 .batch_opts
3910 .as_ref()
3911 .is_some_and(|o| o.disable_auto_checkpoint);
3912 if !suppress_checkpoint && self.wal.should_checkpoint() {
3913 self.commit()?;
3914 }
3915
3916 #[cfg(feature = "replay")]
3918 if let Some(input_bytes) = payload {
3919 self.record_put_action(parent_seq, input_bytes);
3920 }
3921
3922 if should_extract_triplets {
3925 if let Some(ref text) = triplet_text {
3926 if !text.trim().is_empty() {
3927 let extractor = TripletExtractor::default();
3928 let frame_id = parent_seq as FrameId;
3929 let (cards, _stats) = extractor.extract(
3930 frame_id,
3931 text,
3932 triplet_uri.as_deref(),
3933 triplet_title.as_deref(),
3934 timestamp,
3935 );
3936
3937 if !cards.is_empty() {
3938 let card_ids = self.memories_track.add_cards(cards);
3940
3941 self.memories_track
3943 .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3944 }
3945 }
3946 }
3947 }
3948
3949 Ok(parent_seq)
3950 }
3951}
3952
3953#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
3954#[serde(rename_all = "snake_case")]
3955pub(crate) enum FrameWalOp {
3956 Insert,
3957 Tombstone,
3958}
3959
3960impl Default for FrameWalOp {
3961 fn default() -> Self {
3962 Self::Insert
3963 }
3964}
3965
/// A single record in the embedded WAL.
///
/// Encoded/decoded with bincode (see `encode_to_vec(WalEntry::Frame(..))`
/// and `decode_wal_entry`), so variant order is part of the on-disk
/// format and must not be reordered.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame insert/tombstone together with all of its data.
    Frame(WalEntryData),
    /// A batch of lexical-index updates (only with the `lex` feature).
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3972
3973fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3974 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3975 return Ok(entry);
3976 }
3977 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3978 Ok(WalEntry::Frame(legacy))
3979}
3980
/// Payload of a `WalEntry::Frame` record: everything needed to materialize
/// a frame (or apply a tombstone) when the WAL is replayed.
///
/// Fields marked `#[serde(default)]` were added after the initial format;
/// they default when decoding older WAL records, so field order and the
/// attributes below are part of the on-disk format.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Unix timestamp in seconds, assigned at insert time.
    pub(crate) timestamp: i64,
    /// Optional frame kind label.
    pub(crate) kind: Option<String>,
    /// Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    /// Canonical payload bytes; empty when the payload is elided
    /// (chunked documents, `--no-raw` mode, or `reuse_payload_from`).
    pub(crate) payload: Vec<u8>,
    /// Optional embedding vector for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// How `payload` is encoded (e.g. plain vs. zstd).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Uncompressed length of the canonical payload, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Normalized text used for search indexing.
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Free-form key/value metadata (e.g. extractor output).
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Dates extracted from the content, as strings.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Chunk manifest carried by a parent document frame.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence of the parent entry, set on document-chunk entries.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based index of this chunk within its parent document.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Total number of chunks for the parent document.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Insert vs. tombstone; defaults to insert for legacy records.
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    /// Frame targeted by this operation — presumably used by tombstone
    /// entries; confirm against the replay path.
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame this entry supersedes, if any.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// Existing frame whose stored payload this entry reuses.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// blake3 hash of the original payload, recorded in `--no-raw` mode
    /// where the payload itself is not stored.
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    /// Original source path of the ingested document, if provided.
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Whether the frame still awaits background enrichment.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
4036
4037pub(crate) fn prepare_canonical_payload(
4038 payload: &[u8],
4039) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4040 prepare_canonical_payload_with_level(payload, 3)
4041}
4042
4043pub(crate) fn prepare_canonical_payload_with_level(
4044 payload: &[u8],
4045 level: i32,
4046) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
4047 if level == 0 {
4048 return Ok((
4050 payload.to_vec(),
4051 CanonicalEncoding::Plain,
4052 Some(payload.len() as u64),
4053 ));
4054 }
4055 if std::str::from_utf8(payload).is_ok() {
4056 let compressed = zstd::encode_all(std::io::Cursor::new(payload), level)?;
4057 Ok((
4058 compressed,
4059 CanonicalEncoding::Zstd,
4060 Some(payload.len() as u64),
4061 ))
4062 } else {
4063 Ok((
4064 payload.to_vec(),
4065 CanonicalEncoding::Plain,
4066 Some(payload.len() as u64),
4067 ))
4068 }
4069}
4070
4071pub(crate) fn augment_search_text(
4072 base: Option<String>,
4073 uri: Option<&str>,
4074 title: Option<&str>,
4075 track: Option<&str>,
4076 tags: &[String],
4077 labels: &[String],
4078 extra_metadata: &BTreeMap<String, String>,
4079 content_dates: &[String],
4080 metadata: Option<&DocMetadata>,
4081) -> Option<String> {
4082 let mut segments: Vec<String> = Vec::new();
4083 if let Some(text) = base {
4084 let trimmed = text.trim();
4085 if !trimmed.is_empty() {
4086 segments.push(trimmed.to_string());
4087 }
4088 }
4089
4090 if let Some(title) = title {
4091 if !title.trim().is_empty() {
4092 segments.push(format!("title: {}", title.trim()));
4093 }
4094 }
4095
4096 if let Some(uri) = uri {
4097 if !uri.trim().is_empty() {
4098 segments.push(format!("uri: {}", uri.trim()));
4099 }
4100 }
4101
4102 if let Some(track) = track {
4103 if !track.trim().is_empty() {
4104 segments.push(format!("track: {}", track.trim()));
4105 }
4106 }
4107
4108 if !tags.is_empty() {
4109 segments.push(format!("tags: {}", tags.join(" ")));
4110 }
4111
4112 if !labels.is_empty() {
4113 segments.push(format!("labels: {}", labels.join(" ")));
4114 }
4115
4116 if !extra_metadata.is_empty() {
4117 for (key, value) in extra_metadata {
4118 if value.trim().is_empty() {
4119 continue;
4120 }
4121 segments.push(format!("{key}: {value}"));
4122 }
4123 }
4124
4125 if !content_dates.is_empty() {
4126 segments.push(format!("dates: {}", content_dates.join(" ")));
4127 }
4128
4129 if let Some(meta) = metadata {
4130 if let Ok(meta_json) = serde_json::to_string(meta) {
4131 segments.push(format!("metadata: {meta_json}"));
4132 }
4133 }
4134
4135 if segments.is_empty() {
4136 None
4137 } else {
4138 Some(segments.join("\n"))
4139 }
4140}
4141
/// Append `additions` to `target`, trimming each value, dropping blanks,
/// and skipping duplicates of anything already in `target` or seen
/// earlier in `additions`. Relative order of surviving values is kept.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    // Seed the de-dup set with the existing (untrimmed) target values,
    // matching the comparison semantics of the original loop.
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    let fresh: Vec<String> = additions
        .into_iter()
        .filter_map(|raw| {
            let candidate = raw.trim().to_string();
            (!candidate.is_empty() && seen.insert(candidate.clone())).then_some(candidate)
        })
        .collect();
    target.extend(fresh);
}