1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32 DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33 ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34 MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncates `text` to at most `MAX_EMBEDDING_TEXT_LEN` bytes without
/// splitting a UTF-8 character, returning an owned copy.
///
/// Slicing `&text[..MAX_EMBEDDING_TEXT_LEN]` directly panics when the cutoff
/// lands inside a multi-byte character, so the cut point is first walked back
/// to the nearest char boundary.
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Find the largest char boundary <= MAX_EMBEDDING_TEXT_LEN so the slice
    // below can never panic on multi-byte UTF-8 input.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
78
79fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
80 options.extra_metadata.insert(
81 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
82 runtime.provider_kind().to_string(),
83 );
84 options.extra_metadata.insert(
85 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
86 runtime.provider_model_id(),
87 );
88 options.extra_metadata.insert(
89 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
90 runtime.dimension().to_string(),
91 );
92}
93
94fn apply_embedding_identity_metadata_from_choice(
95 options: &mut PutOptions,
96 model: EmbeddingModelChoice,
97 dimension: usize,
98 model_id_override: Option<&str>,
99) {
100 let provider = match model {
101 EmbeddingModelChoice::OpenAILarge
102 | EmbeddingModelChoice::OpenAISmall
103 | EmbeddingModelChoice::OpenAIAda => "openai",
104 EmbeddingModelChoice::Nvidia => "nvidia",
105 _ => "fastembed",
106 };
107
108 let model_id = match model {
109 EmbeddingModelChoice::Nvidia => model_id_override
110 .map(str::trim)
111 .filter(|value| !value.is_empty())
112 .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
113 .filter(|value| !value.is_empty())
114 .map(|value| {
115 if value.contains('/') {
116 value.to_string()
117 } else {
118 format!("nvidia/{value}")
119 }
120 })
121 .unwrap_or_else(|| model.canonical_model_id().to_string()),
122 _ => model.canonical_model_id().to_string(),
123 };
124
125 options.extra_metadata.insert(
126 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
127 provider.to_string(),
128 );
129 options.extra_metadata.insert(
130 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
131 model_id,
132 );
133 options.extra_metadata.insert(
134 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
135 dimension.to_string(),
136 );
137}
138
#[cfg(feature = "llama-cpp")]
/// Resolves the on-disk path of the local Phi-3.5-mini GGUF model used for
/// contextual enrichment.
///
/// The models directory comes from `MEMVID_MODELS_DIR` (with a leading `~/`
/// expanded via `HOME`) and defaults to `~/.memvid/models`. Returns an error
/// when `HOME` is unset (and no override is given) or the model file is not
/// present on disk.
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            // "~/..." expands against HOME; if HOME is missing, the raw
            // value is used as-is (matching the non-tilde case).
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(&custom),
        },
        Err(_) => {
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
175
176use pathdiff::diff_paths;
177
178use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
179use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
180use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
181use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
182use crate::error::{CapacityExceededMessage, DuplicateUriError};
183use crate::utils::{
184 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
185 select_frame,
186};
187#[cfg(feature = "temporal_enrich")]
188use memvid_core::enrich_chunks as temporal_enrich_chunks;
189
/// Parses a human-friendly boolean flag: "1"/"true"/"yes"/"on" map to
/// `Some(true)`, "0"/"false"/"no"/"off" to `Some(false)` (case-insensitive,
/// surrounding whitespace ignored). Anything else yields `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if ["1", "true", "yes", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "false", "no", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
198
#[cfg(feature = "parallel_segments")]
/// Reads `MEMVID_PARALLEL_SEGMENTS` as an optional boolean override for
/// parallel-segment ingestion; `None` when the variable is unset or does not
/// parse as a boolean flag.
fn parallel_env_override() -> Option<bool> {
    match std::env::var("MEMVID_PARALLEL_SEGMENTS") {
        Ok(raw) => parse_bool_flag(&raw),
        Err(_) => None,
    }
}
206
#[cfg(feature = "clip")]
/// Reports whether the configured CLIP model is present on disk.
///
/// The models directory honours `MEMVID_MODELS_DIR`, falls back to
/// `$HOME/.memvid/models`, then to the relative `.memvid/models`. The model
/// name defaults to "mobileclip-s2" unless `MEMVID_CLIP_MODEL` is set; both
/// the `<name>_vision.onnx` and `<name>_text.onnx` files must exist.
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    // Both halves of the model (vision and text encoders) are required.
    [
        models_dir.join(format!("{model_name}_vision.onnx")),
        models_dir.join(format!("{model_name}_text.onnx")),
    ]
    .iter()
    .all(|path| path.exists())
}
226
// Minimum NER confidence required before an extracted entity is kept
// (enforced in `is_valid_entity`).
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

// Minimum entity text length (bytes of trimmed text) accepted by
// `is_valid_entity`.
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

