1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32 DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33 ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34 MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
46const MAX_SEARCH_TEXT_LEN: usize = 32_768;
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncates `text` to at most `MAX_EMBEDDING_TEXT_LEN` bytes, ending on a
/// UTF-8 character boundary so the result is always a valid string.
///
/// The previous implementation sliced `&text[..MAX_EMBEDDING_TEXT_LEN]`
/// directly, which panics whenever that byte index falls in the middle of a
/// multi-byte character; its follow-up `char_indices` pass then merely
/// recomputed the truncated length (a no-op). We instead walk backwards from
/// the limit to the nearest character boundary.
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Largest index <= MAX_EMBEDDING_TEXT_LEN that is a char boundary;
    // at most 3 steps back since UTF-8 chars are 1-4 bytes.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
78
79fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
80 options.extra_metadata.insert(
81 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
82 runtime.provider_kind().to_string(),
83 );
84 options.extra_metadata.insert(
85 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
86 runtime.provider_model_id(),
87 );
88 options.extra_metadata.insert(
89 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
90 runtime.dimension().to_string(),
91 );
92}
93
94fn apply_embedding_identity_metadata_from_choice(
95 options: &mut PutOptions,
96 model: EmbeddingModelChoice,
97 dimension: usize,
98 model_id_override: Option<&str>,
99) {
100 let provider = match model {
101 EmbeddingModelChoice::OpenAILarge
102 | EmbeddingModelChoice::OpenAISmall
103 | EmbeddingModelChoice::OpenAIAda => "openai",
104 EmbeddingModelChoice::Nvidia => "nvidia",
105 _ => "fastembed",
106 };
107
108 let model_id = match model {
109 EmbeddingModelChoice::Nvidia => model_id_override
110 .map(str::trim)
111 .filter(|value| !value.is_empty())
112 .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
113 .filter(|value| !value.is_empty())
114 .map(|value| {
115 if value.contains('/') {
116 value.to_string()
117 } else {
118 format!("nvidia/{value}")
119 }
120 })
121 .unwrap_or_else(|| model.canonical_model_id().to_string()),
122 _ => model.canonical_model_id().to_string(),
123 };
124
125 options.extra_metadata.insert(
126 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
127 provider.to_string(),
128 );
129 options.extra_metadata.insert(
130 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
131 model_id,
132 );
133 options.extra_metadata.insert(
134 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
135 dimension.to_string(),
136 );
137}
138
139#[cfg(feature = "llama-cpp")]
142fn get_local_contextual_model_path() -> Result<PathBuf> {
143 let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
144 let expanded = if custom.starts_with("~/") {
146 if let Ok(home) = env::var("HOME") {
147 PathBuf::from(home).join(&custom[2..])
148 } else {
149 PathBuf::from(&custom)
150 }
151 } else {
152 PathBuf::from(&custom)
153 };
154 expanded
155 } else {
156 let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
158 PathBuf::from(home).join(".memvid").join("models")
159 };
160
161 let model_path = models_dir
162 .join("llm")
163 .join("phi-3-mini-q4")
164 .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");
165
166 if model_path.exists() {
167 Ok(model_path)
168 } else {
169 Err(anyhow!(
170 "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
171 model_path.display()
172 ))
173 }
174}
175
176use pathdiff::diff_paths;
177
178use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
179use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
180use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
181use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
182use crate::error::{CapacityExceededMessage, DuplicateUriError};
183use crate::utils::{
184 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
185 select_frame,
186};
187#[cfg(feature = "temporal_enrich")]
188use memvid_core::enrich_chunks as temporal_enrich_chunks;
189
/// Parses a human-friendly boolean flag, case-insensitively and ignoring
/// surrounding whitespace: "1"/"true"/"yes"/"on" are `Some(true)`,
/// "0"/"false"/"no"/"off" are `Some(false)`, anything else is `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if ["1", "true", "yes", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "false", "no", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
198
199#[cfg(feature = "parallel_segments")]
201fn parallel_env_override() -> Option<bool> {
202 std::env::var("MEMVID_PARALLEL_SEGMENTS")
203 .ok()
204 .and_then(|value| parse_bool_flag(&value))
205}
206
207#[cfg(feature = "clip")]
/// Reports whether both ONNX halves (vision + text) of the configured CLIP
/// model are present on disk, so auto-detection can enable CLIP ingestion.
fn is_clip_model_installed() -> bool {
    // Models directory: explicit override, then $HOME/.memvid/models,
    // then a relative fallback.
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| match env::var("HOME") {
            Ok(home) => PathBuf::from(home).join(".memvid/models"),
            Err(_) => PathBuf::from(".memvid/models"),
        });

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    let vision_model = models_dir.join(format!("{model_name}_vision.onnx"));
    let text_model = models_dir.join(format!("{model_name}_text.onnx"));

    vision_model.exists() && text_model.exists()
}
226
227#[cfg(feature = "logic_mesh")]
230const NER_MIN_CONFIDENCE: f32 = 0.50;
231
232#[cfg(feature = "logic_mesh")]
234const NER_MIN_ENTITY_LEN: usize = 2;
235
236#[cfg(feature = "logic_mesh")]
239const MAX_MERGE_GAP: usize = 3;
240
241#[cfg(feature = "logic_mesh")]
248fn merge_adjacent_entities(
249 entities: Vec<memvid_core::ExtractedEntity>,
250 original_text: &str,
251) -> Vec<memvid_core::ExtractedEntity> {
252 use memvid_core::ExtractedEntity;
253
254 if entities.is_empty() {
255 return entities;
256 }
257
258 let mut sorted: Vec<ExtractedEntity> = entities;
260 sorted.sort_by_key(|e| e.byte_start);
261
262 let mut merged: Vec<ExtractedEntity> = Vec::new();
263
264 for entity in sorted {
265 if let Some(last) = merged.last_mut() {
266 let gap = entity.byte_start.saturating_sub(last.byte_end);
268 let same_type = last.entity_type == entity.entity_type;
269
270 let gap_is_mergeable = if gap == 0 {
272 true
273 } else if gap <= MAX_MERGE_GAP && entity.byte_start <= original_text.len() && last.byte_end <= original_text.len() {
274 let gap_text = &original_text[last.byte_end..entity.byte_start];
276 if gap_text.contains('\n') || gap_text.contains('\r') {
278 false
279 } else {
280 gap_text.chars().all(|c| c == ' ' || c == '\t' || c == '&' || c == '-' || c == '\'' || c == '.')
281 }
282 } else {
283 false
284 };
285
286 if same_type && gap_is_mergeable {
287 let merged_text = if entity.byte_end <= original_text.len() {
289 original_text[last.byte_start..entity.byte_end].to_string()
290 } else {
291 format!("{}{}", last.text, entity.text)
292 };
293
294 tracing::debug!(
295 "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
296 last.text, entity.text, merged_text, gap
297 );
298
299 last.text = merged_text;
300 last.byte_end = entity.byte_end;
301 last.confidence = (last.confidence + entity.confidence) / 2.0;
303 continue;
304 }
305 }
306
307 merged.push(entity);
308 }
309
310 merged
311}
312
313#[cfg(feature = "logic_mesh")]
316fn is_valid_entity(text: &str, confidence: f32) -> bool {
317 if confidence < NER_MIN_CONFIDENCE {
319 return false;
320 }
321
322 let trimmed = text.trim();
323
324 if trimmed.len() < NER_MIN_ENTITY_LEN {
326 return false;
327 }
328
329 if trimmed.chars().all(|c| c.is_whitespace() || c.is_ascii_punctuation()) {
331 return false;
332 }
333
334 if trimmed.contains('\n') || trimmed.contains('\r') {
336 return false;
337 }
338
339 if trimmed.starts_with("##") || trimmed.ends_with("##") {
341 return false;
342 }
343
344 if trimmed.ends_with('.') {
346 return false;
347 }
348
349 let lower = trimmed.to_lowercase();
350
351 if trimmed.len() == 1 {
353 return false;
354 }
355
356 if trimmed.len() == 2 {
358 if matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it") {
360 return true;
361 }
362 return false;
364 }
365
366 if matches!(lower.as_str(), "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc") {
368 return false;
369 }
370
371 let words: Vec<&str> = trimmed.split_whitespace().collect();
374 if words.len() >= 2 {
375 let first_word = words[0];
376 if first_word.len() <= 2 && first_word.chars().all(|c| c.is_uppercase()) {
378 if !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un") {
380 return false;
381 }
382 }
383 }
384
385 if let Some(first_char) = trimmed.chars().next() {
387 if first_char.is_lowercase() {
388 return false;
389 }
390 }
391
392 true
393}
394
395pub fn parse_key_val(s: &str) -> Result<(String, String)> {
397 let (key, value) = s
398 .split_once('=')
399 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
400 Ok((key.to_string(), value.to_string()))
401}
402
// Shared advisory-lock flags, flattened into every mutating subcommand.
// (Plain `//` comments on purpose: `///` doc comments would become clap
// help text and change CLI output.)
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // Milliseconds to wait for the .mv2 write lock before giving up.
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // Take the lock even if another process appears to hold it.
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
413
// CLI arguments for `memvid put`: ingest one file, a glob/directory, or
// stdin into a .mv2 memory, with optional embeddings, CLIP, tables,
// contextual prefixes, and parallel segmenting.
// (Plain `//` comments on purpose: `///` doc comments would become clap
// help text and change CLI output.)
#[derive(Args)]
pub struct PutArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human-oriented output.
    #[arg(long)]
    pub json: bool,
    // Input path, directory, or glob; stdin when omitted.
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    // Explicit frame URI (single-input only).
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Explicit frame title (single-input only).
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long)]
    pub timestamp: Option<i64>,
    #[arg(long)]
    pub track: Option<String>,
    #[arg(long)]
    pub kind: Option<String>,
    // Treat input as video; conflicts with a non-"video" --kind (checked in
    // handle_put) and requires --input.
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable KEY=VALUE tags.
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable raw JSON metadata fragments.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Opt-outs for automatic enrichment passes.
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,
    // Audio ingestion and optional transcription.
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Vector embedding controls: compute embeddings, or supply a
    // precomputed vector from a JSON file (mutually exclusive, checked in
    // handle_put).
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    // Product-quantization compression for stored vectors (~16x smaller).
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // Contextual chunk prefixes (optionally with a specific model).
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // Table extraction on/off.
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // CLIP visual embeddings: explicit enable / disable of auto-detection.
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    // Duplicate-URI handling: update in place vs. allow a second frame.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    #[arg(long)]
    pub allow_duplicate: bool,

    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // Dashboard memory to bind this .mv2 to before ingesting.
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // Parallel segment ingestion tuning (feature-gated); see
    // wants_parallel()/sanitized_parallel_opts() for precedence.
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
580
581#[cfg(feature = "parallel_segments")]
582impl PutArgs {
583 pub fn wants_parallel(&self) -> bool {
584 if self.parallel_segments {
585 return true;
586 }
587 if self.no_parallel_segments {
588 return false;
589 }
590 if let Some(env_flag) = parallel_env_override() {
591 return env_flag;
592 }
593 false
594 }
595
596 pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
597 let mut opts = memvid_core::BuildOpts::default();
598 if let Some(tokens) = self.parallel_segment_tokens {
599 opts.segment_tokens = tokens;
600 }
601 if let Some(pages) = self.parallel_segment_pages {
602 opts.segment_pages = pages;
603 }
604 if let Some(threads) = self.parallel_threads {
605 opts.threads = threads;
606 }
607 if let Some(depth) = self.parallel_queue_depth {
608 opts.queue_depth = depth;
609 }
610 opts.sanitize();
611 opts
612 }
613}
614
615#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    /// Parallel segment ingestion is unavailable when the
    /// `parallel_segments` feature is compiled out, so this always
    /// reports `false`.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
