1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
26use memvid_core::{
27 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
28 DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions, ReaderHint,
29 ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
30};
31#[cfg(feature = "parallel_segments")]
32use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
33use pdf_extract::OutputError as PdfExtractError;
34use serde_json::json;
35use tracing::{info, warn};
36use tree_magic_mini::from_u8 as magic_from_u8;
37use uuid::Uuid;
38
// Hard caps for derived text artifacts.
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;

/// Truncate `text` to at most `MAX_EMBEDDING_TEXT_LEN` bytes without
/// splitting a UTF-8 character.
///
/// Bug fix: the previous implementation sliced `&text[..MAX_EMBEDDING_TEXT_LEN]`
/// *before* looking for a character boundary, which panics whenever the byte
/// cap falls in the middle of a multi-byte character (and the subsequent
/// boundary search was a no-op anyway). We now walk the cut point back to the
/// nearest boundary first, so the slice below can never panic.
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Move the cut point left until it lands on a char boundary (0 is always
    // a boundary, so this terminates).
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
65
#[cfg(feature = "llama-cpp")]
/// Locate the locally-installed Phi-3.5-mini GGUF model used for contextual
/// prefixing, or return an actionable error telling the user how to install it.
///
/// The models root is `MEMVID_MODELS_DIR` when set (with a leading `~/`
/// expanded via `HOME`), otherwise `$HOME/.memvid/models`.
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            // Expand "~/" using HOME; fall back to the literal path when
            // HOME is unavailable.
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(&custom),
        },
        Err(_) => {
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
103
104use pathdiff::diff_paths;
105
106use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
107use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
108use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingRuntime};
109use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
110use crate::error::{CapacityExceededMessage, DuplicateUriError};
111#[cfg(feature = "temporal_enrich")]
112use memvid_core::enrich_chunks as temporal_enrich_chunks;
113use crate::utils::{
114 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
115 select_frame,
116};
117
/// Interpret a string as a boolean flag value.
///
/// Accepts `1/true/yes/on` and `0/false/no/off` (case-insensitive, surrounding
/// whitespace ignored); anything else yields `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
        Some(true)
    } else if matches!(normalized.as_str(), "0" | "false" | "no" | "off") {
        Some(false)
    } else {
        None
    }
}
126
#[cfg(feature = "parallel_segments")]
/// Read the `MEMVID_PARALLEL_SEGMENTS` env var as an optional boolean
/// override for parallel segment ingestion; unset or unrecognized → `None`.
fn parallel_env_override() -> Option<bool> {
    let raw = std::env::var("MEMVID_PARALLEL_SEGMENTS").ok()?;
    parse_bool_flag(&raw)
}
134
135pub fn parse_key_val(s: &str) -> Result<(String, String)> {
137 let (key, value) = s
138 .split_once('=')
139 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
140 Ok((key.to_string(), value.to_string()))
141}
142
// Shared file-lock flags flattened into every mutating subcommand.
// (Plain `//` comments on purpose: `///` doc comments would become clap help
// text and change the CLI's runtime output.)
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // Milliseconds to wait for the store lock before failing (default 250).
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // Forwarded as `force_lock` to lock handling (see `apply_lock_cli`);
    // presumably proceeds despite a held lock — confirm in utils.
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
153
// Arguments for `memvid put`: ingest one or more inputs into a .mv2 file.
// (Plain `//` comments on purpose: `///` doc comments would become clap help
// text and change the CLI's runtime output.)
#[derive(Args)]
pub struct PutArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human output.
    #[arg(long)]
    pub json: bool,
    // Input path or glob; omitted → read payload from stdin (see resolve_inputs).
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    // Explicit URI for the stored frame (single-file ingest only).
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Explicit title for the stored frame (single-file ingest only).
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Override frame timestamp.
    #[arg(long)]
    pub timestamp: Option<i64>,
    // Timeline track name.
    #[arg(long)]
    pub track: Option<String>,
    // Frame kind; conflicts with --video unless it equals "video".
    #[arg(long)]
    pub kind: Option<String>,
    // Treat input as video (requires --input, forbids other --kind values).
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    // Request transcript handling; triggers a one-time notice after ingest.
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable KEY=VALUE tags.
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    // Repeatable free-form labels.
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable JSON metadata overrides.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Disable automatic tagging.
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    // Disable date extraction from content.
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    // Treat input as audio.
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    // Segment length for audio ingestion, in seconds.
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Generate vector embeddings (loads the embedding runtime).
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    // Explicitly disable embeddings.
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // Path to a precomputed embedding vector (JSON).
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // Enable PQ96 vector compression (~16x smaller per document).
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // Enable contextual prefixing of chunks.
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    // Model override for contextual prefixing.
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // Extract tables from PDF inputs after ingest.
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    // Explicitly disable table extraction.
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // Replace an existing frame with the same URI instead of erroring.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    // Permit ingesting a duplicate URI.
    #[arg(long)]
    pub allow_duplicate: bool,

    // Run temporal enrichment on extracted chunks (feature-dependent).
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // Force parallel segment ingestion on (wins over env override).
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    // Force parallel segment ingestion off.
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    // Tokens per parallel segment.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    // Pages per parallel segment.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    // Worker thread count for parallel ingestion.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    // Queue depth for parallel ingestion.
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
280
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Decide whether parallel segment ingestion runs. Precedence:
    /// `--parallel-segments` (on) > `--no-parallel-segments` (off) >
    /// `MEMVID_PARALLEL_SEGMENTS` env override > default (off).
    pub fn wants_parallel(&self) -> bool {
        match (self.parallel_segments, self.no_parallel_segments) {
            (true, _) => true,
            (false, true) => false,
            (false, false) => parallel_env_override().unwrap_or(false),
        }
    }

    /// Build `BuildOpts` from the CLI knobs, keeping library defaults for
    /// anything the user did not set, then sanitize the result.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        if let Some(tokens) = self.parallel_segment_tokens {
            opts.segment_tokens = tokens;
        }
        if let Some(pages) = self.parallel_segment_pages {
            opts.segment_pages = pages;
        }
        if let Some(threads) = self.parallel_threads {
            opts.threads = threads;
        }
        if let Some(depth) = self.parallel_queue_depth {
            opts.queue_depth = depth;
        }
        opts.sanitize();
        opts
    }
}
314
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    // Without the `parallel_segments` feature compiled in, parallel ingestion
    // is never requested; this keeps call sites feature-agnostic.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
