1use std::cmp::min;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use std::fs::{File, OpenOptions};
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::path::Path;
15use std::sync::OnceLock;
16use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
17
18use bincode::serde::{decode_from_slice, encode_to_vec};
19use blake3::hash;
20use log::info;
21#[cfg(feature = "temporal_track")]
22use once_cell::sync::OnceCell;
23#[cfg(feature = "temporal_track")]
24use regex::Regex;
25use serde::{Deserialize, Serialize};
26use serde_json;
27use zstd;
28
29use atomic_write_file::AtomicWriteFile;
30
31use tracing::instrument;
32
33#[cfg(feature = "parallel_segments")]
34use super::{
35 builder::BuildOpts,
36 planner::{SegmentChunkPlan, SegmentPlanner},
37 workers::SegmentWorkerPool,
38};
39#[cfg(feature = "temporal_track")]
40use crate::TemporalTrackManifest;
41use crate::analysis::auto_tag::AutoTagger;
42use crate::constants::{WAL_SIZE_LARGE, WAL_SIZE_MEDIUM};
43use crate::footer::CommitFooter;
44use crate::io::wal::{EmbeddedWal, WalRecord};
45use crate::memvid::chunks::{plan_document_chunks, plan_text_chunks};
46use crate::memvid::lifecycle::{Memvid, prepare_toc_bytes};
47use crate::reader::{
48 DocumentFormat, DocumentReader, PassthroughReader, ReaderDiagnostics, ReaderHint, ReaderOutput,
49 ReaderRegistry,
50};
51#[cfg(feature = "lex")]
52use crate::search::{EmbeddedLexSegment, LexWalBatch, TantivySnapshot};
53use crate::triplet::TripletExtractor;
54#[cfg(feature = "lex")]
55use crate::types::TantivySegmentDescriptor;
56use crate::types::{
57 CanonicalEncoding, DocMetadata, Frame, FrameId, FrameRole, FrameStatus, PutOptions,
58 SegmentCommon, TextChunkManifest, Tier,
59};
60#[cfg(feature = "parallel_segments")]
61use crate::types::{IndexSegmentRef, SegmentKind, SegmentSpan, SegmentStats};
62#[cfg(feature = "temporal_track")]
63use crate::{
64 AnchorSource, TemporalAnchor, TemporalContext, TemporalMention, TemporalMentionFlags,
65 TemporalMentionKind, TemporalNormalizer, TemporalResolution, TemporalResolutionFlag,
66 TemporalResolutionValue,
67};
68use crate::{
69 DEFAULT_SEARCH_TEXT_LIMIT, ExtractedDocument, MemvidError, Result, TimeIndexEntry,
70 TimeIndexManifest, VecIndexManifest, normalize_text, time_index_append, wal_config,
71};
72#[cfg(feature = "temporal_track")]
73use time::{Date, Month, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset};
74
/// Number of leading bytes sniffed for magic-number format detection.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Per-entry overhead (bytes) assumed when sizing WAL growth; used by
/// `append_wal_entry` to compute the minimum region size for a payload.
const WAL_ENTRY_HEADER_SIZE: u64 = 48;
/// Chunk size (8 MiB) for shifting file contents when the WAL region grows.
const WAL_SHIFT_BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Fallback IANA timezone name for temporal resolution.
#[cfg(feature = "temporal_track")]
const DEFAULT_TEMPORAL_TZ: &str = "America/Chicago";
81
/// Natural-language temporal phrases recognized statically.
/// NOTE(review): the consumer of this table is not visible in this file —
/// presumably it seeds the temporal normalizer/mention scanner; confirm.
#[cfg(feature = "temporal_track")]
const STATIC_TEMPORAL_PHRASES: &[&str] = &[
    "today",
    "yesterday",
    "tomorrow",
    "two days ago",
    "in 3 days",
    "two weeks from now",
    "2 weeks from now",
    "two fridays ago",
    "last friday",
    "next friday",
    "this friday",
    "next week",
    "last week",
    "end of this month",
    "start of next month",
    "last month",
    "3 months ago",
    "in 90 minutes",
    "at 5pm today",
    "in the last 24 hours",
    "this morning",
    "on the sunday after next",
    "next daylight saving change",
    "midnight tomorrow",
    "noon next tuesday",
    "first business day of next month",
    "the first business day of next month",
    "end of q3",
    "next wednesday at 9",
    "sunday at 1:30am",
    "monday",
    "tuesday",
    "wednesday",
    "thursday",
    "friday",
    "saturday",
    "sunday",
];
122
/// Staging area for a commit: wraps an `AtomicWriteFile` so the staged copy
/// either replaces the destination atomically (`commit`) or is thrown away
/// (`discard`), never leaving the destination half-written.
struct CommitStaging {
    // Atomic temp-file handle targeting the destination path.
    atomic: AtomicWriteFile,
}
126
127impl CommitStaging {
128 fn prepare(path: &Path) -> Result<Self> {
129 let mut options = AtomicWriteFile::options();
130 options.read(true);
131 let atomic = options.open(path)?;
132 Ok(Self { atomic })
133 }
134
135 fn copy_from(&mut self, source: &File) -> Result<()> {
136 let mut reader = source.try_clone()?;
137 reader.seek(SeekFrom::Start(0))?;
138
139 let writer = self.atomic.as_file_mut();
140 writer.set_len(0)?;
141 writer.seek(SeekFrom::Start(0))?;
142 std::io::copy(&mut reader, writer)?;
143 writer.flush()?;
144 writer.sync_all()?;
145 Ok(())
146 }
147
148 fn clone_file(&self) -> Result<File> {
149 Ok(self.atomic.as_file().try_clone()?)
150 }
151
152 fn commit(self) -> Result<()> {
153 self.atomic.commit().map_err(Into::into)
154 }
155
156 fn discard(self) -> Result<()> {
157 self.atomic.discard().map_err(Into::into)
158 }
159}
160
/// Accumulated effects of replaying a batch of WAL records; inspected by the
/// commit/recover paths to decide whether indexes must be rebuilt.
#[derive(Debug, Default)]
struct IngestionDelta {
    // Ids of frames appended during replay.
    inserted_frames: Vec<FrameId>,
    // (frame id, embedding vector) pairs destined for the vector index.
    inserted_embeddings: Vec<(FrameId, Vec<f32>)>,
    // New entries for the time index.
    inserted_time_entries: Vec<TimeIndexEntry>,
    // Presumably set when existing frames are modified in place rather than
    // appended — see `apply_records` (not fully visible here) to confirm.
    mutated_frames: bool,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_mentions: Vec<TemporalMention>,
    #[cfg(feature = "temporal_track")]
    inserted_temporal_anchors: Vec<TemporalAnchor>,
}
172
173impl IngestionDelta {
174 fn is_empty(&self) -> bool {
175 #[allow(unused_mut)]
176 let mut empty = self.inserted_frames.is_empty()
177 && self.inserted_embeddings.is_empty()
178 && self.inserted_time_entries.is_empty()
179 && !self.mutated_frames;
180 #[cfg(feature = "temporal_track")]
181 {
182 empty = empty
183 && self.inserted_temporal_mentions.is_empty()
184 && self.inserted_temporal_anchors.is_empty();
185 }
186 empty
187 }
188}
189
/// Requested commit strategy.
///
/// NOTE(review): `commit_from_records` currently ignores the mode (its
/// parameter is `_mode`), so both variants behave identically today.
///
/// The hand-written `impl Default` is replaced by `#[derive(Default)]` with
/// the `#[default]` variant attribute — same behavior, less code.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitMode {
    /// Full commit (the default).
    #[default]
    Full,
    /// Incremental commit.
    Incremental,
}
201
/// Options accepted by [`Memvid::commit_with_options`].
#[derive(Clone, Copy, Debug, Default)]
pub struct CommitOptions {
    // Which commit strategy to run.
    pub mode: CommitMode,
    // Request a background commit. Currently ignored: commits always run
    // synchronously (a debug log notes the ignored flag).
    pub background: bool,
}
207
208impl CommitOptions {
209 pub fn new(mode: CommitMode) -> Self {
210 Self {
211 mode,
212 background: false,
213 }
214 }
215
216 pub fn background(mut self, background: bool) -> Self {
217 self.background = background;
218 self
219 }
220}
221
222fn default_reader_registry() -> &'static ReaderRegistry {
223 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
224 REGISTRY.get_or_init(ReaderRegistry::default)
225}
226
227fn infer_document_format(
228 mime: Option<&str>,
229 magic: Option<&[u8]>,
230 uri: Option<&str>,
231) -> Option<DocumentFormat> {
232 if detect_pdf_magic(magic) {
234 return Some(DocumentFormat::Pdf);
235 }
236
237 if let Some(magic_bytes) = magic {
240 if magic_bytes.starts_with(&[0x50, 0x4B, 0x03, 0x04]) {
241 if let Some(format) = infer_format_from_extension(uri) {
243 return Some(format);
244 }
245 }
246 }
247
248 if let Some(mime_str) = mime {
250 let mime_lower = mime_str.trim().to_ascii_lowercase();
251 let format = match mime_lower.as_str() {
252 "application/pdf" => Some(DocumentFormat::Pdf),
253 "text/plain" => Some(DocumentFormat::PlainText),
254 "text/markdown" => Some(DocumentFormat::Markdown),
255 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
256 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
257 Some(DocumentFormat::Docx)
258 }
259 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
260 Some(DocumentFormat::Xlsx)
261 }
262 "application/vnd.ms-excel" => Some(DocumentFormat::Xls),
263 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
264 Some(DocumentFormat::Pptx)
265 }
266 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
267 _ => None,
268 };
269 if format.is_some() {
270 return format;
271 }
272 }
273
274 infer_format_from_extension(uri)
276}
277
278fn infer_format_from_extension(uri: Option<&str>) -> Option<DocumentFormat> {
280 let uri = uri?;
281 let path = std::path::Path::new(uri);
282 let ext = path.extension()?.to_str()?.to_ascii_lowercase();
283 match ext.as_str() {
284 "pdf" => Some(DocumentFormat::Pdf),
285 "docx" => Some(DocumentFormat::Docx),
286 "xlsx" => Some(DocumentFormat::Xlsx),
287 "xls" => Some(DocumentFormat::Xls),
288 "pptx" => Some(DocumentFormat::Pptx),
289 "txt" | "text" | "log" | "cfg" | "ini" | "json" | "yaml" | "yml" | "toml" | "csv"
290 | "tsv" | "rs" | "py" | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go"
291 | "rb" | "php" | "css" | "scss" | "sh" | "bash" | "swift" | "kt" | "java" | "scala"
292 | "sql" => Some(DocumentFormat::PlainText),
293 "md" | "markdown" => Some(DocumentFormat::Markdown),
294 "html" | "htm" => Some(DocumentFormat::Html),
295 _ => None,
296 }
297}
298
/// True when `magic` starts with the `%PDF` signature, tolerating a leading
/// UTF-8 BOM and/or ASCII whitespace before the signature.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else { return false };
    if bytes.is_empty() {
        return false;
    }
    // Strip a UTF-8 BOM first, then any run of ASCII whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let content_start = without_bom
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[content_start..].starts_with(b"%PDF")
}
316
/// Extract a document from raw bytes.
///
/// Builds a [`ReaderHint`] from the MIME hint, sniffed magic bytes, and URI,
/// then tries the matching registered reader. If no reader matches, or the
/// matched reader errors, falls back to [`PassthroughReader`] and records the
/// reason as a diagnostic warning on the output.
#[instrument(
    target = "memvid::extract",
    skip_all,
    fields(mime = mime_hint, uri = uri)
)]
fn extract_via_registry(
    bytes: &[u8],
    mime_hint: Option<&str>,
    uri: Option<&str>,
) -> Result<ExtractedDocument> {
    let registry = default_reader_registry();
    // NOTE(review): `get(..MAGIC_SNIFF_BYTES)` returns None for inputs
    // shorter than 16 bytes, so tiny documents get no magic sniffing at all
    // — confirm that is intended.
    let magic = bytes
        .get(..MAGIC_SNIFF_BYTES)
        .and_then(|slice| if slice.is_empty() { None } else { Some(slice) });
    let hint = ReaderHint::new(mime_hint, infer_document_format(mime_hint, magic, uri))
        .with_uri(uri)
        .with_magic(magic);

    // A successful reader returns early; otherwise we remember why we are
    // falling back so the diagnostic can be attached to the fallback output.
    let fallback_reason = if let Some(reader) = registry.find_reader(&hint) {
        let start = Instant::now();
        match reader.extract(bytes, &hint) {
            Ok(output) => {
                return Ok(finalize_reader_output(output, start));
            }
            Err(err) => {
                tracing::error!(
                    target = "memvid::extract",
                    reader = reader.name(),
                    error = %err,
                    "reader failed; falling back"
                );
                Some(format!("reader {} failed: {err}", reader.name()))
            }
        }
    } else {
        tracing::debug!(
            target = "memvid::extract",
            format = hint.format.map(|f| f.label()),
            "no reader matched; using default extractor"
        );
        Some("no registered reader matched; using default extractor".to_string())
    };

    // Fallback path: passthrough extraction with the failure reason attached.
    let start = Instant::now();
    let mut output = PassthroughReader.extract(bytes, &hint)?;
    if let Some(reason) = fallback_reason {
        output.diagnostics.track_warning(reason);
    }
    Ok(finalize_reader_output(output, start))
}
367
368fn finalize_reader_output(output: ReaderOutput, start: Instant) -> ExtractedDocument {
369 let elapsed = start.elapsed();
370 let ReaderOutput {
371 document,
372 reader_name,
373 diagnostics,
374 } = output;
375 log_reader_result(&reader_name, &diagnostics, elapsed);
376 document
377}
378
379fn log_reader_result(reader: &str, diagnostics: &ReaderDiagnostics, elapsed: Duration) {
380 let duration_ms = diagnostics
381 .duration_ms
382 .unwrap_or_else(|| elapsed.as_millis() as u64);
383 let warnings = diagnostics.warnings.len();
384 let pages = diagnostics.pages_processed;
385
386 if warnings > 0 || diagnostics.fallback {
387 tracing::warn!(
388 target = "memvid::extract",
389 reader,
390 duration_ms,
391 pages,
392 warnings,
393 fallback = diagnostics.fallback,
394 "extraction completed with warnings"
395 );
396 for warning in diagnostics.warnings.iter() {
397 tracing::warn!(target = "memvid::extract", reader, warning = %warning);
398 }
399 } else {
400 tracing::info!(
401 target = "memvid::extract",
402 reader,
403 duration_ms,
404 pages,
405 "extraction completed"
406 );
407 }
408}
409
410impl Memvid {
    /// Run `op` against a staged copy of the archive file.
    ///
    /// The live file/WAL handles are swapped for handles to the staging copy
    /// before `op` runs, and the mutable in-memory state (header, TOC,
    /// data_end, generation, dirty flags) is snapshotted. On success the
    /// staging file atomically replaces the destination and fresh handles are
    /// reopened; on any failure (from `op` or from the atomic commit) both
    /// the handles and the snapshotted state are rolled back, so the original
    /// file is never touched.
    fn with_staging_lock<F>(&mut self, op: F) -> Result<()>
    where
        F: FnOnce(&mut Self) -> Result<()>,
    {
        self.file.sync_all()?;
        let mut staging = CommitStaging::prepare(self.path())?;
        staging.copy_from(&self.file)?;

        // Redirect self to the staging copy; keep the originals for rollback.
        let staging_handle = staging.clone_file()?;
        let new_wal = EmbeddedWal::open(&staging_handle, &self.header)?;
        let original_file = std::mem::replace(&mut self.file, staging_handle);
        let original_wal = std::mem::replace(&mut self.wal, new_wal);
        let original_header = self.header.clone();
        let original_toc = self.toc.clone();
        let original_data_end = self.data_end;
        let original_generation = self.generation;
        let original_dirty = self.dirty;
        #[cfg(feature = "lex")]
        let original_tantivy_dirty = self.tantivy_dirty;

        let destination_path = self.path().to_path_buf();
        // Wrapped in Option so each branch can either restore or drop them.
        let mut original_file = Some(original_file);
        let mut original_wal = Some(original_wal);

        match op(self) {
            Ok(()) => {
                self.file.sync_all()?;
                match staging.commit() {
                    Ok(()) => {
                        // Staging replaced the destination: discard the old
                        // handles and reopen against the committed file.
                        drop(original_file.take());
                        drop(original_wal.take());
                        self.file = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&destination_path)?;
                        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
                        Ok(())
                    }
                    Err(commit_err) => {
                        // Atomic publish failed: restore handles and state.
                        if let Some(file) = original_file.take() {
                            self.file = file;
                        }
                        if let Some(wal) = original_wal.take() {
                            self.wal = wal;
                        }
                        self.header = original_header;
                        self.toc = original_toc;
                        self.data_end = original_data_end;
                        self.generation = original_generation;
                        self.dirty = original_dirty;
                        #[cfg(feature = "lex")]
                        {
                            self.tantivy_dirty = original_tantivy_dirty;
                        }
                        Err(commit_err.into())
                    }
                }
            }
            Err(err) => {
                // `op` failed: discard the staging file (best-effort) and
                // roll back handles and in-memory state.
                let _ = staging.discard();
                if let Some(file) = original_file.take() {
                    self.file = file;
                }
                if let Some(wal) = original_wal.take() {
                    self.wal = wal;
                }
                self.header = original_header;
                self.toc = original_toc;
                self.data_end = original_data_end;
                self.generation = original_generation;
                self.dirty = original_dirty;
                #[cfg(feature = "lex")]
                {
                    self.tantivy_dirty = original_tantivy_dirty;
                }
                Err(err)
            }
        }
    }