621
// CLI arguments for `memvid api-fetch`: pull remote API data into a .mv2
// memory according to a fetch config file.
// (Plain `//` comments on purpose: `///` doc comments would become clap
// help text and change CLI output.)
#[derive(Args)]
pub struct ApiFetchArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Fetch configuration file describing the API source.
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    // Show what would be fetched without writing anything.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Override the fetch mode declared in the config.
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Override the URI frames are stored under.
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
647
// CLI arguments for `memvid update`: modify an existing frame selected by
// id or URI — replace its payload and/or patch its metadata.
// (Plain `//` comments on purpose: `///` doc comments would become clap
// help text and change CLI output.)
#[derive(Args)]
pub struct UpdateArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selector: exactly one of --frame-id / --uri.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // New payload to store in place of the current one.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Metadata patches; unspecified fields keep their current values.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
683
// CLI arguments for `memvid delete`: remove a frame selected by id or URI,
// with an interactive confirmation unless --yes is passed.
// (Plain `//` comments on purpose: `///` doc comments would become clap
// help text and change CLI output.)
#[derive(Args)]
pub struct DeleteArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selector: exactly one of --frame-id / --uri.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
701
702pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
704 let command = ApiFetchCommand {
705 file: args.file,
706 config_path: args.config,
707 dry_run: args.dry_run,
708 mode_override: args.mode,
709 uri_override: args.uri,
710 output_json: args.json,
711 lock_timeout_ms: args.lock.lock_timeout,
712 force_lock: args.lock.force,
713 };
714 run_api_fetch(config, command)
715}
716
717pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
719 let mut mem = Memvid::open(&args.file)?;
720 ensure_cli_mutation_allowed(&mem)?;
721 apply_lock_cli(&mut mem, &args.lock);
722 let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
723
724 if !args.yes {
725 if let Some(uri) = &frame.uri {
726 println!("About to delete frame {} ({uri})", frame.id);
727 } else {
728 println!("About to delete frame {}", frame.id);
729 }
730 print!("Confirm? [y/N]: ");
731 io::stdout().flush()?;
732 let mut input = String::new();
733 io::stdin().read_line(&mut input)?;
734 let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
735 if !confirmed {
736 println!("Aborted");
737 return Ok(());
738 }
739 }
740
741 let seq = mem.delete_frame(frame.id)?;
742 mem.commit()?;
743 let deleted = mem.frame_by_id(frame.id)?;
744
745 if args.json {
746 let json = json!({
747 "frame_id": frame.id,
748 "sequence": seq,
749 "status": frame_status_str(deleted.status),
750 });
751 println!("{}", serde_json::to_string_pretty(&json)?);
752 } else {
753 println!("Deleted frame {} (seq {})", frame.id, seq);
754 }
755
756 Ok(())
757}
758pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
759 crate::utils::require_active_plan(config, "put")?;
761
762 let mut mem = Memvid::open(&args.file)?;
763 ensure_cli_mutation_allowed(&mem)?;
764 apply_lock_cli(&mut mem, &args.lock);
765
766 if let Some(memory_id) = &args.memory_id {
768 let needs_binding = match mem.get_memory_binding() {
769 Some(binding) => binding.memory_id != **memory_id,
770 None => true,
771 };
772 if needs_binding {
773 match crate::commands::creation::bind_to_dashboard_memory(config, &mut mem, &args.file, memory_id) {
774 Ok((bound_id, capacity)) => {
775 eprintln!("✓ Bound to dashboard memory: {} (capacity: {})", bound_id, crate::utils::format_bytes(capacity));
776 }
777 Err(e) => {
778 eprintln!("⚠️ Failed to bind to dashboard memory: {}", e);
779 eprintln!(" Continuing with existing capacity. Bind manually with:");
780 eprintln!(" memvid tickets sync {} --memory-id {}", args.file.display(), memory_id);
781 }
782 }
783 }
784 }
785
786 #[cfg(feature = "replay")]
788 let _ = mem.load_active_session();
789 let mut stats = mem.stats()?;
790 if stats.frame_count == 0 && !stats.has_lex_index {
791 mem.enable_lex()?;
792 stats = mem.stats()?;
793 }
794
795 crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
798
799 if args.vector_compression {
801 mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
802 }
803
804 let mut capacity_guard = CapacityGuard::from_stats(&stats);
805 let use_parallel = args.wants_parallel();
806 #[cfg(feature = "parallel_segments")]
807 let mut parallel_opts = if use_parallel {
808 Some(args.sanitized_parallel_opts())
809 } else {
810 None
811 };
812 #[cfg(feature = "parallel_segments")]
813 let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
814 #[cfg(feature = "parallel_segments")]
815 let mut pending_parallel_indices: Vec<usize> = Vec::new();
816 let input_set = resolve_inputs(args.input.as_deref())?;
817 if args.embedding && args.embedding_vec.is_some() {
818 anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
819 }
820 if args.embedding_vec.is_some() {
821 match &input_set {
822 InputSet::Files(paths) if paths.len() != 1 => {
823 anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
824 }
825 _ => {}
826 }
827 }
828
829 if args.video {
830 if let Some(kind) = &args.kind {
831 if !kind.eq_ignore_ascii_case("video") {
832 anyhow::bail!("--video conflicts with explicit --kind={kind}");
833 }
834 }
835 if matches!(input_set, InputSet::Stdin) {
836 anyhow::bail!("--video requires --input <file>");
837 }
838 }
839
840 #[allow(dead_code)]
841 enum EmbeddingMode {
842 None,
843 Auto,
844 Explicit,
845 }
846
847 let mv2_dimension = mem.effective_vec_index_dimension()?;
851 let inferred_embedding_model = if args.embedding {
852 match mem.embedding_identity_summary(10_000) {
853 memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
854 memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
855 let models: Vec<_> = identities
856 .iter()
857 .filter_map(|entry| entry.identity.model.as_deref())
858 .collect();
859 anyhow::bail!(
860 "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
861 Detected models: {:?}\n\n\
862 Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
863 models
864 );
865 }
866 memvid_core::EmbeddingIdentitySummary::Unknown => None,
867 }
868 } else {
869 None
870 };
871 let (embedding_mode, embedding_runtime) = if args.embedding {
872 (
873 EmbeddingMode::Explicit,
874 Some(load_embedding_runtime_for_mv2(
875 config,
876 inferred_embedding_model.as_deref(),
877 mv2_dimension,
878 )?),
879 )
880 } else if args.embedding_vec.is_some() {
881 (EmbeddingMode::Explicit, None)
882 } else {
883 (EmbeddingMode::None, None)
884 };
885 let embedding_enabled = args.embedding || args.embedding_vec.is_some();
886 let runtime_ref = embedding_runtime.as_ref();
887
888 #[cfg(feature = "clip")]
893 let clip_enabled = {
894 if args.no_clip {
895 false
896 } else if args.clip {
897 true } else {
899 let model_available = is_clip_model_installed();
901 if model_available {
902 match &input_set {
904 InputSet::Files(paths) => paths.iter().any(|p| {
905 is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
906 }),
907 InputSet::Stdin => false,
908 }
909 } else {
910 false
911 }
912 }
913 };
914 #[cfg(feature = "clip")]
915 let clip_model = if clip_enabled {
916 mem.enable_clip()?;
917 if !args.json {
918 let mode = if args.clip {
919 "explicit"
920 } else {
921 "auto-detected"
922 };
923 eprintln!("ℹ️ CLIP visual embeddings enabled ({mode})");
924 eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
925 eprintln!(" Use 'memvid find --mode clip' to search with natural language");
926 eprintln!(" Use --no-clip to disable auto-detection");
927 eprintln!();
928 }
929 match memvid_core::ClipModel::default_model() {
931 Ok(model) => Some(model),
932 Err(e) => {
933 warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
934 None
935 }
936 }
937 } else {
938 None
939 };
940
941 if embedding_enabled && !args.json {
943 let capacity = stats.capacity_bytes;
944 if args.vector_compression {
945 let max_docs = capacity / 20_000; eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
948 eprintln!(" Storage: ~20 KB per document (16x compressed)");
949 eprintln!(" Accuracy: ~95% recall@10 maintained");
950 eprintln!(
951 " Capacity: ~{} documents max for {:.1} GB",
952 max_docs,
953 capacity as f64 / 1e9
954 );
955 eprintln!();
956 } else {
957 let max_docs = capacity / 270_000; eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
959 eprintln!(" Storage: ~270 KB per document");
960 eprintln!(
961 " Capacity: ~{} documents max for {:.1} GB",
962 max_docs,
963 capacity as f64 / 1e9
964 );
965 eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
966 eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
967 eprintln!();
968 }
969 }
970
971 let embed_progress = if embedding_enabled {
972 let total = match &input_set {
973 InputSet::Files(paths) => Some(paths.len() as u64),
974 InputSet::Stdin => None,
975 };
976 Some(create_task_progress_bar(total, "embed", false))
977 } else {
978 None
979 };
980 let mut embedded_docs = 0usize;
981
982 if let InputSet::Files(paths) = &input_set {
983 if paths.len() > 1 {
984 if args.uri.is_some() || args.title.is_some() {
985 anyhow::bail!(
986 "--uri/--title apply to a single file; omit them when using a directory or glob"
987 );
988 }
989 }
990 }
991
992 let mut reports = Vec::new();
993 let mut processed = 0usize;
994 let mut skipped = 0usize;
995 let mut bytes_added = 0u64;
996 let mut summary_warnings: Vec<String> = Vec::new();
997 #[cfg(feature = "clip")]
998 let mut clip_embeddings_added = false;
999
1000 let mut capacity_reached = false;
1001 match input_set {
1002 InputSet::Stdin => {
1003 processed += 1;
1004 match read_payload(None) {
1005 Ok(payload) => {
1006 let mut analyze_spinner = if args.json {
1007 None
1008 } else {
1009 Some(create_spinner("Analyzing stdin payload..."))
1010 };
1011 match ingest_payload(
1012 &mut mem,
1013 &args,
1014 config,
1015 payload,
1016 None,
1017 capacity_guard.as_mut(),
1018 embedding_enabled,
1019 runtime_ref,
1020 use_parallel,
1021 ) {
1022 Ok(outcome) => {
1023 if let Some(pb) = analyze_spinner.take() {
1024 pb.finish_and_clear();
1025 }
1026 bytes_added += outcome.report.stored_bytes as u64;
1027 if args.json {
1028 summary_warnings
1029 .extend(outcome.report.extraction.warnings.iter().cloned());
1030 }
1031 reports.push(outcome.report);
1032 let report_index = reports.len() - 1;
1033 if !args.json && !use_parallel {
1034 if let Some(report) = reports.get(report_index) {
1035 print_report(report);
1036 }
1037 }
1038 if args.transcript {
1039 let notice = transcript_notice_message(&mut mem)?;
1040 if args.json {
1041 summary_warnings.push(notice);
1042 } else {
1043 println!("{notice}");
1044 }
1045 }
1046 if outcome.embedded {
1047 if let Some(pb) = embed_progress.as_ref() {
1048 pb.inc(1);
1049 }
1050 embedded_docs += 1;
1051 }
1052 if use_parallel {
1053 #[cfg(feature = "parallel_segments")]
1054 if let Some(input) = outcome.parallel_input {
1055 pending_parallel_indices.push(report_index);
1056 pending_parallel_inputs.push(input);
1057 }
1058 } else if let Some(guard) = capacity_guard.as_mut() {
1059 mem.commit()?;
1060 let stats = mem.stats()?;
1061 guard.update_after_commit(&stats);
1062 guard.check_and_warn_capacity(); }
1064 }
1065 Err(err) => {
1066 if let Some(pb) = analyze_spinner.take() {
1067 pb.finish_and_clear();
1068 }
1069 if err.downcast_ref::<DuplicateUriError>().is_some() {
1070 return Err(err);
1071 }
1072 warn!("skipped stdin payload: {err}");
1073 skipped += 1;
1074 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1075 capacity_reached = true;
1076 }
1077 }
1078 }
1079 }
1080 Err(err) => {
1081 warn!("failed to read stdin: {err}");
1082 skipped += 1;
1083 }
1084 }
1085 }
1086 InputSet::Files(ref paths) => {
1087 let mut transcript_notice_printed = false;
1088 for path in paths {
1089 processed += 1;
1090 match read_payload(Some(&path)) {
1091 Ok(payload) => {
1092 let label = path.display().to_string();
1093 let mut analyze_spinner = if args.json {
1094 None
1095 } else {
1096 Some(create_spinner(&format!("Analyzing {label}...")))
1097 };
1098 match ingest_payload(
1099 &mut mem,
1100 &args,
1101 config,
1102 payload,
1103 Some(&path),
1104 capacity_guard.as_mut(),
1105 embedding_enabled,
1106 runtime_ref,
1107 use_parallel,
1108 ) {
1109 Ok(outcome) => {
1110 if let Some(pb) = analyze_spinner.take() {
1111 pb.finish_and_clear();
1112 }
1113 bytes_added += outcome.report.stored_bytes as u64;
1114
1115 #[cfg(feature = "clip")]
1117 if let Some(ref model) = clip_model {
1118 let mime = outcome.report.mime.as_deref();
1119 let clip_frame_id = outcome.report.frame_id;
1120
1121 if is_image_file(&path) {
1122 match model.encode_image_file(&path) {
1123 Ok(embedding) => {
1124 if let Err(e) = mem.add_clip_embedding_with_page(
1125 clip_frame_id,
1126 None,
1127 embedding,
1128 ) {
1129 warn!(
1130 "Failed to add CLIP embedding for {}: {e}",
1131 path.display()
1132 );
1133 } else {
1134 info!(
1135 "Added CLIP embedding for frame {}",
1136 clip_frame_id
1137 );
1138 clip_embeddings_added = true;
1139 }
1140 }
1141 Err(e) => {
1142 warn!(
1143 "Failed to encode CLIP embedding for {}: {e}",
1144 path.display()
1145 );
1146 }
1147 }
1148 } else if matches!(mime, Some(m) if m == "application/pdf") {
1149 if let Err(e) = mem.commit() {
1151 warn!("Failed to commit before CLIP extraction: {e}");
1152 }
1153
1154 match render_pdf_pages_for_clip(
1155 &path,
1156 CLIP_PDF_MAX_IMAGES,
1157 CLIP_PDF_TARGET_PX,
1158 ) {
1159 Ok(rendered_pages) => {
1160 use rayon::prelude::*;
1161
1162 let path_for_closure = path.clone();
1165
1166 let prev_hook = std::panic::take_hook();
1169 std::panic::set_hook(Box::new(|_| {
1170 }));
1172
1173 let encoded_images: Vec<_> = rendered_pages
1174 .into_par_iter()
1175 .filter_map(|(page_num, image)| {
1176 let image_info = get_image_info(&image);
1178 if !image_info.should_embed() {
1179 info!(
1180 "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1181 path_for_closure.display(),
1182 page_num,
1183 image_info.color_variance,
1184 image_info.width,
1185 image_info.height
1186 );
1187 return None;
1188 }
1189
1190 let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1194 let rgb_image = image.to_rgb8();
1195 let mut png_bytes = Vec::new();
1196 image::DynamicImage::ImageRgb8(rgb_image).write_to(
1197 &mut Cursor::new(&mut png_bytes),
1198 image::ImageFormat::Png,
1199 ).map(|()| png_bytes)
1200 }));
1201
1202 match encode_result {
1203 Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1204 Ok(Err(e)) => {
1205 info!(
1206 "Skipping image for {} page {}: {e}",
1207 path_for_closure.display(),
1208 page_num
1209 );
1210 None
1211 }
1212 Err(_) => {
1213 info!(
1215 "Skipping malformed image for {} page {}",
1216 path_for_closure.display(),
1217 page_num
1218 );
1219 None
1220 }
1221 }
1222 })
1223 .collect();
1224
1225 std::panic::set_hook(prev_hook);
1227
1228 let mut child_frame_offset = 1u64;
1231 let parent_uri = outcome.report.uri.clone();
1232 let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1233 let total_images = encoded_images.len();
1234
1235 let clip_pb = if !args.json && total_images > 0 {
1237 let pb = ProgressBar::new(total_images as u64);
1238 pb.set_style(
1239 ProgressStyle::with_template(
1240 " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1241 )
1242 .unwrap_or_else(|_| ProgressStyle::default_bar())
1243 .progress_chars("█▓░"),
1244 );
1245 Some(pb)
1246 } else {
1247 None
1248 };
1249
1250 for (page_num, image, png_bytes) in encoded_images {
1251 let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1252 let child_title = format!(
1253 "{} - Image {} (page {})",
1254 parent_title,
1255 child_frame_offset,
1256 page_num
1257 );
1258
1259 let child_options = PutOptions::builder()
1260 .uri(&child_uri)
1261 .title(&child_title)
1262 .parent_id(clip_frame_id)
1263 .role(FrameRole::ExtractedImage)
1264 .auto_tag(false)
1265 .extract_dates(false)
1266 .build();
1267
1268 match mem.put_bytes_with_options(&png_bytes, child_options) {
1269 Ok(_child_seq) => {
1270 let child_frame_id = clip_frame_id + child_frame_offset;
1271 child_frame_offset += 1;
1272
1273 match model.encode_image(&image) {
1274 Ok(embedding) => {
1275 if let Err(e) = mem
1276 .add_clip_embedding_with_page(
1277 child_frame_id,
1278 Some(page_num),
1279 embedding,
1280 )
1281 {
1282 warn!(
1283 "Failed to add CLIP embedding for {} page {}: {e}",
1284 path.display(),
1285 page_num
1286 );
1287 } else {
1288 info!(
1289 "Added CLIP image frame {} for {} page {}",
1290 child_frame_id,
1291 clip_frame_id,
1292 page_num
1293 );
1294 clip_embeddings_added = true;
1295 }
1296 }
1297 Err(e) => warn!(
1298 "Failed to encode CLIP embedding for {} page {}: {e}",
1299 path.display(),
1300 page_num
1301 ),
1302 }
1303
1304 if let Err(e) = mem.commit() {
1306 warn!("Failed to commit after image {}: {e}", child_frame_offset);
1307 }
1308 }
1309 Err(e) => warn!(
1310 "Failed to store image frame for {} page {}: {e}",
1311 path.display(),
1312 page_num
1313 ),
1314 }
1315
1316 if let Some(ref pb) = clip_pb {
1317 pb.inc(1);
1318 }
1319 }
1320
1321 if let Some(pb) = clip_pb {
1322 pb.finish_and_clear();
1323 }
1324 }
1325 Err(err) => warn!(
1326 "Failed to render PDF pages for CLIP {}: {err}",
1327 path.display()
1328 ),
1329 }
1330 }
1331 }
1332
1333 if args.json {
1334 summary_warnings
1335 .extend(outcome.report.extraction.warnings.iter().cloned());
1336 }
1337 reports.push(outcome.report);
1338 let report_index = reports.len() - 1;
1339 if !args.json && !use_parallel {
1340 if let Some(report) = reports.get(report_index) {
1341 print_report(report);
1342 }
1343 }
1344 if args.transcript && !transcript_notice_printed {
1345 let notice = transcript_notice_message(&mut mem)?;
1346 if args.json {
1347 summary_warnings.push(notice);
1348 } else {
1349 println!("{notice}");
1350 }
1351 transcript_notice_printed = true;
1352 }
1353 if outcome.embedded {
1354 if let Some(pb) = embed_progress.as_ref() {
1355 pb.inc(1);
1356 }
1357 embedded_docs += 1;
1358 }
1359 if use_parallel {
1360 #[cfg(feature = "parallel_segments")]
1361 if let Some(input) = outcome.parallel_input {
1362 pending_parallel_indices.push(report_index);
1363 pending_parallel_inputs.push(input);
1364 }
1365 } else if let Some(guard) = capacity_guard.as_mut() {
1366 mem.commit()?;
1367 let stats = mem.stats()?;
1368 guard.update_after_commit(&stats);
1369 guard.check_and_warn_capacity(); }
1371 }
1372 Err(err) => {
1373 if let Some(pb) = analyze_spinner.take() {
1374 pb.finish_and_clear();
1375 }
1376 if err.downcast_ref::<DuplicateUriError>().is_some() {
1377 return Err(err);
1378 }
1379 warn!("skipped {}: {err}", path.display());
1380 skipped += 1;
1381 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1382 capacity_reached = true;
1383 }
1384 }
1385 }
1386 }
1387 Err(err) => {
1388 warn!("failed to read {}: {err}", path.display());
1389 skipped += 1;
1390 }
1391 }
1392 if capacity_reached {
1393 break;
1394 }
1395 }
1396 }
1397 }
1398
1399 if capacity_reached {
1400 warn!("capacity limit reached; remaining inputs were not processed");
1401 }
1402
1403 if let Some(pb) = embed_progress.as_ref() {
1404 pb.finish_with_message("embed");
1405 }
1406
1407 if let Some(runtime) = embedding_runtime.as_ref() {
1408 if embedded_docs > 0 {
1409 match embedding_mode {
1410 EmbeddingMode::Explicit => println!(
1411 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
1412 embedded_docs,
1413 runtime.dimension()
1414 ),
1415 EmbeddingMode::Auto => println!(
1416 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
1417 embedded_docs,
1418 runtime.dimension()
1419 ),
1420 EmbeddingMode::None => {}
1421 }
1422 } else {
1423 match embedding_mode {
1424 EmbeddingMode::Explicit => {
1425 println!("No embeddings were generated (no textual content available)");
1426 }
1427 EmbeddingMode::Auto => {
1428 println!(
1429 "Semantic runtime available, but no textual content produced embeddings"
1430 );
1431 }
1432 EmbeddingMode::None => {}
1433 }
1434 }
1435 }
1436
1437 if reports.is_empty() {
1438 if args.json {
1439 if capacity_reached {
1440 summary_warnings.push("capacity_limit_reached".to_string());
1441 }
1442 let summary_json = json!({
1443 "processed": processed,
1444 "ingested": 0,
1445 "skipped": skipped,
1446 "bytes_added": bytes_added,
1447 "bytes_added_human": format_bytes(bytes_added),
1448 "embedded_documents": embedded_docs,
1449 "capacity_reached": capacity_reached,
1450 "warnings": summary_warnings,
1451 "reports": Vec::<serde_json::Value>::new(),
1452 });
1453 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1454 } else {
1455 println!(
1456 "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1457 processed, skipped
1458 );
1459 }
1460 return Ok(());
1461 }
1462
1463 #[cfg(feature = "parallel_segments")]
1464 let mut parallel_flush_spinner = if use_parallel && !args.json {
1465 Some(create_spinner("Flushing parallel segments..."))
1466 } else {
1467 None
1468 };
1469
1470 if use_parallel {
1471 #[cfg(feature = "parallel_segments")]
1472 {
1473 let mut opts = parallel_opts.take().unwrap_or_else(|| {
1474 let mut opts = BuildOpts::default();
1475 opts.sanitize();
1476 opts
1477 });
1478 opts.vec_compression = mem.vector_compression().clone();
1480 let seqs = if pending_parallel_inputs.is_empty() {
1481 mem.commit_parallel(opts)?;
1482 Vec::new()
1483 } else {
1484 mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1485 };
1486 if let Some(pb) = parallel_flush_spinner.take() {
1487 pb.finish_with_message("Flushed parallel segments");
1488 }
1489 for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1490 if let Some(report) = reports.get_mut(idx) {
1491 report.seq = seq;
1492 }
1493 }
1494 }
1495 #[cfg(not(feature = "parallel_segments"))]
1496 {
1497 mem.commit()?;
1498 }
1499 } else {
1500 mem.commit()?;
1501 }
1502
1503 #[cfg(feature = "clip")]
1505 if clip_embeddings_added {
1506 mem.commit()?;
1507 }
1508
1509 if !args.json && use_parallel {
1510 for report in &reports {
1511 print_report(report);
1512 }
1513 }
1514
1515 let mut tables_extracted = 0usize;
1517 let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1518 if args.tables && !args.no_tables {
1519 if let InputSet::Files(ref paths) = input_set {
1520 let pdf_paths: Vec<_> = paths
1521 .iter()
1522 .filter(|p| {
1523 p.extension()
1524 .and_then(|e| e.to_str())
1525 .map(|e| e.eq_ignore_ascii_case("pdf"))
1526 .unwrap_or(false)
1527 })
1528 .collect();
1529
1530 if !pdf_paths.is_empty() {
1531 let table_spinner = if args.json {
1532 None
1533 } else {
1534 Some(create_spinner(&format!(
1535 "Extracting tables from {} PDF(s)...",
1536 pdf_paths.len()
1537 )))
1538 };
1539
1540 let table_options = TableExtractionOptions::builder()
1542 .mode(memvid_core::table::ExtractionMode::Aggressive)
1543 .min_rows(2)
1544 .min_cols(2)
1545 .min_quality(memvid_core::table::TableQuality::Low)
1546 .merge_multi_page(true)
1547 .build();
1548
1549 for pdf_path in pdf_paths {
1550 if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1551 let filename = pdf_path
1552 .file_name()
1553 .and_then(|s| s.to_str())
1554 .unwrap_or("unknown.pdf");
1555
1556 if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1557 for table in &result.tables {
1558 if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1559 {
1560 tables_extracted += 1;
1561 table_summaries.push(json!({
1562 "table_id": table.table_id,
1563 "source_file": table.source_file,
1564 "meta_frame_id": meta_id,
1565 "rows": table.n_rows,
1566 "cols": table.n_cols,
1567 "quality": format!("{:?}", table.quality),
1568 "pages": format!("{}-{}", table.page_start, table.page_end),
1569 }));
1570 }
1571 }
1572 }
1573 }
1574 }
1575
1576 if let Some(pb) = table_spinner {
1577 if tables_extracted > 0 {
1578 pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1579 } else {
1580 pb.finish_with_message("No tables found");
1581 }
1582 }
1583
1584 if tables_extracted > 0 {
1586 mem.commit()?;
1587 }
1588 }
1589 }
1590 }
1591
1592 #[cfg(feature = "logic_mesh")]
1595 let mut logic_mesh_entities = 0usize;
1596 #[cfg(feature = "logic_mesh")]
1597 {
1598 use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1599
1600 let models_dir = config.models_dir.clone();
1601 let model_path = ner_model_path(&models_dir);
1602 let tokenizer_path = ner_tokenizer_path(&models_dir);
1603
1604 let ner_available = model_path.exists() && tokenizer_path.exists();
1606 let should_run_ner = args.logic_mesh;
1607
1608 if should_run_ner && !ner_available {
1609 if args.json {
1611 summary_warnings.push(
1612 "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1613 );
1614 } else {
1615 eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
1616 eprintln!(" Run: memvid models install --ner distilbert-ner");
1617 }
1618 } else if should_run_ner {
1619 let ner_spinner = if args.json {
1620 None
1621 } else {
1622 Some(create_spinner("Building Logic-Mesh entity graph..."))
1623 };
1624
1625 match NerModel::load(&model_path, &tokenizer_path, None) {
1626 Ok(mut ner_model) => {
1627 let mut mesh = LogicMesh::new();
1628 let mut filtered_count = 0usize;
1629
1630 for report in &reports {
1632 let frame_id = report.frame_id;
1633 if let Some(ref content) = report.search_text {
1635 if !content.is_empty() {
1636 match ner_model.extract(content) {
1637 Ok(raw_entities) => {
1638 let merged_entities = merge_adjacent_entities(raw_entities, content);
1641
1642 for entity in &merged_entities {
1643 if !is_valid_entity(&entity.text, entity.confidence) {
1645 tracing::debug!(
1646 "Filtered entity: '{}' (confidence: {:.0}%)",
1647 entity.text, entity.confidence * 100.0
1648 );
1649 filtered_count += 1;
1650 continue;
1651 }
1652
1653 let kind = entity.to_entity_kind();
1655 let node = MeshNode::new(
1657 entity.text.to_lowercase(),
1658 entity.text.trim().to_string(),
1659 kind,
1660 entity.confidence,
1661 frame_id,
1662 entity.byte_start as u32,
1663 (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1664 );
1665 mesh.merge_node(node);
1666 logic_mesh_entities += 1;
1667 }
1668 }
1669 Err(e) => {
1670 warn!("NER extraction failed for frame {frame_id}: {e}");
1671 }
1672 }
1673 }
1674 }
1675 }
1676
1677 if logic_mesh_entities > 0 {
1678 mesh.finalize();
1679 let node_count = mesh.stats().node_count;
1680 mem.set_logic_mesh(mesh);
1682 mem.commit()?;
1683 info!(
1684 "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1685 logic_mesh_entities,
1686 node_count,
1687 filtered_count
1688 );
1689 }
1690
1691 if let Some(pb) = ner_spinner {
1692 pb.finish_with_message(format!(
1693 "Logic-Mesh: {} entities ({} unique)",
1694 logic_mesh_entities,
1695 mem.mesh_node_count()
1696 ));
1697 }
1698 }
1699 Err(e) => {
1700 if let Some(pb) = ner_spinner {
1701 pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1702 }
1703 if args.json {
1704 summary_warnings.push(format!("logic_mesh_failed: {e}"));
1705 }
1706 }
1707 }
1708 }
1709 }
1710
1711 let stats = mem.stats()?;
1712 if args.json {
1713 if capacity_reached {
1714 summary_warnings.push("capacity_limit_reached".to_string());
1715 }
1716 let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1717 let total_duration_ms: Option<u64> = {
1718 let sum: u64 = reports
1719 .iter()
1720 .filter_map(|r| r.extraction.duration_ms)
1721 .sum();
1722 if sum > 0 {
1723 Some(sum)
1724 } else {
1725 None
1726 }
1727 };
1728 let total_pages: Option<u32> = {
1729 let sum: u32 = reports
1730 .iter()
1731 .filter_map(|r| r.extraction.pages_processed)
1732 .sum();
1733 if sum > 0 {
1734 Some(sum)
1735 } else {
1736 None
1737 }
1738 };
1739 let embedding_mode_str = match embedding_mode {
1740 EmbeddingMode::None => "none",
1741 EmbeddingMode::Auto => "auto",
1742 EmbeddingMode::Explicit => "explicit",
1743 };
1744 let summary_json = json!({
1745 "processed": processed,
1746 "ingested": reports.len(),
1747 "skipped": skipped,
1748 "bytes_added": bytes_added,
1749 "bytes_added_human": format_bytes(bytes_added),
1750 "embedded_documents": embedded_docs,
1751 "embedding": {
1752 "enabled": embedding_enabled,
1753 "mode": embedding_mode_str,
1754 "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1755 },
1756 "tables": {
1757 "enabled": args.tables && !args.no_tables,
1758 "extracted": tables_extracted,
1759 "tables": table_summaries,
1760 },
1761 "capacity_reached": capacity_reached,
1762 "warnings": summary_warnings,
1763 "extraction": {
1764 "total_duration_ms": total_duration_ms,
1765 "total_pages_processed": total_pages,
1766 },
1767 "memory": {
1768 "path": args.file.display().to_string(),
1769 "frame_count": stats.frame_count,
1770 "size_bytes": stats.size_bytes,
1771 "tier": format!("{:?}", stats.tier),
1772 },
1773 "reports": reports_json,
1774 });
1775 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1776 } else {
1777 println!(
1778 "Committed {} frame(s); total frames {}",
1779 reports.len(),
1780 stats.frame_count
1781 );
1782 println!(
1783 "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1784 processed,
1785 reports.len(),
1786 skipped,
1787 format_bytes(bytes_added)
1788 );
1789 if tables_extracted > 0 {
1790 println!("Tables: {} extracted from PDF(s)", tables_extracted);
1791 }
1792 }
1793
1794 #[cfg(feature = "replay")]
1796 let _ = mem.save_active_session();
1797
1798 Ok(())
1799}
1800
/// CLI `update` handler: rewrites an existing frame (selected by id or URI)
/// with an optional replacement payload and/or metadata overrides. Fields not
/// explicitly overridden are carried over from the existing frame, and a new
/// frame supersedes the old one.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Locate the target frame; either --frame-id or --uri may be given.
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload from --input. `None` keeps the old payload.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    // UTF-8 view of the new payload, when it is valid text.
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Seed options from the existing frame so unspecified fields carry over.
    // auto_tag/extract_dates only make sense when new content was supplied.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // An explicit --set-uri always wins over the inherited URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // If the frame had no URI and new content arrived, derive one from the
    // input path (or a fresh UUID when there is no path).
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // Simple scalar overrides from the CLI.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels are replaced wholesale when any were provided.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags are replaced wholesale too: both key and value become search tags
    // (the value is skipped when it duplicates the key), and each pair is
    // mirrored into extra_metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for embedding recomputation below; starts as the carried-over
    // search text and may be replaced by freshly extracted text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // New content: re-derive title, URI, metadata and search text.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // No new payload: nothing to auto-tag or extract dates from.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    // Only embed when the memory actually has a vector index.
    options.enable_embedding = stats.has_vec_index;

    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Recompute the embedding only when the memory uses one consistent
        // embedding model; mixed models are refused outright.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                    Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer the (possibly updated) search text; fall back to the raw
        // UTF-8 payload when no search text is available.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        if embedding.is_some() {
            // Record which provider/model/dimension produced the vector.
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // When the embedding was not recomputed but a vector index exists, reuse
    // the frame's previous embedding so it keeps its place in the index.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the frame that now represents this document: by URI when one
    // is set, otherwise the newest timeline entry (falling back to the old id).
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
2003
2004fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2005 if let Some(uri) = provided {
2006 let sanitized = sanitize_uri(uri, true);
2007 return format!("mv2://{}", sanitized);
2008 }
2009
2010 if let Some(path) = source {
2011 let raw = path.to_string_lossy();
2012 let sanitized = sanitize_uri(&raw, false);
2013 return format!("mv2://{}", sanitized);
2014 }
2015
2016 format!("mv2://frames/{}", Uuid::new_v4())
2017}
2018
2019pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2020 let digest = hash(payload).to_hex();
2021 let short = &digest[..32];
2022 let ext_from_path = source
2023 .and_then(|path| path.extension())
2024 .and_then(|ext| ext.to_str())
2025 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2026 .filter(|ext| !ext.is_empty());
2027 let ext = ext_from_path
2028 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2029 .unwrap_or_else(|| "bin".to_string());
2030 format!("mv2://video/{short}.{ext}")
2031}
2032
2033fn derive_title(
2034 provided: Option<String>,
2035 source: Option<&Path>,
2036 payload_utf8: Option<&str>,
2037) -> Option<String> {
2038 if let Some(title) = provided {
2039 let trimmed = title.trim();
2040 if trimmed.is_empty() {
2041 None
2042 } else {
2043 Some(trimmed.to_string())
2044 }
2045 } else {
2046 if let Some(text) = payload_utf8 {
2047 if let Some(markdown_title) = extract_markdown_title(text) {
2048 return Some(markdown_title);
2049 }
2050 }
2051 source
2052 .and_then(|path| path.file_stem())
2053 .and_then(|stem| stem.to_str())
2054 .map(to_title_case)
2055 }
2056}
2057
/// Returns the first Markdown ATX heading in `text` with its `#` markers
/// stripped, or `None` when no heading is present.
///
/// Only lines matching CommonMark's ATX rule are accepted: one to six `#`
/// characters followed by whitespace (or end of line). This fixes the old
/// `starts_with('#')` check, which misread shebang lines (`#!/bin/sh`) and
/// hashtags (`#topic`) as document titles.
fn extract_markdown_title(text: &str) -> Option<String> {
    for line in text.lines() {
        let trimmed = line.trim();
        // Count the leading run of '#' (ASCII, so byte indexing is safe).
        let hashes = trimmed.chars().take_while(|&c| c == '#').count();
        if hashes == 0 || hashes > 6 {
            continue;
        }
        let rest = &trimmed[hashes..];
        // `#word` with no separating space is not an ATX heading.
        if !rest.is_empty() && !rest.starts_with(char::is_whitespace) {
            continue;
        }
        let title = rest.trim();
        if !title.is_empty() {
            return Some(title.to_string());
        }
    }
    None
}
2070
/// Upper-cases the first character of `input`, leaving the rest untouched.
/// (Despite the name, this is not full title case — only the leading
/// character changes.)
///
/// The previous version had a redundant `is_empty` pre-check duplicating
/// what the empty-iterator case already handles; a single `match` covers both.
fn to_title_case(input: &str) -> String {
    let mut chars = input.chars();
    match chars.next() {
        // `to_uppercase` may expand to multiple chars (e.g. 'ß' → "SS"),
        // so chain the iterators rather than pushing a single char.
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}
2082
/// True for files never worth ingesting (currently just macOS Finder
/// metadata, `.DS_Store`).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
2089
/// True for directories whose whole subtree is pruned during a walk:
/// anything hidden (dot-prefixed), e.g. `.git`, `.cache`.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
2096
/// Resolved ingest input: raw bytes arriving on stdin, or a concrete list
/// of filesystem paths produced by `resolve_inputs`.
enum InputSet {
    Stdin,
    Files(Vec<PathBuf>),
}
2101
2102#[cfg(feature = "clip")]
/// Cheap extension-based check for common raster image formats; no content
/// sniffing is done here.
fn is_image_file(path: &std::path::Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map_or(false, |ext| {
            matches!(
                ext.to_ascii_lowercase().as_str(),
                "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
            )
        })
}
2113
2114fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2115 let Some(raw) = input else {
2116 return Ok(InputSet::Stdin);
2117 };
2118
2119 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2120 let mut files = Vec::new();
2121 let mut matched_any = false;
2122 for entry in glob(raw)? {
2123 let path = entry?;
2124 if path.is_file() {
2125 matched_any = true;
2126 if should_skip_input(&path) {
2127 continue;
2128 }
2129 files.push(path);
2130 }
2131 }
2132 files.sort();
2133 if files.is_empty() {
2134 if matched_any {
2135 anyhow::bail!(
2136 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2137 );
2138 }
2139 anyhow::bail!("pattern '{raw}' did not match any files");
2140 }
2141 return Ok(InputSet::Files(files));
2142 }
2143
2144 let path = PathBuf::from(raw);
2145 if path.is_dir() {
2146 let (mut files, skipped_any) = collect_directory_files(&path)?;
2147 files.sort();
2148 if files.is_empty() {
2149 if skipped_any {
2150 anyhow::bail!(
2151 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2152 path.display()
2153 );
2154 }
2155 anyhow::bail!(
2156 "directory '{}' contains no ingestible files",
2157 path.display()
2158 );
2159 }
2160 Ok(InputSet::Files(files))
2161 } else {
2162 Ok(InputSet::Files(vec![path]))
2163 }
2164}
2165
2166fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2167 let mut files = Vec::new();
2168 let mut skipped_any = false;
2169 let mut stack = vec![root.to_path_buf()];
2170 while let Some(dir) = stack.pop() {
2171 for entry in fs::read_dir(&dir)? {
2172 let entry = entry?;
2173 let path = entry.path();
2174 let file_type = entry.file_type()?;
2175 if file_type.is_dir() {
2176 if should_skip_dir(&path) {
2177 skipped_any = true;
2178 continue;
2179 }
2180 stack.push(path);
2181 } else if file_type.is_file() {
2182 if should_skip_input(&path) {
2183 skipped_any = true;
2184 continue;
2185 }
2186 files.push(path);
2187 }
2188 }
2189 }
2190 Ok((files, skipped_any))
2191}
2192
/// Result of ingesting a single input file.
struct IngestOutcome {
    /// Per-file report used for human and JSON output.
    report: PutReport,
    /// Whether an embedding was generated for this document.
    embedded: bool,
    /// Deferred payload for batched parallel-segment commits.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2199
/// Per-file ingest report, rendered either as human-readable text or JSON.
struct PutReport {
    /// Sequence number returned by the put operation.
    seq: u64,
    #[allow(dead_code)] frame_id: u64,
    uri: String,
    title: Option<String>,
    /// Payload size as read from the input.
    original_bytes: usize,
    /// Bytes recorded as stored (see `compressed`).
    stored_bytes: usize,
    compressed: bool,
    /// Originating path; `None` for stdin input.
    source: Option<PathBuf>,
    mime: Option<String>,
    metadata: Option<DocMetadata>,
    extraction: ExtractionSummary,
    /// Retained extraction text so Logic-Mesh NER can run after ingest.
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2217
/// Guard that projects memory-file growth against a fixed byte capacity:
/// refuses writes that would overflow, and prints tiered usage warnings.
struct CapacityGuard {
    #[allow(dead_code)]
    tier: Tier,
    /// Total capacity in bytes; always non-zero (see `from_stats`).
    capacity: u64,
    /// File size as of the last commit; refreshed via `update_after_commit`.
    current_size: u64,
}

impl CapacityGuard {
    /// Flat floor for the per-write overhead estimate (128 KiB).
    const MIN_OVERHEAD: u64 = 128 * 1024;
    /// Relative overhead estimate: 5% of the payload size.
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Builds a guard from current stats; returns `None` when the memory
    /// reports no capacity limit (`capacity_bytes == 0`).
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Errors with a capacity-exceeded put error when writing `additional`
    /// payload bytes plus estimated overhead would push the file past its
    /// capacity. Saturating arithmetic keeps huge inputs from overflowing
    /// the projection.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refreshes the cached size after a commit settles the real file size.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// The configured capacity, for callers that want to surface it.
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    /// Prints a usage warning at the 50% / 75% / 90% thresholds. Each
    /// warning only fires while usage sits inside a one-percent band just
    /// past its threshold, so repeated commits don't re-print the same
    /// message once usage has moved on.
    fn check_and_warn_capacity(&self) {
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️ CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Actions:");
            eprintln!(" 1. Recreate with larger --size");
            eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️ WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️ INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Overhead model: max(5% of the payload, 128 KiB), rounded up.
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2306
/// Result of `analyze_file`: detected MIME type plus whatever metadata and
/// searchable text could be extracted from the payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    pub mime: String,
    pub metadata: Option<DocMetadata>,
    /// Normalized text used for search indexing and embeddings, when any.
    pub search_text: Option<String>,
    /// How the extraction went (reader used, status, warnings, timings).
    pub extraction: ExtractionSummary,
}
2314
/// Tuning knobs for audio analysis during ingest.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    force: bool,
    segment_secs: u32,
    transcribe: bool,
}

impl AudioAnalyzeOptions {
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length clamped to at least one second so downstream
    /// segmentation never sees a zero-length window.
    fn normalised_segment_secs(self) -> u32 {
        if self.segment_secs == 0 {
            1
        } else {
            self.segment_secs
        }
    }
}

impl Default for AudioAnalyzeOptions {
    fn default() -> Self {
        AudioAnalyzeOptions {
            transcribe: false,
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
            force: false,
        }
    }
}
2339
/// Extracted audio information: document metadata plus optional caption,
/// search terms, and transcript text.
struct AudioAnalysis {
    metadata: DocAudioMetadata,
    caption: Option<String>,
    /// Extra terms folded into the search text.
    search_terms: Vec<String>,
    /// Full transcript, when transcription ran and produced text.
    transcript: Option<String>,
}
2346
/// Extracted image information used to enrich document metadata.
struct ImageAnalysis {
    width: u32,
    height: u32,
    /// Dominant color palette extracted from the image.
    palette: Vec<String>,
    caption: Option<String>,
    /// EXIF metadata, when the image carried any.
    exif: Option<DocExifMetadata>,
}
2354
/// Per-document summary of how text extraction went, surfaced in reports
/// and CLI output.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Label of the reader that produced the text (e.g. "bytes").
    reader: Option<String>,
    status: ExtractionStatus,
    /// Human-readable problems encountered during extraction.
    warnings: Vec<String>,
    duration_ms: Option<u64>,
    pages_processed: Option<u32>,
}

impl ExtractionSummary {
    /// Appends a warning, accepting anything convertible to `String`.
    fn record_warning<S: Into<String>>(&mut self, warning: S) {
        self.warnings.push(warning.into());
    }
}
2369
/// Outcome of the text-extraction stage for a single document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    Skipped,
    Ok,
    FallbackUsed,
    Empty,
    Failed,
}