321
// Arguments for `memvid api-fetch`: ingest content described by a fetch
// config file. (Plain `//` comments on purpose: `///` doc comments would
// become clap help text and change the CLI's runtime output.)
#[derive(Args)]
pub struct ApiFetchArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Path to the fetch configuration file.
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    // Validate/plan without writing anything.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Override the fetch mode from the config.
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Override the destination URI from the config.
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Emit machine-readable JSON instead of human output.
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
347
// Arguments for `memvid update`: revise an existing frame's payload and/or
// metadata. The target is selected by exactly one of --frame-id / --uri.
// (Plain `//` comments on purpose: `///` would become clap help text.)
#[derive(Args)]
pub struct UpdateArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select frame by id (mutually exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select frame by URI (mutually exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Replacement payload file; omitted → keep the existing payload.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Assign a new URI to the frame.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    // Replace the title.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Replace the timestamp.
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    // Replace the track.
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    // Replace the kind.
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Replacement KEY=VALUE tags (replaces the existing tag set when non-empty).
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    // Replacement labels (replaces the existing set when non-empty).
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable JSON metadata overrides.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings from the new/available text.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit machine-readable JSON instead of human output.
    #[arg(long)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
383
// Arguments for `memvid delete`: remove a frame selected by exactly one of
// --frame-id / --uri, with an interactive confirmation unless --yes.
// (Plain `//` comments on purpose: `///` would become clap help text.)
#[derive(Args)]
pub struct DeleteArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select frame by id (mutually exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select frame by URI (mutually exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit machine-readable JSON instead of human output.
    #[arg(long)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
401
402pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
404 let command = ApiFetchCommand {
405 file: args.file,
406 config_path: args.config,
407 dry_run: args.dry_run,
408 mode_override: args.mode,
409 uri_override: args.uri,
410 output_json: args.json,
411 lock_timeout_ms: args.lock.lock_timeout,
412 force_lock: args.lock.force,
413 };
414 run_api_fetch(config, command)
415}
416
417pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
419 let mut mem = Memvid::open(&args.file)?;
420 ensure_cli_mutation_allowed(&mem)?;
421 apply_lock_cli(&mut mem, &args.lock);
422 let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
423
424 if !args.yes {
425 if let Some(uri) = &frame.uri {
426 println!("About to delete frame {} ({uri})", frame.id);
427 } else {
428 println!("About to delete frame {}", frame.id);
429 }
430 print!("Confirm? [y/N]: ");
431 io::stdout().flush()?;
432 let mut input = String::new();
433 io::stdin().read_line(&mut input)?;
434 let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
435 if !confirmed {
436 println!("Aborted");
437 return Ok(());
438 }
439 }
440
441 let seq = mem.delete_frame(frame.id)?;
442 mem.commit()?;
443 let deleted = mem.frame_by_id(frame.id)?;
444
445 if args.json {
446 let json = json!({
447 "frame_id": frame.id,
448 "sequence": seq,
449 "status": frame_status_str(deleted.status),
450 });
451 println!("{}", serde_json::to_string_pretty(&json)?);
452 } else {
453 println!("Deleted frame {} (seq {})", frame.id, seq);
454 }
455
456 Ok(())
457}
/// Ingest one or more inputs (`--input` path/glob, or stdin) into the memory
/// file: analyze, store, optionally embed, optionally extract PDF tables,
/// commit, and print per-file reports plus a summary (JSON with `--json`).
///
/// A duplicate-URI error aborts the whole run; other per-input failures are
/// logged via `warn!` and counted as skipped. Hitting the capacity limit
/// stops processing of remaining inputs.
pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let mut stats = mem.stats()?;
    // Bootstrap the lexical index on a brand-new (empty) memory, then
    // refresh stats so the rest of the function sees the index.
    if stats.frame_count == 0 && !stats.has_lex_index {
        mem.enable_lex()?;
        stats = mem.stats()?;
    }

    if args.vector_compression {
        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
    }

    let mut capacity_guard = CapacityGuard::from_stats(&stats);
    let use_parallel = args.wants_parallel();
    #[cfg(feature = "parallel_segments")]
    let mut parallel_opts = if use_parallel {
        Some(args.sanitized_parallel_opts())
    } else {
        None
    };
    // Parallel mode defers writes: successful ingests are queued here (with
    // the index of their report) and flushed in one batch after the loop.
    #[cfg(feature = "parallel_segments")]
    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
    #[cfg(feature = "parallel_segments")]
    let mut pending_parallel_indices: Vec<usize> = Vec::new();
    let input_set = resolve_inputs(args.input.as_deref())?;

    // --video sanity checks: an explicit --kind must agree, and stdin input
    // is not supported for video.
    if args.video {
        if let Some(kind) = &args.kind {
            if !kind.eq_ignore_ascii_case("video") {
                anyhow::bail!("--video conflicts with explicit --kind={kind}");
            }
        }
        if matches!(input_set, InputSet::Stdin) {
            anyhow::bail!("--video requires --input <file>");
        }
    }

    // How embeddings were requested; only Explicit is produced here, but the
    // other variants are matched below for reporting.
    #[allow(dead_code)]
    enum EmbeddingMode {
        None,
        Auto,
        Explicit,
    }

    // Load the embedding runtime only when requested; its dimension must be
    // compatible with the file's existing vector index.
    let mv2_dimension = mem.vec_index_dimension();
    let (embedding_mode, embedding_runtime) = if args.embedding {
        (
            EmbeddingMode::Explicit,
            Some(load_embedding_runtime_for_mv2(config, None, mv2_dimension)?),
        )
    } else {
        (EmbeddingMode::None, None)
    };
    let embedding_enabled = embedding_runtime.is_some();
    let runtime_ref = embedding_runtime.as_ref();

    // Up-front storage-cost advisory on stderr (human output only).
    if embedding_enabled && !args.json {
        let capacity = stats.capacity_bytes;
        if args.vector_compression {
            // ~20 KB per document with PQ compression.
            let max_docs = capacity / 20_000;
            eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
            eprintln!("   Storage: ~20 KB per document (16x compressed)");
            eprintln!("   Accuracy: ~95% recall@10 maintained");
            eprintln!(
                "   Capacity: ~{} documents max for {:.1} GB",
                max_docs,
                capacity as f64 / 1e9
            );
            eprintln!();
        } else {
            // ~270 KB per document uncompressed.
            let max_docs = capacity / 270_000;
            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
            eprintln!("   Storage: ~270 KB per document");
            eprintln!(
                "   Capacity: ~{} documents max for {:.1} GB",
                max_docs,
                capacity as f64 / 1e9
            );
            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
            eprintln!();
        }
    }

    // Progress bar for embedding; total is known only for file inputs.
    let embed_progress = if embedding_enabled {
        let total = match &input_set {
            InputSet::Files(paths) => Some(paths.len() as u64),
            InputSet::Stdin => None,
        };
        Some(create_task_progress_bar(total, "embed", false))
    } else {
        None
    };
    let mut embedded_docs = 0usize;

    // --uri/--title are only meaningful for a single input.
    if let InputSet::Files(paths) = &input_set {
        if paths.len() > 1 {
            if args.uri.is_some() || args.title.is_some() {
                anyhow::bail!(
                    "--uri/--title apply to a single file; omit them when using a directory or glob"
                );
            }
        }
    }

    // Aggregate counters for the final summary.
    let mut reports = Vec::new();
    let mut processed = 0usize;
    let mut skipped = 0usize;
    let mut bytes_added = 0u64;
    let mut summary_warnings: Vec<String> = Vec::new();

    let mut capacity_reached = false;
    match input_set {
        InputSet::Stdin => {
            processed += 1;
            match read_payload(None) {
                Ok(payload) => {
                    let mut analyze_spinner = if args.json {
                        None
                    } else {
                        Some(create_spinner("Analyzing stdin payload..."))
                    };
                    match ingest_payload(
                        &mut mem,
                        &args,
                        payload,
                        None,
                        capacity_guard.as_mut(),
                        embedding_enabled,
                        runtime_ref,
                        use_parallel,
                    ) {
                        Ok(outcome) => {
                            if let Some(pb) = analyze_spinner.take() {
                                pb.finish_and_clear();
                            }
                            bytes_added += outcome.report.stored_bytes as u64;
                            if args.json {
                                summary_warnings
                                    .extend(outcome.report.extraction.warnings.iter().cloned());
                            }
                            reports.push(outcome.report);
                            let report_index = reports.len() - 1;
                            // In parallel mode, reports are printed after the
                            // batch flush (seq numbers arrive late).
                            if !args.json && !use_parallel {
                                if let Some(report) = reports.get(report_index) {
                                    print_report(report);
                                }
                            }
                            if args.transcript {
                                let notice = transcript_notice_message(&mut mem)?;
                                if args.json {
                                    summary_warnings.push(notice);
                                } else {
                                    println!("{notice}");
                                }
                            }
                            if outcome.embedded {
                                if let Some(pb) = embed_progress.as_ref() {
                                    pb.inc(1);
                                }
                                embedded_docs += 1;
                            }
                            if use_parallel {
                                // Queue for the batch flush below.
                                #[cfg(feature = "parallel_segments")]
                                if let Some(input) = outcome.parallel_input {
                                    pending_parallel_indices.push(report_index);
                                    pending_parallel_inputs.push(input);
                                }
                            } else if let Some(guard) = capacity_guard.as_mut() {
                                // Sequential mode commits per input so the
                                // capacity guard tracks real on-disk growth.
                                mem.commit()?;
                                let stats = mem.stats()?;
                                guard.update_after_commit(&stats);
                                guard.check_and_warn_capacity();
                            }
                        }
                        Err(err) => {
                            if let Some(pb) = analyze_spinner.take() {
                                pb.finish_and_clear();
                            }
                            // Duplicate URIs abort the whole run.
                            if err.downcast_ref::<DuplicateUriError>().is_some() {
                                return Err(err);
                            }
                            warn!("skipped stdin payload: {err}");
                            skipped += 1;
                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
                                capacity_reached = true;
                            }
                        }
                    }
                }
                Err(err) => {
                    warn!("failed to read stdin: {err}");
                    skipped += 1;
                }
            }
        }
        InputSet::Files(ref paths) => {
            // The transcript notice is only printed once per run.
            let mut transcript_notice_printed = false;
            for path in paths {
                processed += 1;
                match read_payload(Some(&path)) {
                    Ok(payload) => {
                        let label = path.display().to_string();
                        let mut analyze_spinner = if args.json {
                            None
                        } else {
                            Some(create_spinner(&format!("Analyzing {label}...")))
                        };
                        match ingest_payload(
                            &mut mem,
                            &args,
                            payload,
                            Some(&path),
                            capacity_guard.as_mut(),
                            embedding_enabled,
                            runtime_ref,
                            use_parallel,
                        ) {
                            Ok(outcome) => {
                                if let Some(pb) = analyze_spinner.take() {
                                    pb.finish_and_clear();
                                }
                                bytes_added += outcome.report.stored_bytes as u64;
                                if args.json {
                                    summary_warnings
                                        .extend(outcome.report.extraction.warnings.iter().cloned());
                                }
                                reports.push(outcome.report);
                                let report_index = reports.len() - 1;
                                if !args.json && !use_parallel {
                                    if let Some(report) = reports.get(report_index) {
                                        print_report(report);
                                    }
                                }
                                if args.transcript && !transcript_notice_printed {
                                    let notice = transcript_notice_message(&mut mem)?;
                                    if args.json {
                                        summary_warnings.push(notice);
                                    } else {
                                        println!("{notice}");
                                    }
                                    transcript_notice_printed = true;
                                }
                                if outcome.embedded {
                                    if let Some(pb) = embed_progress.as_ref() {
                                        pb.inc(1);
                                    }
                                    embedded_docs += 1;
                                }
                                if use_parallel {
                                    // Queue for the batch flush below.
                                    #[cfg(feature = "parallel_segments")]
                                    if let Some(input) = outcome.parallel_input {
                                        pending_parallel_indices.push(report_index);
                                        pending_parallel_inputs.push(input);
                                    }
                                } else if let Some(guard) = capacity_guard.as_mut() {
                                    // Per-input commit keeps the capacity
                                    // guard accurate.
                                    mem.commit()?;
                                    let stats = mem.stats()?;
                                    guard.update_after_commit(&stats);
                                    guard.check_and_warn_capacity();
                                }
                            }
                            Err(err) => {
                                if let Some(pb) = analyze_spinner.take() {
                                    pb.finish_and_clear();
                                }
                                // Duplicate URIs abort the whole run.
                                if err.downcast_ref::<DuplicateUriError>().is_some() {
                                    return Err(err);
                                }
                                warn!("skipped {}: {err}", path.display());
                                skipped += 1;
                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
                                    capacity_reached = true;
                                }
                            }
                        }
                    }
                    Err(err) => {
                        warn!("failed to read {}: {err}", path.display());
                        skipped += 1;
                    }
                }
                // Stop consuming inputs once capacity is exhausted.
                if capacity_reached {
                    break;
                }
            }
        }
    }

    if capacity_reached {
        warn!("capacity limit reached; remaining inputs were not processed");
    }

    if let Some(pb) = embed_progress.as_ref() {
        pb.finish_with_message("embed");
    }

    // Embedding outcome summary (human output).
    if let Some(runtime) = embedding_runtime.as_ref() {
        if embedded_docs > 0 {
            match embedding_mode {
                EmbeddingMode::Explicit => println!(
                    "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
                    embedded_docs,
                    runtime.dimension()
                ),
                EmbeddingMode::Auto => println!(
                    "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
                    embedded_docs,
                    runtime.dimension()
                ),
                EmbeddingMode::None => {}
            }
        } else {
            match embedding_mode {
                EmbeddingMode::Explicit => {
                    println!("No embeddings were generated (no textual content available)");
                }
                EmbeddingMode::Auto => {
                    println!(
                        "Semantic runtime available, but no textual content produced embeddings"
                    );
                }
                EmbeddingMode::None => {}
            }
        }
    }

    // Nothing ingested: emit an empty summary and return without committing.
    if reports.is_empty() {
        if args.json {
            if capacity_reached {
                summary_warnings.push("capacity_limit_reached".to_string());
            }
            let summary_json = json!({
                "processed": processed,
                "ingested": 0,
                "skipped": skipped,
                "bytes_added": bytes_added,
                "bytes_added_human": format_bytes(bytes_added),
                "embedded_documents": embedded_docs,
                "capacity_reached": capacity_reached,
                "warnings": summary_warnings,
                "reports": Vec::<serde_json::Value>::new(),
            });
            println!("{}", serde_json::to_string_pretty(&summary_json)?);
        } else {
            println!(
                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
                processed, skipped
            );
        }
        return Ok(());
    }

    #[cfg(feature = "parallel_segments")]
    let mut parallel_flush_spinner = if use_parallel && !args.json {
        Some(create_spinner("Flushing parallel segments..."))
    } else {
        None
    };

    // Commit: parallel mode flushes the queued inputs in one batch and
    // back-fills each report's sequence number; otherwise a plain commit.
    if use_parallel {
        #[cfg(feature = "parallel_segments")]
        {
            let mut opts = parallel_opts.take().unwrap_or_else(|| {
                let mut opts = BuildOpts::default();
                opts.sanitize();
                opts
            });
            // Carry the file's compression setting into the parallel build.
            opts.vec_compression = mem.vector_compression().clone();
            let seqs = if pending_parallel_inputs.is_empty() {
                mem.commit_parallel(opts)?;
                Vec::new()
            } else {
                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
            };
            if let Some(pb) = parallel_flush_spinner.take() {
                pb.finish_with_message("Flushed parallel segments");
            }
            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
                if let Some(report) = reports.get_mut(idx) {
                    report.seq = seq;
                }
            }
        }
        #[cfg(not(feature = "parallel_segments"))]
        {
            mem.commit()?;
        }
    } else {
        mem.commit()?;
    }

    // Parallel mode deferred report printing until seq numbers were known.
    if !args.json && use_parallel {
        for report in &reports {
            print_report(report);
        }
    }

    // Optional PDF table extraction pass over the file inputs.
    let mut tables_extracted = 0usize;
    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
    if args.tables && !args.no_tables {
        if let InputSet::Files(ref paths) = input_set {
            let pdf_paths: Vec<_> = paths
                .iter()
                .filter(|p| {
                    p.extension()
                        .and_then(|e| e.to_str())
                        .map(|e| e.eq_ignore_ascii_case("pdf"))
                        .unwrap_or(false)
                })
                .collect();

            if !pdf_paths.is_empty() {
                let table_spinner = if args.json {
                    None
                } else {
                    Some(create_spinner(&format!(
                        "Extracting tables from {} PDF(s)...",
                        pdf_paths.len()
                    )))
                };

                let table_options = TableExtractionOptions::builder()
                    .mode(memvid_core::table::ExtractionMode::Aggressive)
                    .min_rows(2)
                    .min_cols(2)
                    .min_quality(memvid_core::table::TableQuality::Low)
                    .merge_multi_page(true)
                    .build();

                // Extraction/storage failures are best-effort: each `if let Ok`
                // silently skips the failing PDF or table.
                for pdf_path in pdf_paths {
                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
                        let filename = pdf_path
                            .file_name()
                            .and_then(|s| s.to_str())
                            .unwrap_or("unknown.pdf");

                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
                            for table in &result.tables {
                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
                                {
                                    tables_extracted += 1;
                                    table_summaries.push(json!({
                                        "table_id": table.table_id,
                                        "source_file": table.source_file,
                                        "meta_frame_id": meta_id,
                                        "rows": table.n_rows,
                                        "cols": table.n_cols,
                                        "quality": format!("{:?}", table.quality),
                                        "pages": format!("{}-{}", table.page_start, table.page_end),
                                    }));
                                }
                            }
                        }
                    }
                }

                if let Some(pb) = table_spinner {
                    if tables_extracted > 0 {
                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
                    } else {
                        pb.finish_with_message("No tables found");
                    }
                }

                // Stored table frames need their own commit.
                if tables_extracted > 0 {
                    mem.commit()?;
                }
            }
        }
    }

    // Final summary against post-commit stats.
    let stats = mem.stats()?;
    if args.json {
        if capacity_reached {
            summary_warnings.push("capacity_limit_reached".to_string());
        }
        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
        // Aggregate extraction timings/pages; zero totals are reported as null.
        let total_duration_ms: Option<u64> = {
            let sum: u64 = reports
                .iter()
                .filter_map(|r| r.extraction.duration_ms)
                .sum();
            if sum > 0 {
                Some(sum)
            } else {
                None
            }
        };
        let total_pages: Option<u32> = {
            let sum: u32 = reports
                .iter()
                .filter_map(|r| r.extraction.pages_processed)
                .sum();
            if sum > 0 {
                Some(sum)
            } else {
                None
            }
        };
        let embedding_mode_str = match embedding_mode {
            EmbeddingMode::None => "none",
            EmbeddingMode::Auto => "auto",
            EmbeddingMode::Explicit => "explicit",
        };
        let summary_json = json!({
            "processed": processed,
            "ingested": reports.len(),
            "skipped": skipped,
            "bytes_added": bytes_added,
            "bytes_added_human": format_bytes(bytes_added),
            "embedded_documents": embedded_docs,
            "embedding": {
                "enabled": embedding_enabled,
                "mode": embedding_mode_str,
                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
            },
            "tables": {
                "enabled": args.tables && !args.no_tables,
                "extracted": tables_extracted,
                "tables": table_summaries,
            },
            "capacity_reached": capacity_reached,
            "warnings": summary_warnings,
            "extraction": {
                "total_duration_ms": total_duration_ms,
                "total_pages_processed": total_pages,
            },
            "memory": {
                "path": args.file.display().to_string(),
                "frame_count": stats.frame_count,
                "size_bytes": stats.size_bytes,
                "tier": format!("{:?}", stats.tier),
            },
            "reports": reports_json,
        });
        println!("{}", serde_json::to_string_pretty(&summary_json)?);
    } else {
        println!(
            "Committed {} frame(s); total frames {}",
            reports.len(),
            stats.frame_count
        );
        println!(
            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
            processed,
            reports.len(),
            skipped,
            format_bytes(bytes_added)
        );
        if tables_extracted > 0 {
            println!("Tables: {} extracted from PDF(s)", tables_extracted);
        }
    }
    Ok(())
}
1026
/// Update an existing frame (selected by id or URI): optionally replace the
/// payload, carry forward or override its metadata, optionally recompute
/// embeddings, write the new frame version, and commit.
///
/// The update is versioning: `update_frame` produces a new frame; the old
/// frame id is reported as `previous_frame_id` in JSON output.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // New payload only when --input was given; otherwise the existing
    // payload is reused.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Start from the existing frame's fields so anything not overridden is
    // carried forward verbatim. Auto-tag/date-extraction only run when a new
    // payload is supplied.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // A frame gaining a payload must have a URI; derive one from the source
    // path when none exists.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // Apply explicit CLI overrides on top of the carried-forward values.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Non-empty --label/--tag sets REPLACE the existing sets.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            // Both key and value become searchable tags (skipping empties and
            // value==key duplicates); the pair is also kept as extra metadata.
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for embedding; refreshed below when a new payload yields text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        // Re-analyze the new payload for metadata and searchable text.
        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // No new payload: nothing to auto-tag or extract dates from.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    // Only embed when the file actually has a vector index.
    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    // Recompute the embedding from search text, falling back to the raw
    // UTF-8 payload; warn when neither yields usable text.
    let mv2_dimension = mem.vec_index_dimension();
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        let runtime = load_embedding_runtime_for_mv2(config, None, mv2_dimension)?;
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
    }

    // Without a recomputed embedding, keep the frame's existing vector so the
    // new version stays searchable.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Locate the freshly-written frame: by URI when one is set, otherwise the
    // newest timeline entry (falling back to the original id).
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
1206
1207fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1208 if let Some(uri) = provided {
1209 let sanitized = sanitize_uri(uri, true);
1210 return format!("mv2://{}", sanitized);
1211 }
1212
1213 if let Some(path) = source {
1214 let raw = path.to_string_lossy();
1215 let sanitized = sanitize_uri(&raw, false);
1216 return format!("mv2://{}", sanitized);
1217 }
1218
1219 format!("mv2://frames/{}", Uuid::new_v4())
1220}
1221
1222pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1223 let digest = hash(payload).to_hex();
1224 let short = &digest[..32];
1225 let ext_from_path = source
1226 .and_then(|path| path.extension())
1227 .and_then(|ext| ext.to_str())
1228 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1229 .filter(|ext| !ext.is_empty());
1230 let ext = ext_from_path
1231 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1232 .unwrap_or_else(|| "bin".to_string());
1233 format!("mv2://video/{short}.{ext}")
1234}
1235
1236fn derive_title(
1237 provided: Option<String>,
1238 source: Option<&Path>,
1239 payload_utf8: Option<&str>,
1240) -> Option<String> {
1241 if let Some(title) = provided {
1242 let trimmed = title.trim();
1243 if trimmed.is_empty() {
1244 None
1245 } else {
1246 Some(trimmed.to_string())
1247 }
1248 } else {
1249 if let Some(text) = payload_utf8 {
1250 if let Some(markdown_title) = extract_markdown_title(text) {
1251 return Some(markdown_title);
1252 }
1253 }
1254 source
1255 .and_then(|path| path.file_stem())
1256 .and_then(|stem| stem.to_str())
1257 .map(to_title_case)
1258 }
1259}
1260
/// Return the text of the first ATX-style (`#`) heading in `text`, if any.
///
/// Lines consisting only of `#` characters are skipped and scanning
/// continues with later lines.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines()
        .map(str::trim)
        .filter(|line| line.starts_with('#'))
        .map(|line| line.trim_start_matches('#').trim())
        .find(|title| !title.is_empty())
        .map(str::to_string)
}
1273
/// Upper-case the first character of `input`, leaving the rest untouched.
///
/// Note this is first-letter capitalization, not per-word title casing.
/// Characters whose uppercase form expands to several characters (e.g.
/// `ß` → `SS`) are expanded in full.
fn to_title_case(input: &str) -> String {
    let mut chars = input.chars();
    match chars.next() {
        // Empty input: nothing to capitalize. (This subsumes the previous
        // explicit `is_empty` pre-check, which was redundant with the
        // `None` arm here.)
        None => String::new(),
        // Single collect avoids the intermediate prefix String the old
        // `prefix + chars.as_str()` form allocated.
        Some(first) => first.to_uppercase().chain(chars).collect(),
    }
}
1285
/// True for files that should never be ingested (currently only `.DS_Store`).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
1292
/// True for directories that should not be descended into: any
/// dot-directory (`.git`, `.cache`, ...).
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
1299
/// Where ingest input comes from.
enum InputSet {
    /// Read the payload from standard input.
    Stdin,
    /// Concrete file paths resolved from an argument, glob pattern, or
    /// recursive directory walk.
    Files(Vec<PathBuf>),
}
1304
1305fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
1306 let Some(raw) = input else {
1307 return Ok(InputSet::Stdin);
1308 };
1309
1310 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
1311 let mut files = Vec::new();
1312 let mut matched_any = false;
1313 for entry in glob(raw)? {
1314 let path = entry?;
1315 if path.is_file() {
1316 matched_any = true;
1317 if should_skip_input(&path) {
1318 continue;
1319 }
1320 files.push(path);
1321 }
1322 }
1323 files.sort();
1324 if files.is_empty() {
1325 if matched_any {
1326 anyhow::bail!(
1327 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
1328 );
1329 }
1330 anyhow::bail!("pattern '{raw}' did not match any files");
1331 }
1332 return Ok(InputSet::Files(files));
1333 }
1334
1335 let path = PathBuf::from(raw);
1336 if path.is_dir() {
1337 let (mut files, skipped_any) = collect_directory_files(&path)?;
1338 files.sort();
1339 if files.is_empty() {
1340 if skipped_any {
1341 anyhow::bail!(
1342 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
1343 path.display()
1344 );
1345 }
1346 anyhow::bail!(
1347 "directory '{}' contains no ingestible files",
1348 path.display()
1349 );
1350 }
1351 Ok(InputSet::Files(files))
1352 } else {
1353 Ok(InputSet::Files(vec![path]))
1354 }
1355}
1356
1357fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
1358 let mut files = Vec::new();
1359 let mut skipped_any = false;
1360 let mut stack = vec![root.to_path_buf()];
1361 while let Some(dir) = stack.pop() {
1362 for entry in fs::read_dir(&dir)? {
1363 let entry = entry?;
1364 let path = entry.path();
1365 let file_type = entry.file_type()?;
1366 if file_type.is_dir() {
1367 if should_skip_dir(&path) {
1368 skipped_any = true;
1369 continue;
1370 }
1371 stack.push(path);
1372 } else if file_type.is_file() {
1373 if should_skip_input(&path) {
1374 skipped_any = true;
1375 continue;
1376 }
1377 files.push(path);
1378 }
1379 }
1380 }
1381 Ok((files, skipped_any))
1382}
1383
/// Result of ingesting one input file.
struct IngestOutcome {
    /// Summary reported back to the user after the put.
    report: PutReport,
    /// Whether an embedding was produced for the frame (set by the ingest
    /// path outside this view).
    embedded: bool,
    /// Deferred input for the parallel segment builder, when that feature
    /// is compiled in.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
1390
/// User-facing summary of a single stored frame.
/// Fields are populated by the ingest path outside this view.
struct PutReport {
    /// Sequence number returned by the store for this put.
    seq: u64,
    /// Canonical `mv2://` URI assigned to the frame.
    uri: String,
    /// Title chosen for the document, if any.
    title: Option<String>,
    /// Payload size before storage, in bytes.
    original_bytes: usize,
    /// Payload size as stored, in bytes.
    stored_bytes: usize,
    /// Whether the payload was stored compressed.
    compressed: bool,
    /// Source path the payload came from (absent for stdin input).
    source: Option<PathBuf>,
    /// Detected MIME type, when known.
    mime: Option<String>,
    /// Derived document metadata, when any.
    metadata: Option<DocMetadata>,
    /// Diagnostics from the text-extraction stage.
    extraction: ExtractionSummary,
}
1403
/// Tracks archive size against a configured capacity so puts can fail fast
/// (and warn) before exceeding the limit.
struct CapacityGuard {
    /// Archive tier; not read anywhere in this file (hence the allow).
    #[allow(dead_code)]
    tier: Tier,
    /// Configured capacity limit in bytes (non-zero; see `from_stats`).
    capacity: u64,
    /// Last observed archive size in bytes (refreshed after each commit).
    current_size: u64,
}
1410
1411impl CapacityGuard {
1412 const MIN_OVERHEAD: u64 = 128 * 1024;
1413 const RELATIVE_OVERHEAD: f64 = 0.05;
1414
1415 fn from_stats(stats: &Stats) -> Option<Self> {
1416 if stats.capacity_bytes == 0 {
1417 return None;
1418 }
1419 Some(Self {
1420 tier: stats.tier,
1421 capacity: stats.capacity_bytes,
1422 current_size: stats.size_bytes,
1423 })
1424 }
1425
1426 fn ensure_capacity(&self, additional: u64) -> Result<()> {
1427 let projected = self
1428 .current_size
1429 .saturating_add(additional)
1430 .saturating_add(Self::estimate_overhead(additional));
1431 if projected > self.capacity {
1432 return Err(map_put_error(
1433 MemvidError::CapacityExceeded {
1434 current: self.current_size,
1435 limit: self.capacity,
1436 required: projected,
1437 },
1438 Some(self.capacity),
1439 ));
1440 }
1441 Ok(())
1442 }
1443
1444 fn update_after_commit(&mut self, stats: &Stats) {
1445 self.current_size = stats.size_bytes;
1446 }
1447
1448 fn capacity_hint(&self) -> Option<u64> {
1449 Some(self.capacity)
1450 }
1451
1452 fn check_and_warn_capacity(&self) {
1454 if self.capacity == 0 {
1455 return;
1456 }
1457
1458 let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
1459
1460 if usage_pct >= 90.0 && usage_pct < 91.0 {
1462 eprintln!(
1463 "⚠️ CRITICAL: 90% capacity used ({} / {})",
1464 format_bytes(self.current_size),
1465 format_bytes(self.capacity)
1466 );
1467 eprintln!(" Actions:");
1468 eprintln!(" 1. Recreate with larger --size");
1469 eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
1470 eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
1471 } else if usage_pct >= 75.0 && usage_pct < 76.0 {
1472 eprintln!(
1473 "⚠️ WARNING: 75% capacity used ({} / {})",
1474 format_bytes(self.current_size),
1475 format_bytes(self.capacity)
1476 );
1477 eprintln!(" Consider planning for capacity expansion soon");
1478 } else if usage_pct >= 50.0 && usage_pct < 51.0 {
1479 eprintln!(
1480 "ℹ️ INFO: 50% capacity used ({} / {})",
1481 format_bytes(self.current_size),
1482 format_bytes(self.capacity)
1483 );
1484 }
1485 }
1486
1487 fn estimate_overhead(additional: u64) -> u64 {
1488 let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
1489 fractional.max(Self::MIN_OVERHEAD)
1490 }
1491}
1492
/// Everything `analyze_file` derived from a single payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type (falls back to `application/octet-stream`).
    pub mime: String,
    /// Populated document metadata, or `None` when nothing was derived.
    pub metadata: Option<DocMetadata>,
    /// Normalized text for full-text search, when extraction succeeded.
    pub search_text: Option<String>,
    /// Diagnostics describing how text extraction went.
    pub extraction: ExtractionSummary,
}
1500
/// Tuning knobs for audio analysis.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Run audio analysis even when the MIME type is not `audio/*`.
    force: bool,
    /// Target segment length in seconds (clamped to >= 1 when used).
    segment_secs: u32,
}
1506
1507impl AudioAnalyzeOptions {
1508 const DEFAULT_SEGMENT_SECS: u32 = 30;
1509
1510 fn normalised_segment_secs(self) -> u32 {
1511 self.segment_secs.max(1)
1512 }
1513}
1514
1515impl Default for AudioAnalyzeOptions {
1516 fn default() -> Self {
1517 Self {
1518 force: false,
1519 segment_secs: Self::DEFAULT_SEGMENT_SECS,
1520 }
1521 }
1522}
1523
/// Audio-derived properties produced by `analyze_audio`.
struct AudioAnalysis {
    /// Duration, codec, tags, and segment breakdown for the stream.
    metadata: DocAudioMetadata,
    /// Caption taken from the track-title tag, when present.
    caption: Option<String>,
    /// Tag values (title/artist/album/genre) to merge into search text.
    search_terms: Vec<String>,
}
1529
/// Image-derived properties feeding into `DocMetadata`.
struct ImageAnalysis {
    /// Pixel width of the decoded image.
    width: u32,
    /// Pixel height of the decoded image.
    height: u32,
    /// Dominant colours as `#rrggbb` hex strings (possibly empty).
    palette: Vec<String>,
    /// Caption derived from EXIF data, when available.
    caption: Option<String>,
    /// Parsed EXIF metadata, when present and readable.
    exif: Option<DocExifMetadata>,
}
1537
/// Diagnostic record of how text extraction went for one payload.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Name of the reader that handled the payload, when any ran.
    reader: Option<String>,
    /// Final outcome of the extraction attempt.
    status: ExtractionStatus,
    /// Human-readable warnings accumulated along the way.
    warnings: Vec<String>,
    /// Duration the primary reader reported, in milliseconds.
    duration_ms: Option<u64>,
    /// Page count the primary reader reported processing.
    pages_processed: Option<u32>,
}
1546
1547impl ExtractionSummary {
1548 fn record_warning<S: Into<String>>(&mut self, warning: S) {
1549 self.warnings.push(warning.into());
1550 }
1551}
1552
/// Outcome of the text-extraction stage for one payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// No extraction was attempted for this payload type.
    Skipped,
    /// The primary reader produced usable text.
    Ok,
    /// Text came from a fallback path rather than the primary reader.
    FallbackUsed,
    /// Extraction ran but produced no searchable content.
    Empty,
    /// Extraction errored.
    Failed,
}
1561
1562impl ExtractionStatus {
1563 fn label(self) -> &'static str {
1564 match self {
1565 Self::Skipped => "skipped",
1566 Self::Ok => "ok",
1567 Self::FallbackUsed => "fallback_used",
1568 Self::Empty => "empty",
1569 Self::Failed => "failed",
1570 }
1571 }
1572}
1573
1574fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
1575 let filename = source_path
1576 .and_then(|path| path.file_name())
1577 .and_then(|name| name.to_str())
1578 .map(|value| value.to_string());
1579 MediaManifest {
1580 kind: "video".to_string(),
1581 mime: mime.to_string(),
1582 bytes,
1583 filename,
1584 duration_ms: None,
1585 width: None,
1586 height: None,
1587 codec: None,
1588 }
1589}
1590
/// Analyze one payload end-to-end: detect its MIME type, populate
/// `DocMetadata` (size, hash, media/image/audio details), extract search
/// text where possible, and record how extraction went.
///
/// `payload_utf8` is the same payload pre-decoded as UTF-8 when valid;
/// `inferred_title` is used only as a last-resort caption; `force_video`
/// treats the payload as video regardless of the detected MIME type.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    let is_video = force_video || mime.starts_with("video/");
    // Video payloads are never treated as text, whatever detection said.
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    // Identity metadata is always recorded, regardless of payload type.
    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Magic-byte prefix for format sniffing. NOTE(review): `get(..N)`
    // returns None when the payload is shorter than MAGIC_SNIFF_BYTES, so
    // very small files carry no magic slice at all — confirm intended.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Shared helper: normalize extracted text, derive a caption from it if
    // none exists yet, and record reader/status on success. Returns false
    // when normalization yields nothing searchable.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                // Visible marker that search text was clipped at the cap.
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Textual payloads: the payload itself is the search text.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            // detect_mime said "text" but the bytes were not valid UTF-8.
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf" {
        // PDFs: try a registered reader first; fall back to
        // pdf_extract/pdftotext below when nothing usable came out.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    // The reader may refine the MIME type.
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        if !applied {
            // Fallback chain: pdf_extract, then the external pdftotext tool
            // (inside extract_pdf_text).
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image payloads: dimensions, palette, EXIF (and possibly a caption).
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio payloads (or forced audio analysis): stream metadata plus
    // tag-derived search terms merged into the search text.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }
            if !search_terms.is_empty() {
                match &mut search_text {
                    Some(existing) => {
                        // Append only terms not already present verbatim.
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the inferred title, clipped to 240 bytes.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        // Collapse an all-empty metadata struct to None.
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
1861
/// Process-wide document reader registry, built lazily on first use.
fn default_reader_registry() -> &'static ReaderRegistry {
    // OnceLock makes initialization thread-safe and one-shot.
    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
    REGISTRY.get_or_init(ReaderRegistry::default)
}
1866
1867fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
1868 if detect_pdf_magic(magic) {
1869 return Some(DocumentFormat::Pdf);
1870 }
1871
1872 let mime = mime?.trim().to_ascii_lowercase();
1873 match mime.as_str() {
1874 "application/pdf" => Some(DocumentFormat::Pdf),
1875 "text/plain" => Some(DocumentFormat::PlainText),
1876 "text/markdown" => Some(DocumentFormat::Markdown),
1877 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
1878 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
1879 Some(DocumentFormat::Docx)
1880 }
1881 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
1882 Some(DocumentFormat::Pptx)
1883 }
1884 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
1885 Some(DocumentFormat::Xlsx)
1886 }
1887 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
1888 _ => None,
1889 }
1890}
1891
/// True when the sniffed magic bytes begin with a `%PDF` marker, tolerating
/// a UTF-8 BOM followed by ASCII whitespace before it.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    if bytes.is_empty() {
        return false;
    }
    // Drop an optional UTF-8 BOM, then any run of ASCII whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF_u8, 0xBB, 0xBF]).unwrap_or(bytes);
    let content_start = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[content_start..].starts_with(b"%PDF")
}
1909
1910fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
1911 match pdf_extract::extract_text_from_mem(payload) {
1912 Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
1913 Ok(fallback) => Ok(fallback),
1914 Err(err) => {
1915 warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
1916 Ok(text)
1917 }
1918 },
1919 Err(err) => {
1920 warn!("pdf_extract failed: {err}");
1921 match fallback_pdftotext(payload) {
1922 Ok(fallback) => Ok(fallback),
1923 Err(fallback_err) => {
1924 warn!("pdftotext fallback failed: {fallback_err}");
1925 Err(err)
1926 }
1927 }
1928 }
1929 other => other,
1930 }
1931}
1932
1933fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
1934 use std::io::Write;
1935 use std::process::{Command, Stdio};
1936
1937 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
1938
1939 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
1940 temp_pdf
1941 .write_all(payload)
1942 .context("failed to write pdf payload to temp file")?;
1943 temp_pdf.flush().context("failed to flush temp pdf file")?;
1944
1945 let child = Command::new(pdftotext)
1946 .arg("-layout")
1947 .arg(temp_pdf.path())
1948 .arg("-")
1949 .stdout(Stdio::piped())
1950 .spawn()
1951 .context("failed to spawn pdftotext")?;
1952
1953 let output = child
1954 .wait_with_output()
1955 .context("failed to read pdftotext output")?;
1956
1957 if !output.status.success() {
1958 return Err(anyhow!("pdftotext exited with status {}", output.status));
1959 }
1960
1961 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
1962 Ok(text)
1963}
1964
1965fn detect_mime(
1966 source_path: Option<&Path>,
1967 payload: &[u8],
1968 payload_utf8: Option<&str>,
1969) -> (String, bool) {
1970 if let Some(kind) = infer::get(payload) {
1971 let mime = kind.mime_type().to_string();
1972 let treat_as_text = mime.starts_with("text/")
1973 || matches!(
1974 mime.as_str(),
1975 "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
1976 );
1977 return (mime, treat_as_text);
1978 }
1979
1980 let magic = magic_from_u8(payload);
1981 if !magic.is_empty() && magic != "application/octet-stream" {
1982 let treat_as_text = magic.starts_with("text/")
1983 || matches!(
1984 magic,
1985 "application/json"
1986 | "application/xml"
1987 | "application/javascript"
1988 | "image/svg+xml"
1989 | "text/plain"
1990 );
1991 return (magic.to_string(), treat_as_text);
1992 }
1993
1994 if let Some(path) = source_path {
1995 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
1996 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
1997 return (mime.to_string(), treat_as_text);
1998 }
1999 }
2000 }
2001
2002 if payload_utf8.is_some() {
2003 return ("text/plain".to_string(), true);
2004 }
2005
2006 ("application/octet-stream".to_string(), false)
2007}
2008
/// Map a filename extension (case-insensitive) to a MIME type plus a flag
/// saying whether the payload should be handled as text. Unknown
/// extensions yield `None`.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    // Source-code, config and log extensions that all map to plain text.
    const CODE_LIKE: &[&str] = &[
        "txt", "text", "log", "cfg", "conf", "ini", "properties", "sql", "rs", "py", "js", "ts",
        "tsx", "jsx", "c", "h", "cpp", "hpp", "go", "rb", "php", "css", "scss", "sass", "sh",
        "bash", "zsh", "ps1", "swift", "kt", "java", "scala", "lua", "pl", "pm", "r", "erl", "ex",
        "exs", "dart",
    ];

    let lower = ext.to_ascii_lowercase();
    if CODE_LIKE.contains(&lower.as_str()) {
        return Some(("text/plain", true));
    }

    let pair = match lower.as_str() {
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some(pair)
}
2036
2037fn caption_from_text(text: &str) -> Option<String> {
2038 for line in text.lines() {
2039 let trimmed = line.trim();
2040 if !trimmed.is_empty() {
2041 return Some(truncate_to_boundary(trimmed, 240));
2042 }
2043 }
2044 None
2045}
2046
/// Truncate `text` to at most `max_len` bytes on a char boundary, trimming
/// trailing whitespace and appending `…` when anything was cut.
///
/// The result may exceed `max_len` by the ellipsis itself; input whose
/// first character alone exceeds `max_len` truncates to the empty string.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Largest char boundary that does not exceed max_len.
    let mut cut = 0;
    for (idx, _) in text.char_indices() {
        if idx > max_len {
            break;
        }
        cut = idx;
    }
    if cut == 0 {
        return String::new();
    }
    let mut shortened = text[..cut].trim_end().to_string();
    shortened.push('…');
    shortened
}
2062
2063fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2064 let reader = ImageReader::new(Cursor::new(payload))
2065 .with_guessed_format()
2066 .map_err(|err| warn!("failed to guess image format: {err}"))
2067 .ok()?;
2068 let image = reader
2069 .decode()
2070 .map_err(|err| warn!("failed to decode image: {err}"))
2071 .ok()?;
2072 let width = image.width();
2073 let height = image.height();
2074 let rgb = image.to_rgb8();
2075 let palette = match get_palette(
2076 rgb.as_raw(),
2077 ColorFormat::Rgb,
2078 COLOR_PALETTE_SIZE,
2079 COLOR_PALETTE_QUALITY,
2080 ) {
2081 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2082 Err(err) => {
2083 warn!("failed to compute colour palette: {err}");
2084 Vec::new()
2085 }
2086 };
2087
2088 let (exif, exif_caption) = extract_exif_metadata(payload);
2089
2090 Some(ImageAnalysis {
2091 width,
2092 height,
2093 palette,
2094 caption: exif_caption,
2095 exif,
2096 })
2097}
2098
2099fn color_to_hex(color: Color) -> String {
2100 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
2101}
2102
/// Probe and fully decode an audio payload with symphonia to measure its
/// duration, then combine that with container tags (via lofty) into
/// `DocAudioMetadata`, a caption, and searchable terms.
///
/// Returns `None` when the stream cannot be probed, has no default track,
/// or no decoder is available. NOTE(review): this decodes the entire
/// stream to count frames, so cost grows with audio length.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // Symphonia needs an owned media source, hence the payload copy.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Give the probe every extension hint available: file extension first,
    // then the MIME subtype.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    // Cloned so the track borrow on `format` can end before the decode loop.
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode every packet of the selected track, counting PCM frames; the
    // duration is derived from this count rather than trusted from headers.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            // An IoError from next_packet conventionally marks end-of-stream.
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Ignore packets from other (e.g. secondary) tracks.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            // A bad packet is logged and skipped; decoding continues.
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    // Duration = decoded frames / sample rate; zero when the rate is unknown.
    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Average bitrate derived from payload size over the measured duration.
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    // Selected tag values become search terms.
    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Slice the stream into fixed-length labelled segments.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble metadata, leaving fields unset when nothing was measured.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        // Debug formatting of the codec id; not a human-friendly name.
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
    })
}
2277
2278fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
2279 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
2280
2281 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
2282 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
2283 out.entry("title".into())
2284 .or_insert_with(|| value.to_string());
2285 out.entry("tracktitle".into())
2286 .or_insert_with(|| value.to_string());
2287 }
2288 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
2289 out.entry("artist".into())
2290 .or_insert_with(|| value.to_string());
2291 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
2292 out.entry("artist".into())
2293 .or_insert_with(|| value.to_string());
2294 }
2295 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
2296 out.entry("album".into())
2297 .or_insert_with(|| value.to_string());
2298 }
2299 if let Some(value) = tag.get_string(&ItemKey::Genre) {
2300 out.entry("genre".into())
2301 .or_insert_with(|| value.to_string());
2302 }
2303 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
2304 out.entry("track_number".into())
2305 .or_insert_with(|| value.to_string());
2306 }
2307 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
2308 out.entry("disc_number".into())
2309 .or_insert_with(|| value.to_string());
2310 }
2311 }
2312
2313 let mut tags = HashMap::new();
2314 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
2315 Ok(probe) => probe,
2316 Err(err) => {
2317 warn!("failed to guess audio tag file type: {err}");
2318 return tags;
2319 }
2320 };
2321
2322 let tagged_file = match probe.read() {
2323 Ok(file) => file,
2324 Err(err) => {
2325 warn!("failed to read audio tags: {err}");
2326 return tags;
2327 }
2328 };
2329
2330 if let Some(primary) = tagged_file.primary_tag() {
2331 collect_tag(primary, &mut tags);
2332 }
2333 for tag in tagged_file.tags() {
2334 collect_tag(tag, &mut tags);
2335 }
2336
2337 tags
2338}
2339
2340fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
2341 let mut cursor = Cursor::new(payload);
2342 let exif = match ExifReader::new().read_from_container(&mut cursor) {
2343 Ok(exif) => exif,
2344 Err(err) => {
2345 warn!("failed to read EXIF metadata: {err}");
2346 return (None, None);
2347 }
2348 };
2349
2350 let mut doc = DocExifMetadata::default();
2351 let mut has_data = false;
2352
2353 if let Some(make) = exif_string(&exif, Tag::Make) {
2354 doc.make = Some(make);
2355 has_data = true;
2356 }
2357 if let Some(model) = exif_string(&exif, Tag::Model) {
2358 doc.model = Some(model);
2359 has_data = true;
2360 }
2361 if let Some(lens) =
2362 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
2363 {
2364 doc.lens = Some(lens);
2365 has_data = true;
2366 }
2367 if let Some(dt) =
2368 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
2369 {
2370 doc.datetime = Some(dt);
2371 has_data = true;
2372 }
2373
2374 if let Some(gps) = extract_gps_metadata(&exif) {
2375 doc.gps = Some(gps);
2376 has_data = true;
2377 }
2378
2379 let caption = exif_string(&exif, Tag::ImageDescription);
2380
2381 let metadata = if has_data { Some(doc) } else { None };
2382 (metadata, caption)
2383}
2384
2385fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
2386 exif.fields()
2387 .find(|field| field.tag == tag)
2388 .and_then(|field| field_to_string(field, exif))
2389}
2390
2391fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
2392 let value = field.display_value().with_unit(exif).to_string();
2393 let trimmed = value.trim_matches('\0').trim();
2394 if trimmed.is_empty() {
2395 None
2396 } else {
2397 Some(trimmed.to_string())
2398 }
2399}
2400
2401fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
2402 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
2403 let latitude_ref_field = exif
2404 .fields()
2405 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
2406 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
2407 let longitude_ref_field = exif
2408 .fields()
2409 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
2410
2411 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
2412 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
2413
2414 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
2415 if reference.eq_ignore_ascii_case("S") {
2416 latitude = -latitude;
2417 }
2418 }
2419 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
2420 if reference.eq_ignore_ascii_case("W") {
2421 longitude = -longitude;
2422 }
2423 }
2424
2425 Some(DocGpsMetadata {
2426 latitude,
2427 longitude,
2428 })
2429}
2430
2431fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
2432 match value {
2433 ExifValue::Rational(values) if !values.is_empty() => {
2434 let deg = rational_to_f64_u(values.get(0)?)?;
2435 let min = values
2436 .get(1)
2437 .and_then(|v| rational_to_f64_u(v))
2438 .unwrap_or(0.0);
2439 let sec = values
2440 .get(2)
2441 .and_then(|v| rational_to_f64_u(v))
2442 .unwrap_or(0.0);
2443 Some(deg + (min / 60.0) + (sec / 3600.0))
2444 }
2445 ExifValue::SRational(values) if !values.is_empty() => {
2446 let deg = rational_to_f64_i(values.get(0)?)?;
2447 let min = values
2448 .get(1)
2449 .and_then(|v| rational_to_f64_i(v))
2450 .unwrap_or(0.0);
2451 let sec = values
2452 .get(2)
2453 .and_then(|v| rational_to_f64_i(v))
2454 .unwrap_or(0.0);
2455 Some(deg + (min / 60.0) + (sec / 3600.0))
2456 }
2457 _ => None,
2458 }
2459}
2460
2461fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
2462 if value.denom == 0 {
2463 None
2464 } else {
2465 Some(value.num as f64 / value.denom as f64)
2466 }
2467}
2468
2469fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
2470 if value.denom == 0 {
2471 None
2472 } else {
2473 Some(value.num as f64 / value.denom as f64)
2474 }
2475}
2476
2477fn value_to_ascii(value: &ExifValue) -> Option<String> {
2478 if let ExifValue::Ascii(values) = value {
2479 values
2480 .first()
2481 .and_then(|bytes| std::str::from_utf8(bytes).ok())
2482 .map(|s| s.trim_matches('\0').trim().to_string())
2483 .filter(|s| !s.is_empty())
2484 } else {
2485 None
2486 }
2487}
2488
/// Ingest one payload (file bytes or stdin) into the store.
///
/// Pipeline, in order:
/// 1. measure original/stored sizes (`canonical_storage_len`);
/// 2. run content analysis (`analyze_file`) for MIME, search text, metadata;
/// 3. derive URI/title and assemble `PutOptions` from the CLI flags;
/// 4. look up an existing frame with the same URI (update/duplicate checks);
/// 5. write the frame — updating in place, deferring to parallel ingest, or
///    putting directly — generating embeddings when enabled.
///
/// In parallel mode the write is deferred: the returned outcome has `seq: 0`
/// and `parallel_input` carries the pending work for the caller to batch.
///
/// Errors on capacity limits, duplicate URIs (when neither
/// `--update-existing` nor `--allow-duplicate` is set), a missing embedding
/// runtime, or any underlying store failure.
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    let original_bytes = payload.len();
    // Estimated on-disk size: zstd length for UTF-8 text, raw length otherwise.
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    let audio_opts = AudioAnalyzeOptions {
        force: args.audio,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
    };

    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    // --video is an explicit user assertion; reject payloads whose detected
    // MIME type is not a video container.
    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    // Cap the lexical search text so the index stays bounded.
    // `truncate_to_boundary` presumably cuts at a char boundary — behavior
    // defined elsewhere in this file.
    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    // URI precedence: explicit --uri, then content-derived video URI,
    // then path-derived URI.
    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    // Assemble PutOptions from CLI flags and the analysis results.
    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    if args.video {
        // --video forces kind and adds both a kv tag and a plain tag.
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    // Each --tag key=value also contributes plain tags for key and value
    // (skipping empties and a value identical to its key).
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    // --metadata entries: try typed DocMetadata first, fall back to raw JSON
    // stored under a synthetic "custom_metadata_{idx}" key; bad JSON only warns.
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    let options = options_builder.build();

    // Look up any frame already stored under this URI; needed both for
    // --update-existing and for the default duplicate rejection.
    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    let mut embedded = false;
    // Embeddings are text-based; video payloads never get one.
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            // In-place update of the existing frame (same URI).
            let payload_for_update = payload.clone();
            if allow_embedding {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                // Prefer extracted search text; fall back to the raw UTF-8 payload.
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!("Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            // Frame exists and updating was not requested: reject as duplicate.
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        // New frame: enforce the capacity cap before any write.
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            #[cfg(feature = "parallel_segments")]
            {
                // Parallel mode: precompute embeddings here, then hand back a
                // ParallelInput for the caller to batch (no write yet).
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                    } else {
                        info!("parallel mode: checking for chunks on {} bytes payload", payload.len());
                        if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                            info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                            // Optional temporal enrichment rewrites relative
                            // date phrases in each chunk before embedding.
                            #[cfg(feature = "temporal_enrich")]
                            let enriched_chunks = if args.temporal_enrich {
                                info!("Temporal enrichment enabled, processing {} chunks", chunk_texts.len());
                                let today = chrono::Local::now().date_naive();
                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                let enriched: Vec<String> = results.iter()
                                    .map(|(text, _)| text.clone())
                                    .collect();
                                let resolved_count = results.iter()
                                    .filter(|(_, e)| e.relative_phrases.iter().any(|p| p.resolved.is_some()))
                                    .count();
                                info!("Temporal enrichment: resolved {} chunks with temporal context", resolved_count);
                                enriched
                            } else {
                                chunk_texts.clone()
                            };
                            #[cfg(not(feature = "temporal_enrich"))]
                            let enriched_chunks = {
                                if args.temporal_enrich {
                                    warn!("Temporal enrichment requested but feature not compiled in");
                                }
                                chunk_texts.clone()
                            };

                            // Optional contextual retrieval: prefix each chunk
                            // with generated context; failures fall back to
                            // the unprefixed chunks.
                            let embed_chunks = if args.contextual {
                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                let engine = if args.contextual_model.as_deref() == Some("local") {
                                    #[cfg(feature = "llama-cpp")]
                                    {
                                        let model_path = get_local_contextual_model_path()?;
                                        ContextualEngine::local(model_path)
                                    }
                                    #[cfg(not(feature = "llama-cpp"))]
                                    {
                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                    }
                                } else if let Some(model) = &args.contextual_model {
                                    ContextualEngine::openai_with_model(model)?
                                } else {
                                    ContextualEngine::openai()?
                                };

                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                                    Ok(contexts) => {
                                        info!("Generated {} contextual prefixes", contexts.len());
                                        apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                                    }
                                    Err(e) => {
                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                        enriched_chunks.clone()
                                    }
                                }
                            } else {
                                enriched_chunks
                            };

                            // One embedding per chunk, plus a parent embedding
                            // over the whole (truncated) text.
                            let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                            for chunk_text in &embed_chunks {
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!("parallel mode: Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                                }
                                let chunk_emb = runtime.embed(&truncated_chunk)?;
                                chunk_embeddings.push(chunk_emb);
                            }
                            let num_chunks = chunk_embeddings.len();
                            chunk_embeddings_vec = Some(chunk_embeddings);
                            let parent_text = truncate_for_embedding(&embed_text);
                            if parent_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&parent_text)?);
                            embedded = true;
                            info!("parallel mode: Generated {} chunk embeddings + 1 parent embedding", num_chunks);
                        } else {
                            // Payload below the chunking threshold (or not text):
                            // a single parent embedding suffices.
                            info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                            let truncated_text = truncate_for_embedding(&embed_text);
                            if truncated_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&truncated_text)?);
                            embedded = true;
                        }
                    }
                }
                // Prefer passing the source path so the parallel worker can
                // stream from disk; otherwise hand over the bytes.
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                // Early return: no write happened, seq is a placeholder 0.
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            // Without the feature, parallel mode degrades to a plain put.
            #[cfg(not(feature = "parallel_segments"))]
            {
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            // Direct (non-parallel) ingest with embeddings.
            let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
            let embed_text = search_text
                .clone()
                .or_else(|| payload_text.map(|text| text.to_string()))
                .unwrap_or_default();
            if embed_text.trim().is_empty() {
                warn!("no textual content available; embedding skipped");
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                // Same chunk/enrich/contextualize flow as the parallel branch,
                // but writing immediately via put_with_chunk_embeddings.
                if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                    info!("Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                    #[cfg(feature = "temporal_enrich")]
                    let enriched_chunks = if args.temporal_enrich {
                        info!("Temporal enrichment enabled, processing {} chunks", chunk_texts.len());
                        let today = chrono::Local::now().date_naive();
                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                        let enriched: Vec<String> = results.iter()
                            .map(|(text, _)| text.clone())
                            .collect();
                        let resolved_count = results.iter()
                            .filter(|(_, e)| e.relative_phrases.iter().any(|p| p.resolved.is_some()))
                            .count();
                        info!("Temporal enrichment: resolved {} chunks with temporal context", resolved_count);
                        enriched
                    } else {
                        chunk_texts.clone()
                    };
                    #[cfg(not(feature = "temporal_enrich"))]
                    let enriched_chunks = {
                        if args.temporal_enrich {
                            warn!("Temporal enrichment requested but feature not compiled in");
                        }
                        chunk_texts.clone()
                    };

                    let embed_chunks = if args.contextual {
                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                        let engine = if args.contextual_model.as_deref() == Some("local") {
                            #[cfg(feature = "llama-cpp")]
                            {
                                let model_path = get_local_contextual_model_path()?;
                                ContextualEngine::local(model_path)
                            }
                            #[cfg(not(feature = "llama-cpp"))]
                            {
                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                            }
                        } else if let Some(model) = &args.contextual_model {
                            ContextualEngine::openai_with_model(model)?
                        } else {
                            ContextualEngine::openai()?
                        };

                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                            Ok(contexts) => {
                                info!("Generated {} contextual prefixes", contexts.len());
                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                            }
                            Err(e) => {
                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                enriched_chunks.clone()
                            }
                        }
                    } else {
                        enriched_chunks
                    };

                    let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                    for chunk_text in &embed_chunks {
                        let truncated_chunk = truncate_for_embedding(chunk_text);
                        if truncated_chunk.len() < chunk_text.len() {
                            info!("Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                        }
                        let chunk_emb = runtime.embed(&truncated_chunk)?;
                        chunk_embeddings.push(chunk_emb);
                    }
                    let parent_text = truncate_for_embedding(&embed_text);
                    if parent_text.len() < embed_text.len() {
                        info!("Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                    }
                    let parent_embedding = runtime.embed(&parent_text)?;
                    embedded = true;
                    info!("Calling put_with_chunk_embeddings with {} chunk embeddings", chunk_embeddings.len());
                    mem.put_with_chunk_embeddings(&payload, Some(parent_embedding), chunk_embeddings, options)
                        .map_err(|err| {
                            map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                        })?
                } else {
                    info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!("Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.put_with_embedding_and_options(&payload, embedding, options)
                        .map_err(|err| {
                            map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                        })?
                }
            }
        } else {
            // Plain put: no embeddings requested.
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}
2958
2959fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
2960 if std::str::from_utf8(payload).is_ok() {
2961 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
2962 Ok((compressed.len(), true))
2963 } else {
2964 Ok((payload.len(), false))
2965 }
2966}
2967
2968fn print_report(report: &PutReport) {
2969 let name = report
2970 .source
2971 .as_ref()
2972 .map(|path| {
2973 let pretty = env::current_dir()
2974 .ok()
2975 .as_ref()
2976 .and_then(|cwd| diff_paths(path, cwd))
2977 .unwrap_or_else(|| path.to_path_buf());
2978 pretty.to_string_lossy().into_owned()
2979 })
2980 .unwrap_or_else(|| "stdin".to_string());
2981
2982 let ratio = if report.original_bytes > 0 {
2983 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
2984 } else {
2985 100.0
2986 };
2987
2988 println!("• {name} → {}", report.uri);
2989 println!(" seq: {}", report.seq);
2990 if report.compressed {
2991 println!(
2992 " size: {} B → {} B ({:.1}% of original, compressed)",
2993 report.original_bytes, report.stored_bytes, ratio
2994 );
2995 } else {
2996 println!(" size: {} B (stored as-is)", report.original_bytes);
2997 }
2998 if let Some(mime) = &report.mime {
2999 println!(" mime: {mime}");
3000 }
3001 if let Some(title) = &report.title {
3002 println!(" title: {title}");
3003 }
3004 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
3005 println!(
3006 " extraction: status={} reader={}",
3007 report.extraction.status.label(),
3008 extraction_reader
3009 );
3010 if let Some(ms) = report.extraction.duration_ms {
3011 println!(" extraction-duration: {} ms", ms);
3012 }
3013 if let Some(pages) = report.extraction.pages_processed {
3014 println!(" extraction-pages: {}", pages);
3015 }
3016 for warning in &report.extraction.warnings {
3017 println!(" warning: {warning}");
3018 }
3019 if let Some(metadata) = &report.metadata {
3020 if let Some(caption) = &metadata.caption {
3021 println!(" caption: {caption}");
3022 }
3023 if let Some(audio) = metadata.audio.as_ref() {
3024 if audio.duration_secs.is_some()
3025 || audio.sample_rate_hz.is_some()
3026 || audio.channels.is_some()
3027 {
3028 let duration = audio
3029 .duration_secs
3030 .map(|secs| format!("{secs:.1}s"))
3031 .unwrap_or_else(|| "unknown".into());
3032 let rate = audio
3033 .sample_rate_hz
3034 .map(|hz| format!("{} Hz", hz))
3035 .unwrap_or_else(|| "? Hz".into());
3036 let channels = audio
3037 .channels
3038 .map(|ch| ch.to_string())
3039 .unwrap_or_else(|| "?".into());
3040 println!(" audio: duration {duration}, {channels} ch, {rate}");
3041 }
3042 }
3043 }
3044
3045 println!();
3046}
3047
3048fn put_report_to_json(report: &PutReport) -> serde_json::Value {
3049 let source = report
3050 .source
3051 .as_ref()
3052 .map(|path| path.to_string_lossy().into_owned());
3053 let extraction = json!({
3054 "status": report.extraction.status.label(),
3055 "reader": report.extraction.reader,
3056 "duration_ms": report.extraction.duration_ms,
3057 "pages_processed": report.extraction.pages_processed,
3058 "warnings": report.extraction.warnings,
3059 });
3060 json!({
3061 "seq": report.seq,
3062 "uri": report.uri,
3063 "title": report.title,
3064 "original_bytes": report.original_bytes,
3065 "original_bytes_human": format_bytes(report.original_bytes as u64),
3066 "stored_bytes": report.stored_bytes,
3067 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
3068 "compressed": report.compressed,
3069 "mime": report.mime,
3070 "source": source,
3071 "metadata": report.metadata,
3072 "extraction": extraction,
3073 })
3074}
3075
3076fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
3077 match err {
3078 MemvidError::CapacityExceeded {
3079 current,
3080 limit,
3081 required,
3082 } => anyhow!(CapacityExceededMessage {
3083 current,
3084 limit,
3085 required,
3086 }),
3087 other => other.into(),
3088 }
3089}
3090
3091fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
3092 for (idx, entry) in entries.iter().enumerate() {
3093 match serde_json::from_str::<DocMetadata>(entry) {
3094 Ok(meta) => {
3095 options.metadata = Some(meta);
3096 }
3097 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3098 Ok(value) => {
3099 options
3100 .extra_metadata
3101 .insert(format!("custom_metadata_{idx}"), value.to_string());
3102 }
3103 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
3104 },
3105 }
3106 }
3107}
3108
3109fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
3110 let stats = mem.stats()?;
3111 let message = match stats.tier {
3112 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
3113 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
3114 };
3115 Ok(message)
3116}
3117
/// Sanitize a raw URI or path into a safe `mv2://` key.
///
/// Strips the `mv2://` scheme, normalizes backslashes to `/`, and resolves
/// `.` / `..` segments (`..` pops the previous segment). With `keep_path`
/// the full cleaned path is returned; otherwise only the final segment.
/// Inputs that reduce to nothing yield `"document"`.
///
/// Fix: the empty-`segments` fallback previously returned the raw last
/// component unchanged, so inputs like `".."` or `"a/.."` leaked `".."`
/// (or `"."`) out of the sanitizer; those are now rejected in favor of
/// the `"document"` default.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");
    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        if segment.is_empty() || segment == "." {
            continue;
        }
        if segment == ".." {
            // Parent reference: drop the previously accepted segment.
            segments.pop();
            continue;
        }
        segments.push(segment.to_string());
    }

    if segments.is_empty() {
        // Everything collapsed away; fall back to the last raw component,
        // but never surface "", "." or ".." as a URI.
        return normalized
            .split('/')
            .last()
            .filter(|s| !s.is_empty() && *s != "." && *s != "..")
            .unwrap_or("document")
            .to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
3151
3152fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
3153 if quiet {
3154 let pb = ProgressBar::hidden();
3155 pb.set_message(message.to_string());
3156 return pb;
3157 }
3158
3159 match total {
3160 Some(len) => {
3161 let pb = ProgressBar::new(len);
3162 pb.set_draw_target(ProgressDrawTarget::stderr());
3163 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
3164 .unwrap_or_else(|_| ProgressStyle::default_bar());
3165 pb.set_style(style);
3166 pb.set_message(message.to_string());
3167 pb
3168 }
3169 None => {
3170 let pb = ProgressBar::new_spinner();
3171 pb.set_draw_target(ProgressDrawTarget::stderr());
3172 pb.set_message(message.to_string());
3173 pb.enable_steady_tick(Duration::from_millis(120));
3174 pb
3175 }
3176 }
3177}
3178
3179fn create_spinner(message: &str) -> ProgressBar {
3180 let pb = ProgressBar::new_spinner();
3181 pb.set_draw_target(ProgressDrawTarget::stderr());
3182 let style = ProgressStyle::with_template("{spinner} {msg}")
3183 .unwrap_or_else(|_| ProgressStyle::default_spinner())
3184 .tick_strings(&["-", "\\", "|", "/"]);
3185 pb.set_style(style);
3186 pb.set_message(message.to_string());
3187 pb.enable_steady_tick(Duration::from_millis(120));
3188 pb
3189}
3190
#[cfg(feature = "parallel_segments")]
// CLI arguments for the `put-many` batch-ingest subcommand.
// NOTE: `//` comments are used on fields deliberately — clap surfaces `///`
// doc comments as CLI help text, which would change the user-visible output.
#[derive(Args)]
pub struct PutManyArgs {
    // Path to the archive to ingest into.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON results instead of human-readable text.
    #[arg(long)]
    pub json: bool,
    // Read the JSON batch from this file; stdin is used when absent.
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    // zstd compression level (clamped to 1..=9 where it is applied).
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared lock-related CLI flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
3215
#[cfg(feature = "parallel_segments")]
/// One document in a `put-many` batch, as deserialized from the JSON input.
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required).
    pub title: String,
    /// Optional label; currently not forwarded into `PutOptions` by
    /// `handle_put_many`.
    #[serde(default)]
    pub label: String,
    /// Raw document text; stored as the frame payload.
    pub text: String,
    /// Explicit URI; `mv2://batch/{index}` is synthesized when absent.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tag strings copied into `PutOptions::tags`.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Label strings copied into `PutOptions::labels`.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Arbitrary JSON metadata; currently not forwarded into `PutOptions`
    /// by `handle_put_many`.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Optional precomputed parent embedding for this document.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Optional precomputed per-chunk embeddings for this document.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
