1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32 DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33 ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34};
35#[cfg(feature = "clip")]
36use memvid_core::FrameRole;
37#[cfg(feature = "parallel_segments")]
38use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
39use pdf_extract::OutputError as PdfExtractError;
40use serde_json::json;
41use tracing::{info, warn};
42use tree_magic_mini::from_u8 as magic_from_u8;
43use uuid::Uuid;
44
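// Upper bounds on the text retained for lexical search and on the text passed to
// the embedding model, in bytes.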
45const MAX_SEARCH_TEXT_LEN: usize = 32_768;
46const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
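// Dominant-colour palette extraction parameters handed to color_thief.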
49const COLOR_PALETTE_SIZE: u8 = 5;
50const COLOR_PALETTE_QUALITY: u8 = 8;
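// Number of leading payload bytes sniffed for magic-number / MIME detection.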
51const MAGIC_SNIFF_BYTES: usize = 16;
52#[cfg(feature = "clip")]
55const CLIP_PDF_MAX_IMAGES: usize = 100;
56#[cfg(feature = "clip")]
57const CLIP_PDF_TARGET_PX: u32 = 768;
58
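/// Truncates `text` to at most `MAX_EMBEDDING_TEXT_LEN` bytes, backing off to the
/// nearest UTF-8 character boundary so the result is always valid.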
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        text.to_string()
    } else {
        // Walk back from the byte limit to a char boundary; slicing at an
        // arbitrary byte offset would panic on multi-byte UTF-8 characters.
        let mut end = MAX_EMBEDDING_TEXT_LEN;
        while end > 0 && !text.is_char_boundary(end) {
            end -= 1;
        }
        text[..end].to_string()
    }
}
77
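/// Resolves the on-disk path of the local Phi-3.5-mini GGUF model used for contextual
/// enrichment, honouring the `MEMVID_MODELS_DIR` override (with `~/` expansion) and
/// erroring with an install hint when the model is missing.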
78#[cfg(feature = "llama-cpp")]
81fn get_local_contextual_model_path() -> Result<PathBuf> {
82 let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
83 let expanded = if custom.starts_with("~/") {
85 if let Ok(home) = env::var("HOME") {
86 PathBuf::from(home).join(&custom[2..])
87 } else {
88 PathBuf::from(&custom)
89 }
90 } else {
91 PathBuf::from(&custom)
92 };
93 expanded
94 } else {
95 let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
97 PathBuf::from(home).join(".memvid").join("models")
98 };
99
100 let model_path = models_dir
101 .join("llm")
102 .join("phi-3-mini-q4")
103 .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");
104
105 if model_path.exists() {
106 Ok(model_path)
107 } else {
108 Err(anyhow!(
109 "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
110 model_path.display()
111 ))
112 }
113}
114
115use pathdiff::diff_paths;
116
117use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
118use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
119use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingRuntime};
120use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
121use crate::error::{CapacityExceededMessage, DuplicateUriError};
122use crate::utils::{
123 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
124 select_frame,
125};
126#[cfg(feature = "temporal_enrich")]
127use memvid_core::enrich_chunks as temporal_enrich_chunks;
128
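/// Parses common truthy/falsy flag strings ("1"/"true"/"yes"/"on" and "0"/"false"/"no"/"off").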
129fn parse_bool_flag(value: &str) -> Option<bool> {
131 match value.trim().to_ascii_lowercase().as_str() {
132 "1" | "true" | "yes" | "on" => Some(true),
133 "0" | "false" | "no" | "off" => Some(false),
134 _ => None,
135 }
136}
137
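/// Reads the `MEMVID_PARALLEL_SEGMENTS` environment override, if it holds a recognised flag value.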
138#[cfg(feature = "parallel_segments")]
140fn parallel_env_override() -> Option<bool> {
141 std::env::var("MEMVID_PARALLEL_SEGMENTS")
142 .ok()
143 .and_then(|value| parse_bool_flag(&value))
144}
145
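/// Returns true when the configured CLIP ONNX model pair (vision + text) is present on disk.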
146#[cfg(feature = "clip")]
148fn is_clip_model_installed() -> bool {
149 let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
150 PathBuf::from(custom)
151 } else if let Ok(home) = env::var("HOME") {
152 PathBuf::from(home).join(".memvid/models")
153 } else {
154 PathBuf::from(".memvid/models")
155 };
156
157 let model_name =
158 env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());
159
160 let vision_model = models_dir.join(format!("{}_vision.onnx", model_name));
161 let text_model = models_dir.join(format!("{}_text.onnx", model_name));
162
163 vision_model.exists() && text_model.exists()
164}
165
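// Thresholds applied to raw NER output: minimum confidence, minimum entity length,
// and the maximum byte gap bridged when merging adjacent entities.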
166#[cfg(feature = "logic_mesh")]
169const NER_MIN_CONFIDENCE: f32 = 0.50;
170
171#[cfg(feature = "logic_mesh")]
173const NER_MIN_ENTITY_LEN: usize = 2;
174
175#[cfg(feature = "logic_mesh")]
178const MAX_MERGE_GAP: usize = 3;
179
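/// Merges adjacent NER entities of the same type that are separated only by a small
/// run of joining characters (spaces, tabs, '&', '-', '\'', '.'), averaging their
/// confidence and widening the byte span.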
180#[cfg(feature = "logic_mesh")]
187fn merge_adjacent_entities(
188 entities: Vec<memvid_core::ExtractedEntity>,
189 original_text: &str,
190) -> Vec<memvid_core::ExtractedEntity> {
191 use memvid_core::ExtractedEntity;
192
193 if entities.is_empty() {
194 return entities;
195 }
196
197 let mut sorted: Vec<ExtractedEntity> = entities;
199 sorted.sort_by_key(|e| e.byte_start);
200
201 let mut merged: Vec<ExtractedEntity> = Vec::new();
202
203 for entity in sorted {
204 if let Some(last) = merged.last_mut() {
205 let gap = entity.byte_start.saturating_sub(last.byte_end);
207 let same_type = last.entity_type == entity.entity_type;
208
209 let gap_is_mergeable = if gap == 0 {
211 true
212 } else if gap <= MAX_MERGE_GAP && entity.byte_start <= original_text.len() && last.byte_end <= original_text.len() {
213 let gap_text = &original_text[last.byte_end..entity.byte_start];
215 if gap_text.contains('\n') || gap_text.contains('\r') {
217 false
218 } else {
219 gap_text.chars().all(|c| c == ' ' || c == '\t' || c == '&' || c == '-' || c == '\'' || c == '.')
220 }
221 } else {
222 false
223 };
224
225 if same_type && gap_is_mergeable {
226 let merged_text = if entity.byte_end <= original_text.len() {
228 original_text[last.byte_start..entity.byte_end].to_string()
229 } else {
230 format!("{}{}", last.text, entity.text)
231 };
232
233 tracing::debug!(
234 "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
235 last.text, entity.text, merged_text, gap
236 );
237
238 last.text = merged_text;
239 last.byte_end = entity.byte_end;
240 last.confidence = (last.confidence + entity.confidence) / 2.0;
242 continue;
243 }
244 }
245
246 merged.push(entity);
247 }
248
249 merged
250}
251
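/// Heuristic filter that rejects low-confidence or malformed NER spans: WordPiece
/// "##" fragments, punctuation-only text, spans containing newlines, common stopwords
/// and abbreviations, and lowercase-initial entities.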
252#[cfg(feature = "logic_mesh")]
255fn is_valid_entity(text: &str, confidence: f32) -> bool {
256 if confidence < NER_MIN_CONFIDENCE {
258 return false;
259 }
260
261 let trimmed = text.trim();
262
263 if trimmed.len() < NER_MIN_ENTITY_LEN {
265 return false;
266 }
267
268 if trimmed.chars().all(|c| c.is_whitespace() || c.is_ascii_punctuation()) {
270 return false;
271 }
272
273 if trimmed.contains('\n') || trimmed.contains('\r') {
275 return false;
276 }
277
278 if trimmed.starts_with("##") || trimmed.ends_with("##") {
280 return false;
281 }
282
283 if trimmed.ends_with('.') {
285 return false;
286 }
287
288 let lower = trimmed.to_lowercase();
289
290 if trimmed.len() == 1 {
292 return false;
293 }
294
295 if trimmed.len() == 2 {
297 if matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it") {
299 return true;
300 }
301 return false;
303 }
304
305 if matches!(lower.as_str(), "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc") {
307 return false;
308 }
309
310 let words: Vec<&str> = trimmed.split_whitespace().collect();
313 if words.len() >= 2 {
314 let first_word = words[0];
315 if first_word.len() <= 2 && first_word.chars().all(|c| c.is_uppercase()) {
317 if !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un") {
319 return false;
320 }
321 }
322 }
323
324 if let Some(first_char) = trimmed.chars().next() {
326 if first_char.is_lowercase() {
327 return false;
328 }
329 }
330
331 true
332}
333
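/// Parses a `KEY=VALUE` command-line pair into its two components; everything after
/// the first `=` is treated as the value.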
334pub fn parse_key_val(s: &str) -> Result<(String, String)> {
336 let (key, value) = s
337 .split_once('=')
338 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
339 Ok((key.to_string(), value.to_string()))
340}
341
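/// Shared file-locking flags: how long to wait for the lock (in milliseconds) and
/// whether to proceed forcibly.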
342#[derive(Args, Clone, Copy, Debug)]
344pub struct LockCliArgs {
345 #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
347 pub lock_timeout: u64,
348 #[arg(long = "force", action = ArgAction::SetTrue)]
350 pub force: bool,
351}
352
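/// Arguments for the `put` command: ingest one or more inputs (stdin, file, directory,
/// or glob) into a memory file, with optional embeddings, tables, CLIP, and Logic-Mesh.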
353#[derive(Args)]
355pub struct PutArgs {
356 #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
358 pub file: PathBuf,
359 #[arg(long)]
361 pub json: bool,
362 #[arg(long, value_name = "PATH")]
364 pub input: Option<String>,
365 #[arg(long, value_name = "URI")]
367 pub uri: Option<String>,
368 #[arg(long, value_name = "TITLE")]
370 pub title: Option<String>,
371 #[arg(long)]
373 pub timestamp: Option<i64>,
374 #[arg(long)]
376 pub track: Option<String>,
377 #[arg(long)]
379 pub kind: Option<String>,
380 #[arg(long, action = ArgAction::SetTrue)]
382 pub video: bool,
383 #[arg(long, action = ArgAction::SetTrue)]
385 pub transcript: bool,
386 #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
388 pub tags: Vec<(String, String)>,
389 #[arg(long = "label", value_name = "LABEL")]
391 pub labels: Vec<String>,
392 #[arg(long = "metadata", value_name = "JSON")]
394 pub metadata: Vec<String>,
395 #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
397 pub no_auto_tag: bool,
398 #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
400 pub no_extract_dates: bool,
401 #[arg(long, action = ArgAction::SetTrue)]
403 pub audio: bool,
404 #[arg(long = "audio-segment-seconds", value_name = "SECS")]
406 pub audio_segment_seconds: Option<u32>,
407 #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
410 pub embedding: bool,
411 #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
413 pub no_embedding: bool,
414 #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
417 pub embedding_vec: Option<PathBuf>,
418 #[arg(long, action = ArgAction::SetTrue)]
422 pub vector_compression: bool,
423 #[arg(long, action = ArgAction::SetTrue)]
427 pub contextual: bool,
428 #[arg(long = "contextual-model", value_name = "MODEL")]
430 pub contextual_model: Option<String>,
431 #[arg(long, action = ArgAction::SetTrue)]
434 pub tables: bool,
435 #[arg(long = "no-tables", action = ArgAction::SetTrue)]
437 pub no_tables: bool,
438 #[cfg(feature = "clip")]
442 #[arg(long, action = ArgAction::SetTrue)]
443 pub clip: bool,
444
445 #[cfg(feature = "clip")]
447 #[arg(long, action = ArgAction::SetTrue)]
448 pub no_clip: bool,
449
450 #[arg(long, conflicts_with = "allow_duplicate")]
452 pub update_existing: bool,
453 #[arg(long)]
455 pub allow_duplicate: bool,
456
457 #[arg(long, action = ArgAction::SetTrue)]
461 pub temporal_enrich: bool,
462
463 #[arg(long, action = ArgAction::SetTrue)]
468 pub logic_mesh: bool,
469
470 #[arg(long, action = ArgAction::SetTrue)]
472 pub no_logic_mesh: bool,
473
474 #[cfg(feature = "parallel_segments")]
476 #[arg(long, action = ArgAction::SetTrue)]
477 pub parallel_segments: bool,
478 #[cfg(feature = "parallel_segments")]
480 #[arg(long, action = ArgAction::SetTrue)]
481 pub no_parallel_segments: bool,
482 #[cfg(feature = "parallel_segments")]
484 #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
485 pub parallel_segment_tokens: Option<usize>,
486 #[cfg(feature = "parallel_segments")]
488 #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
489 pub parallel_segment_pages: Option<usize>,
490 #[cfg(feature = "parallel_segments")]
492 #[arg(long = "parallel-threads", value_name = "N")]
493 pub parallel_threads: Option<usize>,
494 #[cfg(feature = "parallel_segments")]
496 #[arg(long = "parallel-queue-depth", value_name = "N")]
497 pub parallel_queue_depth: Option<usize>,
498
499 #[command(flatten)]
500 pub lock: LockCliArgs,
501}
502
503#[cfg(feature = "parallel_segments")]
504impl PutArgs {
505 pub fn wants_parallel(&self) -> bool {
506 if self.parallel_segments {
507 return true;
508 }
509 if self.no_parallel_segments {
510 return false;
511 }
512 if let Some(env_flag) = parallel_env_override() {
513 return env_flag;
514 }
515 false
516 }
517
518 pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
519 let mut opts = memvid_core::BuildOpts::default();
520 if let Some(tokens) = self.parallel_segment_tokens {
521 opts.segment_tokens = tokens;
522 }
523 if let Some(pages) = self.parallel_segment_pages {
524 opts.segment_pages = pages;
525 }
526 if let Some(threads) = self.parallel_threads {
527 opts.threads = threads;
528 }
529 if let Some(depth) = self.parallel_queue_depth {
530 opts.queue_depth = depth;
531 }
532 opts.sanitize();
533 opts
534 }
535}
536
537#[cfg(not(feature = "parallel_segments"))]
538impl PutArgs {
539 pub fn wants_parallel(&self) -> bool {
540 false
541 }
542}
543
544#[derive(Args)]
546pub struct ApiFetchArgs {
547 #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
549 pub file: PathBuf,
550 #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
552 pub config: PathBuf,
553 #[arg(long, action = ArgAction::SetTrue)]
555 pub dry_run: bool,
556 #[arg(long, value_name = "MODE")]
558 pub mode: Option<ApiFetchMode>,
559 #[arg(long, value_name = "URI")]
561 pub uri: Option<String>,
562 #[arg(long, action = ArgAction::SetTrue)]
564 pub json: bool,
565
566 #[command(flatten)]
567 pub lock: LockCliArgs,
568}
569
570#[derive(Args)]
572pub struct UpdateArgs {
573 #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
574 pub file: PathBuf,
575 #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
576 pub frame_id: Option<u64>,
577 #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
578 pub uri: Option<String>,
579 #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
580 pub input: Option<PathBuf>,
581 #[arg(long = "set-uri", value_name = "URI")]
582 pub set_uri: Option<String>,
583 #[arg(long, value_name = "TITLE")]
584 pub title: Option<String>,
585 #[arg(long, value_name = "TIMESTAMP")]
586 pub timestamp: Option<i64>,
587 #[arg(long, value_name = "TRACK")]
588 pub track: Option<String>,
589 #[arg(long, value_name = "KIND")]
590 pub kind: Option<String>,
591 #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
592 pub tags: Vec<(String, String)>,
593 #[arg(long = "label", value_name = "LABEL")]
594 pub labels: Vec<String>,
595 #[arg(long = "metadata", value_name = "JSON")]
596 pub metadata: Vec<String>,
597 #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
598 pub embeddings: bool,
599 #[arg(long)]
600 pub json: bool,
601
602 #[command(flatten)]
603 pub lock: LockCliArgs,
604}
605
606#[derive(Args)]
608pub struct DeleteArgs {
609 #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
610 pub file: PathBuf,
611 #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
612 pub frame_id: Option<u64>,
613 #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
614 pub uri: Option<String>,
615 #[arg(long, action = ArgAction::SetTrue)]
616 pub yes: bool,
617 #[arg(long)]
618 pub json: bool,
619
620 #[command(flatten)]
621 pub lock: LockCliArgs,
622}
623
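/// Translates the CLI arguments into an `ApiFetchCommand` and delegates to `run_api_fetch`.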
624pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
626 let command = ApiFetchCommand {
627 file: args.file,
628 config_path: args.config,
629 dry_run: args.dry_run,
630 mode_override: args.mode,
631 uri_override: args.uri,
632 output_json: args.json,
633 lock_timeout_ms: args.lock.lock_timeout,
634 force_lock: args.lock.force,
635 };
636 run_api_fetch(config, command)
637}
638
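/// Deletes a frame (selected by ID or URI), prompting for confirmation unless `--yes` is given.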
639pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
641 let mut mem = Memvid::open(&args.file)?;
642 ensure_cli_mutation_allowed(&mem)?;
643 apply_lock_cli(&mut mem, &args.lock);
644 let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
645
646 if !args.yes {
647 if let Some(uri) = &frame.uri {
648 println!("About to delete frame {} ({uri})", frame.id);
649 } else {
650 println!("About to delete frame {}", frame.id);
651 }
652 print!("Confirm? [y/N]: ");
653 io::stdout().flush()?;
654 let mut input = String::new();
655 io::stdin().read_line(&mut input)?;
656 let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
657 if !confirmed {
658 println!("Aborted");
659 return Ok(());
660 }
661 }
662
663 let seq = mem.delete_frame(frame.id)?;
664 mem.commit()?;
665 let deleted = mem.frame_by_id(frame.id)?;
666
667 if args.json {
668 let json = json!({
669 "frame_id": frame.id,
670 "sequence": seq,
671 "status": frame_status_str(deleted.status),
672 });
673 println!("{}", serde_json::to_string_pretty(&json)?);
674 } else {
675 println!("Deleted frame {} (seq {})", frame.id, seq);
676 }
677
678 Ok(())
679}
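
/// Ingests the resolved inputs into the memory file. Depending on flags and enabled
/// features this also produces dense embeddings, CLIP image embeddings, extracted
/// tables, and a Logic-Mesh entity graph, and enforces the capacity guard.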
680pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
681 let mut mem = Memvid::open(&args.file)?;
682 ensure_cli_mutation_allowed(&mem)?;
683 apply_lock_cli(&mut mem, &args.lock);
684 let mut stats = mem.stats()?;
685 if stats.frame_count == 0 && !stats.has_lex_index {
686 mem.enable_lex()?;
687 stats = mem.stats()?;
688 }
689
690 if args.vector_compression {
692 mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
693 }
694
695 let mut capacity_guard = CapacityGuard::from_stats(&stats);
696 let use_parallel = args.wants_parallel();
697 #[cfg(feature = "parallel_segments")]
698 let mut parallel_opts = if use_parallel {
699 Some(args.sanitized_parallel_opts())
700 } else {
701 None
702 };
703 #[cfg(feature = "parallel_segments")]
704 let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
705 #[cfg(feature = "parallel_segments")]
706 let mut pending_parallel_indices: Vec<usize> = Vec::new();
707 let input_set = resolve_inputs(args.input.as_deref())?;
708
709 if args.video {
710 if let Some(kind) = &args.kind {
711 if !kind.eq_ignore_ascii_case("video") {
712 anyhow::bail!("--video conflicts with explicit --kind={kind}");
713 }
714 }
715 if matches!(input_set, InputSet::Stdin) {
716 anyhow::bail!("--video requires --input <file>");
717 }
718 }
719
720 #[allow(dead_code)]
721 enum EmbeddingMode {
722 None,
723 Auto,
724 Explicit,
725 }
726
727 let mv2_dimension = mem.vec_index_dimension();
731 let (embedding_mode, embedding_runtime) = if args.embedding {
732 (
733 EmbeddingMode::Explicit,
734 Some(load_embedding_runtime_for_mv2(config, None, mv2_dimension)?),
735 )
736 } else {
737 (EmbeddingMode::None, None)
738 };
739 let embedding_enabled = embedding_runtime.is_some();
740 let runtime_ref = embedding_runtime.as_ref();
741
742 #[cfg(feature = "clip")]
747 let clip_enabled = {
748 if args.no_clip {
749 false
750 } else if args.clip {
            true
        } else {
753 let model_available = is_clip_model_installed();
755 if model_available {
756 match &input_set {
758 InputSet::Files(paths) => paths.iter().any(|p| {
759 is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
760 }),
761 InputSet::Stdin => false,
762 }
763 } else {
764 false
765 }
766 }
767 };
768 #[cfg(feature = "clip")]
769 let clip_model = if clip_enabled {
770 mem.enable_clip()?;
771 if !args.json {
772 let mode = if args.clip {
773 "explicit"
774 } else {
775 "auto-detected"
776 };
777 eprintln!("ℹ️ CLIP visual embeddings enabled ({mode})");
778 eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
779 eprintln!(" Use 'memvid find --mode clip' to search with natural language");
780 eprintln!(" Use --no-clip to disable auto-detection");
781 eprintln!();
782 }
783 match memvid_core::ClipModel::default_model() {
785 Ok(model) => Some(model),
786 Err(e) => {
787 warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
788 None
789 }
790 }
791 } else {
792 None
793 };
794
795 if embedding_enabled && !args.json {
797 let capacity = stats.capacity_bytes;
798 if args.vector_compression {
            let max_docs = capacity / 20_000;
            eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
802 eprintln!(" Storage: ~20 KB per document (16x compressed)");
803 eprintln!(" Accuracy: ~95% recall@10 maintained");
804 eprintln!(
805 " Capacity: ~{} documents max for {:.1} GB",
806 max_docs,
807 capacity as f64 / 1e9
808 );
809 eprintln!();
810 } else {
            let max_docs = capacity / 270_000;
            eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
813 eprintln!(" Storage: ~270 KB per document");
814 eprintln!(
815 " Capacity: ~{} documents max for {:.1} GB",
816 max_docs,
817 capacity as f64 / 1e9
818 );
819 eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
820 eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
821 eprintln!();
822 }
823 }
824
825 let embed_progress = if embedding_enabled {
826 let total = match &input_set {
827 InputSet::Files(paths) => Some(paths.len() as u64),
828 InputSet::Stdin => None,
829 };
830 Some(create_task_progress_bar(total, "embed", false))
831 } else {
832 None
833 };
834 let mut embedded_docs = 0usize;
835
836 if let InputSet::Files(paths) = &input_set {
837 if paths.len() > 1 {
838 if args.uri.is_some() || args.title.is_some() {
839 anyhow::bail!(
840 "--uri/--title apply to a single file; omit them when using a directory or glob"
841 );
842 }
843 }
844 }
845
846 let mut reports = Vec::new();
847 let mut processed = 0usize;
848 let mut skipped = 0usize;
849 let mut bytes_added = 0u64;
850 let mut summary_warnings: Vec<String> = Vec::new();
851 #[cfg(feature = "clip")]
852 let mut clip_embeddings_added = false;
853
854 let mut capacity_reached = false;
855 match input_set {
856 InputSet::Stdin => {
857 processed += 1;
858 match read_payload(None) {
859 Ok(payload) => {
860 let mut analyze_spinner = if args.json {
861 None
862 } else {
863 Some(create_spinner("Analyzing stdin payload..."))
864 };
865 match ingest_payload(
866 &mut mem,
867 &args,
868 payload,
869 None,
870 capacity_guard.as_mut(),
871 embedding_enabled,
872 runtime_ref,
873 use_parallel,
874 ) {
875 Ok(outcome) => {
876 if let Some(pb) = analyze_spinner.take() {
877 pb.finish_and_clear();
878 }
879 bytes_added += outcome.report.stored_bytes as u64;
880 if args.json {
881 summary_warnings
882 .extend(outcome.report.extraction.warnings.iter().cloned());
883 }
884 reports.push(outcome.report);
885 let report_index = reports.len() - 1;
886 if !args.json && !use_parallel {
887 if let Some(report) = reports.get(report_index) {
888 print_report(report);
889 }
890 }
891 if args.transcript {
892 let notice = transcript_notice_message(&mut mem)?;
893 if args.json {
894 summary_warnings.push(notice);
895 } else {
896 println!("{notice}");
897 }
898 }
899 if outcome.embedded {
900 if let Some(pb) = embed_progress.as_ref() {
901 pb.inc(1);
902 }
903 embedded_docs += 1;
904 }
905 if use_parallel {
906 #[cfg(feature = "parallel_segments")]
907 if let Some(input) = outcome.parallel_input {
908 pending_parallel_indices.push(report_index);
909 pending_parallel_inputs.push(input);
910 }
911 } else if let Some(guard) = capacity_guard.as_mut() {
912 mem.commit()?;
913 let stats = mem.stats()?;
914 guard.update_after_commit(&stats);
                        guard.check_and_warn_capacity();
                    }
917 }
918 Err(err) => {
919 if let Some(pb) = analyze_spinner.take() {
920 pb.finish_and_clear();
921 }
922 if err.downcast_ref::<DuplicateUriError>().is_some() {
923 return Err(err);
924 }
925 warn!("skipped stdin payload: {err}");
926 skipped += 1;
927 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
928 capacity_reached = true;
929 }
930 }
931 }
932 }
933 Err(err) => {
934 warn!("failed to read stdin: {err}");
935 skipped += 1;
936 }
937 }
938 }
939 InputSet::Files(ref paths) => {
940 let mut transcript_notice_printed = false;
941 for path in paths {
942 processed += 1;
943 match read_payload(Some(&path)) {
944 Ok(payload) => {
945 let label = path.display().to_string();
946 let mut analyze_spinner = if args.json {
947 None
948 } else {
949 Some(create_spinner(&format!("Analyzing {label}...")))
950 };
951 match ingest_payload(
952 &mut mem,
953 &args,
954 payload,
955 Some(&path),
956 capacity_guard.as_mut(),
957 embedding_enabled,
958 runtime_ref,
959 use_parallel,
960 ) {
961 Ok(outcome) => {
962 if let Some(pb) = analyze_spinner.take() {
963 pb.finish_and_clear();
964 }
965 bytes_added += outcome.report.stored_bytes as u64;
966
967 #[cfg(feature = "clip")]
969 if let Some(ref model) = clip_model {
970 let mime = outcome.report.mime.as_deref();
971 let clip_frame_id = outcome.report.frame_id;
972
973 if is_image_file(&path) {
974 match model.encode_image_file(&path) {
975 Ok(embedding) => {
976 if let Err(e) = mem.add_clip_embedding_with_page(
977 clip_frame_id,
978 None,
979 embedding,
980 ) {
981 warn!(
982 "Failed to add CLIP embedding for {}: {e}",
983 path.display()
984 );
985 } else {
986 info!(
987 "Added CLIP embedding for frame {}",
988 clip_frame_id
989 );
990 clip_embeddings_added = true;
991 }
992 }
993 Err(e) => {
994 warn!(
995 "Failed to encode CLIP embedding for {}: {e}",
996 path.display()
997 );
998 }
999 }
1000 } else if matches!(mime, Some(m) if m == "application/pdf") {
1001 if let Err(e) = mem.commit() {
1003 warn!("Failed to commit before CLIP extraction: {e}");
1004 }
1005
1006 match render_pdf_pages_for_clip(
1007 &path,
1008 CLIP_PDF_MAX_IMAGES,
1009 CLIP_PDF_TARGET_PX,
1010 ) {
1011 Ok(rendered_pages) => {
1012 use rayon::prelude::*;
1013
1014 let path_for_closure = path.clone();
1017
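// Temporarily install a silent panic hook: panics from malformed page images are
// caught via catch_unwind below, and the previous hook is restored afterwards.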
1018 let prev_hook = std::panic::take_hook();
1021 std::panic::set_hook(Box::new(|_| {
1022 }));
1024
1025 let encoded_images: Vec<_> = rendered_pages
1026 .into_par_iter()
1027 .filter_map(|(page_num, image)| {
1028 let image_info = get_image_info(&image);
1030 if !image_info.should_embed() {
1031 info!(
1032 "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1033 path_for_closure.display(),
1034 page_num,
1035 image_info.color_variance,
1036 image_info.width,
1037 image_info.height
1038 );
1039 return None;
1040 }
1041
1042 let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1046 let rgb_image = image.to_rgb8();
1047 let mut png_bytes = Vec::new();
1048 image::DynamicImage::ImageRgb8(rgb_image).write_to(
1049 &mut Cursor::new(&mut png_bytes),
1050 image::ImageFormat::Png,
1051 ).map(|()| png_bytes)
1052 }));
1053
1054 match encode_result {
1055 Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1056 Ok(Err(e)) => {
1057 info!(
1058 "Skipping image for {} page {}: {e}",
1059 path_for_closure.display(),
1060 page_num
1061 );
1062 None
1063 }
1064 Err(_) => {
1065 info!(
1067 "Skipping malformed image for {} page {}",
1068 path_for_closure.display(),
1069 page_num
1070 );
1071 None
1072 }
1073 }
1074 })
1075 .collect();
1076
1077 std::panic::set_hook(prev_hook);
1079
1080 let mut child_frame_offset = 1u64;
1083 let parent_uri = outcome.report.uri.clone();
1084 let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1085 let total_images = encoded_images.len();
1086
1087 let clip_pb = if !args.json && total_images > 0 {
1089 let pb = ProgressBar::new(total_images as u64);
1090 pb.set_style(
1091 ProgressStyle::with_template(
1092 " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1093 )
1094 .unwrap_or_else(|_| ProgressStyle::default_bar())
1095 .progress_chars("█▓░"),
1096 );
1097 Some(pb)
1098 } else {
1099 None
1100 };
1101
1102 for (page_num, image, png_bytes) in encoded_images {
1103 let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1104 let child_title = format!(
1105 "{} - Image {} (page {})",
1106 parent_title,
1107 child_frame_offset,
1108 page_num
1109 );
1110
1111 let child_options = PutOptions::builder()
1112 .uri(&child_uri)
1113 .title(&child_title)
1114 .parent_id(clip_frame_id)
1115 .role(FrameRole::ExtractedImage)
1116 .auto_tag(false)
1117 .extract_dates(false)
1118 .build();
1119
1120 match mem.put_bytes_with_options(&png_bytes, child_options) {
1121 Ok(_child_seq) => {
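// Derive the child's frame ID from the parent's; this assumes frame IDs are
// assigned sequentially as each child frame is stored.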
1122 let child_frame_id = clip_frame_id + child_frame_offset;
1123 child_frame_offset += 1;
1124
1125 match model.encode_image(&image) {
1126 Ok(embedding) => {
1127 if let Err(e) = mem
1128 .add_clip_embedding_with_page(
1129 child_frame_id,
1130 Some(page_num),
1131 embedding,
1132 )
1133 {
1134 warn!(
1135 "Failed to add CLIP embedding for {} page {}: {e}",
1136 path.display(),
1137 page_num
1138 );
1139 } else {
1140 info!(
1141 "Added CLIP image frame {} for {} page {}",
1142 child_frame_id,
1143 clip_frame_id,
1144 page_num
1145 );
1146 clip_embeddings_added = true;
1147 }
1148 }
1149 Err(e) => warn!(
1150 "Failed to encode CLIP embedding for {} page {}: {e}",
1151 path.display(),
1152 page_num
1153 ),
1154 }
1155
1156 if let Err(e) = mem.commit() {
1158 warn!("Failed to commit after image {}: {e}", child_frame_offset);
1159 }
1160 }
1161 Err(e) => warn!(
1162 "Failed to store image frame for {} page {}: {e}",
1163 path.display(),
1164 page_num
1165 ),
1166 }
1167
1168 if let Some(ref pb) = clip_pb {
1169 pb.inc(1);
1170 }
1171 }
1172
1173 if let Some(pb) = clip_pb {
1174 pb.finish_and_clear();
1175 }
1176 }
1177 Err(err) => warn!(
1178 "Failed to render PDF pages for CLIP {}: {err}",
1179 path.display()
1180 ),
1181 }
1182 }
1183 }
1184
1185 if args.json {
1186 summary_warnings
1187 .extend(outcome.report.extraction.warnings.iter().cloned());
1188 }
1189 reports.push(outcome.report);
1190 let report_index = reports.len() - 1;
1191 if !args.json && !use_parallel {
1192 if let Some(report) = reports.get(report_index) {
1193 print_report(report);
1194 }
1195 }
1196 if args.transcript && !transcript_notice_printed {
1197 let notice = transcript_notice_message(&mut mem)?;
1198 if args.json {
1199 summary_warnings.push(notice);
1200 } else {
1201 println!("{notice}");
1202 }
1203 transcript_notice_printed = true;
1204 }
1205 if outcome.embedded {
1206 if let Some(pb) = embed_progress.as_ref() {
1207 pb.inc(1);
1208 }
1209 embedded_docs += 1;
1210 }
1211 if use_parallel {
1212 #[cfg(feature = "parallel_segments")]
1213 if let Some(input) = outcome.parallel_input {
1214 pending_parallel_indices.push(report_index);
1215 pending_parallel_inputs.push(input);
1216 }
1217 } else if let Some(guard) = capacity_guard.as_mut() {
1218 mem.commit()?;
1219 let stats = mem.stats()?;
1220 guard.update_after_commit(&stats);
                            guard.check_and_warn_capacity();
                        }
1223 }
1224 Err(err) => {
1225 if let Some(pb) = analyze_spinner.take() {
1226 pb.finish_and_clear();
1227 }
1228 if err.downcast_ref::<DuplicateUriError>().is_some() {
1229 return Err(err);
1230 }
1231 warn!("skipped {}: {err}", path.display());
1232 skipped += 1;
1233 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1234 capacity_reached = true;
1235 }
1236 }
1237 }
1238 }
1239 Err(err) => {
1240 warn!("failed to read {}: {err}", path.display());
1241 skipped += 1;
1242 }
1243 }
1244 if capacity_reached {
1245 break;
1246 }
1247 }
1248 }
1249 }
1250
1251 if capacity_reached {
1252 warn!("capacity limit reached; remaining inputs were not processed");
1253 }
1254
1255 if let Some(pb) = embed_progress.as_ref() {
1256 pb.finish_with_message("embed");
1257 }
1258
1259 if let Some(runtime) = embedding_runtime.as_ref() {
1260 if embedded_docs > 0 {
1261 match embedding_mode {
1262 EmbeddingMode::Explicit => println!(
1263 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
1264 embedded_docs,
1265 runtime.dimension()
1266 ),
1267 EmbeddingMode::Auto => println!(
1268 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
1269 embedded_docs,
1270 runtime.dimension()
1271 ),
1272 EmbeddingMode::None => {}
1273 }
1274 } else {
1275 match embedding_mode {
1276 EmbeddingMode::Explicit => {
1277 println!("No embeddings were generated (no textual content available)");
1278 }
1279 EmbeddingMode::Auto => {
1280 println!(
1281 "Semantic runtime available, but no textual content produced embeddings"
1282 );
1283 }
1284 EmbeddingMode::None => {}
1285 }
1286 }
1287 }
1288
1289 if reports.is_empty() {
1290 if args.json {
1291 if capacity_reached {
1292 summary_warnings.push("capacity_limit_reached".to_string());
1293 }
1294 let summary_json = json!({
1295 "processed": processed,
1296 "ingested": 0,
1297 "skipped": skipped,
1298 "bytes_added": bytes_added,
1299 "bytes_added_human": format_bytes(bytes_added),
1300 "embedded_documents": embedded_docs,
1301 "capacity_reached": capacity_reached,
1302 "warnings": summary_warnings,
1303 "reports": Vec::<serde_json::Value>::new(),
1304 });
1305 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1306 } else {
1307 println!(
1308 "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1309 processed, skipped
1310 );
1311 }
1312 return Ok(());
1313 }
1314
1315 #[cfg(feature = "parallel_segments")]
1316 let mut parallel_flush_spinner = if use_parallel && !args.json {
1317 Some(create_spinner("Flushing parallel segments..."))
1318 } else {
1319 None
1320 };
1321
1322 if use_parallel {
1323 #[cfg(feature = "parallel_segments")]
1324 {
1325 let mut opts = parallel_opts.take().unwrap_or_else(|| {
1326 let mut opts = BuildOpts::default();
1327 opts.sanitize();
1328 opts
1329 });
1330 opts.vec_compression = mem.vector_compression().clone();
1332 let seqs = if pending_parallel_inputs.is_empty() {
1333 mem.commit_parallel(opts)?;
1334 Vec::new()
1335 } else {
1336 mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1337 };
1338 if let Some(pb) = parallel_flush_spinner.take() {
1339 pb.finish_with_message("Flushed parallel segments");
1340 }
1341 for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1342 if let Some(report) = reports.get_mut(idx) {
1343 report.seq = seq;
1344 }
1345 }
1346 }
1347 #[cfg(not(feature = "parallel_segments"))]
1348 {
1349 mem.commit()?;
1350 }
1351 } else {
1352 mem.commit()?;
1353 }
1354
1355 #[cfg(feature = "clip")]
1357 if clip_embeddings_added {
1358 mem.commit()?;
1359 }
1360
1361 if !args.json && use_parallel {
1362 for report in &reports {
1363 print_report(report);
1364 }
1365 }
1366
1367 let mut tables_extracted = 0usize;
1369 let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1370 if args.tables && !args.no_tables {
1371 if let InputSet::Files(ref paths) = input_set {
1372 let pdf_paths: Vec<_> = paths
1373 .iter()
1374 .filter(|p| {
1375 p.extension()
1376 .and_then(|e| e.to_str())
1377 .map(|e| e.eq_ignore_ascii_case("pdf"))
1378 .unwrap_or(false)
1379 })
1380 .collect();
1381
1382 if !pdf_paths.is_empty() {
1383 let table_spinner = if args.json {
1384 None
1385 } else {
1386 Some(create_spinner(&format!(
1387 "Extracting tables from {} PDF(s)...",
1388 pdf_paths.len()
1389 )))
1390 };
1391
1392 let table_options = TableExtractionOptions::builder()
1394 .mode(memvid_core::table::ExtractionMode::Aggressive)
1395 .min_rows(2)
1396 .min_cols(2)
1397 .min_quality(memvid_core::table::TableQuality::Low)
1398 .merge_multi_page(true)
1399 .build();
1400
1401 for pdf_path in pdf_paths {
1402 if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1403 let filename = pdf_path
1404 .file_name()
1405 .and_then(|s| s.to_str())
1406 .unwrap_or("unknown.pdf");
1407
1408 if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1409 for table in &result.tables {
1410 if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1411 {
1412 tables_extracted += 1;
1413 table_summaries.push(json!({
1414 "table_id": table.table_id,
1415 "source_file": table.source_file,
1416 "meta_frame_id": meta_id,
1417 "rows": table.n_rows,
1418 "cols": table.n_cols,
1419 "quality": format!("{:?}", table.quality),
1420 "pages": format!("{}-{}", table.page_start, table.page_end),
1421 }));
1422 }
1423 }
1424 }
1425 }
1426 }
1427
1428 if let Some(pb) = table_spinner {
1429 if tables_extracted > 0 {
1430 pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1431 } else {
1432 pb.finish_with_message("No tables found");
1433 }
1434 }
1435
1436 if tables_extracted > 0 {
1438 mem.commit()?;
1439 }
1440 }
1441 }
1442 }
1443
1444 #[cfg(feature = "logic_mesh")]
1447 let mut logic_mesh_entities = 0usize;
1448 #[cfg(feature = "logic_mesh")]
1449 {
1450 use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1451
1452 let models_dir = config.models_dir.clone();
1453 let model_path = ner_model_path(&models_dir);
1454 let tokenizer_path = ner_tokenizer_path(&models_dir);
1455
1456 let ner_available = model_path.exists() && tokenizer_path.exists();
1458 let should_run_ner = if args.no_logic_mesh {
1459 false
1460 } else if args.logic_mesh {
            true
        } else {
            ner_available
        };
1465
1466 if should_run_ner && !ner_available {
1467 if args.json {
1469 summary_warnings.push(
1470 "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1471 );
1472 } else {
1473 eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
1474 eprintln!(" Run: memvid models install --ner distilbert-ner");
1475 }
1476 } else if should_run_ner {
1477 let ner_spinner = if args.json {
1478 None
1479 } else {
1480 Some(create_spinner("Building Logic-Mesh entity graph..."))
1481 };
1482
1483 match NerModel::load(&model_path, &tokenizer_path, None) {
1484 Ok(mut ner_model) => {
1485 let mut mesh = LogicMesh::new();
1486 let mut filtered_count = 0usize;
1487
1488 for report in &reports {
1490 let frame_id = report.frame_id;
1491 if let Some(ref content) = report.search_text {
1493 if !content.is_empty() {
1494 match ner_model.extract(content) {
1495 Ok(raw_entities) => {
1496 let merged_entities = merge_adjacent_entities(raw_entities, content);
1499
1500 for entity in &merged_entities {
1501 if !is_valid_entity(&entity.text, entity.confidence) {
1503 tracing::debug!(
1504 "Filtered entity: '{}' (confidence: {:.0}%)",
1505 entity.text, entity.confidence * 100.0
1506 );
1507 filtered_count += 1;
1508 continue;
1509 }
1510
1511 let kind = entity.to_entity_kind();
1513 let node = MeshNode::new(
1515 entity.text.to_lowercase(),
1516 entity.text.trim().to_string(),
1517 kind,
1518 entity.confidence,
1519 frame_id,
1520 entity.byte_start as u32,
1521 (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1522 );
1523 mesh.merge_node(node);
1524 logic_mesh_entities += 1;
1525 }
1526 }
1527 Err(e) => {
1528 warn!("NER extraction failed for frame {frame_id}: {e}");
1529 }
1530 }
1531 }
1532 }
1533 }
1534
1535 if logic_mesh_entities > 0 {
1536 mesh.finalize();
1537 let node_count = mesh.stats().node_count;
1538 mem.set_logic_mesh(mesh);
1540 mem.commit()?;
1541 info!(
1542 "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1543 logic_mesh_entities,
1544 node_count,
1545 filtered_count
1546 );
1547 }
1548
1549 if let Some(pb) = ner_spinner {
1550 pb.finish_with_message(format!(
1551 "Logic-Mesh: {} entities ({} unique)",
1552 logic_mesh_entities,
1553 mem.mesh_node_count()
1554 ));
1555 }
1556 }
1557 Err(e) => {
1558 if let Some(pb) = ner_spinner {
1559 pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1560 }
1561 if args.json {
1562 summary_warnings.push(format!("logic_mesh_failed: {e}"));
1563 }
1564 }
1565 }
1566 }
1567 }
1568
1569 let stats = mem.stats()?;
1570 if args.json {
1571 if capacity_reached {
1572 summary_warnings.push("capacity_limit_reached".to_string());
1573 }
1574 let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1575 let total_duration_ms: Option<u64> = {
1576 let sum: u64 = reports
1577 .iter()
1578 .filter_map(|r| r.extraction.duration_ms)
1579 .sum();
1580 if sum > 0 {
1581 Some(sum)
1582 } else {
1583 None
1584 }
1585 };
1586 let total_pages: Option<u32> = {
1587 let sum: u32 = reports
1588 .iter()
1589 .filter_map(|r| r.extraction.pages_processed)
1590 .sum();
1591 if sum > 0 {
1592 Some(sum)
1593 } else {
1594 None
1595 }
1596 };
1597 let embedding_mode_str = match embedding_mode {
1598 EmbeddingMode::None => "none",
1599 EmbeddingMode::Auto => "auto",
1600 EmbeddingMode::Explicit => "explicit",
1601 };
1602 let summary_json = json!({
1603 "processed": processed,
1604 "ingested": reports.len(),
1605 "skipped": skipped,
1606 "bytes_added": bytes_added,
1607 "bytes_added_human": format_bytes(bytes_added),
1608 "embedded_documents": embedded_docs,
1609 "embedding": {
1610 "enabled": embedding_enabled,
1611 "mode": embedding_mode_str,
1612 "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1613 },
1614 "tables": {
1615 "enabled": args.tables && !args.no_tables,
1616 "extracted": tables_extracted,
1617 "tables": table_summaries,
1618 },
1619 "capacity_reached": capacity_reached,
1620 "warnings": summary_warnings,
1621 "extraction": {
1622 "total_duration_ms": total_duration_ms,
1623 "total_pages_processed": total_pages,
1624 },
1625 "memory": {
1626 "path": args.file.display().to_string(),
1627 "frame_count": stats.frame_count,
1628 "size_bytes": stats.size_bytes,
1629 "tier": format!("{:?}", stats.tier),
1630 },
1631 "reports": reports_json,
1632 });
1633 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1634 } else {
1635 println!(
1636 "Committed {} frame(s); total frames {}",
1637 reports.len(),
1638 stats.frame_count
1639 );
1640 println!(
1641 "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1642 processed,
1643 reports.len(),
1644 skipped,
1645 format_bytes(bytes_added)
1646 );
1647 if tables_extracted > 0 {
1648 println!("Tables: {} extracted from PDF(s)", tables_extracted);
1649 }
1650 }
1651 Ok(())
1652}
1653
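/// Rewrites an existing frame in place: optionally replaces the payload, merges metadata
/// overrides onto the existing frame, and reuses or recomputes its embedding.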
1654pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
1655 let mut mem = Memvid::open(&args.file)?;
1656 ensure_cli_mutation_allowed(&mem)?;
1657 apply_lock_cli(&mut mem, &args.lock);
1658 let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
1659 let frame_id = existing.id;
1660
1661 let payload_bytes = match args.input.as_ref() {
1662 Some(path) => Some(read_payload(Some(path))?),
1663 None => None,
1664 };
1665 let payload_slice = payload_bytes.as_deref();
1666 let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
1667 let source_path = args.input.as_deref();
1668
1669 let mut options = PutOptions::default();
1670 options.enable_embedding = false;
1671 options.auto_tag = payload_slice.is_some();
1672 options.extract_dates = payload_slice.is_some();
1673 options.timestamp = Some(existing.timestamp);
1674 options.track = existing.track.clone();
1675 options.kind = existing.kind.clone();
1676 options.uri = existing.uri.clone();
1677 options.title = existing.title.clone();
1678 options.metadata = existing.metadata.clone();
1679 options.search_text = existing.search_text.clone();
1680 options.tags = existing.tags.clone();
1681 options.labels = existing.labels.clone();
1682 options.extra_metadata = existing.extra_metadata.clone();
1683
1684 if let Some(new_uri) = &args.set_uri {
1685 options.uri = Some(derive_uri(Some(new_uri), None));
1686 }
1687
1688 if options.uri.is_none() && payload_slice.is_some() {
1689 options.uri = Some(derive_uri(None, source_path));
1690 }
1691
1692 if let Some(title) = &args.title {
1693 options.title = Some(title.clone());
1694 }
1695 if let Some(ts) = args.timestamp {
1696 options.timestamp = Some(ts);
1697 }
1698 if let Some(track) = &args.track {
1699 options.track = Some(track.clone());
1700 }
1701 if let Some(kind) = &args.kind {
1702 options.kind = Some(kind.clone());
1703 }
1704
1705 if !args.labels.is_empty() {
1706 options.labels = args.labels.clone();
1707 }
1708
1709 if !args.tags.is_empty() {
1710 options.tags.clear();
1711 for (key, value) in &args.tags {
1712 if !key.is_empty() {
1713 options.tags.push(key.clone());
1714 }
1715 if !value.is_empty() && value != key {
1716 options.tags.push(value.clone());
1717 }
1718 options.extra_metadata.insert(key.clone(), value.clone());
1719 }
1720 }
1721
1722 apply_metadata_overrides(&mut options, &args.metadata);
1723
1724 let mut search_source = options.search_text.clone();
1725
1726 if let Some(payload) = payload_slice {
1727 let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
1728 if let Some(title) = inferred_title {
1729 options.title = Some(title);
1730 }
1731
1732 if options.uri.is_none() {
1733 options.uri = Some(derive_uri(None, source_path));
1734 }
1735
1736 let analysis = analyze_file(
1737 source_path,
1738 payload,
1739 payload_utf8,
1740 options.title.as_deref(),
1741 AudioAnalyzeOptions::default(),
1742 false,
1743 );
1744 if let Some(meta) = analysis.metadata {
1745 options.metadata = Some(meta);
1746 }
1747 if let Some(text) = analysis.search_text {
1748 search_source = Some(text.clone());
1749 options.search_text = Some(text);
1750 }
1751 } else {
1752 options.auto_tag = false;
1753 options.extract_dates = false;
1754 }
1755
1756 let stats = mem.stats()?;
1757 options.enable_embedding = stats.has_vec_index;
1758
1759 let mv2_dimension = mem.vec_index_dimension();
1761 let mut embedding: Option<Vec<f32>> = None;
1762 if args.embeddings {
1763 let runtime = load_embedding_runtime_for_mv2(config, None, mv2_dimension)?;
1764 if let Some(text) = search_source.as_ref() {
1765 if !text.trim().is_empty() {
1766 embedding = Some(runtime.embed(text)?);
1767 }
1768 }
1769 if embedding.is_none() {
1770 if let Some(text) = payload_utf8 {
1771 if !text.trim().is_empty() {
1772 embedding = Some(runtime.embed(text)?);
1773 }
1774 }
1775 }
1776 if embedding.is_none() {
1777 warn!("no textual content available; embeddings not recomputed");
1778 }
1779 }
1780
1781 let mut effective_embedding = embedding;
1782 if effective_embedding.is_none() && stats.has_vec_index {
1783 effective_embedding = mem.frame_embedding(frame_id)?;
1784 }
1785
1786 let final_uri = options.uri.clone();
1787 let replaced_payload = payload_slice.is_some();
1788 let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
1789 mem.commit()?;
1790
1791 let updated_frame =
1792 if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
1793 mem.frame_by_uri(&uri)?
1794 } else {
1795 let mut query = TimelineQueryBuilder::default();
1796 if let Some(limit) = NonZeroU64::new(1) {
1797 query = query.limit(limit).reverse(true);
1798 }
1799 let latest = mem.timeline(query.build())?;
1800 if let Some(entry) = latest.first() {
1801 mem.frame_by_id(entry.frame_id)?
1802 } else {
1803 mem.frame_by_id(frame_id)?
1804 }
1805 };
1806
1807 if args.json {
1808 let json = json!({
1809 "sequence": seq,
1810 "previous_frame_id": frame_id,
1811 "frame": frame_to_json(&updated_frame),
1812 "replaced_payload": replaced_payload,
1813 });
1814 println!("{}", serde_json::to_string_pretty(&json)?);
1815 } else {
1816 println!(
1817 "Updated frame {} → new frame {} (seq {})",
1818 frame_id, updated_frame.id, seq
1819 );
1820 println!(
1821 "Payload: {}",
1822 if replaced_payload {
1823 "replaced"
1824 } else {
1825 "reused"
1826 }
1827 );
1828 print_frame_summary(&mut mem, &updated_frame)?;
1829 }
1830
1831 Ok(())
1832}
1833
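/// Builds a canonical `mv2://` URI from an explicit value, a source path, or a random UUID.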
1834fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1835 if let Some(uri) = provided {
1836 let sanitized = sanitize_uri(uri, true);
1837 return format!("mv2://{}", sanitized);
1838 }
1839
1840 if let Some(path) = source {
1841 let raw = path.to_string_lossy();
1842 let sanitized = sanitize_uri(&raw, false);
1843 return format!("mv2://{}", sanitized);
1844 }
1845
1846 format!("mv2://frames/{}", Uuid::new_v4())
1847}
1848
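/// Derives a content-addressed `mv2://video/...` URI from the payload's BLAKE3 hash and
/// the file extension (or a MIME-derived extension).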
1849pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1850 let digest = hash(payload).to_hex();
1851 let short = &digest[..32];
1852 let ext_from_path = source
1853 .and_then(|path| path.extension())
1854 .and_then(|ext| ext.to_str())
1855 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1856 .filter(|ext| !ext.is_empty());
1857 let ext = ext_from_path
1858 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1859 .unwrap_or_else(|| "bin".to_string());
1860 format!("mv2://video/{short}.{ext}")
1861}
1862
1863fn derive_title(
1864 provided: Option<String>,
1865 source: Option<&Path>,
1866 payload_utf8: Option<&str>,
1867) -> Option<String> {
1868 if let Some(title) = provided {
1869 let trimmed = title.trim();
1870 if trimmed.is_empty() {
1871 None
1872 } else {
1873 Some(trimmed.to_string())
1874 }
1875 } else {
1876 if let Some(text) = payload_utf8 {
1877 if let Some(markdown_title) = extract_markdown_title(text) {
1878 return Some(markdown_title);
1879 }
1880 }
1881 source
1882 .and_then(|path| path.file_stem())
1883 .and_then(|stem| stem.to_str())
1884 .map(to_title_case)
1885 }
1886}
1887
1888fn extract_markdown_title(text: &str) -> Option<String> {
1889 for line in text.lines() {
1890 let trimmed = line.trim();
1891 if trimmed.starts_with('#') {
1892 let title = trimmed.trim_start_matches('#').trim();
1893 if !title.is_empty() {
1894 return Some(title.to_string());
1895 }
1896 }
1897 }
1898 None
1899}
1900
1901fn to_title_case(input: &str) -> String {
1902 if input.is_empty() {
1903 return String::new();
1904 }
1905 let mut chars = input.chars();
1906 let Some(first) = chars.next() else {
1907 return String::new();
1908 };
1909 let prefix = first.to_uppercase().collect::<String>();
1910 prefix + chars.as_str()
1911}
1912
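// Files and directories ignored during directory and glob ingestion (.DS_Store, dot-directories).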
1913fn should_skip_input(path: &Path) -> bool {
1914 matches!(
1915 path.file_name().and_then(|name| name.to_str()),
1916 Some(".DS_Store")
1917 )
1918}
1919
1920fn should_skip_dir(path: &Path) -> bool {
1921 matches!(
1922 path.file_name().and_then(|name| name.to_str()),
1923 Some(name) if name.starts_with('.')
1924 )
1925}
1926
1927enum InputSet {
1928 Stdin,
1929 Files(Vec<PathBuf>),
1930}
1931
1932#[cfg(feature = "clip")]
1934fn is_image_file(path: &std::path::Path) -> bool {
1935 let Some(ext) = path.extension().and_then(|e| e.to_str()) else {
1936 return false;
1937 };
1938 matches!(
1939 ext.to_ascii_lowercase().as_str(),
1940 "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
1941 )
1942}
1943
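/// Resolves `--input` into stdin, a glob expansion, a recursive directory listing, or a single file.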
1944fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
1945 let Some(raw) = input else {
1946 return Ok(InputSet::Stdin);
1947 };
1948
1949 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
1950 let mut files = Vec::new();
1951 let mut matched_any = false;
1952 for entry in glob(raw)? {
1953 let path = entry?;
1954 if path.is_file() {
1955 matched_any = true;
1956 if should_skip_input(&path) {
1957 continue;
1958 }
1959 files.push(path);
1960 }
1961 }
1962 files.sort();
1963 if files.is_empty() {
1964 if matched_any {
1965 anyhow::bail!(
1966 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
1967 );
1968 }
1969 anyhow::bail!("pattern '{raw}' did not match any files");
1970 }
1971 return Ok(InputSet::Files(files));
1972 }
1973
1974 let path = PathBuf::from(raw);
1975 if path.is_dir() {
1976 let (mut files, skipped_any) = collect_directory_files(&path)?;
1977 files.sort();
1978 if files.is_empty() {
1979 if skipped_any {
1980 anyhow::bail!(
1981 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
1982 path.display()
1983 );
1984 }
1985 anyhow::bail!(
1986 "directory '{}' contains no ingestible files",
1987 path.display()
1988 );
1989 }
1990 Ok(InputSet::Files(files))
1991 } else {
1992 Ok(InputSet::Files(vec![path]))
1993 }
1994}
1995
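/// Iteratively walks `root`, skipping dot-directories and ignored files, and reports
/// whether anything was skipped.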
1996fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
1997 let mut files = Vec::new();
1998 let mut skipped_any = false;
1999 let mut stack = vec![root.to_path_buf()];
2000 while let Some(dir) = stack.pop() {
2001 for entry in fs::read_dir(&dir)? {
2002 let entry = entry?;
2003 let path = entry.path();
2004 let file_type = entry.file_type()?;
2005 if file_type.is_dir() {
2006 if should_skip_dir(&path) {
2007 skipped_any = true;
2008 continue;
2009 }
2010 stack.push(path);
2011 } else if file_type.is_file() {
2012 if should_skip_input(&path) {
2013 skipped_any = true;
2014 continue;
2015 }
2016 files.push(path);
2017 }
2018 }
2019 }
2020 Ok((files, skipped_any))
2021}
2022
2023struct IngestOutcome {
2024 report: PutReport,
2025 embedded: bool,
2026 #[cfg(feature = "parallel_segments")]
2027 parallel_input: Option<ParallelInput>,
2028}
2029
2030struct PutReport {
2031 seq: u64,
    #[allow(dead_code)]
    frame_id: u64,
2034 uri: String,
2035 title: Option<String>,
2036 original_bytes: usize,
2037 stored_bytes: usize,
2038 compressed: bool,
2039 source: Option<PathBuf>,
2040 mime: Option<String>,
2041 metadata: Option<DocMetadata>,
2042 extraction: ExtractionSummary,
2043 #[cfg(feature = "logic_mesh")]
2045 search_text: Option<String>,
2046}
2047
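/// Tracks projected file size against the memory's capacity so ingestion can stop before
/// the store overflows and can warn as usage crosses 50/75/90%.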
2048struct CapacityGuard {
2049 #[allow(dead_code)]
2050 tier: Tier,
2051 capacity: u64,
2052 current_size: u64,
2053}
2054
2055impl CapacityGuard {
2056 const MIN_OVERHEAD: u64 = 128 * 1024;
2057 const RELATIVE_OVERHEAD: f64 = 0.05;
2058
2059 fn from_stats(stats: &Stats) -> Option<Self> {
2060 if stats.capacity_bytes == 0 {
2061 return None;
2062 }
2063 Some(Self {
2064 tier: stats.tier,
2065 capacity: stats.capacity_bytes,
2066 current_size: stats.size_bytes,
2067 })
2068 }
2069
2070 fn ensure_capacity(&self, additional: u64) -> Result<()> {
2071 let projected = self
2072 .current_size
2073 .saturating_add(additional)
2074 .saturating_add(Self::estimate_overhead(additional));
2075 if projected > self.capacity {
2076 return Err(map_put_error(
2077 MemvidError::CapacityExceeded {
2078 current: self.current_size,
2079 limit: self.capacity,
2080 required: projected,
2081 },
2082 Some(self.capacity),
2083 ));
2084 }
2085 Ok(())
2086 }
2087
2088 fn update_after_commit(&mut self, stats: &Stats) {
2089 self.current_size = stats.size_bytes;
2090 }
2091
2092 fn capacity_hint(&self) -> Option<u64> {
2093 Some(self.capacity)
2094 }
2095
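    // The narrow `>= X && < X + 1` windows mean each threshold message is printed only
    // while usage sits just past that threshold, rather than on every subsequent commit.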
2096 fn check_and_warn_capacity(&self) {
2098 if self.capacity == 0 {
2099 return;
2100 }
2101
2102 let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
2103
2104 if usage_pct >= 90.0 && usage_pct < 91.0 {
2106 eprintln!(
2107 "⚠️ CRITICAL: 90% capacity used ({} / {})",
2108 format_bytes(self.current_size),
2109 format_bytes(self.capacity)
2110 );
2111 eprintln!(" Actions:");
2112 eprintln!(" 1. Recreate with larger --size");
2113 eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
2114 eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
2115 } else if usage_pct >= 75.0 && usage_pct < 76.0 {
2116 eprintln!(
2117 "⚠️ WARNING: 75% capacity used ({} / {})",
2118 format_bytes(self.current_size),
2119 format_bytes(self.capacity)
2120 );
2121 eprintln!(" Consider planning for capacity expansion soon");
2122 } else if usage_pct >= 50.0 && usage_pct < 51.0 {
2123 eprintln!(
2124 "ℹ️ INFO: 50% capacity used ({} / {})",
2125 format_bytes(self.current_size),
2126 format_bytes(self.capacity)
2127 );
2128 }
2129 }
2130
2131 fn estimate_overhead(additional: u64) -> u64 {
2132 let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
2133 fractional.max(Self::MIN_OVERHEAD)
2134 }
2135}
2136
2137#[derive(Debug, Clone)]
2138pub struct FileAnalysis {
2139 pub mime: String,
2140 pub metadata: Option<DocMetadata>,
2141 pub search_text: Option<String>,
2142 pub extraction: ExtractionSummary,
2143}
2144
2145#[derive(Clone, Copy)]
2146pub struct AudioAnalyzeOptions {
2147 force: bool,
2148 segment_secs: u32,
2149}
2150
2151impl AudioAnalyzeOptions {
2152 const DEFAULT_SEGMENT_SECS: u32 = 30;
2153
2154 fn normalised_segment_secs(self) -> u32 {
2155 self.segment_secs.max(1)
2156 }
2157}
2158
2159impl Default for AudioAnalyzeOptions {
2160 fn default() -> Self {
2161 Self {
2162 force: false,
2163 segment_secs: Self::DEFAULT_SEGMENT_SECS,
2164 }
2165 }
2166}
2167
2168struct AudioAnalysis {
2169 metadata: DocAudioMetadata,
2170 caption: Option<String>,
2171 search_terms: Vec<String>,
2172}
2173
2174struct ImageAnalysis {
2175 width: u32,
2176 height: u32,
2177 palette: Vec<String>,
2178 caption: Option<String>,
2179 exif: Option<DocExifMetadata>,
2180}
2181
2182#[derive(Debug, Clone)]
2183pub struct ExtractionSummary {
2184 reader: Option<String>,
2185 status: ExtractionStatus,
2186 warnings: Vec<String>,
2187 duration_ms: Option<u64>,
2188 pages_processed: Option<u32>,
2189}
2190
2191impl ExtractionSummary {
2192 fn record_warning<S: Into<String>>(&mut self, warning: S) {
2193 self.warnings.push(warning.into());
2194 }
2195}
2196
2197#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2198enum ExtractionStatus {
2199 Skipped,
2200 Ok,
2201 FallbackUsed,
2202 Empty,
2203 Failed,
2204}
2205
2206impl ExtractionStatus {
2207 fn label(self) -> &'static str {
2208 match self {
2209 Self::Skipped => "skipped",
2210 Self::Ok => "ok",
2211 Self::FallbackUsed => "fallback_used",
2212 Self::Empty => "empty",
2213 Self::Failed => "failed",
2214 }
2215 }
2216}
2217
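/// Build a minimal `MediaManifest` for a video payload from its filename, MIME type,
/// and byte size; duration, dimensions, and codec are left unset at this stage.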
2218fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2219 let filename = source_path
2220 .and_then(|path| path.file_name())
2221 .and_then(|name| name.to_str())
2222 .map(|value| value.to_string());
2223 MediaManifest {
2224 kind: "video".to_string(),
2225 mime: mime.to_string(),
2226 bytes,
2227 filename,
2228 duration_ms: None,
2229 width: None,
2230 height: None,
2231 codec: None,
2232 }
2233}
2234
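/// Inspect a payload end to end: detect its MIME type, extract searchable text (UTF-8
/// payloads, registered PDF readers, or the pdf_extract/pdftotext fallbacks), and gather
/// image, audio, and EXIF metadata into a `FileAnalysis`.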
2235pub fn analyze_file(
2236 source_path: Option<&Path>,
2237 payload: &[u8],
2238 payload_utf8: Option<&str>,
2239 inferred_title: Option<&str>,
2240 audio_opts: AudioAnalyzeOptions,
2241 force_video: bool,
2242) -> FileAnalysis {
2243 let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2244 let is_video = force_video || mime.starts_with("video/");
2245 let treat_as_text = if is_video { false } else { treat_as_text_base };
2246
2247 let mut metadata = DocMetadata::default();
2248 metadata.mime = Some(mime.clone());
2249 metadata.bytes = Some(payload.len() as u64);
2250 metadata.hash = Some(hash(payload).to_hex().to_string());
2251
2252 let mut search_text = None;
2253
2254 let mut extraction = ExtractionSummary {
2255 reader: None,
2256 status: ExtractionStatus::Skipped,
2257 warnings: Vec::new(),
2258 duration_ms: None,
2259 pages_processed: None,
2260 };
2261
2262 let reader_registry = default_reader_registry();
2263 let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2264 if slice.is_empty() {
2265 None
2266 } else {
2267 Some(slice)
2268 }
2269 });
2270
2271 let apply_extracted_text = |text: &str,
2272 reader_label: &str,
2273 status_if_applied: ExtractionStatus,
2274 extraction: &mut ExtractionSummary,
2275 metadata: &mut DocMetadata,
2276 search_text: &mut Option<String>|
2277 -> bool {
2278 if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
2279 let mut value = normalized.text;
2280 if normalized.truncated {
2281 value.push('…');
2282 }
2283 if metadata.caption.is_none() {
2284 if let Some(caption) = caption_from_text(&value) {
2285 metadata.caption = Some(caption);
2286 }
2287 }
2288 *search_text = Some(value);
2289 extraction.reader = Some(reader_label.to_string());
2290 extraction.status = status_if_applied;
2291 true
2292 } else {
2293 false
2294 }
2295 };
2296
2297 if is_video {
2298 metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
2299 }
2300
2301 if treat_as_text {
2302 if let Some(text) = payload_utf8 {
2303 if !apply_extracted_text(
2304 text,
2305 "bytes",
2306 ExtractionStatus::Ok,
2307 &mut extraction,
2308 &mut metadata,
2309 &mut search_text,
2310 ) {
2311 extraction.status = ExtractionStatus::Empty;
2312 extraction.reader = Some("bytes".to_string());
2313 let msg = "text payload contained no searchable content after normalization";
2314 extraction.record_warning(msg);
2315 warn!("{msg}");
2316 }
2317 } else {
2318 extraction.reader = Some("bytes".to_string());
2319 extraction.status = ExtractionStatus::Failed;
2320 let msg = "payload reported as text but was not valid UTF-8";
2321 extraction.record_warning(msg);
2322 warn!("{msg}");
2323 }
2324 } else if mime == "application/pdf" {
2325 let mut applied = false;
2326
2327 let format_hint = infer_document_format(Some(mime.as_str()), magic);
2328 let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
2329 .with_uri(source_path.and_then(|p| p.to_str()))
2330 .with_magic(magic);
2331
2332 if let Some(reader) = reader_registry.find_reader(&hint) {
2333 match reader.extract(payload, &hint) {
2334 Ok(output) => {
2335 extraction.reader = Some(output.reader_name.clone());
2336 if let Some(ms) = output.diagnostics.duration_ms {
2337 extraction.duration_ms = Some(ms);
2338 }
2339 if let Some(pages) = output.diagnostics.pages_processed {
2340 extraction.pages_processed = Some(pages);
2341 }
2342 if let Some(mime_type) = output.document.mime_type.clone() {
2343 metadata.mime = Some(mime_type);
2344 }
2345
2346 for warning in output.diagnostics.warnings.iter() {
2347 extraction.record_warning(warning);
2348 warn!("{warning}");
2349 }
2350
2351 let status = if output.diagnostics.fallback {
2352 ExtractionStatus::FallbackUsed
2353 } else {
2354 ExtractionStatus::Ok
2355 };
2356
2357 if let Some(doc_text) = output.document.text.as_ref() {
2358 applied = apply_extracted_text(
2359 doc_text,
2360 output.reader_name.as_str(),
2361 status,
2362 &mut extraction,
2363 &mut metadata,
2364 &mut search_text,
2365 );
2366 if !applied {
2367 extraction.reader = Some(output.reader_name);
2368 extraction.status = ExtractionStatus::Empty;
2369 let msg = "primary reader returned no usable text";
2370 extraction.record_warning(msg);
2371 warn!("{msg}");
2372 }
2373 } else {
2374 extraction.reader = Some(output.reader_name);
2375 extraction.status = ExtractionStatus::Empty;
2376 let msg = "primary reader returned empty text";
2377 extraction.record_warning(msg);
2378 warn!("{msg}");
2379 }
2380 }
2381 Err(err) => {
2382 let name = reader.name();
2383 extraction.reader = Some(name.to_string());
2384 extraction.status = ExtractionStatus::Failed;
2385 let msg = format!("primary reader {name} failed: {err}");
2386 extraction.record_warning(&msg);
2387 warn!("{msg}");
2388 }
2389 }
2390 } else {
2391 extraction.record_warning("no registered reader matched this PDF payload");
2392 warn!("no registered reader matched this PDF payload");
2393 }
2394
2395 if !applied {
2396 match extract_pdf_text(payload) {
2397 Ok(pdf_text) => {
2398 if !apply_extracted_text(
2399 &pdf_text,
2400 "pdf_extract",
2401 ExtractionStatus::FallbackUsed,
2402 &mut extraction,
2403 &mut metadata,
2404 &mut search_text,
2405 ) {
2406 extraction.reader = Some("pdf_extract".to_string());
2407 extraction.status = ExtractionStatus::Empty;
2408 let msg = "PDF text extraction yielded no searchable content";
2409 extraction.record_warning(msg);
2410 warn!("{msg}");
2411 }
2412 }
2413 Err(err) => {
2414 extraction.reader = Some("pdf_extract".to_string());
2415 extraction.status = ExtractionStatus::Failed;
2416 let msg = format!("failed to extract PDF text via fallback: {err}");
2417 extraction.record_warning(&msg);
2418 warn!("{msg}");
2419 }
2420 }
2421 }
2422 }
2423
2424 if !is_video && mime.starts_with("image/") {
2425 if let Some(image_meta) = analyze_image(payload) {
2426 let ImageAnalysis {
2427 width,
2428 height,
2429 palette,
2430 caption,
2431 exif,
2432 } = image_meta;
2433 metadata.width = Some(width);
2434 metadata.height = Some(height);
2435 if !palette.is_empty() {
2436 metadata.colors = Some(palette);
2437 }
2438 if metadata.caption.is_none() {
2439 metadata.caption = caption;
2440 }
2441 if let Some(exif) = exif {
2442 metadata.exif = Some(exif);
2443 }
2444 }
2445 }
2446
2447 if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
2448 if let Some(AudioAnalysis {
2449 metadata: audio_metadata,
2450 caption: audio_caption,
2451 mut search_terms,
2452 }) = analyze_audio(payload, source_path, &mime, audio_opts)
2453 {
2454 if metadata.audio.is_none()
2455 || metadata
2456 .audio
2457 .as_ref()
2458 .map_or(true, DocAudioMetadata::is_empty)
2459 {
2460 metadata.audio = Some(audio_metadata);
2461 }
2462 if metadata.caption.is_none() {
2463 metadata.caption = audio_caption;
2464 }
2465 if !search_terms.is_empty() {
2466 match &mut search_text {
2467 Some(existing) => {
2468 for term in search_terms.drain(..) {
2469 if !existing.contains(&term) {
2470 if !existing.ends_with(' ') {
2471 existing.push(' ');
2472 }
2473 existing.push_str(&term);
2474 }
2475 }
2476 }
2477 None => {
2478 search_text = Some(search_terms.join(" "));
2479 }
2480 }
2481 }
2482 }
2483 }
2484
2485 if metadata.caption.is_none() {
2486 if let Some(title) = inferred_title {
2487 let trimmed = title.trim();
2488 if !trimmed.is_empty() {
2489 metadata.caption = Some(truncate_to_boundary(trimmed, 240));
2490 }
2491 }
2492 }
2493
2494 FileAnalysis {
2495 mime,
2496 metadata: if metadata.is_empty() {
2497 None
2498 } else {
2499 Some(metadata)
2500 },
2501 search_text,
2502 extraction,
2503 }
2504}
2505
2506fn default_reader_registry() -> &'static ReaderRegistry {
2507 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2508 REGISTRY.get_or_init(ReaderRegistry::default)
2509}
2510
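/// Map a MIME type (or a `%PDF` magic signature) to the corresponding `DocumentFormat`.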
2511fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2512 if detect_pdf_magic(magic) {
2513 return Some(DocumentFormat::Pdf);
2514 }
2515
2516 let mime = mime?.trim().to_ascii_lowercase();
2517 match mime.as_str() {
2518 "application/pdf" => Some(DocumentFormat::Pdf),
2519 "text/plain" => Some(DocumentFormat::PlainText),
2520 "text/markdown" => Some(DocumentFormat::Markdown),
2521 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2522 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2523 Some(DocumentFormat::Docx)
2524 }
2525 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2526 Some(DocumentFormat::Pptx)
2527 }
2528 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2529 Some(DocumentFormat::Xlsx)
2530 }
2531 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2532 _ => None,
2533 }
2534}
2535
2536fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
2537 let mut slice = match magic {
2538 Some(slice) if !slice.is_empty() => slice,
2539 _ => return false,
2540 };
2541 if slice.starts_with(&[0xEF, 0xBB, 0xBF]) {
2542 slice = &slice[3..];
2543 }
2544 while let Some((first, rest)) = slice.split_first() {
2545 if first.is_ascii_whitespace() {
2546 slice = rest;
2547 } else {
2548 break;
2549 }
2550 }
2551 slice.starts_with(b"%PDF")
2552}
2553
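/// Extract text with `pdf_extract`, falling back to the external `pdftotext` binary when
/// the primary extractor fails or returns only whitespace.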
2554fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2555 match pdf_extract::extract_text_from_mem(payload) {
2556 Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
2557 Ok(fallback) => Ok(fallback),
2558 Err(err) => {
2559 warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
2560 Ok(text)
2561 }
2562 },
2563 Err(err) => {
2564 warn!("pdf_extract failed: {err}");
2565 match fallback_pdftotext(payload) {
2566 Ok(fallback) => Ok(fallback),
2567 Err(fallback_err) => {
2568 warn!("pdftotext fallback failed: {fallback_err}");
2569 Err(err)
2570 }
2571 }
2572 }
2573 other => other,
2574 }
2575}
2576
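/// Write the payload to a temp file and run `pdftotext -layout <file> -`, returning its
/// stdout; errors if the binary is missing from PATH or exits unsuccessfully.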
2577fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2578 use std::io::Write;
2579 use std::process::{Command, Stdio};
2580
2581 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2582
2583 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2584 temp_pdf
2585 .write_all(payload)
2586 .context("failed to write pdf payload to temp file")?;
2587 temp_pdf.flush().context("failed to flush temp pdf file")?;
2588
2589 let child = Command::new(pdftotext)
2590 .arg("-layout")
2591 .arg(temp_pdf.path())
2592 .arg("-")
2593 .stdout(Stdio::piped())
2594 .spawn()
2595 .context("failed to spawn pdftotext")?;
2596
2597 let output = child
2598 .wait_with_output()
2599 .context("failed to read pdftotext output")?;
2600
2601 if !output.status.success() {
2602 return Err(anyhow!("pdftotext exited with status {}", output.status));
2603 }
2604
2605 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2606 Ok(text)
2607}
2608
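/// Detect the MIME type and whether the payload should be treated as text, trying content
/// sniffing (`infer`, then `tree_magic_mini`), then the file extension, then UTF-8 validity.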
2609fn detect_mime(
2610 source_path: Option<&Path>,
2611 payload: &[u8],
2612 payload_utf8: Option<&str>,
2613) -> (String, bool) {
2614 if let Some(kind) = infer::get(payload) {
2615 let mime = kind.mime_type().to_string();
2616 let treat_as_text = mime.starts_with("text/")
2617 || matches!(
2618 mime.as_str(),
2619 "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2620 );
2621 return (mime, treat_as_text);
2622 }
2623
2624 let magic = magic_from_u8(payload);
2625 if !magic.is_empty() && magic != "application/octet-stream" {
2626 let treat_as_text = magic.starts_with("text/")
2627 || matches!(
2628 magic,
2629 "application/json"
2630 | "application/xml"
2631 | "application/javascript"
2632 | "image/svg+xml"
2633 | "text/plain"
2634 );
2635 return (magic.to_string(), treat_as_text);
2636 }
2637
2638 if let Some(path) = source_path {
2639 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2640 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2641 return (mime.to_string(), treat_as_text);
2642 }
2643 }
2644 }
2645
2646 if payload_utf8.is_some() {
2647 return ("text/plain".to_string(), true);
2648 }
2649
2650 ("application/octet-stream".to_string(), false)
2651}
2652
2653fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
2654 let ext_lower = ext.to_ascii_lowercase();
2655 match ext_lower.as_str() {
2656 "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
2657 | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
2658 | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
2659 | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => Some(("text/plain", true)),
2660 "md" | "markdown" => Some(("text/markdown", true)),
2661 "rst" => Some(("text/x-rst", true)),
2662 "json" => Some(("application/json", true)),
2663 "csv" => Some(("text/csv", true)),
2664 "tsv" => Some(("text/tab-separated-values", true)),
2665 "yaml" | "yml" => Some(("application/yaml", true)),
2666 "toml" => Some(("application/toml", true)),
2667 "html" | "htm" => Some(("text/html", true)),
2668 "xml" => Some(("application/xml", true)),
2669 "svg" => Some(("image/svg+xml", true)),
2670 "gif" => Some(("image/gif", false)),
2671 "jpg" | "jpeg" => Some(("image/jpeg", false)),
2672 "png" => Some(("image/png", false)),
2673 "bmp" => Some(("image/bmp", false)),
2674 "ico" => Some(("image/x-icon", false)),
2675 "tif" | "tiff" => Some(("image/tiff", false)),
2676 "webp" => Some(("image/webp", false)),
2677 _ => None,
2678 }
2679}
2680
2681fn caption_from_text(text: &str) -> Option<String> {
2682 for line in text.lines() {
2683 let trimmed = line.trim();
2684 if !trimmed.is_empty() {
2685 return Some(truncate_to_boundary(trimmed, 240));
2686 }
2687 }
2688 None
2689}
2690
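/// Truncate to at most `max_len` bytes on a char boundary, trimming trailing whitespace
/// and appending an ellipsis when anything was cut.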
2691fn truncate_to_boundary(text: &str, max_len: usize) -> String {
2692 if text.len() <= max_len {
2693 return text.to_string();
2694 }
2695 let mut end = max_len;
2696 while end > 0 && !text.is_char_boundary(end) {
2697 end -= 1;
2698 }
2699 if end == 0 {
2700 return String::new();
2701 }
2702 let mut truncated = text[..end].trim_end().to_string();
2703 truncated.push('…');
2704 truncated
2705}
2706
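/// Decode the image to read its dimensions, derive a small colour palette, and pull EXIF
/// metadata (including an `ImageDescription` caption) when present.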
2707fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2708 let reader = ImageReader::new(Cursor::new(payload))
2709 .with_guessed_format()
2710 .map_err(|err| warn!("failed to guess image format: {err}"))
2711 .ok()?;
2712 let image = reader
2713 .decode()
2714 .map_err(|err| warn!("failed to decode image: {err}"))
2715 .ok()?;
2716 let width = image.width();
2717 let height = image.height();
2718 let rgb = image.to_rgb8();
2719 let palette = match get_palette(
2720 rgb.as_raw(),
2721 ColorFormat::Rgb,
2722 COLOR_PALETTE_SIZE,
2723 COLOR_PALETTE_QUALITY,
2724 ) {
2725 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2726 Err(err) => {
2727 warn!("failed to compute colour palette: {err}");
2728 Vec::new()
2729 }
2730 };
2731
2732 let (exif, exif_caption) = extract_exif_metadata(payload);
2733
2734 Some(ImageAnalysis {
2735 width,
2736 height,
2737 palette,
2738 caption: exif_caption,
2739 exif,
2740 })
2741}
2742
2743fn color_to_hex(color: Color) -> String {
2744 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
2745}
2746
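/// Probe and decode the audio stream with symphonia to estimate duration and bitrate,
/// slice it into fixed-length segments, and merge in tags read via lofty.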
2747fn analyze_audio(
2748 payload: &[u8],
2749 source_path: Option<&Path>,
2750 mime: &str,
2751 options: AudioAnalyzeOptions,
2752) -> Option<AudioAnalysis> {
2753 use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
2754 use symphonia::core::errors::Error as SymphoniaError;
2755 use symphonia::core::formats::FormatOptions;
2756 use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
2757 use symphonia::core::meta::MetadataOptions;
2758 use symphonia::core::probe::Hint;
2759
2760 let cursor = Cursor::new(payload.to_vec());
2761 let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());
2762
2763 let mut hint = Hint::new();
2764 if let Some(path) = source_path {
2765 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2766 hint.with_extension(ext);
2767 }
2768 }
2769 if let Some(suffix) = mime.split('/').nth(1) {
2770 hint.with_extension(suffix);
2771 }
2772
2773 let probed = symphonia::default::get_probe()
2774 .format(
2775 &hint,
2776 mss,
2777 &FormatOptions::default(),
2778 &MetadataOptions::default(),
2779 )
2780 .map_err(|err| warn!("failed to probe audio stream: {err}"))
2781 .ok()?;
2782
2783 let mut format = probed.format;
2784 let track = format.default_track()?;
2785 let track_id = track.id;
2786 let codec_params: CodecParameters = track.codec_params.clone();
2787 let sample_rate = codec_params.sample_rate;
2788 let channel_count = codec_params.channels.map(|channels| channels.count() as u8);
2789
2790 let mut decoder = symphonia::default::get_codecs()
2791 .make(&codec_params, &DecoderOptions::default())
2792 .map_err(|err| warn!("failed to create audio decoder: {err}"))
2793 .ok()?;
2794
2795 let mut decoded_frames: u64 = 0;
2796 loop {
2797 let packet = match format.next_packet() {
2798 Ok(packet) => packet,
2799 Err(SymphoniaError::IoError(_)) => break,
2800 Err(SymphoniaError::DecodeError(err)) => {
2801 warn!("skipping undecodable audio packet: {err}");
2802 continue;
2803 }
2804 Err(SymphoniaError::ResetRequired) => {
2805 decoder.reset();
2806 continue;
2807 }
2808 Err(other) => {
2809 warn!("stopping audio analysis due to error: {other}");
2810 break;
2811 }
2812 };
2813
2814 if packet.track_id() != track_id {
2815 continue;
2816 }
2817
2818 match decoder.decode(&packet) {
2819 Ok(audio_buf) => {
2820 decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
2821 }
2822 Err(SymphoniaError::DecodeError(err)) => {
2823 warn!("failed to decode audio packet: {err}");
2824 }
2825 Err(SymphoniaError::IoError(_)) => break,
2826 Err(SymphoniaError::ResetRequired) => {
2827 decoder.reset();
2828 }
2829 Err(other) => {
2830 warn!("decoder error: {other}");
2831 break;
2832 }
2833 }
2834 }
2835
2836 let duration_secs = match sample_rate {
2837 Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
2838 _ => 0.0,
2839 };
2840
2841 let bitrate_kbps = if duration_secs > 0.0 {
2842 Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
2843 } else {
2844 None
2845 };
2846
2847 let tags = extract_lofty_tags(payload);
2848 let caption = tags
2849 .get("title")
2850 .cloned()
2851 .or_else(|| tags.get("tracktitle").cloned());
2852
2853 let mut search_terms = Vec::new();
2854 if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
2855 search_terms.push(title.clone());
2856 }
2857 if let Some(artist) = tags.get("artist") {
2858 search_terms.push(artist.clone());
2859 }
2860 if let Some(album) = tags.get("album") {
2861 search_terms.push(album.clone());
2862 }
2863 if let Some(genre) = tags.get("genre") {
2864 search_terms.push(genre.clone());
2865 }
2866
2867 let mut segments = Vec::new();
2868 if duration_secs > 0.0 {
2869 let segment_len = options.normalised_segment_secs() as f64;
2870 if segment_len > 0.0 {
2871 let mut start = 0.0;
2872 let mut idx = 1usize;
2873 while start < duration_secs {
2874 let end = (start + segment_len).min(duration_secs);
2875 segments.push(AudioSegmentMetadata {
2876 start_seconds: start as f32,
2877 end_seconds: end as f32,
2878 label: Some(format!("Segment {}", idx)),
2879 });
2880 if end >= duration_secs {
2881 break;
2882 }
2883 start = end;
2884 idx += 1;
2885 }
2886 }
2887 }
2888
2889 let mut metadata = DocAudioMetadata::default();
2890 if duration_secs > 0.0 {
2891 metadata.duration_secs = Some(duration_secs as f32);
2892 }
2893 if let Some(rate) = sample_rate {
2894 metadata.sample_rate_hz = Some(rate);
2895 }
2896 if let Some(channels) = channel_count {
2897 metadata.channels = Some(channels);
2898 }
2899 if let Some(bitrate) = bitrate_kbps {
2900 metadata.bitrate_kbps = Some(bitrate);
2901 }
2902 if codec_params.codec != CODEC_TYPE_NULL {
2903 metadata.codec = Some(format!("{:?}", codec_params.codec));
2904 }
2905 if !segments.is_empty() {
2906 metadata.segments = segments;
2907 }
2908 if !tags.is_empty() {
2909 metadata.tags = tags
2910 .iter()
2911 .map(|(k, v)| (k.clone(), v.clone()))
2912 .collect::<BTreeMap<_, _>>();
2913 }
2914
2915 Some(AudioAnalysis {
2916 metadata,
2917 caption,
2918 search_terms,
2919 })
2920}
2921
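/// Read common audio tags (title, artist, album, genre, track/disc number) from the
/// payload, preferring the primary tag but also scanning any other tag blocks present.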
2922fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
2923 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
2924
2925 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
2926 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
2927 out.entry("title".into())
2928 .or_insert_with(|| value.to_string());
2929 out.entry("tracktitle".into())
2930 .or_insert_with(|| value.to_string());
2931 }
2932 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
2933 out.entry("artist".into())
2934 .or_insert_with(|| value.to_string());
2935 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
2936 out.entry("artist".into())
2937 .or_insert_with(|| value.to_string());
2938 }
2939 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
2940 out.entry("album".into())
2941 .or_insert_with(|| value.to_string());
2942 }
2943 if let Some(value) = tag.get_string(&ItemKey::Genre) {
2944 out.entry("genre".into())
2945 .or_insert_with(|| value.to_string());
2946 }
2947 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
2948 out.entry("track_number".into())
2949 .or_insert_with(|| value.to_string());
2950 }
2951 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
2952 out.entry("disc_number".into())
2953 .or_insert_with(|| value.to_string());
2954 }
2955 }
2956
2957 let mut tags = HashMap::new();
2958 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
2959 Ok(probe) => probe,
2960 Err(err) => {
2961 warn!("failed to guess audio tag file type: {err}");
2962 return tags;
2963 }
2964 };
2965
2966 let tagged_file = match probe.read() {
2967 Ok(file) => file,
2968 Err(err) => {
2969 warn!("failed to read audio tags: {err}");
2970 return tags;
2971 }
2972 };
2973
2974 if let Some(primary) = tagged_file.primary_tag() {
2975 collect_tag(primary, &mut tags);
2976 }
2977 for tag in tagged_file.tags() {
2978 collect_tag(tag, &mut tags);
2979 }
2980
2981 tags
2982}
2983
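/// Read EXIF fields (camera make/model, lens, timestamps, GPS) and return them together
/// with an optional caption taken from `ImageDescription`.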
2984fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
2985 let mut cursor = Cursor::new(payload);
2986 let exif = match ExifReader::new().read_from_container(&mut cursor) {
2987 Ok(exif) => exif,
2988 Err(err) => {
2989 warn!("failed to read EXIF metadata: {err}");
2990 return (None, None);
2991 }
2992 };
2993
2994 let mut doc = DocExifMetadata::default();
2995 let mut has_data = false;
2996
2997 if let Some(make) = exif_string(&exif, Tag::Make) {
2998 doc.make = Some(make);
2999 has_data = true;
3000 }
3001 if let Some(model) = exif_string(&exif, Tag::Model) {
3002 doc.model = Some(model);
3003 has_data = true;
3004 }
3005 if let Some(lens) =
3006 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3007 {
3008 doc.lens = Some(lens);
3009 has_data = true;
3010 }
3011 if let Some(dt) =
3012 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3013 {
3014 doc.datetime = Some(dt);
3015 has_data = true;
3016 }
3017
3018 if let Some(gps) = extract_gps_metadata(&exif) {
3019 doc.gps = Some(gps);
3020 has_data = true;
3021 }
3022
3023 let caption = exif_string(&exif, Tag::ImageDescription);
3024
3025 let metadata = if has_data { Some(doc) } else { None };
3026 (metadata, caption)
3027}
3028
3029fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3030 exif.fields()
3031 .find(|field| field.tag == tag)
3032 .and_then(|field| field_to_string(field, exif))
3033}
3034
3035fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3036 let value = field.display_value().with_unit(exif).to_string();
3037 let trimmed = value.trim_matches('\0').trim();
3038 if trimmed.is_empty() {
3039 None
3040 } else {
3041 Some(trimmed.to_string())
3042 }
3043}
3044
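/// Read GPS latitude/longitude from EXIF, negating values whose reference is the southern
/// or western hemisphere.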
3045fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3046 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3047 let latitude_ref_field = exif
3048 .fields()
3049 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3050 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3051 let longitude_ref_field = exif
3052 .fields()
3053 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3054
3055 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3056 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3057
3058 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3059 if reference.eq_ignore_ascii_case("S") {
3060 latitude = -latitude;
3061 }
3062 }
3063 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3064 if reference.eq_ignore_ascii_case("W") {
3065 longitude = -longitude;
3066 }
3067 }
3068
3069 Some(DocGpsMetadata {
3070 latitude,
3071 longitude,
3072 })
3073}
3074
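/// Convert an EXIF rational triple (degrees, minutes, seconds) into decimal degrees:
/// `deg + min/60 + sec/3600`; missing minute/second components default to zero.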
3075fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3076 match value {
3077 ExifValue::Rational(values) if !values.is_empty() => {
3078 let deg = rational_to_f64_u(values.get(0)?)?;
3079 let min = values
3080 .get(1)
3081 .and_then(|v| rational_to_f64_u(v))
3082 .unwrap_or(0.0);
3083 let sec = values
3084 .get(2)
3085 .and_then(|v| rational_to_f64_u(v))
3086 .unwrap_or(0.0);
3087 Some(deg + (min / 60.0) + (sec / 3600.0))
3088 }
3089 ExifValue::SRational(values) if !values.is_empty() => {
3090 let deg = rational_to_f64_i(values.get(0)?)?;
3091 let min = values
3092 .get(1)
3093 .and_then(|v| rational_to_f64_i(v))
3094 .unwrap_or(0.0);
3095 let sec = values
3096 .get(2)
3097 .and_then(|v| rational_to_f64_i(v))
3098 .unwrap_or(0.0);
3099 Some(deg + (min / 60.0) + (sec / 3600.0))
3100 }
3101 _ => None,
3102 }
3103}
3104
3105fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3106 if value.denom == 0 {
3107 None
3108 } else {
3109 Some(value.num as f64 / value.denom as f64)
3110 }
3111}
3112
3113fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3114 if value.denom == 0 {
3115 None
3116 } else {
3117 Some(value.num as f64 / value.denom as f64)
3118 }
3119}
3120
3121fn value_to_ascii(value: &ExifValue) -> Option<String> {
3122 if let ExifValue::Ascii(values) = value {
3123 values
3124 .first()
3125 .and_then(|bytes| std::str::from_utf8(bytes).ok())
3126 .map(|s| s.trim_matches('\0').trim().to_string())
3127 .filter(|s| !s.is_empty())
3128 } else {
3129 None
3130 }
3131}
3132
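/// Core single-document ingest path: analyze the payload, build `PutOptions`, honour
/// duplicate/update semantics for the derived URI, generate embeddings when enabled, and
/// either store the frame or hand back a `ParallelInput` when running in parallel mode.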
3133fn ingest_payload(
3134 mem: &mut Memvid,
3135 args: &PutArgs,
3136 payload: Vec<u8>,
3137 source_path: Option<&Path>,
3138 capacity_guard: Option<&mut CapacityGuard>,
3139 enable_embedding: bool,
3140 runtime: Option<&EmbeddingRuntime>,
3141 parallel_mode: bool,
3142) -> Result<IngestOutcome> {
3143 let original_bytes = payload.len();
3144 let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3145
3146 let payload_text = std::str::from_utf8(&payload).ok();
3147 let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3148
3149 let audio_opts = AudioAnalyzeOptions {
3150 force: args.audio,
3151 segment_secs: args
3152 .audio_segment_seconds
3153 .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3154 };
3155
3156 let mut analysis = analyze_file(
3157 source_path,
3158 &payload,
3159 payload_text,
3160 inferred_title.as_deref(),
3161 audio_opts,
3162 args.video,
3163 );
3164
3165 if args.video && !analysis.mime.starts_with("video/") {
3166 anyhow::bail!(
3167 "--video requires a video input; detected MIME type {}",
3168 analysis.mime
3169 );
3170 }
3171
3172 let mut search_text = analysis.search_text.clone();
3173 if let Some(ref mut text) = search_text {
3174 if text.len() > MAX_SEARCH_TEXT_LEN {
3175 let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3176 *text = truncated;
3177 }
3178 }
3179 analysis.search_text = search_text.clone();
3180
3181 let uri = if let Some(ref explicit) = args.uri {
3182 derive_uri(Some(explicit), None)
3183 } else if args.video {
3184 derive_video_uri(&payload, source_path, &analysis.mime)
3185 } else {
3186 derive_uri(None, source_path)
3187 };
3188
3189 let mut options_builder = PutOptions::builder()
3190 .enable_embedding(enable_embedding)
3191 .auto_tag(!args.no_auto_tag)
3192 .extract_dates(!args.no_extract_dates);
3193 if let Some(ts) = args.timestamp {
3194 options_builder = options_builder.timestamp(ts);
3195 }
3196 if let Some(track) = &args.track {
3197 options_builder = options_builder.track(track.clone());
3198 }
3199 if args.video {
3200 options_builder = options_builder.kind("video");
3201 options_builder = options_builder.tag("kind", "video");
3202 options_builder = options_builder.push_tag("video");
3203 } else if let Some(kind) = &args.kind {
3204 options_builder = options_builder.kind(kind.clone());
3205 }
3206 options_builder = options_builder.uri(uri.clone());
3207 if let Some(ref title) = inferred_title {
3208 options_builder = options_builder.title(title.clone());
3209 }
3210 for (key, value) in &args.tags {
3211 options_builder = options_builder.tag(key.clone(), value.clone());
3212 if !key.is_empty() {
3213 options_builder = options_builder.push_tag(key.clone());
3214 }
3215 if !value.is_empty() && value != key {
3216 options_builder = options_builder.push_tag(value.clone());
3217 }
3218 }
3219 for label in &args.labels {
3220 options_builder = options_builder.label(label.clone());
3221 }
3222 if let Some(metadata) = analysis.metadata.clone() {
3223 if !metadata.is_empty() {
3224 options_builder = options_builder.metadata(metadata);
3225 }
3226 }
3227 for (idx, entry) in args.metadata.iter().enumerate() {
3228 match serde_json::from_str::<DocMetadata>(entry) {
3229 Ok(meta) => {
3230 options_builder = options_builder.metadata(meta);
3231 }
3232 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3233 Ok(value) => {
3234 options_builder =
3235 options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3236 }
3237 Err(err) => {
3238 warn!("failed to parse --metadata JSON: {err}");
3239 }
3240 },
3241 }
3242 }
3243 if let Some(text) = search_text.clone() {
3244 if !text.trim().is_empty() {
3245 options_builder = options_builder.search_text(text);
3246 }
3247 }
3248 let options = options_builder.build();
3249
3250 let existing_frame = if args.update_existing || !args.allow_duplicate {
3251 match mem.frame_by_uri(&uri) {
3252 Ok(frame) => Some(frame),
3253 Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3254 Err(err) => return Err(err.into()),
3255 }
3256 } else {
3257 None
3258 };
3259
3260 let frame_id = if let Some(ref existing) = existing_frame {
3263 existing.id
3264 } else {
3265 mem.stats()?.frame_count
3266 };
3267
3268 let mut embedded = false;
3269 let allow_embedding = enable_embedding && !args.video;
3270 if enable_embedding && !allow_embedding && args.video {
3271 warn!("semantic embeddings are not generated for video payloads");
3272 }
3273 let seq = if let Some(existing) = existing_frame {
3274 if args.update_existing {
3275 let payload_for_update = payload.clone();
3276 if allow_embedding {
3277 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3278 let embed_text = search_text
3279 .clone()
3280 .or_else(|| payload_text.map(|text| text.to_string()))
3281 .unwrap_or_default();
3282 if embed_text.trim().is_empty() {
3283 warn!("no textual content available; embedding skipped");
3284 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3285 .map_err(|err| {
3286 map_put_error(
3287 err,
3288 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3289 )
3290 })?
3291 } else {
3292 let truncated_text = truncate_for_embedding(&embed_text);
3294 if truncated_text.len() < embed_text.len() {
3295 info!(
3296 "Truncated text from {} to {} chars for embedding",
3297 embed_text.len(),
3298 truncated_text.len()
3299 );
3300 }
3301 let embedding = runtime.embed(&truncated_text)?;
3302 embedded = true;
3303 mem.update_frame(
3304 existing.id,
3305 Some(payload_for_update),
3306 options.clone(),
3307 Some(embedding),
3308 )
3309 .map_err(|err| {
3310 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3311 })?
3312 }
3313 } else {
3314 mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3315 .map_err(|err| {
3316 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3317 })?
3318 }
3319 } else {
3320 return Err(DuplicateUriError::new(uri.as_str()).into());
3321 }
3322 } else {
3323 if let Some(guard) = capacity_guard.as_ref() {
3324 guard.ensure_capacity(stored_bytes as u64)?;
3325 }
3326 if parallel_mode {
3327 #[cfg(feature = "parallel_segments")]
3328 {
3329 let mut parent_embedding = None;
3330 let mut chunk_embeddings_vec = None;
3331 if allow_embedding {
3332 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3333 let embed_text = search_text
3334 .clone()
3335 .or_else(|| payload_text.map(|text| text.to_string()))
3336 .unwrap_or_default();
3337 if embed_text.trim().is_empty() {
3338 warn!("no textual content available; embedding skipped");
3339 } else {
3340 info!(
3342 "parallel mode: checking for chunks on {} bytes payload",
3343 payload.len()
3344 );
3345 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3346 info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3348
3349 #[cfg(feature = "temporal_enrich")]
3351 let enriched_chunks = if args.temporal_enrich {
3352 info!(
3353 "Temporal enrichment enabled, processing {} chunks",
3354 chunk_texts.len()
3355 );
3356 let today = chrono::Local::now().date_naive();
3358 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3359 let enriched: Vec<String> =
3361 results.iter().map(|(text, _)| text.clone()).collect();
3362 let resolved_count = results
3363 .iter()
3364 .filter(|(_, e)| {
3365 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3366 })
3367 .count();
3368 info!(
3369 "Temporal enrichment: resolved {} chunks with temporal context",
3370 resolved_count
3371 );
3372 enriched
3373 } else {
3374 chunk_texts.clone()
3375 };
3376 #[cfg(not(feature = "temporal_enrich"))]
3377 let enriched_chunks = {
3378 if args.temporal_enrich {
3379 warn!(
3380 "Temporal enrichment requested but feature not compiled in"
3381 );
3382 }
3383 chunk_texts.clone()
3384 };
3385
3386 let embed_chunks = if args.contextual {
3388 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3389 let engine = if args.contextual_model.as_deref() == Some("local") {
3390 #[cfg(feature = "llama-cpp")]
3391 {
3392 let model_path = get_local_contextual_model_path()?;
3393 ContextualEngine::local(model_path)
3394 }
3395 #[cfg(not(feature = "llama-cpp"))]
3396 {
3397 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3398 }
3399 } else if let Some(model) = &args.contextual_model {
3400 ContextualEngine::openai_with_model(model)?
3401 } else {
3402 ContextualEngine::openai()?
3403 };
3404
3405 match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3406 {
3407 Ok(contexts) => {
3408 info!("Generated {} contextual prefixes", contexts.len());
3409 apply_contextual_prefixes(
3410 &embed_text,
3411 &enriched_chunks,
3412 &contexts,
3413 )
3414 }
3415 Err(e) => {
3416 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3417 enriched_chunks.clone()
3418 }
3419 }
3420 } else {
3421 enriched_chunks
3422 };
3423
3424 let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
3425 for chunk_text in &embed_chunks {
3426 let truncated_chunk = truncate_for_embedding(chunk_text);
3428 if truncated_chunk.len() < chunk_text.len() {
3429 info!("parallel mode: Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
3430 }
3431 let chunk_emb = runtime.embed(&truncated_chunk)?;
3432 chunk_embeddings.push(chunk_emb);
3433 }
3434 let num_chunks = chunk_embeddings.len();
3435 chunk_embeddings_vec = Some(chunk_embeddings);
3436 let parent_text = truncate_for_embedding(&embed_text);
3438 if parent_text.len() < embed_text.len() {
3439 info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3440 }
3441 parent_embedding = Some(runtime.embed(&parent_text)?);
3442 embedded = true;
3443 info!(
3444 "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3445 num_chunks
3446 );
3447 } else {
3448 info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3450 let truncated_text = truncate_for_embedding(&embed_text);
3451 if truncated_text.len() < embed_text.len() {
3452 info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3453 }
3454 parent_embedding = Some(runtime.embed(&truncated_text)?);
3455 embedded = true;
3456 }
3457 }
3458 }
3459 let payload_variant = if let Some(path) = source_path {
3460 ParallelPayload::Path(path.to_path_buf())
3461 } else {
3462 ParallelPayload::Bytes(payload)
3463 };
3464 let input = ParallelInput {
3465 payload: payload_variant,
3466 options: options.clone(),
3467 embedding: parent_embedding,
3468 chunk_embeddings: chunk_embeddings_vec,
3469 };
3470 return Ok(IngestOutcome {
3471 report: PutReport {
3472 seq: 0,
3473 frame_id: 0,
3474 uri,
3475 title: inferred_title,
3476 original_bytes,
3477 stored_bytes,
3478 compressed,
3479 source: source_path.map(Path::to_path_buf),
3480 mime: Some(analysis.mime),
3481 metadata: analysis.metadata,
3482 extraction: analysis.extraction,
3483 #[cfg(feature = "logic_mesh")]
3484 search_text: search_text.clone(),
3485 },
3486 embedded,
3487 parallel_input: Some(input),
3488 });
3489 }
3490 #[cfg(not(feature = "parallel_segments"))]
3491 {
3492 mem.put_bytes_with_options(&payload, options)
3493 .map_err(|err| {
3494 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3495 })?
3496 }
3497 } else if allow_embedding {
3498 let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3499 let embed_text = search_text
3500 .clone()
3501 .or_else(|| payload_text.map(|text| text.to_string()))
3502 .unwrap_or_default();
3503 if embed_text.trim().is_empty() {
3504 warn!("no textual content available; embedding skipped");
3505 mem.put_bytes_with_options(&payload, options)
3506 .map_err(|err| {
3507 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3508 })?
3509 } else {
3510 if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3512 info!(
3514 "Document will be chunked into {} chunks, generating embeddings for each",
3515 chunk_texts.len()
3516 );
3517
3518 #[cfg(feature = "temporal_enrich")]
3520 let enriched_chunks = if args.temporal_enrich {
3521 info!(
3522 "Temporal enrichment enabled, processing {} chunks",
3523 chunk_texts.len()
3524 );
3525 let today = chrono::Local::now().date_naive();
3527 let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3528 let enriched: Vec<String> =
3530 results.iter().map(|(text, _)| text.clone()).collect();
3531 let resolved_count = results
3532 .iter()
3533 .filter(|(_, e)| {
3534 e.relative_phrases.iter().any(|p| p.resolved.is_some())
3535 })
3536 .count();
3537 info!(
3538 "Temporal enrichment: resolved {} chunks with temporal context",
3539 resolved_count
3540 );
3541 enriched
3542 } else {
3543 chunk_texts.clone()
3544 };
3545 #[cfg(not(feature = "temporal_enrich"))]
3546 let enriched_chunks = {
3547 if args.temporal_enrich {
3548 warn!("Temporal enrichment requested but feature not compiled in");
3549 }
3550 chunk_texts.clone()
3551 };
3552
3553 let embed_chunks = if args.contextual {
3555 info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3556 let engine = if args.contextual_model.as_deref() == Some("local") {
3557 #[cfg(feature = "llama-cpp")]
3558 {
3559 let model_path = get_local_contextual_model_path()?;
3560 ContextualEngine::local(model_path)
3561 }
3562 #[cfg(not(feature = "llama-cpp"))]
3563 {
3564 anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3565 }
3566 } else if let Some(model) = &args.contextual_model {
3567 ContextualEngine::openai_with_model(model)?
3568 } else {
3569 ContextualEngine::openai()?
3570 };
3571
3572 match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
3573 Ok(contexts) => {
3574 info!("Generated {} contextual prefixes", contexts.len());
3575 apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
3576 }
3577 Err(e) => {
3578 warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3579 enriched_chunks.clone()
3580 }
3581 }
3582 } else {
3583 enriched_chunks
3584 };
3585
3586 let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
3587 for chunk_text in &embed_chunks {
3588 let truncated_chunk = truncate_for_embedding(chunk_text);
3590 if truncated_chunk.len() < chunk_text.len() {
3591 info!(
3592 "Truncated chunk from {} to {} chars for embedding",
3593 chunk_text.len(),
3594 truncated_chunk.len()
3595 );
3596 }
3597 let chunk_emb = runtime.embed(&truncated_chunk)?;
3598 chunk_embeddings.push(chunk_emb);
3599 }
3600 let parent_text = truncate_for_embedding(&embed_text);
3602 if parent_text.len() < embed_text.len() {
3603 info!(
3604 "Truncated parent text from {} to {} chars for embedding",
3605 embed_text.len(),
3606 parent_text.len()
3607 );
3608 }
3609 let parent_embedding = runtime.embed(&parent_text)?;
3610 embedded = true;
3611 info!(
3612 "Calling put_with_chunk_embeddings with {} chunk embeddings",
3613 chunk_embeddings.len()
3614 );
3615 mem.put_with_chunk_embeddings(
3616 &payload,
3617 Some(parent_embedding),
3618 chunk_embeddings,
3619 options,
3620 )
3621 .map_err(|err| {
3622 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3623 })?
3624 } else {
3625 info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
3627 let truncated_text = truncate_for_embedding(&embed_text);
3628 if truncated_text.len() < embed_text.len() {
3629 info!(
3630 "Truncated text from {} to {} chars for embedding",
3631 embed_text.len(),
3632 truncated_text.len()
3633 );
3634 }
3635 let embedding = runtime.embed(&truncated_text)?;
3636 embedded = true;
3637 mem.put_with_embedding_and_options(&payload, embedding, options)
3638 .map_err(|err| {
3639 map_put_error(
3640 err,
3641 capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3642 )
3643 })?
3644 }
3645 }
3646 } else {
3647 mem.put_bytes_with_options(&payload, options)
3648 .map_err(|err| {
3649 map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3650 })?
3651 }
3652 };
3653
3654 Ok(IngestOutcome {
3655 report: PutReport {
3656 seq,
3657 frame_id,
3658 uri,
3659 title: inferred_title,
3660 original_bytes,
3661 stored_bytes,
3662 compressed,
3663 source: source_path.map(Path::to_path_buf),
3664 mime: Some(analysis.mime),
3665 metadata: analysis.metadata,
3666 extraction: analysis.extraction,
3667 #[cfg(feature = "logic_mesh")]
3668 search_text,
3669 },
3670 embedded,
3671 #[cfg(feature = "parallel_segments")]
3672 parallel_input: None,
3673 })
3674}
3675
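/// Estimate the stored size of a payload: UTF-8 text is zstd-compressed (level 3) to
/// predict its on-disk footprint, while binary payloads are counted as-is.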
3676fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
3677 if std::str::from_utf8(payload).is_ok() {
3678 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
3679 Ok((compressed.len(), true))
3680 } else {
3681 Ok((payload.len(), false))
3682 }
3683}
3684
3685fn print_report(report: &PutReport) {
3686 let name = report
3687 .source
3688 .as_ref()
3689 .map(|path| {
3690 let pretty = env::current_dir()
3691 .ok()
3692 .as_ref()
3693 .and_then(|cwd| diff_paths(path, cwd))
3694 .unwrap_or_else(|| path.to_path_buf());
3695 pretty.to_string_lossy().into_owned()
3696 })
3697 .unwrap_or_else(|| "stdin".to_string());
3698
3699 let ratio = if report.original_bytes > 0 {
3700 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
3701 } else {
3702 100.0
3703 };
3704
3705 println!("• {name} → {}", report.uri);
3706 println!(" seq: {}", report.seq);
3707 if report.compressed {
3708 println!(
3709 " size: {} B → {} B ({:.1}% of original, compressed)",
3710 report.original_bytes, report.stored_bytes, ratio
3711 );
3712 } else {
3713 println!(" size: {} B (stored as-is)", report.original_bytes);
3714 }
3715 if let Some(mime) = &report.mime {
3716 println!(" mime: {mime}");
3717 }
3718 if let Some(title) = &report.title {
3719 println!(" title: {title}");
3720 }
3721 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
3722 println!(
3723 " extraction: status={} reader={}",
3724 report.extraction.status.label(),
3725 extraction_reader
3726 );
3727 if let Some(ms) = report.extraction.duration_ms {
3728 println!(" extraction-duration: {} ms", ms);
3729 }
3730 if let Some(pages) = report.extraction.pages_processed {
3731 println!(" extraction-pages: {}", pages);
3732 }
3733 for warning in &report.extraction.warnings {
3734 println!(" warning: {warning}");
3735 }
3736 if let Some(metadata) = &report.metadata {
3737 if let Some(caption) = &metadata.caption {
3738 println!(" caption: {caption}");
3739 }
3740 if let Some(audio) = metadata.audio.as_ref() {
3741 if audio.duration_secs.is_some()
3742 || audio.sample_rate_hz.is_some()
3743 || audio.channels.is_some()
3744 {
3745 let duration = audio
3746 .duration_secs
3747 .map(|secs| format!("{secs:.1}s"))
3748 .unwrap_or_else(|| "unknown".into());
3749 let rate = audio
3750 .sample_rate_hz
3751 .map(|hz| format!("{} Hz", hz))
3752 .unwrap_or_else(|| "? Hz".into());
3753 let channels = audio
3754 .channels
3755 .map(|ch| ch.to_string())
3756 .unwrap_or_else(|| "?".into());
3757 println!(" audio: duration {duration}, {channels} ch, {rate}");
3758 }
3759 }
3760 }
3761
3762 println!();
3763}
3764
3765fn put_report_to_json(report: &PutReport) -> serde_json::Value {
3766 let source = report
3767 .source
3768 .as_ref()
3769 .map(|path| path.to_string_lossy().into_owned());
3770 let extraction = json!({
3771 "status": report.extraction.status.label(),
3772 "reader": report.extraction.reader,
3773 "duration_ms": report.extraction.duration_ms,
3774 "pages_processed": report.extraction.pages_processed,
3775 "warnings": report.extraction.warnings,
3776 });
3777 json!({
3778 "seq": report.seq,
3779 "uri": report.uri,
3780 "title": report.title,
3781 "original_bytes": report.original_bytes,
3782 "original_bytes_human": format_bytes(report.original_bytes as u64),
3783 "stored_bytes": report.stored_bytes,
3784 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
3785 "compressed": report.compressed,
3786 "mime": report.mime,
3787 "source": source,
3788 "metadata": report.metadata,
3789 "extraction": extraction,
3790 })
3791}
3792
3793fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
3794 match err {
3795 MemvidError::CapacityExceeded {
3796 current,
3797 limit,
3798 required,
3799 } => anyhow!(CapacityExceededMessage {
3800 current,
3801 limit,
3802 required,
3803 }),
3804 other => other.into(),
3805 }
3806}
3807
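/// Apply `--metadata` JSON entries to existing `PutOptions`: full `DocMetadata` objects
/// replace `options.metadata` (the last one wins), while arbitrary JSON values are stored
/// under `custom_metadata_{idx}` keys in `extra_metadata`.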
3808fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
3809 for (idx, entry) in entries.iter().enumerate() {
3810 match serde_json::from_str::<DocMetadata>(entry) {
3811 Ok(meta) => {
3812 options.metadata = Some(meta);
3813 }
3814 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3815 Ok(value) => {
3816 options
3817 .extra_metadata
3818 .insert(format!("custom_metadata_{idx}"), value.to_string());
3819 }
3820 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
3821 },
3822 }
3823 }
3824}
3825
3826fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
3827 let stats = mem.stats()?;
3828 let message = match stats.tier {
3829 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
3830 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
3831 };
3832 Ok(message)
3833}
3834
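/// Normalise a raw URI: strip the `mv2://` prefix, convert backslashes, and resolve `.`
/// and `..` segments, e.g. `mv2://a/../b` becomes `b`. With `keep_path` the remaining
/// segments are joined; otherwise only the last segment is kept.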
3835fn sanitize_uri(raw: &str, keep_path: bool) -> String {
3836 let trimmed = raw.trim().trim_start_matches("mv2://");
3837 let normalized = trimmed.replace('\\', "/");
3838 let mut segments: Vec<String> = Vec::new();
3839 for segment in normalized.split('/') {
3840 if segment.is_empty() || segment == "." {
3841 continue;
3842 }
3843 if segment == ".." {
3844 segments.pop();
3845 continue;
3846 }
3847 segments.push(segment.to_string());
3848 }
3849
3850 if segments.is_empty() {
3851 return normalized
3852 .split('/')
3853 .last()
3854 .filter(|s| !s.is_empty())
3855 .unwrap_or("document")
3856 .to_string();
3857 }
3858
3859 if keep_path {
3860 segments.join("/")
3861 } else {
3862 segments
3863 .last()
3864 .cloned()
3865 .unwrap_or_else(|| "document".to_string())
3866 }
3867}
3868
3869fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
3870 if quiet {
3871 let pb = ProgressBar::hidden();
3872 pb.set_message(message.to_string());
3873 return pb;
3874 }
3875
3876 match total {
3877 Some(len) => {
3878 let pb = ProgressBar::new(len);
3879 pb.set_draw_target(ProgressDrawTarget::stderr());
3880 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
3881 .unwrap_or_else(|_| ProgressStyle::default_bar());
3882 pb.set_style(style);
3883 pb.set_message(message.to_string());
3884 pb
3885 }
3886 None => {
3887 let pb = ProgressBar::new_spinner();
3888 pb.set_draw_target(ProgressDrawTarget::stderr());
3889 pb.set_message(message.to_string());
3890 pb.enable_steady_tick(Duration::from_millis(120));
3891 pb
3892 }
3893 }
3894}
3895
3896fn create_spinner(message: &str) -> ProgressBar {
3897 let pb = ProgressBar::new_spinner();
3898 pb.set_draw_target(ProgressDrawTarget::stderr());
3899 let style = ProgressStyle::with_template("{spinner} {msg}")
3900 .unwrap_or_else(|_| ProgressStyle::default_spinner())
3901 .tick_strings(&["-", "\\", "|", "/"]);
3902 pb.set_style(style);
3903 pb.set_message(message.to_string());
3904 pb.enable_steady_tick(Duration::from_millis(120));
3905 pb
3906}
3907
3908#[cfg(feature = "parallel_segments")]
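/// Arguments for the batch `put-many` ingest command: a target `.mv2` file plus a JSON
/// batch read from `--input` or stdin.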
3914#[derive(Args)]
3915pub struct PutManyArgs {
3916 #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
3918 pub file: PathBuf,
3919 #[arg(long)]
3921 pub json: bool,
3922 #[arg(long, value_name = "PATH")]
3925 pub input: Option<PathBuf>,
3926 #[arg(long, default_value = "3")]
3928 pub compression_level: i32,
3929 #[command(flatten)]
3930 pub lock: LockCliArgs,
3931}
3932
3933#[cfg(feature = "parallel_segments")]
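/// One document in a `put-many` batch, optionally carrying precomputed parent and chunk embeddings.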
3935#[derive(Debug, serde::Deserialize)]
3936pub struct PutManyRequest {
3937 pub title: String,
3938 #[serde(default)]
3939 pub label: String,
3940 pub text: String,
3941 #[serde(default)]
3942 pub uri: Option<String>,
3943 #[serde(default)]
3944 pub tags: Vec<String>,
3945 #[serde(default)]
3946 pub labels: Vec<String>,
3947 #[serde(default)]
3948 pub metadata: serde_json::Value,
3949 #[serde(default)]
3951 pub embedding: Option<Vec<f32>>,
3952 #[serde(default)]
3956 pub chunk_embeddings: Option<Vec<Vec<f32>>>,
3957}
3958
3959#[cfg(feature = "parallel_segments")]
3961#[derive(Debug, serde::Deserialize)]
3962#[serde(transparent)]
3963pub struct PutManyBatch {
3964 pub requests: Vec<PutManyRequest>,
3965}
3966
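/// Ingest a JSON batch of documents in one parallel build pass. The input is a JSON array
/// of request objects, e.g. `[{"title": "Doc", "text": "..."}]`, read from `--input` or stdin.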
3967#[cfg(feature = "parallel_segments")]
3968pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
3969 use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
3970
3971 let mut mem = Memvid::open(&args.file)?;
3972 crate::utils::ensure_cli_mutation_allowed(&mem)?;
3973 crate::utils::apply_lock_cli(&mut mem, &args.lock);
3974
3975 let stats = mem.stats()?;
3977 if !stats.has_lex_index {
3978 mem.enable_lex()?;
3979 }
3980 if !stats.has_vec_index {
3981 mem.enable_vec()?;
3982 }
3983
3984 let batch_json: String = if let Some(input_path) = &args.input {
3986 fs::read_to_string(input_path)
3987 .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
3988 } else {
3989 // Read all of stdin so multi-line JSON batches are accepted, matching the --input path.
3990 io::read_to_string(io::stdin()).context("Failed to read from stdin")?
3994 };
3995
3996 let batch: PutManyBatch =
3997 serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;
3998
3999 if batch.requests.is_empty() {
4000 if args.json {
4001 println!("{{\"frame_ids\": [], \"count\": 0}}");
4002 } else {
4003 println!("No requests to process");
4004 }
4005 return Ok(());
4006 }
4007
4008 let count = batch.requests.len();
4009 if !args.json {
4010 eprintln!("Processing {} documents...", count);
4011 }
4012
4013 let inputs: Vec<ParallelInput> = batch
4015 .requests
4016 .into_iter()
4017 .enumerate()
4018 .map(|(i, req)| {
4019 let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
4020 let mut options = PutOptions::default();
4021 options.title = Some(req.title);
4022 options.uri = Some(uri);
4023 options.tags = req.tags;
4024 options.labels = req.labels;
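// Note: the request's `label` and `metadata` fields are not currently forwarded to PutOptions.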
4025
4026 ParallelInput {
4027 payload: ParallelPayload::Bytes(req.text.into_bytes()),
4028 options,
4029 embedding: req.embedding,
4030 chunk_embeddings: req.chunk_embeddings,
4031 }
4032 })
4033 .collect();
4034
4035 let build_opts = BuildOpts {
4037 zstd_level: args.compression_level.clamp(1, 9),
4038 ..BuildOpts::default()
4039 };
4040
4041 let frame_ids = mem
4043 .put_parallel_inputs(&inputs, build_opts)
4044 .context("Failed to ingest batch")?;
4045
4046 if args.json {
4047 let output = serde_json::json!({
4048 "frame_ids": frame_ids,
4049 "count": frame_ids.len()
4050 });
4051 println!("{}", serde_json::to_string(&output)?);
4052 } else {
4053 println!("Ingested {} documents", frame_ids.len());
4054 if frame_ids.len() <= 10 {
4055 for fid in &frame_ids {
4056 println!(" frame_id: {}", fid);
4057 }
4058 } else {
4059 println!(" first: {}", frame_ids.first().unwrap_or(&0));
4060 println!(" last: {}", frame_ids.last().unwrap_or(&0));
4061 }
4062 }
4063
4064 Ok(())
4065}