impl ExtractionStatus {
    /// Stable, machine-readable label used in reports and log lines.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Skipped => "skipped",
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
        }
    }
}
2390
2391fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2392 let filename = source_path
2393 .and_then(|path| path.file_name())
2394 .and_then(|name| name.to_str())
2395 .map(|value| value.to_string());
2396 MediaManifest {
2397 kind: "video".to_string(),
2398 mime: mime.to_string(),
2399 bytes,
2400 filename,
2401 duration_ms: None,
2402 width: None,
2403 height: None,
2404 codec: None,
2405 }
2406}
2407
/// Inspects a payload and derives everything needed to index it: the
/// detected MIME type, document metadata (hash, size, image/audio/EXIF
/// details), optional searchable text, and an `ExtractionSummary`
/// describing how text extraction went.
///
/// `payload_utf8` is the payload re-interpreted as UTF-8 when valid;
/// `inferred_title` is only used as a last-resort caption. `force_video`
/// suppresses text handling and records a video media manifest instead.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    // Video handling wins over text handling even when sniffing said text.
    let is_video = force_video || mime.starts_with("video/");
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    // blake3 content hash identifies the payload independent of its path.
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Leading bytes used for magic-number format detection.
    // NOTE(review): `get(..MAGIC_SNIFF_BYTES)` yields `None` for payloads
    // shorter than MAGIC_SNIFF_BYTES, so very small files skip magic
    // sniffing entirely — confirm that is intended.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Normalizes candidate text and, on success, installs it as the search
    // text, backfills a caption if none exists yet, and records which
    // reader produced it. Returns false when normalization left nothing.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                // Make truncation visible to consumers of the search text.
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Textual payloads: the bytes themselves are the searchable content.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf"
        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
        || mime == "application/vnd.ms-excel"
        || mime == "application/msword"
    {
        // Document formats: try the registered readers first, then fall
        // back to raw PDF text extraction if nothing usable was produced.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    // The reader may refine the sniffed MIME type.
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            // NOTE(review): this branch also covers the Office MIME types
            // above, but the message only mentions PDF.
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        if !applied {
            // PDF-specific fallback; Office payloads can also reach it when
            // their reader fails, in which case it records a warning.
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image payloads: dimensions, dominant colours, EXIF details.
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio payloads (or forced audio analysis): codec facts, tags,
    // segment markers, and optionally a Whisper transcript.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
            transcript,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            // Only install audio metadata when none (or only an empty
            // record) is present already.
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }

            if let Some(ref text) = transcript {
                // A transcript replaces any previously extracted text.
                extraction.reader = Some("whisper".to_string());
                extraction.status = ExtractionStatus::Ok;
                search_text = Some(text.clone());
            } else if !search_terms.is_empty() {
                // Without a transcript, append tag-derived terms (title,
                // artist, ...) to whatever search text already exists,
                // skipping terms that are already substrings of it.
                match &mut search_text {
                    Some(existing) => {
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the inferred document title.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2692
2693fn default_reader_registry() -> &'static ReaderRegistry {
2694 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2695 REGISTRY.get_or_init(ReaderRegistry::default)
2696}
2697
2698fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2699 if detect_pdf_magic(magic) {
2700 return Some(DocumentFormat::Pdf);
2701 }
2702
2703 let mime = mime?.trim().to_ascii_lowercase();
2704 match mime.as_str() {
2705 "application/pdf" => Some(DocumentFormat::Pdf),
2706 "text/plain" => Some(DocumentFormat::PlainText),
2707 "text/markdown" => Some(DocumentFormat::Markdown),
2708 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2709 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2710 Some(DocumentFormat::Docx)
2711 }
2712 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2713 Some(DocumentFormat::Pptx)
2714 }
2715 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2716 Some(DocumentFormat::Xlsx)
2717 }
2718 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2719 _ => None,
2720 }
2721}
2722
/// Returns true when the sniffed leading bytes look like a PDF header.
///
/// Tolerates a UTF-8 BOM followed by any run of ASCII whitespace before
/// the `%PDF` marker.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    if bytes.is_empty() {
        return false;
    }
    // Drop a UTF-8 byte-order mark, then any leading whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let start = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[start..].starts_with(b"%PDF")
}
2740
2741fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2744 let mut best_text = String::new();
2745 let mut best_source = "none";
2746
2747 let min_good_chars = (payload.len() / 10000).max(100).min(5000);
2750
2751 match pdf_extract::extract_text_from_mem(payload) {
2753 Ok(text) => {
2754 let trimmed = text.trim();
2755 if trimmed.len() > best_text.len() {
2756 best_text = trimmed.to_string();
2757 best_source = "pdf_extract";
2758 }
2759 if trimmed.len() >= min_good_chars {
2760 info!("pdf_extract extracted {} chars (good)", trimmed.len());
2761 return Ok(best_text);
2762 } else if !trimmed.is_empty() {
2763 warn!("pdf_extract returned only {} chars, trying other extractors", trimmed.len());
2764 }
2765 }
2766 Err(err) => {
2767 warn!("pdf_extract failed: {err}");
2768 }
2769 }
2770
2771 match extract_pdf_with_lopdf(payload) {
2773 Ok(text) => {
2774 let trimmed = text.trim();
2775 if trimmed.len() > best_text.len() {
2776 best_text = trimmed.to_string();
2777 best_source = "lopdf";
2778 }
2779 if trimmed.len() >= min_good_chars {
2780 info!("lopdf extracted {} chars (good)", trimmed.len());
2781 return Ok(best_text);
2782 } else if !trimmed.is_empty() {
2783 warn!("lopdf returned only {} chars, trying pdftotext", trimmed.len());
2784 }
2785 }
2786 Err(err) => {
2787 warn!("lopdf failed: {err}");
2788 }
2789 }
2790
2791 match fallback_pdftotext(payload) {
2793 Ok(text) => {
2794 let trimmed = text.trim();
2795 if trimmed.len() > best_text.len() {
2796 best_text = trimmed.to_string();
2797 best_source = "pdftotext";
2798 }
2799 if !trimmed.is_empty() {
2800 info!("pdftotext extracted {} chars", trimmed.len());
2801 }
2802 }
2803 Err(err) => {
2804 warn!("pdftotext failed: {err}");
2805 }
2806 }
2807
2808 if !best_text.is_empty() {
2810 info!("using {} extraction ({} chars)", best_source, best_text.len());
2811 Ok(best_text)
2812 } else {
2813 warn!("all PDF extraction methods failed, storing without searchable text");
2814 Ok(String::new())
2815 }
2816}
2817
2818fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
2820 use lopdf::Document;
2821
2822 let mut document = Document::load_mem(payload)
2823 .map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
2824
2825 if document.is_encrypted() {
2827 let _ = document.decrypt("");
2828 }
2829
2830 let _ = document.decompress();
2832
2833 let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
2835 if page_numbers.is_empty() {
2836 return Err(anyhow!("PDF has no pages"));
2837 }
2838 page_numbers.sort_unstable();
2839
2840 if page_numbers.len() > 4096 {
2842 page_numbers.truncate(4096);
2843 warn!("PDF has more than 4096 pages, truncating extraction");
2844 }
2845
2846 let text = document
2848 .extract_text(&page_numbers)
2849 .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
2850
2851 Ok(text.trim().to_string())
2852}
2853
2854fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2855 use std::io::Write;
2856 use std::process::{Command, Stdio};
2857
2858 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2859
2860 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2861 temp_pdf
2862 .write_all(payload)
2863 .context("failed to write pdf payload to temp file")?;
2864 temp_pdf.flush().context("failed to flush temp pdf file")?;
2865
2866 let child = Command::new(pdftotext)
2867 .arg("-layout")
2868 .arg(temp_pdf.path())
2869 .arg("-")
2870 .stdout(Stdio::piped())
2871 .spawn()
2872 .context("failed to spawn pdftotext")?;
2873
2874 let output = child
2875 .wait_with_output()
2876 .context("failed to read pdftotext output")?;
2877
2878 if !output.status.success() {
2879 return Err(anyhow!("pdftotext exited with status {}", output.status));
2880 }
2881
2882 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2883 Ok(text)
2884}
2885
2886fn detect_mime(
2887 source_path: Option<&Path>,
2888 payload: &[u8],
2889 payload_utf8: Option<&str>,
2890) -> (String, bool) {
2891 if let Some(kind) = infer::get(payload) {
2892 let mime = kind.mime_type().to_string();
2893 let treat_as_text = mime.starts_with("text/")
2894 || matches!(
2895 mime.as_str(),
2896 "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2897 );
2898 return (mime, treat_as_text);
2899 }
2900
2901 let magic = magic_from_u8(payload);
2902 if !magic.is_empty() && magic != "application/octet-stream" {
2903 let treat_as_text = magic.starts_with("text/")
2904 || matches!(
2905 magic,
2906 "application/json"
2907 | "application/xml"
2908 | "application/javascript"
2909 | "image/svg+xml"
2910 | "text/plain"
2911 );
2912 return (magic.to_string(), treat_as_text);
2913 }
2914
2915 if let Some(path) = source_path {
2916 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2917 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2918 return (mime.to_string(), treat_as_text);
2919 }
2920 }
2921 }
2922
2923 if payload_utf8.is_some() {
2924 return ("text/plain".to_string(), true);
2925 }
2926
2927 ("application/octet-stream".to_string(), false)
2928}
2929
/// Maps a file extension (case-insensitive) to a `(MIME type, is_text)`
/// pair, or `None` when the extension is unknown.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let ext = ext.to_ascii_lowercase();
    let mapping = match ext.as_str() {
        // Markup formats.
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        // Data-interchange formats.
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        // Raster images (binary).
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        // Plain text, config, and source-code extensions.
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => ("text/plain", true),
        _ => return None,
    };
    Some(mapping)
}
2957
2958fn caption_from_text(text: &str) -> Option<String> {
2959 for line in text.lines() {
2960 let trimmed = line.trim();
2961 if !trimmed.is_empty() {
2962 return Some(truncate_to_boundary(trimmed, 240));
2963 }
2964 }
2965 None
2966}
2967
/// Truncates `text` to at most `max_len` bytes, cutting on a UTF-8 char
/// boundary, trimming trailing whitespace, and appending an ellipsis when
/// anything was removed. A `max_len` of 0 yields the empty string.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Walk backwards until the cut point lands on a char boundary.
    let mut cut = max_len;
    while !text.is_char_boundary(cut) {
        cut -= 1;
    }
    if cut == 0 {
        return String::new();
    }
    format!("{}…", text[..cut].trim_end())
}
2983
2984fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2985 let reader = ImageReader::new(Cursor::new(payload))
2986 .with_guessed_format()
2987 .map_err(|err| warn!("failed to guess image format: {err}"))
2988 .ok()?;
2989 let image = reader
2990 .decode()
2991 .map_err(|err| warn!("failed to decode image: {err}"))
2992 .ok()?;
2993 let width = image.width();
2994 let height = image.height();
2995 let rgb = image.to_rgb8();
2996 let palette = match get_palette(
2997 rgb.as_raw(),
2998 ColorFormat::Rgb,
2999 COLOR_PALETTE_SIZE,
3000 COLOR_PALETTE_QUALITY,
3001 ) {
3002 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3003 Err(err) => {
3004 warn!("failed to compute colour palette: {err}");
3005 Vec::new()
3006 }
3007 };
3008
3009 let (exif, exif_caption) = extract_exif_metadata(payload);
3010
3011 Some(ImageAnalysis {
3012 width,
3013 height,
3014 palette,
3015 caption: exif_caption,
3016 exif,
3017 })
3018}
3019
3020fn color_to_hex(color: Color) -> String {
3021 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3022}
3023
/// Probes and fully decodes an audio payload with symphonia to measure its
/// real duration, then combines that with container tags (via lofty),
/// fixed-length timeline segments, and an optional Whisper transcript.
///
/// Returns `None` when the stream cannot be probed, it has no default
/// track, or no decoder exists for its codec; decode errors mid-stream are
/// logged and skipped rather than aborting the analysis.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // symphonia needs an owned, seekable media source, hence the copy.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Give the probe every hint available: file extension and MIME subtype.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode the whole stream, counting frames so the duration comes from
    // actual audio data rather than (often missing/wrong) header fields.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            // End of stream is reported as an I/O error here; stop reading.
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Ignore packets belonging to secondary tracks.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    // Duration from decoded frame count; 0.0 when the rate is unknown.
    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Average bitrate derived from payload size over measured duration.
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    // Tag values double as search terms so untranscribed audio is findable.
    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Cut the stream into fixed-length labelled segments for the timeline.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble the metadata record, leaving unknown fields unset.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    // Optional speech-to-text pass (no-op unless requested).
    let transcript = if options.transcribe {
        transcribe_audio(source_path)
    } else {
        None
    };

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
        transcript,
    })
}
3206
/// Transcribes an audio file with Whisper, returning the recognised text.
///
/// Returns `None` when no source path is available, the transcriber fails
/// to initialise, or transcription fails — failures are logged rather than
/// propagated so ingestion can continue without a transcript.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let path = source_path?;
    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| tracing::warn!(error = %e, "Failed to initialize Whisper transcriber"))
        .ok()?;

    match transcriber.transcribe_file(path) {
        Ok(result) => {
            tracing::info!(
                duration = result.duration_secs,
                text_len = result.text.len(),
                "Audio transcription complete"
            );
            Some(result.text)
        }
        Err(e) => {
            tracing::warn!(error = %e, "Failed to transcribe audio");
            None
        }
    }
}
3240
/// Stub used when the binary was built without the `whisper` feature:
/// logs how to enable transcription and returns no transcript.
#[cfg(not(feature = "whisper"))]
fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
    None
}
3246
3247fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3248 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3249
3250 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3251 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3252 out.entry("title".into())
3253 .or_insert_with(|| value.to_string());
3254 out.entry("tracktitle".into())
3255 .or_insert_with(|| value.to_string());
3256 }
3257 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3258 out.entry("artist".into())
3259 .or_insert_with(|| value.to_string());
3260 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3261 out.entry("artist".into())
3262 .or_insert_with(|| value.to_string());
3263 }
3264 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3265 out.entry("album".into())
3266 .or_insert_with(|| value.to_string());
3267 }
3268 if let Some(value) = tag.get_string(&ItemKey::Genre) {
3269 out.entry("genre".into())
3270 .or_insert_with(|| value.to_string());
3271 }
3272 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3273 out.entry("track_number".into())
3274 .or_insert_with(|| value.to_string());
3275 }
3276 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3277 out.entry("disc_number".into())
3278 .or_insert_with(|| value.to_string());
3279 }
3280 }
3281
3282 let mut tags = HashMap::new();
3283 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3284 Ok(probe) => probe,
3285 Err(err) => {
3286 warn!("failed to guess audio tag file type: {err}");
3287 return tags;
3288 }
3289 };
3290
3291 let tagged_file = match probe.read() {
3292 Ok(file) => file,
3293 Err(err) => {
3294 warn!("failed to read audio tags: {err}");
3295 return tags;
3296 }
3297 };
3298
3299 if let Some(primary) = tagged_file.primary_tag() {
3300 collect_tag(primary, &mut tags);
3301 }
3302 for tag in tagged_file.tags() {
3303 collect_tag(tag, &mut tags);
3304 }
3305
3306 tags
3307}
3308
3309fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3310 let mut cursor = Cursor::new(payload);
3311 let exif = match ExifReader::new().read_from_container(&mut cursor) {
3312 Ok(exif) => exif,
3313 Err(err) => {
3314 warn!("failed to read EXIF metadata: {err}");
3315 return (None, None);
3316 }
3317 };
3318
3319 let mut doc = DocExifMetadata::default();
3320 let mut has_data = false;
3321
3322 if let Some(make) = exif_string(&exif, Tag::Make) {
3323 doc.make = Some(make);
3324 has_data = true;
3325 }
3326 if let Some(model) = exif_string(&exif, Tag::Model) {
3327 doc.model = Some(model);
3328 has_data = true;
3329 }
3330 if let Some(lens) =
3331 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3332 {
3333 doc.lens = Some(lens);
3334 has_data = true;
3335 }
3336 if let Some(dt) =
3337 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3338 {
3339 doc.datetime = Some(dt);
3340 has_data = true;
3341 }
3342
3343 if let Some(gps) = extract_gps_metadata(&exif) {
3344 doc.gps = Some(gps);
3345 has_data = true;
3346 }
3347
3348 let caption = exif_string(&exif, Tag::ImageDescription);
3349
3350 let metadata = if has_data { Some(doc) } else { None };
3351 (metadata, caption)
3352}
3353
3354fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3355 exif.fields()
3356 .find(|field| field.tag == tag)
3357 .and_then(|field| field_to_string(field, exif))
3358}
3359
3360fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3361 let value = field.display_value().with_unit(exif).to_string();
3362 let trimmed = value.trim_matches('\0').trim();
3363 if trimmed.is_empty() {
3364 None
3365 } else {
3366 Some(trimmed.to_string())
3367 }
3368}
3369
3370fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3371 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3372 let latitude_ref_field = exif
3373 .fields()
3374 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3375 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3376 let longitude_ref_field = exif
3377 .fields()
3378 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3379
3380 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3381 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3382
3383 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3384 if reference.eq_ignore_ascii_case("S") {
3385 latitude = -latitude;
3386 }
3387 }
3388 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3389 if reference.eq_ignore_ascii_case("W") {
3390 longitude = -longitude;
3391 }
3392 }
3393
3394 Some(DocGpsMetadata {
3395 latitude,
3396 longitude,
3397 })
3398}
3399
3400fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3401 match value {
3402 ExifValue::Rational(values) if !values.is_empty() => {
3403 let deg = rational_to_f64_u(values.get(0)?)?;
3404 let min = values
3405 .get(1)
3406 .and_then(|v| rational_to_f64_u(v))
3407 .unwrap_or(0.0);
3408 let sec = values
3409 .get(2)
3410 .and_then(|v| rational_to_f64_u(v))
3411 .unwrap_or(0.0);
3412 Some(deg + (min / 60.0) + (sec / 3600.0))
3413 }
3414 ExifValue::SRational(values) if !values.is_empty() => {
3415 let deg = rational_to_f64_i(values.get(0)?)?;
3416 let min = values
3417 .get(1)
3418 .and_then(|v| rational_to_f64_i(v))
3419 .unwrap_or(0.0);
3420 let sec = values
3421 .get(2)
3422 .and_then(|v| rational_to_f64_i(v))
3423 .unwrap_or(0.0);
3424 Some(deg + (min / 60.0) + (sec / 3600.0))
3425 }
3426 _ => None,
3427 }
3428}
3429
3430fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3431 if value.denom == 0 {
3432 None
3433 } else {
3434 Some(value.num as f64 / value.denom as f64)
3435 }
3436}
3437
3438fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3439 if value.denom == 0 {
3440 None
3441 } else {
3442 Some(value.num as f64 / value.denom as f64)
3443 }
3444}
3445
3446fn value_to_ascii(value: &ExifValue) -> Option<String> {
3447 if let ExifValue::Ascii(values) = value {
3448 values
3449 .first()
3450 .and_then(|bytes| std::str::from_utf8(bytes).ok())
3451 .map(|s| s.trim_matches('\0').trim().to_string())
3452 .filter(|s| !s.is_empty())
3453 } else {
3454 None
3455 }
3456}
3457
3458fn ingest_payload(
3459 mem: &mut Memvid,
3460 args: &PutArgs,
3461 config: &CliConfig,
3462 payload: Vec<u8>,
3463 source_path: Option<&Path>,
3464 capacity_guard: Option<&mut CapacityGuard>,
3465 enable_embedding: bool,
3466 runtime: Option<&EmbeddingRuntime>,
3467 parallel_mode: bool,
3468) -> Result<IngestOutcome> {
3469 let original_bytes = payload.len();
3470 let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3471
3472 let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
3475 crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;
3476
3477 let payload_text = std::str::from_utf8(&payload).ok();
3478 let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3479
3480 let audio_opts = AudioAnalyzeOptions {
3481 force: args.audio || args.transcribe,
3482 segment_secs: args
3483 .audio_segment_seconds
3484 .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3485 transcribe: args.transcribe,
3486 };
3487
3488 let mut analysis = analyze_file(
3489 source_path,
3490 &payload,
3491 payload_text,
3492 inferred_title.as_deref(),
3493 audio_opts,
3494 args.video,
3495 );
3496
3497 if args.video && !analysis.mime.starts_with("video/") {
3498 anyhow::bail!(
3499 "--video requires a video input; detected MIME type {}",
3500 analysis.mime
3501 );
3502 }
3503
3504 let mut search_text = analysis.search_text.clone();
3505 if let Some(ref mut text) = search_text {
3506 if text.len() > MAX_SEARCH_TEXT_LEN {
3507 let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3508 *text = truncated;
3509 }
3510 }
3511 analysis.search_text = search_text.clone();
3512
3513 let uri = if let Some(ref explicit) = args.uri {
3514 derive_uri(Some(explicit), None)
3515 } else if args.video {
3516 derive_video_uri(&payload, source_path, &analysis.mime)
3517 } else {
3518 derive_uri(None, source_path)
3519 };
3520
3521 let mut options_builder = PutOptions::builder()
3522 .enable_embedding(enable_embedding)
3523 .auto_tag(!args.no_auto_tag)
3524 .extract_dates(!args.no_extract_dates)
3525 .extract_triplets(!args.no_extract_triplets);
3526 if let Some(ts) = args.timestamp {
3527 options_builder = options_builder.timestamp(ts);
3528 }
3529 if let Some(track) = &args.track {
3530 options_builder = options_builder.track(track.clone());
3531 }
3532 if args.video {
3533 options_builder = options_builder.kind("video");
3534 options_builder = options_builder.tag("kind", "video");
3535 options_builder = options_builder.push_tag("video");
3536 } else if let Some(kind) = &args.kind {
3537 options_builder = options_builder.kind(kind.clone());
3538 }
3539 options_builder = options_builder.uri(uri.clone());
3540 if let Some(ref title) = inferred_title {
3541 options_builder = options_builder.title(title.clone());
3542 }
3543 for (key, value) in &args.tags {
3544 options_builder = options_builder.tag(key.clone(), value.clone());
3545 if !key.is_empty() {
3546 options_builder = options_builder.push_tag(key.clone());
3547 }
3548 if !value.is_empty() && value != key {
3549 options_builder = options_builder.push_tag(value.clone());
3550 }
3551 }
3552 for label in &args.labels {
3553 options_builder = options_builder.label(label.clone());
3554 }
3555 if let Some(metadata) = analysis.metadata.clone() {
3556 if !metadata.is_empty() {
3557 options_builder = options_builder.metadata(metadata);
3558 }
3559 }
3560 for (idx, entry) in args.metadata.iter().enumerate() {
3561 match serde_json::from_str::<DocMetadata>(entry) {
3562 Ok(meta) => {
3563 options_builder = options_builder.metadata(meta);
3564 }
3565 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3566 Ok(value) => {
3567 options_builder =
3568 options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3569 }
3570 Err(err) => {
3571 warn!("failed to parse --metadata JSON: {err}");
3572 }
3573 },
3574 }
3575 }
3576 if let Some(text) = search_text.clone() {
3577 if !text.trim().is_empty() {
3578 options_builder = options_builder.search_text(text);
3579 }
3580 }
3581 let mut options = options_builder.build();
3582
3583 let existing_frame = if args.update_existing || !args.allow_duplicate {
3584 match mem.frame_by_uri(&uri) {
3585 Ok(frame) => Some(frame),
3586 Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3587 Err(err) => return Err(err.into()),
3588 }
3589 } else {
3590 None
3591 };
3592
3593 let frame_id = if let Some(ref existing) = existing_frame {
3596 existing.id
3597 } else {
3598 mem.stats()?.frame_count
3599 };
3600
3601 let mut embedded = false;
3602 let allow_embedding = enable_embedding && !args.video;
3603 if enable_embedding && !allow_embedding && args.video {
3604 warn!("semantic embeddings are not generated for video payloads");
3605 }
3606 let seq = if let Some(existing) = existing_frame {
3607 if args.update_existing {
3608 let payload_for_update = payload.clone();
3609 if allow_embedding {
3610 if let Some(path) = args.embedding_vec.as_deref() {
3611 let embedding = crate::utils::read_embedding(path)?;
3612 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3613 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3614 let expected = choice.dimensions();
3615 if expected != 0 && embedding.len() != expected {
3616 anyhow::bail!(
3617 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3618 embedding.len(),
3619 choice.name(),
3620 expected
3621 );
3622 }
3623 apply_embedding_identity_metadata_from_choice(
3624 &mut options,
3625 choice,
3626 embedding.len(),
3627 Some(model_str),
3628 );
3629 } else {
3630 options.extra_metadata.insert(
3631 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3632 embedding.len().to_string(),
3633 );
3634 }
3635 embedded = true;
3636 mem.update_frame(
3637 existing.id,
3638 Some(payload_for_update),
3639 options.clone(),
3640 Some(embedding),
3641 )
3642 .map_err(|err| {
3643 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3644 })?
3645 } else {
3646 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3647 let embed_text = search_text
3648 .clone()
3649 .or_else(|| payload_text.map(|text| text.to_string()))
3650 .unwrap_or_default();
3651 if embed_text.trim().is_empty() {
3652 warn!("no textual content available; embedding skipped");
3653 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3654 .map_err(|err| {
3655 map_put_error(
3656 err,
3657 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3658 )
3659 })?
3660 } else {
3661 let truncated_text = truncate_for_embedding(&embed_text);
3663 if truncated_text.len() < embed_text.len() {
3664 info!(
3665 "Truncated text from {} to {} chars for embedding",
3666 embed_text.len(),
3667 truncated_text.len()
3668 );
3669 }
3670 let embedding = runtime.embed_passage(&truncated_text)?;
3671 embedded = true;
3672 apply_embedding_identity_metadata(&mut options, runtime);
3673 mem.update_frame(
3674 existing.id,
3675 Some(payload_for_update),
3676 options.clone(),
3677 Some(embedding),
3678 )
3679 .map_err(|err| {
3680 map_put_error(
3681 err,
3682 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3683 )
3684 })?
3685 }
3686 }
3687 } else {
3688 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3689 .map_err(|err| {
3690 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3691 })?
3692 }
3693 } else {
3694 return Err(DuplicateUriError::new(uri.as_str()).into());
3695 }
3696 } else {
3697 if let Some(guard) = capacity_guard.as_ref() {
3698 guard.ensure_capacity(stored_bytes as u64)?;
3699 }
3700 if parallel_mode {
3701 #[cfg(feature = "parallel_segments")]
3702 {
3703 let mut parent_embedding = None;
3704 let mut chunk_embeddings_vec = None;
3705 if allow_embedding {
3706 if let Some(path) = args.embedding_vec.as_deref() {
3707 let embedding = crate::utils::read_embedding(path)?;
3708 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3709 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3710 let expected = choice.dimensions();
3711 if expected != 0 && embedding.len() != expected {
3712 anyhow::bail!(
3713 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3714 embedding.len(),
3715 choice.name(),
3716 expected
3717 );
3718 }
3719 apply_embedding_identity_metadata_from_choice(
3720 &mut options,
3721 choice,
3722 embedding.len(),
3723 Some(model_str),
3724 );
3725 } else {
3726 options.extra_metadata.insert(
3727 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3728 embedding.len().to_string(),
3729 );
3730 }
3731 parent_embedding = Some(embedding);
3732 embedded = true;
3733 } else {
3734 let runtime =
3735 runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3736 let embed_text = search_text
3737 .clone()
3738 .or_else(|| payload_text.map(|text| text.to_string()))
3739 .unwrap_or_default();
3740 if embed_text.trim().is_empty() {
3741 warn!("no textual content available; embedding skipped");
3742 } else {
3743 info!(
3745 "parallel mode: checking for chunks on {} bytes payload",
3746 payload.len()
3747 );
3748 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3749 info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3751
3752 #[cfg(feature = "temporal_enrich")]
3754 let enriched_chunks = if args.temporal_enrich {
3755 info!(
3756 "Temporal enrichment enabled, processing {} chunks",
3757 chunk_texts.len()
3758 );
3759 let today = chrono::Local::now().date_naive();
3761 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3762 let enriched: Vec<String> =
3764 results.iter().map(|(text, _)| text.clone()).collect();
3765 let resolved_count = results
3766 .iter()
3767 .filter(|(_, e)| {
3768 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3769 })
3770 .count();
3771 info!(
3772 "Temporal enrichment: resolved {} chunks with temporal context",
3773 resolved_count
3774 );
3775 enriched
3776 } else {
3777 chunk_texts.clone()
3778 };
3779 #[cfg(not(feature = "temporal_enrich"))]
3780 let enriched_chunks = {
3781 if args.temporal_enrich {
3782 warn!(
3783 "Temporal enrichment requested but feature not compiled in"
3784 );
3785 }
3786 chunk_texts.clone()
3787 };
3788
3789 let embed_chunks = if args.contextual {
3791 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3792 let engine = if args.contextual_model.as_deref() == Some("local") {
3793 #[cfg(feature = "llama-cpp")]
3794 {
3795 let model_path = get_local_contextual_model_path()?;
3796 ContextualEngine::local(model_path)
3797 }
3798 #[cfg(not(feature = "llama-cpp"))]
3799 {
3800 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3801 }
3802 } else if let Some(model) = &args.contextual_model {
3803 ContextualEngine::openai_with_model(model)?
3804 } else {
3805 ContextualEngine::openai()?
3806 };
3807
3808 match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3809 {
3810 Ok(contexts) => {
3811 info!("Generated {} contextual prefixes", contexts.len());
3812 apply_contextual_prefixes(
3813 &embed_text,
3814 &enriched_chunks,
3815 &contexts,
3816 )
3817 }
3818 Err(e) => {
3819 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3820 enriched_chunks.clone()
3821 }
3822 }
3823 } else {
3824 enriched_chunks
3825 };
3826
3827 let truncated_chunks: Vec<String> = embed_chunks
3828 .iter()
3829 .map(|chunk_text| {
3830 let truncated_chunk = truncate_for_embedding(chunk_text);
3832 if truncated_chunk.len() < chunk_text.len() {
3833 info!(
3834 "parallel mode: Truncated chunk from {} to {} chars for embedding",
3835 chunk_text.len(),
3836 truncated_chunk.len()
3837 );
3838 }
3839 truncated_chunk
3840 })
3841 .collect();
3842 let truncated_refs: Vec<&str> =
3843 truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3844 let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3845 let num_chunks = chunk_embeddings.len();
3846 chunk_embeddings_vec = Some(chunk_embeddings);
3847 let parent_text = truncate_for_embedding(&embed_text);
3849 if parent_text.len() < embed_text.len() {
3850 info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3851 }
3852 parent_embedding = Some(runtime.embed_passage(&parent_text)?);
3853 embedded = true;
3854 info!(
3855 "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3856 num_chunks
3857 );
3858 } else {
3859 info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3861 let truncated_text = truncate_for_embedding(&embed_text);
3862 if truncated_text.len() < embed_text.len() {
3863 info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3864 }
3865 parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
3866 embedded = true;
3867 }
3868 }
3869 }
3870 }
3871 if embedded {
3872 if let Some(runtime) = runtime {
3873 apply_embedding_identity_metadata(&mut options, runtime);
3874 }
3875 }
3876 let payload_variant = if let Some(path) = source_path {
3877 ParallelPayload::Path(path.to_path_buf())
3878 } else {
3879 ParallelPayload::Bytes(payload)
3880 };
3881 let input = ParallelInput {
3882 payload: payload_variant,
3883 options: options.clone(),
3884 embedding: parent_embedding,
3885 chunk_embeddings: chunk_embeddings_vec,
3886 };
3887 return Ok(IngestOutcome {
3888 report: PutReport {
3889 seq: 0,
3890 frame_id: 0, uri,
3892 title: inferred_title,
3893 original_bytes,
3894 stored_bytes,
3895 compressed,
3896 source: source_path.map(Path::to_path_buf),
3897 mime: Some(analysis.mime),
3898 metadata: analysis.metadata,
3899 extraction: analysis.extraction,
3900 #[cfg(feature = "logic_mesh")]
3901 search_text: search_text.clone(),
3902 },
3903 embedded,
3904 parallel_input: Some(input),
3905 });
3906 }
3907 #[cfg(not(feature = "parallel_segments"))]
3908 {
3909 mem.put_bytes_with_options(&payload, options)
3910 .map_err(|err| {
3911 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3912 })?
3913 }
3914 } else if allow_embedding {
3915 if let Some(path) = args.embedding_vec.as_deref() {
3916 let embedding = crate::utils::read_embedding(path)?;
3917 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3918 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3919 let expected = choice.dimensions();
3920 if expected != 0 && embedding.len() != expected {
3921 anyhow::bail!(
3922 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3923 embedding.len(),
3924 choice.name(),
3925 expected
3926 );
3927 }
3928 apply_embedding_identity_metadata_from_choice(
3929 &mut options,
3930 choice,
3931 embedding.len(),
3932 Some(model_str),
3933 );
3934 } else {
3935 options.extra_metadata.insert(
3936 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3937 embedding.len().to_string(),
3938 );
3939 }
3940 embedded = true;
3941 mem.put_with_embedding_and_options(&payload, embedding, options)
3942 .map_err(|err| {
3943 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3944 })?
3945 } else {
3946 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3947 let embed_text = search_text
3948 .clone()
3949 .or_else(|| payload_text.map(|text| text.to_string()))
3950 .unwrap_or_default();
3951 if embed_text.trim().is_empty() {
3952 warn!("no textual content available; embedding skipped");
3953 mem.put_bytes_with_options(&payload, options)
3954 .map_err(|err| {
3955 map_put_error(
3956 err,
3957 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3958 )
3959 })?
3960 } else {
3961 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3963 info!(
3965 "Document will be chunked into {} chunks, generating embeddings for each",
3966 chunk_texts.len()
3967 );
3968
3969 #[cfg(feature = "temporal_enrich")]
3971 let enriched_chunks = if args.temporal_enrich {
3972 info!(
3973 "Temporal enrichment enabled, processing {} chunks",
3974 chunk_texts.len()
3975 );
3976 let today = chrono::Local::now().date_naive();
3978 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3979 let enriched: Vec<String> =
3981 results.iter().map(|(text, _)| text.clone()).collect();
3982 let resolved_count = results
3983 .iter()
3984 .filter(|(_, e)| {
3985 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3986 })
3987 .count();
3988 info!(
3989 "Temporal enrichment: resolved {} chunks with temporal context",
3990 resolved_count
3991 );
3992 enriched
3993 } else {
3994 chunk_texts.clone()
3995 };
3996 #[cfg(not(feature = "temporal_enrich"))]
3997 let enriched_chunks = {
3998 if args.temporal_enrich {
3999 warn!("Temporal enrichment requested but feature not compiled in");
4000 }
4001 chunk_texts.clone()
4002 };
4003
4004 let embed_chunks = if args.contextual {
4006 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4007 let engine = if args.contextual_model.as_deref() == Some("local") {
4008 #[cfg(feature = "llama-cpp")]
4009 {
4010 let model_path = get_local_contextual_model_path()?;
4011 ContextualEngine::local(model_path)
4012 }
4013 #[cfg(not(feature = "llama-cpp"))]
4014 {
4015 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4016 }
4017 } else if let Some(model) = &args.contextual_model {
4018 ContextualEngine::openai_with_model(model)?
4019 } else {
4020 ContextualEngine::openai()?
4021 };
4022
4023 match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
4024 Ok(contexts) => {
4025 info!("Generated {} contextual prefixes", contexts.len());
4026 apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
4027 }
4028 Err(e) => {
4029 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4030 enriched_chunks.clone()
4031 }
4032 }
4033 } else {
4034 enriched_chunks
4035 };
4036
4037 let truncated_chunks: Vec<String> = embed_chunks
4038 .iter()
4039 .map(|chunk_text| {
4040 let truncated_chunk = truncate_for_embedding(chunk_text);
4042 if truncated_chunk.len() < chunk_text.len() {
4043 info!(
4044 "Truncated chunk from {} to {} chars for embedding",
4045 chunk_text.len(),
4046 truncated_chunk.len()
4047 );
4048 }
4049 truncated_chunk
4050 })
4051 .collect();
4052 let truncated_refs: Vec<&str> =
4053 truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
4054 let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4055 let parent_text = truncate_for_embedding(&embed_text);
4057 if parent_text.len() < embed_text.len() {
4058 info!(
4059 "Truncated parent text from {} to {} chars for embedding",
4060 embed_text.len(),
4061 parent_text.len()
4062 );
4063 }
4064 let parent_embedding = runtime.embed_passage(&parent_text)?;
4065 embedded = true;
4066 apply_embedding_identity_metadata(&mut options, runtime);
4067 info!(
4068 "Calling put_with_chunk_embeddings with {} chunk embeddings",
4069 chunk_embeddings.len()
4070 );
4071 mem.put_with_chunk_embeddings(
4072 &payload,
4073 Some(parent_embedding),
4074 chunk_embeddings,
4075 options,
4076 )
4077 .map_err(|err| {
4078 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4079 })?
4080 } else {
4081 info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
4083 let truncated_text = truncate_for_embedding(&embed_text);
4084 if truncated_text.len() < embed_text.len() {
4085 info!(
4086 "Truncated text from {} to {} chars for embedding",
4087 embed_text.len(),
4088 truncated_text.len()
4089 );
4090 }
4091 let embedding = runtime.embed_passage(&truncated_text)?;
4092 embedded = true;
4093 apply_embedding_identity_metadata(&mut options, runtime);
4094 mem.put_with_embedding_and_options(&payload, embedding, options)
4095 .map_err(|err| {
4096 map_put_error(
4097 err,
4098 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4099 )
4100 })?
4101 }
4102 }
4103 }
4104 } else {
4105 mem.put_bytes_with_options(&payload, options)
4106 .map_err(|err| {
4107 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4108 })?
4109 }
4110 };
4111
4112 Ok(IngestOutcome {
4113 report: PutReport {
4114 seq,
4115 frame_id,
4116 uri,
4117 title: inferred_title,
4118 original_bytes,
4119 stored_bytes,
4120 compressed,
4121 source: source_path.map(Path::to_path_buf),
4122 mime: Some(analysis.mime),
4123 metadata: analysis.metadata,
4124 extraction: analysis.extraction,
4125 #[cfg(feature = "logic_mesh")]
4126 search_text,
4127 },
4128 embedded,
4129 #[cfg(feature = "parallel_segments")]
4130 parallel_input: None,
4131 })
4132}
4133
4134fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4135 if std::str::from_utf8(payload).is_ok() {
4136 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4137 Ok((compressed.len(), true))
4138 } else {
4139 Ok((payload.len(), false))
4140 }
4141}
4142
4143fn print_report(report: &PutReport) {
4144 let name = report
4145 .source
4146 .as_ref()
4147 .map(|path| {
4148 let pretty = env::current_dir()
4149 .ok()
4150 .as_ref()
4151 .and_then(|cwd| diff_paths(path, cwd))
4152 .unwrap_or_else(|| path.to_path_buf());
4153 pretty.to_string_lossy().into_owned()
4154 })
4155 .unwrap_or_else(|| "stdin".to_string());
4156
4157 let ratio = if report.original_bytes > 0 {
4158 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4159 } else {
4160 100.0
4161 };
4162
4163 println!("• {name} → {}", report.uri);
4164 println!(" seq: {}", report.seq);
4165 if report.compressed {
4166 println!(
4167 " size: {} B → {} B ({:.1}% of original, compressed)",
4168 report.original_bytes, report.stored_bytes, ratio
4169 );
4170 } else {
4171 println!(" size: {} B (stored as-is)", report.original_bytes);
4172 }
4173 if let Some(mime) = &report.mime {
4174 println!(" mime: {mime}");
4175 }
4176 if let Some(title) = &report.title {
4177 println!(" title: {title}");
4178 }
4179 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4180 println!(
4181 " extraction: status={} reader={}",
4182 report.extraction.status.label(),
4183 extraction_reader
4184 );
4185 if let Some(ms) = report.extraction.duration_ms {
4186 println!(" extraction-duration: {} ms", ms);
4187 }
4188 if let Some(pages) = report.extraction.pages_processed {
4189 println!(" extraction-pages: {}", pages);
4190 }
4191 for warning in &report.extraction.warnings {
4192 println!(" warning: {warning}");
4193 }
4194 if let Some(metadata) = &report.metadata {
4195 if let Some(caption) = &metadata.caption {
4196 println!(" caption: {caption}");
4197 }
4198 if let Some(audio) = metadata.audio.as_ref() {
4199 if audio.duration_secs.is_some()
4200 || audio.sample_rate_hz.is_some()
4201 || audio.channels.is_some()
4202 {
4203 let duration = audio
4204 .duration_secs
4205 .map(|secs| format!("{secs:.1}s"))
4206 .unwrap_or_else(|| "unknown".into());
4207 let rate = audio
4208 .sample_rate_hz
4209 .map(|hz| format!("{} Hz", hz))
4210 .unwrap_or_else(|| "? Hz".into());
4211 let channels = audio
4212 .channels
4213 .map(|ch| ch.to_string())
4214 .unwrap_or_else(|| "?".into());
4215 println!(" audio: duration {duration}, {channels} ch, {rate}");
4216 }
4217 }
4218 }
4219
4220 println!();
4221}
4222
4223fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4224 let source = report
4225 .source
4226 .as_ref()
4227 .map(|path| path.to_string_lossy().into_owned());
4228 let extraction = json!({
4229 "status": report.extraction.status.label(),
4230 "reader": report.extraction.reader,
4231 "duration_ms": report.extraction.duration_ms,
4232 "pages_processed": report.extraction.pages_processed,
4233 "warnings": report.extraction.warnings,
4234 });
4235 json!({
4236 "seq": report.seq,
4237 "uri": report.uri,
4238 "title": report.title,
4239 "original_bytes": report.original_bytes,
4240 "original_bytes_human": format_bytes(report.original_bytes as u64),
4241 "stored_bytes": report.stored_bytes,
4242 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4243 "compressed": report.compressed,
4244 "mime": report.mime,
4245 "source": source,
4246 "metadata": report.metadata,
4247 "extraction": extraction,
4248 })
4249}
4250
4251fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4252 match err {
4253 MemvidError::CapacityExceeded {
4254 current,
4255 limit,
4256 required,
4257 } => anyhow!(CapacityExceededMessage {
4258 current,
4259 limit,
4260 required,
4261 }),
4262 other => other.into(),
4263 }
4264}
4265
4266fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4267 for (idx, entry) in entries.iter().enumerate() {
4268 match serde_json::from_str::<DocMetadata>(entry) {
4269 Ok(meta) => {
4270 options.metadata = Some(meta);
4271 }
4272 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4273 Ok(value) => {
4274 options
4275 .extra_metadata
4276 .insert(format!("custom_metadata_{idx}"), value.to_string());
4277 }
4278 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4279 },
4280 }
4281 }
4282}
4283
4284fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4285 let stats = mem.stats()?;
4286 let message = match stats.tier {
4287 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4288 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4289 };
4290 Ok(message)
4291}
4292
/// Normalize a user-supplied URI into a safe relative identifier.
///
/// Strips any leading `mv2://` scheme, converts backslashes to forward
/// slashes, and resolves `.` / `..` segments (`..` can never climb above the
/// root). With `keep_path` the whole sanitized path is returned; otherwise
/// only the final segment. Input that reduces to nothing yields `"document"`.
///
/// Fix: the previous fallback for an all-filtered input returned the last raw
/// segment of the normalized string — which at that point could only be `.`
/// or `..` — so `sanitize_uri("..", _)` leaked the very traversal token the
/// function exists to remove. It now falls back to `"document"`.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            // Drop empty segments (doubled or trailing slashes) and `.`.
            "" | "." => {}
            // `..` climbs one level; popping an empty stack is a no-op, so it
            // can never escape above the root.
            ".." => {
                segments.pop();
            }
            other => segments.push(other.to_string()),
        }
    }

    // Everything was separators or dot segments. Do NOT fall back to the raw
    // input here: any surviving raw segment could only be `.` or `..`.
    if segments.is_empty() {
        return "document".to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
4326
4327fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4328 if quiet {
4329 let pb = ProgressBar::hidden();
4330 pb.set_message(message.to_string());
4331 return pb;
4332 }
4333
4334 match total {
4335 Some(len) => {
4336 let pb = ProgressBar::new(len);
4337 pb.set_draw_target(ProgressDrawTarget::stderr());
4338 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4339 .unwrap_or_else(|_| ProgressStyle::default_bar());
4340 pb.set_style(style);
4341 pb.set_message(message.to_string());
4342 pb
4343 }
4344 None => {
4345 let pb = ProgressBar::new_spinner();
4346 pb.set_draw_target(ProgressDrawTarget::stderr());
4347 pb.set_message(message.to_string());
4348 pb.enable_steady_tick(Duration::from_millis(120));
4349 pb
4350 }
4351 }
4352}
4353
4354fn create_spinner(message: &str) -> ProgressBar {
4355 let pb = ProgressBar::new_spinner();
4356 pb.set_draw_target(ProgressDrawTarget::stderr());
4357 let style = ProgressStyle::with_template("{spinner} {msg}")
4358 .unwrap_or_else(|_| ProgressStyle::default_spinner())
4359 .tick_strings(&["-", "\\", "|", "/"]);
4360 pb.set_style(style);
4361 pb.set_message(message.to_string());
4362 pb.enable_steady_tick(Duration::from_millis(120));
4363 pb
4364}
4365
#[cfg(feature = "parallel_segments")]
// CLI arguments for the `put-many` command: bulk-ingest a JSON batch of
// documents into one archive via the parallel segment writer.
// NOTE: plain `//` comments are used on purpose — `///` doc comments on
// clap-derive fields would become `--help` text and change CLI output.
#[derive(Args)]
pub struct PutManyArgs {
    // Target .mv2 archive to ingest into.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON output instead of the human-readable summary.
    #[arg(long)]
    pub json: bool,
    // Read the request batch from this file; when absent, stdin is read.
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    // zstd compression level for the build (clamped to 1..=9 at use).
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared locking flags, flattened into this command.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4390
#[cfg(feature = "parallel_segments")]
/// One document in a `put-many` batch, deserialized from the input JSON.
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required).
    pub title: String,
    /// Free-form label; empty string when omitted.
    /// NOTE(review): not consumed by `handle_put_many` in this file — confirm
    /// whether another consumer reads it.
    #[serde(default)]
    pub label: String,
    /// Document body; stored as the frame payload.
    pub text: String,
    /// Explicit URI; when absent a `mv2://batch/<index>` URI is generated.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tag values copied into the put options.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels copied into the put options.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Arbitrary metadata; tried as structured `DocMetadata` first, otherwise
    /// stored as a raw JSON string under `memvid.metadata.json`.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Extra string key/value metadata attached verbatim.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Optional pre-computed parent embedding vector.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Optional pre-computed per-chunk embedding vectors.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