492
493 pub(crate) fn catalog_data_end(&self) -> u64 {
494 let mut max_end = self.header.wal_offset + self.header.wal_size;
495
496 for descriptor in &self.toc.segment_catalog.lex_segments {
497 if descriptor.common.bytes_length == 0 {
498 continue;
499 }
500 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
501 }
502
503 for descriptor in &self.toc.segment_catalog.vec_segments {
504 if descriptor.common.bytes_length == 0 {
505 continue;
506 }
507 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
508 }
509
510 for descriptor in &self.toc.segment_catalog.time_segments {
511 if descriptor.common.bytes_length == 0 {
512 continue;
513 }
514 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
515 }
516
517 #[cfg(feature = "temporal_track")]
518 for descriptor in &self.toc.segment_catalog.temporal_segments {
519 if descriptor.common.bytes_length == 0 {
520 continue;
521 }
522 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
523 }
524
525 #[cfg(feature = "lex")]
526 for descriptor in &self.toc.segment_catalog.tantivy_segments {
527 if descriptor.common.bytes_length == 0 {
528 continue;
529 }
530 max_end = max_end.max(descriptor.common.bytes_offset + descriptor.common.bytes_length);
531 }
532
533 if let Some(manifest) = self.toc.indexes.lex.as_ref() {
534 if manifest.bytes_length != 0 {
535 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
536 }
537 }
538
539 if let Some(manifest) = self.toc.indexes.vec.as_ref() {
540 if manifest.bytes_length != 0 {
541 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
542 }
543 }
544
545 if let Some(manifest) = self.toc.time_index.as_ref() {
546 if manifest.bytes_length != 0 {
547 max_end = max_end.max(manifest.bytes_offset + manifest.bytes_length);
548 }
549 }
550
551 #[cfg(feature = "temporal_track")]
552 if let Some(track) = self.toc.temporal_track.as_ref() {
553 if track.bytes_length != 0 {
554 max_end = max_end.max(track.bytes_offset + track.bytes_length);
555 }
556 }
557
558 max_end
559 }
560
561 fn payload_region_end(&self) -> u64 {
562 let wal_region_end = self.header.wal_offset + self.header.wal_size;
563 let frames_with_payload: Vec<_> = self
564 .toc
565 .frames
566 .iter()
567 .filter(|frame| frame.payload_length != 0)
568 .collect();
569
570 tracing::info!(
571 "payload_region_end: found {} frames with payloads out of {} total frames, wal_region_end={}",
572 frames_with_payload.len(),
573 self.toc.frames.len(),
574 wal_region_end
575 );
576
577 for (idx, frame) in frames_with_payload.iter().enumerate().take(3) {
578 tracing::info!(
579 " frame[{}]: id={} offset={} length={} status={:?}",
580 idx,
581 frame.id,
582 frame.payload_offset,
583 frame.payload_length,
584 frame.status
585 );
586 }
587
588 let result = frames_with_payload
589 .iter()
590 .fold(wal_region_end, |max_end, frame| {
591 match frame.payload_offset.checked_add(frame.payload_length) {
592 Some(end) => max_end.max(end),
593 None => max_end,
594 }
595 });
596
597 tracing::info!("payload_region_end: returning {}", result);
598 result
599 }
600
601 fn append_wal_entry(&mut self, payload: &[u8]) -> Result<u64> {
602 loop {
603 match self.wal.append_entry(payload) {
604 Ok(seq) => return Ok(seq),
605 Err(MemvidError::CheckpointFailed { reason })
606 if reason == "embedded WAL region too small for entry"
607 || reason == "embedded WAL region full" =>
608 {
609 let required = WAL_ENTRY_HEADER_SIZE
612 .saturating_add(payload.len() as u64)
613 .max(self.header.wal_size + 1);
614 self.grow_wal_region(required)?;
615 }
616 Err(err) => return Err(err),
617 }
618 }
619 }
620
    /// Grow the embedded WAL region until it exceeds `required_entry_size`
    /// by repeatedly doubling, shifting all trailing data outward and
    /// patching every recorded offset, then persisting header + TOC and
    /// reopening the WAL over the enlarged region.
    ///
    /// NOTE(review): if `self.header.wal_size` is ever 0, the doubling loop
    /// never terminates (`0 * 2 == 0`); presumably the WAL region is always
    /// initialized to a nonzero size (WAL_SIZE_* constants) — confirm.
    fn grow_wal_region(&mut self, required_entry_size: u64) -> Result<()> {
        let mut new_size = self.header.wal_size;
        let mut target = required_entry_size;
        if target == 0 {
            // A zero request still forces at least one doubling below.
            target = self.header.wal_size;
        }
        // Double until strictly greater than the target; overflow is an error.
        while new_size <= target {
            new_size = new_size
                .checked_mul(2)
                .ok_or_else(|| MemvidError::CheckpointFailed {
                    reason: "wal_size overflow".into(),
                })?;
        }
        let delta = new_size - self.header.wal_size;
        if delta == 0 {
            return Ok(());
        }

        // Physically move trailing data, then fix up every stored offset.
        self.shift_data_for_wal_growth(delta)?;
        self.header.wal_size = new_size;
        self.header.footer_offset = self.header.footer_offset.saturating_add(delta);
        self.data_end = self.data_end.saturating_add(delta);
        self.adjust_offsets_after_wal_growth(delta);

        // Footer must sit past every catalogued span and the data end.
        let catalog_end = self.catalog_data_end();
        self.header.footer_offset = catalog_end
            .max(self.header.footer_offset)
            .max(self.data_end);

        // Persist the new layout and reopen the WAL over the grown region.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        self.wal = EmbeddedWal::open(&self.file, &self.header)?;
        Ok(())
    }
657
    /// Shift everything after the WAL region `delta` bytes toward the end of
    /// the file, then zero-fill the vacated gap.
    ///
    /// Copies in chunks from the END of the data backwards so that source
    /// and destination ranges never overlap destructively (dst = src + delta).
    fn shift_data_for_wal_growth(&mut self, delta: u64) -> Result<()> {
        if delta == 0 {
            return Ok(());
        }
        let original_len = self.file.metadata()?.len();
        let data_start = self.header.wal_offset + self.header.wal_size;
        // Extend the file first so the shifted tail has room.
        self.file.set_len(original_len + delta)?;

        // Backwards chunked copy: last chunk first.
        let mut remaining = original_len.saturating_sub(data_start);
        let mut buffer = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        while remaining > 0 {
            let chunk = min(remaining, buffer.len() as u64);
            let src = data_start + remaining - chunk;
            self.file.seek(SeekFrom::Start(src))?;
            self.file.read_exact(&mut buffer[..chunk as usize])?;
            let dst = src + delta;
            self.file.seek(SeekFrom::Start(dst))?;
            self.file.write_all(&buffer[..chunk as usize])?;
            remaining -= chunk;
        }

        // Zero the `delta`-sized gap freed up for the enlarged WAL region.
        self.file.seek(SeekFrom::Start(data_start))?;
        let zero_buf = vec![0u8; WAL_SHIFT_BUFFER_SIZE];
        let mut remaining = delta;
        while remaining > 0 {
            let write = min(remaining, zero_buf.len() as u64);
            self.file.write_all(&zero_buf[..write as usize])?;
            remaining -= write;
        }
        Ok(())
    }
689
690 fn adjust_offsets_after_wal_growth(&mut self, delta: u64) {
691 if delta == 0 {
692 return;
693 }
694
695 for frame in &mut self.toc.frames {
696 if frame.payload_offset != 0 {
697 frame.payload_offset += delta;
698 }
699 }
700
701 for segment in &mut self.toc.segments {
702 if segment.bytes_offset != 0 {
703 segment.bytes_offset += delta;
704 }
705 }
706
707 if let Some(lex) = self.toc.indexes.lex.as_mut() {
708 if lex.bytes_offset != 0 {
709 lex.bytes_offset += delta;
710 }
711 }
712 for manifest in &mut self.toc.indexes.lex_segments {
713 if manifest.bytes_offset != 0 {
714 manifest.bytes_offset += delta;
715 }
716 }
717 if let Some(vec) = self.toc.indexes.vec.as_mut() {
718 if vec.bytes_offset != 0 {
719 vec.bytes_offset += delta;
720 }
721 }
722 if let Some(time_index) = self.toc.time_index.as_mut() {
723 if time_index.bytes_offset != 0 {
724 time_index.bytes_offset += delta;
725 }
726 }
727 #[cfg(feature = "temporal_track")]
728 if let Some(track) = self.toc.temporal_track.as_mut() {
729 if track.bytes_offset != 0 {
730 track.bytes_offset += delta;
731 }
732 }
733
734 let catalog = &mut self.toc.segment_catalog;
735 for descriptor in &mut catalog.lex_segments {
736 if descriptor.common.bytes_offset != 0 {
737 descriptor.common.bytes_offset += delta;
738 }
739 }
740 for descriptor in &mut catalog.vec_segments {
741 if descriptor.common.bytes_offset != 0 {
742 descriptor.common.bytes_offset += delta;
743 }
744 }
745 for descriptor in &mut catalog.time_segments {
746 if descriptor.common.bytes_offset != 0 {
747 descriptor.common.bytes_offset += delta;
748 }
749 }
750 #[cfg(feature = "temporal_track")]
751 for descriptor in &mut catalog.temporal_segments {
752 if descriptor.common.bytes_offset != 0 {
753 descriptor.common.bytes_offset += delta;
754 }
755 }
756 for descriptor in &mut catalog.tantivy_segments {
757 if descriptor.common.bytes_offset != 0 {
758 descriptor.common.bytes_offset += delta;
759 }
760 }
761
762 #[cfg(feature = "lex")]
763 if let Ok(mut storage) = self.lex_storage.write() {
764 storage.adjust_offsets(delta);
765 }
766 }
767 pub fn commit_with_options(&mut self, options: CommitOptions) -> Result<()> {
768 self.ensure_writable()?;
769 if options.background {
770 tracing::debug!("commit background flag ignored; running synchronously");
771 }
772 let mode = options.mode;
773 let records = self.wal.pending_records()?;
774 if records.is_empty() && !self.dirty && !self.tantivy_index_pending() {
775 return Ok(());
776 }
777 self.with_staging_lock(move |mem| mem.commit_from_records(records, mode))
778 }
779
780 pub fn commit(&mut self) -> Result<()> {
781 self.ensure_writable()?;
782 self.commit_with_options(CommitOptions::new(CommitMode::Full))
783 }
784
    /// Core commit path, run inside `with_staging_lock`: apply the pending
    /// WAL records, rebuild or persist the affected indexes/tracks, then
    /// rewrite the TOC footer, checkpoint the WAL, and persist the header.
    ///
    /// NOTE(review): `_mode` is currently unused — Full and Incremental
    /// commits behave identically here.
    fn commit_from_records(&mut self, records: Vec<WalRecord>, _mode: CommitMode) -> Result<()> {
        self.generation = self.generation.wrapping_add(1);

        let delta = self.apply_records(records)?;
        let mut indexes_rebuilt = false;

        let clip_needs_persist = self
            .clip_index
            .as_ref()
            .map_or(false, |idx| !idx.is_empty());

        // A full index rebuild subsumes the individual persist steps below.
        if !delta.is_empty() || clip_needs_persist {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                clip_needs_persist = clip_needs_persist,
                "commit applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings)?;
            indexes_rebuilt = true;
        }

        if !indexes_rebuilt && self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }

        if !indexes_rebuilt && self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if !indexes_rebuilt && self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !indexes_rebuilt && !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        // Sketch track is persisted regardless of whether indexes rebuilt.
        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        self.rewrite_toc_footer()?;
        // toc_checksum is mirrored into the header both before and after the
        // checkpoint — presumably record_checkpoint mutates header fields and
        // the mirror must survive it; confirm before simplifying.
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        #[cfg(feature = "parallel_segments")]
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
855
856 #[cfg(feature = "parallel_segments")]
857 pub(crate) fn commit_parallel_with_opts(&mut self, opts: &BuildOpts) -> Result<()> {
858 self.ensure_writable()?;
859 if !self.dirty && !self.tantivy_index_pending() {
860 return Ok(());
861 }
862 let opts = opts.clone();
863 self.with_staging_lock(move |mem| mem.commit_parallel_inner(&opts))
864 }
865
    /// Parallel-segment commit body, run inside `with_staging_lock`.
    ///
    /// Applies pending WAL records; if the parallel publisher handled the
    /// delta, performs an incremental Tantivy update and rebuilds the time
    /// index in place, otherwise falls back to a full `rebuild_indexes`.
    /// Finishes with the same footer/checkpoint/header persistence sequence
    /// as `commit_from_records`.
    #[cfg(feature = "parallel_segments")]
    fn commit_parallel_inner(&mut self, opts: &BuildOpts) -> Result<()> {
        if !self.dirty && !self.tantivy_index_pending() {
            return Ok(());
        }
        let records = self.wal.pending_records()?;
        let delta = self.apply_records(records)?;
        self.generation = self.generation.wrapping_add(1);
        let mut indexes_rebuilt = false;
        if !delta.is_empty() {
            tracing::info!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "parallel commit applied delta"
            );
            // True when the parallel segment pipeline published the delta;
            // false falls back to the serial full rebuild below.
            let used_parallel = self.publish_parallel_delta(&delta, opts)?;
            tracing::info!(
                "parallel_commit: used_parallel={}, lex_enabled={}",
                used_parallel,
                self.lex_enabled
            );
            if used_parallel {
                self.header.footer_offset = self.data_end;
                indexes_rebuilt = true;

                // Incremental Tantivy update: index only the newly inserted
                // frames instead of rebuilding the whole index.
                #[cfg(feature = "lex")]
                if self.lex_enabled {
                    tracing::info!(
                        "parallel_commit: incremental Tantivy update, new_frames={}, total_frames={}",
                        delta.inserted_frames.len(),
                        self.toc.frames.len()
                    );

                    if self.tantivy.is_none() {
                        self.init_tantivy()?;
                    }

                    let max_payload = crate::memvid::search::max_index_payload();
                    // (frame, searchable text) pairs collected first so the
                    // borrow of `self.tantivy` below doesn't conflict with
                    // `self.frame_search_text`.
                    let mut prepared_docs: Vec<(Frame, String)> = Vec::new();

                    for frame_id in &delta.inserted_frames {
                        let frame = match self.toc.frames.get(*frame_id as usize) {
                            Some(f) => f.clone(),
                            None => continue,
                        };

                        // Explicit search text wins over extracted text.
                        let explicit_text = frame.search_text.clone();
                        if let Some(ref search_text) = explicit_text {
                            if !search_text.trim().is_empty() {
                                prepared_docs.push((frame, search_text.clone()));
                                continue;
                            }
                        }

                        let mime = frame
                            .metadata
                            .as_ref()
                            .and_then(|m| m.mime.as_deref())
                            .unwrap_or("application/octet-stream");

                        // Skip binary MIME types and oversized payloads.
                        if !crate::memvid::search::is_text_indexable_mime(mime) {
                            continue;
                        }

                        if frame.payload_length > max_payload {
                            continue;
                        }

                        let text = self.frame_search_text(&frame)?;
                        if !text.trim().is_empty() {
                            prepared_docs.push((frame, text));
                        }
                    }

                    if let Some(ref mut engine) = self.tantivy {
                        for (frame, text) in &prepared_docs {
                            engine.add_frame(frame, text)?;
                        }

                        if !prepared_docs.is_empty() {
                            engine.commit()?;
                            self.tantivy_dirty = true;
                        }

                        tracing::info!(
                            "parallel_commit: Tantivy incremental update, added={}, total_docs={}",
                            prepared_docs.len(),
                            engine.num_docs()
                        );
                    } else {
                        tracing::warn!(
                            "parallel_commit: Tantivy engine is None after init_tantivy"
                        );
                    }
                }

                // Rebuild the time index from all active document frames and
                // append it at the current data end.
                self.file.seek(SeekFrom::Start(self.data_end))?;
                let mut time_entries: Vec<TimeIndexEntry> = self
                    .toc
                    .frames
                    .iter()
                    .filter(|frame| {
                        frame.status == FrameStatus::Active && frame.role == FrameRole::Document
                    })
                    .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
                    .collect();
                let (ti_offset, ti_length, ti_checksum) =
                    time_index_append(&mut self.file, &mut time_entries)?;
                self.toc.time_index = Some(TimeIndexManifest {
                    bytes_offset: ti_offset,
                    bytes_length: ti_length,
                    entry_count: time_entries.len() as u64,
                    checksum: ti_checksum,
                });
                self.data_end = ti_offset + ti_length;
                self.header.footer_offset = self.data_end;
                tracing::info!(
                    "parallel_commit: rebuilt time_index at offset={}, length={}, entries={}",
                    ti_offset,
                    ti_length,
                    time_entries.len()
                );
            } else {
                // Parallel path declined: do the serial full rebuild.
                self.rebuild_indexes(&delta.inserted_embeddings)?;
                indexes_rebuilt = true;
            }
        }

        if self.tantivy_dirty || (!indexes_rebuilt && self.tantivy_index_pending()) {
            self.flush_tantivy()?;
        }

        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    self.persist_clip_index()?;
                }
            }
        }

        if self.memories_track.card_count() > 0 {
            self.persist_memories_track()?;
        }

        if !self.logic_mesh.is_empty() {
            self.persist_logic_mesh()?;
        }

        if !self.sketch_track.is_empty() {
            self.persist_sketch_track()?;
        }

        // Same persistence tail as commit_from_records: footer, checkpoint,
        // header, fsync, then clear the manifest WAL.
        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        self.wal.record_checkpoint(&mut self.header)?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        self.file.sync_all()?;
        if let Some(wal) = self.manifest_wal.as_mut() {
            wal.flush()?;
            wal.truncate()?;
        }
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1055
    /// Replay WAL records newer than the last checkpoint (crash recovery),
    /// rebuild indexes if anything was applied, then checkpoint and persist.
    pub(crate) fn recover_wal(&mut self) -> Result<()> {
        let records = self.wal.records_after(self.header.wal_sequence)?;
        if records.is_empty() {
            // Nothing to replay; still flush a pending Tantivy index.
            if self.tantivy_index_pending() {
                self.flush_tantivy()?;
            }
            return Ok(());
        }
        let delta = self.apply_records(records)?;
        if !delta.is_empty() {
            tracing::debug!(
                inserted_frames = delta.inserted_frames.len(),
                inserted_embeddings = delta.inserted_embeddings.len(),
                inserted_time_entries = delta.inserted_time_entries.len(),
                "recover applied delta"
            );
            self.rebuild_indexes(&delta.inserted_embeddings)?;
        } else if self.tantivy_index_pending() {
            self.flush_tantivy()?;
        }
        self.wal.record_checkpoint(&mut self.header)?;
        crate::persist_header(&mut self.file, &self.header)?;
        // NOTE(review): this second flush looks redundant with the one above
        // — it only fires if `tantivy_index_pending()` is STILL true after
        // the earlier flush (and the empty `if` branch is dead). Presumably
        // flush_tantivy can leave work pending after checkpointing; confirm,
        // otherwise this block can be removed.
        if !delta.is_empty() {
        } else if self.tantivy_index_pending() {
            self.flush_tantivy()?;
            crate::persist_header(&mut self.file, &self.header)?;
        }
        self.file.sync_all()?;
        self.pending_frame_inserts = 0;
        self.dirty = false;
        Ok(())
    }
