1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32 DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33 ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34 MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
/// Upper bound (bytes) on text indexed for lexical search.
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Upper bound (bytes) on text sent to the embedding model.
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
/// Number of dominant colors extracted from images via color-thief.
const COLOR_PALETTE_SIZE: u8 = 5;
/// color-thief quality parameter (higher = faster, less accurate).
const COLOR_PALETTE_QUALITY: u8 = 8;
/// Bytes sniffed from a payload for MIME-magic detection.
const MAGIC_SNIFF_BYTES: usize = 16;
#[cfg(feature = "clip")]
/// Maximum number of PDF pages rendered for CLIP embedding.
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
/// Target edge length (pixels) for CLIP page renders.
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncates `text` to at most `MAX_EMBEDDING_TEXT_LEN` bytes without
/// splitting a UTF-8 character.
///
/// Bug fix: the previous implementation sliced
/// `&text[..MAX_EMBEDDING_TEXT_LEN]` *before* adjusting for character
/// boundaries, which panics whenever that byte offset lands inside a
/// multi-byte character. We now move the cut point back to the nearest
/// valid boundary first, then slice.
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Walk back to a char boundary; at most 3 steps for valid UTF-8.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
78
79fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
80 options.extra_metadata.insert(
81 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
82 runtime.provider_kind().to_string(),
83 );
84 options.extra_metadata.insert(
85 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
86 runtime.provider_model_id(),
87 );
88 options.extra_metadata.insert(
89 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
90 runtime.dimension().to_string(),
91 );
92}
93
94fn apply_embedding_identity_metadata_from_choice(
95 options: &mut PutOptions,
96 model: EmbeddingModelChoice,
97 dimension: usize,
98 model_id_override: Option<&str>,
99) {
100 let provider = match model {
101 EmbeddingModelChoice::OpenAILarge
102 | EmbeddingModelChoice::OpenAISmall
103 | EmbeddingModelChoice::OpenAIAda => "openai",
104 EmbeddingModelChoice::Nvidia => "nvidia",
105 _ => "fastembed",
106 };
107
108 let model_id = match model {
109 EmbeddingModelChoice::Nvidia => model_id_override
110 .map(str::trim)
111 .filter(|value| !value.is_empty())
112 .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
113 .filter(|value| !value.is_empty())
114 .map(|value| {
115 if value.contains('/') {
116 value.to_string()
117 } else {
118 format!("nvidia/{value}")
119 }
120 })
121 .unwrap_or_else(|| model.canonical_model_id().to_string()),
122 _ => model.canonical_model_id().to_string(),
123 };
124
125 options.extra_metadata.insert(
126 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
127 provider.to_string(),
128 );
129 options.extra_metadata.insert(
130 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
131 model_id,
132 );
133 options.extra_metadata.insert(
134 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
135 dimension.to_string(),
136 );
137}
138
#[cfg(feature = "llama-cpp")]
/// Resolves the expected on-disk path of the local Phi-3.5-mini GGUF model.
///
/// The base directory comes from `MEMVID_MODELS_DIR` (with leading `~/`
/// expanded via `HOME`) or defaults to `$HOME/.memvid/models`. Errors when
/// `HOME` is unset (default case) or the model file is missing.
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            // Expand `~/...` against HOME; keep the literal path if HOME is unset.
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(custom),
        },
        Err(_) => {
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
175
176use pathdiff::diff_paths;
177
178use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
179use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
180use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
181use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
182use crate::error::{CapacityExceededMessage, DuplicateUriError};
183use crate::utils::{
184 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
185 select_frame,
186};
187#[cfg(feature = "temporal_enrich")]
188use memvid_core::enrich_chunks as temporal_enrich_chunks;
189
/// Parses a human-friendly boolean switch value.
///
/// Accepts (case-insensitively, ignoring surrounding whitespace)
/// `1/true/yes/on` as `Some(true)` and `0/false/no/off` as `Some(false)`;
/// anything else is `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if ["1", "true", "yes", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "false", "no", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
198
#[cfg(feature = "parallel_segments")]
/// Reads `MEMVID_PARALLEL_SEGMENTS` as a boolean override for parallel
/// segment ingestion; `None` when unset or unparseable.
fn parallel_env_override() -> Option<bool> {
    let raw = std::env::var("MEMVID_PARALLEL_SEGMENTS").ok()?;
    parse_bool_flag(&raw)
}
206
#[cfg(feature = "clip")]
/// Checks whether both halves (vision + text) of the configured CLIP model
/// exist on disk.
///
/// Model directory resolution: `MEMVID_MODELS_DIR`, else `$HOME/.memvid/models`,
/// else the relative `.memvid/models`. Model name defaults to `mobileclip-s2`
/// unless `MEMVID_CLIP_MODEL` is set.
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    // Both ONNX files must be present for CLIP search to work.
    ["vision", "text"]
        .iter()
        .all(|part| models_dir.join(format!("{model_name}_{part}.onnx")).exists())
}
226
#[cfg(feature = "logic_mesh")]
/// Minimum model confidence for an extracted NER entity to be kept.
const NER_MIN_CONFIDENCE: f32 = 0.50;

#[cfg(feature = "logic_mesh")]
/// Minimum entity length in bytes; filters single-character noise.
const NER_MIN_ENTITY_LEN: usize = 2;

#[cfg(feature = "logic_mesh")]
/// Maximum byte gap between two same-type entities for them to be merged
/// by `merge_adjacent_entities`.
const MAX_MERGE_GAP: usize = 3;
240
#[cfg(feature = "logic_mesh")]
/// Merges adjacent NER entities of the same type separated by at most
/// `MAX_MERGE_GAP` bytes of "joiner" text (spaces, tabs, `&`, `-`, `'`, `.`),
/// e.g. "Johnson" + "& Johnson" -> "Johnson & Johnson".
///
/// Merged entities take the exact source span as their text, extend
/// `byte_end`, and average the two confidences.
///
/// Bug fix: NER byte offsets are not guaranteed to land on UTF-8 character
/// boundaries, and the old bounds checks (`<= len`) did not protect the
/// direct slices from panicking on a mid-character offset. All slicing of
/// `original_text` now goes through `str::get`, which yields `None` instead
/// of panicking, in which case we conservatively skip the merge (for the gap
/// probe) or fall back to plain concatenation (for the merged text).
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Process in document order so "adjacent" is well-defined.
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                // `get` returns None for out-of-range or non-boundary
                // offsets — treated as "not mergeable" rather than a panic.
                match original_text.get(last.byte_end..entity.byte_start) {
                    Some(gap_text) if !gap_text.contains('\n') && !gap_text.contains('\r') => {
                        gap_text
                            .chars()
                            .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.'))
                    }
                    _ => false,
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Prefer the exact source text spanning both entities; fall
                // back to concatenation when the span cannot be sliced.
                let merged_text = original_text
                    .get(last.byte_start..entity.byte_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                // Average the confidences of the merged pair.
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
312
#[cfg(feature = "logic_mesh")]
/// Heuristic filter for NER output: rejects low-confidence, fragmentary, or
/// obviously non-entity spans before they enter the logic mesh.
///
/// Rejection rules (in order): below `NER_MIN_CONFIDENCE`; shorter than
/// `NER_MIN_ENTITY_LEN` bytes after trimming; pure whitespace/punctuation;
/// spans containing line breaks; WordPiece `##` fragments; trailing periods;
/// two-byte spans outside a small acronym allowlist; common false-positive
/// words; multi-word spans led by an unknown short all-caps token; and
/// lowercase-initial spans.
///
/// Fix: removed the `trimmed.len() == 1` check, which was dead code —
/// `NER_MIN_ENTITY_LEN` is 2, so one-byte spans are already rejected by the
/// minimum-length rule above it.
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    // Byte length, not char count — also rejects the empty string.
    if trimmed.len() < NER_MIN_ENTITY_LEN {
        return false;
    }

    // Pure whitespace/punctuation is tokenizer noise.
    if trimmed.chars().all(|c| c.is_whitespace() || c.is_ascii_punctuation()) {
        return false;
    }

    // Spans crossing a line break are almost always extraction artifacts.
    if trimmed.contains('\n') || trimmed.contains('\r') {
        return false;
    }

    // WordPiece sub-token fragments leak "##" markers.
    if trimmed.starts_with("##") || trimmed.ends_with("##") {
        return false;
    }

    // A trailing period suggests the span swallowed sentence punctuation.
    if trimmed.ends_with('.') {
        return false;
    }

    let lower = trimmed.to_lowercase();

    // Two-byte spans: only a small allowlist of well-known acronyms passes.
    if trimmed.len() == 2 {
        return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it");
    }

    // Common false positives the model keeps emitting.
    if matches!(lower.as_str(), "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc") {
        return false;
    }

    // Multi-word spans led by a short all-caps token are usually glued
    // fragments, unless the prefix is a known country/org acronym.
    let words: Vec<&str> = trimmed.split_whitespace().collect();
    if words.len() >= 2 {
        let first_word = words[0];
        if first_word.len() <= 2
            && first_word.chars().all(|c| c.is_uppercase())
            && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    // Proper entities should be capitalized; lowercase starts are usually
    // common nouns.
    if trimmed.chars().next().map_or(false, |c| c.is_lowercase()) {
        return false;
    }

    true
}
394
395pub fn parse_key_val(s: &str) -> Result<(String, String)> {
397 let (key, value) = s
398 .split_once('=')
399 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
400 Ok((key.to_string(), value.to_string()))
401}
402
// Lock-handling flags flattened into every mutating subcommand.
// (Regular `//` comments on purpose: `///` would change clap's help output.)
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // Milliseconds to wait for the memory file's write lock (default 250).
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // NOTE(review): presumably proceeds despite a held lock — confirm
    // against apply_lock_cli's semantics.
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
413
// CLI arguments for `memvid put`: ingest one or more payloads into a .mv2
// memory. (Regular `//` comments on purpose: `///` would change clap's
// generated help text.)
#[derive(Args)]
pub struct PutArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human-readable reports.
    #[arg(long)]
    pub json: bool,
    // Input file, directory, or glob; stdin when omitted.
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    // Explicit URI for the stored frame (single-input only).
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Explicit title for the stored frame (single-input only).
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Override the frame timestamp (unit not shown here — confirm in ingest).
    #[arg(long)]
    pub timestamp: Option<i64>,
    // Timeline track to file the frame under.
    #[arg(long)]
    pub track: Option<String>,
    // Document kind override; conflicts with --video unless it equals "video".
    #[arg(long)]
    pub kind: Option<String>,
    // Treat the input as video; requires --input <file>, not stdin.
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    // Print a transcript notice after ingesting.
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable KEY=VALUE tags attached to the frame.
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    // Repeatable free-form labels.
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable JSON metadata payloads.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Disable automatic tagging during ingest.
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    // Disable date extraction during ingest.
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    // Disable triplet extraction during ingest.
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,
    // Treat the input as audio.
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    // Transcribe audio input.
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    // Audio segmentation length in seconds.
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Compute vector embeddings via the configured runtime; mutually
    // exclusive with --no-embedding and with --embedding-vec.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    // Lexical-only ingest (no embeddings).
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // Pre-computed embedding vector (JSON file); requires exactly one input.
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // Model identity to record for the pre-computed vector.
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    // Enable product-quantized (Pq96) vector compression (~16x smaller).
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // Prepend LLM-generated contextual prefixes to chunks.
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    // Model used for contextual prefixes.
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // Force table extraction on.
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    // Force table extraction off.
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // Force CLIP visual embeddings on (otherwise auto-detected from the
    // installed model and input types).
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    // Disable CLIP auto-detection.
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    // Update the existing frame when the URI already exists.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    // Allow storing a second frame with an existing URI.
    #[arg(long)]
    pub allow_duplicate: bool,

    // Run temporal enrichment over ingested chunks.
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // Dashboard memory to bind this file to before ingesting.
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    // Enable logic-mesh (NER/entity) processing.
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // Force parallel segment ingestion on (overrides env var).
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    // Force parallel segment ingestion off (overrides env var).
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    // Tokens per parallel segment.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    // Pages per parallel segment.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    // Worker thread count for parallel ingestion.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    // Bounded queue depth for parallel ingestion.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Shared lock-handling flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
580
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Resolves whether parallel segment ingestion should run.
    ///
    /// Precedence: explicit `--parallel-segments` wins, then
    /// `--no-parallel-segments`, then the `MEMVID_PARALLEL_SEGMENTS`
    /// environment variable; the default is off.
    pub fn wants_parallel(&self) -> bool {
        if self.parallel_segments {
            true
        } else if self.no_parallel_segments {
            false
        } else {
            parallel_env_override().unwrap_or(false)
        }
    }

    /// Builds `BuildOpts` from the CLI overrides, leaving unset fields at
    /// their defaults, then sanitizes the result.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        if let Some(segment_tokens) = self.parallel_segment_tokens {
            opts.segment_tokens = segment_tokens;
        }
        if let Some(segment_pages) = self.parallel_segment_pages {
            opts.segment_pages = segment_pages;
        }
        if let Some(thread_count) = self.parallel_threads {
            opts.threads = thread_count;
        }
        if let Some(queue_depth) = self.parallel_queue_depth {
            opts.queue_depth = queue_depth;
        }
        opts.sanitize();
        opts
    }
}
614
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    /// Parallel segment ingestion is compiled out in this configuration;
    /// always report it disabled so call sites stay feature-agnostic.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