// Maximum byte gap between two same-type entities that
// `merge_adjacent_entities` may still merge across.
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;
240
#[cfg(feature = "logic_mesh")]
/// Merges adjacent same-type NER entities separated by at most
/// `MAX_MERGE_GAP` bytes of joining characters (space, tab, `&`, `-`, `'`,
/// `.`), e.g. "Johns" + "Hopkins" -> "Johns Hopkins".
///
/// Entities are sorted by `byte_start` first. A merged entity takes the
/// exact source span from `original_text` when the offsets can be sliced
/// safely, falling back to concatenating the two texts; its confidence is
/// the average of the two parts. Gaps containing newlines are never merged
/// across.
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            // NER byte offsets are not guaranteed to land on UTF-8 char
            // boundaries; only slice `original_text` when they are in range
            // AND on boundaries, otherwise `&str` indexing would panic.
            let gap_sliceable = entity.byte_start <= original_text.len()
                && last.byte_end <= original_text.len()
                && original_text.is_char_boundary(last.byte_end)
                && original_text.is_char_boundary(entity.byte_start);

            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP && gap_sliceable {
                let gap_text = &original_text[last.byte_end..entity.byte_start];
                if gap_text.contains('\n') || gap_text.contains('\r') {
                    false
                } else {
                    gap_text.chars().all(|c| c == ' ' || c == '\t' || c == '&' || c == '-' || c == '\'' || c == '.')
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Prefer the exact span from the source text; fall back to
                // concatenation when the span cannot be sliced safely.
                let merged_text = if entity.byte_end <= original_text.len()
                    && original_text.is_char_boundary(last.byte_start)
                    && original_text.is_char_boundary(entity.byte_end)
                {
                    original_text[last.byte_start..entity.byte_end].to_string()
                } else {
                    format!("{}{}", last.text, entity.text)
                };

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
312
#[cfg(feature = "logic_mesh")]
/// Heuristic filter for NER output.
///
/// Rejects candidates that are low-confidence, too short, punctuation-only,
/// multi-line, WordPiece fragments ("##"), trailing-dot abbreviations, known
/// stop-word false positives, led by an unrecognized short all-caps token,
/// or that start with a lowercase letter. A small whitelist of 2-letter
/// acronyms (EU/UN/UK/US/AI/IT) is accepted.
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    // Structural junk: too short, no alphanumerics, spans lines, subword
    // fragments, or trailing-dot abbreviations.
    let is_junk = trimmed.len() < NER_MIN_ENTITY_LEN
        || trimmed.chars().all(|c| c.is_whitespace() || c.is_ascii_punctuation())
        || trimmed.contains('\n')
        || trimmed.contains('\r')
        || trimmed.starts_with("##")
        || trimmed.ends_with("##")
        || trimmed.ends_with('.');
    if is_junk {
        return false;
    }

    let lower = trimmed.to_lowercase();

    match trimmed.len() {
        // Single characters are never entities.
        1 => return false,
        // Two-letter strings are only kept when they are known acronyms.
        2 => return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Common NER false-positive tokens.
    if matches!(lower.as_str(), "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc") {
        return false;
    }

    // Multi-word entities led by a short all-caps fragment are rejected
    // unless the fragment is a recognized country/org code.
    let words: Vec<&str> = trimmed.split_whitespace().collect();
    if words.len() >= 2 {
        let first_word = words[0];
        let short_caps = first_word.len() <= 2 && first_word.chars().all(char::is_uppercase);
        if short_caps && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un") {
            return false;
        }
    }

    // Real entities start with an uppercase (or caseless) character.
    trimmed.chars().next().map_or(true, |c| !c.is_lowercase())
}
394
395pub fn parse_key_val(s: &str) -> Result<(String, String)> {
397 let (key, value) = s
398 .split_once('=')
399 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
400 Ok((key.to_string(), value.to_string()))
401}
402
// Shared locking flags, flattened into every mutating subcommand and applied
// via `apply_lock_cli`.
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap fields would change the generated --help text.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // Milliseconds to wait for the memory file's writer lock.
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // Force flag passed through to lock handling — presumably bypasses a
    // held lock; confirm against `apply_lock_cli` (not visible here).
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
413
// Arguments for the `put` subcommand (ingest payloads into a .mv2 memory).
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap fields would change the generated --help text.
#[derive(Args)]
pub struct PutArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human-readable progress output.
    #[arg(long)]
    pub json: bool,
    // Input path/glob; stdin is used when omitted (see `resolve_inputs`).
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    // Explicit frame URI; rejected when ingesting multiple files.
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Explicit frame title; rejected when ingesting multiple files.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Timestamp override for the frame.
    #[arg(long)]
    pub timestamp: Option<i64>,
    // Track name for the frame.
    #[arg(long)]
    pub track: Option<String>,
    // Document kind override; conflicts with --video unless it is "video".
    #[arg(long)]
    pub kind: Option<String>,
    // Treat the input as video; requires --input <file> (not stdin).
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    // Print the transcript availability notice after ingest.
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // KEY=VALUE tags attached to the frame (repeatable).
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    // Free-form labels attached to the frame (repeatable).
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Extra metadata as JSON documents (repeatable).
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Disable automatic tagging.
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    // Disable automatic date extraction.
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    // Audio ingest flag (handled later in the ingest pipeline).
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    // Request transcription for audio input.
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    // Audio segment length in seconds.
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Compute text embeddings at ingest; conflicts with --no-embedding and
    // (checked in `handle_put`) with --embedding-vec.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    // Lexical-only ingest (skip embeddings).
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // Pre-computed embedding vector (JSON file); requires exactly one input.
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // Model id recorded alongside a pre-computed embedding vector.
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    // Store vectors product-quantized (Pq96, ~16x smaller; see `handle_put`).
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // Enable contextual chunk prefixes (see `ContextualEngine`).
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    // Model used for contextual enrichment.
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // Force table extraction on.
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    // Force table extraction off.
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // Force CLIP visual embeddings on (otherwise auto-detected from input
    // type and installed model files).
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    // Disable CLIP even when a model is installed and inputs are visual.
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    // Update a frame that already has this URI instead of failing.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    // Permit ingesting a duplicate URI as a new frame.
    #[arg(long)]
    pub allow_duplicate: bool,

    // Run temporal enrichment on extracted chunks.
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // Run logic-mesh entity extraction.
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // Force parallel-segment ingestion on (see `wants_parallel`).
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    // Force parallel-segment ingestion off.
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    // Override `BuildOpts::segment_tokens`.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    // Override `BuildOpts::segment_pages`.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    // Override `BuildOpts::threads`.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    // Override `BuildOpts::queue_depth`.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Shared --lock-timeout / --force flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
572
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Resolves whether parallel-segment ingestion should run: explicit
    /// `--parallel-segments` wins, then `--no-parallel-segments`, then the
    /// `MEMVID_PARALLEL_SEGMENTS` env override, defaulting to off.
    pub fn wants_parallel(&self) -> bool {
        if self.parallel_segments {
            return true;
        }
        if self.no_parallel_segments {
            return false;
        }
        parallel_env_override().unwrap_or(false)
    }

    /// Builds `BuildOpts` from the CLI overrides (defaults kept where no
    /// flag was given) and runs `BuildOpts::sanitize` on the result.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        opts.segment_tokens = self.parallel_segment_tokens.unwrap_or(opts.segment_tokens);
        opts.segment_pages = self.parallel_segment_pages.unwrap_or(opts.segment_pages);
        opts.threads = self.parallel_threads.unwrap_or(opts.threads);
        opts.queue_depth = self.parallel_queue_depth.unwrap_or(opts.queue_depth);
        opts.sanitize();
        opts
    }
}
606
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    /// Parallel-segment ingestion is compiled out; always report it disabled.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
613
// Arguments for the `api-fetch` subcommand; mapped field-by-field onto
// `ApiFetchCommand` in `handle_api_fetch`.
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap fields would change the generated --help text.
#[derive(Args)]
pub struct ApiFetchArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Fetch configuration file (passed as `config_path`).
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    // Dry-run flag, forwarded to the fetch command.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Optional fetch-mode override (passed as `mode_override`).
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Optional URI override (passed as `uri_override`).
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Emit machine-readable JSON output (passed as `output_json`).
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared --lock-timeout / --force flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
639
// Arguments for the `update` subcommand (modify an existing frame).
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap fields would change the generated --help text.
#[derive(Args)]
pub struct UpdateArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select the frame by id (mutually exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select the frame by URI (mutually exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Optional input path — presumably the replacement payload; confirm
    // against the update handler (not visible here).
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // New URI to assign to the selected frame.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    // New title.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // New timestamp.
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    // New track.
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    // New kind.
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // KEY=VALUE tags to apply (repeatable).
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    // Labels to apply (repeatable).
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Extra metadata as JSON documents (repeatable).
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit machine-readable JSON output.
    #[arg(long)]
    pub json: bool,

    // Shared --lock-timeout / --force flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
675
// Arguments for the `delete` subcommand (handled by `handle_delete`).
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap fields would change the generated --help text.
#[derive(Args)]
pub struct DeleteArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select the frame by id (mutually exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select the frame by URI (mutually exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit machine-readable JSON output instead of a one-line summary.
    #[arg(long)]
    pub json: bool,

    // Shared --lock-timeout / --force flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
693
694pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
696 let command = ApiFetchCommand {
697 file: args.file,
698 config_path: args.config,
699 dry_run: args.dry_run,
700 mode_override: args.mode,
701 uri_override: args.uri,
702 output_json: args.json,
703 lock_timeout_ms: args.lock.lock_timeout,
704 force_lock: args.lock.force,
705 };
706 run_api_fetch(config, command)
707}
708
709pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
711 let mut mem = Memvid::open(&args.file)?;
712 ensure_cli_mutation_allowed(&mem)?;
713 apply_lock_cli(&mut mem, &args.lock);
714 let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
715
716 if !args.yes {
717 if let Some(uri) = &frame.uri {
718 println!("About to delete frame {} ({uri})", frame.id);
719 } else {
720 println!("About to delete frame {}", frame.id);
721 }
722 print!("Confirm? [y/N]: ");
723 io::stdout().flush()?;
724 let mut input = String::new();
725 io::stdin().read_line(&mut input)?;
726 let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
727 if !confirmed {
728 println!("Aborted");
729 return Ok(());
730 }
731 }
732
733 let seq = mem.delete_frame(frame.id)?;
734 mem.commit()?;
735 let deleted = mem.frame_by_id(frame.id)?;
736
737 if args.json {
738 let json = json!({
739 "frame_id": frame.id,
740 "sequence": seq,
741 "status": frame_status_str(deleted.status),
742 });
743 println!("{}", serde_json::to_string_pretty(&json)?);
744 } else {
745 println!("Deleted frame {} (seq {})", frame.id, seq);
746 }
747
748 Ok(())
749}
750pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
751 let mut mem = Memvid::open(&args.file)?;
752 ensure_cli_mutation_allowed(&mem)?;
753 apply_lock_cli(&mut mem, &args.lock);
754
755 #[cfg(feature = "replay")]
757 let _ = mem.load_active_session();
758 let mut stats = mem.stats()?;
759 if stats.frame_count == 0 && !stats.has_lex_index {
760 mem.enable_lex()?;
761 stats = mem.stats()?;
762 }
763
764 if args.vector_compression {
766 mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
767 }
768
769 let mut capacity_guard = CapacityGuard::from_stats(&stats);
770 let use_parallel = args.wants_parallel();
771 #[cfg(feature = "parallel_segments")]
772 let mut parallel_opts = if use_parallel {
773 Some(args.sanitized_parallel_opts())
774 } else {
775 None
776 };
777 #[cfg(feature = "parallel_segments")]
778 let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
779 #[cfg(feature = "parallel_segments")]
780 let mut pending_parallel_indices: Vec<usize> = Vec::new();
781 let input_set = resolve_inputs(args.input.as_deref())?;
782 if args.embedding && args.embedding_vec.is_some() {
783 anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
784 }
785 if args.embedding_vec.is_some() {
786 match &input_set {
787 InputSet::Files(paths) if paths.len() != 1 => {
788 anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
789 }
790 _ => {}
791 }
792 }
793
794 if args.video {
795 if let Some(kind) = &args.kind {
796 if !kind.eq_ignore_ascii_case("video") {
797 anyhow::bail!("--video conflicts with explicit --kind={kind}");
798 }
799 }
800 if matches!(input_set, InputSet::Stdin) {
801 anyhow::bail!("--video requires --input <file>");
802 }
803 }
804
805 #[allow(dead_code)]
806 enum EmbeddingMode {
807 None,
808 Auto,
809 Explicit,
810 }
811
812 let mv2_dimension = mem.effective_vec_index_dimension()?;
816 let inferred_embedding_model = if args.embedding {
817 match mem.embedding_identity_summary(10_000) {
818 memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
819 memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
820 let models: Vec<_> = identities
821 .iter()
822 .filter_map(|entry| entry.identity.model.as_deref())
823 .collect();
824 anyhow::bail!(
825 "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
826 Detected models: {:?}\n\n\
827 Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
828 models
829 );
830 }
831 memvid_core::EmbeddingIdentitySummary::Unknown => None,
832 }
833 } else {
834 None
835 };
836 let (embedding_mode, embedding_runtime) = if args.embedding {
837 (
838 EmbeddingMode::Explicit,
839 Some(load_embedding_runtime_for_mv2(
840 config,
841 inferred_embedding_model.as_deref(),
842 mv2_dimension,
843 )?),
844 )
845 } else if args.embedding_vec.is_some() {
846 (EmbeddingMode::Explicit, None)
847 } else {
848 (EmbeddingMode::None, None)
849 };
850 let embedding_enabled = args.embedding || args.embedding_vec.is_some();
851 let runtime_ref = embedding_runtime.as_ref();
852
853 #[cfg(feature = "clip")]
858 let clip_enabled = {
859 if args.no_clip {
860 false
861 } else if args.clip {
862 true } else {
864 let model_available = is_clip_model_installed();
866 if model_available {
867 match &input_set {
869 InputSet::Files(paths) => paths.iter().any(|p| {
870 is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
871 }),
872 InputSet::Stdin => false,
873 }
874 } else {
875 false
876 }
877 }
878 };
879 #[cfg(feature = "clip")]
880 let clip_model = if clip_enabled {
881 mem.enable_clip()?;
882 if !args.json {
883 let mode = if args.clip {
884 "explicit"
885 } else {
886 "auto-detected"
887 };
888 eprintln!("ℹ️ CLIP visual embeddings enabled ({mode})");
889 eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
890 eprintln!(" Use 'memvid find --mode clip' to search with natural language");
891 eprintln!(" Use --no-clip to disable auto-detection");
892 eprintln!();
893 }
894 match memvid_core::ClipModel::default_model() {
896 Ok(model) => Some(model),
897 Err(e) => {
898 warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
899 None
900 }
901 }
902 } else {
903 None
904 };
905
906 if embedding_enabled && !args.json {
908 let capacity = stats.capacity_bytes;
909 if args.vector_compression {
910 let max_docs = capacity / 20_000; eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
913 eprintln!(" Storage: ~20 KB per document (16x compressed)");
914 eprintln!(" Accuracy: ~95% recall@10 maintained");
915 eprintln!(
916 " Capacity: ~{} documents max for {:.1} GB",
917 max_docs,
918 capacity as f64 / 1e9
919 );
920 eprintln!();
921 } else {
922 let max_docs = capacity / 270_000; eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
924 eprintln!(" Storage: ~270 KB per document");
925 eprintln!(
926 " Capacity: ~{} documents max for {:.1} GB",
927 max_docs,
928 capacity as f64 / 1e9
929 );
930 eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
931 eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
932 eprintln!();
933 }
934 }
935
936 let embed_progress = if embedding_enabled {
937 let total = match &input_set {
938 InputSet::Files(paths) => Some(paths.len() as u64),
939 InputSet::Stdin => None,
940 };
941 Some(create_task_progress_bar(total, "embed", false))
942 } else {
943 None
944 };
945 let mut embedded_docs = 0usize;
946
947 if let InputSet::Files(paths) = &input_set {
948 if paths.len() > 1 {
949 if args.uri.is_some() || args.title.is_some() {
950 anyhow::bail!(
951 "--uri/--title apply to a single file; omit them when using a directory or glob"
952 );
953 }
954 }
955 }
956
957 let mut reports = Vec::new();
958 let mut processed = 0usize;
959 let mut skipped = 0usize;
960 let mut bytes_added = 0u64;
961 let mut summary_warnings: Vec<String> = Vec::new();
962 #[cfg(feature = "clip")]
963 let mut clip_embeddings_added = false;
964
965 let mut capacity_reached = false;
966 match input_set {
967 InputSet::Stdin => {
968 processed += 1;
969 match read_payload(None) {
970 Ok(payload) => {
971 let mut analyze_spinner = if args.json {
972 None
973 } else {
974 Some(create_spinner("Analyzing stdin payload..."))
975 };
976 match ingest_payload(
977 &mut mem,
978 &args,
979 payload,
980 None,
981 capacity_guard.as_mut(),
982 embedding_enabled,
983 runtime_ref,
984 use_parallel,
985 ) {
986 Ok(outcome) => {
987 if let Some(pb) = analyze_spinner.take() {
988 pb.finish_and_clear();
989 }
990 bytes_added += outcome.report.stored_bytes as u64;
991 if args.json {
992 summary_warnings
993 .extend(outcome.report.extraction.warnings.iter().cloned());
994 }
995 reports.push(outcome.report);
996 let report_index = reports.len() - 1;
997 if !args.json && !use_parallel {
998 if let Some(report) = reports.get(report_index) {
999 print_report(report);
1000 }
1001 }
1002 if args.transcript {
1003 let notice = transcript_notice_message(&mut mem)?;
1004 if args.json {
1005 summary_warnings.push(notice);
1006 } else {
1007 println!("{notice}");
1008 }
1009 }
1010 if outcome.embedded {
1011 if let Some(pb) = embed_progress.as_ref() {
1012 pb.inc(1);
1013 }
1014 embedded_docs += 1;
1015 }
1016 if use_parallel {
1017 #[cfg(feature = "parallel_segments")]
1018 if let Some(input) = outcome.parallel_input {
1019 pending_parallel_indices.push(report_index);
1020 pending_parallel_inputs.push(input);
1021 }
1022 } else if let Some(guard) = capacity_guard.as_mut() {
1023 mem.commit()?;
1024 let stats = mem.stats()?;
1025 guard.update_after_commit(&stats);
1026 guard.check_and_warn_capacity(); }
1028 }
1029 Err(err) => {
1030 if let Some(pb) = analyze_spinner.take() {
1031 pb.finish_and_clear();
1032 }
1033 if err.downcast_ref::<DuplicateUriError>().is_some() {
1034 return Err(err);
1035 }
1036 warn!("skipped stdin payload: {err}");
1037 skipped += 1;
1038 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1039 capacity_reached = true;
1040 }
1041 }
1042 }
1043 }
1044 Err(err) => {
1045 warn!("failed to read stdin: {err}");
1046 skipped += 1;
1047 }
1048 }
1049 }
1050 InputSet::Files(ref paths) => {
1051 let mut transcript_notice_printed = false;
1052 for path in paths {
1053 processed += 1;
1054 match read_payload(Some(&path)) {
1055 Ok(payload) => {
1056 let label = path.display().to_string();
1057 let mut analyze_spinner = if args.json {
1058 None
1059 } else {
1060 Some(create_spinner(&format!("Analyzing {label}...")))
1061 };
1062 match ingest_payload(
1063 &mut mem,
1064 &args,
1065 payload,
1066 Some(&path),
1067 capacity_guard.as_mut(),
1068 embedding_enabled,
1069 runtime_ref,
1070 use_parallel,
1071 ) {
1072 Ok(outcome) => {
1073 if let Some(pb) = analyze_spinner.take() {
1074 pb.finish_and_clear();
1075 }
1076 bytes_added += outcome.report.stored_bytes as u64;
1077
1078 #[cfg(feature = "clip")]
1080 if let Some(ref model) = clip_model {
1081 let mime = outcome.report.mime.as_deref();
1082 let clip_frame_id = outcome.report.frame_id;
1083
1084 if is_image_file(&path) {
1085 match model.encode_image_file(&path) {
1086 Ok(embedding) => {
1087 if let Err(e) = mem.add_clip_embedding_with_page(
1088 clip_frame_id,
1089 None,
1090 embedding,
1091 ) {
1092 warn!(
1093 "Failed to add CLIP embedding for {}: {e}",
1094 path.display()
1095 );
1096 } else {
1097 info!(
1098 "Added CLIP embedding for frame {}",
1099 clip_frame_id
1100 );
1101 clip_embeddings_added = true;
1102 }
1103 }
1104 Err(e) => {
1105 warn!(
1106 "Failed to encode CLIP embedding for {}: {e}",
1107 path.display()
1108 );
1109 }
1110 }
1111 } else if matches!(mime, Some(m) if m == "application/pdf") {
1112 if let Err(e) = mem.commit() {
1114 warn!("Failed to commit before CLIP extraction: {e}");
1115 }
1116
1117 match render_pdf_pages_for_clip(
1118 &path,
1119 CLIP_PDF_MAX_IMAGES,
1120 CLIP_PDF_TARGET_PX,
1121 ) {
1122 Ok(rendered_pages) => {
1123 use rayon::prelude::*;
1124
1125 let path_for_closure = path.clone();
1128
1129 let prev_hook = std::panic::take_hook();
1132 std::panic::set_hook(Box::new(|_| {
1133 }));
1135
1136 let encoded_images: Vec<_> = rendered_pages
1137 .into_par_iter()
1138 .filter_map(|(page_num, image)| {
1139 let image_info = get_image_info(&image);
1141 if !image_info.should_embed() {
1142 info!(
1143 "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1144 path_for_closure.display(),
1145 page_num,
1146 image_info.color_variance,
1147 image_info.width,
1148 image_info.height
1149 );
1150 return None;
1151 }
1152
1153 let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1157 let rgb_image = image.to_rgb8();
1158 let mut png_bytes = Vec::new();
1159 image::DynamicImage::ImageRgb8(rgb_image).write_to(
1160 &mut Cursor::new(&mut png_bytes),
1161 image::ImageFormat::Png,
1162 ).map(|()| png_bytes)
1163 }));
1164
1165 match encode_result {
1166 Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1167 Ok(Err(e)) => {
1168 info!(
1169 "Skipping image for {} page {}: {e}",
1170 path_for_closure.display(),
1171 page_num
1172 );
1173 None
1174 }
1175 Err(_) => {
1176 info!(
1178 "Skipping malformed image for {} page {}",
1179 path_for_closure.display(),
1180 page_num
1181 );
1182 None
1183 }
1184 }
1185 })
1186 .collect();
1187
1188 std::panic::set_hook(prev_hook);
1190
1191 let mut child_frame_offset = 1u64;
1194 let parent_uri = outcome.report.uri.clone();
1195 let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1196 let total_images = encoded_images.len();
1197
1198 let clip_pb = if !args.json && total_images > 0 {
1200 let pb = ProgressBar::new(total_images as u64);
1201 pb.set_style(
1202 ProgressStyle::with_template(
1203 " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1204 )
1205 .unwrap_or_else(|_| ProgressStyle::default_bar())
1206 .progress_chars("█▓░"),
1207 );
1208 Some(pb)
1209 } else {
1210 None
1211 };
1212
1213 for (page_num, image, png_bytes) in encoded_images {
1214 let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1215 let child_title = format!(
1216 "{} - Image {} (page {})",
1217 parent_title,
1218 child_frame_offset,
1219 page_num
1220 );
1221
1222 let child_options = PutOptions::builder()
1223 .uri(&child_uri)
1224 .title(&child_title)
1225 .parent_id(clip_frame_id)
1226 .role(FrameRole::ExtractedImage)
1227 .auto_tag(false)
1228 .extract_dates(false)
1229 .build();
1230
1231 match mem.put_bytes_with_options(&png_bytes, child_options) {
1232 Ok(_child_seq) => {
1233 let child_frame_id = clip_frame_id + child_frame_offset;
1234 child_frame_offset += 1;
1235
1236 match model.encode_image(&image) {
1237 Ok(embedding) => {
1238 if let Err(e) = mem
1239 .add_clip_embedding_with_page(
1240 child_frame_id,
1241 Some(page_num),
1242 embedding,
1243 )
1244 {
1245 warn!(
1246 "Failed to add CLIP embedding for {} page {}: {e}",
1247 path.display(),
1248 page_num
1249 );
1250 } else {
1251 info!(
1252 "Added CLIP image frame {} for {} page {}",
1253 child_frame_id,
1254 clip_frame_id,
1255 page_num
1256 );
1257 clip_embeddings_added = true;
1258 }
1259 }
1260 Err(e) => warn!(
1261 "Failed to encode CLIP embedding for {} page {}: {e}",
1262 path.display(),
1263 page_num
1264 ),
1265 }
1266
1267 if let Err(e) = mem.commit() {
1269 warn!("Failed to commit after image {}: {e}", child_frame_offset);
1270 }
1271 }
1272 Err(e) => warn!(
1273 "Failed to store image frame for {} page {}: {e}",
1274 path.display(),
1275 page_num
1276 ),
1277 }
1278
1279 if let Some(ref pb) = clip_pb {
1280 pb.inc(1);
1281 }
1282 }
1283
1284 if let Some(pb) = clip_pb {
1285 pb.finish_and_clear();
1286 }
1287 }
1288 Err(err) => warn!(
1289 "Failed to render PDF pages for CLIP {}: {err}",
1290 path.display()
1291 ),
1292 }
1293 }
1294 }
1295
1296 if args.json {
1297 summary_warnings
1298 .extend(outcome.report.extraction.warnings.iter().cloned());
1299 }
1300 reports.push(outcome.report);
1301 let report_index = reports.len() - 1;
1302 if !args.json && !use_parallel {
1303 if let Some(report) = reports.get(report_index) {
1304 print_report(report);
1305 }
1306 }
1307 if args.transcript && !transcript_notice_printed {
1308 let notice = transcript_notice_message(&mut mem)?;
1309 if args.json {
1310 summary_warnings.push(notice);
1311 } else {
1312 println!("{notice}");
1313 }
1314 transcript_notice_printed = true;
1315 }
1316 if outcome.embedded {
1317 if let Some(pb) = embed_progress.as_ref() {
1318 pb.inc(1);
1319 }
1320 embedded_docs += 1;
1321 }
1322 if use_parallel {
1323 #[cfg(feature = "parallel_segments")]
1324 if let Some(input) = outcome.parallel_input {
1325 pending_parallel_indices.push(report_index);
1326 pending_parallel_inputs.push(input);
1327 }
1328 } else if let Some(guard) = capacity_guard.as_mut() {
1329 mem.commit()?;
1330 let stats = mem.stats()?;
1331 guard.update_after_commit(&stats);
1332 guard.check_and_warn_capacity(); }
1334 }
1335 Err(err) => {
1336 if let Some(pb) = analyze_spinner.take() {
1337 pb.finish_and_clear();
1338 }
1339 if err.downcast_ref::<DuplicateUriError>().is_some() {
1340 return Err(err);
1341 }
1342 warn!("skipped {}: {err}", path.display());
1343 skipped += 1;
1344 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1345 capacity_reached = true;
1346 }
1347 }
1348 }
1349 }
1350 Err(err) => {
1351 warn!("failed to read {}: {err}", path.display());
1352 skipped += 1;
1353 }
1354 }
1355 if capacity_reached {
1356 break;
1357 }
1358 }
1359 }
1360 }
1361
1362 if capacity_reached {
1363 warn!("capacity limit reached; remaining inputs were not processed");
1364 }
1365
1366 if let Some(pb) = embed_progress.as_ref() {
1367 pb.finish_with_message("embed");
1368 }
1369
1370 if let Some(runtime) = embedding_runtime.as_ref() {
1371 if embedded_docs > 0 {
1372 match embedding_mode {
1373 EmbeddingMode::Explicit => println!(
1374 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
1375 embedded_docs,
1376 runtime.dimension()
1377 ),
1378 EmbeddingMode::Auto => println!(
1379 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
1380 embedded_docs,
1381 runtime.dimension()
1382 ),
1383 EmbeddingMode::None => {}
1384 }
1385 } else {
1386 match embedding_mode {
1387 EmbeddingMode::Explicit => {
1388 println!("No embeddings were generated (no textual content available)");
1389 }
1390 EmbeddingMode::Auto => {
1391 println!(
1392 "Semantic runtime available, but no textual content produced embeddings"
1393 );
1394 }
1395 EmbeddingMode::None => {}
1396 }
1397 }
1398 }
1399
1400 if reports.is_empty() {
1401 if args.json {
1402 if capacity_reached {
1403 summary_warnings.push("capacity_limit_reached".to_string());
1404 }
1405 let summary_json = json!({
1406 "processed": processed,
1407 "ingested": 0,
1408 "skipped": skipped,
1409 "bytes_added": bytes_added,
1410 "bytes_added_human": format_bytes(bytes_added),
1411 "embedded_documents": embedded_docs,
1412 "capacity_reached": capacity_reached,
1413 "warnings": summary_warnings,
1414 "reports": Vec::<serde_json::Value>::new(),
1415 });
1416 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1417 } else {
1418 println!(
1419 "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1420 processed, skipped
1421 );
1422 }
1423 return Ok(());
1424 }
1425
1426 #[cfg(feature = "parallel_segments")]
1427 let mut parallel_flush_spinner = if use_parallel && !args.json {
1428 Some(create_spinner("Flushing parallel segments..."))
1429 } else {
1430 None
1431 };
1432
1433 if use_parallel {
1434 #[cfg(feature = "parallel_segments")]
1435 {
1436 let mut opts = parallel_opts.take().unwrap_or_else(|| {
1437 let mut opts = BuildOpts::default();
1438 opts.sanitize();
1439 opts
1440 });
1441 opts.vec_compression = mem.vector_compression().clone();
1443 let seqs = if pending_parallel_inputs.is_empty() {
1444 mem.commit_parallel(opts)?;
1445 Vec::new()
1446 } else {
1447 mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1448 };
1449 if let Some(pb) = parallel_flush_spinner.take() {
1450 pb.finish_with_message("Flushed parallel segments");
1451 }
1452 for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1453 if let Some(report) = reports.get_mut(idx) {
1454 report.seq = seq;
1455 }
1456 }
1457 }
1458 #[cfg(not(feature = "parallel_segments"))]
1459 {
1460 mem.commit()?;
1461 }
1462 } else {
1463 mem.commit()?;
1464 }
1465
1466 #[cfg(feature = "clip")]
1468 if clip_embeddings_added {
1469 mem.commit()?;
1470 }
1471
1472 if !args.json && use_parallel {
1473 for report in &reports {
1474 print_report(report);
1475 }
1476 }
1477
1478 let mut tables_extracted = 0usize;
1480 let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1481 if args.tables && !args.no_tables {
1482 if let InputSet::Files(ref paths) = input_set {
1483 let pdf_paths: Vec<_> = paths
1484 .iter()
1485 .filter(|p| {
1486 p.extension()
1487 .and_then(|e| e.to_str())
1488 .map(|e| e.eq_ignore_ascii_case("pdf"))
1489 .unwrap_or(false)
1490 })
1491 .collect();
1492
1493 if !pdf_paths.is_empty() {
1494 let table_spinner = if args.json {
1495 None
1496 } else {
1497 Some(create_spinner(&format!(
1498 "Extracting tables from {} PDF(s)...",
1499 pdf_paths.len()
1500 )))
1501 };
1502
1503 let table_options = TableExtractionOptions::builder()
1505 .mode(memvid_core::table::ExtractionMode::Aggressive)
1506 .min_rows(2)
1507 .min_cols(2)
1508 .min_quality(memvid_core::table::TableQuality::Low)
1509 .merge_multi_page(true)
1510 .build();
1511
1512 for pdf_path in pdf_paths {
1513 if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1514 let filename = pdf_path
1515 .file_name()
1516 .and_then(|s| s.to_str())
1517 .unwrap_or("unknown.pdf");
1518
1519 if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1520 for table in &result.tables {
1521 if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1522 {
1523 tables_extracted += 1;
1524 table_summaries.push(json!({
1525 "table_id": table.table_id,
1526 "source_file": table.source_file,
1527 "meta_frame_id": meta_id,
1528 "rows": table.n_rows,
1529 "cols": table.n_cols,
1530 "quality": format!("{:?}", table.quality),
1531 "pages": format!("{}-{}", table.page_start, table.page_end),
1532 }));
1533 }
1534 }
1535 }
1536 }
1537 }
1538
1539 if let Some(pb) = table_spinner {
1540 if tables_extracted > 0 {
1541 pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1542 } else {
1543 pb.finish_with_message("No tables found");
1544 }
1545 }
1546
1547 if tables_extracted > 0 {
1549 mem.commit()?;
1550 }
1551 }
1552 }
1553 }
1554
1555 #[cfg(feature = "logic_mesh")]
1558 let mut logic_mesh_entities = 0usize;
1559 #[cfg(feature = "logic_mesh")]
1560 {
1561 use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1562
1563 let models_dir = config.models_dir.clone();
1564 let model_path = ner_model_path(&models_dir);
1565 let tokenizer_path = ner_tokenizer_path(&models_dir);
1566
1567 let ner_available = model_path.exists() && tokenizer_path.exists();
1569 let should_run_ner = args.logic_mesh;
1570
1571 if should_run_ner && !ner_available {
1572 if args.json {
1574 summary_warnings.push(
1575 "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1576 );
1577 } else {
1578 eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
1579 eprintln!(" Run: memvid models install --ner distilbert-ner");
1580 }
1581 } else if should_run_ner {
1582 let ner_spinner = if args.json {
1583 None
1584 } else {
1585 Some(create_spinner("Building Logic-Mesh entity graph..."))
1586 };
1587
1588 match NerModel::load(&model_path, &tokenizer_path, None) {
1589 Ok(mut ner_model) => {
1590 let mut mesh = LogicMesh::new();
1591 let mut filtered_count = 0usize;
1592
1593 for report in &reports {
1595 let frame_id = report.frame_id;
1596 if let Some(ref content) = report.search_text {
1598 if !content.is_empty() {
1599 match ner_model.extract(content) {
1600 Ok(raw_entities) => {
1601 let merged_entities = merge_adjacent_entities(raw_entities, content);
1604
1605 for entity in &merged_entities {
1606 if !is_valid_entity(&entity.text, entity.confidence) {
1608 tracing::debug!(
1609 "Filtered entity: '{}' (confidence: {:.0}%)",
1610 entity.text, entity.confidence * 100.0
1611 );
1612 filtered_count += 1;
1613 continue;
1614 }
1615
1616 let kind = entity.to_entity_kind();
1618 let node = MeshNode::new(
1620 entity.text.to_lowercase(),
1621 entity.text.trim().to_string(),
1622 kind,
1623 entity.confidence,
1624 frame_id,
1625 entity.byte_start as u32,
1626 (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1627 );
1628 mesh.merge_node(node);
1629 logic_mesh_entities += 1;
1630 }
1631 }
1632 Err(e) => {
1633 warn!("NER extraction failed for frame {frame_id}: {e}");
1634 }
1635 }
1636 }
1637 }
1638 }
1639
1640 if logic_mesh_entities > 0 {
1641 mesh.finalize();
1642 let node_count = mesh.stats().node_count;
1643 mem.set_logic_mesh(mesh);
1645 mem.commit()?;
1646 info!(
1647 "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1648 logic_mesh_entities,
1649 node_count,
1650 filtered_count
1651 );
1652 }
1653
1654 if let Some(pb) = ner_spinner {
1655 pb.finish_with_message(format!(
1656 "Logic-Mesh: {} entities ({} unique)",
1657 logic_mesh_entities,
1658 mem.mesh_node_count()
1659 ));
1660 }
1661 }
1662 Err(e) => {
1663 if let Some(pb) = ner_spinner {
1664 pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1665 }
1666 if args.json {
1667 summary_warnings.push(format!("logic_mesh_failed: {e}"));
1668 }
1669 }
1670 }
1671 }
1672 }
1673
1674 let stats = mem.stats()?;
1675 if args.json {
1676 if capacity_reached {
1677 summary_warnings.push("capacity_limit_reached".to_string());
1678 }
1679 let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1680 let total_duration_ms: Option<u64> = {
1681 let sum: u64 = reports
1682 .iter()
1683 .filter_map(|r| r.extraction.duration_ms)
1684 .sum();
1685 if sum > 0 {
1686 Some(sum)
1687 } else {
1688 None
1689 }
1690 };
1691 let total_pages: Option<u32> = {
1692 let sum: u32 = reports
1693 .iter()
1694 .filter_map(|r| r.extraction.pages_processed)
1695 .sum();
1696 if sum > 0 {
1697 Some(sum)
1698 } else {
1699 None
1700 }
1701 };
1702 let embedding_mode_str = match embedding_mode {
1703 EmbeddingMode::None => "none",
1704 EmbeddingMode::Auto => "auto",
1705 EmbeddingMode::Explicit => "explicit",
1706 };
1707 let summary_json = json!({
1708 "processed": processed,
1709 "ingested": reports.len(),
1710 "skipped": skipped,
1711 "bytes_added": bytes_added,
1712 "bytes_added_human": format_bytes(bytes_added),
1713 "embedded_documents": embedded_docs,
1714 "embedding": {
1715 "enabled": embedding_enabled,
1716 "mode": embedding_mode_str,
1717 "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1718 },
1719 "tables": {
1720 "enabled": args.tables && !args.no_tables,
1721 "extracted": tables_extracted,
1722 "tables": table_summaries,
1723 },
1724 "capacity_reached": capacity_reached,
1725 "warnings": summary_warnings,
1726 "extraction": {
1727 "total_duration_ms": total_duration_ms,
1728 "total_pages_processed": total_pages,
1729 },
1730 "memory": {
1731 "path": args.file.display().to_string(),
1732 "frame_count": stats.frame_count,
1733 "size_bytes": stats.size_bytes,
1734 "tier": format!("{:?}", stats.tier),
1735 },
1736 "reports": reports_json,
1737 });
1738 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1739 } else {
1740 println!(
1741 "Committed {} frame(s); total frames {}",
1742 reports.len(),
1743 stats.frame_count
1744 );
1745 println!(
1746 "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1747 processed,
1748 reports.len(),
1749 skipped,
1750 format_bytes(bytes_added)
1751 );
1752 if tables_extracted > 0 {
1753 println!("Tables: {} extracted from PDF(s)", tables_extracted);
1754 }
1755 }
1756
1757 #[cfg(feature = "replay")]
1759 let _ = mem.save_active_session();
1760
1761 Ok(())
1762}
1763
/// Updates an existing frame in place: optionally replaces its payload,
/// merges CLI overrides onto the frame's current metadata, optionally
/// recomputes embeddings, and commits the result as a new frame.
///
/// Selection is by `--frame-id` or `--uri`; with no `--input` only metadata
/// is rewritten and the payload is reused.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Resolve the target frame before touching anything else.
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // A replacement payload is only read when --input was given.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Start from the existing frame's metadata so untouched fields carry over.
    // Auto-tagging/date extraction only make sense when new content arrives.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // Explicit --set-uri wins over the inherited URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // Frames without a URI get one derived from the input path when a new
    // payload is supplied.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // Per-field CLI overrides.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels replace wholesale when any were given.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags replace wholesale too; each key/value pair also lands in
    // extra_metadata. A value equal to its key is not duplicated in the
    // flat tag list.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text later used for embedding; starts as the inherited search text and
    // may be replaced by freshly extracted text below.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // Re-derive title/URI/metadata from the new payload.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // No new payload: nothing to tag or date-extract.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    // Only attach an embedding slot when the memory actually has a vector index.
    options.enable_embedding = stats.has_vec_index;

    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Recomputing embeddings requires a single consistent model across the
        // memory; mixed models would produce incomparable vectors.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                    Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer extracted/inherited search text; fall back to the raw UTF-8
        // payload when no search text is available.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // When nothing was recomputed but the memory is vector-indexed, carry the
    // old frame's embedding forward so the update doesn't lose it.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the updated frame for reporting: by URI when one exists,
    // otherwise via the newest timeline entry (updates append a new frame),
    // falling back to the original id.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
1966
1967fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1968 if let Some(uri) = provided {
1969 let sanitized = sanitize_uri(uri, true);
1970 return format!("mv2://{}", sanitized);
1971 }
1972
1973 if let Some(path) = source {
1974 let raw = path.to_string_lossy();
1975 let sanitized = sanitize_uri(&raw, false);
1976 return format!("mv2://{}", sanitized);
1977 }
1978
1979 format!("mv2://frames/{}", Uuid::new_v4())
1980}
1981
1982pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1983 let digest = hash(payload).to_hex();
1984 let short = &digest[..32];
1985 let ext_from_path = source
1986 .and_then(|path| path.extension())
1987 .and_then(|ext| ext.to_str())
1988 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1989 .filter(|ext| !ext.is_empty());
1990 let ext = ext_from_path
1991 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1992 .unwrap_or_else(|| "bin".to_string());
1993 format!("mv2://video/{short}.{ext}")
1994}
1995
1996fn derive_title(
1997 provided: Option<String>,
1998 source: Option<&Path>,
1999 payload_utf8: Option<&str>,
2000) -> Option<String> {
2001 if let Some(title) = provided {
2002 let trimmed = title.trim();
2003 if trimmed.is_empty() {
2004 None
2005 } else {
2006 Some(trimmed.to_string())
2007 }
2008 } else {
2009 if let Some(text) = payload_utf8 {
2010 if let Some(markdown_title) = extract_markdown_title(text) {
2011 return Some(markdown_title);
2012 }
2013 }
2014 source
2015 .and_then(|path| path.file_stem())
2016 .and_then(|stem| stem.to_str())
2017 .map(to_title_case)
2018 }
2019}
2020
/// Returns the text of the first non-empty markdown-style heading in `text`
/// (a trimmed line starting with '#', with all leading hashes stripped).
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines()
        .map(str::trim)
        .filter(|line| line.starts_with('#'))
        .map(|line| line.trim_start_matches('#').trim())
        .find(|title| !title.is_empty())
        .map(|title| title.to_string())
}
2033
/// Uppercases the first character of `input`, leaving the rest untouched.
///
/// Returns an empty string for empty input. Because `char::to_uppercase` can
/// expand to multiple chars (e.g. 'ß' → "SS"), the result may be longer than
/// the input.
fn to_title_case(input: &str) -> String {
    // The original also special-cased `is_empty()` before the iterator; the
    // `None` arm already covers that, so a single match suffices.
    let mut chars = input.chars();
    match chars.next() {
        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
        None => String::new(),
    }
}
2045
/// True for files ingestion should silently ignore (macOS Finder metadata).
fn should_skip_input(path: &Path) -> bool {
    path.file_name().and_then(|name| name.to_str()) == Some(".DS_Store")
}
2052
/// True for directories the recursive walk should not descend into
/// (any dot-prefixed name, e.g. `.git`).
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
2059
/// Where `put` inputs come from: the process's stdin, or a resolved list of
/// filesystem paths (expanded from a literal path, a directory walk, or a
/// glob pattern — see `resolve_inputs`).
enum InputSet {
    Stdin,
    Files(Vec<PathBuf>),
}
2064
#[cfg(feature = "clip")]
/// True when the path's extension (case-insensitive) names a raster image
/// format the CLIP ingestion path understands.
fn is_image_file(path: &std::path::Path) -> bool {
    const IMAGE_EXTS: [&str; 9] =
        ["jpg", "jpeg", "png", "gif", "bmp", "webp", "tiff", "tif", "ico"];
    path.extension()
        .and_then(|ext| ext.to_str())
        .map_or(false, |ext| IMAGE_EXTS.contains(&ext.to_ascii_lowercase().as_str()))
}
2076
2077fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2078 let Some(raw) = input else {
2079 return Ok(InputSet::Stdin);
2080 };
2081
2082 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2083 let mut files = Vec::new();
2084 let mut matched_any = false;
2085 for entry in glob(raw)? {
2086 let path = entry?;
2087 if path.is_file() {
2088 matched_any = true;
2089 if should_skip_input(&path) {
2090 continue;
2091 }
2092 files.push(path);
2093 }
2094 }
2095 files.sort();
2096 if files.is_empty() {
2097 if matched_any {
2098 anyhow::bail!(
2099 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2100 );
2101 }
2102 anyhow::bail!("pattern '{raw}' did not match any files");
2103 }
2104 return Ok(InputSet::Files(files));
2105 }
2106
2107 let path = PathBuf::from(raw);
2108 if path.is_dir() {
2109 let (mut files, skipped_any) = collect_directory_files(&path)?;
2110 files.sort();
2111 if files.is_empty() {
2112 if skipped_any {
2113 anyhow::bail!(
2114 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2115 path.display()
2116 );
2117 }
2118 anyhow::bail!(
2119 "directory '{}' contains no ingestible files",
2120 path.display()
2121 );
2122 }
2123 Ok(InputSet::Files(files))
2124 } else {
2125 Ok(InputSet::Files(vec![path]))
2126 }
2127}
2128
2129fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2130 let mut files = Vec::new();
2131 let mut skipped_any = false;
2132 let mut stack = vec![root.to_path_buf()];
2133 while let Some(dir) = stack.pop() {
2134 for entry in fs::read_dir(&dir)? {
2135 let entry = entry?;
2136 let path = entry.path();
2137 let file_type = entry.file_type()?;
2138 if file_type.is_dir() {
2139 if should_skip_dir(&path) {
2140 skipped_any = true;
2141 continue;
2142 }
2143 stack.push(path);
2144 } else if file_type.is_file() {
2145 if should_skip_input(&path) {
2146 skipped_any = true;
2147 continue;
2148 }
2149 files.push(path);
2150 }
2151 }
2152 }
2153 Ok((files, skipped_any))
2154}
2155
/// Result of ingesting a single input: the per-file report, whether an
/// embedding was generated for it, and (when parallel segments are enabled)
/// the deferred payload to flush later via `put_parallel_inputs`.
struct IngestOutcome {
    report: PutReport,
    embedded: bool,
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2162
/// Per-file ingestion report, rendered either as human-readable output
/// (`print_report`) or as JSON in the `put` summary.
struct PutReport {
    // Commit sequence number assigned to the frame.
    seq: u64,
    // Frame id; only read by feature-gated passes (e.g. Logic-Mesh), hence the allow.
    #[allow(dead_code)] frame_id: u64,
    uri: String,
    title: Option<String>,
    // Payload size before/after storage-side compression.
    original_bytes: usize,
    stored_bytes: usize,
    compressed: bool,
    // Filesystem origin, when the input was a file (None for stdin).
    source: Option<PathBuf>,
    mime: Option<String>,
    metadata: Option<DocMetadata>,
    extraction: ExtractionSummary,
    #[cfg(feature = "logic_mesh")]
    // Normalized text fed to NER when building the Logic-Mesh.
    search_text: Option<String>,
}
2180
/// Tracks remaining capacity for size-limited memories so `put` can fail
/// fast before writing a payload that cannot fit (see `ensure_capacity`).
struct CapacityGuard {
    #[allow(dead_code)]
    tier: Tier,
    // Configured capacity in bytes; non-zero by construction (`from_stats`).
    capacity: u64,
    // Memory size in bytes as of the last commit.
    current_size: u64,
}
2187
2188impl CapacityGuard {
2189 const MIN_OVERHEAD: u64 = 128 * 1024;
2190 const RELATIVE_OVERHEAD: f64 = 0.05;
2191
2192 fn from_stats(stats: &Stats) -> Option<Self> {
2193 if stats.capacity_bytes == 0 {
2194 return None;
2195 }
2196 Some(Self {
2197 tier: stats.tier,
2198 capacity: stats.capacity_bytes,
2199 current_size: stats.size_bytes,
2200 })
2201 }
2202
2203 fn ensure_capacity(&self, additional: u64) -> Result<()> {
2204 let projected = self
2205 .current_size
2206 .saturating_add(additional)
2207 .saturating_add(Self::estimate_overhead(additional));
2208 if projected > self.capacity {
2209 return Err(map_put_error(
2210 MemvidError::CapacityExceeded {
2211 current: self.current_size,
2212 limit: self.capacity,
2213 required: projected,
2214 },
2215 Some(self.capacity),
2216 ));
2217 }
2218 Ok(())
2219 }
2220
2221 fn update_after_commit(&mut self, stats: &Stats) {
2222 self.current_size = stats.size_bytes;
2223 }
2224
2225 fn capacity_hint(&self) -> Option<u64> {
2226 Some(self.capacity)
2227 }
2228
2229 fn check_and_warn_capacity(&self) {
2231 if self.capacity == 0 {
2232 return;
2233 }
2234
2235 let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
2236
2237 if usage_pct >= 90.0 && usage_pct < 91.0 {
2239 eprintln!(
2240 "⚠️ CRITICAL: 90% capacity used ({} / {})",
2241 format_bytes(self.current_size),
2242 format_bytes(self.capacity)
2243 );
2244 eprintln!(" Actions:");
2245 eprintln!(" 1. Recreate with larger --size");
2246 eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
2247 eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
2248 } else if usage_pct >= 75.0 && usage_pct < 76.0 {
2249 eprintln!(
2250 "⚠️ WARNING: 75% capacity used ({} / {})",
2251 format_bytes(self.current_size),
2252 format_bytes(self.capacity)
2253 );
2254 eprintln!(" Consider planning for capacity expansion soon");
2255 } else if usage_pct >= 50.0 && usage_pct < 51.0 {
2256 eprintln!(
2257 "ℹ️ INFO: 50% capacity used ({} / {})",
2258 format_bytes(self.current_size),
2259 format_bytes(self.capacity)
2260 );
2261 }
2262 }
2263
2264 fn estimate_overhead(additional: u64) -> u64 {
2265 let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
2266 fractional.max(Self::MIN_OVERHEAD)
2267 }
2268}
2269
/// Outcome of `analyze_file`: detected MIME type, extracted document
/// metadata, normalized search text (when any was produced), and a summary
/// of how the extraction went.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    pub mime: String,
    pub metadata: Option<DocMetadata>,
    pub search_text: Option<String>,
    pub extraction: ExtractionSummary,
}
2277
/// Knobs for audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    // NOTE(review): presumably forces analysis even when it would otherwise
    // be skipped — confirm against the analyzer that consumes this.
    force: bool,
    // Segment length in seconds; clamped to >= 1 by `normalised_segment_secs`
    // (default 30).
    segment_secs: u32,
    // Whether to produce a transcript (see `AudioAnalysis::transcript`).
    transcribe: bool,
}
2284
2285impl AudioAnalyzeOptions {
2286 const DEFAULT_SEGMENT_SECS: u32 = 30;
2287
2288 fn normalised_segment_secs(self) -> u32 {
2289 self.segment_secs.max(1)
2290 }
2291}
2292
2293impl Default for AudioAnalyzeOptions {
2294 fn default() -> Self {
2295 Self {
2296 force: false,
2297 segment_secs: Self::DEFAULT_SEGMENT_SECS,
2298 transcribe: false,
2299 }
2300 }
2301}
2302
/// Results of analyzing an audio payload.
struct AudioAnalysis {
    metadata: DocAudioMetadata,
    // Short human-readable description, when one could be derived.
    caption: Option<String>,
    // Terms contributed to the frame's search text.
    search_terms: Vec<String>,
    // Full transcript, when transcription ran and succeeded.
    transcript: Option<String>,
}
2309
/// Results of analyzing an image payload.
struct ImageAnalysis {
    // Pixel dimensions.
    width: u32,
    height: u32,
    // Dominant colors as strings — presumably hex codes; confirm against the
    // palette extraction that fills this.
    palette: Vec<String>,
    caption: Option<String>,
    // EXIF metadata, when the image carried any.
    exif: Option<DocExifMetadata>,
}
2317
/// How text extraction went for one input: which reader ran, its final
/// status, accumulated warnings, and timing/page diagnostics when the
/// reader reported them.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    // Name of the reader that handled the payload (e.g. "bytes"), if any ran.
    reader: Option<String>,
    status: ExtractionStatus,
    warnings: Vec<String>,
    duration_ms: Option<u64>,
    pages_processed: Option<u32>,
}
2326
2327impl ExtractionSummary {
2328 fn record_warning<S: Into<String>>(&mut self, warning: S) {
2329 self.warnings.push(warning.into());
2330 }
2331}
2332
/// Terminal state of a text-extraction attempt; `label` yields the
/// machine-readable form used in reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    // No extraction was attempted for this input.
    Skipped,
    // Extraction succeeded with the primary reader.
    Ok,
    // Extraction succeeded, but only via a fallback path.
    FallbackUsed,
    // Extraction ran but produced no searchable content.
    Empty,
    // Extraction was attempted and failed outright.
    Failed,
}
2341
2342impl ExtractionStatus {
2343 fn label(self) -> &'static str {
2344 match self {
2345 Self::Skipped => "skipped",
2346 Self::Ok => "ok",
2347 Self::FallbackUsed => "fallback_used",
2348 Self::Empty => "empty",
2349 Self::Failed => "failed",
2350 }
2351 }
2352}
2353
2354fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2355 let filename = source_path
2356 .and_then(|path| path.file_name())
2357 .and_then(|name| name.to_str())
2358 .map(|value| value.to_string());
2359 MediaManifest {
2360 kind: "video".to_string(),
2361 mime: mime.to_string(),
2362 bytes,
2363 filename,
2364 duration_ms: None,
2365 width: None,
2366 height: None,
2367 codec: None,
2368 }
2369}
2370
2371pub fn analyze_file(
2372 source_path: Option<&Path>,
2373 payload: &[u8],
2374 payload_utf8: Option<&str>,
2375 inferred_title: Option<&str>,
2376 audio_opts: AudioAnalyzeOptions,
2377 force_video: bool,
2378) -> FileAnalysis {
2379 let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2380 let is_video = force_video || mime.starts_with("video/");
2381 let treat_as_text = if is_video { false } else { treat_as_text_base };
2382
2383 let mut metadata = DocMetadata::default();
2384 metadata.mime = Some(mime.clone());
2385 metadata.bytes = Some(payload.len() as u64);
2386 metadata.hash = Some(hash(payload).to_hex().to_string());
2387
2388 let mut search_text = None;
2389
2390 let mut extraction = ExtractionSummary {
2391 reader: None,
2392 status: ExtractionStatus::Skipped,
2393 warnings: Vec::new(),
2394 duration_ms: None,
2395 pages_processed: None,
2396 };
2397
2398 let reader_registry = default_reader_registry();
2399 let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2400 if slice.is_empty() {
2401 None
2402 } else {
2403 Some(slice)
2404 }
2405 });
2406
2407 let apply_extracted_text = |text: &str,
2408 reader_label: &str,
2409 status_if_applied: ExtractionStatus,
2410 extraction: &mut ExtractionSummary,
2411 metadata: &mut DocMetadata,
2412 search_text: &mut Option<String>|
2413 -> bool {
2414 if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
2415 let mut value = normalized.text;
2416 if normalized.truncated {
2417 value.push('…');
2418 }
2419 if metadata.caption.is_none() {
2420 if let Some(caption) = caption_from_text(&value) {
2421 metadata.caption = Some(caption);
2422 }
2423 }
2424 *search_text = Some(value);
2425 extraction.reader = Some(reader_label.to_string());
2426 extraction.status = status_if_applied;
2427 true
2428 } else {
2429 false
2430 }
2431 };
2432
2433 if is_video {
2434 metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
2435 }
2436
2437 if treat_as_text {
2438 if let Some(text) = payload_utf8 {
2439 if !apply_extracted_text(
2440 text,
2441 "bytes",
2442 ExtractionStatus::Ok,
2443 &mut extraction,
2444 &mut metadata,
2445 &mut search_text,
2446 ) {
2447 extraction.status = ExtractionStatus::Empty;
2448 extraction.reader = Some("bytes".to_string());
2449 let msg = "text payload contained no searchable content after normalization";
2450 extraction.record_warning(msg);
2451 warn!("{msg}");
2452 }
2453 } else {
2454 extraction.reader = Some("bytes".to_string());
2455 extraction.status = ExtractionStatus::Failed;
2456 let msg = "payload reported as text but was not valid UTF-8";
2457 extraction.record_warning(msg);
2458 warn!("{msg}");
2459 }
2460 } else if mime == "application/pdf"
2461 || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
2462 || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
2463 || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
2464 || mime == "application/vnd.ms-excel"
2465 || mime == "application/msword"
2466 {
2467 let mut applied = false;
2469
2470 let format_hint = infer_document_format(Some(mime.as_str()), magic);
2471 let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
2472 .with_uri(source_path.and_then(|p| p.to_str()))
2473 .with_magic(magic);
2474
2475 if let Some(reader) = reader_registry.find_reader(&hint) {
2476 match reader.extract(payload, &hint) {
2477 Ok(output) => {
2478 extraction.reader = Some(output.reader_name.clone());
2479 if let Some(ms) = output.diagnostics.duration_ms {
2480 extraction.duration_ms = Some(ms);
2481 }
2482 if let Some(pages) = output.diagnostics.pages_processed {
2483 extraction.pages_processed = Some(pages);
2484 }
2485 if let Some(mime_type) = output.document.mime_type.clone() {
2486 metadata.mime = Some(mime_type);
2487 }
2488
2489 for warning in output.diagnostics.warnings.iter() {
2490 extraction.record_warning(warning);
2491 warn!("{warning}");
2492 }
2493
2494 let status = if output.diagnostics.fallback {
2495 ExtractionStatus::FallbackUsed
2496 } else {
2497 ExtractionStatus::Ok
2498 };
2499
2500 if let Some(doc_text) = output.document.text.as_ref() {
2501 applied = apply_extracted_text(
2502 doc_text,
2503 output.reader_name.as_str(),
2504 status,
2505 &mut extraction,
2506 &mut metadata,
2507 &mut search_text,
2508 );
2509 if !applied {
2510 extraction.reader = Some(output.reader_name);
2511 extraction.status = ExtractionStatus::Empty;
2512 let msg = "primary reader returned no usable text";
2513 extraction.record_warning(msg);
2514 warn!("{msg}");
2515 }
2516 } else {
2517 extraction.reader = Some(output.reader_name);
2518 extraction.status = ExtractionStatus::Empty;
2519 let msg = "primary reader returned empty text";
2520 extraction.record_warning(msg);
2521 warn!("{msg}");
2522 }
2523 }
2524 Err(err) => {
2525 let name = reader.name();
2526 extraction.reader = Some(name.to_string());
2527 extraction.status = ExtractionStatus::Failed;
2528 let msg = format!("primary reader {name} failed: {err}");
2529 extraction.record_warning(&msg);
2530 warn!("{msg}");
2531 }
2532 }
2533 } else {
2534 extraction.record_warning("no registered reader matched this PDF payload");
2535 warn!("no registered reader matched this PDF payload");
2536 }
2537
2538 if !applied {
2539 match extract_pdf_text(payload) {
2540 Ok(pdf_text) => {
2541 if !apply_extracted_text(
2542 &pdf_text,
2543 "pdf_extract",
2544 ExtractionStatus::FallbackUsed,
2545 &mut extraction,
2546 &mut metadata,
2547 &mut search_text,
2548 ) {
2549 extraction.reader = Some("pdf_extract".to_string());
2550 extraction.status = ExtractionStatus::Empty;
2551 let msg = "PDF text extraction yielded no searchable content";
2552 extraction.record_warning(msg);
2553 warn!("{msg}");
2554 }
2555 }
2556 Err(err) => {
2557 extraction.reader = Some("pdf_extract".to_string());
2558 extraction.status = ExtractionStatus::Failed;
2559 let msg = format!("failed to extract PDF text via fallback: {err}");
2560 extraction.record_warning(&msg);
2561 warn!("{msg}");
2562 }
2563 }
2564 }
2565 }
2566
2567 if !is_video && mime.starts_with("image/") {
2568 if let Some(image_meta) = analyze_image(payload) {
2569 let ImageAnalysis {
2570 width,
2571 height,
2572 palette,
2573 caption,
2574 exif,
2575 } = image_meta;
2576 metadata.width = Some(width);
2577 metadata.height = Some(height);
2578 if !palette.is_empty() {
2579 metadata.colors = Some(palette);
2580 }
2581 if metadata.caption.is_none() {
2582 metadata.caption = caption;
2583 }
2584 if let Some(exif) = exif {
2585 metadata.exif = Some(exif);
2586 }
2587 }
2588 }
2589
2590 if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
2591 if let Some(AudioAnalysis {
2592 metadata: audio_metadata,
2593 caption: audio_caption,
2594 mut search_terms,
2595 transcript,
2596 }) = analyze_audio(payload, source_path, &mime, audio_opts)
2597 {
2598 if metadata.audio.is_none()
2599 || metadata
2600 .audio
2601 .as_ref()
2602 .map_or(true, DocAudioMetadata::is_empty)
2603 {
2604 metadata.audio = Some(audio_metadata);
2605 }
2606 if metadata.caption.is_none() {
2607 metadata.caption = audio_caption;
2608 }
2609
2610 if let Some(ref text) = transcript {
2612 extraction.reader = Some("whisper".to_string());
2613 extraction.status = ExtractionStatus::Ok;
2614 search_text = Some(text.clone());
2615 } else if !search_terms.is_empty() {
2616 match &mut search_text {
2617 Some(existing) => {
2618 for term in search_terms.drain(..) {
2619 if !existing.contains(&term) {
2620 if !existing.ends_with(' ') {
2621 existing.push(' ');
2622 }
2623 existing.push_str(&term);
2624 }
2625 }
2626 }
2627 None => {
2628 search_text = Some(search_terms.join(" "));
2629 }
2630 }
2631 }
2632 }
2633 }
2634
2635 if metadata.caption.is_none() {
2636 if let Some(title) = inferred_title {
2637 let trimmed = title.trim();
2638 if !trimmed.is_empty() {
2639 metadata.caption = Some(truncate_to_boundary(trimmed, 240));
2640 }
2641 }
2642 }
2643
2644 FileAnalysis {
2645 mime,
2646 metadata: if metadata.is_empty() {
2647 None
2648 } else {
2649 Some(metadata)
2650 },
2651 search_text,
2652 extraction,
2653 }
2654}
2655
2656fn default_reader_registry() -> &'static ReaderRegistry {
2657 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2658 REGISTRY.get_or_init(ReaderRegistry::default)
2659}
2660
2661fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2662 if detect_pdf_magic(magic) {
2663 return Some(DocumentFormat::Pdf);
2664 }
2665
2666 let mime = mime?.trim().to_ascii_lowercase();
2667 match mime.as_str() {
2668 "application/pdf" => Some(DocumentFormat::Pdf),
2669 "text/plain" => Some(DocumentFormat::PlainText),
2670 "text/markdown" => Some(DocumentFormat::Markdown),
2671 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2672 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2673 Some(DocumentFormat::Docx)
2674 }
2675 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2676 Some(DocumentFormat::Pptx)
2677 }
2678 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2679 Some(DocumentFormat::Xlsx)
2680 }
2681 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2682 _ => None,
2683 }
2684}
2685
/// Reports whether a magic-byte prefix looks like a PDF header (`%PDF`),
/// tolerating an optional UTF-8 BOM and leading ASCII whitespace.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic.filter(|b| !b.is_empty()) else {
        return false;
    };
    // Strip an optional UTF-8 BOM, then any ASCII whitespace padding.
    let bytes = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let start = bytes
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(bytes.len());
    bytes[start..].starts_with(b"%PDF")
}
2703
2704fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2705 match pdf_extract::extract_text_from_mem(payload) {
2706 Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
2707 Ok(fallback) => Ok(fallback),
2708 Err(err) => {
2709 warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
2710 Ok(text)
2711 }
2712 },
2713 Err(err) => {
2714 warn!("pdf_extract failed: {err}");
2715 match fallback_pdftotext(payload) {
2716 Ok(fallback) => Ok(fallback),
2717 Err(fallback_err) => {
2718 warn!("pdftotext fallback failed: {fallback_err}");
2719 Err(err)
2720 }
2721 }
2722 }
2723 other => other,
2724 }
2725}
2726
2727fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2728 use std::io::Write;
2729 use std::process::{Command, Stdio};
2730
2731 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2732
2733 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2734 temp_pdf
2735 .write_all(payload)
2736 .context("failed to write pdf payload to temp file")?;
2737 temp_pdf.flush().context("failed to flush temp pdf file")?;
2738
2739 let child = Command::new(pdftotext)
2740 .arg("-layout")
2741 .arg(temp_pdf.path())
2742 .arg("-")
2743 .stdout(Stdio::piped())
2744 .spawn()
2745 .context("failed to spawn pdftotext")?;
2746
2747 let output = child
2748 .wait_with_output()
2749 .context("failed to read pdftotext output")?;
2750
2751 if !output.status.success() {
2752 return Err(anyhow!("pdftotext exited with status {}", output.status));
2753 }
2754
2755 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2756 Ok(text)
2757}
2758
2759fn detect_mime(
2760 source_path: Option<&Path>,
2761 payload: &[u8],
2762 payload_utf8: Option<&str>,
2763) -> (String, bool) {
2764 if let Some(kind) = infer::get(payload) {
2765 let mime = kind.mime_type().to_string();
2766 let treat_as_text = mime.starts_with("text/")
2767 || matches!(
2768 mime.as_str(),
2769 "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2770 );
2771 return (mime, treat_as_text);
2772 }
2773
2774 let magic = magic_from_u8(payload);
2775 if !magic.is_empty() && magic != "application/octet-stream" {
2776 let treat_as_text = magic.starts_with("text/")
2777 || matches!(
2778 magic,
2779 "application/json"
2780 | "application/xml"
2781 | "application/javascript"
2782 | "image/svg+xml"
2783 | "text/plain"
2784 );
2785 return (magic.to_string(), treat_as_text);
2786 }
2787
2788 if let Some(path) = source_path {
2789 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2790 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2791 return (mime.to_string(), treat_as_text);
2792 }
2793 }
2794 }
2795
2796 if payload_utf8.is_some() {
2797 return ("text/plain".to_string(), true);
2798 }
2799
2800 ("application/octet-stream".to_string(), false)
2801}
2802
/// Maps a file extension (case-insensitive) to a `(mime, treat_as_text)`
/// pair, or `None` when the extension is not recognised.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let normalized = ext.to_ascii_lowercase();
    let (mime, is_text) = match normalized.as_str() {
        // Source code and plain-text configuration formats.
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => ("text/plain", true),
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        // Raster image formats are never treated as text.
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some((mime, is_text))
}
2830
2831fn caption_from_text(text: &str) -> Option<String> {
2832 for line in text.lines() {
2833 let trimmed = line.trim();
2834 if !trimmed.is_empty() {
2835 return Some(truncate_to_boundary(trimmed, 240));
2836 }
2837 }
2838 None
2839}
2840
/// Truncates `text` to at most `max_len` bytes, backing up to the nearest
/// UTF-8 char boundary, trimming trailing whitespace, and appending `…`.
///
/// Strings already within the limit are returned unchanged; when the only
/// valid boundary is byte 0, the result is the empty string.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // 0 is always a char boundary, so this search cannot fail outright.
    let boundary = (0..=max_len).rev().find(|&i| text.is_char_boundary(i));
    match boundary {
        None | Some(0) => String::new(),
        Some(end) => {
            let mut shortened = text[..end].trim_end().to_string();
            shortened.push('…');
            shortened
        }
    }
}
2856
2857fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2858 let reader = ImageReader::new(Cursor::new(payload))
2859 .with_guessed_format()
2860 .map_err(|err| warn!("failed to guess image format: {err}"))
2861 .ok()?;
2862 let image = reader
2863 .decode()
2864 .map_err(|err| warn!("failed to decode image: {err}"))
2865 .ok()?;
2866 let width = image.width();
2867 let height = image.height();
2868 let rgb = image.to_rgb8();
2869 let palette = match get_palette(
2870 rgb.as_raw(),
2871 ColorFormat::Rgb,
2872 COLOR_PALETTE_SIZE,
2873 COLOR_PALETTE_QUALITY,
2874 ) {
2875 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2876 Err(err) => {
2877 warn!("failed to compute colour palette: {err}");
2878 Vec::new()
2879 }
2880 };
2881
2882 let (exif, exif_caption) = extract_exif_metadata(payload);
2883
2884 Some(ImageAnalysis {
2885 width,
2886 height,
2887 palette,
2888 caption: exif_caption,
2889 exif,
2890 })
2891}
2892
2893fn color_to_hex(color: Color) -> String {
2894 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
2895}
2896
/// Probes and fully decodes an audio payload with Symphonia to measure its
/// duration, then enriches the result with container tags (via lofty),
/// fixed-length segment markers, and an optional Whisper transcript.
///
/// Returns `None` when the stream cannot be probed, has no default track, or
/// no decoder can be created for its codec. Decode errors mid-stream are
/// logged and tolerated; they only shorten the measured duration.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // Symphonia needs an owned media source, so the payload is copied once.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Hint the container format from the file extension and the MIME subtype.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode the whole stream, counting frames so duration can be computed
    // from frames / sample_rate. IoError is treated as end-of-stream.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                // A corrupt packet is skipped rather than aborting analysis.
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Only count packets belonging to the default track.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Average bitrate estimated from total payload size over measured duration.
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    // Container tags drive the caption and extra search terms.
    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Slice the timeline into fixed-length labelled segments; the final
    // segment is clamped to the measured duration.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble the metadata record, setting only fields with real values.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    // Transcription is opt-in and requires a source path (see transcribe_audio).
    let transcript = if options.transcribe {
        transcribe_audio(source_path)
    } else {
        None
    };

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
        transcript,
    })
}
3079
/// Runs Whisper speech-to-text over the audio file at `source_path`.
///
/// Returns the transcript text, or `None` (with a warning logged) when no
/// source path is available, the transcriber fails to initialise, or the
/// transcription itself fails.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let path = source_path?;

    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| tracing::warn!(error = %e, "Failed to initialize Whisper transcriber"))
        .ok()?;

    let result = transcriber
        .transcribe_file(path)
        .map_err(|e| tracing::warn!(error = %e, "Failed to transcribe audio"))
        .ok()?;

    tracing::info!(
        duration = result.duration_secs,
        text_len = result.text.len(),
        "Audio transcription complete"
    );
    Some(result.text)
}
3113
/// Fallback stub compiled when the `whisper` feature is disabled: logs a hint
/// about rebuilding with the feature and performs no transcription.
#[cfg(not(feature = "whisper"))]
fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
    None
}
3119
3120fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3121 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3122
3123 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3124 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3125 out.entry("title".into())
3126 .or_insert_with(|| value.to_string());
3127 out.entry("tracktitle".into())
3128 .or_insert_with(|| value.to_string());
3129 }
3130 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3131 out.entry("artist".into())
3132 .or_insert_with(|| value.to_string());
3133 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3134 out.entry("artist".into())
3135 .or_insert_with(|| value.to_string());
3136 }
3137 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3138 out.entry("album".into())
3139 .or_insert_with(|| value.to_string());
3140 }
3141 if let Some(value) = tag.get_string(&ItemKey::Genre) {
3142 out.entry("genre".into())
3143 .or_insert_with(|| value.to_string());
3144 }
3145 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3146 out.entry("track_number".into())
3147 .or_insert_with(|| value.to_string());
3148 }
3149 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3150 out.entry("disc_number".into())
3151 .or_insert_with(|| value.to_string());
3152 }
3153 }
3154
3155 let mut tags = HashMap::new();
3156 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3157 Ok(probe) => probe,
3158 Err(err) => {
3159 warn!("failed to guess audio tag file type: {err}");
3160 return tags;
3161 }
3162 };
3163
3164 let tagged_file = match probe.read() {
3165 Ok(file) => file,
3166 Err(err) => {
3167 warn!("failed to read audio tags: {err}");
3168 return tags;
3169 }
3170 };
3171
3172 if let Some(primary) = tagged_file.primary_tag() {
3173 collect_tag(primary, &mut tags);
3174 }
3175 for tag in tagged_file.tags() {
3176 collect_tag(tag, &mut tags);
3177 }
3178
3179 tags
3180}
3181
3182fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3183 let mut cursor = Cursor::new(payload);
3184 let exif = match ExifReader::new().read_from_container(&mut cursor) {
3185 Ok(exif) => exif,
3186 Err(err) => {
3187 warn!("failed to read EXIF metadata: {err}");
3188 return (None, None);
3189 }
3190 };
3191
3192 let mut doc = DocExifMetadata::default();
3193 let mut has_data = false;
3194
3195 if let Some(make) = exif_string(&exif, Tag::Make) {
3196 doc.make = Some(make);
3197 has_data = true;
3198 }
3199 if let Some(model) = exif_string(&exif, Tag::Model) {
3200 doc.model = Some(model);
3201 has_data = true;
3202 }
3203 if let Some(lens) =
3204 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3205 {
3206 doc.lens = Some(lens);
3207 has_data = true;
3208 }
3209 if let Some(dt) =
3210 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3211 {
3212 doc.datetime = Some(dt);
3213 has_data = true;
3214 }
3215
3216 if let Some(gps) = extract_gps_metadata(&exif) {
3217 doc.gps = Some(gps);
3218 has_data = true;
3219 }
3220
3221 let caption = exif_string(&exif, Tag::ImageDescription);
3222
3223 let metadata = if has_data { Some(doc) } else { None };
3224 (metadata, caption)
3225}
3226
3227fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3228 exif.fields()
3229 .find(|field| field.tag == tag)
3230 .and_then(|field| field_to_string(field, exif))
3231}
3232
3233fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3234 let value = field.display_value().with_unit(exif).to_string();
3235 let trimmed = value.trim_matches('\0').trim();
3236 if trimmed.is_empty() {
3237 None
3238 } else {
3239 Some(trimmed.to_string())
3240 }
3241}
3242
3243fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3244 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3245 let latitude_ref_field = exif
3246 .fields()
3247 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3248 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3249 let longitude_ref_field = exif
3250 .fields()
3251 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3252
3253 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3254 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3255
3256 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3257 if reference.eq_ignore_ascii_case("S") {
3258 latitude = -latitude;
3259 }
3260 }
3261 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3262 if reference.eq_ignore_ascii_case("W") {
3263 longitude = -longitude;
3264 }
3265 }
3266
3267 Some(DocGpsMetadata {
3268 latitude,
3269 longitude,
3270 })
3271}
3272
3273fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3274 match value {
3275 ExifValue::Rational(values) if !values.is_empty() => {
3276 let deg = rational_to_f64_u(values.get(0)?)?;
3277 let min = values
3278 .get(1)
3279 .and_then(|v| rational_to_f64_u(v))
3280 .unwrap_or(0.0);
3281 let sec = values
3282 .get(2)
3283 .and_then(|v| rational_to_f64_u(v))
3284 .unwrap_or(0.0);
3285 Some(deg + (min / 60.0) + (sec / 3600.0))
3286 }
3287 ExifValue::SRational(values) if !values.is_empty() => {
3288 let deg = rational_to_f64_i(values.get(0)?)?;
3289 let min = values
3290 .get(1)
3291 .and_then(|v| rational_to_f64_i(v))
3292 .unwrap_or(0.0);
3293 let sec = values
3294 .get(2)
3295 .and_then(|v| rational_to_f64_i(v))
3296 .unwrap_or(0.0);
3297 Some(deg + (min / 60.0) + (sec / 3600.0))
3298 }
3299 _ => None,
3300 }
3301}
3302
3303fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3304 if value.denom == 0 {
3305 None
3306 } else {
3307 Some(value.num as f64 / value.denom as f64)
3308 }
3309}
3310
3311fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3312 if value.denom == 0 {
3313 None
3314 } else {
3315 Some(value.num as f64 / value.denom as f64)
3316 }
3317}
3318
3319fn value_to_ascii(value: &ExifValue) -> Option<String> {
3320 if let ExifValue::Ascii(values) = value {
3321 values
3322 .first()
3323 .and_then(|bytes| std::str::from_utf8(bytes).ok())
3324 .map(|s| s.trim_matches('\0').trim().to_string())
3325 .filter(|s| !s.is_empty())
3326 } else {
3327 None
3328 }
3329}
3330
3331fn ingest_payload(
3332 mem: &mut Memvid,
3333 args: &PutArgs,
3334 payload: Vec<u8>,
3335 source_path: Option<&Path>,
3336 capacity_guard: Option<&mut CapacityGuard>,
3337 enable_embedding: bool,
3338 runtime: Option<&EmbeddingRuntime>,
3339 parallel_mode: bool,
3340) -> Result<IngestOutcome> {
3341 let original_bytes = payload.len();
3342 let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3343
3344 let payload_text = std::str::from_utf8(&payload).ok();
3345 let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3346
3347 let audio_opts = AudioAnalyzeOptions {
3348 force: args.audio || args.transcribe,
3349 segment_secs: args
3350 .audio_segment_seconds
3351 .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3352 transcribe: args.transcribe,
3353 };
3354
3355 let mut analysis = analyze_file(
3356 source_path,
3357 &payload,
3358 payload_text,
3359 inferred_title.as_deref(),
3360 audio_opts,
3361 args.video,
3362 );
3363
3364 if args.video && !analysis.mime.starts_with("video/") {
3365 anyhow::bail!(
3366 "--video requires a video input; detected MIME type {}",
3367 analysis.mime
3368 );
3369 }
3370
3371 let mut search_text = analysis.search_text.clone();
3372 if let Some(ref mut text) = search_text {
3373 if text.len() > MAX_SEARCH_TEXT_LEN {
3374 let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3375 *text = truncated;
3376 }
3377 }
3378 analysis.search_text = search_text.clone();
3379
3380 let uri = if let Some(ref explicit) = args.uri {
3381 derive_uri(Some(explicit), None)
3382 } else if args.video {
3383 derive_video_uri(&payload, source_path, &analysis.mime)
3384 } else {
3385 derive_uri(None, source_path)
3386 };
3387
3388 let mut options_builder = PutOptions::builder()
3389 .enable_embedding(enable_embedding)
3390 .auto_tag(!args.no_auto_tag)
3391 .extract_dates(!args.no_extract_dates);
3392 if let Some(ts) = args.timestamp {
3393 options_builder = options_builder.timestamp(ts);
3394 }
3395 if let Some(track) = &args.track {
3396 options_builder = options_builder.track(track.clone());
3397 }
3398 if args.video {
3399 options_builder = options_builder.kind("video");
3400 options_builder = options_builder.tag("kind", "video");
3401 options_builder = options_builder.push_tag("video");
3402 } else if let Some(kind) = &args.kind {
3403 options_builder = options_builder.kind(kind.clone());
3404 }
3405 options_builder = options_builder.uri(uri.clone());
3406 if let Some(ref title) = inferred_title {
3407 options_builder = options_builder.title(title.clone());
3408 }
3409 for (key, value) in &args.tags {
3410 options_builder = options_builder.tag(key.clone(), value.clone());
3411 if !key.is_empty() {
3412 options_builder = options_builder.push_tag(key.clone());
3413 }
3414 if !value.is_empty() && value != key {
3415 options_builder = options_builder.push_tag(value.clone());
3416 }
3417 }
3418 for label in &args.labels {
3419 options_builder = options_builder.label(label.clone());
3420 }
3421 if let Some(metadata) = analysis.metadata.clone() {
3422 if !metadata.is_empty() {
3423 options_builder = options_builder.metadata(metadata);
3424 }
3425 }
3426 for (idx, entry) in args.metadata.iter().enumerate() {
3427 match serde_json::from_str::<DocMetadata>(entry) {
3428 Ok(meta) => {
3429 options_builder = options_builder.metadata(meta);
3430 }
3431 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3432 Ok(value) => {
3433 options_builder =
3434 options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3435 }
3436 Err(err) => {
3437 warn!("failed to parse --metadata JSON: {err}");
3438 }
3439 },
3440 }
3441 }
3442 if let Some(text) = search_text.clone() {
3443 if !text.trim().is_empty() {
3444 options_builder = options_builder.search_text(text);
3445 }
3446 }
3447 let mut options = options_builder.build();
3448
3449 let existing_frame = if args.update_existing || !args.allow_duplicate {
3450 match mem.frame_by_uri(&uri) {
3451 Ok(frame) => Some(frame),
3452 Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3453 Err(err) => return Err(err.into()),
3454 }
3455 } else {
3456 None
3457 };
3458
3459 let frame_id = if let Some(ref existing) = existing_frame {
3462 existing.id
3463 } else {
3464 mem.stats()?.frame_count
3465 };
3466
3467 let mut embedded = false;
3468 let allow_embedding = enable_embedding && !args.video;
3469 if enable_embedding && !allow_embedding && args.video {
3470 warn!("semantic embeddings are not generated for video payloads");
3471 }
3472 let seq = if let Some(existing) = existing_frame {
3473 if args.update_existing {
3474 let payload_for_update = payload.clone();
3475 if allow_embedding {
3476 if let Some(path) = args.embedding_vec.as_deref() {
3477 let embedding = crate::utils::read_embedding(path)?;
3478 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3479 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3480 let expected = choice.dimensions();
3481 if expected != 0 && embedding.len() != expected {
3482 anyhow::bail!(
3483 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3484 embedding.len(),
3485 choice.name(),
3486 expected
3487 );
3488 }
3489 apply_embedding_identity_metadata_from_choice(
3490 &mut options,
3491 choice,
3492 embedding.len(),
3493 Some(model_str),
3494 );
3495 } else {
3496 options.extra_metadata.insert(
3497 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3498 embedding.len().to_string(),
3499 );
3500 }
3501 embedded = true;
3502 mem.update_frame(
3503 existing.id,
3504 Some(payload_for_update),
3505 options.clone(),
3506 Some(embedding),
3507 )
3508 .map_err(|err| {
3509 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3510 })?
3511 } else {
3512 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3513 let embed_text = search_text
3514 .clone()
3515 .or_else(|| payload_text.map(|text| text.to_string()))
3516 .unwrap_or_default();
3517 if embed_text.trim().is_empty() {
3518 warn!("no textual content available; embedding skipped");
3519 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3520 .map_err(|err| {
3521 map_put_error(
3522 err,
3523 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3524 )
3525 })?
3526 } else {
3527 let truncated_text = truncate_for_embedding(&embed_text);
3529 if truncated_text.len() < embed_text.len() {
3530 info!(
3531 "Truncated text from {} to {} chars for embedding",
3532 embed_text.len(),
3533 truncated_text.len()
3534 );
3535 }
3536 let embedding = runtime.embed_passage(&truncated_text)?;
3537 embedded = true;
3538 apply_embedding_identity_metadata(&mut options, runtime);
3539 mem.update_frame(
3540 existing.id,
3541 Some(payload_for_update),
3542 options.clone(),
3543 Some(embedding),
3544 )
3545 .map_err(|err| {
3546 map_put_error(
3547 err,
3548 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3549 )
3550 })?
3551 }
3552 }
3553 } else {
3554 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3555 .map_err(|err| {
3556 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3557 })?
3558 }
3559 } else {
3560 return Err(DuplicateUriError::new(uri.as_str()).into());
3561 }
3562 } else {
3563 if let Some(guard) = capacity_guard.as_ref() {
3564 guard.ensure_capacity(stored_bytes as u64)?;
3565 }
3566 if parallel_mode {
3567 #[cfg(feature = "parallel_segments")]
3568 {
3569 let mut parent_embedding = None;
3570 let mut chunk_embeddings_vec = None;
3571 if allow_embedding {
3572 if let Some(path) = args.embedding_vec.as_deref() {
3573 let embedding = crate::utils::read_embedding(path)?;
3574 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3575 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3576 let expected = choice.dimensions();
3577 if expected != 0 && embedding.len() != expected {
3578 anyhow::bail!(
3579 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3580 embedding.len(),
3581 choice.name(),
3582 expected
3583 );
3584 }
3585 apply_embedding_identity_metadata_from_choice(
3586 &mut options,
3587 choice,
3588 embedding.len(),
3589 Some(model_str),
3590 );
3591 } else {
3592 options.extra_metadata.insert(
3593 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3594 embedding.len().to_string(),
3595 );
3596 }
3597 parent_embedding = Some(embedding);
3598 embedded = true;
3599 } else {
3600 let runtime =
3601 runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3602 let embed_text = search_text
3603 .clone()
3604 .or_else(|| payload_text.map(|text| text.to_string()))
3605 .unwrap_or_default();
3606 if embed_text.trim().is_empty() {
3607 warn!("no textual content available; embedding skipped");
3608 } else {
3609 info!(
3611 "parallel mode: checking for chunks on {} bytes payload",
3612 payload.len()
3613 );
3614 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3615 info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3617
3618 #[cfg(feature = "temporal_enrich")]
3620 let enriched_chunks = if args.temporal_enrich {
3621 info!(
3622 "Temporal enrichment enabled, processing {} chunks",
3623 chunk_texts.len()
3624 );
3625 let today = chrono::Local::now().date_naive();
3627 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3628 let enriched: Vec<String> =
3630 results.iter().map(|(text, _)| text.clone()).collect();
3631 let resolved_count = results
3632 .iter()
3633 .filter(|(_, e)| {
3634 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3635 })
3636 .count();
3637 info!(
3638 "Temporal enrichment: resolved {} chunks with temporal context",
3639 resolved_count
3640 );
3641 enriched
3642 } else {
3643 chunk_texts.clone()
3644 };
3645 #[cfg(not(feature = "temporal_enrich"))]
3646 let enriched_chunks = {
3647 if args.temporal_enrich {
3648 warn!(
3649 "Temporal enrichment requested but feature not compiled in"
3650 );
3651 }
3652 chunk_texts.clone()
3653 };
3654
3655 let embed_chunks = if args.contextual {
3657 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3658 let engine = if args.contextual_model.as_deref() == Some("local") {
3659 #[cfg(feature = "llama-cpp")]
3660 {
3661 let model_path = get_local_contextual_model_path()?;
3662 ContextualEngine::local(model_path)
3663 }
3664 #[cfg(not(feature = "llama-cpp"))]
3665 {
3666 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3667 }
3668 } else if let Some(model) = &args.contextual_model {
3669 ContextualEngine::openai_with_model(model)?
3670 } else {
3671 ContextualEngine::openai()?
3672 };
3673
3674 match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3675 {
3676 Ok(contexts) => {
3677 info!("Generated {} contextual prefixes", contexts.len());
3678 apply_contextual_prefixes(
3679 &embed_text,
3680 &enriched_chunks,
3681 &contexts,
3682 )
3683 }
3684 Err(e) => {
3685 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3686 enriched_chunks.clone()
3687 }
3688 }
3689 } else {
3690 enriched_chunks
3691 };
3692
3693 let truncated_chunks: Vec<String> = embed_chunks
3694 .iter()
3695 .map(|chunk_text| {
3696 let truncated_chunk = truncate_for_embedding(chunk_text);
3698 if truncated_chunk.len() < chunk_text.len() {
3699 info!(
3700 "parallel mode: Truncated chunk from {} to {} chars for embedding",
3701 chunk_text.len(),
3702 truncated_chunk.len()
3703 );
3704 }
3705 truncated_chunk
3706 })
3707 .collect();
3708 let truncated_refs: Vec<&str> =
3709 truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3710 let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3711 let num_chunks = chunk_embeddings.len();
3712 chunk_embeddings_vec = Some(chunk_embeddings);
3713 let parent_text = truncate_for_embedding(&embed_text);
3715 if parent_text.len() < embed_text.len() {
3716 info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3717 }
3718 parent_embedding = Some(runtime.embed_passage(&parent_text)?);
3719 embedded = true;
3720 info!(
3721 "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3722 num_chunks
3723 );
3724 } else {
3725 info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3727 let truncated_text = truncate_for_embedding(&embed_text);
3728 if truncated_text.len() < embed_text.len() {
3729 info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3730 }
3731 parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
3732 embedded = true;
3733 }
3734 }
3735 }
3736 }
3737 if embedded {
3738 if let Some(runtime) = runtime {
3739 apply_embedding_identity_metadata(&mut options, runtime);
3740 }
3741 }
3742 let payload_variant = if let Some(path) = source_path {
3743 ParallelPayload::Path(path.to_path_buf())
3744 } else {
3745 ParallelPayload::Bytes(payload)
3746 };
3747 let input = ParallelInput {
3748 payload: payload_variant,
3749 options: options.clone(),
3750 embedding: parent_embedding,
3751 chunk_embeddings: chunk_embeddings_vec,
3752 };
3753 return Ok(IngestOutcome {
3754 report: PutReport {
3755 seq: 0,
3756 frame_id: 0, uri,
3758 title: inferred_title,
3759 original_bytes,
3760 stored_bytes,
3761 compressed,
3762 source: source_path.map(Path::to_path_buf),
3763 mime: Some(analysis.mime),
3764 metadata: analysis.metadata,
3765 extraction: analysis.extraction,
3766 #[cfg(feature = "logic_mesh")]
3767 search_text: search_text.clone(),
3768 },
3769 embedded,
3770 parallel_input: Some(input),
3771 });
3772 }
3773 #[cfg(not(feature = "parallel_segments"))]
3774 {
3775 mem.put_bytes_with_options(&payload, options)
3776 .map_err(|err| {
3777 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3778 })?
3779 }
3780 } else if allow_embedding {
3781 if let Some(path) = args.embedding_vec.as_deref() {
3782 let embedding = crate::utils::read_embedding(path)?;
3783 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3784 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3785 let expected = choice.dimensions();
3786 if expected != 0 && embedding.len() != expected {
3787 anyhow::bail!(
3788 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3789 embedding.len(),
3790 choice.name(),
3791 expected
3792 );
3793 }
3794 apply_embedding_identity_metadata_from_choice(
3795 &mut options,
3796 choice,
3797 embedding.len(),
3798 Some(model_str),
3799 );
3800 } else {
3801 options.extra_metadata.insert(
3802 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3803 embedding.len().to_string(),
3804 );
3805 }
3806 embedded = true;
3807 mem.put_with_embedding_and_options(&payload, embedding, options)
3808 .map_err(|err| {
3809 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3810 })?
3811 } else {
3812 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3813 let embed_text = search_text
3814 .clone()
3815 .or_else(|| payload_text.map(|text| text.to_string()))
3816 .unwrap_or_default();
3817 if embed_text.trim().is_empty() {
3818 warn!("no textual content available; embedding skipped");
3819 mem.put_bytes_with_options(&payload, options)
3820 .map_err(|err| {
3821 map_put_error(
3822 err,
3823 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3824 )
3825 })?
3826 } else {
3827 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3829 info!(
3831 "Document will be chunked into {} chunks, generating embeddings for each",
3832 chunk_texts.len()
3833 );
3834
3835 #[cfg(feature = "temporal_enrich")]
3837 let enriched_chunks = if args.temporal_enrich {
3838 info!(
3839 "Temporal enrichment enabled, processing {} chunks",
3840 chunk_texts.len()
3841 );
3842 let today = chrono::Local::now().date_naive();
3844 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3845 let enriched: Vec<String> =
3847 results.iter().map(|(text, _)| text.clone()).collect();
3848 let resolved_count = results
3849 .iter()
3850 .filter(|(_, e)| {
3851 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3852 })
3853 .count();
3854 info!(
3855 "Temporal enrichment: resolved {} chunks with temporal context",
3856 resolved_count
3857 );
3858 enriched
3859 } else {
3860 chunk_texts.clone()
3861 };
3862 #[cfg(not(feature = "temporal_enrich"))]
3863 let enriched_chunks = {
3864 if args.temporal_enrich {
3865 warn!("Temporal enrichment requested but feature not compiled in");
3866 }
3867 chunk_texts.clone()
3868 };
3869
3870 let embed_chunks = if args.contextual {
3872 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3873 let engine = if args.contextual_model.as_deref() == Some("local") {
3874 #[cfg(feature = "llama-cpp")]
3875 {
3876 let model_path = get_local_contextual_model_path()?;
3877 ContextualEngine::local(model_path)
3878 }
3879 #[cfg(not(feature = "llama-cpp"))]
3880 {
3881 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3882 }
3883 } else if let Some(model) = &args.contextual_model {
3884 ContextualEngine::openai_with_model(model)?
3885 } else {
3886 ContextualEngine::openai()?
3887 };
3888
3889 match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
3890 Ok(contexts) => {
3891 info!("Generated {} contextual prefixes", contexts.len());
3892 apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
3893 }
3894 Err(e) => {
3895 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3896 enriched_chunks.clone()
3897 }
3898 }
3899 } else {
3900 enriched_chunks
3901 };
3902
3903 let truncated_chunks: Vec<String> = embed_chunks
3904 .iter()
3905 .map(|chunk_text| {
3906 let truncated_chunk = truncate_for_embedding(chunk_text);
3908 if truncated_chunk.len() < chunk_text.len() {
3909 info!(
3910 "Truncated chunk from {} to {} chars for embedding",
3911 chunk_text.len(),
3912 truncated_chunk.len()
3913 );
3914 }
3915 truncated_chunk
3916 })
3917 .collect();
3918 let truncated_refs: Vec<&str> =
3919 truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3920 let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3921 let parent_text = truncate_for_embedding(&embed_text);
3923 if parent_text.len() < embed_text.len() {
3924 info!(
3925 "Truncated parent text from {} to {} chars for embedding",
3926 embed_text.len(),
3927 parent_text.len()
3928 );
3929 }
3930 let parent_embedding = runtime.embed_passage(&parent_text)?;
3931 embedded = true;
3932 apply_embedding_identity_metadata(&mut options, runtime);
3933 info!(
3934 "Calling put_with_chunk_embeddings with {} chunk embeddings",
3935 chunk_embeddings.len()
3936 );
3937 mem.put_with_chunk_embeddings(
3938 &payload,
3939 Some(parent_embedding),
3940 chunk_embeddings,
3941 options,
3942 )
3943 .map_err(|err| {
3944 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3945 })?
3946 } else {
3947 info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
3949 let truncated_text = truncate_for_embedding(&embed_text);
3950 if truncated_text.len() < embed_text.len() {
3951 info!(
3952 "Truncated text from {} to {} chars for embedding",
3953 embed_text.len(),
3954 truncated_text.len()
3955 );
3956 }
3957 let embedding = runtime.embed_passage(&truncated_text)?;
3958 embedded = true;
3959 apply_embedding_identity_metadata(&mut options, runtime);
3960 mem.put_with_embedding_and_options(&payload, embedding, options)
3961 .map_err(|err| {
3962 map_put_error(
3963 err,
3964 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3965 )
3966 })?
3967 }
3968 }
3969 }
3970 } else {
3971 mem.put_bytes_with_options(&payload, options)
3972 .map_err(|err| {
3973 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3974 })?
3975 }
3976 };
3977
3978 Ok(IngestOutcome {
3979 report: PutReport {
3980 seq,
3981 frame_id,
3982 uri,
3983 title: inferred_title,
3984 original_bytes,
3985 stored_bytes,
3986 compressed,
3987 source: source_path.map(Path::to_path_buf),
3988 mime: Some(analysis.mime),
3989 metadata: analysis.metadata,
3990 extraction: analysis.extraction,
3991 #[cfg(feature = "logic_mesh")]
3992 search_text,
3993 },
3994 embedded,
3995 #[cfg(feature = "parallel_segments")]
3996 parallel_input: None,
3997 })
3998}
3999
4000fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4001 if std::str::from_utf8(payload).is_ok() {
4002 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4003 Ok((compressed.len(), true))
4004 } else {
4005 Ok((payload.len(), false))
4006 }
4007}
4008
4009fn print_report(report: &PutReport) {
4010 let name = report
4011 .source
4012 .as_ref()
4013 .map(|path| {
4014 let pretty = env::current_dir()
4015 .ok()
4016 .as_ref()
4017 .and_then(|cwd| diff_paths(path, cwd))
4018 .unwrap_or_else(|| path.to_path_buf());
4019 pretty.to_string_lossy().into_owned()
4020 })
4021 .unwrap_or_else(|| "stdin".to_string());
4022
4023 let ratio = if report.original_bytes > 0 {
4024 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4025 } else {
4026 100.0
4027 };
4028
4029 println!("• {name} → {}", report.uri);
4030 println!(" seq: {}", report.seq);
4031 if report.compressed {
4032 println!(
4033 " size: {} B → {} B ({:.1}% of original, compressed)",
4034 report.original_bytes, report.stored_bytes, ratio
4035 );
4036 } else {
4037 println!(" size: {} B (stored as-is)", report.original_bytes);
4038 }
4039 if let Some(mime) = &report.mime {
4040 println!(" mime: {mime}");
4041 }
4042 if let Some(title) = &report.title {
4043 println!(" title: {title}");
4044 }
4045 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4046 println!(
4047 " extraction: status={} reader={}",
4048 report.extraction.status.label(),
4049 extraction_reader
4050 );
4051 if let Some(ms) = report.extraction.duration_ms {
4052 println!(" extraction-duration: {} ms", ms);
4053 }
4054 if let Some(pages) = report.extraction.pages_processed {
4055 println!(" extraction-pages: {}", pages);
4056 }
4057 for warning in &report.extraction.warnings {
4058 println!(" warning: {warning}");
4059 }
4060 if let Some(metadata) = &report.metadata {
4061 if let Some(caption) = &metadata.caption {
4062 println!(" caption: {caption}");
4063 }
4064 if let Some(audio) = metadata.audio.as_ref() {
4065 if audio.duration_secs.is_some()
4066 || audio.sample_rate_hz.is_some()
4067 || audio.channels.is_some()
4068 {
4069 let duration = audio
4070 .duration_secs
4071 .map(|secs| format!("{secs:.1}s"))
4072 .unwrap_or_else(|| "unknown".into());
4073 let rate = audio
4074 .sample_rate_hz
4075 .map(|hz| format!("{} Hz", hz))
4076 .unwrap_or_else(|| "? Hz".into());
4077 let channels = audio
4078 .channels
4079 .map(|ch| ch.to_string())
4080 .unwrap_or_else(|| "?".into());
4081 println!(" audio: duration {duration}, {channels} ch, {rate}");
4082 }
4083 }
4084 }
4085
4086 println!();
4087}
4088
4089fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4090 let source = report
4091 .source
4092 .as_ref()
4093 .map(|path| path.to_string_lossy().into_owned());
4094 let extraction = json!({
4095 "status": report.extraction.status.label(),
4096 "reader": report.extraction.reader,
4097 "duration_ms": report.extraction.duration_ms,
4098 "pages_processed": report.extraction.pages_processed,
4099 "warnings": report.extraction.warnings,
4100 });
4101 json!({
4102 "seq": report.seq,
4103 "uri": report.uri,
4104 "title": report.title,
4105 "original_bytes": report.original_bytes,
4106 "original_bytes_human": format_bytes(report.original_bytes as u64),
4107 "stored_bytes": report.stored_bytes,
4108 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4109 "compressed": report.compressed,
4110 "mime": report.mime,
4111 "source": source,
4112 "metadata": report.metadata,
4113 "extraction": extraction,
4114 })
4115}
4116
4117fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4118 match err {
4119 MemvidError::CapacityExceeded {
4120 current,
4121 limit,
4122 required,
4123 } => anyhow!(CapacityExceededMessage {
4124 current,
4125 limit,
4126 required,
4127 }),
4128 other => other.into(),
4129 }
4130}
4131
4132fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4133 for (idx, entry) in entries.iter().enumerate() {
4134 match serde_json::from_str::<DocMetadata>(entry) {
4135 Ok(meta) => {
4136 options.metadata = Some(meta);
4137 }
4138 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4139 Ok(value) => {
4140 options
4141 .extra_metadata
4142 .insert(format!("custom_metadata_{idx}"), value.to_string());
4143 }
4144 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4145 },
4146 }
4147 }
4148}
4149
4150fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4151 let stats = mem.stats()?;
4152 let message = match stats.tier {
4153 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4154 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4155 };
4156 Ok(message)
4157}
4158
/// Normalize a raw URI/path into a safe `mv2://` key.
///
/// Strips an optional `mv2://` scheme, converts backslashes to `/`, removes
/// empty and `.` segments, and resolves `..` by popping the previous segment.
/// With `keep_path` the sanitized segments are re-joined with `/`; otherwise
/// only the final segment is returned. Inputs that collapse to nothing fall
/// back to the last raw segment — unless it is itself empty, `.` or `..` —
/// and finally to `"document"`.
///
/// Fix: the previous fallback returned the last raw segment unfiltered, so
/// inputs like `".."` or `"a/.."` leaked the very dot tokens the loop above
/// strips; those now fall through to `"document"`.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                // `..` cancels the previous kept segment (no-op at the root).
                segments.pop();
            }
            other => segments.push(other.to_string()),
        }
    }

    if segments.is_empty() {
        // Everything collapsed away; salvage the last raw segment, but never
        // surface the `.`/`..` traversal tokens we deliberately removed.
        return normalized
            .rsplit('/')
            .next()
            .filter(|s| !s.is_empty() && *s != "." && *s != "..")
            .unwrap_or("document")
            .to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
4192
4193fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4194 if quiet {
4195 let pb = ProgressBar::hidden();
4196 pb.set_message(message.to_string());
4197 return pb;
4198 }
4199
4200 match total {
4201 Some(len) => {
4202 let pb = ProgressBar::new(len);
4203 pb.set_draw_target(ProgressDrawTarget::stderr());
4204 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4205 .unwrap_or_else(|_| ProgressStyle::default_bar());
4206 pb.set_style(style);
4207 pb.set_message(message.to_string());
4208 pb
4209 }
4210 None => {
4211 let pb = ProgressBar::new_spinner();
4212 pb.set_draw_target(ProgressDrawTarget::stderr());
4213 pb.set_message(message.to_string());
4214 pb.enable_steady_tick(Duration::from_millis(120));
4215 pb
4216 }
4217 }
4218}
4219
4220fn create_spinner(message: &str) -> ProgressBar {
4221 let pb = ProgressBar::new_spinner();
4222 pb.set_draw_target(ProgressDrawTarget::stderr());
4223 let style = ProgressStyle::with_template("{spinner} {msg}")
4224 .unwrap_or_else(|_| ProgressStyle::default_spinner())
4225 .tick_strings(&["-", "\\", "|", "/"]);
4226 pb.set_style(style);
4227 pb.set_message(message.to_string());
4228 pb.enable_steady_tick(Duration::from_millis(120));
4229 pb
4230}
4231
#[cfg(feature = "parallel_segments")]
/// CLI arguments for the `put-many` subcommand: batch-ingest documents into a
/// single archive from a JSON request list.
#[derive(Args)]
pub struct PutManyArgs {
    /// Target `.mv2` archive to ingest into.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit machine-readable JSON output instead of human-readable text.
    #[arg(long)]
    pub json: bool,
    /// Read the request batch from this file; when absent, stdin is read.
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    /// Zstd compression level for stored payloads (clamped to 1..=9 at use).
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    /// Shared lock-related CLI flags (flattened into this command).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4256
#[cfg(feature = "parallel_segments")]
/// One document in a `put-many` batch, deserialized from JSON.
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Human-readable document title, copied into the frame's put options.
    pub title: String,
    // NOTE(review): deserialized but not read by `handle_put_many` — confirm
    // whether another consumer uses it or it is vestigial.
    #[serde(default)]
    pub label: String,
    /// Document body; stored as UTF-8 bytes.
    pub text: String,
    /// Explicit URI; defaults to `mv2://batch/<index>` when omitted.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tags copied into `PutOptions::tags`.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels copied into `PutOptions::labels`.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Structured metadata; parsed as `DocMetadata` when possible, otherwise
    /// preserved verbatim under the `memvid.metadata.json` extra-metadata key.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Arbitrary string key/value metadata attached to the frame.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Optional pre-computed document-level embedding vector.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Optional pre-computed per-chunk embedding vectors.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
4286
#[cfg(feature = "parallel_segments")]
/// Top-level `put-many` input: a bare JSON array of requests.
/// `#[serde(transparent)]` means no wrapping object is expected — the input
/// document is the array itself.
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The individual ingestion requests, in input order.
    pub requests: Vec<PutManyRequest>,
}
4294
#[cfg(feature = "parallel_segments")]
/// Handle the `put-many` subcommand: batch-ingest text documents (optionally
/// with pre-computed embeddings) into one archive via the parallel-segments
/// build path.
///
/// Reads a JSON array of [`PutManyRequest`] from `--input PATH` or stdin,
/// converts each request into a `ParallelInput`, and commits the whole batch
/// with a single `put_parallel_inputs` call. Output is JSON when `--json` is
/// set, otherwise a short human-readable summary.
///
/// # Errors
/// Fails if the archive cannot be opened or mutated, the input cannot be read
/// or parsed as JSON, or the batch ingestion itself fails.
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    // Open the archive and enforce CLI write policy / lock flags up front.
    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Lex and vec indexes must exist before batch ingestion; enable lazily.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Read the whole batch as one string; --input takes precedence over stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    // An empty batch is a no-op, but still emit well-formed output.
    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        // Progress note goes to stderr so stdout stays clean for results.
        eprintln!("Processing {} documents...", count);
    }

    // Convert each request into a ParallelInput, carrying options and any
    // caller-supplied embeddings through to the parallel build.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Synthesize a stable per-index URI when the request omits one.
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;
            options.extra_metadata = req.extra_metadata;
            if !req.metadata.is_null() {
                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
                    Ok(meta) => options.metadata = Some(meta),
                    Err(_) => {
                        // Not a DocMetadata document: keep the raw JSON verbatim
                        // so the caller's metadata is not silently dropped.
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| req.metadata.to_string());
                    }
                }
            }

            // Record the embedding dimension: document-level embedding first,
            // otherwise the first chunk embedding. Existing entries win.
            if let Some(embedding) = req
                .embedding
                .as_ref()
                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
            {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| embedding.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Clamp the user-supplied zstd level into the supported 1..=9 range.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // For large batches only print the first/last ids to keep output short.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}