1089
1090 fn apply_records(&mut self, records: Vec<WalRecord>) -> Result<IngestionDelta> {
1091 let mut delta = IngestionDelta::default();
1092 if records.is_empty() {
1093 return Ok(delta);
1094 }
1095
1096 let mut data_cursor = self.data_end;
1101 let mut sequence_to_frame: HashMap<u64, FrameId> = HashMap::new();
1102
1103 if !records.is_empty() {
1104 self.file.seek(SeekFrom::Start(data_cursor))?;
1105 for record in records {
1106 let mut entry = match decode_wal_entry(&record.payload)? {
1107 WalEntry::Frame(entry) => entry,
1108 #[cfg(feature = "lex")]
1109 WalEntry::Lex(batch) => {
1110 self.apply_lex_wal(batch)?;
1111 continue;
1112 }
1113 };
1114
1115 match entry.op {
1116 FrameWalOp::Insert => {
1117 let frame_id = self.toc.frames.len() as u64;
1118
1119 let (
1120 payload_offset,
1121 payload_length,
1122 checksum_bytes,
1123 canonical_length_value,
1124 ) = if let Some(source_id) = entry.reuse_payload_from {
1125 if !entry.payload.is_empty() {
1126 return Err(MemvidError::InvalidFrame {
1127 frame_id: source_id,
1128 reason: "reused payload entry contained inline bytes",
1129 });
1130 }
1131 let source = self.toc.frames.get(source_id as usize).cloned().ok_or(
1132 MemvidError::InvalidFrame {
1133 frame_id: source_id,
1134 reason: "reused payload source missing",
1135 },
1136 )?;
1137 (
1138 source.payload_offset,
1139 source.payload_length,
1140 source.checksum,
1141 entry
1142 .canonical_length
1143 .or(source.canonical_length)
1144 .unwrap_or(source.payload_length),
1145 )
1146 } else {
1147 self.file.seek(SeekFrom::Start(data_cursor))?;
1148 self.file.write_all(&entry.payload)?;
1149 let checksum = hash(&entry.payload);
1150 let payload_length = entry.payload.len() as u64;
1151 let canonical_length =
1152 if entry.canonical_encoding == CanonicalEncoding::Zstd {
1153 match entry.canonical_length {
1154 Some(len) => len,
1155 None => {
1156 let decoded = crate::decode_canonical_bytes(
1157 &entry.payload,
1158 CanonicalEncoding::Zstd,
1159 frame_id,
1160 )?;
1161 decoded.len() as u64
1162 }
1163 }
1164 } else {
1165 entry.canonical_length.unwrap_or(entry.payload.len() as u64)
1166 };
1167 let payload_offset = data_cursor;
1168 data_cursor += payload_length;
1169 (
1170 payload_offset,
1171 payload_length,
1172 *checksum.as_bytes(),
1173 canonical_length,
1174 )
1175 };
1176
1177 let uri = entry
1178 .uri
1179 .clone()
1180 .unwrap_or_else(|| crate::default_uri(frame_id));
1181 let title = entry
1182 .title
1183 .clone()
1184 .or_else(|| crate::infer_title_from_uri(&uri));
1185
1186 #[cfg(feature = "temporal_track")]
1187 let (anchor_ts, anchor_source) =
1188 self.determine_temporal_anchor(entry.timestamp);
1189
1190 let mut frame = Frame {
1191 id: frame_id,
1192 timestamp: entry.timestamp,
1193 anchor_ts: {
1194 #[cfg(feature = "temporal_track")]
1195 {
1196 Some(anchor_ts)
1197 }
1198 #[cfg(not(feature = "temporal_track"))]
1199 {
1200 None
1201 }
1202 },
1203 anchor_source: {
1204 #[cfg(feature = "temporal_track")]
1205 {
1206 Some(anchor_source)
1207 }
1208 #[cfg(not(feature = "temporal_track"))]
1209 {
1210 None
1211 }
1212 },
1213 kind: entry.kind.clone(),
1214 track: entry.track.clone(),
1215 payload_offset,
1216 payload_length,
1217 checksum: checksum_bytes,
1218 uri: Some(uri),
1219 title,
1220 canonical_encoding: entry.canonical_encoding,
1221 canonical_length: Some(canonical_length_value),
1222 metadata: entry.metadata.clone(),
1223 search_text: entry.search_text.clone(),
1224 tags: entry.tags.clone(),
1225 labels: entry.labels.clone(),
1226 extra_metadata: entry.extra_metadata.clone(),
1227 content_dates: entry.content_dates.clone(),
1228 chunk_manifest: entry.chunk_manifest.clone(),
1229 role: entry.role,
1230 parent_id: None,
1231 chunk_index: entry.chunk_index,
1232 chunk_count: entry.chunk_count,
1233 status: FrameStatus::Active,
1234 supersedes: entry.supersedes_frame_id,
1235 superseded_by: None,
1236 source_sha256: entry.source_sha256,
1237 source_path: entry.source_path.clone(),
1238 enrichment_state: entry.enrichment_state,
1239 };
1240
1241 if let Some(parent_seq) = entry.parent_sequence {
1242 if let Some(parent_frame_id) = sequence_to_frame.get(&parent_seq) {
1243 frame.parent_id = Some(*parent_frame_id);
1244 } else {
1245 if entry.role == FrameRole::DocumentChunk {
1251 for &candidate_id in delta.inserted_frames.iter().rev() {
1253 if let Some(candidate) =
1254 self.toc.frames.get(candidate_id as usize)
1255 {
1256 if candidate.role == FrameRole::Document
1257 && candidate.chunk_manifest.is_some()
1258 {
1259 frame.parent_id = Some(candidate_id);
1261 tracing::debug!(
1262 chunk_frame_id = frame_id,
1263 parent_frame_id = candidate_id,
1264 parent_seq = parent_seq,
1265 "resolved chunk parent via fallback"
1266 );
1267 break;
1268 }
1269 }
1270 }
1271 }
1272 if frame.parent_id.is_none() {
1273 tracing::warn!(
1274 chunk_frame_id = frame_id,
1275 parent_seq = parent_seq,
1276 "chunk has parent_sequence but parent not found in batch"
1277 );
1278 }
1279 }
1280 }
1281
1282 #[cfg(feature = "lex")]
1283 let index_text = if self.tantivy.is_some() {
1284 if let Some(text) = entry.search_text.clone() {
1285 if text.trim().is_empty() {
1286 None
1287 } else {
1288 Some(text)
1289 }
1290 } else {
1291 Some(self.frame_content(&frame)?)
1292 }
1293 } else {
1294 None
1295 };
1296 #[cfg(feature = "lex")]
1297 if let (Some(engine), Some(ref text)) =
1298 (self.tantivy.as_mut(), index_text.as_ref())
1299 {
1300 engine.add_frame(&frame, text)?;
1301 self.tantivy_dirty = true;
1302
1303 if !text.trim().is_empty() {
1306 let entry = crate::types::generate_sketch(
1307 frame_id,
1308 text,
1309 crate::types::SketchVariant::Small,
1310 None,
1311 );
1312 self.sketch_track.insert(entry);
1313 }
1314 }
1315
1316 if let Some(embedding) = entry.embedding.take() {
1317 delta
1318 .inserted_embeddings
1319 .push((frame_id, embedding.clone()));
1320 }
1321
1322 if entry.role == FrameRole::Document {
1323 delta
1324 .inserted_time_entries
1325 .push(TimeIndexEntry::new(entry.timestamp, frame_id));
1326 #[cfg(feature = "temporal_track")]
1327 {
1328 delta.inserted_temporal_anchors.push(TemporalAnchor::new(
1329 frame_id,
1330 anchor_ts,
1331 anchor_source,
1332 ));
1333 delta.inserted_temporal_mentions.extend(
1334 Self::collect_temporal_mentions(
1335 entry.search_text.as_deref(),
1336 frame_id,
1337 anchor_ts,
1338 ),
1339 );
1340 }
1341 }
1342
1343 if let Some(predecessor) = frame.supersedes {
1344 self.mark_frame_superseded(predecessor, frame_id)?;
1345 }
1346
1347 self.toc.frames.push(frame);
1348 delta.inserted_frames.push(frame_id);
1349 sequence_to_frame.insert(record.sequence, frame_id);
1350 }
1351 FrameWalOp::Tombstone => {
1352 let target = entry.target_frame_id.ok_or(MemvidError::InvalidFrame {
1353 frame_id: 0,
1354 reason: "tombstone missing frame reference",
1355 })?;
1356 self.mark_frame_deleted(target)?;
1357 delta.mutated_frames = true;
1358 }
1359 }
1360 }
1361 self.data_end = self.data_end.max(data_cursor);
1362 }
1363
1364 let orphan_resolutions: Vec<(u64, u64)> = delta
1368 .inserted_frames
1369 .iter()
1370 .filter_map(|&frame_id| {
1371 let frame = self.toc.frames.get(frame_id as usize)?;
1372 if frame.role != FrameRole::DocumentChunk || frame.parent_id.is_some() {
1373 return None;
1374 }
1375 for candidate_id in (0..frame_id).rev() {
1377 if let Some(candidate) = self.toc.frames.get(candidate_id as usize) {
1378 if candidate.role == FrameRole::Document
1379 && candidate.chunk_manifest.is_some()
1380 && candidate.status == FrameStatus::Active
1381 {
1382 return Some((frame_id, candidate_id));
1383 }
1384 }
1385 }
1386 None
1387 })
1388 .collect();
1389
1390 for (chunk_id, parent_id) in orphan_resolutions {
1392 if let Some(frame) = self.toc.frames.get_mut(chunk_id as usize) {
1393 frame.parent_id = Some(parent_id);
1394 tracing::debug!(
1395 chunk_frame_id = chunk_id,
1396 parent_frame_id = parent_id,
1397 "resolved orphan chunk parent in second pass"
1398 );
1399 }
1400 }
1401
1402 Ok(delta)
1405 }
1406
1407 #[cfg(feature = "temporal_track")]
1408 fn determine_temporal_anchor(&self, timestamp: i64) -> (i64, AnchorSource) {
1409 (timestamp, AnchorSource::FrameTimestamp)
1410 }
1411
    /// Scans `text` for temporal expressions and resolves them into
    /// `TemporalMention` records relative to the frame's anchor timestamp.
    ///
    /// Returns an empty vec for absent/blank text or an anchor timestamp that
    /// cannot be represented as an `OffsetDateTime`.
    #[cfg(feature = "temporal_track")]
    fn collect_temporal_mentions(
        text: Option<&str>,
        frame_id: FrameId,
        anchor_ts: i64,
    ) -> Vec<TemporalMention> {
        let text = match text {
            Some(value) if !value.trim().is_empty() => value,
            _ => return Vec::new(),
        };

        let anchor = match OffsetDateTime::from_unix_timestamp(anchor_ts) {
            Ok(ts) => ts,
            Err(_) => return Vec::new(),
        };

        let context = TemporalContext::new(anchor, DEFAULT_TEMPORAL_TZ.to_string());
        let normalizer = TemporalNormalizer::new(context);
        // Candidate (start, end) byte spans within `text`.
        let mut spans: Vec<(usize, usize)> = Vec::new();
        // ASCII lowercasing preserves byte length and offsets, so positions
        // found in `lower` are valid indices into `text` too.
        let lower = text.to_ascii_lowercase();

        // Pass 1: literal matches of the static phrase list, non-overlapping
        // left-to-right.
        for phrase in STATIC_TEMPORAL_PHRASES {
            let mut search_start = 0usize;
            while let Some(idx) = lower[search_start..].find(phrase) {
                let abs = search_start + idx;
                let end = abs + phrase.len();
                spans.push((abs, end));
                search_start = end;
            }
        }

        // Pass 2: numeric dates (e.g. 12/31/2024). The regex is compiled once
        // and cached; a compile failure is logged and disables collection.
        static NUMERIC_DATE: OnceCell<std::result::Result<Regex, String>> = OnceCell::new();
        let regex = NUMERIC_DATE.get_or_init(|| {
            Regex::new(r"\b\d{1,2}/\d{1,2}/\d{2,4}\b").map_err(|err| err.to_string())
        });
        let regex = match regex {
            Ok(re) => re,
            Err(msg) => {
                tracing::error!(target = "memvid::temporal", error = %msg, "numeric date regex init failed");
                return Vec::new();
            }
        };
        for mat in regex.find_iter(text) {
            spans.push((mat.start(), mat.end()));
        }

        spans.sort_unstable();
        spans.dedup();

        let mut mentions: Vec<TemporalMention> = Vec::new();
        for (start, end) in spans {
            if end > text.len() || start >= end {
                continue;
            }
            // Strip surrounding quote/punctuation characters, then recompute
            // the trimmed slice's byte offsets within `text`.
            let raw = &text[start..end];
            let trimmed = raw.trim_matches(|c: char| matches!(c, '"' | '\'' | '.' | ',' | ';'));
            if trimmed.is_empty() {
                continue;
            }
            let offset = raw.find(trimmed).map(|idx| start + idx).unwrap_or(start);
            let finish = offset + trimmed.len();
            match normalizer.resolve(trimmed) {
                Ok(resolution) => {
                    mentions.extend(Self::resolution_to_mentions(
                        resolution, frame_id, offset, finish,
                    ));
                }
                // Candidates the normalizer can't resolve are skipped quietly.
                Err(_) => continue,
            }
        }

        mentions
    }