621
// CLI arguments for `memvid api-fetch`: ingest data pulled from a remote API
// described by a config file. (Regular `//` comments on purpose: `///` would
// change clap's generated help text.)
#[derive(Args)]
pub struct ApiFetchArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Fetch configuration file; schema defined by crate::api_fetch.
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    // Plan the fetch without writing anything.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Override the fetch mode from the config file.
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Override the URI recorded for fetched frames.
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Emit machine-readable JSON instead of human output.
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared lock-handling flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
647
// CLI arguments for `memvid update`: modify an existing frame selected by
// ID or URI. (Regular `//` comments on purpose: `///` would change clap's
// generated help text.)
#[derive(Args)]
pub struct UpdateArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select the frame by numeric ID (exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select the frame by URI (exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Replacement payload file.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // New URI to assign to the frame.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    // New title.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // New timestamp.
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    // New timeline track.
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    // New document kind.
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable KEY=VALUE tags.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    // Repeatable free-form labels.
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable JSON metadata payloads.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit machine-readable JSON instead of human output.
    #[arg(long)]
    pub json: bool,

    // Shared lock-handling flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
683
// CLI arguments for `memvid delete`: remove a frame selected by ID or URI.
// (Regular `//` comments on purpose: `///` would change clap's help text.)
#[derive(Args)]
pub struct DeleteArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select the frame by numeric ID (exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select the frame by URI (exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit machine-readable JSON instead of human output.
    #[arg(long)]
    pub json: bool,

    // Shared lock-handling flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
701
702pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
704 let command = ApiFetchCommand {
705 file: args.file,
706 config_path: args.config,
707 dry_run: args.dry_run,
708 mode_override: args.mode,
709 uri_override: args.uri,
710 output_json: args.json,
711 lock_timeout_ms: args.lock.lock_timeout,
712 force_lock: args.lock.force,
713 };
714 run_api_fetch(config, command)
715}
716
717pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
719 let mut mem = Memvid::open(&args.file)?;
720 ensure_cli_mutation_allowed(&mem)?;
721 apply_lock_cli(&mut mem, &args.lock);
722 let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
723
724 if !args.yes {
725 if let Some(uri) = &frame.uri {
726 println!("About to delete frame {} ({uri})", frame.id);
727 } else {
728 println!("About to delete frame {}", frame.id);
729 }
730 print!("Confirm? [y/N]: ");
731 io::stdout().flush()?;
732 let mut input = String::new();
733 io::stdin().read_line(&mut input)?;
734 let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
735 if !confirmed {
736 println!("Aborted");
737 return Ok(());
738 }
739 }
740
741 let seq = mem.delete_frame(frame.id)?;
742 mem.commit()?;
743 let deleted = mem.frame_by_id(frame.id)?;
744
745 if args.json {
746 let json = json!({
747 "frame_id": frame.id,
748 "sequence": seq,
749 "status": frame_status_str(deleted.status),
750 });
751 println!("{}", serde_json::to_string_pretty(&json)?);
752 } else {
753 println!("Deleted frame {} (seq {})", frame.id, seq);
754 }
755
756 Ok(())
757}
758pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
759 let mut mem = Memvid::open(&args.file)?;
760 ensure_cli_mutation_allowed(&mem)?;
761 apply_lock_cli(&mut mem, &args.lock);
762
763 if let Some(memory_id) = &args.memory_id {
765 let needs_binding = match mem.get_memory_binding() {
766 Some(binding) => binding.memory_id != **memory_id,
767 None => true,
768 };
769 if needs_binding {
770 match crate::commands::creation::bind_to_dashboard_memory(config, &mut mem, &args.file, memory_id) {
771 Ok((bound_id, capacity)) => {
772 eprintln!("✓ Bound to dashboard memory: {} (capacity: {})", bound_id, crate::utils::format_bytes(capacity));
773 }
774 Err(e) => {
775 eprintln!("⚠️ Failed to bind to dashboard memory: {}", e);
776 eprintln!(" Continuing with existing capacity. Bind manually with:");
777 eprintln!(" memvid tickets sync {} --memory-id {}", args.file.display(), memory_id);
778 }
779 }
780 }
781 }
782
783 #[cfg(feature = "replay")]
785 let _ = mem.load_active_session();
786 let mut stats = mem.stats()?;
787 if stats.frame_count == 0 && !stats.has_lex_index {
788 mem.enable_lex()?;
789 stats = mem.stats()?;
790 }
791
792 crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
795
796 if args.vector_compression {
798 mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
799 }
800
801 let mut capacity_guard = CapacityGuard::from_stats(&stats);
802 let use_parallel = args.wants_parallel();
803 #[cfg(feature = "parallel_segments")]
804 let mut parallel_opts = if use_parallel {
805 Some(args.sanitized_parallel_opts())
806 } else {
807 None
808 };
809 #[cfg(feature = "parallel_segments")]
810 let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
811 #[cfg(feature = "parallel_segments")]
812 let mut pending_parallel_indices: Vec<usize> = Vec::new();
813 let input_set = resolve_inputs(args.input.as_deref())?;
814 if args.embedding && args.embedding_vec.is_some() {
815 anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
816 }
817 if args.embedding_vec.is_some() {
818 match &input_set {
819 InputSet::Files(paths) if paths.len() != 1 => {
820 anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
821 }
822 _ => {}
823 }
824 }
825
826 if args.video {
827 if let Some(kind) = &args.kind {
828 if !kind.eq_ignore_ascii_case("video") {
829 anyhow::bail!("--video conflicts with explicit --kind={kind}");
830 }
831 }
832 if matches!(input_set, InputSet::Stdin) {
833 anyhow::bail!("--video requires --input <file>");
834 }
835 }
836
837 #[allow(dead_code)]
838 enum EmbeddingMode {
839 None,
840 Auto,
841 Explicit,
842 }
843
844 let mv2_dimension = mem.effective_vec_index_dimension()?;
848 let inferred_embedding_model = if args.embedding {
849 match mem.embedding_identity_summary(10_000) {
850 memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
851 memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
852 let models: Vec<_> = identities
853 .iter()
854 .filter_map(|entry| entry.identity.model.as_deref())
855 .collect();
856 anyhow::bail!(
857 "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
858 Detected models: {:?}\n\n\
859 Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
860 models
861 );
862 }
863 memvid_core::EmbeddingIdentitySummary::Unknown => None,
864 }
865 } else {
866 None
867 };
868 let (embedding_mode, embedding_runtime) = if args.embedding {
869 (
870 EmbeddingMode::Explicit,
871 Some(load_embedding_runtime_for_mv2(
872 config,
873 inferred_embedding_model.as_deref(),
874 mv2_dimension,
875 )?),
876 )
877 } else if args.embedding_vec.is_some() {
878 (EmbeddingMode::Explicit, None)
879 } else {
880 (EmbeddingMode::None, None)
881 };
882 let embedding_enabled = args.embedding || args.embedding_vec.is_some();
883 let runtime_ref = embedding_runtime.as_ref();
884
885 #[cfg(feature = "clip")]
890 let clip_enabled = {
891 if args.no_clip {
892 false
893 } else if args.clip {
894 true } else {
896 let model_available = is_clip_model_installed();
898 if model_available {
899 match &input_set {
901 InputSet::Files(paths) => paths.iter().any(|p| {
902 is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
903 }),
904 InputSet::Stdin => false,
905 }
906 } else {
907 false
908 }
909 }
910 };
911 #[cfg(feature = "clip")]
912 let clip_model = if clip_enabled {
913 mem.enable_clip()?;
914 if !args.json {
915 let mode = if args.clip {
916 "explicit"
917 } else {
918 "auto-detected"
919 };
920 eprintln!("ℹ️ CLIP visual embeddings enabled ({mode})");
921 eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
922 eprintln!(" Use 'memvid find --mode clip' to search with natural language");
923 eprintln!(" Use --no-clip to disable auto-detection");
924 eprintln!();
925 }
926 match memvid_core::ClipModel::default_model() {
928 Ok(model) => Some(model),
929 Err(e) => {
930 warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
931 None
932 }
933 }
934 } else {
935 None
936 };
937
938 if embedding_enabled && !args.json {
940 let capacity = stats.capacity_bytes;
941 if args.vector_compression {
942 let max_docs = capacity / 20_000; eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
945 eprintln!(" Storage: ~20 KB per document (16x compressed)");
946 eprintln!(" Accuracy: ~95% recall@10 maintained");
947 eprintln!(
948 " Capacity: ~{} documents max for {:.1} GB",
949 max_docs,
950 capacity as f64 / 1e9
951 );
952 eprintln!();
953 } else {
954 let max_docs = capacity / 270_000; eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
956 eprintln!(" Storage: ~270 KB per document");
957 eprintln!(
958 " Capacity: ~{} documents max for {:.1} GB",
959 max_docs,
960 capacity as f64 / 1e9
961 );
962 eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
963 eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
964 eprintln!();
965 }
966 }
967
968 let embed_progress = if embedding_enabled {
969 let total = match &input_set {
970 InputSet::Files(paths) => Some(paths.len() as u64),
971 InputSet::Stdin => None,
972 };
973 Some(create_task_progress_bar(total, "embed", false))
974 } else {
975 None
976 };
977 let mut embedded_docs = 0usize;
978
979 if let InputSet::Files(paths) = &input_set {
980 if paths.len() > 1 {
981 if args.uri.is_some() || args.title.is_some() {
982 anyhow::bail!(
983 "--uri/--title apply to a single file; omit them when using a directory or glob"
984 );
985 }
986 }
987 }
988
989 let mut reports = Vec::new();
990 let mut processed = 0usize;
991 let mut skipped = 0usize;
992 let mut bytes_added = 0u64;
993 let mut summary_warnings: Vec<String> = Vec::new();
994 #[cfg(feature = "clip")]
995 let mut clip_embeddings_added = false;
996
997 let mut capacity_reached = false;
998 match input_set {
999 InputSet::Stdin => {
1000 processed += 1;
1001 match read_payload(None) {
1002 Ok(payload) => {
1003 let mut analyze_spinner = if args.json {
1004 None
1005 } else {
1006 Some(create_spinner("Analyzing stdin payload..."))
1007 };
1008 match ingest_payload(
1009 &mut mem,
1010 &args,
1011 config,
1012 payload,
1013 None,
1014 capacity_guard.as_mut(),
1015 embedding_enabled,
1016 runtime_ref,
1017 use_parallel,
1018 ) {
1019 Ok(outcome) => {
1020 if let Some(pb) = analyze_spinner.take() {
1021 pb.finish_and_clear();
1022 }
1023 bytes_added += outcome.report.stored_bytes as u64;
1024 if args.json {
1025 summary_warnings
1026 .extend(outcome.report.extraction.warnings.iter().cloned());
1027 }
1028 reports.push(outcome.report);
1029 let report_index = reports.len() - 1;
1030 if !args.json && !use_parallel {
1031 if let Some(report) = reports.get(report_index) {
1032 print_report(report);
1033 }
1034 }
1035 if args.transcript {
1036 let notice = transcript_notice_message(&mut mem)?;
1037 if args.json {
1038 summary_warnings.push(notice);
1039 } else {
1040 println!("{notice}");
1041 }
1042 }
1043 if outcome.embedded {
1044 if let Some(pb) = embed_progress.as_ref() {
1045 pb.inc(1);
1046 }
1047 embedded_docs += 1;
1048 }
1049 if use_parallel {
1050 #[cfg(feature = "parallel_segments")]
1051 if let Some(input) = outcome.parallel_input {
1052 pending_parallel_indices.push(report_index);
1053 pending_parallel_inputs.push(input);
1054 }
1055 } else if let Some(guard) = capacity_guard.as_mut() {
1056 mem.commit()?;
1057 let stats = mem.stats()?;
1058 guard.update_after_commit(&stats);
1059 guard.check_and_warn_capacity(); }
1061 }
1062 Err(err) => {
1063 if let Some(pb) = analyze_spinner.take() {
1064 pb.finish_and_clear();
1065 }
1066 if err.downcast_ref::<DuplicateUriError>().is_some() {
1067 return Err(err);
1068 }
1069 warn!("skipped stdin payload: {err}");
1070 skipped += 1;
1071 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1072 capacity_reached = true;
1073 }
1074 }
1075 }
1076 }
1077 Err(err) => {
1078 warn!("failed to read stdin: {err}");
1079 skipped += 1;
1080 }
1081 }
1082 }
1083 InputSet::Files(ref paths) => {
1084 let mut transcript_notice_printed = false;
1085 for path in paths {
1086 processed += 1;
1087 match read_payload(Some(&path)) {
1088 Ok(payload) => {
1089 let label = path.display().to_string();
1090 let mut analyze_spinner = if args.json {
1091 None
1092 } else {
1093 Some(create_spinner(&format!("Analyzing {label}...")))
1094 };
1095 match ingest_payload(
1096 &mut mem,
1097 &args,
1098 config,
1099 payload,
1100 Some(&path),
1101 capacity_guard.as_mut(),
1102 embedding_enabled,
1103 runtime_ref,
1104 use_parallel,
1105 ) {
1106 Ok(outcome) => {
1107 if let Some(pb) = analyze_spinner.take() {
1108 pb.finish_and_clear();
1109 }
1110 bytes_added += outcome.report.stored_bytes as u64;
1111
1112 #[cfg(feature = "clip")]
1114 if let Some(ref model) = clip_model {
1115 let mime = outcome.report.mime.as_deref();
1116 let clip_frame_id = outcome.report.frame_id;
1117
1118 if is_image_file(&path) {
1119 match model.encode_image_file(&path) {
1120 Ok(embedding) => {
1121 if let Err(e) = mem.add_clip_embedding_with_page(
1122 clip_frame_id,
1123 None,
1124 embedding,
1125 ) {
1126 warn!(
1127 "Failed to add CLIP embedding for {}: {e}",
1128 path.display()
1129 );
1130 } else {
1131 info!(
1132 "Added CLIP embedding for frame {}",
1133 clip_frame_id
1134 );
1135 clip_embeddings_added = true;
1136 }
1137 }
1138 Err(e) => {
1139 warn!(
1140 "Failed to encode CLIP embedding for {}: {e}",
1141 path.display()
1142 );
1143 }
1144 }
1145 } else if matches!(mime, Some(m) if m == "application/pdf") {
1146 if let Err(e) = mem.commit() {
1148 warn!("Failed to commit before CLIP extraction: {e}");
1149 }
1150
1151 match render_pdf_pages_for_clip(
1152 &path,
1153 CLIP_PDF_MAX_IMAGES,
1154 CLIP_PDF_TARGET_PX,
1155 ) {
1156 Ok(rendered_pages) => {
1157 use rayon::prelude::*;
1158
1159 let path_for_closure = path.clone();
1162
1163 let prev_hook = std::panic::take_hook();
1166 std::panic::set_hook(Box::new(|_| {
1167 }));
1169
1170 let encoded_images: Vec<_> = rendered_pages
1171 .into_par_iter()
1172 .filter_map(|(page_num, image)| {
1173 let image_info = get_image_info(&image);
1175 if !image_info.should_embed() {
1176 info!(
1177 "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1178 path_for_closure.display(),
1179 page_num,
1180 image_info.color_variance,
1181 image_info.width,
1182 image_info.height
1183 );
1184 return None;
1185 }
1186
1187 let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1191 let rgb_image = image.to_rgb8();
1192 let mut png_bytes = Vec::new();
1193 image::DynamicImage::ImageRgb8(rgb_image).write_to(
1194 &mut Cursor::new(&mut png_bytes),
1195 image::ImageFormat::Png,
1196 ).map(|()| png_bytes)
1197 }));
1198
1199 match encode_result {
1200 Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1201 Ok(Err(e)) => {
1202 info!(
1203 "Skipping image for {} page {}: {e}",
1204 path_for_closure.display(),
1205 page_num
1206 );
1207 None
1208 }
1209 Err(_) => {
1210 info!(
1212 "Skipping malformed image for {} page {}",
1213 path_for_closure.display(),
1214 page_num
1215 );
1216 None
1217 }
1218 }
1219 })
1220 .collect();
1221
1222 std::panic::set_hook(prev_hook);
1224
1225 let mut child_frame_offset = 1u64;
1228 let parent_uri = outcome.report.uri.clone();
1229 let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1230 let total_images = encoded_images.len();
1231
1232 let clip_pb = if !args.json && total_images > 0 {
1234 let pb = ProgressBar::new(total_images as u64);
1235 pb.set_style(
1236 ProgressStyle::with_template(
1237 " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1238 )
1239 .unwrap_or_else(|_| ProgressStyle::default_bar())
1240 .progress_chars("█▓░"),
1241 );
1242 Some(pb)
1243 } else {
1244 None
1245 };
1246
1247 for (page_num, image, png_bytes) in encoded_images {
1248 let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1249 let child_title = format!(
1250 "{} - Image {} (page {})",
1251 parent_title,
1252 child_frame_offset,
1253 page_num
1254 );
1255
1256 let child_options = PutOptions::builder()
1257 .uri(&child_uri)
1258 .title(&child_title)
1259 .parent_id(clip_frame_id)
1260 .role(FrameRole::ExtractedImage)
1261 .auto_tag(false)
1262 .extract_dates(false)
1263 .build();
1264
1265 match mem.put_bytes_with_options(&png_bytes, child_options) {
1266 Ok(_child_seq) => {
1267 let child_frame_id = clip_frame_id + child_frame_offset;
1268 child_frame_offset += 1;
1269
1270 match model.encode_image(&image) {
1271 Ok(embedding) => {
1272 if let Err(e) = mem
1273 .add_clip_embedding_with_page(
1274 child_frame_id,
1275 Some(page_num),
1276 embedding,
1277 )
1278 {
1279 warn!(
1280 "Failed to add CLIP embedding for {} page {}: {e}",
1281 path.display(),
1282 page_num
1283 );
1284 } else {
1285 info!(
1286 "Added CLIP image frame {} for {} page {}",
1287 child_frame_id,
1288 clip_frame_id,
1289 page_num
1290 );
1291 clip_embeddings_added = true;
1292 }
1293 }
1294 Err(e) => warn!(
1295 "Failed to encode CLIP embedding for {} page {}: {e}",
1296 path.display(),
1297 page_num
1298 ),
1299 }
1300
1301 if let Err(e) = mem.commit() {
1303 warn!("Failed to commit after image {}: {e}", child_frame_offset);
1304 }
1305 }
1306 Err(e) => warn!(
1307 "Failed to store image frame for {} page {}: {e}",
1308 path.display(),
1309 page_num
1310 ),
1311 }
1312
1313 if let Some(ref pb) = clip_pb {
1314 pb.inc(1);
1315 }
1316 }
1317
1318 if let Some(pb) = clip_pb {
1319 pb.finish_and_clear();
1320 }
1321 }
1322 Err(err) => warn!(
1323 "Failed to render PDF pages for CLIP {}: {err}",
1324 path.display()
1325 ),
1326 }
1327 }
1328 }
1329
1330 if args.json {
1331 summary_warnings
1332 .extend(outcome.report.extraction.warnings.iter().cloned());
1333 }
1334 reports.push(outcome.report);
1335 let report_index = reports.len() - 1;
1336 if !args.json && !use_parallel {
1337 if let Some(report) = reports.get(report_index) {
1338 print_report(report);
1339 }
1340 }
1341 if args.transcript && !transcript_notice_printed {
1342 let notice = transcript_notice_message(&mut mem)?;
1343 if args.json {
1344 summary_warnings.push(notice);
1345 } else {
1346 println!("{notice}");
1347 }
1348 transcript_notice_printed = true;
1349 }
1350 if outcome.embedded {
1351 if let Some(pb) = embed_progress.as_ref() {
1352 pb.inc(1);
1353 }
1354 embedded_docs += 1;
1355 }
1356 if use_parallel {
1357 #[cfg(feature = "parallel_segments")]
1358 if let Some(input) = outcome.parallel_input {
1359 pending_parallel_indices.push(report_index);
1360 pending_parallel_inputs.push(input);
1361 }
1362 } else if let Some(guard) = capacity_guard.as_mut() {
1363 mem.commit()?;
1364 let stats = mem.stats()?;
1365 guard.update_after_commit(&stats);
1366 guard.check_and_warn_capacity(); }
1368 }
1369 Err(err) => {
1370 if let Some(pb) = analyze_spinner.take() {
1371 pb.finish_and_clear();
1372 }
1373 if err.downcast_ref::<DuplicateUriError>().is_some() {
1374 return Err(err);
1375 }
1376 warn!("skipped {}: {err}", path.display());
1377 skipped += 1;
1378 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1379 capacity_reached = true;
1380 }
1381 }
1382 }
1383 }
1384 Err(err) => {
1385 warn!("failed to read {}: {err}", path.display());
1386 skipped += 1;
1387 }
1388 }
1389 if capacity_reached {
1390 break;
1391 }
1392 }
1393 }
1394 }
1395
1396 if capacity_reached {
1397 warn!("capacity limit reached; remaining inputs were not processed");
1398 }
1399
1400 if let Some(pb) = embed_progress.as_ref() {
1401 pb.finish_with_message("embed");
1402 }
1403
1404 if let Some(runtime) = embedding_runtime.as_ref() {
1405 if embedded_docs > 0 {
1406 match embedding_mode {
1407 EmbeddingMode::Explicit => println!(
1408 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
1409 embedded_docs,
1410 runtime.dimension()
1411 ),
1412 EmbeddingMode::Auto => println!(
1413 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
1414 embedded_docs,
1415 runtime.dimension()
1416 ),
1417 EmbeddingMode::None => {}
1418 }
1419 } else {
1420 match embedding_mode {
1421 EmbeddingMode::Explicit => {
1422 println!("No embeddings were generated (no textual content available)");
1423 }
1424 EmbeddingMode::Auto => {
1425 println!(
1426 "Semantic runtime available, but no textual content produced embeddings"
1427 );
1428 }
1429 EmbeddingMode::None => {}
1430 }
1431 }
1432 }
1433
1434 if reports.is_empty() {
1435 if args.json {
1436 if capacity_reached {
1437 summary_warnings.push("capacity_limit_reached".to_string());
1438 }
1439 let summary_json = json!({
1440 "processed": processed,
1441 "ingested": 0,
1442 "skipped": skipped,
1443 "bytes_added": bytes_added,
1444 "bytes_added_human": format_bytes(bytes_added),
1445 "embedded_documents": embedded_docs,
1446 "capacity_reached": capacity_reached,
1447 "warnings": summary_warnings,
1448 "reports": Vec::<serde_json::Value>::new(),
1449 });
1450 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1451 } else {
1452 println!(
1453 "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1454 processed, skipped
1455 );
1456 }
1457 return Ok(());
1458 }
1459
1460 #[cfg(feature = "parallel_segments")]
1461 let mut parallel_flush_spinner = if use_parallel && !args.json {
1462 Some(create_spinner("Flushing parallel segments..."))
1463 } else {
1464 None
1465 };
1466
1467 if use_parallel {
1468 #[cfg(feature = "parallel_segments")]
1469 {
1470 let mut opts = parallel_opts.take().unwrap_or_else(|| {
1471 let mut opts = BuildOpts::default();
1472 opts.sanitize();
1473 opts
1474 });
1475 opts.vec_compression = mem.vector_compression().clone();
1477 let seqs = if pending_parallel_inputs.is_empty() {
1478 mem.commit_parallel(opts)?;
1479 Vec::new()
1480 } else {
1481 mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1482 };
1483 if let Some(pb) = parallel_flush_spinner.take() {
1484 pb.finish_with_message("Flushed parallel segments");
1485 }
1486 for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1487 if let Some(report) = reports.get_mut(idx) {
1488 report.seq = seq;
1489 }
1490 }
1491 }
1492 #[cfg(not(feature = "parallel_segments"))]
1493 {
1494 mem.commit()?;
1495 }
1496 } else {
1497 mem.commit()?;
1498 }
1499
1500 #[cfg(feature = "clip")]
1502 if clip_embeddings_added {
1503 mem.commit()?;
1504 }
1505
1506 if !args.json && use_parallel {
1507 for report in &reports {
1508 print_report(report);
1509 }
1510 }
1511
1512 let mut tables_extracted = 0usize;
1514 let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1515 if args.tables && !args.no_tables {
1516 if let InputSet::Files(ref paths) = input_set {
1517 let pdf_paths: Vec<_> = paths
1518 .iter()
1519 .filter(|p| {
1520 p.extension()
1521 .and_then(|e| e.to_str())
1522 .map(|e| e.eq_ignore_ascii_case("pdf"))
1523 .unwrap_or(false)
1524 })
1525 .collect();
1526
1527 if !pdf_paths.is_empty() {
1528 let table_spinner = if args.json {
1529 None
1530 } else {
1531 Some(create_spinner(&format!(
1532 "Extracting tables from {} PDF(s)...",
1533 pdf_paths.len()
1534 )))
1535 };
1536
1537 let table_options = TableExtractionOptions::builder()
1539 .mode(memvid_core::table::ExtractionMode::Aggressive)
1540 .min_rows(2)
1541 .min_cols(2)
1542 .min_quality(memvid_core::table::TableQuality::Low)
1543 .merge_multi_page(true)
1544 .build();
1545
1546 for pdf_path in pdf_paths {
1547 if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1548 let filename = pdf_path
1549 .file_name()
1550 .and_then(|s| s.to_str())
1551 .unwrap_or("unknown.pdf");
1552
1553 if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1554 for table in &result.tables {
1555 if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1556 {
1557 tables_extracted += 1;
1558 table_summaries.push(json!({
1559 "table_id": table.table_id,
1560 "source_file": table.source_file,
1561 "meta_frame_id": meta_id,
1562 "rows": table.n_rows,
1563 "cols": table.n_cols,
1564 "quality": format!("{:?}", table.quality),
1565 "pages": format!("{}-{}", table.page_start, table.page_end),
1566 }));
1567 }
1568 }
1569 }
1570 }
1571 }
1572
1573 if let Some(pb) = table_spinner {
1574 if tables_extracted > 0 {
1575 pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1576 } else {
1577 pb.finish_with_message("No tables found");
1578 }
1579 }
1580
1581 if tables_extracted > 0 {
1583 mem.commit()?;
1584 }
1585 }
1586 }
1587 }
1588
1589 #[cfg(feature = "logic_mesh")]
1592 let mut logic_mesh_entities = 0usize;
1593 #[cfg(feature = "logic_mesh")]
1594 {
1595 use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1596
1597 let models_dir = config.models_dir.clone();
1598 let model_path = ner_model_path(&models_dir);
1599 let tokenizer_path = ner_tokenizer_path(&models_dir);
1600
1601 let ner_available = model_path.exists() && tokenizer_path.exists();
1603 let should_run_ner = args.logic_mesh;
1604
1605 if should_run_ner && !ner_available {
1606 if args.json {
1608 summary_warnings.push(
1609 "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1610 );
1611 } else {
1612 eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
1613 eprintln!(" Run: memvid models install --ner distilbert-ner");
1614 }
1615 } else if should_run_ner {
1616 let ner_spinner = if args.json {
1617 None
1618 } else {
1619 Some(create_spinner("Building Logic-Mesh entity graph..."))
1620 };
1621
1622 match NerModel::load(&model_path, &tokenizer_path, None) {
1623 Ok(mut ner_model) => {
1624 let mut mesh = LogicMesh::new();
1625 let mut filtered_count = 0usize;
1626
1627 for report in &reports {
1629 let frame_id = report.frame_id;
1630 if let Some(ref content) = report.search_text {
1632 if !content.is_empty() {
1633 match ner_model.extract(content) {
1634 Ok(raw_entities) => {
1635 let merged_entities = merge_adjacent_entities(raw_entities, content);
1638
1639 for entity in &merged_entities {
1640 if !is_valid_entity(&entity.text, entity.confidence) {
1642 tracing::debug!(
1643 "Filtered entity: '{}' (confidence: {:.0}%)",
1644 entity.text, entity.confidence * 100.0
1645 );
1646 filtered_count += 1;
1647 continue;
1648 }
1649
1650 let kind = entity.to_entity_kind();
1652 let node = MeshNode::new(
1654 entity.text.to_lowercase(),
1655 entity.text.trim().to_string(),
1656 kind,
1657 entity.confidence,
1658 frame_id,
1659 entity.byte_start as u32,
1660 (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1661 );
1662 mesh.merge_node(node);
1663 logic_mesh_entities += 1;
1664 }
1665 }
1666 Err(e) => {
1667 warn!("NER extraction failed for frame {frame_id}: {e}");
1668 }
1669 }
1670 }
1671 }
1672 }
1673
1674 if logic_mesh_entities > 0 {
1675 mesh.finalize();
1676 let node_count = mesh.stats().node_count;
1677 mem.set_logic_mesh(mesh);
1679 mem.commit()?;
1680 info!(
1681 "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1682 logic_mesh_entities,
1683 node_count,
1684 filtered_count
1685 );
1686 }
1687
1688 if let Some(pb) = ner_spinner {
1689 pb.finish_with_message(format!(
1690 "Logic-Mesh: {} entities ({} unique)",
1691 logic_mesh_entities,
1692 mem.mesh_node_count()
1693 ));
1694 }
1695 }
1696 Err(e) => {
1697 if let Some(pb) = ner_spinner {
1698 pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1699 }
1700 if args.json {
1701 summary_warnings.push(format!("logic_mesh_failed: {e}"));
1702 }
1703 }
1704 }
1705 }
1706 }
1707
1708 let stats = mem.stats()?;
1709 if args.json {
1710 if capacity_reached {
1711 summary_warnings.push("capacity_limit_reached".to_string());
1712 }
1713 let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1714 let total_duration_ms: Option<u64> = {
1715 let sum: u64 = reports
1716 .iter()
1717 .filter_map(|r| r.extraction.duration_ms)
1718 .sum();
1719 if sum > 0 {
1720 Some(sum)
1721 } else {
1722 None
1723 }
1724 };
1725 let total_pages: Option<u32> = {
1726 let sum: u32 = reports
1727 .iter()
1728 .filter_map(|r| r.extraction.pages_processed)
1729 .sum();
1730 if sum > 0 {
1731 Some(sum)
1732 } else {
1733 None
1734 }
1735 };
1736 let embedding_mode_str = match embedding_mode {
1737 EmbeddingMode::None => "none",
1738 EmbeddingMode::Auto => "auto",
1739 EmbeddingMode::Explicit => "explicit",
1740 };
1741 let summary_json = json!({
1742 "processed": processed,
1743 "ingested": reports.len(),
1744 "skipped": skipped,
1745 "bytes_added": bytes_added,
1746 "bytes_added_human": format_bytes(bytes_added),
1747 "embedded_documents": embedded_docs,
1748 "embedding": {
1749 "enabled": embedding_enabled,
1750 "mode": embedding_mode_str,
1751 "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1752 },
1753 "tables": {
1754 "enabled": args.tables && !args.no_tables,
1755 "extracted": tables_extracted,
1756 "tables": table_summaries,
1757 },
1758 "capacity_reached": capacity_reached,
1759 "warnings": summary_warnings,
1760 "extraction": {
1761 "total_duration_ms": total_duration_ms,
1762 "total_pages_processed": total_pages,
1763 },
1764 "memory": {
1765 "path": args.file.display().to_string(),
1766 "frame_count": stats.frame_count,
1767 "size_bytes": stats.size_bytes,
1768 "tier": format!("{:?}", stats.tier),
1769 },
1770 "reports": reports_json,
1771 });
1772 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1773 } else {
1774 println!(
1775 "Committed {} frame(s); total frames {}",
1776 reports.len(),
1777 stats.frame_count
1778 );
1779 println!(
1780 "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1781 processed,
1782 reports.len(),
1783 skipped,
1784 format_bytes(bytes_added)
1785 );
1786 if tables_extracted > 0 {
1787 println!("Tables: {} extracted from PDF(s)", tables_extracted);
1788 }
1789 }
1790
1791 #[cfg(feature = "replay")]
1793 let _ = mem.save_active_session();
1794
1795 Ok(())
1796}
1797
/// Handles `memvid update`: replaces or re-annotates an existing frame.
///
/// Loads the frame selected by `--frame-id`/`--uri`, carries its metadata
/// forward into a fresh `PutOptions`, applies any CLI overrides, optionally
/// re-analyzes a new payload and recomputes embeddings, then writes the
/// updated frame, commits, and prints a text or JSON summary.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Resolve the frame being updated from either --frame-id or --uri.
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // A replacement payload is optional; without one we only rewrite metadata.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Start from the existing frame's metadata so unspecified fields are
    // preserved; auto-tagging/date extraction only run when there is a payload.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // CLI overrides take precedence over carried-forward values.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels replace wholesale when provided.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // key=value tag pairs replace the tag list; both key and value become
    // searchable tags, and the pair is also recorded as extra metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for the embedding step below; replaced if re-analysis
    // of a new payload yields fresh search text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // With a new payload, re-derive the title and re-run content analysis.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // Nothing to analyze: disable payload-derived enrichment.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Recompute the embedding with the model the memory already uses;
        // bail out when frames were embedded with mixed models.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                     Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer extracted search text; fall back to the raw UTF-8 payload.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // Reuse the frame's previous vector when none was recomputed, so the
    // updated frame keeps an entry in the vector index.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-read the frame just written: by URI when one is set, otherwise
    // take the newest frame on the timeline (falling back to the old id).
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
2000
2001fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2002 if let Some(uri) = provided {
2003 let sanitized = sanitize_uri(uri, true);
2004 return format!("mv2://{}", sanitized);
2005 }
2006
2007 if let Some(path) = source {
2008 let raw = path.to_string_lossy();
2009 let sanitized = sanitize_uri(&raw, false);
2010 return format!("mv2://{}", sanitized);
2011 }
2012
2013 format!("mv2://frames/{}", Uuid::new_v4())
2014}
2015
2016pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2017 let digest = hash(payload).to_hex();
2018 let short = &digest[..32];
2019 let ext_from_path = source
2020 .and_then(|path| path.extension())
2021 .and_then(|ext| ext.to_str())
2022 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2023 .filter(|ext| !ext.is_empty());
2024 let ext = ext_from_path
2025 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2026 .unwrap_or_else(|| "bin".to_string());
2027 format!("mv2://video/{short}.{ext}")
2028}
2029
2030fn derive_title(
2031 provided: Option<String>,
2032 source: Option<&Path>,
2033 payload_utf8: Option<&str>,
2034) -> Option<String> {
2035 if let Some(title) = provided {
2036 let trimmed = title.trim();
2037 if trimmed.is_empty() {
2038 None
2039 } else {
2040 Some(trimmed.to_string())
2041 }
2042 } else {
2043 if let Some(text) = payload_utf8 {
2044 if let Some(markdown_title) = extract_markdown_title(text) {
2045 return Some(markdown_title);
2046 }
2047 }
2048 source
2049 .and_then(|path| path.file_stem())
2050 .and_then(|stem| stem.to_str())
2051 .map(to_title_case)
2052 }
2053}
2054
/// Returns the first non-empty ATX-style markdown heading (`# Title`,
/// `## Title`, …) found anywhere in `text`, with the `#` markers and
/// surrounding whitespace stripped.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let trimmed = line.trim();
        if !trimmed.starts_with('#') {
            return None;
        }
        let title = trimmed.trim_start_matches('#').trim();
        (!title.is_empty()).then(|| title.to_string())
    })
}
2067
/// Upper-cases the first character of `input`, leaving the rest untouched.
///
/// Despite the name, this is not full title casing: only the leading
/// character is transformed (and it may expand to several characters,
/// e.g. 'ß' → "SS"). Returns an empty string for empty input.
///
/// The original implementation carried a redundant `is_empty` check that
/// duplicated the `None` arm of `chars.next()`, plus an intermediate
/// `String` for the prefix; both are removed here.
fn to_title_case(input: &str) -> String {
    let mut chars = input.chars();
    match chars.next() {
        // `to_uppercase` yields an iterator because some characters
        // upper-case to more than one code point.
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}
2079
/// True for files that should never be ingested (macOS Finder metadata).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
2086
/// True for hidden directories (`.git`, `.cache`, …) that recursive
/// walks should not descend into.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
2093
/// Where the ingest command reads its payloads from.
enum InputSet {
    /// A single payload arrives on standard input.
    Stdin,
    /// Each listed file becomes its own payload.
    Files(Vec<PathBuf>),
}
2098
/// Extension-based check for raster image formats handled by the CLIP
/// ingestion path.
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .map_or(false, |ext| {
            matches!(
                ext.to_ascii_lowercase().as_str(),
                "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
            )
        })
}
2110
2111fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2112 let Some(raw) = input else {
2113 return Ok(InputSet::Stdin);
2114 };
2115
2116 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2117 let mut files = Vec::new();
2118 let mut matched_any = false;
2119 for entry in glob(raw)? {
2120 let path = entry?;
2121 if path.is_file() {
2122 matched_any = true;
2123 if should_skip_input(&path) {
2124 continue;
2125 }
2126 files.push(path);
2127 }
2128 }
2129 files.sort();
2130 if files.is_empty() {
2131 if matched_any {
2132 anyhow::bail!(
2133 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2134 );
2135 }
2136 anyhow::bail!("pattern '{raw}' did not match any files");
2137 }
2138 return Ok(InputSet::Files(files));
2139 }
2140
2141 let path = PathBuf::from(raw);
2142 if path.is_dir() {
2143 let (mut files, skipped_any) = collect_directory_files(&path)?;
2144 files.sort();
2145 if files.is_empty() {
2146 if skipped_any {
2147 anyhow::bail!(
2148 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2149 path.display()
2150 );
2151 }
2152 anyhow::bail!(
2153 "directory '{}' contains no ingestible files",
2154 path.display()
2155 );
2156 }
2157 Ok(InputSet::Files(files))
2158 } else {
2159 Ok(InputSet::Files(vec![path]))
2160 }
2161}
2162
2163fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2164 let mut files = Vec::new();
2165 let mut skipped_any = false;
2166 let mut stack = vec![root.to_path_buf()];
2167 while let Some(dir) = stack.pop() {
2168 for entry in fs::read_dir(&dir)? {
2169 let entry = entry?;
2170 let path = entry.path();
2171 let file_type = entry.file_type()?;
2172 if file_type.is_dir() {
2173 if should_skip_dir(&path) {
2174 skipped_any = true;
2175 continue;
2176 }
2177 stack.push(path);
2178 } else if file_type.is_file() {
2179 if should_skip_input(&path) {
2180 skipped_any = true;
2181 continue;
2182 }
2183 files.push(path);
2184 }
2185 }
2186 }
2187 Ok((files, skipped_any))
2188}
2189
/// Result of ingesting a single input file.
struct IngestOutcome {
    /// Per-file report surfaced to the user (console or JSON).
    report: PutReport,
    /// Whether an embedding was generated for this document.
    embedded: bool,
    /// Deferred payload, present when frames are flushed in a parallel
    /// batch at the end of the run instead of committed one by one.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2196
/// Everything reported about one ingested file, for both the console
/// output and the JSON summary.
struct PutReport {
    /// Sequence number returned when the frame was stored.
    seq: u64,
    // Only read when the `logic_mesh` feature is enabled; the `allow`
    // keeps default builds warning-free.
    #[allow(dead_code)] frame_id: u64,
    /// URI the frame was stored under.
    uri: String,
    /// Display title, if one was provided or derived.
    title: Option<String>,
    /// Payload size as read from the input.
    original_bytes: usize,
    /// Bytes actually written to the store.
    stored_bytes: usize,
    /// Whether the stored payload was compressed.
    compressed: bool,
    /// Source path on disk, when not read from stdin.
    source: Option<PathBuf>,
    /// Detected MIME type, when available.
    mime: Option<String>,
    /// Extracted document metadata, when analysis produced any.
    metadata: Option<DocMetadata>,
    /// Text-extraction outcome (reader, status, warnings, timing).
    extraction: ExtractionSummary,
    /// Normalized search text, retained for NER entity extraction.
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2214
/// Tracks store size against a tier capacity limit during batch ingest.
struct CapacityGuard {
    /// Tier the capacity came from; currently unused at runtime.
    #[allow(dead_code)]
    tier: Tier,
    /// Hard capacity limit in bytes (non-zero; see `from_stats`).
    capacity: u64,
    /// Store size as of the last commit.
    current_size: u64,
}
2221
impl CapacityGuard {
    /// Flat floor on the estimated per-write overhead (128 KiB).
    const MIN_OVERHEAD: u64 = 128 * 1024;
    /// Fractional overhead assumed on top of raw payload bytes (5%).
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Builds a guard from store stats. Returns `None` for uncapped
    /// memories — a capacity of zero means "no limit".
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Errors if writing `additional` bytes (plus the estimated
    /// bookkeeping overhead) would push the store past its capacity.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refreshes the tracked size from post-commit stats.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// The configured capacity, for surfacing in error messages.
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    /// Prints a usage notice when utilisation crosses 50/75/90%.
    ///
    /// Each notice fires only while usage sits inside a one-percent band
    /// above its threshold, so repeated commits don't re-print the same
    /// warning on every update.
    fn check_and_warn_capacity(&self) {
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️ CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Actions:");
            eprintln!(" 1. Recreate with larger --size");
            eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️ WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️ INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Overhead estimate: 5% of the payload, floored at 128 KiB.
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2303
/// Outcome of content analysis for a single payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type.
    pub mime: String,
    /// Extracted document metadata, when any was produced.
    pub metadata: Option<DocMetadata>,
    /// Normalized text used for full-text search, when extractable.
    pub search_text: Option<String>,
    /// How extraction went (reader, status, warnings, timing).
    pub extraction: ExtractionSummary,
}
2311
/// Knobs for audio analysis during ingest.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    // Force audio analysis even when it would normally be skipped —
    // TODO confirm the exact trigger against `analyze_file` callers.
    force: bool,
    // Segment length in seconds (floored to 1 by `normalised_segment_secs`).
    segment_secs: u32,
    // Whether to run speech-to-text transcription.
    transcribe: bool,
}
2318
impl AudioAnalyzeOptions {
    /// Default segment length, in seconds.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length with a floor of one second, so a zero from the CLI
    /// can never produce zero-length segments downstream.
    fn normalised_segment_secs(self) -> u32 {
        self.segment_secs.max(1)
    }
}
2326
impl Default for AudioAnalyzeOptions {
    /// No forcing, no transcription, 30-second segments.
    fn default() -> Self {
        Self {
            force: false,
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
            transcribe: false,
        }
    }
}
2336
/// Results of analysing an audio payload.
struct AudioAnalysis {
    /// Structured audio metadata; field details live in `DocAudioMetadata`.
    metadata: DocAudioMetadata,
    /// Short human-readable caption, when one could be derived.
    caption: Option<String>,
    /// Extra terms to fold into the search text.
    search_terms: Vec<String>,
    /// Full transcript, when transcription ran and produced text.
    transcript: Option<String>,
}
2343
/// Results of analysing an image payload.
struct ImageAnalysis {
    /// Pixel width.
    width: u32,
    /// Pixel height.
    height: u32,
    /// Dominant colours — presumably hex strings; confirm against the
    /// palette extraction code before relying on the format.
    palette: Vec<String>,
    /// Short caption, when one could be derived.
    caption: Option<String>,
    /// Parsed EXIF block, when present.
    exif: Option<DocExifMetadata>,
}
2351
/// Records how text extraction went for one payload.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Name of the reader that produced the text (e.g. "bytes"), if any ran.
    reader: Option<String>,
    /// Overall outcome; see `ExtractionStatus`.
    status: ExtractionStatus,
    /// Human-readable warnings accumulated during extraction.
    warnings: Vec<String>,
    /// Extraction time in milliseconds, when measured.
    duration_ms: Option<u64>,
    /// Number of pages processed, for paginated formats.
    pages_processed: Option<u32>,
}
2360
impl ExtractionSummary {
    /// Appends a warning, accepting anything convertible to `String`.
    fn record_warning<S: Into<String>>(&mut self, warning: S) {
        self.warnings.push(warning.into());
    }
}
2366
/// Coarse outcome of the text-extraction stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// Extraction was not attempted for this payload.
    Skipped,
    /// Primary extraction succeeded.
    Ok,
    /// A fallback extractor produced the text.
    FallbackUsed,
    /// Extraction ran but yielded no searchable text.
    Empty,
    /// Extraction failed outright.
    Failed,
}
2375
2376impl ExtractionStatus {
2377 fn label(self) -> &'static str {
2378 match self {
2379 Self::Skipped => "skipped",
2380 Self::Ok => "ok",
2381 Self::FallbackUsed => "fallback_used",
2382 Self::Empty => "empty",
2383 Self::Failed => "failed",
2384 }
2385 }
2386}
2387
2388fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2389 let filename = source_path
2390 .and_then(|path| path.file_name())
2391 .and_then(|name| name.to_str())
2392 .map(|value| value.to_string());
2393 MediaManifest {
2394 kind: "video".to_string(),
2395 mime: mime.to_string(),
2396 bytes,
2397 filename,
2398 duration_ms: None,
2399 width: None,
2400 height: None,
2401 codec: None,
2402 }
2403}
2404
/// Analyze a raw payload end-to-end before ingestion.
///
/// Detects the MIME type, fills `DocMetadata` (size, blake3 hash, image /
/// audio / EXIF details), extracts searchable text through the reader
/// registry with PDF-specific fallbacks, and records how extraction went in
/// an `ExtractionSummary`.
///
/// * `payload_utf8` — the payload reinterpreted as UTF-8, when valid.
/// * `inferred_title` — used as a last-resort caption.
/// * `force_video` — treat the payload as video regardless of sniffing.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    let is_video = force_video || mime.starts_with("video/");
    // Video is never treated as text, whatever the sniffers said.
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    // Start pessimistic; each extraction path below upgrades this record.
    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Leading bytes for magic-number detection.
    // NOTE(review): payloads shorter than MAGIC_SNIFF_BYTES yield `None` here
    // (`get` on an out-of-range slice returns None) — confirm that is intended.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Normalizes candidate text (capped at MAX_SEARCH_TEXT_LEN), derives a
    // caption when none exists yet, stores it as the search text, and records
    // reader/status. Returns false when normalization left nothing searchable.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Plain text: the payload itself is the searchable content.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf"
        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
        || mime == "application/vnd.ms-excel"
        || mime == "application/msword"
    {
        // Office/PDF documents: try the reader registry first; fall back to
        // direct PDF text extraction if nothing usable was produced.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    // The reader may identify the MIME type more precisely
                    // than the initial sniffing did.
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        // Fallback: raw PDF text extraction when the registry produced nothing.
        if !applied {
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image payloads: dimensions, dominant colours, EXIF and caption.
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio payloads (or forced audio analysis): stream stats, tags and an
    // optional transcript.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
            transcript,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            // Only replace audio metadata that is absent or empty.
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }

            if let Some(ref text) = transcript {
                // A transcript supersedes any previously extracted text.
                extraction.reader = Some("whisper".to_string());
                extraction.status = ExtractionStatus::Ok;
                search_text = Some(text.clone());
            } else if !search_terms.is_empty() {
                // Otherwise append tag-derived terms, skipping substrings
                // already present in the search text.
                match &mut search_text {
                    Some(existing) => {
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the inferred title.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2689
2690fn default_reader_registry() -> &'static ReaderRegistry {
2691 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2692 REGISTRY.get_or_init(ReaderRegistry::default)
2693}
2694
2695fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2696 if detect_pdf_magic(magic) {
2697 return Some(DocumentFormat::Pdf);
2698 }
2699
2700 let mime = mime?.trim().to_ascii_lowercase();
2701 match mime.as_str() {
2702 "application/pdf" => Some(DocumentFormat::Pdf),
2703 "text/plain" => Some(DocumentFormat::PlainText),
2704 "text/markdown" => Some(DocumentFormat::Markdown),
2705 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2706 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2707 Some(DocumentFormat::Docx)
2708 }
2709 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2710 Some(DocumentFormat::Pptx)
2711 }
2712 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2713 Some(DocumentFormat::Xlsx)
2714 }
2715 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2716 _ => None,
2717 }
2718}
2719
/// True when the sniffed leading bytes look like a PDF: an optional UTF-8 BOM
/// and leading ASCII whitespace are tolerated before the `%PDF` marker.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else { return false };
    if bytes.is_empty() {
        return false;
    }

    // Drop a UTF-8 BOM if present, then skip leading whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let mut idx = 0;
    while idx < without_bom.len() && without_bom[idx].is_ascii_whitespace() {
        idx += 1;
    }

    without_bom[idx..].starts_with(b"%PDF")
}
2737
/// Extract searchable text from a PDF by cascading three extractors —
/// `pdf_extract`, `lopdf`, external `pdftotext` — keeping the longest result
/// seen so far and returning early as soon as one clears the "good enough"
/// threshold.
///
/// Never fails: when every extractor comes up empty this returns
/// `Ok(String::new())` so the document is still stored, just without
/// searchable text.
fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
    let mut best_text = String::new();
    let mut best_source = "none";

    // Heuristic threshold: roughly 1 char per 10 KB of PDF, clamped to
    // [100, 5000] chars.
    let min_good_chars = (payload.len() / 10000).max(100).min(5000);

    // Stage 1: pure-Rust pdf_extract.
    match pdf_extract::extract_text_from_mem(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "pdf_extract";
            }
            if trimmed.len() >= min_good_chars {
                info!("pdf_extract extracted {} chars (good)", trimmed.len());
                return Ok(best_text);
            } else if !trimmed.is_empty() {
                warn!("pdf_extract returned only {} chars, trying other extractors", trimmed.len());
            }
        }
        Err(err) => {
            warn!("pdf_extract failed: {err}");
        }
    }

    // Stage 2: lopdf-based extraction.
    match extract_pdf_with_lopdf(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "lopdf";
            }
            if trimmed.len() >= min_good_chars {
                info!("lopdf extracted {} chars (good)", trimmed.len());
                return Ok(best_text);
            } else if !trimmed.is_empty() {
                warn!("lopdf returned only {} chars, trying pdftotext", trimmed.len());
            }
        }
        Err(err) => {
            warn!("lopdf failed: {err}");
        }
    }

    // Stage 3: external pdftotext binary — last resort, no early return;
    // whatever it yields just competes for "best".
    match fallback_pdftotext(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "pdftotext";
            }
            if !trimmed.is_empty() {
                info!("pdftotext extracted {} chars", trimmed.len());
            }
        }
        Err(err) => {
            warn!("pdftotext failed: {err}");
        }
    }

    if !best_text.is_empty() {
        info!("using {} extraction ({} chars)", best_source, best_text.len());
        Ok(best_text)
    } else {
        // Deliberately Ok: an unsearchable PDF is still worth storing.
        warn!("all PDF extraction methods failed, storing without searchable text");
        Ok(String::new())
    }
}
2814
2815fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
2817 use lopdf::Document;
2818
2819 let mut document = Document::load_mem(payload)
2820 .map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
2821
2822 if document.is_encrypted() {
2824 let _ = document.decrypt("");
2825 }
2826
2827 let _ = document.decompress();
2829
2830 let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
2832 if page_numbers.is_empty() {
2833 return Err(anyhow!("PDF has no pages"));
2834 }
2835 page_numbers.sort_unstable();
2836
2837 if page_numbers.len() > 4096 {
2839 page_numbers.truncate(4096);
2840 warn!("PDF has more than 4096 pages, truncating extraction");
2841 }
2842
2843 let text = document
2845 .extract_text(&page_numbers)
2846 .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
2847
2848 Ok(text.trim().to_string())
2849}
2850
2851fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2852 use std::io::Write;
2853 use std::process::{Command, Stdio};
2854
2855 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2856
2857 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2858 temp_pdf
2859 .write_all(payload)
2860 .context("failed to write pdf payload to temp file")?;
2861 temp_pdf.flush().context("failed to flush temp pdf file")?;
2862
2863 let child = Command::new(pdftotext)
2864 .arg("-layout")
2865 .arg(temp_pdf.path())
2866 .arg("-")
2867 .stdout(Stdio::piped())
2868 .spawn()
2869 .context("failed to spawn pdftotext")?;
2870
2871 let output = child
2872 .wait_with_output()
2873 .context("failed to read pdftotext output")?;
2874
2875 if !output.status.success() {
2876 return Err(anyhow!("pdftotext exited with status {}", output.status));
2877 }
2878
2879 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2880 Ok(text)
2881}
2882
2883fn detect_mime(
2884 source_path: Option<&Path>,
2885 payload: &[u8],
2886 payload_utf8: Option<&str>,
2887) -> (String, bool) {
2888 if let Some(kind) = infer::get(payload) {
2889 let mime = kind.mime_type().to_string();
2890 let treat_as_text = mime.starts_with("text/")
2891 || matches!(
2892 mime.as_str(),
2893 "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2894 );
2895 return (mime, treat_as_text);
2896 }
2897
2898 let magic = magic_from_u8(payload);
2899 if !magic.is_empty() && magic != "application/octet-stream" {
2900 let treat_as_text = magic.starts_with("text/")
2901 || matches!(
2902 magic,
2903 "application/json"
2904 | "application/xml"
2905 | "application/javascript"
2906 | "image/svg+xml"
2907 | "text/plain"
2908 );
2909 return (magic.to_string(), treat_as_text);
2910 }
2911
2912 if let Some(path) = source_path {
2913 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2914 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2915 return (mime.to_string(), treat_as_text);
2916 }
2917 }
2918 }
2919
2920 if payload_utf8.is_some() {
2921 return ("text/plain".to_string(), true);
2922 }
2923
2924 ("application/octet-stream".to_string(), false)
2925}
2926
/// Case-insensitive extension → `(mime, treat_as_text)` lookup, used when
/// content sniffing fails. The bool marks payloads whose bytes should be
/// indexed directly as text.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let lowered = ext.to_ascii_lowercase();
    let ext = lowered.as_str();

    // Source-code and config extensions that all map to plain text.
    const CODE_EXTS: &[&str] = &[
        "txt", "text", "log", "cfg", "conf", "ini", "properties", "sql", "rs", "py", "js", "ts",
        "tsx", "jsx", "c", "h", "cpp", "hpp", "go", "rb", "php", "css", "scss", "sass", "sh",
        "bash", "zsh", "ps1", "swift", "kt", "java", "scala", "lua", "pl", "pm", "r", "erl", "ex",
        "exs", "dart",
    ];
    if CODE_EXTS.contains(&ext) {
        return Some(("text/plain", true));
    }

    let mapped = match ext {
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some(mapped)
}
2954
2955fn caption_from_text(text: &str) -> Option<String> {
2956 for line in text.lines() {
2957 let trimmed = line.trim();
2958 if !trimmed.is_empty() {
2959 return Some(truncate_to_boundary(trimmed, 240));
2960 }
2961 }
2962 None
2963}
2964
/// Truncate `text` to at most `max_len` bytes, cutting on a UTF-8 character
/// boundary and appending an ellipsis when anything was removed.
///
/// The ellipsis is budgeted *inside* `max_len`, so the returned string never
/// exceeds `max_len` bytes. (Previously '…' — 3 bytes — was appended after
/// truncating to `max_len`, so the result could overshoot the cap and defeat
/// byte limits such as `MAX_SEARCH_TEXT_LEN` at the call sites.)
///
/// Returns an empty string when `max_len` is too small to hold even one
/// character plus the ellipsis.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    const ELLIPSIS: char = '…';

    if text.len() <= max_len {
        return text.to_string();
    }

    // Reserve room for the ellipsis, then back up to a char boundary.
    let mut end = max_len.saturating_sub(ELLIPSIS.len_utf8());
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    if end == 0 {
        return String::new();
    }

    let mut truncated = text[..end].trim_end().to_string();
    truncated.push(ELLIPSIS);
    truncated
}
2980
2981fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2982 let reader = ImageReader::new(Cursor::new(payload))
2983 .with_guessed_format()
2984 .map_err(|err| warn!("failed to guess image format: {err}"))
2985 .ok()?;
2986 let image = reader
2987 .decode()
2988 .map_err(|err| warn!("failed to decode image: {err}"))
2989 .ok()?;
2990 let width = image.width();
2991 let height = image.height();
2992 let rgb = image.to_rgb8();
2993 let palette = match get_palette(
2994 rgb.as_raw(),
2995 ColorFormat::Rgb,
2996 COLOR_PALETTE_SIZE,
2997 COLOR_PALETTE_QUALITY,
2998 ) {
2999 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3000 Err(err) => {
3001 warn!("failed to compute colour palette: {err}");
3002 Vec::new()
3003 }
3004 };
3005
3006 let (exif, exif_caption) = extract_exif_metadata(payload);
3007
3008 Some(ImageAnalysis {
3009 width,
3010 height,
3011 palette,
3012 caption: exif_caption,
3013 exif,
3014 })
3015}
3016
3017fn color_to_hex(color: Color) -> String {
3018 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3019}
3020
/// Probe and fully decode an audio payload with Symphonia to measure duration,
/// sample rate, channel count and (derived) average bitrate; enrich with
/// container tags (lofty), fixed-length segment markers, and an optional
/// Whisper transcript.
///
/// Returns `None` when the stream cannot be probed, has no default track, or
/// no decoder can be created. Decode errors mid-stream are logged and
/// tolerated rather than aborting the analysis.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // Symphonia requires an owned media source, hence the copy.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Give the probe every hint available: file extension first, then the
    // MIME subtype (e.g. "mpeg" from "audio/mpeg").
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode the entire stream, counting frames so the duration is computed
    // from actual audio rather than trusted container metadata.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            // An I/O error from next_packet is treated as end-of-stream.
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Ignore packets belonging to other tracks.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    // Duration from the decoded frame count; 0.0 when sample rate is unknown.
    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Average bitrate over the whole payload (container overhead included).
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    // Tag values double as search terms so the file is findable by title,
    // artist, album or genre.
    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Fixed-length segment markers across the full duration; the final
    // segment is clipped to the stream end.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Only populate fields that were actually measured.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    // Optional Whisper transcription (feature-gated; stub returns None).
    let transcript = if options.transcribe {
        transcribe_audio(source_path)
    } else {
        None
    };

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
        transcript,
    })
}
3203
/// Transcribe an audio file with Whisper (compiled in only with the `whisper`
/// feature).
///
/// Returns `None` when no source path is available (transcription needs a
/// file on disk), when the transcriber cannot be initialized, or when
/// transcription fails — errors are logged, never propagated.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let path = source_path?;

    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = match WhisperTranscriber::new(&config) {
        Ok(t) => t,
        Err(e) => {
            tracing::warn!(error = %e, "Failed to initialize Whisper transcriber");
            return None;
        }
    };

    match transcriber.transcribe_file(path) {
        Ok(result) => {
            tracing::info!(
                duration = result.duration_secs,
                text_len = result.text.len(),
                "Audio transcription complete"
            );
            Some(result.text)
        }
        Err(e) => {
            tracing::warn!(error = %e, "Failed to transcribe audio");
            None
        }
    }
}
3237
/// Stub used when the binary was built without the `whisper` feature: logs a
/// hint about how to enable transcription and returns no transcript.
#[cfg(not(feature = "whisper"))]
fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
    None
}
3243
3244fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3245 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3246
3247 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3248 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3249 out.entry("title".into())
3250 .or_insert_with(|| value.to_string());
3251 out.entry("tracktitle".into())
3252 .or_insert_with(|| value.to_string());
3253 }
3254 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3255 out.entry("artist".into())
3256 .or_insert_with(|| value.to_string());
3257 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3258 out.entry("artist".into())
3259 .or_insert_with(|| value.to_string());
3260 }
3261 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3262 out.entry("album".into())
3263 .or_insert_with(|| value.to_string());
3264 }
3265 if let Some(value) = tag.get_string(&ItemKey::Genre) {
3266 out.entry("genre".into())
3267 .or_insert_with(|| value.to_string());
3268 }
3269 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3270 out.entry("track_number".into())
3271 .or_insert_with(|| value.to_string());
3272 }
3273 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3274 out.entry("disc_number".into())
3275 .or_insert_with(|| value.to_string());
3276 }
3277 }
3278
3279 let mut tags = HashMap::new();
3280 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3281 Ok(probe) => probe,
3282 Err(err) => {
3283 warn!("failed to guess audio tag file type: {err}");
3284 return tags;
3285 }
3286 };
3287
3288 let tagged_file = match probe.read() {
3289 Ok(file) => file,
3290 Err(err) => {
3291 warn!("failed to read audio tags: {err}");
3292 return tags;
3293 }
3294 };
3295
3296 if let Some(primary) = tagged_file.primary_tag() {
3297 collect_tag(primary, &mut tags);
3298 }
3299 for tag in tagged_file.tags() {
3300 collect_tag(tag, &mut tags);
3301 }
3302
3303 tags
3304}
3305
3306fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3307 let mut cursor = Cursor::new(payload);
3308 let exif = match ExifReader::new().read_from_container(&mut cursor) {
3309 Ok(exif) => exif,
3310 Err(err) => {
3311 warn!("failed to read EXIF metadata: {err}");
3312 return (None, None);
3313 }
3314 };
3315
3316 let mut doc = DocExifMetadata::default();
3317 let mut has_data = false;
3318
3319 if let Some(make) = exif_string(&exif, Tag::Make) {
3320 doc.make = Some(make);
3321 has_data = true;
3322 }
3323 if let Some(model) = exif_string(&exif, Tag::Model) {
3324 doc.model = Some(model);
3325 has_data = true;
3326 }
3327 if let Some(lens) =
3328 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3329 {
3330 doc.lens = Some(lens);
3331 has_data = true;
3332 }
3333 if let Some(dt) =
3334 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3335 {
3336 doc.datetime = Some(dt);
3337 has_data = true;
3338 }
3339
3340 if let Some(gps) = extract_gps_metadata(&exif) {
3341 doc.gps = Some(gps);
3342 has_data = true;
3343 }
3344
3345 let caption = exif_string(&exif, Tag::ImageDescription);
3346
3347 let metadata = if has_data { Some(doc) } else { None };
3348 (metadata, caption)
3349}
3350
3351fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3352 exif.fields()
3353 .find(|field| field.tag == tag)
3354 .and_then(|field| field_to_string(field, exif))
3355}
3356
3357fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3358 let value = field.display_value().with_unit(exif).to_string();
3359 let trimmed = value.trim_matches('\0').trim();
3360 if trimmed.is_empty() {
3361 None
3362 } else {
3363 Some(trimmed.to_string())
3364 }
3365}
3366
3367fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3368 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3369 let latitude_ref_field = exif
3370 .fields()
3371 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3372 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3373 let longitude_ref_field = exif
3374 .fields()
3375 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3376
3377 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3378 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3379
3380 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3381 if reference.eq_ignore_ascii_case("S") {
3382 latitude = -latitude;
3383 }
3384 }
3385 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3386 if reference.eq_ignore_ascii_case("W") {
3387 longitude = -longitude;
3388 }
3389 }
3390
3391 Some(DocGpsMetadata {
3392 latitude,
3393 longitude,
3394 })
3395}
3396
3397fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3398 match value {
3399 ExifValue::Rational(values) if !values.is_empty() => {
3400 let deg = rational_to_f64_u(values.get(0)?)?;
3401 let min = values
3402 .get(1)
3403 .and_then(|v| rational_to_f64_u(v))
3404 .unwrap_or(0.0);
3405 let sec = values
3406 .get(2)
3407 .and_then(|v| rational_to_f64_u(v))
3408 .unwrap_or(0.0);
3409 Some(deg + (min / 60.0) + (sec / 3600.0))
3410 }
3411 ExifValue::SRational(values) if !values.is_empty() => {
3412 let deg = rational_to_f64_i(values.get(0)?)?;
3413 let min = values
3414 .get(1)
3415 .and_then(|v| rational_to_f64_i(v))
3416 .unwrap_or(0.0);
3417 let sec = values
3418 .get(2)
3419 .and_then(|v| rational_to_f64_i(v))
3420 .unwrap_or(0.0);
3421 Some(deg + (min / 60.0) + (sec / 3600.0))
3422 }
3423 _ => None,
3424 }
3425}
3426
3427fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3428 if value.denom == 0 {
3429 None
3430 } else {
3431 Some(value.num as f64 / value.denom as f64)
3432 }
3433}
3434
3435fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3436 if value.denom == 0 {
3437 None
3438 } else {
3439 Some(value.num as f64 / value.denom as f64)
3440 }
3441}
3442
3443fn value_to_ascii(value: &ExifValue) -> Option<String> {
3444 if let ExifValue::Ascii(values) = value {
3445 values
3446 .first()
3447 .and_then(|bytes| std::str::from_utf8(bytes).ok())
3448 .map(|s| s.trim_matches('\0').trim().to_string())
3449 .filter(|s| !s.is_empty())
3450 } else {
3451 None
3452 }
3453}
3454
3455fn ingest_payload(
3456 mem: &mut Memvid,
3457 args: &PutArgs,
3458 config: &CliConfig,
3459 payload: Vec<u8>,
3460 source_path: Option<&Path>,
3461 capacity_guard: Option<&mut CapacityGuard>,
3462 enable_embedding: bool,
3463 runtime: Option<&EmbeddingRuntime>,
3464 parallel_mode: bool,
3465) -> Result<IngestOutcome> {
3466 let original_bytes = payload.len();
3467 let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3468
3469 let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
3472 crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;
3473
3474 let payload_text = std::str::from_utf8(&payload).ok();
3475 let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3476
3477 let audio_opts = AudioAnalyzeOptions {
3478 force: args.audio || args.transcribe,
3479 segment_secs: args
3480 .audio_segment_seconds
3481 .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3482 transcribe: args.transcribe,
3483 };
3484
3485 let mut analysis = analyze_file(
3486 source_path,
3487 &payload,
3488 payload_text,
3489 inferred_title.as_deref(),
3490 audio_opts,
3491 args.video,
3492 );
3493
3494 if args.video && !analysis.mime.starts_with("video/") {
3495 anyhow::bail!(
3496 "--video requires a video input; detected MIME type {}",
3497 analysis.mime
3498 );
3499 }
3500
3501 let mut search_text = analysis.search_text.clone();
3502 if let Some(ref mut text) = search_text {
3503 if text.len() > MAX_SEARCH_TEXT_LEN {
3504 let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3505 *text = truncated;
3506 }
3507 }
3508 analysis.search_text = search_text.clone();
3509
3510 let uri = if let Some(ref explicit) = args.uri {
3511 derive_uri(Some(explicit), None)
3512 } else if args.video {
3513 derive_video_uri(&payload, source_path, &analysis.mime)
3514 } else {
3515 derive_uri(None, source_path)
3516 };
3517
3518 let mut options_builder = PutOptions::builder()
3519 .enable_embedding(enable_embedding)
3520 .auto_tag(!args.no_auto_tag)
3521 .extract_dates(!args.no_extract_dates)
3522 .extract_triplets(!args.no_extract_triplets);
3523 if let Some(ts) = args.timestamp {
3524 options_builder = options_builder.timestamp(ts);
3525 }
3526 if let Some(track) = &args.track {
3527 options_builder = options_builder.track(track.clone());
3528 }
3529 if args.video {
3530 options_builder = options_builder.kind("video");
3531 options_builder = options_builder.tag("kind", "video");
3532 options_builder = options_builder.push_tag("video");
3533 } else if let Some(kind) = &args.kind {
3534 options_builder = options_builder.kind(kind.clone());
3535 }
3536 options_builder = options_builder.uri(uri.clone());
3537 if let Some(ref title) = inferred_title {
3538 options_builder = options_builder.title(title.clone());
3539 }
3540 for (key, value) in &args.tags {
3541 options_builder = options_builder.tag(key.clone(), value.clone());
3542 if !key.is_empty() {
3543 options_builder = options_builder.push_tag(key.clone());
3544 }
3545 if !value.is_empty() && value != key {
3546 options_builder = options_builder.push_tag(value.clone());
3547 }
3548 }
3549 for label in &args.labels {
3550 options_builder = options_builder.label(label.clone());
3551 }
3552 if let Some(metadata) = analysis.metadata.clone() {
3553 if !metadata.is_empty() {
3554 options_builder = options_builder.metadata(metadata);
3555 }
3556 }
3557 for (idx, entry) in args.metadata.iter().enumerate() {
3558 match serde_json::from_str::<DocMetadata>(entry) {
3559 Ok(meta) => {
3560 options_builder = options_builder.metadata(meta);
3561 }
3562 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3563 Ok(value) => {
3564 options_builder =
3565 options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3566 }
3567 Err(err) => {
3568 warn!("failed to parse --metadata JSON: {err}");
3569 }
3570 },
3571 }
3572 }
3573 if let Some(text) = search_text.clone() {
3574 if !text.trim().is_empty() {
3575 options_builder = options_builder.search_text(text);
3576 }
3577 }
3578 let mut options = options_builder.build();
3579
3580 let existing_frame = if args.update_existing || !args.allow_duplicate {
3581 match mem.frame_by_uri(&uri) {
3582 Ok(frame) => Some(frame),
3583 Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3584 Err(err) => return Err(err.into()),
3585 }
3586 } else {
3587 None
3588 };
3589
3590 let frame_id = if let Some(ref existing) = existing_frame {
3593 existing.id
3594 } else {
3595 mem.stats()?.frame_count
3596 };
3597
3598 let mut embedded = false;
3599 let allow_embedding = enable_embedding && !args.video;
3600 if enable_embedding && !allow_embedding && args.video {
3601 warn!("semantic embeddings are not generated for video payloads");
3602 }
3603 let seq = if let Some(existing) = existing_frame {
3604 if args.update_existing {
3605 let payload_for_update = payload.clone();
3606 if allow_embedding {
3607 if let Some(path) = args.embedding_vec.as_deref() {
3608 let embedding = crate::utils::read_embedding(path)?;
3609 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3610 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3611 let expected = choice.dimensions();
3612 if expected != 0 && embedding.len() != expected {
3613 anyhow::bail!(
3614 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3615 embedding.len(),
3616 choice.name(),
3617 expected
3618 );
3619 }
3620 apply_embedding_identity_metadata_from_choice(
3621 &mut options,
3622 choice,
3623 embedding.len(),
3624 Some(model_str),
3625 );
3626 } else {
3627 options.extra_metadata.insert(
3628 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3629 embedding.len().to_string(),
3630 );
3631 }
3632 embedded = true;
3633 mem.update_frame(
3634 existing.id,
3635 Some(payload_for_update),
3636 options.clone(),
3637 Some(embedding),
3638 )
3639 .map_err(|err| {
3640 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3641 })?
3642 } else {
3643 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3644 let embed_text = search_text
3645 .clone()
3646 .or_else(|| payload_text.map(|text| text.to_string()))
3647 .unwrap_or_default();
3648 if embed_text.trim().is_empty() {
3649 warn!("no textual content available; embedding skipped");
3650 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3651 .map_err(|err| {
3652 map_put_error(
3653 err,
3654 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3655 )
3656 })?
3657 } else {
3658 let truncated_text = truncate_for_embedding(&embed_text);
3660 if truncated_text.len() < embed_text.len() {
3661 info!(
3662 "Truncated text from {} to {} chars for embedding",
3663 embed_text.len(),
3664 truncated_text.len()
3665 );
3666 }
3667 let embedding = runtime.embed_passage(&truncated_text)?;
3668 embedded = true;
3669 apply_embedding_identity_metadata(&mut options, runtime);
3670 mem.update_frame(
3671 existing.id,
3672 Some(payload_for_update),
3673 options.clone(),
3674 Some(embedding),
3675 )
3676 .map_err(|err| {
3677 map_put_error(
3678 err,
3679 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3680 )
3681 })?
3682 }
3683 }
3684 } else {
3685 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3686 .map_err(|err| {
3687 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3688 })?
3689 }
3690 } else {
3691 return Err(DuplicateUriError::new(uri.as_str()).into());
3692 }
3693 } else {
3694 if let Some(guard) = capacity_guard.as_ref() {
3695 guard.ensure_capacity(stored_bytes as u64)?;
3696 }
3697 if parallel_mode {
3698 #[cfg(feature = "parallel_segments")]
3699 {
3700 let mut parent_embedding = None;
3701 let mut chunk_embeddings_vec = None;
3702 if allow_embedding {
3703 if let Some(path) = args.embedding_vec.as_deref() {
3704 let embedding = crate::utils::read_embedding(path)?;
3705 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3706 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3707 let expected = choice.dimensions();
3708 if expected != 0 && embedding.len() != expected {
3709 anyhow::bail!(
3710 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3711 embedding.len(),
3712 choice.name(),
3713 expected
3714 );
3715 }
3716 apply_embedding_identity_metadata_from_choice(
3717 &mut options,
3718 choice,
3719 embedding.len(),
3720 Some(model_str),
3721 );
3722 } else {
3723 options.extra_metadata.insert(
3724 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3725 embedding.len().to_string(),
3726 );
3727 }
3728 parent_embedding = Some(embedding);
3729 embedded = true;
3730 } else {
3731 let runtime =
3732 runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3733 let embed_text = search_text
3734 .clone()
3735 .or_else(|| payload_text.map(|text| text.to_string()))
3736 .unwrap_or_default();
3737 if embed_text.trim().is_empty() {
3738 warn!("no textual content available; embedding skipped");
3739 } else {
3740 info!(
3742 "parallel mode: checking for chunks on {} bytes payload",
3743 payload.len()
3744 );
3745 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3746 info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3748
3749 #[cfg(feature = "temporal_enrich")]
3751 let enriched_chunks = if args.temporal_enrich {
3752 info!(
3753 "Temporal enrichment enabled, processing {} chunks",
3754 chunk_texts.len()
3755 );
3756 let today = chrono::Local::now().date_naive();
3758 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3759 let enriched: Vec<String> =
3761 results.iter().map(|(text, _)| text.clone()).collect();
3762 let resolved_count = results
3763 .iter()
3764 .filter(|(_, e)| {
3765 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3766 })
3767 .count();
3768 info!(
3769 "Temporal enrichment: resolved {} chunks with temporal context",
3770 resolved_count
3771 );
3772 enriched
3773 } else {
3774 chunk_texts.clone()
3775 };
3776 #[cfg(not(feature = "temporal_enrich"))]
3777 let enriched_chunks = {
3778 if args.temporal_enrich {
3779 warn!(
3780 "Temporal enrichment requested but feature not compiled in"
3781 );
3782 }
3783 chunk_texts.clone()
3784 };
3785
3786 let embed_chunks = if args.contextual {
3788 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3789 let engine = if args.contextual_model.as_deref() == Some("local") {
3790 #[cfg(feature = "llama-cpp")]
3791 {
3792 let model_path = get_local_contextual_model_path()?;
3793 ContextualEngine::local(model_path)
3794 }
3795 #[cfg(not(feature = "llama-cpp"))]
3796 {
3797 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3798 }
3799 } else if let Some(model) = &args.contextual_model {
3800 ContextualEngine::openai_with_model(model)?
3801 } else {
3802 ContextualEngine::openai()?
3803 };
3804
3805 match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3806 {
3807 Ok(contexts) => {
3808 info!("Generated {} contextual prefixes", contexts.len());
3809 apply_contextual_prefixes(
3810 &embed_text,
3811 &enriched_chunks,
3812 &contexts,
3813 )
3814 }
3815 Err(e) => {
3816 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3817 enriched_chunks.clone()
3818 }
3819 }
3820 } else {
3821 enriched_chunks
3822 };
3823
3824 let truncated_chunks: Vec<String> = embed_chunks
3825 .iter()
3826 .map(|chunk_text| {
3827 let truncated_chunk = truncate_for_embedding(chunk_text);
3829 if truncated_chunk.len() < chunk_text.len() {
3830 info!(
3831 "parallel mode: Truncated chunk from {} to {} chars for embedding",
3832 chunk_text.len(),
3833 truncated_chunk.len()
3834 );
3835 }
3836 truncated_chunk
3837 })
3838 .collect();
3839 let truncated_refs: Vec<&str> =
3840 truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3841 let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3842 let num_chunks = chunk_embeddings.len();
3843 chunk_embeddings_vec = Some(chunk_embeddings);
3844 let parent_text = truncate_for_embedding(&embed_text);
3846 if parent_text.len() < embed_text.len() {
3847 info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3848 }
3849 parent_embedding = Some(runtime.embed_passage(&parent_text)?);
3850 embedded = true;
3851 info!(
3852 "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3853 num_chunks
3854 );
3855 } else {
3856 info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3858 let truncated_text = truncate_for_embedding(&embed_text);
3859 if truncated_text.len() < embed_text.len() {
3860 info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3861 }
3862 parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
3863 embedded = true;
3864 }
3865 }
3866 }
3867 }
3868 if embedded {
3869 if let Some(runtime) = runtime {
3870 apply_embedding_identity_metadata(&mut options, runtime);
3871 }
3872 }
3873 let payload_variant = if let Some(path) = source_path {
3874 ParallelPayload::Path(path.to_path_buf())
3875 } else {
3876 ParallelPayload::Bytes(payload)
3877 };
3878 let input = ParallelInput {
3879 payload: payload_variant,
3880 options: options.clone(),
3881 embedding: parent_embedding,
3882 chunk_embeddings: chunk_embeddings_vec,
3883 };
3884 return Ok(IngestOutcome {
3885 report: PutReport {
3886 seq: 0,
3887 frame_id: 0, uri,
3889 title: inferred_title,
3890 original_bytes,
3891 stored_bytes,
3892 compressed,
3893 source: source_path.map(Path::to_path_buf),
3894 mime: Some(analysis.mime),
3895 metadata: analysis.metadata,
3896 extraction: analysis.extraction,
3897 #[cfg(feature = "logic_mesh")]
3898 search_text: search_text.clone(),
3899 },
3900 embedded,
3901 parallel_input: Some(input),
3902 });
3903 }
3904 #[cfg(not(feature = "parallel_segments"))]
3905 {
3906 mem.put_bytes_with_options(&payload, options)
3907 .map_err(|err| {
3908 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3909 })?
3910 }
3911 } else if allow_embedding {
3912 if let Some(path) = args.embedding_vec.as_deref() {
3913 let embedding = crate::utils::read_embedding(path)?;
3914 if let Some(model_str) = args.embedding_vec_model.as_deref() {
3915 let choice = model_str.parse::<EmbeddingModelChoice>()?;
3916 let expected = choice.dimensions();
3917 if expected != 0 && embedding.len() != expected {
3918 anyhow::bail!(
3919 "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3920 embedding.len(),
3921 choice.name(),
3922 expected
3923 );
3924 }
3925 apply_embedding_identity_metadata_from_choice(
3926 &mut options,
3927 choice,
3928 embedding.len(),
3929 Some(model_str),
3930 );
3931 } else {
3932 options.extra_metadata.insert(
3933 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3934 embedding.len().to_string(),
3935 );
3936 }
3937 embedded = true;
3938 mem.put_with_embedding_and_options(&payload, embedding, options)
3939 .map_err(|err| {
3940 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3941 })?
3942 } else {
3943 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3944 let embed_text = search_text
3945 .clone()
3946 .or_else(|| payload_text.map(|text| text.to_string()))
3947 .unwrap_or_default();
3948 if embed_text.trim().is_empty() {
3949 warn!("no textual content available; embedding skipped");
3950 mem.put_bytes_with_options(&payload, options)
3951 .map_err(|err| {
3952 map_put_error(
3953 err,
3954 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3955 )
3956 })?
3957 } else {
3958 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3960 info!(
3962 "Document will be chunked into {} chunks, generating embeddings for each",
3963 chunk_texts.len()
3964 );
3965
3966 #[cfg(feature = "temporal_enrich")]
3968 let enriched_chunks = if args.temporal_enrich {
3969 info!(
3970 "Temporal enrichment enabled, processing {} chunks",
3971 chunk_texts.len()
3972 );
3973 let today = chrono::Local::now().date_naive();
3975 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3976 let enriched: Vec<String> =
3978 results.iter().map(|(text, _)| text.clone()).collect();
3979 let resolved_count = results
3980 .iter()
3981 .filter(|(_, e)| {
3982 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3983 })
3984 .count();
3985 info!(
3986 "Temporal enrichment: resolved {} chunks with temporal context",
3987 resolved_count
3988 );
3989 enriched
3990 } else {
3991 chunk_texts.clone()
3992 };
3993 #[cfg(not(feature = "temporal_enrich"))]
3994 let enriched_chunks = {
3995 if args.temporal_enrich {
3996 warn!("Temporal enrichment requested but feature not compiled in");
3997 }
3998 chunk_texts.clone()
3999 };
4000
4001 let embed_chunks = if args.contextual {
4003 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4004 let engine = if args.contextual_model.as_deref() == Some("local") {
4005 #[cfg(feature = "llama-cpp")]
4006 {
4007 let model_path = get_local_contextual_model_path()?;
4008 ContextualEngine::local(model_path)
4009 }
4010 #[cfg(not(feature = "llama-cpp"))]
4011 {
4012 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4013 }
4014 } else if let Some(model) = &args.contextual_model {
4015 ContextualEngine::openai_with_model(model)?
4016 } else {
4017 ContextualEngine::openai()?
4018 };
4019
4020 match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
4021 Ok(contexts) => {
4022 info!("Generated {} contextual prefixes", contexts.len());
4023 apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
4024 }
4025 Err(e) => {
4026 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4027 enriched_chunks.clone()
4028 }
4029 }
4030 } else {
4031 enriched_chunks
4032 };
4033
4034 let truncated_chunks: Vec<String> = embed_chunks
4035 .iter()
4036 .map(|chunk_text| {
4037 let truncated_chunk = truncate_for_embedding(chunk_text);
4039 if truncated_chunk.len() < chunk_text.len() {
4040 info!(
4041 "Truncated chunk from {} to {} chars for embedding",
4042 chunk_text.len(),
4043 truncated_chunk.len()
4044 );
4045 }
4046 truncated_chunk
4047 })
4048 .collect();
4049 let truncated_refs: Vec<&str> =
4050 truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
4051 let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4052 let parent_text = truncate_for_embedding(&embed_text);
4054 if parent_text.len() < embed_text.len() {
4055 info!(
4056 "Truncated parent text from {} to {} chars for embedding",
4057 embed_text.len(),
4058 parent_text.len()
4059 );
4060 }
4061 let parent_embedding = runtime.embed_passage(&parent_text)?;
4062 embedded = true;
4063 apply_embedding_identity_metadata(&mut options, runtime);
4064 info!(
4065 "Calling put_with_chunk_embeddings with {} chunk embeddings",
4066 chunk_embeddings.len()
4067 );
4068 mem.put_with_chunk_embeddings(
4069 &payload,
4070 Some(parent_embedding),
4071 chunk_embeddings,
4072 options,
4073 )
4074 .map_err(|err| {
4075 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4076 })?
4077 } else {
4078 info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
4080 let truncated_text = truncate_for_embedding(&embed_text);
4081 if truncated_text.len() < embed_text.len() {
4082 info!(
4083 "Truncated text from {} to {} chars for embedding",
4084 embed_text.len(),
4085 truncated_text.len()
4086 );
4087 }
4088 let embedding = runtime.embed_passage(&truncated_text)?;
4089 embedded = true;
4090 apply_embedding_identity_metadata(&mut options, runtime);
4091 mem.put_with_embedding_and_options(&payload, embedding, options)
4092 .map_err(|err| {
4093 map_put_error(
4094 err,
4095 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4096 )
4097 })?
4098 }
4099 }
4100 }
4101 } else {
4102 mem.put_bytes_with_options(&payload, options)
4103 .map_err(|err| {
4104 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4105 })?
4106 }
4107 };
4108
4109 Ok(IngestOutcome {
4110 report: PutReport {
4111 seq,
4112 frame_id,
4113 uri,
4114 title: inferred_title,
4115 original_bytes,
4116 stored_bytes,
4117 compressed,
4118 source: source_path.map(Path::to_path_buf),
4119 mime: Some(analysis.mime),
4120 metadata: analysis.metadata,
4121 extraction: analysis.extraction,
4122 #[cfg(feature = "logic_mesh")]
4123 search_text,
4124 },
4125 embedded,
4126 #[cfg(feature = "parallel_segments")]
4127 parallel_input: None,
4128 })
4129}
4130
4131fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4132 if std::str::from_utf8(payload).is_ok() {
4133 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4134 Ok((compressed.len(), true))
4135 } else {
4136 Ok((payload.len(), false))
4137 }
4138}
4139
4140fn print_report(report: &PutReport) {
4141 let name = report
4142 .source
4143 .as_ref()
4144 .map(|path| {
4145 let pretty = env::current_dir()
4146 .ok()
4147 .as_ref()
4148 .and_then(|cwd| diff_paths(path, cwd))
4149 .unwrap_or_else(|| path.to_path_buf());
4150 pretty.to_string_lossy().into_owned()
4151 })
4152 .unwrap_or_else(|| "stdin".to_string());
4153
4154 let ratio = if report.original_bytes > 0 {
4155 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4156 } else {
4157 100.0
4158 };
4159
4160 println!("• {name} → {}", report.uri);
4161 println!(" seq: {}", report.seq);
4162 if report.compressed {
4163 println!(
4164 " size: {} B → {} B ({:.1}% of original, compressed)",
4165 report.original_bytes, report.stored_bytes, ratio
4166 );
4167 } else {
4168 println!(" size: {} B (stored as-is)", report.original_bytes);
4169 }
4170 if let Some(mime) = &report.mime {
4171 println!(" mime: {mime}");
4172 }
4173 if let Some(title) = &report.title {
4174 println!(" title: {title}");
4175 }
4176 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4177 println!(
4178 " extraction: status={} reader={}",
4179 report.extraction.status.label(),
4180 extraction_reader
4181 );
4182 if let Some(ms) = report.extraction.duration_ms {
4183 println!(" extraction-duration: {} ms", ms);
4184 }
4185 if let Some(pages) = report.extraction.pages_processed {
4186 println!(" extraction-pages: {}", pages);
4187 }
4188 for warning in &report.extraction.warnings {
4189 println!(" warning: {warning}");
4190 }
4191 if let Some(metadata) = &report.metadata {
4192 if let Some(caption) = &metadata.caption {
4193 println!(" caption: {caption}");
4194 }
4195 if let Some(audio) = metadata.audio.as_ref() {
4196 if audio.duration_secs.is_some()
4197 || audio.sample_rate_hz.is_some()
4198 || audio.channels.is_some()
4199 {
4200 let duration = audio
4201 .duration_secs
4202 .map(|secs| format!("{secs:.1}s"))
4203 .unwrap_or_else(|| "unknown".into());
4204 let rate = audio
4205 .sample_rate_hz
4206 .map(|hz| format!("{} Hz", hz))
4207 .unwrap_or_else(|| "? Hz".into());
4208 let channels = audio
4209 .channels
4210 .map(|ch| ch.to_string())
4211 .unwrap_or_else(|| "?".into());
4212 println!(" audio: duration {duration}, {channels} ch, {rate}");
4213 }
4214 }
4215 }
4216
4217 println!();
4218}
4219
4220fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4221 let source = report
4222 .source
4223 .as_ref()
4224 .map(|path| path.to_string_lossy().into_owned());
4225 let extraction = json!({
4226 "status": report.extraction.status.label(),
4227 "reader": report.extraction.reader,
4228 "duration_ms": report.extraction.duration_ms,
4229 "pages_processed": report.extraction.pages_processed,
4230 "warnings": report.extraction.warnings,
4231 });
4232 json!({
4233 "seq": report.seq,
4234 "uri": report.uri,
4235 "title": report.title,
4236 "original_bytes": report.original_bytes,
4237 "original_bytes_human": format_bytes(report.original_bytes as u64),
4238 "stored_bytes": report.stored_bytes,
4239 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4240 "compressed": report.compressed,
4241 "mime": report.mime,
4242 "source": source,
4243 "metadata": report.metadata,
4244 "extraction": extraction,
4245 })
4246}
4247
4248fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4249 match err {
4250 MemvidError::CapacityExceeded {
4251 current,
4252 limit,
4253 required,
4254 } => anyhow!(CapacityExceededMessage {
4255 current,
4256 limit,
4257 required,
4258 }),
4259 other => other.into(),
4260 }
4261}
4262
4263fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4264 for (idx, entry) in entries.iter().enumerate() {
4265 match serde_json::from_str::<DocMetadata>(entry) {
4266 Ok(meta) => {
4267 options.metadata = Some(meta);
4268 }
4269 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4270 Ok(value) => {
4271 options
4272 .extra_metadata
4273 .insert(format!("custom_metadata_{idx}"), value.to_string());
4274 }
4275 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4276 },
4277 }
4278 }
4279}
4280
4281fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4282 let stats = mem.stats()?;
4283 let message = match stats.tier {
4284 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4285 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4286 };
4287 Ok(message)
4288}
4289
/// Normalize a raw URI string into a safe, canonical form.
///
/// Strips a leading `mv2://` scheme, converts backslashes to forward
/// slashes, removes empty and `.` segments, and resolves `..` by popping
/// the previous segment. When `keep_path` is true the full sanitized path
/// is returned; otherwise only the final segment.
///
/// Fix: the previous fallback (taken when sanitization leaves no
/// segments) could echo `"."` or `".."` back verbatim (e.g. for input
/// `".."`), letting traversal tokens escape a sanitizer. Those are now
/// filtered too, so the fallback is the last meaningful raw piece or the
/// stable placeholder `"document"`.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");
    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        if segment.is_empty() || segment == "." {
            continue;
        }
        if segment == ".." {
            // Traversal: drop the previous segment (no-op at the root).
            segments.pop();
            continue;
        }
        segments.push(segment.to_string());
    }

    if segments.is_empty() {
        // Nothing usable survived; fall back to the last raw piece, but
        // never emit "." or ".." from a sanitizer.
        return normalized
            .split('/')
            .next_back()
            .filter(|s| !s.is_empty() && *s != "." && *s != "..")
            .unwrap_or("document")
            .to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
4323
4324fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4325 if quiet {
4326 let pb = ProgressBar::hidden();
4327 pb.set_message(message.to_string());
4328 return pb;
4329 }
4330
4331 match total {
4332 Some(len) => {
4333 let pb = ProgressBar::new(len);
4334 pb.set_draw_target(ProgressDrawTarget::stderr());
4335 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4336 .unwrap_or_else(|_| ProgressStyle::default_bar());
4337 pb.set_style(style);
4338 pb.set_message(message.to_string());
4339 pb
4340 }
4341 None => {
4342 let pb = ProgressBar::new_spinner();
4343 pb.set_draw_target(ProgressDrawTarget::stderr());
4344 pb.set_message(message.to_string());
4345 pb.enable_steady_tick(Duration::from_millis(120));
4346 pb
4347 }
4348 }
4349}
4350
4351fn create_spinner(message: &str) -> ProgressBar {
4352 let pb = ProgressBar::new_spinner();
4353 pb.set_draw_target(ProgressDrawTarget::stderr());
4354 let style = ProgressStyle::with_template("{spinner} {msg}")
4355 .unwrap_or_else(|_| ProgressStyle::default_spinner())
4356 .tick_strings(&["-", "\\", "|", "/"]);
4357 pb.set_style(style);
4358 pb.set_message(message.to_string());
4359 pb.enable_steady_tick(Duration::from_millis(120));
4360 pb
4361}
4362
#[cfg(feature = "parallel_segments")]
// CLI arguments for the `put-many` batch-ingest subcommand.
// Plain `//` comments are used deliberately here: clap turns `///` doc
// comments on derive(Args) items into --help text, which would change
// the CLI's visible output.
#[derive(Args)]
pub struct PutManyArgs {
    // Target archive file to ingest into.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human-readable text.
    #[arg(long)]
    pub json: bool,
    // Read the batch JSON from this file instead of stdin.
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    // zstd compression level; clamped to 1..=9 where it is used.
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared locking flags, flattened into this command's arguments.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4387
#[cfg(feature = "parallel_segments")]
/// One document in a `put-many` batch, as deserialized from the input JSON.
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required field in the input JSON).
    pub title: String,
    /// Optional label string.
    // NOTE(review): this field is deserialized but not read by
    // `handle_put_many` — confirm whether it should map onto the put
    // options or be removed.
    #[serde(default)]
    pub label: String,
    /// Document body; stored as its UTF-8 bytes.
    pub text: String,
    /// Explicit URI; when absent, `handle_put_many` assigns a
    /// positional `mv2://batch/{index}` URI.
    #[serde(default)]
    pub uri: Option<String>,
    /// Plain tags, copied into the put options.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels, copied into the put options.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Structured metadata; parsed as `DocMetadata` when possible,
    /// otherwise preserved as raw JSON under an extra-metadata key.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Free-form string metadata merged into the put options'
    /// extra metadata.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Optional pre-computed parent embedding for the whole document.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Optional pre-computed per-chunk embeddings.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