4420
#[cfg(feature = "parallel_segments")]
/// A whole `put-many` batch. `#[serde(transparent)]` means the input document
/// is a bare JSON array of requests rather than an object with a field.
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    pub requests: Vec<PutManyRequest>,
}
4428
#[cfg(feature = "parallel_segments")]
/// Handle the `put-many` command: bulk-ingest a JSON batch of documents into
/// an archive in a single parallel build pass.
///
/// Reads a `PutManyBatch` from `--input` (or stdin), turns each request into
/// a `ParallelInput` (payload bytes + put options + optional pre-computed
/// embeddings), and writes them all via `put_parallel_inputs`. Output is
/// JSON when `--json` is set, otherwise a short human-readable summary.
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    // Gate the command on an active plan before touching the archive.
    crate::utils::require_active_plan(config, "put-many")?;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Make sure both the lexical and vector indexes exist before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Capacity pre-check against the current archive size (0 extra bytes:
    // the real growth is validated during the parallel build itself).
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Batch source: explicit file via --input, otherwise stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    // An empty batch is a successful no-op, reported in the requested format.
    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        // Progress note goes to stderr so stdout stays machine-parseable.
        eprintln!("Processing {} documents...", count);
    }

    // Convert each request into a ParallelInput for the batch writer.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Fall back to a positional batch URI when none was supplied.
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;
            options.extra_metadata = req.extra_metadata;
            if !req.metadata.is_null() {
                // Structured metadata wins; anything else is preserved as a
                // raw JSON string without clobbering a caller-provided entry.
                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
                    Ok(meta) => options.metadata = Some(meta),
                    Err(_) => {
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| req.metadata.to_string());
                    }
                }
            }

            // Record the embedding dimensionality (from the parent embedding,
            // or the first chunk embedding) unless the caller set it already.
            if let Some(embedding) = req
                .embedding
                .as_ref()
                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
            {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| embedding.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // NOTE(review): the level is clamped to 1..=9 here while zstd itself
    // accepts higher levels — confirm 9 is the intended cap.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Print every id for small batches; just the bounds for large ones.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!(" frame_id: {}", fid);
            }
        } else {
            println!(" first: {}", frame_ids.first().unwrap_or(&0));
            println!(" last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}