1485
1486 #[cfg(feature = "temporal_track")]
1487 fn resolution_to_mentions(
1488 resolution: TemporalResolution,
1489 frame_id: FrameId,
1490 byte_start: usize,
1491 byte_end: usize,
1492 ) -> Vec<TemporalMention> {
1493 let byte_len = byte_end.saturating_sub(byte_start) as u32;
1494 let byte_start = byte_start.min(u32::MAX as usize) as u32;
1495 let mut results = Vec::new();
1496
1497 let base_flags = Self::flags_from_resolution(&resolution.flags);
1498 match resolution.value {
1499 TemporalResolutionValue::Date(date) => {
1500 let ts = Self::date_to_timestamp(date);
1501 results.push(TemporalMention::new(
1502 ts,
1503 frame_id,
1504 byte_start,
1505 byte_len,
1506 TemporalMentionKind::Date,
1507 resolution.confidence,
1508 0,
1509 base_flags,
1510 ));
1511 }
1512 TemporalResolutionValue::DateTime(dt) => {
1513 let ts = dt.unix_timestamp();
1514 let tz_hint = dt.offset().whole_minutes() as i16;
1515 results.push(TemporalMention::new(
1516 ts,
1517 frame_id,
1518 byte_start,
1519 byte_len,
1520 TemporalMentionKind::DateTime,
1521 resolution.confidence,
1522 tz_hint,
1523 base_flags,
1524 ));
1525 }
1526 TemporalResolutionValue::DateRange { start, end } => {
1527 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1528 let start_ts = Self::date_to_timestamp(start);
1529 results.push(TemporalMention::new(
1530 start_ts,
1531 frame_id,
1532 byte_start,
1533 byte_len,
1534 TemporalMentionKind::RangeStart,
1535 resolution.confidence,
1536 0,
1537 flags,
1538 ));
1539 let end_ts = Self::date_to_timestamp(end);
1540 results.push(TemporalMention::new(
1541 end_ts,
1542 frame_id,
1543 byte_start,
1544 byte_len,
1545 TemporalMentionKind::RangeEnd,
1546 resolution.confidence,
1547 0,
1548 flags,
1549 ));
1550 }
1551 TemporalResolutionValue::DateTimeRange { start, end } => {
1552 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1553 results.push(TemporalMention::new(
1554 start.unix_timestamp(),
1555 frame_id,
1556 byte_start,
1557 byte_len,
1558 TemporalMentionKind::RangeStart,
1559 resolution.confidence,
1560 start.offset().whole_minutes() as i16,
1561 flags,
1562 ));
1563 results.push(TemporalMention::new(
1564 end.unix_timestamp(),
1565 frame_id,
1566 byte_start,
1567 byte_len,
1568 TemporalMentionKind::RangeEnd,
1569 resolution.confidence,
1570 end.offset().whole_minutes() as i16,
1571 flags,
1572 ));
1573 }
1574 TemporalResolutionValue::Month { year, month } => {
1575 let start_date = match Date::from_calendar_date(year, month, 1) {
1576 Ok(date) => date,
1577 Err(err) => {
1578 tracing::warn!(
1579 target = "memvid::temporal",
1580 %err,
1581 year,
1582 month = month as u8,
1583 "skipping invalid month resolution"
1584 );
1585 return results;
1587 }
1588 };
1589 let end_date = match Self::last_day_in_month(year, month) {
1590 Some(date) => date,
1591 None => {
1592 tracing::warn!(
1593 target = "memvid::temporal",
1594 year,
1595 month = month as u8,
1596 "skipping month resolution with invalid calendar range"
1597 );
1598 return results;
1599 }
1600 };
1601 let flags = base_flags.set(TemporalMentionFlags::HAS_RANGE, true);
1602 results.push(TemporalMention::new(
1603 Self::date_to_timestamp(start_date),
1604 frame_id,
1605 byte_start,
1606 byte_len,
1607 TemporalMentionKind::RangeStart,
1608 resolution.confidence,
1609 0,
1610 flags,
1611 ));
1612 results.push(TemporalMention::new(
1613 Self::date_to_timestamp(end_date),
1614 frame_id,
1615 byte_start,
1616 byte_len,
1617 TemporalMentionKind::RangeEnd,
1618 resolution.confidence,
1619 0,
1620 flags,
1621 ));
1622 }
1623 }
1624
1625 results
1626 }
1627
1628 #[cfg(feature = "temporal_track")]
1629 fn flags_from_resolution(flags: &[TemporalResolutionFlag]) -> TemporalMentionFlags {
1630 let mut result = TemporalMentionFlags::empty();
1631 if flags
1632 .iter()
1633 .any(|flag| matches!(flag, TemporalResolutionFlag::Ambiguous))
1634 {
1635 result = result.set(TemporalMentionFlags::AMBIGUOUS, true);
1636 }
1637 if flags
1638 .iter()
1639 .any(|flag| matches!(flag, TemporalResolutionFlag::Relative))
1640 {
1641 result = result.set(TemporalMentionFlags::DERIVED, true);
1642 }
1643 result
1644 }
1645
1646 #[cfg(feature = "temporal_track")]
1647 fn date_to_timestamp(date: Date) -> i64 {
1648 PrimitiveDateTime::new(date, Time::MIDNIGHT)
1649 .assume_offset(UtcOffset::UTC)
1650 .unix_timestamp()
1651 }
1652
1653 #[cfg(feature = "temporal_track")]
1654 fn last_day_in_month(year: i32, month: Month) -> Option<Date> {
1655 let mut date = Date::from_calendar_date(year, month, 1).ok()?;
1656 while let Some(next) = date.next_day() {
1657 if next.month() == month {
1658 date = next;
1659 } else {
1660 break;
1661 }
1662 }
1663 Some(date)
1664 }
1665
1666 #[allow(dead_code)]
1667 fn publish_lex_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1668 if delta.inserted_frames.is_empty() || !self.lex_enabled {
1669 return Ok(false);
1670 }
1671
1672 let artifact = match self.build_lex_segment_from_frames(&delta.inserted_frames)? {
1673 Some(artifact) => artifact,
1674 None => return Ok(false),
1675 };
1676
1677 let segment_id = self.toc.segment_catalog.next_segment_id;
1678 #[cfg(feature = "parallel_segments")]
1679 let span =
1680 self.segment_span_from_iter(delta.inserted_frames.iter().map(|frame_id| *frame_id));
1681
1682 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1683 let mut descriptor = self.append_lex_segment(&artifact, segment_id)?;
1684 #[cfg(feature = "parallel_segments")]
1685 if let Some(span) = span {
1686 Self::decorate_segment_common(&mut descriptor.common, span);
1687 }
1688 #[cfg(feature = "parallel_segments")]
1689 let descriptor_for_manifest = descriptor.clone();
1690 self.toc.segment_catalog.lex_segments.push(descriptor);
1691 #[cfg(feature = "parallel_segments")]
1692 if let Err(err) = self.record_index_segment(
1693 SegmentKind::Lexical,
1694 descriptor_for_manifest.common,
1695 SegmentStats {
1696 doc_count: artifact.doc_count,
1697 vector_count: 0,
1698 time_entries: 0,
1699 bytes_uncompressed: artifact.bytes.len() as u64,
1700 build_micros: 0,
1701 },
1702 ) {
1703 tracing::warn!(error = %err, "manifest WAL append failed for lex segment");
1704 }
1705 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1706 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1707 Ok(true)
1708 }
1709
1710 #[allow(dead_code)]
1711 fn publish_vec_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1712 if delta.inserted_embeddings.is_empty() || !self.vec_enabled {
1713 return Ok(false);
1714 }
1715
1716 let artifact = match self.build_vec_segment_from_embeddings(&delta.inserted_embeddings)? {
1717 Some(artifact) => artifact,
1718 None => return Ok(false),
1719 };
1720
1721 if let Some(existing_dim) = self.effective_vec_index_dimension()? {
1722 if existing_dim != artifact.dimension {
1723 return Err(MemvidError::VecDimensionMismatch {
1724 expected: existing_dim,
1725 actual: artifact.dimension as usize,
1726 });
1727 }
1728 }
1729
1730 let segment_id = self.toc.segment_catalog.next_segment_id;
1731 #[cfg(feature = "parallel_segments")]
1732 #[cfg(feature = "parallel_segments")]
1733 let span = self.segment_span_from_iter(delta.inserted_embeddings.iter().map(|(id, _)| *id));
1734
1735 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1736 let mut descriptor = self.append_vec_segment(&artifact, segment_id)?;
1737 #[cfg(feature = "parallel_segments")]
1738 if let Some(span) = span {
1739 Self::decorate_segment_common(&mut descriptor.common, span);
1740 }
1741 #[cfg(feature = "parallel_segments")]
1742 let descriptor_for_manifest = descriptor.clone();
1743 self.toc.segment_catalog.vec_segments.push(descriptor);
1744 #[cfg(feature = "parallel_segments")]
1745 if let Err(err) = self.record_index_segment(
1746 SegmentKind::Vector,
1747 descriptor_for_manifest.common,
1748 SegmentStats {
1749 doc_count: 0,
1750 vector_count: artifact.vector_count,
1751 time_entries: 0,
1752 bytes_uncompressed: artifact.bytes_uncompressed,
1753 build_micros: 0,
1754 },
1755 ) {
1756 tracing::warn!(error = %err, "manifest WAL append failed for vec segment");
1757 }
1758 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1759 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1760
1761 if self.toc.indexes.vec.is_none() {
1763 let empty_offset = self.data_end;
1764 let empty_checksum = *b"\xe3\xb0\xc4\x42\x98\xfc\x1c\x14\x9a\xfb\xf4\xc8\x99\x6f\xb9\x24\
1765 \x27\xae\x41\xe4\x64\x9b\x93\x4c\xa4\x95\x99\x1b\x78\x52\xb8\x55";
1766 self.toc.indexes.vec = Some(VecIndexManifest {
1767 vector_count: 0,
1768 dimension: 0,
1769 bytes_offset: empty_offset,
1770 bytes_length: 0,
1771 checksum: empty_checksum,
1772 compression_mode: self.vec_compression.clone(),
1773 });
1774 }
1775 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
1776 if manifest.dimension == 0 {
1777 manifest.dimension = artifact.dimension;
1778 }
1779 if manifest.bytes_length == 0 {
1780 manifest.vector_count = manifest.vector_count.saturating_add(artifact.vector_count);
1781 manifest.compression_mode = artifact.compression.clone();
1782 }
1783 }
1784
1785 self.vec_enabled = true;
1786 Ok(true)
1787 }
1788
1789 #[allow(dead_code)]
1790 fn publish_time_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1791 if delta.inserted_time_entries.is_empty() {
1792 return Ok(false);
1793 }
1794
1795 let artifact = match self.build_time_segment_from_entries(&delta.inserted_time_entries)? {
1796 Some(artifact) => artifact,
1797 None => return Ok(false),
1798 };
1799
1800 let segment_id = self.toc.segment_catalog.next_segment_id;
1801 #[cfg(feature = "parallel_segments")]
1802 #[cfg(feature = "parallel_segments")]
1803 let span = self.segment_span_from_iter(
1804 delta
1805 .inserted_time_entries
1806 .iter()
1807 .map(|entry| entry.frame_id),
1808 );
1809
1810 #[cfg_attr(not(feature = "parallel_segments"), allow(unused_mut))]
1811 let mut descriptor = self.append_time_segment(&artifact, segment_id)?;
1812 #[cfg(feature = "parallel_segments")]
1813 if let Some(span) = span {
1814 Self::decorate_segment_common(&mut descriptor.common, span);
1815 }
1816 #[cfg(feature = "parallel_segments")]
1817 let descriptor_for_manifest = descriptor.clone();
1818 self.toc.segment_catalog.time_segments.push(descriptor);
1819 #[cfg(feature = "parallel_segments")]
1820 if let Err(err) = self.record_index_segment(
1821 SegmentKind::Time,
1822 descriptor_for_manifest.common,
1823 SegmentStats {
1824 doc_count: 0,
1825 vector_count: 0,
1826 time_entries: artifact.entry_count,
1827 bytes_uncompressed: artifact.bytes.len() as u64,
1828 build_micros: 0,
1829 },
1830 ) {
1831 tracing::warn!(error = %err, "manifest WAL append failed for time segment");
1832 }
1833 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1834 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1835 Ok(true)
1836 }
1837
1838 #[cfg(feature = "temporal_track")]
1839 #[allow(dead_code)]
1840 fn publish_temporal_delta(&mut self, delta: &IngestionDelta) -> Result<bool> {
1841 if delta.inserted_temporal_mentions.is_empty() && delta.inserted_temporal_anchors.is_empty()
1842 {
1843 return Ok(false);
1844 }
1845
1846 debug_assert!(
1847 delta.inserted_temporal_mentions.len() < 1_000_000,
1848 "temporal delta mentions unexpectedly large: {}",
1849 delta.inserted_temporal_mentions.len()
1850 );
1851 debug_assert!(
1852 delta.inserted_temporal_anchors.len() < 1_000_000,
1853 "temporal delta anchors unexpectedly large: {}",
1854 delta.inserted_temporal_anchors.len()
1855 );
1856
1857 let artifact = match self.build_temporal_segment_from_records(
1858 &delta.inserted_temporal_mentions,
1859 &delta.inserted_temporal_anchors,
1860 )? {
1861 Some(artifact) => artifact,
1862 None => return Ok(false),
1863 };
1864
1865 let segment_id = self.toc.segment_catalog.next_segment_id;
1866 let descriptor = self.append_temporal_segment(&artifact, segment_id)?;
1867 self.toc
1868 .segment_catalog
1869 .temporal_segments
1870 .push(descriptor.clone());
1871 self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
1872 self.toc.segment_catalog.next_segment_id = segment_id.saturating_add(1);
1873
1874 self.toc.temporal_track = Some(TemporalTrackManifest {
1875 bytes_offset: descriptor.common.bytes_offset,
1876 bytes_length: descriptor.common.bytes_length,
1877 entry_count: artifact.entry_count,
1878 anchor_count: artifact.anchor_count,
1879 checksum: artifact.checksum,
1880 flags: artifact.flags,
1881 });
1882
1883 self.clear_temporal_track_cache();
1884
1885 Ok(true)
1886 }
1887
1888 fn mark_frame_superseded(&mut self, frame_id: FrameId, successor_id: FrameId) -> Result<()> {
1889 let frame =
1890 self.toc
1891 .frames
1892 .get_mut(frame_id as usize)
1893 .ok_or(MemvidError::InvalidFrame {
1894 frame_id,
1895 reason: "supersede target missing",
1896 })?;
1897 frame.status = FrameStatus::Superseded;
1898 frame.superseded_by = Some(successor_id);
1899 self.remove_frame_from_indexes(frame_id)
1900 }
1901
    /// Rebuilds every derived index — time, lex/tantivy, vector, CLIP,
    /// memories, logic mesh — from the surviving frames, rewriting the index
    /// region that follows the payload data and persisting a fresh TOC footer
    /// and header.
    ///
    /// `new_vec_docs` supplies embeddings for frames that need (re)vectorizing.
    /// Errors if the tantivy engine is missing or if a lex rebuild yields zero
    /// docs despite text-indexable frames existing.
    pub(crate) fn rebuild_indexes(&mut self, new_vec_docs: &[(FrameId, Vec<f32>)]) -> Result<()> {
        // Completely empty store: nothing to rebuild.
        if self.toc.frames.is_empty() && !self.lex_enabled && !self.vec_enabled {
            return Ok(());
        }

        // Reset the write cursor to the end of the payload region and trim any
        // stale index bytes — but never truncate below the recorded footer.
        let payload_end = self.payload_region_end();
        self.data_end = payload_end;
        let safe_truncate_len = self.header.footer_offset.max(payload_end);
        if self.file.metadata()?.len() > safe_truncate_len {
            self.file.set_len(safe_truncate_len)?;
        }
        self.file.seek(SeekFrom::Start(payload_end))?;

        // All catalog entries are regenerated below.
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        self.toc.segment_catalog.temporal_segments.clear();
        #[cfg(feature = "parallel_segments")]
        self.toc.segment_catalog.index_segments.clear();
        self.toc.segment_catalog.tantivy_segments.clear();
        self.toc.indexes.lex_segments.clear();

        // Time index: one entry per active document frame, appended first.
        let mut time_entries: Vec<TimeIndexEntry> = self
            .toc
            .frames
            .iter()
            .filter(|frame| {
                frame.status == FrameStatus::Active && frame.role == FrameRole::Document
            })
            .map(|frame| TimeIndexEntry::new(frame.timestamp, frame.id))
            .collect();
        let (ti_offset, ti_length, ti_checksum) =
            time_index_append(&mut self.file, &mut time_entries)?;
        self.toc.time_index = Some(TimeIndexManifest {
            bytes_offset: ti_offset,
            bytes_length: ti_length,
            entry_count: time_entries.len() as u64,
            checksum: ti_checksum,
        });

        // Running offset where the next index artifact will be written.
        let mut footer_offset = ti_offset + ti_length;

        // Temporal track is rebuilt lazily elsewhere; drop all state here.
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
            self.clear_temporal_track_cache();
        }

        if self.lex_enabled {
            #[cfg(feature = "lex")]
            {
                // Wipe segment storage and reindex every frame from scratch.
                if let Ok(mut storage) = self.lex_storage.write() {
                    storage.clear();
                    storage.set_generation(0);
                }

                self.init_tantivy()?;

                // Take the engine to satisfy the borrow checker while the
                // rebuild also borrows self.
                if let Some(mut engine) = self.tantivy.take() {
                    self.rebuild_tantivy_engine(&mut engine)?;
                    self.tantivy = Some(engine);
                } else {
                    return Err(MemvidError::InvalidToc {
                        reason: "tantivy engine missing during doctor rebuild".into(),
                    });
                }

                self.lex_enabled = true;

                self.tantivy_dirty = true;

                // flush_tantivy appends at data_end and advances
                // header.footer_offset; point data_end at our running offset
                // for the duration of the flush, then restore it.
                self.data_end = footer_offset;

                self.flush_tantivy()?;

                footer_offset = self.header.footer_offset;

                self.data_end = payload_end;
            }
            #[cfg(not(feature = "lex"))]
            {
                self.toc.indexes.lex = None;
                self.toc.indexes.lex_segments.clear();
            }
        } else {
            // Lex disabled: clear manifests and any stored segments.
            self.toc.indexes.lex = None;
            self.toc.indexes.lex_segments.clear();
            #[cfg(feature = "lex")]
            if let Ok(mut storage) = self.lex_storage.write() {
                storage.clear();
            }
        }

        // Vector index: append the serialized artifact and record its manifest.
        if let Some((artifact, index)) = self.build_vec_artifact(new_vec_docs)? {
            let vec_offset = footer_offset;
            self.file.seek(SeekFrom::Start(vec_offset))?;
            self.file.write_all(&artifact.bytes)?;
            footer_offset += artifact.bytes.len() as u64;
            self.toc.indexes.vec = Some(VecIndexManifest {
                vector_count: artifact.vector_count,
                dimension: artifact.dimension,
                bytes_offset: vec_offset,
                bytes_length: artifact.bytes.len() as u64,
                checksum: artifact.checksum,
                compression_mode: self.vec_compression.clone(),
            });
            self.vec_index = Some(index);
        } else {
            // Keep an existing manifest only while vec remains enabled.
            if !self.vec_enabled {
                self.toc.indexes.vec = None;
            }
            self.vec_index = None;
        }

        // CLIP index: persisted only when enabled and non-empty.
        if self.clip_enabled {
            if let Some(ref clip_index) = self.clip_index {
                if !clip_index.is_empty() {
                    let artifact = clip_index.encode()?;
                    let clip_offset = footer_offset;
                    self.file.seek(SeekFrom::Start(clip_offset))?;
                    self.file.write_all(&artifact.bytes)?;
                    footer_offset += artifact.bytes.len() as u64;
                    self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
                        bytes_offset: clip_offset,
                        bytes_length: artifact.bytes.len() as u64,
                        vector_count: artifact.vector_count,
                        dimension: artifact.dimension,
                        checksum: artifact.checksum,
                        model_name: crate::clip::default_model_info().name.to_string(),
                    });
                    tracing::info!(
                        "rebuild_indexes: persisted CLIP index with {} vectors at offset {}",
                        artifact.vector_count,
                        clip_offset
                    );
                }
            }
        } else {
            self.toc.indexes.clip = None;
        }

        // Memories track.
        if self.memories_track.card_count() > 0 {
            let memories_offset = footer_offset;
            let memories_bytes = self.memories_track.serialize()?;
            let memories_checksum = blake3::hash(&memories_bytes).into();
            self.file.seek(SeekFrom::Start(memories_offset))?;
            self.file.write_all(&memories_bytes)?;
            footer_offset += memories_bytes.len() as u64;

            let stats = self.memories_track.stats();
            self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
                bytes_offset: memories_offset,
                bytes_length: memories_bytes.len() as u64,
                card_count: stats.card_count as u64,
                entity_count: stats.entity_count as u64,
                checksum: memories_checksum,
            });
        } else {
            self.toc.memories_track = None;
        }

        // Logic mesh.
        if !self.logic_mesh.is_empty() {
            let mesh_offset = footer_offset;
            let mesh_bytes = self.logic_mesh.serialize()?;
            let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();
            self.file.seek(SeekFrom::Start(mesh_offset))?;
            self.file.write_all(&mesh_bytes)?;
            footer_offset += mesh_bytes.len() as u64;

            let stats = self.logic_mesh.stats();
            self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
                bytes_offset: mesh_offset,
                bytes_length: mesh_bytes.len() as u64,
                node_count: stats.node_count as u64,
                edge_count: stats.edge_count as u64,
                checksum: mesh_checksum,
            });
        } else {
            self.toc.logic_mesh = None;
        }

        tracing::info!(
            "rebuild_indexes: ti_offset={} ti_length={} computed_footer={} current_footer={} (before setting)",
            ti_offset,
            ti_length,
            footer_offset,
            self.header.footer_offset
        );

        // Never move the footer backwards; flush_tantivy may already have
        // advanced it past our computed offset.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        self.rewrite_toc_footer()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;

        // Doctor-style sanity check: a rebuild that indexed nothing despite
        // having text-indexable frames is treated as a hard failure.
        #[cfg(feature = "lex")]
        if self.lex_enabled {
            if let Some(ref engine) = self.tantivy {
                let doc_count = engine.num_docs();
                let active_frame_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| f.status == FrameStatus::Active)
                    .count();

                let text_indexable_count = self
                    .toc
                    .frames
                    .iter()
                    .filter(|f| crate::memvid::search::is_frame_text_indexable(f))
                    .count();

                if doc_count == 0 && text_indexable_count > 0 {
                    return Err(MemvidError::Doctor {
                        reason: format!(
                            "Lex index rebuild failed: 0 documents indexed from {} text-indexable frames. \
                            This indicates a critical failure in the rebuild process.",
                            text_indexable_count
                        ),
                    });
                }

                log::info!(
                    "✓ Doctor lex index rebuild succeeded: {} docs from {} frames ({} text-indexable)",
                    doc_count,
                    active_frame_count,
                    text_indexable_count
                );
            }
        }

        Ok(())
    }