4417
#[cfg(feature = "parallel_segments")]
/// A `put-many` input payload. `#[serde(transparent)]` makes the wrapper
/// invisible on the wire: the input is a bare JSON array of requests.
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The ordered list of documents to ingest.
    pub requests: Vec<PutManyRequest>,
}
4425
/// Handle the `put-many` subcommand: batch-ingest a JSON array of
/// documents into an archive via the parallel-segments build path.
///
/// Reads a `PutManyBatch` from `--input` or stdin, converts each request
/// into a `ParallelInput`, and ingests them all in one call to
/// `put_parallel_inputs`. Prints the resulting frame ids as JSON or
/// human-readable text depending on `--json`.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Make sure both the lexical and vector indexes exist before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // NOTE(review): the capacity check uses 0 additional bytes (the batch
    // payload size is not known yet) and stats captured before the indexes
    // were enabled — presumably the ingest path re-checks capacity; confirm.
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Batch JSON comes from --input when given, otherwise from stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    // Empty batch: report and exit successfully without touching the archive.
    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    // Convert every request into a ParallelInput for the batch ingest API.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Requests without an explicit URI get a positional batch URI.
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;
            options.extra_metadata = req.extra_metadata;
            if !req.metadata.is_null() {
                // Structured DocMetadata when it parses; otherwise stash the
                // raw JSON under a reserved key, without overwriting a value
                // the caller supplied explicitly in extra_metadata.
                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
                    Ok(meta) => options.metadata = Some(meta),
                    Err(_) => {
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| req.metadata.to_string());
                    }
                }
            }

            // Record the embedding dimension so consumers can validate the
            // vectors; fall back to the first chunk embedding when no parent
            // embedding was supplied.
            if let Some(embedding) = req
                .embedding
                .as_ref()
                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
            {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| embedding.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Clamp the user-provided zstd level to the range accepted here.
    // NOTE(review): zstd itself supports higher levels; 1..=9 looks like a
    // deliberate conservative cap — confirm before widening.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Keep terminal output short for large batches: list all ids up to
        // ten, otherwise just the first and last.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!(" frame_id: {}", fid);
            }
        } else {
            println!(" first: {}", frame_ids.first().unwrap_or(&0));
            println!(" last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}