use std::collections::{BTreeMap, HashMap};
use std::env;
use std::fs;
use std::io::{self, Cursor, Write};
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use blake3::hash;
use clap::{ArgAction, Args};
use color_thief::{get_palette, Color, ColorFormat};
use exif::{Reader as ExifReader, Tag, Value as ExifValue};
use glob::glob;
use image::ImageReader;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use infer;
#[cfg(feature = "clip")]
use memvid_core::clip::render_pdf_pages_for_clip;
#[cfg(feature = "clip")]
use memvid_core::get_image_info;
use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
use memvid_core::{
    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
    ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
};
#[cfg(feature = "clip")]
use memvid_core::FrameRole;
#[cfg(feature = "parallel_segments")]
use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
use pdf_extract::OutputError as PdfExtractError;
use serde_json::json;
use tracing::{info, warn};
use tree_magic_mini::from_u8 as magic_from_u8;
use uuid::Uuid;

const MAX_SEARCH_TEXT_LEN: usize = 32_768;
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

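/// Truncates `text` to at most `MAX_EMBEDDING_TEXT_LEN` bytes, backing up to
/// the nearest UTF-8 character boundary so a multi-byte character is never
/// split. A minimal sketch of the expected behaviour (illustrative only, not
/// compiled as a doc-test):
///
/// ```ignore
/// let long = "é".repeat(MAX_EMBEDDING_TEXT_LEN); // 2 bytes per char
/// let out = truncate_for_embedding(&long);
/// assert!(out.len() <= MAX_EMBEDDING_TEXT_LEN);
/// assert!(long.starts_with(&out)); // cut lands on a char boundary
/// ```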
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        text.to_string()
    } else {
        // Back up from the byte limit until we land on a char boundary, so
        // the slice below cannot panic inside a multi-byte character (a raw
        // `&text[..MAX_EMBEDDING_TEXT_LEN]` would panic mid-character).
        let mut end = MAX_EMBEDDING_TEXT_LEN;
        while end > 0 && !text.is_char_boundary(end) {
            end -= 1;
        }
        text[..end].to_string()
    }
}

#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
        let expanded = if custom.starts_with("~/") {
            if let Ok(home) = env::var("HOME") {
                PathBuf::from(home).join(&custom[2..])
            } else {
                PathBuf::from(&custom)
            }
        } else {
            PathBuf::from(&custom)
        };
        expanded
    } else {
        let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
        PathBuf::from(home).join(".memvid").join("models")
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if model_path.exists() {
        Ok(model_path)
    } else {
        Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ))
    }
}

use pathdiff::diff_paths;

use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingRuntime};
use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
use crate::error::{CapacityExceededMessage, DuplicateUriError};
use crate::utils::{
    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
    select_frame,
};
#[cfg(feature = "temporal_enrich")]
use memvid_core::enrich_chunks as temporal_enrich_chunks;

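/// Parses common boolean spellings used in environment flags. A quick sketch
/// of the accepted values (illustrative only):
///
/// ```ignore
/// assert_eq!(parse_bool_flag(" YES "), Some(true));
/// assert_eq!(parse_bool_flag("off"), Some(false));
/// assert_eq!(parse_bool_flag("maybe"), None);
/// ```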
fn parse_bool_flag(value: &str) -> Option<bool> {
    match value.trim().to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => Some(true),
        "0" | "false" | "no" | "off" => Some(false),
        _ => None,
    }
}

#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    std::env::var("MEMVID_PARALLEL_SEGMENTS")
        .ok()
        .and_then(|value| parse_bool_flag(&value))
}

#[cfg(feature = "clip")]
fn is_clip_model_installed() -> bool {
    let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
        PathBuf::from(custom)
    } else if let Ok(home) = env::var("HOME") {
        PathBuf::from(home).join(".memvid/models")
    } else {
        PathBuf::from(".memvid/models")
    };

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    let vision_model = models_dir.join(format!("{}_vision.onnx", model_name));
    let text_model = models_dir.join(format!("{}_text.onnx", model_name));

    vision_model.exists() && text_model.exists()
}

#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;

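/// Merges adjacent NER entities of the same type when the text between them
/// is at most `MAX_MERGE_GAP` bytes of joining characters (spaces, tabs,
/// `&`, `-`, `'`, `.`), so tokenizer splits come back as one span. A rough
/// sketch of the intent, assuming two ORG spans over the same text
/// (illustrative only):
///
/// ```ignore
/// // "Procter & Gamble" detected as two ORG spans with " & " in the gap
/// let merged = merge_adjacent_entities(entities, text);
/// assert_eq!(merged[0].text, "Procter & Gamble");
/// ```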
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                // `get` returns None when the offsets fall outside the text
                // or off a char boundary; we then refuse to merge instead of
                // panicking on a raw slice.
                match original_text.get(last.byte_end..entity.byte_start) {
                    Some(gap_text) if !gap_text.contains('\n') && !gap_text.contains('\r') => {
                        gap_text
                            .chars()
                            .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.'))
                    }
                    _ => false,
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                let merged_text = original_text
                    .get(last.byte_start..entity.byte_end)
                    .map(|s| s.to_string())
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text,
                    entity.text,
                    merged_text,
                    gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}

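/// Heuristic filter for NER output: drops low-confidence spans, WordPiece
/// fragments (`##...`), spans with newlines or trailing periods,
/// lowercase-initial spans, and common false-positive acronyms, while
/// whitelisting a few well-known two-letter entities. A sketch of the
/// expected outcomes (illustrative only):
///
/// ```ignore
/// assert!(is_valid_entity("European Union", 0.9));
/// assert!(is_valid_entity("EU", 0.9));     // whitelisted two-letter entity
/// assert!(!is_valid_entity("##ing", 0.9)); // tokenizer fragment
/// assert!(!is_valid_entity("Paris", 0.3)); // below NER_MIN_CONFIDENCE
/// ```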
#[cfg(feature = "logic_mesh")]
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    if trimmed.len() < NER_MIN_ENTITY_LEN {
        return false;
    }

    if trimmed
        .chars()
        .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
    {
        return false;
    }

    if trimmed.contains('\n') || trimmed.contains('\r') {
        return false;
    }

    if trimmed.starts_with("##") || trimmed.ends_with("##") {
        return false;
    }

    if trimmed.ends_with('.') {
        return false;
    }

    let lower = trimmed.to_lowercase();

    if trimmed.len() == 2 {
        // Only a small whitelist of well-known two-letter entities survives.
        return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it");
    }

    if matches!(
        lower.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    let words: Vec<&str> = trimmed.split_whitespace().collect();
    if words.len() >= 2 {
        let first_word = words[0];
        if first_word.len() <= 2
            && first_word.chars().all(|c| c.is_uppercase())
            && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    if let Some(first_char) = trimmed.chars().next() {
        if first_char.is_lowercase() {
            return false;
        }
    }

    true
}

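/// Splits a `KEY=VALUE` argument at the first `=`. A quick sketch
/// (illustrative only):
///
/// ```ignore
/// assert_eq!(
///     parse_key_val("env=prod").unwrap(),
///     ("env".to_string(), "prod".to_string())
/// );
/// assert!(parse_key_val("no-equals-sign").is_err());
/// ```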
pub fn parse_key_val(s: &str) -> Result<(String, String)> {
    let (key, value) = s
        .split_once('=')
        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
    Ok((key.to_string(), value.to_string()))
}

#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}

#[derive(Args)]
pub struct PutArgs {
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    #[arg(long)]
    pub json: bool,
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long)]
    pub timestamp: Option<i64>,
    #[arg(long)]
    pub track: Option<String>,
    #[arg(long)]
    pub kind: Option<String>,
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    #[arg(long)]
    pub allow_duplicate: bool,

    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    #[command(flatten)]
    pub lock: LockCliArgs,
}

#[cfg(feature = "parallel_segments")]
impl PutArgs {
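    /// Resolves whether parallel segment ingestion is active. Precedence, as
    /// implemented below: `--parallel-segments` wins, then
    /// `--no-parallel-segments`, then the `MEMVID_PARALLEL_SEGMENTS`
    /// environment override, and finally the default of `false`.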
    pub fn wants_parallel(&self) -> bool {
        if self.parallel_segments {
            return true;
        }
        if self.no_parallel_segments {
            return false;
        }
        if let Some(env_flag) = parallel_env_override() {
            return env_flag;
        }
        false
    }

    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        if let Some(tokens) = self.parallel_segment_tokens {
            opts.segment_tokens = tokens;
        }
        if let Some(pages) = self.parallel_segment_pages {
            opts.segment_pages = pages;
        }
        if let Some(threads) = self.parallel_threads {
            opts.threads = threads;
        }
        if let Some(depth) = self.parallel_queue_depth {
            opts.queue_depth = depth;
        }
        opts.sanitize();
        opts
    }
}

#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    pub fn wants_parallel(&self) -> bool {
        false
    }
}

#[derive(Args)]
pub struct ApiFetchArgs {
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}

#[derive(Args)]
pub struct UpdateArgs {
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}

#[derive(Args)]
pub struct DeleteArgs {
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}

pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
    let command = ApiFetchCommand {
        file: args.file,
        config_path: args.config,
        dry_run: args.dry_run,
        mode_override: args.mode,
        uri_override: args.uri,
        output_json: args.json,
        lock_timeout_ms: args.lock.lock_timeout,
        force_lock: args.lock.force,
    };
    run_api_fetch(config, command)
}

pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;

    if !args.yes {
        if let Some(uri) = &frame.uri {
            println!("About to delete frame {} ({uri})", frame.id);
        } else {
            println!("About to delete frame {}", frame.id);
        }
        print!("Confirm? [y/N]: ");
        io::stdout().flush()?;
        let mut input = String::new();
        io::stdin().read_line(&mut input)?;
        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
        if !confirmed {
            println!("Aborted");
            return Ok(());
        }
    }

    let seq = mem.delete_frame(frame.id)?;
    mem.commit()?;
    let deleted = mem.frame_by_id(frame.id)?;

    if args.json {
        let json = json!({
            "frame_id": frame.id,
            "sequence": seq,
            "status": frame_status_str(deleted.status),
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!("Deleted frame {} (seq {})", frame.id, seq);
    }

    Ok(())
}

pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let mut stats = mem.stats()?;
    if stats.frame_count == 0 && !stats.has_lex_index {
        mem.enable_lex()?;
        stats = mem.stats()?;
    }

    if args.vector_compression {
        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
    }

    let mut capacity_guard = CapacityGuard::from_stats(&stats);
    let use_parallel = args.wants_parallel();
    #[cfg(feature = "parallel_segments")]
    let mut parallel_opts = if use_parallel {
        Some(args.sanitized_parallel_opts())
    } else {
        None
    };
    #[cfg(feature = "parallel_segments")]
    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
    #[cfg(feature = "parallel_segments")]
    let mut pending_parallel_indices: Vec<usize> = Vec::new();
    let input_set = resolve_inputs(args.input.as_deref())?;

    if args.video {
        if let Some(kind) = &args.kind {
            if !kind.eq_ignore_ascii_case("video") {
                anyhow::bail!("--video conflicts with explicit --kind={kind}");
            }
        }
        if matches!(input_set, InputSet::Stdin) {
            anyhow::bail!("--video requires --input <file>");
        }
    }

    #[allow(dead_code)]
    enum EmbeddingMode {
        None,
        Auto,
        Explicit,
    }

    let mv2_dimension = mem.vec_index_dimension();
    let (embedding_mode, embedding_runtime) = if args.embedding {
        (
            EmbeddingMode::Explicit,
            Some(load_embedding_runtime_for_mv2(config, None, mv2_dimension)?),
        )
    } else {
        (EmbeddingMode::None, None)
    };
    let embedding_enabled = embedding_runtime.is_some();
    let runtime_ref = embedding_runtime.as_ref();

    #[cfg(feature = "clip")]
    let clip_enabled = {
        if args.no_clip {
            false
        } else if args.clip {
            true
        } else {
            let model_available = is_clip_model_installed();
            if model_available {
                match &input_set {
                    InputSet::Files(paths) => paths.iter().any(|p| {
                        is_image_file(p)
                            || p.extension()
                                .map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
                    }),
                    InputSet::Stdin => false,
                }
            } else {
                false
            }
        }
    };
    #[cfg(feature = "clip")]
    let clip_model = if clip_enabled {
        mem.enable_clip()?;
        if !args.json {
            let mode = if args.clip {
                "explicit"
            } else {
                "auto-detected"
            };
            eprintln!("ℹ️ CLIP visual embeddings enabled ({mode})");
            eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
            eprintln!(" Use 'memvid find --mode clip' to search with natural language");
            eprintln!(" Use --no-clip to disable auto-detection");
            eprintln!();
        }
        match memvid_core::ClipModel::default_model() {
            Ok(model) => Some(model),
            Err(e) => {
                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
                None
            }
        }
    } else {
        None
    };

    if embedding_enabled && !args.json {
        let capacity = stats.capacity_bytes;
        if args.vector_compression {
            let max_docs = capacity / 20_000;
            eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
            eprintln!(" Storage: ~20 KB per document (16x compressed)");
            eprintln!(" Accuracy: ~95% recall@10 maintained");
            eprintln!(
                " Capacity: ~{} documents max for {:.1} GB",
                max_docs,
                capacity as f64 / 1e9
            );
            eprintln!();
        } else {
            let max_docs = capacity / 270_000;
            eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
            eprintln!(" Storage: ~270 KB per document");
            eprintln!(
                " Capacity: ~{} documents max for {:.1} GB",
                max_docs,
                capacity as f64 / 1e9
            );
            eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
            eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
            eprintln!();
        }
    }

    let embed_progress = if embedding_enabled {
        let total = match &input_set {
            InputSet::Files(paths) => Some(paths.len() as u64),
            InputSet::Stdin => None,
        };
        Some(create_task_progress_bar(total, "embed", false))
    } else {
        None
    };
    let mut embedded_docs = 0usize;

    if let InputSet::Files(paths) = &input_set {
        if paths.len() > 1 {
            if args.uri.is_some() || args.title.is_some() {
                anyhow::bail!(
                    "--uri/--title apply to a single file; omit them when using a directory or glob"
                );
            }
        }
    }

    let mut reports = Vec::new();
    let mut processed = 0usize;
    let mut skipped = 0usize;
    let mut bytes_added = 0u64;
    let mut summary_warnings: Vec<String> = Vec::new();
    #[cfg(feature = "clip")]
    let mut clip_embeddings_added = false;

    let mut capacity_reached = false;
    match input_set {
        InputSet::Stdin => {
            processed += 1;
            match read_payload(None) {
                Ok(payload) => {
                    let mut analyze_spinner = if args.json {
                        None
                    } else {
                        Some(create_spinner("Analyzing stdin payload..."))
                    };
                    match ingest_payload(
                        &mut mem,
                        &args,
                        payload,
                        None,
                        capacity_guard.as_mut(),
                        embedding_enabled,
                        runtime_ref,
                        use_parallel,
                    ) {
                        Ok(outcome) => {
                            if let Some(pb) = analyze_spinner.take() {
                                pb.finish_and_clear();
                            }
                            bytes_added += outcome.report.stored_bytes as u64;
                            if args.json {
                                summary_warnings
                                    .extend(outcome.report.extraction.warnings.iter().cloned());
                            }
                            reports.push(outcome.report);
                            let report_index = reports.len() - 1;
                            if !args.json && !use_parallel {
                                if let Some(report) = reports.get(report_index) {
                                    print_report(report);
                                }
                            }
                            if args.transcript {
                                let notice = transcript_notice_message(&mut mem)?;
                                if args.json {
                                    summary_warnings.push(notice);
                                } else {
                                    println!("{notice}");
                                }
                            }
                            if outcome.embedded {
                                if let Some(pb) = embed_progress.as_ref() {
                                    pb.inc(1);
                                }
                                embedded_docs += 1;
                            }
                            if use_parallel {
                                #[cfg(feature = "parallel_segments")]
                                if let Some(input) = outcome.parallel_input {
                                    pending_parallel_indices.push(report_index);
                                    pending_parallel_inputs.push(input);
                                }
                            } else if let Some(guard) = capacity_guard.as_mut() {
                                mem.commit()?;
                                let stats = mem.stats()?;
                                guard.update_after_commit(&stats);
                                guard.check_and_warn_capacity();
                            }
                        }
                        Err(err) => {
                            if let Some(pb) = analyze_spinner.take() {
                                pb.finish_and_clear();
                            }
                            if err.downcast_ref::<DuplicateUriError>().is_some() {
                                return Err(err);
                            }
                            warn!("skipped stdin payload: {err}");
                            skipped += 1;
                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
                                capacity_reached = true;
                            }
                        }
                    }
                }
                Err(err) => {
                    warn!("failed to read stdin: {err}");
                    skipped += 1;
                }
            }
        }
        InputSet::Files(ref paths) => {
            let mut transcript_notice_printed = false;
            for path in paths {
                processed += 1;
                match read_payload(Some(&path)) {
                    Ok(payload) => {
                        let label = path.display().to_string();
                        let mut analyze_spinner = if args.json {
                            None
                        } else {
                            Some(create_spinner(&format!("Analyzing {label}...")))
                        };
                        match ingest_payload(
                            &mut mem,
                            &args,
                            payload,
                            Some(&path),
                            capacity_guard.as_mut(),
                            embedding_enabled,
                            runtime_ref,
                            use_parallel,
                        ) {
                            Ok(outcome) => {
                                if let Some(pb) = analyze_spinner.take() {
                                    pb.finish_and_clear();
                                }
                                bytes_added += outcome.report.stored_bytes as u64;

                                #[cfg(feature = "clip")]
                                if let Some(ref model) = clip_model {
                                    let mime = outcome.report.mime.as_deref();
                                    let clip_frame_id = outcome.report.frame_id;

                                    if is_image_file(&path) {
                                        match model.encode_image_file(&path) {
                                            Ok(embedding) => {
                                                if let Err(e) = mem.add_clip_embedding_with_page(
                                                    clip_frame_id,
                                                    None,
                                                    embedding,
                                                ) {
                                                    warn!(
                                                        "Failed to add CLIP embedding for {}: {e}",
                                                        path.display()
                                                    );
                                                } else {
                                                    info!(
                                                        "Added CLIP embedding for frame {}",
                                                        clip_frame_id
                                                    );
                                                    clip_embeddings_added = true;
                                                }
                                            }
                                            Err(e) => {
                                                warn!(
                                                    "Failed to encode CLIP embedding for {}: {e}",
                                                    path.display()
                                                );
                                            }
                                        }
                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
                                        if let Err(e) = mem.commit() {
                                            warn!("Failed to commit before CLIP extraction: {e}");
                                        }

                                        match render_pdf_pages_for_clip(
                                            &path,
                                            CLIP_PDF_MAX_IMAGES,
                                            CLIP_PDF_TARGET_PX,
                                        ) {
                                            Ok(rendered_pages) => {
                                                use rayon::prelude::*;

                                                let path_for_closure = path.clone();

                                                let prev_hook = std::panic::take_hook();
                                                std::panic::set_hook(Box::new(|_| {}));

                                                let encoded_images: Vec<_> = rendered_pages
                                                    .into_par_iter()
                                                    .filter_map(|(page_num, image)| {
                                                        let image_info = get_image_info(&image);
                                                        if !image_info.should_embed() {
                                                            info!(
                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
                                                                path_for_closure.display(),
                                                                page_num,
                                                                image_info.color_variance,
                                                                image_info.width,
                                                                image_info.height
                                                            );
                                                            return None;
                                                        }

                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                                                            let rgb_image = image.to_rgb8();
                                                            let mut png_bytes = Vec::new();
                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
                                                                &mut Cursor::new(&mut png_bytes),
                                                                image::ImageFormat::Png,
                                                            ).map(|()| png_bytes)
                                                        }));

                                                        match encode_result {
                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
                                                            Ok(Err(e)) => {
                                                                info!(
                                                                    "Skipping image for {} page {}: {e}",
                                                                    path_for_closure.display(),
                                                                    page_num
                                                                );
                                                                None
                                                            }
                                                            Err(_) => {
                                                                info!(
                                                                    "Skipping malformed image for {} page {}",
                                                                    path_for_closure.display(),
                                                                    page_num
                                                                );
                                                                None
                                                            }
                                                        }
                                                    })
                                                    .collect();

                                                std::panic::set_hook(prev_hook);

                                                let mut child_frame_offset = 1u64;
                                                let parent_uri = outcome.report.uri.clone();
                                                let parent_title = outcome
                                                    .report
                                                    .title
                                                    .clone()
                                                    .unwrap_or_else(|| "Image".to_string());
                                                let total_images = encoded_images.len();

                                                let clip_pb = if !args.json && total_images > 0 {
                                                    let pb = ProgressBar::new(total_images as u64);
                                                    pb.set_style(
                                                        ProgressStyle::with_template(
                                                            " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
                                                        )
                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
                                                        .progress_chars("█▓░"),
                                                    );
                                                    Some(pb)
                                                } else {
                                                    None
                                                };

                                                for (page_num, image, png_bytes) in encoded_images {
                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
                                                    let child_title = format!(
                                                        "{} - Image {} (page {})",
                                                        parent_title,
                                                        child_frame_offset,
                                                        page_num
                                                    );

                                                    let child_options = PutOptions::builder()
                                                        .uri(&child_uri)
                                                        .title(&child_title)
                                                        .parent_id(clip_frame_id)
                                                        .role(FrameRole::ExtractedImage)
                                                        .auto_tag(false)
                                                        .extract_dates(false)
                                                        .build();

                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
                                                        Ok(_child_seq) => {
                                                            let child_frame_id = clip_frame_id + child_frame_offset;
                                                            child_frame_offset += 1;

                                                            match model.encode_image(&image) {
                                                                Ok(embedding) => {
                                                                    if let Err(e) = mem
                                                                        .add_clip_embedding_with_page(
                                                                            child_frame_id,
                                                                            Some(page_num),
                                                                            embedding,
                                                                        )
                                                                    {
                                                                        warn!(
                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
                                                                            path.display(),
                                                                            page_num
                                                                        );
                                                                    } else {
                                                                        info!(
                                                                            "Added CLIP image frame {} for {} page {}",
                                                                            child_frame_id,
                                                                            clip_frame_id,
                                                                            page_num
                                                                        );
                                                                        clip_embeddings_added = true;
                                                                    }
                                                                }
                                                                Err(e) => warn!(
                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
                                                                    path.display(),
                                                                    page_num
                                                                ),
                                                            }

                                                            if let Err(e) = mem.commit() {
                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
                                                            }
                                                        }
                                                        Err(e) => warn!(
                                                            "Failed to store image frame for {} page {}: {e}",
                                                            path.display(),
                                                            page_num
                                                        ),
                                                    }

                                                    if let Some(ref pb) = clip_pb {
                                                        pb.inc(1);
                                                    }
                                                }

                                                if let Some(pb) = clip_pb {
                                                    pb.finish_and_clear();
                                                }
                                            }
                                            Err(err) => warn!(
                                                "Failed to render PDF pages for CLIP {}: {err}",
                                                path.display()
                                            ),
                                        }
                                    }
                                }

                                if args.json {
                                    summary_warnings
                                        .extend(outcome.report.extraction.warnings.iter().cloned());
                                }
                                reports.push(outcome.report);
                                let report_index = reports.len() - 1;
                                if !args.json && !use_parallel {
                                    if let Some(report) = reports.get(report_index) {
                                        print_report(report);
                                    }
                                }
                                if args.transcript && !transcript_notice_printed {
                                    let notice = transcript_notice_message(&mut mem)?;
                                    if args.json {
                                        summary_warnings.push(notice);
                                    } else {
                                        println!("{notice}");
                                    }
                                    transcript_notice_printed = true;
                                }
                                if outcome.embedded {
                                    if let Some(pb) = embed_progress.as_ref() {
                                        pb.inc(1);
                                    }
                                    embedded_docs += 1;
                                }
                                if use_parallel {
                                    #[cfg(feature = "parallel_segments")]
                                    if let Some(input) = outcome.parallel_input {
                                        pending_parallel_indices.push(report_index);
                                        pending_parallel_inputs.push(input);
                                    }
                                } else if let Some(guard) = capacity_guard.as_mut() {
                                    mem.commit()?;
                                    let stats = mem.stats()?;
                                    guard.update_after_commit(&stats);
                                    guard.check_and_warn_capacity();
                                }
                            }
                            Err(err) => {
                                if let Some(pb) = analyze_spinner.take() {
                                    pb.finish_and_clear();
                                }
                                if err.downcast_ref::<DuplicateUriError>().is_some() {
                                    return Err(err);
                                }
                                warn!("skipped {}: {err}", path.display());
                                skipped += 1;
                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
                                    capacity_reached = true;
                                }
                            }
                        }
                    }
                    Err(err) => {
                        warn!("failed to read {}: {err}", path.display());
                        skipped += 1;
                    }
                }
                if capacity_reached {
                    break;
                }
            }
        }
    }

    if capacity_reached {
        warn!("capacity limit reached; remaining inputs were not processed");
    }

    if let Some(pb) = embed_progress.as_ref() {
        pb.finish_with_message("embed");
    }

    if let Some(runtime) = embedding_runtime.as_ref() {
        if embedded_docs > 0 {
            match embedding_mode {
                EmbeddingMode::Explicit => println!(
                    "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
                    embedded_docs,
                    runtime.dimension()
                ),
                EmbeddingMode::Auto => println!(
                    "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
                    embedded_docs,
                    runtime.dimension()
                ),
                EmbeddingMode::None => {}
            }
        } else {
            match embedding_mode {
                EmbeddingMode::Explicit => {
                    println!("No embeddings were generated (no textual content available)");
                }
                EmbeddingMode::Auto => {
                    println!(
                        "Semantic runtime available, but no textual content produced embeddings"
                    );
                }
                EmbeddingMode::None => {}
            }
        }
    }

    if reports.is_empty() {
        if args.json {
            if capacity_reached {
                summary_warnings.push("capacity_limit_reached".to_string());
            }
            let summary_json = json!({
                "processed": processed,
                "ingested": 0,
                "skipped": skipped,
                "bytes_added": bytes_added,
                "bytes_added_human": format_bytes(bytes_added),
                "embedded_documents": embedded_docs,
                "capacity_reached": capacity_reached,
                "warnings": summary_warnings,
                "reports": Vec::<serde_json::Value>::new(),
            });
            println!("{}", serde_json::to_string_pretty(&summary_json)?);
        } else {
            println!(
                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
                processed, skipped
            );
        }
        return Ok(());
    }

    #[cfg(feature = "parallel_segments")]
    let mut parallel_flush_spinner = if use_parallel && !args.json {
        Some(create_spinner("Flushing parallel segments..."))
    } else {
        None
    };

    if use_parallel {
        #[cfg(feature = "parallel_segments")]
        {
            let mut opts = parallel_opts.take().unwrap_or_else(|| {
                let mut opts = BuildOpts::default();
                opts.sanitize();
                opts
            });
            opts.vec_compression = mem.vector_compression().clone();
            let seqs = if pending_parallel_inputs.is_empty() {
                mem.commit_parallel(opts)?;
                Vec::new()
            } else {
                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
            };
            if let Some(pb) = parallel_flush_spinner.take() {
                pb.finish_with_message("Flushed parallel segments");
            }
            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
                if let Some(report) = reports.get_mut(idx) {
                    report.seq = seq;
                }
            }
        }
        #[cfg(not(feature = "parallel_segments"))]
        {
            mem.commit()?;
        }
    } else {
        mem.commit()?;
    }

    #[cfg(feature = "clip")]
    if clip_embeddings_added {
        mem.commit()?;
    }

    if !args.json && use_parallel {
        for report in &reports {
            print_report(report);
        }
    }

    let mut tables_extracted = 0usize;
    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
    if args.tables && !args.no_tables {
        if let InputSet::Files(ref paths) = input_set {
            let pdf_paths: Vec<_> = paths
                .iter()
                .filter(|p| {
                    p.extension()
                        .and_then(|e| e.to_str())
                        .map(|e| e.eq_ignore_ascii_case("pdf"))
                        .unwrap_or(false)
                })
                .collect();

            if !pdf_paths.is_empty() {
                let table_spinner = if args.json {
                    None
                } else {
                    Some(create_spinner(&format!(
                        "Extracting tables from {} PDF(s)...",
                        pdf_paths.len()
                    )))
                };

                let table_options = TableExtractionOptions::builder()
                    .mode(memvid_core::table::ExtractionMode::Aggressive)
                    .min_rows(2)
                    .min_cols(2)
                    .min_quality(memvid_core::table::TableQuality::Low)
                    .merge_multi_page(true)
                    .build();

                for pdf_path in pdf_paths {
                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
                        let filename = pdf_path
                            .file_name()
                            .and_then(|s| s.to_str())
                            .unwrap_or("unknown.pdf");

                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
                            for table in &result.tables {
                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
                                {
                                    tables_extracted += 1;
                                    table_summaries.push(json!({
                                        "table_id": table.table_id,
                                        "source_file": table.source_file,
                                        "meta_frame_id": meta_id,
                                        "rows": table.n_rows,
                                        "cols": table.n_cols,
                                        "quality": format!("{:?}", table.quality),
                                        "pages": format!("{}-{}", table.page_start, table.page_end),
                                    }));
                                }
                            }
                        }
                    }
                }

                if let Some(pb) = table_spinner {
                    if tables_extracted > 0 {
                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
                    } else {
                        pb.finish_with_message("No tables found");
                    }
                }

                if tables_extracted > 0 {
                    mem.commit()?;
                }
            }
        }
    }

    #[cfg(feature = "logic_mesh")]
    let mut logic_mesh_entities = 0usize;
    #[cfg(feature = "logic_mesh")]
    {
        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};

        let models_dir = config.models_dir.clone();
        let model_path = ner_model_path(&models_dir);
        let tokenizer_path = ner_tokenizer_path(&models_dir);

        let ner_available = model_path.exists() && tokenizer_path.exists();
        let should_run_ner = args.logic_mesh;

        if should_run_ner && !ner_available {
            if args.json {
                summary_warnings.push(
                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
                );
            } else {
                eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
                eprintln!(" Run: memvid models install --ner distilbert-ner");
            }
        } else if should_run_ner {
            let ner_spinner = if args.json {
                None
            } else {
                Some(create_spinner("Building Logic-Mesh entity graph..."))
            };

            match NerModel::load(&model_path, &tokenizer_path, None) {
                Ok(mut ner_model) => {
                    let mut mesh = LogicMesh::new();
                    let mut filtered_count = 0usize;

                    for report in &reports {
                        let frame_id = report.frame_id;
                        if let Some(ref content) = report.search_text {
                            if !content.is_empty() {
                                match ner_model.extract(content) {
                                    Ok(raw_entities) => {
                                        let merged_entities =
                                            merge_adjacent_entities(raw_entities, content);

                                        for entity in &merged_entities {
                                            if !is_valid_entity(&entity.text, entity.confidence) {
                                                tracing::debug!(
                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
                                                    entity.text,
                                                    entity.confidence * 100.0
                                                );
                                                filtered_count += 1;
                                                continue;
                                            }

                                            let kind = entity.to_entity_kind();
                                            let node = MeshNode::new(
                                                entity.text.to_lowercase(),
                                                entity.text.trim().to_string(),
                                                kind,
                                                entity.confidence,
                                                frame_id,
                                                entity.byte_start as u32,
                                                (entity.byte_end - entity.byte_start)
                                                    .min(u16::MAX as usize)
                                                    as u16,
                                            );
                                            mesh.merge_node(node);
                                            logic_mesh_entities += 1;
                                        }
                                    }
                                    Err(e) => {
                                        warn!("NER extraction failed for frame {frame_id}: {e}");
                                    }
                                }
                            }
                        }
                    }

                    if logic_mesh_entities > 0 {
                        mesh.finalize();
                        let node_count = mesh.stats().node_count;
                        mem.set_logic_mesh(mesh);
                        mem.commit()?;
                        info!(
                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
                            logic_mesh_entities,
                            node_count,
                            filtered_count
                        );
                    }

                    if let Some(pb) = ner_spinner {
                        pb.finish_with_message(format!(
                            "Logic-Mesh: {} entities ({} unique)",
                            logic_mesh_entities,
                            mem.mesh_node_count()
                        ));
                    }
                }
                Err(e) => {
                    if let Some(pb) = ner_spinner {
                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
                    }
                    if args.json {
                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
                    }
                }
            }
        }
    }

    let stats = mem.stats()?;
    if args.json {
        if capacity_reached {
            summary_warnings.push("capacity_limit_reached".to_string());
        }
        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
        let total_duration_ms: Option<u64> = {
            let sum: u64 = reports
                .iter()
                .filter_map(|r| r.extraction.duration_ms)
                .sum();
            if sum > 0 {
                Some(sum)
            } else {
                None
            }
        };
        let total_pages: Option<u32> = {
            let sum: u32 = reports
                .iter()
                .filter_map(|r| r.extraction.pages_processed)
                .sum();
            if sum > 0 {
                Some(sum)
            } else {
                None
            }
        };
        let embedding_mode_str = match embedding_mode {
            EmbeddingMode::None => "none",
            EmbeddingMode::Auto => "auto",
            EmbeddingMode::Explicit => "explicit",
        };
        let summary_json = json!({
            "processed": processed,
            "ingested": reports.len(),
            "skipped": skipped,
            "bytes_added": bytes_added,
            "bytes_added_human": format_bytes(bytes_added),
            "embedded_documents": embedded_docs,
            "embedding": {
                "enabled": embedding_enabled,
                "mode": embedding_mode_str,
                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
            },
            "tables": {
                "enabled": args.tables && !args.no_tables,
                "extracted": tables_extracted,
                "tables": table_summaries,
            },
            "capacity_reached": capacity_reached,
            "warnings": summary_warnings,
            "extraction": {
                "total_duration_ms": total_duration_ms,
                "total_pages_processed": total_pages,
            },
            "memory": {
                "path": args.file.display().to_string(),
                "frame_count": stats.frame_count,
                "size_bytes": stats.size_bytes,
                "tier": format!("{:?}", stats.tier),
            },
            "reports": reports_json,
        });
        println!("{}", serde_json::to_string_pretty(&summary_json)?);
    } else {
        println!(
            "Committed {} frame(s); total frames {}",
            reports.len(),
            stats.frame_count
        );
        println!(
            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
            processed,
            reports.len(),
            skipped,
            format_bytes(bytes_added)
        );
        if tables_extracted > 0 {
            println!("Tables: {} extracted from PDF(s)", tables_extracted);
        }
    }
    Ok(())
}

pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    let mv2_dimension = mem.vec_index_dimension();
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        let runtime = load_embedding_runtime_for_mv2(config, None, mv2_dimension)?;
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
    }

    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}

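/// Derives a stable `mv2://` URI for a frame: an explicit value is sanitized
/// and used, a source path is sanitized into the URI, and with neither a
/// random UUID-based URI is generated. A sketch of the three cases
/// (illustrative only; the exact output depends on `sanitize_uri`):
///
/// ```ignore
/// derive_uri(Some("notes/today"), None);        // "mv2://" + sanitized input
/// derive_uri(None, Some(Path::new("a/b.md")));  // "mv2://" + sanitized path
/// derive_uri(None, None);                       // "mv2://frames/<uuid-v4>"
/// ```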
fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
    if let Some(uri) = provided {
        let sanitized = sanitize_uri(uri, true);
        return format!("mv2://{}", sanitized);
    }

    if let Some(path) = source {
        let raw = path.to_string_lossy();
        let sanitized = sanitize_uri(&raw, false);
        return format!("mv2://{}", sanitized);
    }

    format!("mv2://frames/{}", Uuid::new_v4())
}

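/// Builds a content-addressed URI for video payloads from the first 32 hex
/// chars of the BLAKE3 digest plus a best-effort extension (path extension
/// first, then the MIME mapping, then "bin"), so identical bytes map to the
/// same URI regardless of filename. Sketch (illustrative only):
///
/// ```ignore
/// let uri = derive_video_uri(bytes, Some(Path::new("clip.MP4")), "video/mp4");
/// assert!(uri.starts_with("mv2://video/") && uri.ends_with(".mp4"));
/// ```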
pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
    let digest = hash(payload).to_hex();
    let short = &digest[..32];
    let ext_from_path = source
        .and_then(|path| path.extension())
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
        .filter(|ext| !ext.is_empty());
    let ext = ext_from_path
        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
        .unwrap_or_else(|| "bin".to_string());
    format!("mv2://video/{short}.{ext}")
}

fn derive_title(
    provided: Option<String>,
    source: Option<&Path>,
    payload_utf8: Option<&str>,
) -> Option<String> {
    if let Some(title) = provided {
        let trimmed = title.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    } else {
        if let Some(text) = payload_utf8 {
            if let Some(markdown_title) = extract_markdown_title(text) {
                return Some(markdown_title);
            }
        }
        source
            .and_then(|path| path.file_stem())
            .and_then(|stem| stem.to_str())
            .map(to_title_case)
    }
}

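/// Returns the text of the first Markdown heading in `text`, if any. Note
/// this matches any `#`-prefixed line, not just level-one headings. Sketch
/// (illustrative only):
///
/// ```ignore
/// assert_eq!(
///     extract_markdown_title("intro\n## Release Notes\n..."),
///     Some("Release Notes".to_string())
/// );
/// ```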
fn extract_markdown_title(text: &str) -> Option<String> {
    for line in text.lines() {
        let trimmed = line.trim();
        if trimmed.starts_with('#') {
            let title = trimmed.trim_start_matches('#').trim();
            if !title.is_empty() {
                return Some(title.to_string());
            }
        }
    }
    None
}

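/// Uppercases only the first character of `input`, leaving the rest intact;
/// enough to turn a file stem like "quarterly_report" into
/// "Quarterly_report". Sketch (illustrative only):
///
/// ```ignore
/// assert_eq!(to_title_case("report"), "Report");
/// assert_eq!(to_title_case(""), "");
/// ```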
fn to_title_case(input: &str) -> String {
    // The let-else below already returns an empty String for empty input, so
    // no separate is_empty check is needed.
    let mut chars = input.chars();
    let Some(first) = chars.next() else {
        return String::new();
    };
    let prefix = first.to_uppercase().collect::<String>();
    prefix + chars.as_str()
}

fn should_skip_input(path: &Path) -> bool {
    matches!(
        path.file_name().and_then(|name| name.to_str()),
        Some(".DS_Store")
    )
}

fn should_skip_dir(path: &Path) -> bool {
    matches!(
        path.file_name().and_then(|name| name.to_str()),
        Some(name) if name.starts_with('.')
    )
}

enum InputSet {
    Stdin,
    Files(Vec<PathBuf>),
}

#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    let Some(ext) = path.extension().and_then(|e| e.to_str()) else {
        return false;
    };
    matches!(
        ext.to_ascii_lowercase().as_str(),
        "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
    )
}

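/// Resolves the `--input` argument into concrete files: `None` means stdin,
/// a string containing `*`, `?`, or `[` is expanded as a glob, a directory
/// is walked recursively (skipping dot-directories and `.DS_Store`), and
/// anything else is treated as a single file. Sketch of the dispatch
/// (illustrative only):
///
/// ```ignore
/// resolve_inputs(None)?;               // InputSet::Stdin
/// resolve_inputs(Some("docs/*.pdf"))?; // glob expansion
/// resolve_inputs(Some("docs"))?;       // recursive directory walk
/// resolve_inputs(Some("notes.md"))?;   // single file
/// ```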
fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
    let Some(raw) = input else {
        return Ok(InputSet::Stdin);
    };

    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
        let mut files = Vec::new();
        let mut matched_any = false;
        for entry in glob(raw)? {
            let path = entry?;
            if path.is_file() {
                matched_any = true;
                if should_skip_input(&path) {
                    continue;
                }
                files.push(path);
            }
        }
        files.sort();
        if files.is_empty() {
            if matched_any {
                anyhow::bail!(
                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
                );
            }
            anyhow::bail!("pattern '{raw}' did not match any files");
        }
        return Ok(InputSet::Files(files));
    }

    let path = PathBuf::from(raw);
    if path.is_dir() {
        let (mut files, skipped_any) = collect_directory_files(&path)?;
        files.sort();
        if files.is_empty() {
            if skipped_any {
                anyhow::bail!(
                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
                    path.display()
                );
            }
            anyhow::bail!(
                "directory '{}' contains no ingestible files",
                path.display()
            );
        }
        Ok(InputSet::Files(files))
    } else {
        Ok(InputSet::Files(vec![path]))
    }
}

fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
    let mut files = Vec::new();
    let mut skipped_any = false;
    let mut stack = vec![root.to_path_buf()];
    while let Some(dir) = stack.pop() {
        for entry in fs::read_dir(&dir)? {
            let entry = entry?;
            let path = entry.path();
            let file_type = entry.file_type()?;
            if file_type.is_dir() {
                if should_skip_dir(&path) {
                    skipped_any = true;
                    continue;
                }
                stack.push(path);
            } else if file_type.is_file() {
                if should_skip_input(&path) {
                    skipped_any = true;
                    continue;
                }
                files.push(path);
            }
        }
    }
    Ok((files, skipped_any))
}

struct IngestOutcome {
    report: PutReport,
    embedded: bool,
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}

struct PutReport {
    seq: u64,
    #[allow(dead_code)]
    frame_id: u64,
    uri: String,
    title: Option<String>,
    original_bytes: usize,
    stored_bytes: usize,
    compressed: bool,
    source: Option<PathBuf>,
    mime: Option<String>,
    metadata: Option<DocMetadata>,
    extraction: ExtractionSummary,
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}

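/// Tracks file growth against the tier's capacity limit while a batch `put`
/// is in flight. Each candidate payload is charged its own size plus an
/// index-overhead estimate of `max(128 KiB, 5% of the payload)`; a commit
/// refreshes the measured size. A sketch of the overhead arithmetic
/// (illustrative only):
///
/// ```ignore
/// // ~10 MB payload -> 5% dominates the 128 KiB floor
/// assert_eq!(CapacityGuard::estimate_overhead(10_240_000), 512_000);
/// // tiny payload -> the 128 KiB floor applies
/// assert_eq!(CapacityGuard::estimate_overhead(1_000), 128 * 1024);
/// ```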
struct CapacityGuard {
    #[allow(dead_code)]
    tier: Tier,
    capacity: u64,
    current_size: u64,
}

impl CapacityGuard {
    const MIN_OVERHEAD: u64 = 128 * 1024;
    const RELATIVE_OVERHEAD: f64 = 0.05;

    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    fn check_and_warn_capacity(&self) {
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️ CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Actions:");
            eprintln!(" 1. Recreate with larger --size");
            eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️ WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️ INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}

#[derive(Debug, Clone)]
pub struct FileAnalysis {
    pub mime: String,
    pub metadata: Option<DocMetadata>,
    pub search_text: Option<String>,
    pub extraction: ExtractionSummary,
}

#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    force: bool,
    segment_secs: u32,
}

impl AudioAnalyzeOptions {
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    fn normalised_segment_secs(self) -> u32 {
        self.segment_secs.max(1)
    }
}

impl Default for AudioAnalyzeOptions {
    fn default() -> Self {
        Self {
            force: false,
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
        }
    }
}

struct AudioAnalysis {
    metadata: DocAudioMetadata,
    caption: Option<String>,
    search_terms: Vec<String>,
}

struct ImageAnalysis {
    width: u32,
    height: u32,
    palette: Vec<String>,
    caption: Option<String>,
    exif: Option<DocExifMetadata>,
}

#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    reader: Option<String>,
    status: ExtractionStatus,
    warnings: Vec<String>,
    duration_ms: Option<u64>,
    pages_processed: Option<u32>,
}

impl ExtractionSummary {
    fn record_warning<S: Into<String>>(&mut self, warning: S) {
        self.warnings.push(warning.into());
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    Skipped,
    Ok,
    FallbackUsed,
    Empty,
    Failed,
}

impl ExtractionStatus {
    fn label(self) -> &'static str {
        match self {
            Self::Skipped => "skipped",
            Self::Ok => "ok",
            Self::FallbackUsed => "fallback_used",
            Self::Empty => "empty",
            Self::Failed => "failed",
        }
    }
}

2208fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2209 let filename = source_path
2210 .and_then(|path| path.file_name())
2211 .and_then(|name| name.to_str())
2212 .map(|value| value.to_string());
2213 MediaManifest {
2214 kind: "video".to_string(),
2215 mime: mime.to_string(),
2216 bytes,
2217 filename,
2218 duration_ms: None,
2219 width: None,
2220 height: None,
2221 codec: None,
2222 }
2223}
2224
2225pub fn analyze_file(
2226 source_path: Option<&Path>,
2227 payload: &[u8],
2228 payload_utf8: Option<&str>,
2229 inferred_title: Option<&str>,
2230 audio_opts: AudioAnalyzeOptions,
2231 force_video: bool,
2232) -> FileAnalysis {
2233 let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2234 let is_video = force_video || mime.starts_with("video/");
2235 let treat_as_text = if is_video { false } else { treat_as_text_base };
2236
2237 let mut metadata = DocMetadata::default();
2238 metadata.mime = Some(mime.clone());
2239 metadata.bytes = Some(payload.len() as u64);
2240 metadata.hash = Some(hash(payload).to_hex().to_string());
2241
2242 let mut search_text = None;
2243
2244 let mut extraction = ExtractionSummary {
2245 reader: None,
2246 status: ExtractionStatus::Skipped,
2247 warnings: Vec::new(),
2248 duration_ms: None,
2249 pages_processed: None,
2250 };
2251
2252 let reader_registry = default_reader_registry();
2253 let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2254 if slice.is_empty() {
2255 None
2256 } else {
2257 Some(slice)
2258 }
2259 });
2260
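    // Shared by every extraction path below: normalize the candidate text,
    // append an ellipsis when truncated, seed the caption from the first
    // non-empty line if none is set yet, and record which reader produced it.
    // Returns false when normalization leaves nothing searchable.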
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf" {
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        if !applied {
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            if metadata
                .audio
                .as_ref()
                .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }
            if !search_terms.is_empty() {
                match &mut search_text {
                    Some(existing) => {
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}

fn default_reader_registry() -> &'static ReaderRegistry {
    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
    REGISTRY.get_or_init(ReaderRegistry::default)
}

fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
    if detect_pdf_magic(magic) {
        return Some(DocumentFormat::Pdf);
    }

    let mime = mime?.trim().to_ascii_lowercase();
    match mime.as_str() {
        "application/pdf" => Some(DocumentFormat::Pdf),
        "text/plain" => Some(DocumentFormat::PlainText),
        "text/markdown" => Some(DocumentFormat::Markdown),
        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
            Some(DocumentFormat::Docx)
        }
        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
            Some(DocumentFormat::Pptx)
        }
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
            Some(DocumentFormat::Xlsx)
        }
        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
        _ => None,
    }
}

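/// True when the sniffed bytes start with `%PDF` after skipping an optional
/// UTF-8 BOM and leading ASCII whitespace, so e.g. `b"\xEF\xBB\xBF\n%PDF-1.7"`
/// still counts as a PDF.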
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let mut slice = match magic {
        Some(slice) if !slice.is_empty() => slice,
        _ => return false,
    };
    if slice.starts_with(&[0xEF, 0xBB, 0xBF]) {
        slice = &slice[3..];
    }
    while let Some((first, rest)) = slice.split_first() {
        if first.is_ascii_whitespace() {
            slice = rest;
        } else {
            break;
        }
    }
    slice.starts_with(b"%PDF")
}

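/// Two-stage PDF text extraction: try `pdf_extract` first and shell out to
/// `pdftotext` whenever the primary pass errors or yields only whitespace.
/// When the fallback also fails, the primary result (empty text or the
/// original error) is returned so callers see the primary failure mode.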
fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
    match pdf_extract::extract_text_from_mem(payload) {
        Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
            Ok(fallback) => Ok(fallback),
            Err(err) => {
                warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
                Ok(text)
            }
        },
        Err(err) => {
            warn!("pdf_extract failed: {err}");
            match fallback_pdftotext(payload) {
                Ok(fallback) => Ok(fallback),
                Err(fallback_err) => {
                    warn!("pdftotext fallback failed: {fallback_err}");
                    Err(err)
                }
            }
        }
        other => other,
    }
}

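/// Writes the payload to a temp file and runs `pdftotext -layout <tmp> -`,
/// capturing stdout. Fails fast when the binary is missing from PATH, exits
/// non-zero, or emits non-UTF-8 output.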
fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
    use std::io::Write;
    use std::process::{Command, Stdio};

    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;

    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
    temp_pdf
        .write_all(payload)
        .context("failed to write pdf payload to temp file")?;
    temp_pdf.flush().context("failed to flush temp pdf file")?;

    let child = Command::new(pdftotext)
        .arg("-layout")
        .arg(temp_pdf.path())
        .arg("-")
        .stdout(Stdio::piped())
        .spawn()
        .context("failed to spawn pdftotext")?;

    let output = child
        .wait_with_output()
        .context("failed to read pdftotext output")?;

    if !output.status.success() {
        return Err(anyhow!("pdftotext exited with status {}", output.status));
    }

    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
    Ok(text)
}

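/// MIME detection, strongest signal first: content sniffing via `infer`, then
/// `tree_magic_mini`, then the file extension, and finally a plain-text guess
/// when the payload already decoded as UTF-8. Returns the MIME type plus
/// whether the payload should go through the text extraction path.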
fn detect_mime(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
) -> (String, bool) {
    if let Some(kind) = infer::get(payload) {
        let mime = kind.mime_type().to_string();
        let treat_as_text = mime.starts_with("text/")
            || matches!(
                mime.as_str(),
                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
            );
        return (mime, treat_as_text);
    }

    let magic = magic_from_u8(payload);
    if !magic.is_empty() && magic != "application/octet-stream" {
        let treat_as_text = magic.starts_with("text/")
            || matches!(
                magic,
                "application/json"
                    | "application/xml"
                    | "application/javascript"
                    | "image/svg+xml"
                    | "text/plain"
            );
        return (magic.to_string(), treat_as_text);
    }

    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
                return (mime.to_string(), treat_as_text);
            }
        }
    }

    if payload_utf8.is_some() {
        return ("text/plain".to_string(), true);
    }

    ("application/octet-stream".to_string(), false)
}

fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let ext_lower = ext.to_ascii_lowercase();
    match ext_lower.as_str() {
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => Some(("text/plain", true)),
        "md" | "markdown" => Some(("text/markdown", true)),
        "rst" => Some(("text/x-rst", true)),
        "json" => Some(("application/json", true)),
        "csv" => Some(("text/csv", true)),
        "tsv" => Some(("text/tab-separated-values", true)),
        "yaml" | "yml" => Some(("application/yaml", true)),
        "toml" => Some(("application/toml", true)),
        "html" | "htm" => Some(("text/html", true)),
        "xml" => Some(("application/xml", true)),
        "svg" => Some(("image/svg+xml", true)),
        "gif" => Some(("image/gif", false)),
        "jpg" | "jpeg" => Some(("image/jpeg", false)),
        "png" => Some(("image/png", false)),
        "bmp" => Some(("image/bmp", false)),
        "ico" => Some(("image/x-icon", false)),
        "tif" | "tiff" => Some(("image/tiff", false)),
        "webp" => Some(("image/webp", false)),
        _ => None,
    }
}

fn caption_from_text(text: &str) -> Option<String> {
    for line in text.lines() {
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            return Some(truncate_to_boundary(trimmed, 240));
        }
    }
    None
}

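/// Truncates to at most `max_len` bytes without splitting a UTF-8 code point,
/// then appends an ellipsis. For example, `truncate_to_boundary("héllo", 2)`
/// yields `"h…"` because byte index 2 falls inside the two-byte `é`.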
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    let mut end = max_len;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    if end == 0 {
        return String::new();
    }
    let mut truncated = text[..end].trim_end().to_string();
    truncated.push('…');
    truncated
}

fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
    let reader = ImageReader::new(Cursor::new(payload))
        .with_guessed_format()
        .map_err(|err| warn!("failed to guess image format: {err}"))
        .ok()?;
    let image = reader
        .decode()
        .map_err(|err| warn!("failed to decode image: {err}"))
        .ok()?;
    let width = image.width();
    let height = image.height();
    let rgb = image.to_rgb8();
    let palette = match get_palette(
        rgb.as_raw(),
        ColorFormat::Rgb,
        COLOR_PALETTE_SIZE,
        COLOR_PALETTE_QUALITY,
    ) {
        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
        Err(err) => {
            warn!("failed to compute colour palette: {err}");
            Vec::new()
        }
    };

    let (exif, exif_caption) = extract_exif_metadata(payload);

    Some(ImageAnalysis {
        width,
        height,
        palette,
        caption: exif_caption,
        exif,
    })
}

fn color_to_hex(color: Color) -> String {
    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
}

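/// Probes the container with symphonia and decodes the default track to the
/// end, counting PCM frames, so duration reflects the decoded stream rather
/// than header claims: `duration = decoded_frames / sample_rate`, and the
/// average bitrate is `bytes * 8 / duration / 1000` kbps. For example, a
/// 1_200_000-byte file decoding to 4_410_000 frames at 44_100 Hz lasts 100 s,
/// i.e. roughly 96 kbps.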
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
    })
}

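/// Reads container tags via lofty and flattens them into lowercase string
/// keys ("title", "artist", "album", ...). The primary tag is collected
/// first, and `or_insert_with` keeps its values when secondary tags repeat a
/// key.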
fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};

    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
            out.entry("title".into())
                .or_insert_with(|| value.to_string());
            out.entry("tracktitle".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
            out.entry("artist".into())
                .or_insert_with(|| value.to_string());
        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
            out.entry("artist".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
            out.entry("album".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::Genre) {
            out.entry("genre".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
            out.entry("track_number".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
            out.entry("disc_number".into())
                .or_insert_with(|| value.to_string());
        }
    }

    let mut tags = HashMap::new();
    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
        Ok(probe) => probe,
        Err(err) => {
            warn!("failed to guess audio tag file type: {err}");
            return tags;
        }
    };

    let tagged_file = match probe.read() {
        Ok(file) => file,
        Err(err) => {
            warn!("failed to read audio tags: {err}");
            return tags;
        }
    };

    if let Some(primary) = tagged_file.primary_tag() {
        collect_tag(primary, &mut tags);
    }
    for tag in tagged_file.tags() {
        collect_tag(tag, &mut tags);
    }

    tags
}

fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
    let mut cursor = Cursor::new(payload);
    let exif = match ExifReader::new().read_from_container(&mut cursor) {
        Ok(exif) => exif,
        Err(err) => {
            warn!("failed to read EXIF metadata: {err}");
            return (None, None);
        }
    };

    let mut doc = DocExifMetadata::default();
    let mut has_data = false;

    if let Some(make) = exif_string(&exif, Tag::Make) {
        doc.make = Some(make);
        has_data = true;
    }
    if let Some(model) = exif_string(&exif, Tag::Model) {
        doc.model = Some(model);
        has_data = true;
    }
    if let Some(lens) =
        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
    {
        doc.lens = Some(lens);
        has_data = true;
    }
    if let Some(dt) =
        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
    {
        doc.datetime = Some(dt);
        has_data = true;
    }

    if let Some(gps) = extract_gps_metadata(&exif) {
        doc.gps = Some(gps);
        has_data = true;
    }

    let caption = exif_string(&exif, Tag::ImageDescription);

    let metadata = if has_data { Some(doc) } else { None };
    (metadata, caption)
}

fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
    exif.fields()
        .find(|field| field.tag == tag)
        .and_then(|field| field_to_string(field, exif))
}

fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
    let value = field.display_value().with_unit(exif).to_string();
    let trimmed = value.trim_matches('\0').trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}

fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
    let latitude_ref_field = exif
        .fields()
        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
    let longitude_ref_field = exif
        .fields()
        .find(|field| field.tag == Tag::GPSLongitudeRef)?;

    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;

    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
        if reference.eq_ignore_ascii_case("S") {
            latitude = -latitude;
        }
    }
    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
        if reference.eq_ignore_ascii_case("W") {
            longitude = -longitude;
        }
    }

    Some(DocGpsMetadata {
        latitude,
        longitude,
    })
}

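/// Converts an EXIF degrees/minutes/seconds triple into decimal degrees via
/// `deg + min / 60 + sec / 3600`, with missing minutes or seconds defaulting
/// to zero. For example, 37° 46' 30" becomes 37.775.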
fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
    match value {
        ExifValue::Rational(values) if !values.is_empty() => {
            let deg = rational_to_f64_u(values.get(0)?)?;
            let min = values
                .get(1)
                .and_then(|v| rational_to_f64_u(v))
                .unwrap_or(0.0);
            let sec = values
                .get(2)
                .and_then(|v| rational_to_f64_u(v))
                .unwrap_or(0.0);
            Some(deg + (min / 60.0) + (sec / 3600.0))
        }
        ExifValue::SRational(values) if !values.is_empty() => {
            let deg = rational_to_f64_i(values.get(0)?)?;
            let min = values
                .get(1)
                .and_then(|v| rational_to_f64_i(v))
                .unwrap_or(0.0);
            let sec = values
                .get(2)
                .and_then(|v| rational_to_f64_i(v))
                .unwrap_or(0.0);
            Some(deg + (min / 60.0) + (sec / 3600.0))
        }
        _ => None,
    }
}

fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
    if value.denom == 0 {
        None
    } else {
        Some(value.num as f64 / value.denom as f64)
    }
}

fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
    if value.denom == 0 {
        None
    } else {
        Some(value.num as f64 / value.denom as f64)
    }
}

fn value_to_ascii(value: &ExifValue) -> Option<String> {
    if let ExifValue::Ascii(values) = value {
        values
            .first()
            .and_then(|bytes| std::str::from_utf8(bytes).ok())
            .map(|s| s.trim_matches('\0').trim().to_string())
            .filter(|s| !s.is_empty())
    } else {
        None
    }
}

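/// Core single-document ingest: analyze the payload, derive the URI, title,
/// and `PutOptions`, then take one of three paths — update-in-place when the
/// URI already exists, build a `ParallelInput` that defers the write to the
/// parallel batch builder, or put directly (with parent and per-chunk
/// embeddings when a semantic runtime is available).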
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    let original_bytes = payload.len();
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    let audio_opts = AudioAnalyzeOptions {
        force: args.audio,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
    };

    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    if args.video {
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    let options = options_builder.build();

    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    let frame_id = if let Some(ref existing) = existing_frame {
        existing.id
    } else {
        mem.stats()?.frame_count
    };

    let mut embedded = false;
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            let payload_for_update = payload.clone();
            if allow_embedding {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!(
                            "Truncated text from {} to {} chars for embedding",
                            embed_text.len(),
                            truncated_text.len()
                        );
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            #[cfg(feature = "parallel_segments")]
            {
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                    } else {
                        info!(
                            "parallel mode: checking for chunks on {} bytes payload",
                            payload.len()
                        );
                        if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                            info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                            #[cfg(feature = "temporal_enrich")]
                            let enriched_chunks = if args.temporal_enrich {
                                info!(
                                    "Temporal enrichment enabled, processing {} chunks",
                                    chunk_texts.len()
                                );
                                let today = chrono::Local::now().date_naive();
                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                let enriched: Vec<String> =
                                    results.iter().map(|(text, _)| text.clone()).collect();
                                let resolved_count = results
                                    .iter()
                                    .filter(|(_, e)| {
                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                    })
                                    .count();
                                info!(
                                    "Temporal enrichment: resolved {} chunks with temporal context",
                                    resolved_count
                                );
                                enriched
                            } else {
                                chunk_texts.clone()
                            };
                            #[cfg(not(feature = "temporal_enrich"))]
                            let enriched_chunks = {
                                if args.temporal_enrich {
                                    warn!(
                                        "Temporal enrichment requested but feature not compiled in"
                                    );
                                }
                                chunk_texts.clone()
                            };

                            let embed_chunks = if args.contextual {
                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                let engine = if args.contextual_model.as_deref() == Some("local") {
                                    #[cfg(feature = "llama-cpp")]
                                    {
                                        let model_path = get_local_contextual_model_path()?;
                                        ContextualEngine::local(model_path)
                                    }
                                    #[cfg(not(feature = "llama-cpp"))]
                                    {
                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                    }
                                } else if let Some(model) = &args.contextual_model {
                                    ContextualEngine::openai_with_model(model)?
                                } else {
                                    ContextualEngine::openai()?
                                };

                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
                                {
                                    Ok(contexts) => {
                                        info!("Generated {} contextual prefixes", contexts.len());
                                        apply_contextual_prefixes(
                                            &embed_text,
                                            &enriched_chunks,
                                            &contexts,
                                        )
                                    }
                                    Err(e) => {
                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                        enriched_chunks.clone()
                                    }
                                }
                            } else {
                                enriched_chunks
                            };

                            let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                            for chunk_text in &embed_chunks {
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!("parallel mode: Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                                }
                                let chunk_emb = runtime.embed(&truncated_chunk)?;
                                chunk_embeddings.push(chunk_emb);
                            }
                            let num_chunks = chunk_embeddings.len();
                            chunk_embeddings_vec = Some(chunk_embeddings);
                            let parent_text = truncate_for_embedding(&embed_text);
                            if parent_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&parent_text)?);
                            embedded = true;
                            info!(
                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
                                num_chunks
                            );
                        } else {
                            info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                            let truncated_text = truncate_for_embedding(&embed_text);
                            if truncated_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&truncated_text)?);
                            embedded = true;
                        }
                    }
                }
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        frame_id: 0,
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                        #[cfg(feature = "logic_mesh")]
                        search_text: search_text.clone(),
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            #[cfg(not(feature = "parallel_segments"))]
            {
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
            let embed_text = search_text
                .clone()
                .or_else(|| payload_text.map(|text| text.to_string()))
                .unwrap_or_default();
            if embed_text.trim().is_empty() {
                warn!("no textual content available; embedding skipped");
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                    info!(
                        "Document will be chunked into {} chunks, generating embeddings for each",
                        chunk_texts.len()
                    );

                    #[cfg(feature = "temporal_enrich")]
                    let enriched_chunks = if args.temporal_enrich {
                        info!(
                            "Temporal enrichment enabled, processing {} chunks",
                            chunk_texts.len()
                        );
                        let today = chrono::Local::now().date_naive();
                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                        let enriched: Vec<String> =
                            results.iter().map(|(text, _)| text.clone()).collect();
                        let resolved_count = results
                            .iter()
                            .filter(|(_, e)| {
                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
                            })
                            .count();
                        info!(
                            "Temporal enrichment: resolved {} chunks with temporal context",
                            resolved_count
                        );
                        enriched
                    } else {
                        chunk_texts.clone()
                    };
                    #[cfg(not(feature = "temporal_enrich"))]
                    let enriched_chunks = {
                        if args.temporal_enrich {
                            warn!("Temporal enrichment requested but feature not compiled in");
                        }
                        chunk_texts.clone()
                    };

                    let embed_chunks = if args.contextual {
                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                        let engine = if args.contextual_model.as_deref() == Some("local") {
                            #[cfg(feature = "llama-cpp")]
                            {
                                let model_path = get_local_contextual_model_path()?;
                                ContextualEngine::local(model_path)
                            }
                            #[cfg(not(feature = "llama-cpp"))]
                            {
                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                            }
                        } else if let Some(model) = &args.contextual_model {
                            ContextualEngine::openai_with_model(model)?
                        } else {
                            ContextualEngine::openai()?
                        };

                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                            Ok(contexts) => {
                                info!("Generated {} contextual prefixes", contexts.len());
                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                            }
                            Err(e) => {
                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                enriched_chunks.clone()
                            }
                        }
                    } else {
                        enriched_chunks
                    };

                    let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                    for chunk_text in &embed_chunks {
                        let truncated_chunk = truncate_for_embedding(chunk_text);
                        if truncated_chunk.len() < chunk_text.len() {
                            info!(
                                "Truncated chunk from {} to {} chars for embedding",
                                chunk_text.len(),
                                truncated_chunk.len()
                            );
                        }
                        let chunk_emb = runtime.embed(&truncated_chunk)?;
                        chunk_embeddings.push(chunk_emb);
                    }
                    let parent_text = truncate_for_embedding(&embed_text);
                    if parent_text.len() < embed_text.len() {
                        info!(
                            "Truncated parent text from {} to {} chars for embedding",
                            embed_text.len(),
                            parent_text.len()
                        );
                    }
                    let parent_embedding = runtime.embed(&parent_text)?;
                    embedded = true;
                    info!(
                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
                        chunk_embeddings.len()
                    );
                    mem.put_with_chunk_embeddings(
                        &payload,
                        Some(parent_embedding),
                        chunk_embeddings,
                        options,
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                } else {
                    info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!(
                            "Truncated text from {} to {} chars for embedding",
                            embed_text.len(),
                            truncated_text.len()
                        );
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.put_with_embedding_and_options(&payload, embedding, options)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                }
            }
        } else {
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            frame_id,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
            #[cfg(feature = "logic_mesh")]
            search_text,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}

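/// Estimates on-disk size for capacity accounting: UTF-8 payloads are
/// zstd-compressed at level 3 and the compressed length reported, binary
/// payloads are counted as-is. The bool records whether compression applied.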
fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
    if std::str::from_utf8(payload).is_ok() {
        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
        Ok((compressed.len(), true))
    } else {
        Ok((payload.len(), false))
    }
}

fn print_report(report: &PutReport) {
    let name = report
        .source
        .as_ref()
        .map(|path| {
            let pretty = env::current_dir()
                .ok()
                .as_ref()
                .and_then(|cwd| diff_paths(path, cwd))
                .unwrap_or_else(|| path.to_path_buf());
            pretty.to_string_lossy().into_owned()
        })
        .unwrap_or_else(|| "stdin".to_string());

    let ratio = if report.original_bytes > 0 {
        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
    } else {
        100.0
    };

    println!("• {name} → {}", report.uri);
    println!(" seq: {}", report.seq);
    if report.compressed {
        println!(
            " size: {} B → {} B ({:.1}% of original, compressed)",
            report.original_bytes, report.stored_bytes, ratio
        );
    } else {
        println!(" size: {} B (stored as-is)", report.original_bytes);
    }
    if let Some(mime) = &report.mime {
        println!(" mime: {mime}");
    }
    if let Some(title) = &report.title {
        println!(" title: {title}");
    }
    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
    println!(
        " extraction: status={} reader={}",
        report.extraction.status.label(),
        extraction_reader
    );
    if let Some(ms) = report.extraction.duration_ms {
        println!(" extraction-duration: {} ms", ms);
    }
    if let Some(pages) = report.extraction.pages_processed {
        println!(" extraction-pages: {}", pages);
    }
    for warning in &report.extraction.warnings {
        println!(" warning: {warning}");
    }
    if let Some(metadata) = &report.metadata {
        if let Some(caption) = &metadata.caption {
            println!(" caption: {caption}");
        }
        if let Some(audio) = metadata.audio.as_ref() {
            if audio.duration_secs.is_some()
                || audio.sample_rate_hz.is_some()
                || audio.channels.is_some()
            {
                let duration = audio
                    .duration_secs
                    .map(|secs| format!("{secs:.1}s"))
                    .unwrap_or_else(|| "unknown".into());
                let rate = audio
                    .sample_rate_hz
                    .map(|hz| format!("{} Hz", hz))
                    .unwrap_or_else(|| "? Hz".into());
                let channels = audio
                    .channels
                    .map(|ch| ch.to_string())
                    .unwrap_or_else(|| "?".into());
                println!(" audio: duration {duration}, {channels} ch, {rate}");
            }
        }
    }

    println!();
}

fn put_report_to_json(report: &PutReport) -> serde_json::Value {
    let source = report
        .source
        .as_ref()
        .map(|path| path.to_string_lossy().into_owned());
    let extraction = json!({
        "status": report.extraction.status.label(),
        "reader": report.extraction.reader,
        "duration_ms": report.extraction.duration_ms,
        "pages_processed": report.extraction.pages_processed,
        "warnings": report.extraction.warnings,
    });
    json!({
        "seq": report.seq,
        "uri": report.uri,
        "title": report.title,
        "original_bytes": report.original_bytes,
        "original_bytes_human": format_bytes(report.original_bytes as u64),
        "stored_bytes": report.stored_bytes,
        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
        "compressed": report.compressed,
        "mime": report.mime,
        "source": source,
        "metadata": report.metadata,
        "extraction": extraction,
    })
}

fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
    match err {
        MemvidError::CapacityExceeded {
            current,
            limit,
            required,
        } => anyhow!(CapacityExceededMessage {
            current,
            limit,
            required,
        }),
        other => other.into(),
    }
}

fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
    for (idx, entry) in entries.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options.metadata = Some(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options
                        .extra_metadata
                        .insert(format!("custom_metadata_{idx}"), value.to_string());
                }
                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
            },
        }
    }
}

fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
    let stats = mem.stats()?;
    let message = match stats.tier {
        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
    };
    Ok(message)
}

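/// Normalizes a raw URI or path into a safe relative identifier: strips the
/// `mv2://` scheme, converts backslashes, drops empty and `.` segments, and
/// resolves `..` by popping. With `keep_path` the cleaned segments are
/// joined; otherwise only the last segment survives. For example,
/// `sanitize_uri("mv2://docs/./a/../b.txt", true)` yields `"docs/b.txt"`.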
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");
    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        if segment.is_empty() || segment == "." {
            continue;
        }
        if segment == ".." {
            segments.pop();
            continue;
        }
        segments.push(segment.to_string());
    }

    if segments.is_empty() {
        return normalized
            .split('/')
            .last()
            .filter(|s| !s.is_empty())
            .unwrap_or("document")
            .to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}

fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
    if quiet {
        let pb = ProgressBar::hidden();
        pb.set_message(message.to_string());
        return pb;
    }

    match total {
        Some(len) => {
            let pb = ProgressBar::new(len);
            pb.set_draw_target(ProgressDrawTarget::stderr());
            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
                .unwrap_or_else(|_| ProgressStyle::default_bar());
            pb.set_style(style);
            pb.set_message(message.to_string());
            pb
        }
        None => {
            let pb = ProgressBar::new_spinner();
            pb.set_draw_target(ProgressDrawTarget::stderr());
            pb.set_message(message.to_string());
            pb.enable_steady_tick(Duration::from_millis(120));
            pb
        }
    }
}

fn create_spinner(message: &str) -> ProgressBar {
    let pb = ProgressBar::new_spinner();
    pb.set_draw_target(ProgressDrawTarget::stderr());
    let style = ProgressStyle::with_template("{spinner} {msg}")
        .unwrap_or_else(|_| ProgressStyle::default_spinner())
        .tick_strings(&["-", "\\", "|", "/"]);
    pb.set_style(style);
    pb.set_message(message.to_string());
    pb.enable_steady_tick(Duration::from_millis(120));
    pb
}

#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    #[arg(long)]
    pub json: bool,
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    #[command(flatten)]
    pub lock: LockCliArgs,
}

#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    pub title: String,
    #[serde(default)]
    pub label: String,
    pub text: String,
    #[serde(default)]
    pub uri: Option<String>,
    #[serde(default)]
    pub tags: Vec<String>,
    #[serde(default)]
    pub labels: Vec<String>,
    #[serde(default)]
    pub metadata: serde_json::Value,
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}

#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    pub requests: Vec<PutManyRequest>,
}

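// A minimal sketch of the batch input `handle_put_many` accepts — a bare JSON
// array, thanks to #[serde(transparent)] on `PutManyBatch`; only `title` and
// `text` are required, and the URI below is illustrative:
//
// [
//   {
//     "title": "Quarterly notes",
//     "text": "Revenue grew 12% quarter over quarter...",
//     "uri": "mv2://notes/q3.txt",
//     "tags": ["finance"],
//     "labels": ["internal"]
//   }
// ]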

#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        // Read stdin to EOF rather than a single line, so pretty-printed
        // (multi-line) JSON batches parse too.
        io::read_to_string(io::stdin()).context("Failed to read from stdin")?
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!(" frame_id: {}", fid);
            }
        } else {
            println!(" first: {}", frame_ids.first().unwrap_or(&0));
            println!(" last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}