2169
2170 fn persist_memories_track(&mut self) -> Result<()> {
2175 if self.memories_track.card_count() == 0 {
2176 self.toc.memories_track = None;
2177 return Ok(());
2178 }
2179
2180 let memories_offset = self.header.footer_offset;
2182 let memories_bytes = self.memories_track.serialize()?;
2183 let memories_checksum: [u8; 32] = blake3::hash(&memories_bytes).into();
2184
2185 self.file.seek(SeekFrom::Start(memories_offset))?;
2186 self.file.write_all(&memories_bytes)?;
2187
2188 let stats = self.memories_track.stats();
2189 self.toc.memories_track = Some(crate::types::MemoriesTrackManifest {
2190 bytes_offset: memories_offset,
2191 bytes_length: memories_bytes.len() as u64,
2192 card_count: stats.card_count as u64,
2193 entity_count: stats.entity_count as u64,
2194 checksum: memories_checksum,
2195 });
2196
2197 self.header.footer_offset = memories_offset + memories_bytes.len() as u64;
2199
2200 if self.file.metadata()?.len() < self.header.footer_offset {
2202 self.file.set_len(self.header.footer_offset)?;
2203 }
2204
2205 Ok(())
2206 }
2207
2208 fn persist_clip_index(&mut self) -> Result<()> {
2213 if !self.clip_enabled {
2214 self.toc.indexes.clip = None;
2215 return Ok(());
2216 }
2217
2218 let clip_index = match &self.clip_index {
2219 Some(idx) if !idx.is_empty() => idx,
2220 _ => {
2221 self.toc.indexes.clip = None;
2222 return Ok(());
2223 }
2224 };
2225
2226 let artifact = clip_index.encode()?;
2228
2229 let clip_offset = self.header.footer_offset;
2231 self.file.seek(SeekFrom::Start(clip_offset))?;
2232 self.file.write_all(&artifact.bytes)?;
2233
2234 self.toc.indexes.clip = Some(crate::clip::ClipIndexManifest {
2235 bytes_offset: clip_offset,
2236 bytes_length: artifact.bytes.len() as u64,
2237 vector_count: artifact.vector_count,
2238 dimension: artifact.dimension,
2239 checksum: artifact.checksum,
2240 model_name: crate::clip::default_model_info().name.to_string(),
2241 });
2242
2243 tracing::info!(
2244 "persist_clip_index: persisted CLIP index with {} vectors at offset {}",
2245 artifact.vector_count,
2246 clip_offset
2247 );
2248
2249 self.header.footer_offset = clip_offset + artifact.bytes.len() as u64;
2251
2252 if self.file.metadata()?.len() < self.header.footer_offset {
2254 self.file.set_len(self.header.footer_offset)?;
2255 }
2256
2257 Ok(())
2258 }
2259
    /// Serializes the logic mesh at the current footer offset, recording its
    /// manifest (offset, length, node/edge counts, blake3 checksum) in the
    /// TOC. An empty mesh clears the manifest entry instead.
    fn persist_logic_mesh(&mut self) -> Result<()> {
        if self.logic_mesh.is_empty() {
            self.toc.logic_mesh = None;
            return Ok(());
        }

        let mesh_offset = self.header.footer_offset;
        let mesh_bytes = self.logic_mesh.serialize()?;
        // Checksum covers the serialized bytes so readers can validate them.
        let mesh_checksum: [u8; 32] = blake3::hash(&mesh_bytes).into();

        self.file.seek(SeekFrom::Start(mesh_offset))?;
        self.file.write_all(&mesh_bytes)?;

        let stats = self.logic_mesh.stats();
        self.toc.logic_mesh = Some(crate::types::LogicMeshManifest {
            bytes_offset: mesh_offset,
            bytes_length: mesh_bytes.len() as u64,
            node_count: stats.node_count as u64,
            edge_count: stats.edge_count as u64,
            checksum: mesh_checksum,
        });

        // Advance the footer past the mesh and grow the file if needed.
        self.header.footer_offset = mesh_offset + mesh_bytes.len() as u64;

        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        Ok(())
    }
2297
    /// Persists the sketch track at the current footer offset via
    /// `write_sketch_track` (which reports the actual offset/length/checksum
    /// written) and records its manifest in the TOC. An empty track clears
    /// the manifest entry instead.
    fn persist_sketch_track(&mut self) -> Result<()> {
        if self.sketch_track.is_empty() {
            self.toc.sketch_track = None;
            return Ok(());
        }

        // Position at the footer; the helper writes from the current cursor.
        self.file.seek(SeekFrom::Start(self.header.footer_offset))?;

        let (sketch_offset, sketch_length, sketch_checksum) =
            crate::types::write_sketch_track(&mut self.file, &self.sketch_track)?;

        let stats = self.sketch_track.stats();
        self.toc.sketch_track = Some(crate::types::SketchTrackManifest {
            bytes_offset: sketch_offset,
            bytes_length: sketch_length,
            entry_count: stats.entry_count,
            entry_size: stats.variant.entry_size() as u16,
            flags: 0,
            checksum: sketch_checksum,
        });

        // Advance the footer past the track and grow the file if needed.
        self.header.footer_offset = sketch_offset + sketch_length;

        if self.file.metadata()?.len() < self.header.footer_offset {
            self.file.set_len(self.header.footer_offset)?;
        }

        tracing::debug!(
            "persist_sketch_track: persisted sketch track with {} entries at offset {}",
            stats.entry_count,
            sketch_offset
        );

        Ok(())
    }
2341
2342 #[cfg(feature = "lex")]
2343 fn apply_lex_wal(&mut self, batch: LexWalBatch) -> Result<()> {
2344 let LexWalBatch {
2345 generation,
2346 doc_count,
2347 checksum,
2348 segments,
2349 } = batch;
2350
2351 if let Some(mut storage) = self.lex_storage.write().ok() {
2352 storage.replace(doc_count, checksum, segments);
2353 storage.set_generation(generation);
2354 }
2355
2356 let result = self.persist_lex_manifest();
2357 result
2358 }
2359
2360 #[cfg(feature = "lex")]
2361 fn append_lex_batch(&mut self, batch: &LexWalBatch) -> Result<()> {
2362 let payload = encode_to_vec(&WalEntry::Lex(batch.clone()), wal_config())?;
2363 self.append_wal_entry(&payload)?;
2364 Ok(())
2365 }
2366
2367 #[cfg(feature = "lex")]
2368 fn persist_lex_manifest(&mut self) -> Result<()> {
2369 let (index_manifest, segments) = if let Some(storage) = self.lex_storage.read().ok() {
2370 storage.to_manifest()
2371 } else {
2372 (None, Vec::new())
2373 };
2374
2375 if let Some(storage_manifest) = index_manifest {
2377 self.toc.indexes.lex = Some(storage_manifest);
2379 } else {
2380 self.toc.indexes.lex = None;
2383 }
2384
2385 self.toc.indexes.lex_segments = segments;
2386
2387 self.rewrite_toc_footer()?;
2391 self.header.toc_checksum = self.toc.toc_checksum;
2392 crate::persist_header(&mut self.file, &self.header)?;
2393 Ok(())
2394 }
2395
    /// Embeds a Tantivy snapshot's segment files into the archive after the
    /// data region, then updates the segment catalog, lex storage, WAL batch
    /// log, TOC manifest, and header so the snapshot is durable.
    #[cfg(feature = "lex")]
    pub(crate) fn update_embedded_lex_snapshot(&mut self, snapshot: TantivySnapshot) -> Result<()> {
        let TantivySnapshot {
            doc_count,
            checksum,
            segments,
        } = snapshot;

        // Segment bytes are laid out sequentially starting at the end of the
        // payload/data region.
        let mut footer_offset = self.data_end;
        self.file.seek(SeekFrom::Start(footer_offset))?;

        let mut embedded_segments: Vec<EmbeddedLexSegment> = Vec::with_capacity(segments.len());
        for segment in segments {
            let bytes_length = segment.bytes.len() as u64;
            self.file.write_all(&segment.bytes)?;
            // Each segment is flushed individually; the offset is recorded
            // before being advanced past this segment's bytes.
            self.file.flush()?; embedded_segments.push(EmbeddedLexSegment {
                path: segment.path,
                bytes_offset: footer_offset,
                bytes_length,
                checksum: segment.checksum,
            });
            footer_offset += bytes_length;
        }
        // Never move the footer backwards if it already sits past these bytes.
        self.header.footer_offset = self.header.footer_offset.max(footer_offset);

        // Mirror the embedded segments into the catalog, handing out
        // monotonically increasing segment ids (saturating on overflow).
        let mut next_segment_id = self.toc.segment_catalog.next_segment_id;
        let mut catalog_segments: Vec<TantivySegmentDescriptor> =
            Vec::with_capacity(embedded_segments.len());
        for segment in &embedded_segments {
            let descriptor = TantivySegmentDescriptor::from_common(
                SegmentCommon::new(
                    next_segment_id,
                    segment.bytes_offset,
                    segment.bytes_length,
                    segment.checksum,
                ),
                segment.path.clone(),
            );
            catalog_segments.push(descriptor);
            next_segment_id = next_segment_id.saturating_add(1);
        }
        if catalog_segments.is_empty() {
            self.toc.segment_catalog.tantivy_segments.clear();
        } else {
            self.toc.segment_catalog.tantivy_segments = catalog_segments;
            self.toc.segment_catalog.version = self.toc.segment_catalog.version.max(1);
        }
        self.toc.segment_catalog.next_segment_id = next_segment_id;

        // Swap the new segment set into lex storage. Unlike the replay path, a
        // poisoned lock is fatal here because the snapshot would be lost.
        let generation = self
            .lex_storage
            .write()
            .map_err(|_| MemvidError::Tantivy {
                reason: "embedded lex storage lock poisoned".into(),
            })
            .map(|mut storage| {
                storage.replace(doc_count, checksum, embedded_segments.clone());
                storage.generation()
            })?;

        // Log the batch to the WAL, then persist manifest, TOC, and header.
        let batch = LexWalBatch {
            generation,
            doc_count,
            checksum,
            segments: embedded_segments.clone(),
        };
        self.append_lex_batch(&batch)?;
        self.persist_lex_manifest()?;
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(())
    }