3241
#[cfg(feature = "parallel_segments")]
/// Top-level `put-many` input. `#[serde(transparent)]` maps a bare JSON
/// array directly onto `requests`, so the input is `[ {...}, {...} ]`
/// rather than `{ "requests": [...] }`.
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    pub requests: Vec<PutManyRequest>,
}
3249
#[cfg(feature = "parallel_segments")]
/// Handle the `put-many` subcommand: ingest a JSON batch of documents in one
/// parallel pass.
///
/// Reads a JSON array of [`PutManyRequest`] values from `--input PATH` or
/// from stdin, converts each into a `ParallelInput`, and hands the whole
/// batch to `Memvid::put_parallel_inputs`. Results are printed as text or,
/// with `--json`, as `{"frame_ids": [...], "count": N}`.
///
/// # Errors
/// Fails when the archive cannot be opened or mutated, the batch JSON cannot
/// be read or parsed, or the parallel ingest itself fails.
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Make sure both search indexes exist before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        // BUGFIX: this previously used `read_line`, consuming only the FIRST
        // line of stdin and breaking any multi-line/pretty-printed JSON batch.
        // Read until EOF so stdin behaves like the `--input` file path.
        io::read_to_string(io::stdin()).context("Failed to read from stdin")?
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    // Map each request to a ParallelInput; missing URIs get a batch-index URI.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            let uri = req
                .uri
                .unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Keep the listing short for big batches.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}