2479
2480 fn mark_frame_deleted(&mut self, frame_id: FrameId) -> Result<()> {
2481 let frame =
2482 self.toc
2483 .frames
2484 .get_mut(frame_id as usize)
2485 .ok_or(MemvidError::InvalidFrame {
2486 frame_id,
2487 reason: "delete target missing",
2488 })?;
2489 frame.status = FrameStatus::Deleted;
2490 frame.superseded_by = None;
2491 self.remove_frame_from_indexes(frame_id)
2492 }
2493
    /// Removes a frame from every live index (Tantivy, lex, vec) that may
    /// still reference it. Absent indexes are skipped.
    fn remove_frame_from_indexes(&mut self, frame_id: FrameId) -> Result<()> {
        #[cfg(feature = "lex")]
        if let Some(engine) = self.tantivy.as_mut() {
            engine.delete_frame(frame_id)?;
            // Mark the Tantivy index dirty so the deletion is committed later.
            self.tantivy_dirty = true;
        }
        if let Some(index) = self.lex_index.as_mut() {
            index.remove_document(frame_id);
        }
        if let Some(index) = self.vec_index.as_mut() {
            index.remove(frame_id);
        }
        Ok(())
    }
2508
2509 pub(crate) fn frame_is_active(&self, frame_id: FrameId) -> bool {
2510 self.toc
2511 .frames
2512 .get(frame_id as usize)
2513 .map_or(false, |frame| frame.status == FrameStatus::Active)
2514 }
2515
    /// Computes the frame-id span and page span covered by an iterator of
    /// frame ids, returning `None` for an empty iterator.
    ///
    /// Page bounds are derived from each frame's `chunk_index`/`chunk_count`;
    /// ids with no TOC entry still widen the frame-id span.
    #[cfg(feature = "parallel_segments")]
    fn segment_span_from_iter<I>(&self, iter: I) -> Option<SegmentSpan>
    where
        I: IntoIterator<Item = FrameId>,
    {
        let mut iter = iter.into_iter();
        // Seed min/max ids and the page range from the first id.
        let first_id = iter.next()?;
        let first_frame = self.toc.frames.get(first_id as usize);
        let mut min_id = first_id;
        let mut max_id = first_id;
        let mut page_start = first_frame.and_then(|frame| frame.chunk_index).unwrap_or(0);
        let mut page_end = first_frame
            .and_then(|frame| frame.chunk_count)
            // Last page of the first frame: start + count - 1 (saturating).
            .map(|count| page_start + count.saturating_sub(1))
            .unwrap_or(page_start);
        for frame_id in iter {
            if frame_id < min_id {
                min_id = frame_id;
            }
            if frame_id > max_id {
                max_id = frame_id;
            }
            if let Some(frame) = self.toc.frames.get(frame_id as usize) {
                if let Some(idx) = frame.chunk_index {
                    page_start = page_start.min(idx);
                    if let Some(count) = frame.chunk_count {
                        let end = idx + count.saturating_sub(1);
                        page_end = page_end.max(end);
                    } else {
                        // No count: the frame occupies a single page at idx.
                        page_end = page_end.max(idx);
                    }
                }
            }
        }
        Some(SegmentSpan {
            frame_start: min_id,
            frame_end: max_id,
            page_start,
            page_end,
            ..SegmentSpan::default()
        })
    }
2558
2559 #[cfg(feature = "parallel_segments")]
2560 pub(crate) fn decorate_segment_common(common: &mut SegmentCommon, span: SegmentSpan) {
2561 common.span = Some(span);
2562 if common.codec_version == 0 {
2563 common.codec_version = 1;
2564 }
2565 }
2566
2567 #[cfg(feature = "parallel_segments")]
2568 pub(crate) fn record_index_segment(
2569 &mut self,
2570 kind: SegmentKind,
2571 common: SegmentCommon,
2572 stats: SegmentStats,
2573 ) -> Result<()> {
2574 let entry = IndexSegmentRef {
2575 kind,
2576 common,
2577 stats,
2578 };
2579 self.toc.segment_catalog.index_segments.push(entry.clone());
2580 if let Some(wal) = self.manifest_wal.as_mut() {
2581 wal.append_segments(&[entry])?;
2582 }
2583 Ok(())
2584 }
2585
2586 fn ensure_mutation_allowed(&mut self) -> Result<()> {
2587 self.ensure_writable()?;
2588 if self.toc.ticket_ref.issuer == "free-tier" {
2589 return Ok(());
2590 }
2591 match self.tier() {
2592 Tier::Free => Ok(()),
2593 tier => {
2594 if self.toc.ticket_ref.issuer.trim().is_empty() {
2595 Err(MemvidError::TicketRequired { tier })
2596 } else {
2597 Ok(())
2598 }
2599 }
2600 }
2601 }
2602
2603 pub(crate) fn tier(&self) -> Tier {
2604 if self.header.wal_size >= WAL_SIZE_LARGE {
2605 Tier::Enterprise
2606 } else if self.header.wal_size >= WAL_SIZE_MEDIUM {
2607 Tier::Dev
2608 } else {
2609 Tier::Free
2610 }
2611 }
2612
2613 pub(crate) fn capacity_limit(&self) -> u64 {
2614 if self.toc.ticket_ref.capacity_bytes != 0 {
2615 self.toc.ticket_ref.capacity_bytes
2616 } else {
2617 self.tier().capacity_bytes()
2618 }
2619 }
2620
    /// Public accessor for the effective capacity limit in bytes
    /// (ticket-provided capacity, or the tier default when unset).
    pub fn get_capacity(&self) -> u64 {
        self.capacity_limit()
    }
2628
    /// Serializes the TOC plus a commit footer at `header.footer_offset`,
    /// then resizes and syncs the file — never truncating below the end of
    /// the WAL region.
    pub(crate) fn rewrite_toc_footer(&mut self) -> Result<()> {
        tracing::info!(
            vec_segments = self.toc.segment_catalog.vec_segments.len(),
            lex_segments = self.toc.segment_catalog.lex_segments.len(),
            time_segments = self.toc.segment_catalog.time_segments.len(),
            footer_offset = self.header.footer_offset,
            data_end = self.data_end,
            "rewrite_toc_footer: about to serialize TOC"
        );
        let toc_bytes = prepare_toc_bytes(&mut self.toc)?;
        let footer_offset = self.header.footer_offset;
        self.file.seek(SeekFrom::Start(footer_offset))?;
        self.file.write_all(&toc_bytes)?;
        // The footer records TOC length/hash and the commit generation so
        // readers can validate the TOC they load.
        let footer = CommitFooter {
            toc_len: toc_bytes.len() as u64,
            toc_hash: *hash(&toc_bytes).as_bytes(),
            generation: self.generation,
        };
        let encoded_footer = footer.encode();
        self.file.write_all(&encoded_footer)?;

        // Clamp the final length so truncation never cuts into the WAL region.
        let new_len = footer_offset + toc_bytes.len() as u64 + encoded_footer.len() as u64;
        let min_len = self.header.wal_offset + self.header.wal_size;
        let final_len = new_len.max(min_len);

        if new_len < min_len {
            tracing::warn!(
                file.new_len = new_len,
                file.min_len = min_len,
                file.final_len = final_len,
                "truncation would cut into WAL region, clamping to min_len"
            );
        }

        self.file.set_len(final_len)?;
        // Durability point: flush data and metadata before returning.
        self.file.sync_all()?;
        Ok(())
    }
2669}
2670
2671#[cfg(feature = "parallel_segments")]
2672impl Memvid {
2673 fn publish_parallel_delta(&mut self, delta: &IngestionDelta, opts: &BuildOpts) -> Result<bool> {
2674 let chunks = self.collect_segment_chunks(delta)?;
2675 if chunks.is_empty() {
2676 return Ok(false);
2677 }
2678 let planner = SegmentPlanner::new(opts.clone());
2679 let plans = planner.plan_from_chunks(chunks);
2680 if plans.is_empty() {
2681 return Ok(false);
2682 }
2683 let worker_pool = SegmentWorkerPool::new(opts);
2684 let results = worker_pool.execute(plans)?;
2685 if results.is_empty() {
2686 return Ok(false);
2687 }
2688 self.append_parallel_segments(results)?;
2689 Ok(true)
2690 }
2691
    /// Builds per-frame `SegmentChunkPlan`s for the delta's inserted frames,
    /// pairing each frame's text content with its captured embedding (when
    /// one exists). Frames whose content is only whitespace are skipped.
    fn collect_segment_chunks(&mut self, delta: &IngestionDelta) -> Result<Vec<SegmentChunkPlan>> {
        // Index embeddings by frame id so each can be moved out exactly once.
        let mut embedding_map: HashMap<FrameId, Vec<f32>> =
            delta.inserted_embeddings.iter().cloned().collect();
        tracing::info!(
            inserted_frames = ?delta.inserted_frames,
            embedding_keys = ?embedding_map.keys().collect::<Vec<_>>(),
            "collect_segment_chunks: comparing frame IDs"
        );
        let mut chunks = Vec::with_capacity(delta.inserted_frames.len());
        for frame_id in &delta.inserted_frames {
            let frame = self.toc.frames.get(*frame_id as usize).cloned().ok_or(
                MemvidError::InvalidFrame {
                    frame_id: *frame_id,
                    reason: "frame id out of range while planning segments",
                },
            )?;
            let text = self.frame_content(&frame)?;
            if text.trim().is_empty() {
                continue;
            }
            let token_estimate = estimate_tokens(&text);
            let chunk_index = frame.chunk_index.unwrap_or(0) as usize;
            let chunk_count = frame.chunk_count.unwrap_or(1) as usize;
            // Pages are 1-based for chunked frames; 0 marks "no page info".
            let page_start = if frame.chunk_index.is_some() {
                chunk_index + 1
            } else {
                0
            };
            // A chunked frame spans exactly one page (its own).
            let page_end = if frame.chunk_index.is_some() {
                page_start
            } else {
                0
            };
            chunks.push(SegmentChunkPlan {
                text,
                frame_id: *frame_id,
                timestamp: frame.timestamp,
                chunk_index,
                chunk_count: chunk_count.max(1),
                token_estimate,
                // Token offsets are filled in later by the planner/workers —
                // NOTE(review): assumed; not visible from this function.
                token_start: 0,
                token_end: 0,
                page_start,
                page_end,
                embedding: embedding_map.remove(frame_id),
            });
        }
        Ok(chunks)
    }
2741}
2742
2743#[cfg(feature = "parallel_segments")]
/// Rough token estimate: the number of whitespace-separated words, with a
/// floor of one so empty/blank text still counts as a single token.
fn estimate_tokens(text: &str) -> usize {
    let words = text.split_whitespace().count();
    if words == 0 { 1 } else { words }
}
2747
2748impl Memvid {
    /// Advances `header.footer_offset` to the segment catalog's data end when
    /// the catalog extends past it, rewriting the TOC footer and header.
    ///
    /// Returns `true` when an adjustment was made, `false` when the footer
    /// already covered the catalog.
    pub(crate) fn align_footer_with_catalog(&mut self) -> Result<bool> {
        let catalog_end = self.catalog_data_end();
        if catalog_end <= self.header.footer_offset {
            return Ok(false);
        }
        self.header.footer_offset = catalog_end;
        self.rewrite_toc_footer()?;
        // Keep the header checksum in lockstep with the rewritten TOC.
        self.header.toc_checksum = self.toc.toc_checksum;
        crate::persist_header(&mut self.file, &self.header)?;
        Ok(true)
    }
2760}
2761
2762impl Memvid {
    /// Compacts the archive in place: commits pending state, rewrites only
    /// active frame payloads contiguously after the WAL region, drops every
    /// derived segment/index manifest, and rebuilds indexes from scratch.
    pub fn vacuum(&mut self) -> Result<()> {
        // Flush pending WAL state first so the TOC reflects reality.
        self.commit()?;

        // Snapshot the payload bytes of every active frame before rewriting.
        let mut active_payloads: HashMap<FrameId, Vec<u8>> = HashMap::new();
        let frames: Vec<Frame> = self
            .toc
            .frames
            .iter()
            .filter(|frame| frame.status == FrameStatus::Active)
            .cloned()
            .collect();
        for frame in frames {
            let bytes = self.read_frame_payload_bytes(&frame)?;
            active_payloads.insert(frame.id, bytes);
        }

        // Rewrite payloads back-to-back starting right after the WAL region.
        let mut cursor = self.header.wal_offset + self.header.wal_size;
        self.file.seek(SeekFrom::Start(cursor))?;
        for frame in &mut self.toc.frames {
            if frame.status == FrameStatus::Active {
                if let Some(bytes) = active_payloads.get(&frame.id) {
                    self.file.write_all(bytes)?;
                    frame.payload_offset = cursor;
                    frame.payload_length = bytes.len() as u64;
                    cursor += bytes.len() as u64;
                } else {
                    // Active frame without a snapshot loses its payload ref.
                    frame.payload_offset = 0;
                    frame.payload_length = 0;
                }
            } else {
                // Inactive frames keep no payload after compaction.
                frame.payload_offset = 0;
                frame.payload_length = 0;
            }
        }

        self.data_end = cursor;

        // Every derived segment and index manifest is now stale; clear all.
        self.toc.segments.clear();
        self.toc.indexes.lex_segments.clear();
        self.toc.segment_catalog.lex_segments.clear();
        self.toc.segment_catalog.vec_segments.clear();
        self.toc.segment_catalog.time_segments.clear();
        #[cfg(feature = "temporal_track")]
        {
            self.toc.temporal_track = None;
            self.toc.segment_catalog.temporal_segments.clear();
        }
        #[cfg(feature = "lex")]
        {
            self.toc.segment_catalog.tantivy_segments.clear();
        }
        #[cfg(feature = "parallel_segments")]
        {
            self.toc.segment_catalog.index_segments.clear();
        }

        // Drop the in-memory Tantivy engine so it is rebuilt cleanly.
        #[cfg(feature = "lex")]
        {
            self.tantivy = None;
            self.tantivy_dirty = false;
        }

        self.rebuild_indexes(&[])?;
        self.file.sync_all()?;
        Ok(())
    }
2830
2831 pub fn preview_chunks(&self, payload: &[u8]) -> Option<Vec<String>> {
2849 plan_document_chunks(payload).map(|plan| plan.chunks)
2850 }
2851
    /// Ingests a raw payload with default options, returning the WAL sequence
    /// number of the insertion.
    pub fn put_bytes(&mut self, payload: &[u8]) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, PutOptions::default(), None)
    }
2856
    /// Ingests a raw payload with caller-provided options, returning the WAL
    /// sequence number of the insertion.
    pub fn put_bytes_with_options(&mut self, payload: &[u8], options: PutOptions) -> Result<u64> {
        self.put_internal(Some(payload), None, None, None, options, None)
    }
2861
    /// Ingests a payload together with a precomputed frame-level embedding,
    /// using default options.
    pub fn put_with_embedding(&mut self, payload: &[u8], embedding: Vec<f32>) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            Some(embedding),
            None,
            PutOptions::default(),
            None,
        )
    }
2873
    /// Ingests a payload together with a precomputed frame-level embedding
    /// and caller-provided options.
    pub fn put_with_embedding_and_options(
        &mut self,
        payload: &[u8],
        embedding: Vec<f32>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(Some(payload), None, Some(embedding), None, options, None)
    }
2882
    /// Ingests a payload with an optional parent-level embedding plus one
    /// embedding per planned chunk (matched to chunks by index).
    pub fn put_with_chunk_embeddings(
        &mut self,
        payload: &[u8],
        parent_embedding: Option<Vec<f32>>,
        chunk_embeddings: Vec<Vec<f32>>,
        options: PutOptions,
    ) -> Result<u64> {
        self.put_internal(
            Some(payload),
            None,
            parent_embedding,
            Some(chunk_embeddings),
            options,
            None,
        )
    }
2911
    /// Replaces or re-ingests an existing active frame, superseding it.
    ///
    /// When `payload` is `None`, the existing canonical payload is reused and
    /// auto-tag/date extraction are disabled (content is unchanged). Unset
    /// `options` fields inherit the existing frame's values; empty tag,
    /// label, and extra-metadata collections inherit likewise. Returns the
    /// WAL sequence number of the update.
    pub fn update_frame(
        &mut self,
        frame_id: FrameId,
        payload: Option<Vec<u8>>,
        mut options: PutOptions,
        embedding: Option<Vec<f32>>,
    ) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let existing = self.frame_by_id(frame_id)?;
        if existing.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Inherit every unset option from the existing frame.
        if options.timestamp.is_none() {
            options.timestamp = Some(existing.timestamp);
        }
        if options.track.is_none() {
            options.track = existing.track.clone();
        }
        if options.kind.is_none() {
            options.kind = existing.kind.clone();
        }
        if options.uri.is_none() {
            options.uri = existing.uri.clone();
        }
        if options.title.is_none() {
            options.title = existing.title.clone();
        }
        if options.metadata.is_none() {
            options.metadata = existing.metadata.clone();
        }
        if options.search_text.is_none() {
            options.search_text = existing.search_text.clone();
        }
        if options.tags.is_empty() {
            options.tags = existing.tags.clone();
        }
        if options.labels.is_empty() {
            options.labels = existing.labels.clone();
        }
        if options.extra_metadata.is_empty() {
            options.extra_metadata = existing.extra_metadata.clone();
        }

        // No new payload: reuse the old one and skip content enrichment.
        let reuse_frame = if payload.is_none() {
            options.auto_tag = false;
            options.extract_dates = false;
            Some(existing.clone())
        } else {
            None
        };

        // Prefer an explicit embedding; otherwise carry over the stored one
        // when vector search is enabled.
        let effective_embedding = if let Some(explicit) = embedding {
            Some(explicit)
        } else if self.vec_enabled {
            self.frame_embedding(frame_id)?
        } else {
            None
        };

        let payload_slice = payload.as_deref();
        let reuse_flag = reuse_frame.is_some();
        let replace_flag = payload_slice.is_some();
        let seq = self.put_internal(
            payload_slice,
            reuse_frame,
            effective_embedding,
            None, options,
            Some(frame_id),
        )?;
        info!(
            "frame_update frame_id={} seq={} reused_payload={} replaced_payload={}",
            frame_id, seq, reuse_flag, replace_flag
        );
        Ok(seq)
    }
2993
    /// Logically deletes an active frame by appending a tombstone record to
    /// the WAL; the actual TOC/index update happens at checkpoint/commit.
    /// Returns the WAL sequence number of the tombstone.
    pub fn delete_frame(&mut self, frame_id: FrameId) -> Result<u64> {
        self.ensure_mutation_allowed()?;
        let frame = self.frame_by_id(frame_id)?;
        if frame.status != FrameStatus::Active {
            return Err(MemvidError::InvalidFrame {
                frame_id,
                reason: "frame is not active",
            });
        }

        // Tombstone carries enough identity (uri/title/encoding/role/chunk
        // info) to stay self-describing; a clock failure falls back to the
        // frame's own timestamp.
        let mut tombstone = WalEntryData {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(frame.timestamp),
            kind: None,
            track: None,
            payload: Vec::new(),
            embedding: None,
            uri: frame.uri.clone(),
            title: frame.title.clone(),
            canonical_encoding: frame.canonical_encoding,
            canonical_length: frame.canonical_length,
            metadata: None,
            search_text: None,
            tags: Vec::new(),
            labels: Vec::new(),
            extra_metadata: BTreeMap::new(),
            content_dates: Vec::new(),
            chunk_manifest: None,
            role: frame.role,
            parent_sequence: None,
            chunk_index: frame.chunk_index,
            chunk_count: frame.chunk_count,
            op: FrameWalOp::Tombstone,
            target_frame_id: Some(frame_id),
            supersedes_frame_id: None,
            reuse_payload_from: None,
            source_sha256: None,
            source_path: None,
            enrichment_state: crate::types::EnrichmentState::default(),
        };
        tombstone.kind = frame.kind.clone();
        tombstone.track = frame.track.clone();

        let payload_bytes = encode_to_vec(&WalEntry::Frame(tombstone), wal_config())?;
        let seq = self.append_wal_entry(&payload_bytes)?;
        self.dirty = true;
        // Checkpoint eagerly when the WAL is getting full.
        if self.wal.should_checkpoint() {
            self.commit()?;
        }
        info!("frame_delete frame_id={} seq={}", frame_id, seq);
        Ok(seq)
    }
3048}
3049
3050impl Memvid {
3051 fn put_internal(
3052 &mut self,
3053 payload: Option<&[u8]>,
3054 reuse_frame: Option<Frame>,
3055 embedding: Option<Vec<f32>>,
3056 chunk_embeddings: Option<Vec<Vec<f32>>>,
3057 mut options: PutOptions,
3058 supersedes: Option<FrameId>,
3059 ) -> Result<u64> {
3060 self.ensure_mutation_allowed()?;
3061
3062 if options.dedup {
3064 if let Some(bytes) = payload {
3065 let content_hash = hash(bytes);
3066 if let Some(existing_frame) = self.find_frame_by_hash(content_hash.as_bytes()) {
3067 tracing::debug!(
3069 frame_id = existing_frame.id,
3070 "dedup: skipping ingestion, identical content already exists"
3071 );
3072 return Ok(existing_frame.id);
3074 }
3075 }
3076 }
3077
3078 if payload.is_some() && reuse_frame.is_some() {
3079 let frame_id = reuse_frame
3080 .as_ref()
3081 .map(|frame| frame.id)
3082 .unwrap_or_default();
3083 return Err(MemvidError::InvalidFrame {
3084 frame_id,
3085 reason: "cannot reuse payload when bytes are provided",
3086 });
3087 }
3088
3089 let incoming_dimension = {
3092 let mut dim: Option<u32> = None;
3093
3094 if let Some(ref vector) = embedding {
3095 if !vector.is_empty() {
3096 dim = Some(vector.len() as u32);
3097 }
3098 }
3099
3100 if let Some(ref vectors) = chunk_embeddings {
3101 for vector in vectors {
3102 if vector.is_empty() {
3103 continue;
3104 }
3105 let vec_dim = vector.len() as u32;
3106 match dim {
3107 None => dim = Some(vec_dim),
3108 Some(existing) if existing == vec_dim => {}
3109 Some(existing) => {
3110 return Err(MemvidError::VecDimensionMismatch {
3111 expected: existing,
3112 actual: vector.len(),
3113 });
3114 }
3115 }
3116 }
3117 }
3118
3119 dim
3120 };
3121
3122 if let Some(incoming_dimension) = incoming_dimension {
3123 if !self.vec_enabled {
3125 self.enable_vec()?;
3126 }
3127
3128 if let Some(existing_dimension) = self.effective_vec_index_dimension()? {
3129 if existing_dimension != incoming_dimension {
3130 return Err(MemvidError::VecDimensionMismatch {
3131 expected: existing_dimension,
3132 actual: incoming_dimension as usize,
3133 });
3134 }
3135 }
3136
3137 if let Some(manifest) = self.toc.indexes.vec.as_mut() {
3139 if manifest.dimension == 0 {
3140 manifest.dimension = incoming_dimension;
3141 }
3142 }
3143 }
3144
3145 let mut prepared_payload: Option<(Vec<u8>, CanonicalEncoding, Option<u64>)> = None;
3146 let payload_tail = self.payload_region_end();
3147 let projected = if let Some(bytes) = payload {
3148 let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
3149 let len = prepared.len();
3150 prepared_payload = Some((prepared, encoding, length));
3151 payload_tail.saturating_add(len as u64)
3152 } else if reuse_frame.is_some() {
3153 payload_tail
3154 } else {
3155 return Err(MemvidError::InvalidFrame {
3156 frame_id: 0,
3157 reason: "payload required for frame insertion",
3158 });
3159 };
3160
3161 let capacity_limit = self.capacity_limit();
3162 if projected > capacity_limit {
3163 let incoming_size = projected.saturating_sub(payload_tail);
3164 return Err(MemvidError::CapacityExceeded {
3165 current: payload_tail,
3166 limit: capacity_limit,
3167 required: incoming_size,
3168 });
3169 }
3170 let timestamp = options.timestamp.take().unwrap_or_else(|| {
3171 SystemTime::now()
3172 .duration_since(UNIX_EPOCH)
3173 .map(|d| d.as_secs() as i64)
3174 .unwrap_or(0)
3175 });
3176
3177 let mut _reuse_bytes: Option<Vec<u8>> = None;
3178 let payload_for_processing = if let Some(bytes) = payload {
3179 Some(bytes)
3180 } else if let Some(frame) = reuse_frame.as_ref() {
3181 let bytes = self.frame_canonical_bytes(frame)?;
3182 _reuse_bytes = Some(bytes);
3183 _reuse_bytes.as_deref()
3184 } else {
3185 None
3186 };
3187
3188 let raw_chunk_plan = match (payload, reuse_frame.as_ref()) {
3190 (Some(bytes), None) => plan_document_chunks(bytes),
3191 _ => None,
3192 };
3193
3194 let mut source_sha256: Option<[u8; 32]> = None;
3198 let source_path_value = options.source_path.take();
3199
3200 let (storage_payload, canonical_encoding, canonical_length, reuse_payload_from) =
3201 if raw_chunk_plan.is_some() {
3202 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3204 } else if options.no_raw {
3205 if let Some(bytes) = payload {
3207 let hash_result = hash(bytes);
3209 source_sha256 = Some(*hash_result.as_bytes());
3210 (Vec::new(), CanonicalEncoding::Plain, Some(0), None)
3212 } else {
3213 return Err(MemvidError::InvalidFrame {
3214 frame_id: 0,
3215 reason: "payload required for --no-raw mode",
3216 });
3217 }
3218 } else if let Some((prepared, encoding, length)) = prepared_payload.take() {
3219 (prepared, encoding, length, None)
3220 } else if let Some(bytes) = payload {
3221 let (prepared, encoding, length) = prepare_canonical_payload(bytes)?;
3222 (prepared, encoding, length, None)
3223 } else if let Some(frame) = reuse_frame.as_ref() {
3224 (
3225 Vec::new(),
3226 frame.canonical_encoding,
3227 frame.canonical_length,
3228 Some(frame.id),
3229 )
3230 } else {
3231 return Err(MemvidError::InvalidFrame {
3232 frame_id: 0,
3233 reason: "payload required for frame insertion",
3234 });
3235 };
3236
3237 let mut chunk_plan = raw_chunk_plan;
3239
3240 let mut metadata = options.metadata.take();
3241 let mut search_text = options
3242 .search_text
3243 .take()
3244 .and_then(|text| normalize_text(&text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text));
3245 let mut tags = std::mem::take(&mut options.tags);
3246 let mut labels = std::mem::take(&mut options.labels);
3247 let mut extra_metadata = std::mem::take(&mut options.extra_metadata);
3248 let mut content_dates: Vec<String> = Vec::new();
3249
3250 let need_search_text = search_text
3251 .as_ref()
3252 .map_or(true, |text| text.trim().is_empty());
3253 let need_metadata = metadata.is_none();
3254 let run_extractor = need_search_text || need_metadata || options.auto_tag;
3255
3256 let mut extraction_error = None;
3257 let mut is_skim_extraction = false; let extracted = if run_extractor {
3260 if let Some(bytes) = payload_for_processing {
3261 let mime_hint = metadata.as_ref().and_then(|m| m.mime.as_deref());
3262 let uri_hint = options.uri.as_deref();
3263
3264 let use_budgeted = options.instant_index && options.extraction_budget_ms > 0;
3266
3267 if use_budgeted {
3268 let budget = crate::extract_budgeted::ExtractionBudget::with_ms(
3270 options.extraction_budget_ms,
3271 );
3272 match crate::extract_budgeted::extract_with_budget(
3273 bytes, mime_hint, uri_hint, budget,
3274 ) {
3275 Ok(result) => {
3276 is_skim_extraction = result.is_skim();
3277 if is_skim_extraction {
3278 tracing::debug!(
3279 coverage = result.coverage,
3280 elapsed_ms = result.elapsed_ms,
3281 sections = %format!("{}/{}", result.sections_extracted, result.sections_total),
3282 "time-budgeted extraction (skim)"
3283 );
3284 }
3285 let doc = crate::extract::ExtractedDocument {
3287 text: if result.text.is_empty() {
3288 None
3289 } else {
3290 Some(result.text)
3291 },
3292 metadata: serde_json::json!({
3293 "skim": is_skim_extraction,
3294 "coverage": result.coverage,
3295 "sections_extracted": result.sections_extracted,
3296 "sections_total": result.sections_total,
3297 }),
3298 mime_type: mime_hint.map(|s| s.to_string()),
3299 };
3300 Some(doc)
3301 }
3302 Err(err) => {
3303 tracing::warn!(
3305 ?err,
3306 "budgeted extraction failed, trying full extraction"
3307 );
3308 match extract_via_registry(bytes, mime_hint, uri_hint) {
3309 Ok(doc) => Some(doc),
3310 Err(err) => {
3311 extraction_error = Some(err);
3312 None
3313 }
3314 }
3315 }
3316 }
3317 } else {
3318 match extract_via_registry(bytes, mime_hint, uri_hint) {
3320 Ok(doc) => Some(doc),
3321 Err(err) => {
3322 extraction_error = Some(err);
3323 None
3324 }
3325 }
3326 }
3327 } else {
3328 None
3329 }
3330 } else {
3331 None
3332 };
3333
3334 if let Some(err) = extraction_error {
3335 return Err(err);
3336 }
3337
3338 if let Some(doc) = &extracted {
3339 if need_search_text {
3340 if let Some(text) = &doc.text {
3341 if let Some(normalized) =
3342 normalize_text(text, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3343 {
3344 search_text = Some(normalized);
3345 }
3346 }
3347 }
3348
3349 if chunk_plan.is_none() {
3352 if let Some(text) = &doc.text {
3353 chunk_plan = plan_text_chunks(text);
3354 }
3355 }
3356
3357 if let Some(mime) = doc.mime_type.as_ref() {
3358 match &mut metadata {
3359 Some(existing) => {
3360 if existing.mime.is_none() {
3361 existing.mime = Some(mime.clone());
3362 }
3363 }
3364 None => {
3365 let mut doc_meta = DocMetadata::default();
3366 doc_meta.mime = Some(mime.clone());
3367 metadata = Some(doc_meta);
3368 }
3369 }
3370 }
3371
3372 if let Some(meta_json) = (!doc.metadata.is_null()).then(|| doc.metadata.to_string()) {
3373 extra_metadata
3374 .entry("extractous_metadata".to_string())
3375 .or_insert(meta_json);
3376 }
3377 }
3378
3379 if options.auto_tag {
3380 if let Some(ref text) = search_text {
3381 if !text.trim().is_empty() {
3382 let result = AutoTagger::default().analyse(text, options.extract_dates);
3383 merge_unique(&mut tags, result.tags);
3384 merge_unique(&mut labels, result.labels);
3385 if options.extract_dates && content_dates.is_empty() {
3386 content_dates = result.content_dates;
3387 }
3388 }
3389 }
3390 }
3391
3392 if content_dates.is_empty() {
3393 if let Some(frame) = reuse_frame.as_ref() {
3394 content_dates = frame.content_dates.clone();
3395 }
3396 }
3397
3398 let metadata_ref = metadata.as_ref();
3399 let mut search_text = augment_search_text(
3400 search_text,
3401 options.uri.as_deref(),
3402 options.title.as_deref(),
3403 options.track.as_deref(),
3404 &tags,
3405 &labels,
3406 &extra_metadata,
3407 &content_dates,
3408 metadata_ref,
3409 );
3410 let mut chunk_entries: Vec<WalEntryData> = Vec::new();
3411 let mut parent_chunk_manifest: Option<TextChunkManifest> = None;
3412 let mut parent_chunk_count: Option<u32> = None;
3413
3414 let kind_value = options.kind.take();
3415 let track_value = options.track.take();
3416 let uri_value = options.uri.take();
3417 let title_value = options.title.take();
3418 let should_extract_triplets = options.extract_triplets;
3419 let triplet_uri = uri_value.clone();
3421 let triplet_title = title_value.clone();
3422
3423 if let Some(plan) = chunk_plan.as_ref() {
3424 let chunk_total = plan.chunks.len() as u32;
3425 parent_chunk_manifest = Some(plan.manifest.clone());
3426 parent_chunk_count = Some(chunk_total);
3427
3428 if let Some(first_chunk) = plan.chunks.first() {
3429 if let Some(normalized) =
3430 normalize_text(first_chunk, DEFAULT_SEARCH_TEXT_LIMIT).map(|n| n.text)
3431 {
3432 if !normalized.trim().is_empty() {
3433 search_text = Some(normalized);
3434 }
3435 }
3436 }
3437
3438 let chunk_tags = tags.clone();
3439 let chunk_labels = labels.clone();
3440 let chunk_metadata = metadata.clone();
3441 let chunk_extra_metadata = extra_metadata.clone();
3442 let chunk_content_dates = content_dates.clone();
3443
3444 for (idx, chunk_text) in plan.chunks.iter().enumerate() {
3445 let (chunk_payload, chunk_encoding, chunk_length) =
3446 prepare_canonical_payload(chunk_text.as_bytes())?;
3447 let chunk_search_text = normalize_text(chunk_text, DEFAULT_SEARCH_TEXT_LIMIT)
3448 .map(|n| n.text)
3449 .filter(|text| !text.trim().is_empty());
3450
3451 let chunk_uri = uri_value
3452 .as_ref()
3453 .map(|uri| format!("{uri}#page-{}", idx + 1));
3454 let chunk_title = title_value
3455 .as_ref()
3456 .map(|title| format!("{title} (page {}/{})", idx + 1, chunk_total));
3457
3458 let chunk_embedding = chunk_embeddings
3460 .as_ref()
3461 .and_then(|embeddings| embeddings.get(idx).cloned());
3462
3463 chunk_entries.push(WalEntryData {
3464 timestamp,
3465 kind: kind_value.clone(),
3466 track: track_value.clone(),
3467 payload: chunk_payload,
3468 embedding: chunk_embedding,
3469 uri: chunk_uri,
3470 title: chunk_title,
3471 canonical_encoding: chunk_encoding,
3472 canonical_length: chunk_length,
3473 metadata: chunk_metadata.clone(),
3474 search_text: chunk_search_text,
3475 tags: chunk_tags.clone(),
3476 labels: chunk_labels.clone(),
3477 extra_metadata: chunk_extra_metadata.clone(),
3478 content_dates: chunk_content_dates.clone(),
3479 chunk_manifest: None,
3480 role: FrameRole::DocumentChunk,
3481 parent_sequence: None,
3482 chunk_index: Some(idx as u32),
3483 chunk_count: Some(chunk_total),
3484 op: FrameWalOp::Insert,
3485 target_frame_id: None,
3486 supersedes_frame_id: None,
3487 reuse_payload_from: None,
3488 source_sha256: None, source_path: None,
3490 enrichment_state: crate::types::EnrichmentState::Enriched,
3492 });
3493 }
3494 }
3495
3496 let parent_uri = uri_value.clone();
3497 let parent_title = title_value.clone();
3498
3499 let parent_sequence = if let Some(parent_id) = options.parent_id {
3502 self.toc
3507 .frames
3508 .get(parent_id as usize)
3509 .map(|_| parent_id + 2) } else {
3511 None
3512 };
3513
3514 let triplet_text = search_text.clone();
3516
3517 #[cfg(feature = "lex")]
3519 let instant_index_tags = if options.instant_index {
3520 tags.clone()
3521 } else {
3522 Vec::new()
3523 };
3524 #[cfg(feature = "lex")]
3525 let instant_index_labels = if options.instant_index {
3526 labels.clone()
3527 } else {
3528 Vec::new()
3529 };
3530
3531 #[cfg(feature = "lex")]
3533 let needs_enrichment =
3534 options.instant_index && (options.enable_embedding || is_skim_extraction);
3535 #[cfg(feature = "lex")]
3536 let enrichment_state = if needs_enrichment {
3537 crate::types::EnrichmentState::Searchable
3538 } else {
3539 crate::types::EnrichmentState::Enriched
3540 };
3541 #[cfg(not(feature = "lex"))]
3542 let enrichment_state = crate::types::EnrichmentState::Enriched;
3543
3544 let entry = WalEntryData {
3545 timestamp,
3546 kind: kind_value,
3547 track: track_value,
3548 payload: storage_payload,
3549 embedding,
3550 uri: parent_uri,
3551 title: parent_title,
3552 canonical_encoding,
3553 canonical_length,
3554 metadata,
3555 search_text,
3556 tags,
3557 labels,
3558 extra_metadata,
3559 content_dates,
3560 chunk_manifest: parent_chunk_manifest,
3561 role: options.role,
3562 parent_sequence,
3563 chunk_index: None,
3564 chunk_count: parent_chunk_count,
3565 op: FrameWalOp::Insert,
3566 target_frame_id: None,
3567 supersedes_frame_id: supersedes,
3568 reuse_payload_from,
3569 source_sha256,
3570 source_path: source_path_value,
3571 enrichment_state,
3572 };
3573
3574 let parent_bytes = encode_to_vec(&WalEntry::Frame(entry), wal_config())?;
3575 let parent_seq = self.append_wal_entry(&parent_bytes)?;
3576 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3577
3578 #[cfg(feature = "lex")]
3581 if options.instant_index {
3582 if self.tantivy.is_some() {
3583 let frame_id = parent_seq as FrameId;
3585
3586 if let Some(ref text) = triplet_text {
3588 if !text.trim().is_empty() {
3589 let temp_frame = Frame {
3591 id: frame_id,
3592 timestamp,
3593 anchor_ts: None,
3594 anchor_source: None,
3595 kind: options.kind.clone(),
3596 track: options.track.clone(),
3597 payload_offset: 0,
3598 payload_length: 0,
3599 checksum: [0u8; 32],
3600 uri: options
3601 .uri
3602 .clone()
3603 .or_else(|| Some(crate::default_uri(frame_id))),
3604 title: options.title.clone(),
3605 canonical_encoding: crate::types::CanonicalEncoding::default(),
3606 canonical_length: None,
3607 metadata: None, search_text: triplet_text.clone(),
3609 tags: instant_index_tags.clone(),
3610 labels: instant_index_labels.clone(),
3611 extra_metadata: std::collections::BTreeMap::new(), content_dates: Vec::new(), chunk_manifest: None,
3614 role: options.role,
3615 parent_id: None,
3616 chunk_index: None,
3617 chunk_count: None,
3618 status: FrameStatus::Active,
3619 supersedes: supersedes,
3620 superseded_by: None,
3621 source_sha256: None, source_path: None, enrichment_state: crate::types::EnrichmentState::Searchable,
3624 };
3625
3626 if let Some(engine) = self.tantivy.as_mut() {
3628 engine.add_frame(&temp_frame, text)?;
3629 engine.soft_commit()?;
3630 self.tantivy_dirty = true;
3631
3632 tracing::debug!(
3633 frame_id = frame_id,
3634 "instant index: frame searchable immediately"
3635 );
3636 }
3637 }
3638 }
3639 }
3640 }
3641
3642 #[cfg(feature = "lex")]
3646 if needs_enrichment {
3647 let frame_id = parent_seq as FrameId;
3648 self.toc.enrichment_queue.push(frame_id);
3649 tracing::debug!(
3650 frame_id = frame_id,
3651 is_skim = is_skim_extraction,
3652 needs_embedding = options.enable_embedding,
3653 "queued frame for background enrichment"
3654 );
3655 }
3656
3657 for mut chunk_entry in chunk_entries {
3658 chunk_entry.parent_sequence = Some(parent_seq);
3659 let chunk_bytes = encode_to_vec(&WalEntry::Frame(chunk_entry), wal_config())?;
3660 self.append_wal_entry(&chunk_bytes)?;
3661 self.pending_frame_inserts = self.pending_frame_inserts.saturating_add(1);
3662 }
3663
3664 self.dirty = true;
3665 if self.wal.should_checkpoint() {
3666 self.commit()?;
3667 }
3668
3669 #[cfg(feature = "replay")]
3671 if let Some(input_bytes) = payload {
3672 self.record_put_action(parent_seq, input_bytes);
3673 }
3674
3675 if should_extract_triplets {
3678 if let Some(ref text) = triplet_text {
3679 if !text.trim().is_empty() {
3680 let extractor = TripletExtractor::default();
3681 let frame_id = parent_seq as FrameId;
3682 let (cards, _stats) = extractor.extract(
3683 frame_id,
3684 text,
3685 triplet_uri.as_deref(),
3686 triplet_title.as_deref(),
3687 timestamp,
3688 );
3689
3690 if !cards.is_empty() {
3691 let card_ids = self.memories_track.add_cards(cards);
3693
3694 self.memories_track
3696 .record_enrichment(frame_id, "rules", "1.0.0", card_ids);
3697 }
3698 }
3699 }
3700 }
3701
3702 Ok(parent_seq)
3703 }
3704}
3705
/// Logical operation carried by a frame record in the WAL.
///
/// Serialized with `snake_case` variant names; renaming variants would
/// change the on-disk WAL encoding, so treat this as a wire format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum FrameWalOp {
    /// Appends a new frame (the default operation — see the `Default` impl).
    Insert,
    /// Tombstone record — presumably marks an earlier frame as removed
    /// (likely paired with `WalEntryData::target_frame_id`; confirm in
    /// the WAL replay path).
    Tombstone,
}
3712
3713impl Default for FrameWalOp {
3714 fn default() -> Self {
3715 Self::Insert
3716 }
3717}
3718
/// One logical record stored in the embedded WAL.
#[derive(Debug, Serialize, Deserialize)]
enum WalEntry {
    /// A frame insert/tombstone payload (see `WalEntryData`).
    Frame(WalEntryData),
    /// A batch of lexical-index operations, only present when the
    /// `lex` feature is compiled in.
    #[cfg(feature = "lex")]
    Lex(LexWalBatch),
}
3725
3726fn decode_wal_entry(bytes: &[u8]) -> Result<WalEntry> {
3727 if let Ok((entry, _)) = decode_from_slice::<WalEntry, _>(bytes, wal_config()) {
3728 return Ok(entry);
3729 }
3730 let (legacy, _) = decode_from_slice::<WalEntryData, _>(bytes, wal_config())?;
3731 Ok(WalEntry::Frame(legacy))
3732}
3733
/// Serialized payload of a `WalEntry::Frame` record.
///
/// Fields added after the original format carry `#[serde(default)]` so
/// records written by older builds still decode (see `decode_wal_entry`).
/// Treat the field set and order as a wire format.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct WalEntryData {
    /// Frame timestamp (epoch-based; units follow the caller's `timestamp`).
    pub(crate) timestamp: i64,
    /// Optional frame kind label.
    pub(crate) kind: Option<String>,
    /// Optional track name the frame belongs to.
    pub(crate) track: Option<String>,
    /// Canonical payload bytes, possibly zstd-compressed
    /// (see `prepare_canonical_payload`).
    pub(crate) payload: Vec<u8>,
    /// Optional dense embedding vector for the frame.
    pub(crate) embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub(crate) uri: Option<String>,
    #[serde(default)]
    pub(crate) title: Option<String>,
    /// How `payload` is encoded (plain vs. zstd).
    #[serde(default)]
    pub(crate) canonical_encoding: CanonicalEncoding,
    /// Uncompressed length of the canonical payload, when known.
    #[serde(default)]
    pub(crate) canonical_length: Option<u64>,
    #[serde(default)]
    pub(crate) metadata: Option<DocMetadata>,
    /// Pre-normalized text used for lexical indexing (see `augment_search_text`).
    #[serde(default)]
    pub(crate) search_text: Option<String>,
    #[serde(default)]
    pub(crate) tags: Vec<String>,
    #[serde(default)]
    pub(crate) labels: Vec<String>,
    /// Free-form key/value metadata folded into the search text.
    #[serde(default)]
    pub(crate) extra_metadata: BTreeMap<String, String>,
    /// Date strings extracted from the content.
    #[serde(default)]
    pub(crate) content_dates: Vec<String>,
    /// Present on parent frames that were split into document chunks.
    #[serde(default)]
    pub(crate) chunk_manifest: Option<TextChunkManifest>,
    #[serde(default)]
    pub(crate) role: FrameRole,
    /// WAL sequence of the parent frame; set on chunk entries after the
    /// parent is appended.
    #[serde(default)]
    pub(crate) parent_sequence: Option<u64>,
    /// Zero-based index of this chunk within its parent, if any.
    #[serde(default)]
    pub(crate) chunk_index: Option<u32>,
    /// Total chunk count for the parent document, if chunked.
    #[serde(default)]
    pub(crate) chunk_count: Option<u32>,
    /// Insert vs. tombstone (see `FrameWalOp`).
    #[serde(default)]
    pub(crate) op: FrameWalOp,
    #[serde(default)]
    pub(crate) target_frame_id: Option<FrameId>,
    /// Frame this entry supersedes, if it replaces an earlier one.
    #[serde(default)]
    pub(crate) supersedes_frame_id: Option<FrameId>,
    /// Reuse the payload of an existing frame instead of storing a copy.
    #[serde(default)]
    pub(crate) reuse_payload_from: Option<FrameId>,
    /// SHA-256 of the original source bytes, when ingested from a file.
    #[serde(default)]
    pub(crate) source_sha256: Option<[u8; 32]>,
    #[serde(default)]
    pub(crate) source_path: Option<String>,
    /// Whether the frame is fully enriched or awaiting background enrichment.
    #[serde(default)]
    pub(crate) enrichment_state: crate::types::EnrichmentState,
}
3789
3790pub(crate) fn prepare_canonical_payload(
3791 payload: &[u8],
3792) -> Result<(Vec<u8>, CanonicalEncoding, Option<u64>)> {
3793 if std::str::from_utf8(payload).is_ok() {
3794 let compressed = zstd::encode_all(std::io::Cursor::new(payload), 3)?;
3795 Ok((
3796 compressed,
3797 CanonicalEncoding::Zstd,
3798 Some(payload.len() as u64),
3799 ))
3800 } else {
3801 Ok((
3802 payload.to_vec(),
3803 CanonicalEncoding::Plain,
3804 Some(payload.len() as u64),
3805 ))
3806 }
3807}
3808
3809pub(crate) fn augment_search_text(
3810 base: Option<String>,
3811 uri: Option<&str>,
3812 title: Option<&str>,
3813 track: Option<&str>,
3814 tags: &[String],
3815 labels: &[String],
3816 extra_metadata: &BTreeMap<String, String>,
3817 content_dates: &[String],
3818 metadata: Option<&DocMetadata>,
3819) -> Option<String> {
3820 let mut segments: Vec<String> = Vec::new();
3821 if let Some(text) = base {
3822 let trimmed = text.trim();
3823 if !trimmed.is_empty() {
3824 segments.push(trimmed.to_string());
3825 }
3826 }
3827
3828 if let Some(title) = title {
3829 if !title.trim().is_empty() {
3830 segments.push(format!("title: {}", title.trim()));
3831 }
3832 }
3833
3834 if let Some(uri) = uri {
3835 if !uri.trim().is_empty() {
3836 segments.push(format!("uri: {}", uri.trim()));
3837 }
3838 }
3839
3840 if let Some(track) = track {
3841 if !track.trim().is_empty() {
3842 segments.push(format!("track: {}", track.trim()));
3843 }
3844 }
3845
3846 if !tags.is_empty() {
3847 segments.push(format!("tags: {}", tags.join(" ")));
3848 }
3849
3850 if !labels.is_empty() {
3851 segments.push(format!("labels: {}", labels.join(" ")));
3852 }
3853
3854 if !extra_metadata.is_empty() {
3855 for (key, value) in extra_metadata {
3856 if value.trim().is_empty() {
3857 continue;
3858 }
3859 segments.push(format!("{}: {}", key, value));
3860 }
3861 }
3862
3863 if !content_dates.is_empty() {
3864 segments.push(format!("dates: {}", content_dates.join(" ")));
3865 }
3866
3867 if let Some(meta) = metadata {
3868 if let Ok(meta_json) = serde_json::to_string(meta) {
3869 segments.push(format!("metadata: {}", meta_json));
3870 }
3871 }
3872
3873 if segments.is_empty() {
3874 None
3875 } else {
3876 Some(segments.join("\n"))
3877 }
3878}
3879
/// Appends trimmed, non-empty values from `additions` to `target`,
/// skipping anything already present (exact string match against the
/// existing, untrimmed contents of `target`) and deduplicating within
/// `additions` itself. Preserves the order of first occurrence.
pub(crate) fn merge_unique(target: &mut Vec<String>, additions: Vec<String>) {
    if additions.is_empty() {
        return;
    }
    let mut seen: BTreeSet<String> = target.iter().cloned().collect();
    let fresh = additions
        .into_iter()
        .filter_map(|raw| {
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        })
        .filter(|candidate| seen.insert(candidate.clone()));
    target.extend(fresh);
}