1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32 DocMetadata, DocumentFormat, EnrichmentEngine, MediaManifest, Memvid, MemvidError, PutOptions,
33 ReaderHint, ReaderRegistry, RulesEngine, Stats, Tier, TimelineQueryBuilder,
34 MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
/// Cap (in bytes) on text kept for search indexing — presumably applied at the
/// ingest call site; confirm where it is consumed (not visible in this chunk).
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Cap (in bytes) on text sent to embedding providers; longer inputs are cut
/// down by `truncate_for_embedding`.
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
50
51fn parse_timestamp(s: &str) -> Result<i64, String> {
58 let s = s.trim();
59
60 if s.chars().all(|c| c.is_ascii_digit()) ||
62 (s.starts_with('-') && s[1..].chars().all(|c| c.is_ascii_digit())) {
63 return s.parse::<i64>().map_err(|e| format!("Invalid epoch timestamp: {e}"));
64 }
65
66 use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
68
69 fn date_to_epoch(date: NaiveDate) -> i64 {
71 let datetime = NaiveDateTime::new(date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
72 Utc.from_utc_datetime(&datetime).timestamp()
73 }
74
75 if let Ok(date) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
77 return Ok(date_to_epoch(date));
78 }
79
80 if let Ok(date) = NaiveDate::parse_from_str(s, "%b %d, %Y") {
82 return Ok(date_to_epoch(date));
83 }
84 if let Ok(date) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
85 return Ok(date_to_epoch(date));
86 }
87
88 if let Ok(date) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
90 return Ok(date_to_epoch(date));
91 }
92
93 if let Ok(date) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
95 return Ok(date_to_epoch(date));
96 }
97 if let Ok(date) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
98 return Ok(date_to_epoch(date));
99 }
100
101 Err(format!(
102 "Invalid timestamp format: '{}'. Use epoch seconds (1673740800) or date format (Jan 15, 2023 / 2023-01-15 / 01/15/2023)",
103 s
104 ))
105}
/// Number of dominant colors requested from color-thief per image.
const COLOR_PALETTE_SIZE: u8 = 5;
/// Quality knob forwarded to color-thief's `get_palette` — NOTE(review):
/// confirm the direction (lower vs. higher = finer sampling) against the
/// color-thief docs before tuning.
const COLOR_PALETTE_QUALITY: u8 = 8;
/// Number of leading payload bytes used for magic-number sniffing —
/// presumably fed to `magic_from_u8`/`infer`; confirm at the use site.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Upper bound on PDF pages rendered for CLIP embedding per document
/// (passed to `render_pdf_pages_for_clip`).
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
/// Target pixel size passed to `render_pdf_pages_for_clip` for page renders.
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;
115
116fn truncate_for_embedding(text: &str) -> String {
119 if text.len() <= MAX_EMBEDDING_TEXT_LEN {
120 text.to_string()
121 } else {
122 let truncated = &text[..MAX_EMBEDDING_TEXT_LEN];
124 let end = truncated
126 .char_indices()
127 .rev()
128 .next()
129 .map(|(i, c)| i + c.len_utf8())
130 .unwrap_or(MAX_EMBEDDING_TEXT_LEN);
131 text[..end].to_string()
132 }
133}
134
135fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
136 options.extra_metadata.insert(
137 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
138 runtime.provider_kind().to_string(),
139 );
140 options.extra_metadata.insert(
141 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
142 runtime.provider_model_id(),
143 );
144 options.extra_metadata.insert(
145 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
146 runtime.dimension().to_string(),
147 );
148}
149
150fn apply_embedding_identity_metadata_from_choice(
151 options: &mut PutOptions,
152 model: EmbeddingModelChoice,
153 dimension: usize,
154 model_id_override: Option<&str>,
155) {
156 let provider = match model {
157 EmbeddingModelChoice::OpenAILarge
158 | EmbeddingModelChoice::OpenAISmall
159 | EmbeddingModelChoice::OpenAIAda => "openai",
160 EmbeddingModelChoice::Nvidia => "nvidia",
161 _ => "fastembed",
162 };
163
164 let model_id = match model {
165 EmbeddingModelChoice::Nvidia => model_id_override
166 .map(str::trim)
167 .filter(|value| !value.is_empty())
168 .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
169 .filter(|value| !value.is_empty())
170 .map(|value| {
171 if value.contains('/') {
172 value.to_string()
173 } else {
174 format!("nvidia/{value}")
175 }
176 })
177 .unwrap_or_else(|| model.canonical_model_id().to_string()),
178 _ => model.canonical_model_id().to_string(),
179 };
180
181 options.extra_metadata.insert(
182 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
183 provider.to_string(),
184 );
185 options.extra_metadata.insert(
186 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
187 model_id,
188 );
189 options.extra_metadata.insert(
190 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
191 dimension.to_string(),
192 );
193}
194
#[cfg(feature = "llama-cpp")]
/// Resolves the on-disk path of the local Phi-3.5-mini GGUF model.
///
/// The base directory is `MEMVID_MODELS_DIR` (with a leading "~/" expanded
/// via `HOME`), falling back to `~/.memvid/models`. Errors when the model
/// file is absent, pointing the user at the install command.
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            // "~/" expands via HOME; without HOME the raw string is kept.
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(&custom),
        },
        Err(_) => {
            let home =
                env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
231
232use pathdiff::diff_paths;
233
234use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
235use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
236use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
237use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
238use crate::error::{CapacityExceededMessage, DuplicateUriError};
239use crate::utils::{
240 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
241 select_frame,
242};
243#[cfg(feature = "temporal_enrich")]
244use memvid_core::enrich_chunks as temporal_enrich_chunks;
245
/// Interprets a human-entered boolean flag (case-insensitive, whitespace
/// trimmed). Returns `None` when the text is not a recognized spelling.
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if ["1", "true", "yes", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "false", "no", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
254
#[cfg(feature = "parallel_segments")]
/// Reads the MEMVID_PARALLEL_SEGMENTS environment override, if set and
/// parseable as a boolean flag.
fn parallel_env_override() -> Option<bool> {
    let raw = std::env::var("MEMVID_PARALLEL_SEGMENTS").ok()?;
    parse_bool_flag(&raw)
}
262
#[cfg(feature = "clip")]
#[allow(dead_code)]
/// Checks whether both halves (vision + text) of the configured CLIP ONNX
/// model are present on disk.
fn is_clip_model_installed() -> bool {
    // Models dir: MEMVID_MODELS_DIR wins, then ~/.memvid/models, finally a
    // relative fallback when HOME is unavailable.
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    let vision_model = models_dir.join(format!("{}_vision.onnx", model_name));
    let text_model = models_dir.join(format!("{}_text.onnx", model_name));

    vision_model.exists() && text_model.exists()
}
283
/// Minimum tagger confidence for an entity to be kept (see `is_valid_entity`).
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

/// Minimum entity length in bytes after trimming (see `is_valid_entity`).
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

/// Largest byte gap between same-type entities that may still be merged
/// (see `merge_adjacent_entities`).
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;
297
#[cfg(feature = "logic_mesh")]
/// Merges adjacent NER entities of the same type that are separated by at
/// most `MAX_MERGE_GAP` bytes of "glue" characters (space, tab, '&', '-',
/// '\'', '.'). Entities are sorted by byte offset, then folded left to
/// right; a merge extends the previous entity in place and averages the two
/// confidences.
///
/// Robustness fix: byte offsets come from the tagger and may fall inside a
/// multi-byte UTF-8 character, where direct slicing (`&text[a..b]`) panics
/// even when `a`/`b` are within bounds. `str::get` is used instead: an
/// unaddressable gap is treated as non-mergeable, and an unaddressable span
/// falls back to plain text concatenation (same fallback the original used
/// for out-of-range ends).
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                // `get` returns None for out-of-range or non-boundary
                // offsets, which we treat as "do not merge".
                match original_text.get(last.byte_end..entity.byte_start) {
                    Some(gap_text) if gap_text.contains('\n') || gap_text.contains('\r') => {
                        // Never merge across line breaks.
                        false
                    }
                    Some(gap_text) => gap_text
                        .chars()
                        .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.')),
                    None => false,
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Prefer the exact source span; fall back to concatenating
                // the two entity texts when the span is not addressable.
                let merged_text = original_text
                    .get(last.byte_start..entity.byte_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
369
#[cfg(feature = "logic_mesh")]
/// Heuristic filter for NER output: rejects low-confidence spans, subword
/// fragments, punctuation runs, multi-line spans, known false-positive
/// tokens, and lowercase-initial text. Returns `true` when the span looks
/// like a usable entity.
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    // Structural rejects: too short, punctuation/whitespace only, spans a
    // line break, WordPiece "##" fragments, or a trailing period.
    if trimmed.len() < NER_MIN_ENTITY_LEN
        || trimmed
            .chars()
            .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
        || trimmed.contains('\n')
        || trimmed.contains('\r')
        || trimmed.starts_with("##")
        || trimmed.ends_with("##")
        || trimmed.ends_with('.')
    {
        return false;
    }

    let lower = trimmed.to_lowercase();

    // Single bytes never qualify (defensive — already excluded when
    // NER_MIN_ENTITY_LEN >= 2); two-byte spans only as known acronyms.
    match trimmed.len() {
        1 => return false,
        2 => return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Frequent tagger false positives.
    if matches!(
        lower.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    // A short all-caps first word in a multi-word span is usually a tagging
    // artifact unless it is a known country/organisation acronym.
    let words: Vec<&str> = trimmed.split_whitespace().collect();
    if words.len() >= 2 {
        let first_word = words[0];
        if first_word.len() <= 2
            && first_word.chars().all(|c| c.is_uppercase())
            && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    // Entities should not start with a lowercase character.
    !trimmed
        .chars()
        .next()
        .map_or(false, |first| first.is_lowercase())
}
451
452pub fn parse_key_val(s: &str) -> Result<(String, String)> {
454 let (key, value) = s
455 .split_once('=')
456 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
457 Ok((key.to_string(), value.to_string()))
458}
459
#[derive(Args, Clone, Copy, Debug)]
// Shared file-lock flags flattened into every mutating subcommand.
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap-derived fields would become CLI help text and change behavior.
pub struct LockCliArgs {
    // Milliseconds to wait for the archive lock (forwarded via apply_lock_cli).
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // Force flag forwarded to apply_lock_cli — presumably overrides a held
    // lock; confirm semantics in apply_lock_cli.
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
470
#[derive(Args)]
// Arguments for `memvid put`: ingests one or more payloads (file, directory,
// glob, or stdin) into a .mv2 memory. NOTE: `//` comments are used instead of
// `///` because clap surfaces doc comments on Args fields as CLI help text.
pub struct PutArgs {
    // Target .mv2 archive.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human output.
    #[arg(long)]
    pub json: bool,
    // Source payload path/glob; stdin is read when omitted.
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    // Explicit frame URI (rejected when ingesting multiple files).
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Explicit frame title (rejected when ingesting multiple files).
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Document timestamp: epoch seconds or a date string (see parse_timestamp).
    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
    pub timestamp: Option<i64>,
    // Track name — consumed downstream during ingest; confirm semantics there.
    #[arg(long)]
    pub track: Option<String>,
    // Explicit document kind; must equal "video" when --video is also set.
    #[arg(long)]
    pub kind: Option<String>,
    // Treat the input as video (requires --input <file>).
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    // Mark the payload as a transcript (a notice is printed after ingest).
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable KEY=VALUE tag pairs.
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    // Repeatable free-form labels.
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable JSON metadata blobs.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Opt-outs for automatic enrichment steps.
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,
    // Audio handling: treat payload as audio / run transcription.
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    // Audio segmentation window, in seconds.
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Compute text embeddings (mutually exclusive with --no-embedding; also
    // rejected together with --embedding-vec in handle_put).
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // Pre-computed embedding vector (JSON file); requires exactly one input.
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // Model id recorded alongside --embedding-vec.
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    // Enable PQ96 vector compression (see handle_put's capacity notes).
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // Contextual chunk prefixes, with optional model override.
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // Table extraction on/off overrides.
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // Time budget for extraction, in milliseconds.
    #[arg(long = "extraction-budget-ms", value_name = "MS")]
    pub extraction_budget_ms: Option<u64>,
    // Enable CLIP visual embeddings (feature-gated).
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    // Hidden CLIP kill-switch.
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue, hide = true)]
    pub no_clip: bool,

    // Duplicate-URI policy: update in place vs. allow duplicates.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    #[arg(long)]
    pub allow_duplicate: bool,

    // Temporal enrichment over extracted chunks (feature-dependent).
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // Dashboard memory id to bind this archive to before ingesting.
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    // Logic-mesh entity extraction.
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // Parallel segment ingestion controls; resolved via wants_parallel() and
    // sanitized_parallel_opts().
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Raw payload retention override and its hidden inverse.
    #[arg(long = "raw", action = ArgAction::SetTrue)]
    pub raw: bool,

    #[arg(long = "no-raw", action = ArgAction::SetTrue, hide = true)]
    pub no_raw: bool,

    // Deduplication flag — consumed during ingest; confirm exact semantics
    // at the use site.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dedup: bool,

    // NOTE(review): SetTrue combined with default_value_t = true means this
    // flag is always true; --no-enrich below is the effective switch.
    #[arg(long, action = ArgAction::SetTrue, default_value_t = true)]
    pub enrich: bool,

    #[arg(long = "no-enrich", action = ArgAction::SetTrue)]
    pub no_enrich: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
670
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Resolves whether parallel segment ingestion runs. Precedence:
    /// explicit `--parallel-segments`, then `--no-parallel-segments`, then
    /// the MEMVID_PARALLEL_SEGMENTS environment override, defaulting to off.
    pub fn wants_parallel(&self) -> bool {
        if self.parallel_segments {
            true
        } else if self.no_parallel_segments {
            false
        } else {
            parallel_env_override().unwrap_or(false)
        }
    }

    /// Builds sanitized `BuildOpts`: CLI overrides replace the defaults
    /// field by field, then `sanitize()` clamps the result.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        opts.segment_tokens = self.parallel_segment_tokens.unwrap_or(opts.segment_tokens);
        opts.segment_pages = self.parallel_segment_pages.unwrap_or(opts.segment_pages);
        opts.threads = self.parallel_threads.unwrap_or(opts.threads);
        opts.queue_depth = self.parallel_queue_depth.unwrap_or(opts.queue_depth);
        opts.sanitize();
        opts
    }
}
704
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    /// Stub for builds without the `parallel_segments` feature: parallel
    /// ingestion is never requested, so call sites stay feature-agnostic.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
711
#[derive(Args)]
// Arguments for `memvid api-fetch`; mapped onto ApiFetchCommand in
// handle_api_fetch. `//` comments on purpose — clap would surface `///` as
// CLI help text.
pub struct ApiFetchArgs {
    // Target .mv2 archive.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Fetch configuration file path.
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    // Plan the fetch without writing.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Override the mode from the config file.
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Override the destination URI.
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // JSON output.
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
737
#[derive(Args)]
// Arguments for `memvid update`: mutate an existing frame's payload and/or
// metadata. `//` comments on purpose — clap would surface `///` as help text.
pub struct UpdateArgs {
    // Target .mv2 archive.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selection: exactly one of --frame-id / --uri.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Replacement payload file.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Rename the frame's URI.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    // Metadata replacements.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Epoch seconds or a date string (see parse_timestamp).
    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable KEY=VALUE tags, labels, and JSON metadata blobs.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    // NOTE(review): the alias duplicates the derived long name "embeddings" —
    // confirm clap accepts this (duplicate long/alias can panic at startup).
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // JSON output.
    #[arg(long)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
774
#[derive(Args)]
// Arguments for `memvid delete`: remove a single frame by id or URI.
// `//` comments on purpose — clap would surface `///` as help text.
pub struct DeleteArgs {
    // Target .mv2 archive.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selection: exactly one of --frame-id / --uri.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // JSON output.
    #[arg(long)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
792
793pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
795 let command = ApiFetchCommand {
796 file: args.file,
797 config_path: args.config,
798 dry_run: args.dry_run,
799 mode_override: args.mode,
800 uri_override: args.uri,
801 output_json: args.json,
802 lock_timeout_ms: args.lock.lock_timeout,
803 force_lock: args.lock.force,
804 };
805 run_api_fetch(config, command)
806}
807
/// CLI entry point for `memvid delete`: removes a single frame (selected by
/// id or URI) after an interactive confirmation unless `--yes` was passed,
/// then commits and reports the frame's post-delete status.
pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Resolve the target frame from --frame-id or --uri (mutually exclusive).
    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;

    if !args.yes {
        if let Some(uri) = &frame.uri {
            println!("About to delete frame {} ({uri})", frame.id);
        } else {
            println!("About to delete frame {}", frame.id);
        }
        print!("Confirm? [y/N]: ");
        io::stdout().flush()?;
        let mut input = String::new();
        io::stdin().read_line(&mut input)?;
        // Default answer is "no": anything other than y/yes aborts.
        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
        if !confirmed {
            println!("Aborted");
            return Ok(());
        }
    }

    let seq = mem.delete_frame(frame.id)?;
    mem.commit()?;
    // Re-read the frame after the commit so the reported status reflects the
    // deletion (frame_by_id still returns the tombstoned record).
    let deleted = mem.frame_by_id(frame.id)?;

    if args.json {
        let json = json!({
            "frame_id": frame.id,
            "sequence": seq,
            "status": frame_status_str(deleted.status),
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!("Deleted frame {} (seq {})", frame.id, seq);
    }

    Ok(())
}
849pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
850 crate::utils::require_active_plan(config, "put")?;
852
853 let mut mem = Memvid::open(&args.file)?;
854 ensure_cli_mutation_allowed(&mem)?;
855 apply_lock_cli(&mut mem, &args.lock);
856
857 if let Some(memory_id) = &args.memory_id {
859 let needs_binding = match mem.get_memory_binding() {
860 Some(binding) => binding.memory_id != **memory_id,
861 None => true,
862 };
863 if needs_binding {
864 match crate::commands::creation::bind_to_dashboard_memory(config, &mut mem, &args.file, memory_id) {
865 Ok((bound_id, capacity)) => {
866 eprintln!("✓ Bound to dashboard memory: {} (capacity: {})", bound_id, crate::utils::format_bytes(capacity));
867 }
868 Err(e) => {
869 eprintln!("⚠️ Failed to bind to dashboard memory: {}", e);
870 eprintln!(" Continuing with existing capacity. Bind manually with:");
871 eprintln!(" memvid tickets sync {} --memory-id {}", args.file.display(), memory_id);
872 }
873 }
874 }
875 }
876
877 #[cfg(feature = "replay")]
879 let _ = mem.load_active_session();
880 let mut stats = mem.stats()?;
881 if stats.frame_count == 0 && !stats.has_lex_index {
882 mem.enable_lex()?;
883 stats = mem.stats()?;
884 }
885
886 crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
889
890 if args.vector_compression {
892 mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
893 }
894
895 let mut capacity_guard = CapacityGuard::from_stats(&stats);
896 let use_parallel = args.wants_parallel();
897 #[cfg(feature = "parallel_segments")]
898 let mut parallel_opts = if use_parallel {
899 Some(args.sanitized_parallel_opts())
900 } else {
901 None
902 };
903 #[cfg(feature = "parallel_segments")]
904 let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
905 #[cfg(feature = "parallel_segments")]
906 let mut pending_parallel_indices: Vec<usize> = Vec::new();
907 let input_set = resolve_inputs(args.input.as_deref())?;
908 if args.embedding && args.embedding_vec.is_some() {
909 anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
910 }
911 if args.embedding_vec.is_some() {
912 match &input_set {
913 InputSet::Files(paths) if paths.len() != 1 => {
914 anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
915 }
916 _ => {}
917 }
918 }
919
920 if args.video {
921 if let Some(kind) = &args.kind {
922 if !kind.eq_ignore_ascii_case("video") {
923 anyhow::bail!("--video conflicts with explicit --kind={kind}");
924 }
925 }
926 if matches!(input_set, InputSet::Stdin) {
927 anyhow::bail!("--video requires --input <file>");
928 }
929 }
930
931 #[allow(dead_code)]
932 enum EmbeddingMode {
933 None,
934 Auto,
935 Explicit,
936 }
937
938 let mv2_dimension = mem.effective_vec_index_dimension()?;
942 let inferred_embedding_model = if args.embedding {
943 match mem.embedding_identity_summary(10_000) {
944 memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
945 memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
946 let models: Vec<_> = identities
947 .iter()
948 .filter_map(|entry| entry.identity.model.as_deref())
949 .collect();
950 anyhow::bail!(
951 "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
952 Detected models: {:?}\n\n\
953 Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
954 models
955 );
956 }
957 memvid_core::EmbeddingIdentitySummary::Unknown => None,
958 }
959 } else {
960 None
961 };
962 let (embedding_mode, embedding_runtime) = if args.embedding {
963 (
964 EmbeddingMode::Explicit,
965 Some(load_embedding_runtime_for_mv2(
966 config,
967 inferred_embedding_model.as_deref(),
968 mv2_dimension,
969 )?),
970 )
971 } else if args.embedding_vec.is_some() {
972 (EmbeddingMode::Explicit, None)
973 } else {
974 (EmbeddingMode::None, None)
975 };
976 let embedding_enabled = args.embedding || args.embedding_vec.is_some();
977 let runtime_ref = embedding_runtime.as_ref();
978
979 #[cfg(feature = "clip")]
982 let clip_enabled = args.clip;
983 #[cfg(feature = "clip")]
984 let clip_model = if clip_enabled {
985 mem.enable_clip()?;
986 if !args.json {
987 eprintln!("ℹ️ CLIP visual embeddings enabled");
988 eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
989 eprintln!(" Use 'memvid find --mode clip' to search with natural language");
990 eprintln!();
991 }
992 match memvid_core::ClipModel::default_model() {
994 Ok(model) => Some(model),
995 Err(e) => {
996 warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
997 None
998 }
999 }
1000 } else {
1001 None
1002 };
1003
1004 if embedding_enabled && !args.json {
1006 let capacity = stats.capacity_bytes;
1007 if args.vector_compression {
1008 let max_docs = capacity / 20_000; eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
1011 eprintln!(" Storage: ~20 KB per document (16x compressed)");
1012 eprintln!(" Accuracy: ~95% recall@10 maintained");
1013 eprintln!(
1014 " Capacity: ~{} documents max for {:.1} GB",
1015 max_docs,
1016 capacity as f64 / 1e9
1017 );
1018 eprintln!();
1019 } else {
1020 let max_docs = capacity / 270_000; eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
1022 eprintln!(" Storage: ~270 KB per document");
1023 eprintln!(
1024 " Capacity: ~{} documents max for {:.1} GB",
1025 max_docs,
1026 capacity as f64 / 1e9
1027 );
1028 eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
1029 eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
1030 eprintln!();
1031 }
1032 }
1033
1034 let embed_progress = if embedding_enabled {
1035 let total = match &input_set {
1036 InputSet::Files(paths) => Some(paths.len() as u64),
1037 InputSet::Stdin => None,
1038 };
1039 Some(create_task_progress_bar(total, "embed", false))
1040 } else {
1041 None
1042 };
1043 let mut embedded_docs = 0usize;
1044
1045 if let InputSet::Files(paths) = &input_set {
1046 if paths.len() > 1 {
1047 if args.uri.is_some() || args.title.is_some() {
1048 anyhow::bail!(
1049 "--uri/--title apply to a single file; omit them when using a directory or glob"
1050 );
1051 }
1052 }
1053 }
1054
1055 let mut reports = Vec::new();
1056 let mut processed = 0usize;
1057 let mut skipped = 0usize;
1058 let mut bytes_added = 0u64;
1059 let mut bytes_input = 0u64;
1060 let mut no_raw_count = 0usize;
1061 let mut summary_warnings: Vec<String> = Vec::new();
1062 #[cfg(feature = "clip")]
1063 let mut clip_embeddings_added = false;
1064
1065 let mut capacity_reached = false;
1066 match input_set {
1067 InputSet::Stdin => {
1068 processed += 1;
1069 match read_payload(None) {
1070 Ok(payload) => {
1071 let mut analyze_spinner = if args.json {
1072 None
1073 } else {
1074 Some(create_spinner("Analyzing stdin payload..."))
1075 };
1076 match ingest_payload(
1077 &mut mem,
1078 &args,
1079 config,
1080 payload,
1081 None,
1082 capacity_guard.as_mut(),
1083 embedding_enabled,
1084 runtime_ref,
1085 use_parallel,
1086 ) {
1087 Ok(outcome) => {
1088 if let Some(pb) = analyze_spinner.take() {
1089 pb.finish_and_clear();
1090 }
1091 bytes_added += outcome.report.stored_bytes as u64;
1092 bytes_input += outcome.report.original_bytes as u64;
1093 if outcome.report.no_raw {
1094 no_raw_count += 1;
1095 }
1096 if args.json {
1097 summary_warnings
1098 .extend(outcome.report.extraction.warnings.iter().cloned());
1099 }
1100 reports.push(outcome.report);
1101 let report_index = reports.len() - 1;
1102 if !args.json && !use_parallel {
1103 if let Some(report) = reports.get(report_index) {
1104 print_report(report);
1105 }
1106 }
1107 if args.transcript {
1108 let notice = transcript_notice_message(&mut mem)?;
1109 if args.json {
1110 summary_warnings.push(notice);
1111 } else {
1112 println!("{notice}");
1113 }
1114 }
1115 if outcome.embedded {
1116 if let Some(pb) = embed_progress.as_ref() {
1117 pb.inc(1);
1118 }
1119 embedded_docs += 1;
1120 }
1121 if use_parallel {
1122 #[cfg(feature = "parallel_segments")]
1123 if let Some(input) = outcome.parallel_input {
1124 pending_parallel_indices.push(report_index);
1125 pending_parallel_inputs.push(input);
1126 }
1127 } else if let Some(guard) = capacity_guard.as_mut() {
1128 mem.commit()?;
1129 let stats = mem.stats()?;
1130 guard.update_after_commit(&stats);
1131 guard.check_and_warn_capacity(); }
1133 }
1134 Err(err) => {
1135 if let Some(pb) = analyze_spinner.take() {
1136 pb.finish_and_clear();
1137 }
1138 if err.downcast_ref::<DuplicateUriError>().is_some() {
1139 return Err(err);
1140 }
1141 warn!("skipped stdin payload: {err}");
1142 skipped += 1;
1143 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1144 capacity_reached = true;
1145 }
1146 }
1147 }
1148 }
1149 Err(err) => {
1150 warn!("failed to read stdin: {err}");
1151 skipped += 1;
1152 }
1153 }
1154 }
1155 InputSet::Files(ref paths) => {
1156 let mut transcript_notice_printed = false;
1157 for path in paths {
1158 processed += 1;
1159 match read_payload(Some(&path)) {
1160 Ok(payload) => {
1161 let label = path.display().to_string();
1162 let mut analyze_spinner = if args.json {
1163 None
1164 } else {
1165 Some(create_spinner(&format!("Analyzing {label}...")))
1166 };
1167 match ingest_payload(
1168 &mut mem,
1169 &args,
1170 config,
1171 payload,
1172 Some(&path),
1173 capacity_guard.as_mut(),
1174 embedding_enabled,
1175 runtime_ref,
1176 use_parallel,
1177 ) {
1178 Ok(outcome) => {
1179 if let Some(pb) = analyze_spinner.take() {
1180 pb.finish_and_clear();
1181 }
1182 bytes_added += outcome.report.stored_bytes as u64;
1183 bytes_input += outcome.report.original_bytes as u64;
1184 if outcome.report.no_raw {
1185 no_raw_count += 1;
1186 }
1187
1188 #[cfg(feature = "clip")]
1190 if let Some(ref model) = clip_model {
1191 let mime = outcome.report.mime.as_deref();
1192 let clip_frame_id = outcome.report.frame_id;
1193
1194 if is_image_file(&path) {
1195 match model.encode_image_file(&path) {
1196 Ok(embedding) => {
1197 if let Err(e) = mem.add_clip_embedding_with_page(
1198 clip_frame_id,
1199 None,
1200 embedding,
1201 ) {
1202 warn!(
1203 "Failed to add CLIP embedding for {}: {e}",
1204 path.display()
1205 );
1206 } else {
1207 info!(
1208 "Added CLIP embedding for frame {}",
1209 clip_frame_id
1210 );
1211 clip_embeddings_added = true;
1212 }
1213 }
1214 Err(e) => {
1215 warn!(
1216 "Failed to encode CLIP embedding for {}: {e}",
1217 path.display()
1218 );
1219 }
1220 }
1221 } else if matches!(mime, Some(m) if m == "application/pdf") {
1222 if let Err(e) = mem.commit() {
1224 warn!("Failed to commit before CLIP extraction: {e}");
1225 }
1226
1227 match render_pdf_pages_for_clip(
1228 &path,
1229 CLIP_PDF_MAX_IMAGES,
1230 CLIP_PDF_TARGET_PX,
1231 ) {
1232 Ok(rendered_pages) => {
1233 use rayon::prelude::*;
1234
1235 let path_for_closure = path.clone();
1238
1239 let prev_hook = std::panic::take_hook();
1242 std::panic::set_hook(Box::new(|_| {
1243 }));
1245
1246 let encoded_images: Vec<_> = rendered_pages
1247 .into_par_iter()
1248 .filter_map(|(page_num, image)| {
1249 let image_info = get_image_info(&image);
1251 if !image_info.should_embed() {
1252 info!(
1253 "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1254 path_for_closure.display(),
1255 page_num,
1256 image_info.color_variance,
1257 image_info.width,
1258 image_info.height
1259 );
1260 return None;
1261 }
1262
1263 let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1267 let rgb_image = image.to_rgb8();
1268 let mut png_bytes = Vec::new();
1269 image::DynamicImage::ImageRgb8(rgb_image).write_to(
1270 &mut Cursor::new(&mut png_bytes),
1271 image::ImageFormat::Png,
1272 ).map(|()| png_bytes)
1273 }));
1274
1275 match encode_result {
1276 Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1277 Ok(Err(e)) => {
1278 info!(
1279 "Skipping image for {} page {}: {e}",
1280 path_for_closure.display(),
1281 page_num
1282 );
1283 None
1284 }
1285 Err(_) => {
1286 info!(
1288 "Skipping malformed image for {} page {}",
1289 path_for_closure.display(),
1290 page_num
1291 );
1292 None
1293 }
1294 }
1295 })
1296 .collect();
1297
1298 std::panic::set_hook(prev_hook);
1300
1301 let mut child_frame_offset = 1u64;
1304 let parent_uri = outcome.report.uri.clone();
1305 let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1306 let total_images = encoded_images.len();
1307
1308 let clip_pb = if !args.json && total_images > 0 {
1310 let pb = ProgressBar::new(total_images as u64);
1311 pb.set_style(
1312 ProgressStyle::with_template(
1313 " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1314 )
1315 .unwrap_or_else(|_| ProgressStyle::default_bar())
1316 .progress_chars("█▓░"),
1317 );
1318 Some(pb)
1319 } else {
1320 None
1321 };
1322
1323 for (page_num, image, png_bytes) in encoded_images {
1324 let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1325 let child_title = format!(
1326 "{} - Image {} (page {})",
1327 parent_title,
1328 child_frame_offset,
1329 page_num
1330 );
1331
1332 let child_options = PutOptions::builder()
1333 .uri(&child_uri)
1334 .title(&child_title)
1335 .parent_id(clip_frame_id)
1336 .role(FrameRole::ExtractedImage)
1337 .auto_tag(false)
1338 .extract_dates(false)
1339 .build();
1340
1341 match mem.put_bytes_with_options(&png_bytes, child_options) {
1342 Ok(_child_seq) => {
1343 let child_frame_id = clip_frame_id + child_frame_offset;
1344 child_frame_offset += 1;
1345
1346 match model.encode_image(&image) {
1347 Ok(embedding) => {
1348 if let Err(e) = mem
1349 .add_clip_embedding_with_page(
1350 child_frame_id,
1351 Some(page_num),
1352 embedding,
1353 )
1354 {
1355 warn!(
1356 "Failed to add CLIP embedding for {} page {}: {e}",
1357 path.display(),
1358 page_num
1359 );
1360 } else {
1361 info!(
1362 "Added CLIP image frame {} for {} page {}",
1363 child_frame_id,
1364 clip_frame_id,
1365 page_num
1366 );
1367 clip_embeddings_added = true;
1368 }
1369 }
1370 Err(e) => warn!(
1371 "Failed to encode CLIP embedding for {} page {}: {e}",
1372 path.display(),
1373 page_num
1374 ),
1375 }
1376
1377 if let Err(e) = mem.commit() {
1379 warn!("Failed to commit after image {}: {e}", child_frame_offset);
1380 }
1381 }
1382 Err(e) => warn!(
1383 "Failed to store image frame for {} page {}: {e}",
1384 path.display(),
1385 page_num
1386 ),
1387 }
1388
1389 if let Some(ref pb) = clip_pb {
1390 pb.inc(1);
1391 }
1392 }
1393
1394 if let Some(pb) = clip_pb {
1395 pb.finish_and_clear();
1396 }
1397 }
1398 Err(err) => warn!(
1399 "Failed to render PDF pages for CLIP {}: {err}",
1400 path.display()
1401 ),
1402 }
1403 }
1404 }
1405
1406 if args.json {
1407 summary_warnings
1408 .extend(outcome.report.extraction.warnings.iter().cloned());
1409 }
1410 reports.push(outcome.report);
1411 let report_index = reports.len() - 1;
1412 if !args.json && !use_parallel {
1413 if let Some(report) = reports.get(report_index) {
1414 print_report(report);
1415 }
1416 }
1417 if args.transcript && !transcript_notice_printed {
1418 let notice = transcript_notice_message(&mut mem)?;
1419 if args.json {
1420 summary_warnings.push(notice);
1421 } else {
1422 println!("{notice}");
1423 }
1424 transcript_notice_printed = true;
1425 }
1426 if outcome.embedded {
1427 if let Some(pb) = embed_progress.as_ref() {
1428 pb.inc(1);
1429 }
1430 embedded_docs += 1;
1431 }
1432 if use_parallel {
1433 #[cfg(feature = "parallel_segments")]
1434 if let Some(input) = outcome.parallel_input {
1435 pending_parallel_indices.push(report_index);
1436 pending_parallel_inputs.push(input);
1437 }
1438 } else if let Some(guard) = capacity_guard.as_mut() {
1439 mem.commit()?;
1440 let stats = mem.stats()?;
1441 guard.update_after_commit(&stats);
1442 guard.check_and_warn_capacity(); }
1444 }
1445 Err(err) => {
1446 if let Some(pb) = analyze_spinner.take() {
1447 pb.finish_and_clear();
1448 }
1449 if err.downcast_ref::<DuplicateUriError>().is_some() {
1450 return Err(err);
1451 }
1452 warn!("skipped {}: {err}", path.display());
1453 skipped += 1;
1454 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1455 capacity_reached = true;
1456 }
1457 }
1458 }
1459 }
1460 Err(err) => {
1461 warn!("failed to read {}: {err}", path.display());
1462 skipped += 1;
1463 }
1464 }
1465 if capacity_reached {
1466 break;
1467 }
1468 }
1469 }
1470 }
1471
1472 if capacity_reached {
1473 warn!("capacity limit reached; remaining inputs were not processed");
1474 }
1475
1476 if let Some(pb) = embed_progress.as_ref() {
1477 pb.finish_with_message("embed");
1478 }
1479
1480 if let Some(runtime) = embedding_runtime.as_ref() {
1481 if embedded_docs > 0 {
1482 match embedding_mode {
1483 EmbeddingMode::Explicit => println!(
1484 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
1485 embedded_docs,
1486 runtime.dimension()
1487 ),
1488 EmbeddingMode::Auto => println!(
1489 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
1490 embedded_docs,
1491 runtime.dimension()
1492 ),
1493 EmbeddingMode::None => {}
1494 }
1495 } else {
1496 match embedding_mode {
1497 EmbeddingMode::Explicit => {
1498 println!("No embeddings were generated (no textual content available)");
1499 }
1500 EmbeddingMode::Auto => {
1501 println!(
1502 "Semantic runtime available, but no textual content produced embeddings"
1503 );
1504 }
1505 EmbeddingMode::None => {}
1506 }
1507 }
1508 }
1509
1510 if reports.is_empty() {
1511 if args.json {
1512 if capacity_reached {
1513 summary_warnings.push("capacity_limit_reached".to_string());
1514 }
1515 let summary_json = json!({
1516 "processed": processed,
1517 "ingested": 0,
1518 "skipped": skipped,
1519 "bytes_added": bytes_added,
1520 "bytes_added_human": format_bytes(bytes_added),
1521 "embedded_documents": embedded_docs,
1522 "capacity_reached": capacity_reached,
1523 "warnings": summary_warnings,
1524 "reports": Vec::<serde_json::Value>::new(),
1525 });
1526 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1527 } else {
1528 println!(
1529 "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1530 processed, skipped
1531 );
1532 }
1533 return Ok(());
1534 }
1535
1536 #[cfg(feature = "parallel_segments")]
1537 let mut parallel_flush_spinner = if use_parallel && !args.json {
1538 Some(create_spinner("Flushing parallel segments..."))
1539 } else {
1540 None
1541 };
1542
1543 if use_parallel {
1544 #[cfg(feature = "parallel_segments")]
1545 {
1546 let mut opts = parallel_opts.take().unwrap_or_else(|| {
1547 let mut opts = BuildOpts::default();
1548 opts.sanitize();
1549 opts
1550 });
1551 opts.vec_compression = mem.vector_compression().clone();
1553 let seqs = if pending_parallel_inputs.is_empty() {
1554 mem.commit_parallel(opts)?;
1555 Vec::new()
1556 } else {
1557 mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1558 };
1559 if let Some(pb) = parallel_flush_spinner.take() {
1560 pb.finish_with_message("Flushed parallel segments");
1561 }
1562 for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1563 if let Some(report) = reports.get_mut(idx) {
1564 report.seq = seq;
1565 }
1566 }
1567 }
1568 #[cfg(not(feature = "parallel_segments"))]
1569 {
1570 mem.commit()?;
1571 }
1572 } else {
1573 mem.commit()?;
1574 }
1575
1576 #[cfg(feature = "clip")]
1578 if clip_embeddings_added {
1579 mem.commit()?;
1580 }
1581
1582 if !args.json && use_parallel {
1583 for report in &reports {
1584 print_report(report);
1585 }
1586 }
1587
1588 let mut tables_extracted = 0usize;
1590 let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1591 if args.tables && !args.no_tables {
1592 if let InputSet::Files(ref paths) = input_set {
1593 let pdf_paths: Vec<_> = paths
1594 .iter()
1595 .filter(|p| {
1596 p.extension()
1597 .and_then(|e| e.to_str())
1598 .map(|e| e.eq_ignore_ascii_case("pdf"))
1599 .unwrap_or(false)
1600 })
1601 .collect();
1602
1603 if !pdf_paths.is_empty() {
1604 let table_spinner = if args.json {
1605 None
1606 } else {
1607 Some(create_spinner(&format!(
1608 "Extracting tables from {} PDF(s)...",
1609 pdf_paths.len()
1610 )))
1611 };
1612
1613 let table_options = TableExtractionOptions::builder()
1615 .mode(memvid_core::table::ExtractionMode::Aggressive)
1616 .min_rows(2)
1617 .min_cols(2)
1618 .min_quality(memvid_core::table::TableQuality::Low)
1619 .merge_multi_page(true)
1620 .build();
1621
1622 for pdf_path in pdf_paths {
1623 if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1624 let filename = pdf_path
1625 .file_name()
1626 .and_then(|s| s.to_str())
1627 .unwrap_or("unknown.pdf");
1628
1629 if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1630 for table in &result.tables {
1631 if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1632 {
1633 tables_extracted += 1;
1634 table_summaries.push(json!({
1635 "table_id": table.table_id,
1636 "source_file": table.source_file,
1637 "meta_frame_id": meta_id,
1638 "rows": table.n_rows,
1639 "cols": table.n_cols,
1640 "quality": format!("{:?}", table.quality),
1641 "pages": format!("{}-{}", table.page_start, table.page_end),
1642 }));
1643 }
1644 }
1645 }
1646 }
1647 }
1648
1649 if let Some(pb) = table_spinner {
1650 if tables_extracted > 0 {
1651 pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1652 } else {
1653 pb.finish_with_message("No tables found");
1654 }
1655 }
1656
1657 if tables_extracted > 0 {
1659 mem.commit()?;
1660 }
1661 }
1662 }
1663 }
1664
1665 #[cfg(feature = "logic_mesh")]
1668 let mut logic_mesh_entities = 0usize;
1669 #[cfg(feature = "logic_mesh")]
1670 {
1671 use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1672
1673 let models_dir = config.models_dir.clone();
1674 let model_path = ner_model_path(&models_dir);
1675 let tokenizer_path = ner_tokenizer_path(&models_dir);
1676
1677 let ner_available = model_path.exists() && tokenizer_path.exists();
1679 let should_run_ner = args.logic_mesh;
1680
1681 if should_run_ner && !ner_available {
1682 if args.json {
1684 summary_warnings.push(
1685 "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1686 );
1687 } else {
1688 eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
1689 eprintln!(" Run: memvid models install --ner distilbert-ner");
1690 }
1691 } else if should_run_ner {
1692 let ner_spinner = if args.json {
1693 None
1694 } else {
1695 Some(create_spinner("Building Logic-Mesh entity graph..."))
1696 };
1697
1698 match NerModel::load(&model_path, &tokenizer_path, None) {
1699 Ok(mut ner_model) => {
1700 let mut mesh = LogicMesh::new();
1701 let mut filtered_count = 0usize;
1702
1703 for report in &reports {
1705 let frame_id = report.frame_id;
1706 if let Some(ref content) = report.search_text {
1708 if !content.is_empty() {
1709 match ner_model.extract(content) {
1710 Ok(raw_entities) => {
1711 let merged_entities = merge_adjacent_entities(raw_entities, content);
1714
1715 for entity in &merged_entities {
1716 if !is_valid_entity(&entity.text, entity.confidence) {
1718 tracing::debug!(
1719 "Filtered entity: '{}' (confidence: {:.0}%)",
1720 entity.text, entity.confidence * 100.0
1721 );
1722 filtered_count += 1;
1723 continue;
1724 }
1725
1726 let kind = entity.to_entity_kind();
1728 let node = MeshNode::new(
1730 entity.text.to_lowercase(),
1731 entity.text.trim().to_string(),
1732 kind,
1733 entity.confidence,
1734 frame_id,
1735 entity.byte_start as u32,
1736 (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1737 );
1738 mesh.merge_node(node);
1739 logic_mesh_entities += 1;
1740 }
1741 }
1742 Err(e) => {
1743 warn!("NER extraction failed for frame {frame_id}: {e}");
1744 }
1745 }
1746 }
1747 }
1748 }
1749
1750 if logic_mesh_entities > 0 {
1751 mesh.finalize();
1752 let node_count = mesh.stats().node_count;
1753 mem.set_logic_mesh(mesh);
1755 mem.commit()?;
1756 info!(
1757 "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1758 logic_mesh_entities,
1759 node_count,
1760 filtered_count
1761 );
1762 }
1763
1764 if let Some(pb) = ner_spinner {
1765 pb.finish_with_message(format!(
1766 "Logic-Mesh: {} entities ({} unique)",
1767 logic_mesh_entities,
1768 mem.mesh_node_count()
1769 ));
1770 }
1771 }
1772 Err(e) => {
1773 if let Some(pb) = ner_spinner {
1774 pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1775 }
1776 if args.json {
1777 summary_warnings.push(format!("logic_mesh_failed: {e}"));
1778 }
1779 }
1780 }
1781 }
1782 }
1783
1784 let mut enrichment_cards = 0usize;
1786 if args.enrich && !args.no_enrich && !reports.is_empty() {
1787 let enrich_spinner = if args.json {
1788 None
1789 } else {
1790 Some(create_spinner(&format!(
1791 "Extracting memories from {} frame(s)...",
1792 reports.len()
1793 )))
1794 };
1795
1796 let mut rules_engine = RulesEngine::new();
1797 rules_engine.init().ok(); let frame_ids: Vec<u64> = reports.iter().map(|r| r.frame_id).collect();
1800 for frame_id in &frame_ids {
1801 if let Ok(frame) = mem.frame_by_id(*frame_id) {
1802 let text = frame.search_text.as_deref().unwrap_or("");
1803 if !text.is_empty() {
1804 let ctx = memvid_core::enrich::EnrichmentContext::new(
1805 *frame_id,
1806 frame.uri.clone().unwrap_or_default(),
1807 text.to_string(),
1808 frame.title.clone(),
1809 frame.timestamp,
1810 None,
1811 );
1812 let result = rules_engine.enrich(&ctx);
1813 if !result.cards.is_empty() {
1814 for card in result.cards {
1815 if mem.put_memory_card(card).is_ok() {
1816 enrichment_cards += 1;
1817 }
1818 }
1819 }
1820 }
1821 }
1822 }
1823
1824 if enrichment_cards > 0 {
1825 mem.commit()?;
1826 }
1827
1828 if let Some(pb) = enrich_spinner {
1829 if enrichment_cards > 0 {
1830 pb.finish_with_message(format!("Extracted {} memory card(s)", enrichment_cards));
1831 } else {
1832 pb.finish_with_message("No memories extracted");
1833 }
1834 }
1835 }
1836
1837 let stats = mem.stats()?;
1838 if args.json {
1839 if capacity_reached {
1840 summary_warnings.push("capacity_limit_reached".to_string());
1841 }
1842 let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1843 let total_duration_ms: Option<u64> = {
1844 let sum: u64 = reports
1845 .iter()
1846 .filter_map(|r| r.extraction.duration_ms)
1847 .sum();
1848 if sum > 0 {
1849 Some(sum)
1850 } else {
1851 None
1852 }
1853 };
1854 let total_pages: Option<u32> = {
1855 let sum: u32 = reports
1856 .iter()
1857 .filter_map(|r| r.extraction.pages_processed)
1858 .sum();
1859 if sum > 0 {
1860 Some(sum)
1861 } else {
1862 None
1863 }
1864 };
1865 let embedding_mode_str = match embedding_mode {
1866 EmbeddingMode::None => "none",
1867 EmbeddingMode::Auto => "auto",
1868 EmbeddingMode::Explicit => "explicit",
1869 };
1870 let reduction_percent = if bytes_input > 0 && bytes_input > bytes_added {
1871 Some(((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0)
1872 } else {
1873 None
1874 };
1875 let summary_json = json!({
1876 "processed": processed,
1877 "ingested": reports.len(),
1878 "skipped": skipped,
1879 "bytes_input": bytes_input,
1880 "bytes_input_human": format_bytes(bytes_input),
1881 "bytes_stored": bytes_added,
1882 "bytes_stored_human": format_bytes(bytes_added),
1883 "reduction_percent": reduction_percent,
1884 "embedded_documents": embedded_docs,
1885 "embedding": {
1886 "enabled": embedding_enabled,
1887 "mode": embedding_mode_str,
1888 "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1889 },
1890 "tables": {
1891 "enabled": args.tables && !args.no_tables,
1892 "extracted": tables_extracted,
1893 "tables": table_summaries,
1894 },
1895 "enrichment": {
1896 "enabled": args.enrich && !args.no_enrich,
1897 "cards_extracted": enrichment_cards,
1898 },
1899 "capacity_reached": capacity_reached,
1900 "warnings": summary_warnings,
1901 "extraction": {
1902 "total_duration_ms": total_duration_ms,
1903 "total_pages_processed": total_pages,
1904 },
1905 "memory": {
1906 "path": args.file.display().to_string(),
1907 "frame_count": stats.frame_count,
1908 "size_bytes": stats.size_bytes,
1909 "tier": format!("{:?}", stats.tier),
1910 },
1911 "reports": reports_json,
1912 });
1913 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1914 } else {
1915 println!(
1916 "Committed {} frame(s); total frames {}",
1917 reports.len(),
1918 stats.frame_count
1919 );
1920 println!(
1921 "Summary: processed {}, ingested {}, skipped {}",
1922 processed,
1923 reports.len(),
1924 skipped
1925 );
1926 if bytes_input > 0 {
1929 let actual_stored = stats.payload_bytes;
1930 if actual_stored < bytes_input && no_raw_count > 0 {
1931 let reduction = ((bytes_input - actual_stored) as f64 / bytes_input as f64) * 100.0;
1933 println!(
1934 "Storage: {} input → {} stored ({:.1}% smaller, text extracted)",
1935 format_bytes(bytes_input),
1936 format_bytes(actual_stored),
1937 reduction
1938 );
1939 } else if bytes_added < bytes_input {
1940 let reduction = ((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0;
1942 println!(
1943 "Storage: {} input → {} stored ({:.1}% smaller)",
1944 format_bytes(bytes_input),
1945 format_bytes(bytes_added),
1946 reduction
1947 );
1948 } else {
1949 println!("Storage: {} added", format_bytes(bytes_added));
1950 }
1951 }
1952 if tables_extracted > 0 {
1953 println!("Tables: {} extracted from PDF(s)", tables_extracted);
1954 }
1955 if enrichment_cards > 0 {
1956 println!("Enrichment: {} memory card(s) extracted", enrichment_cards);
1957 }
1958 }
1959
1960 #[cfg(feature = "replay")]
1962 let _ = mem.save_active_session();
1963
1964 Ok(())
1965}
1966
/// CLI handler for `memvid update`: rewrites an existing frame, carrying
/// forward every attribute of the old frame that the caller did not override.
///
/// Flow: select the target frame → seed `PutOptions` from it → layer CLI
/// overrides on top → (optionally) re-analyze a new payload → (optionally)
/// recompute embeddings → write the replacement frame, commit, and report.
///
/// # Errors
/// Propagates open/commit/IO failures, frame-selection failures, and bails
/// when `--embeddings` is requested on a memory with mixed embedding models.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Target frame is addressed either by explicit id or by URI.
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // A replacement payload is optional; without one, only metadata changes.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Seed the put options from the existing frame so that any field the CLI
    // does not override is preserved verbatim on the new frame.
    let mut options = PutOptions::default();
    options.enable_embedding = false; // re-decided below from stats.has_vec_index
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // Explicit --set-uri wins over everything else.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // If the old frame had no URI and we are storing a new payload, derive
    // one from the payload's source path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // Scalar overrides from the command line.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels replace wholesale when provided (no merging with old labels).
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags also replace wholesale: both key and value become searchable tags
    // (skipping empties/duplicates), and the pair is kept in extra_metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used later as the embedding source; starts as the carried-over
    // search text and may be replaced by freshly extracted text below.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // New payload: re-derive title/URI and re-run content analysis.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // Metadata-only update: nothing new to tag or date-extract from.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    // Only embed when the memory actually has a vector index.
    options.enable_embedding = stats.has_vec_index;

    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Recomputing embeddings requires a single consistent model identity;
        // mixed models would produce incomparable vectors, so we refuse.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                     Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer extracted/carried search text; fall back to the raw payload
        // when it is valid UTF-8.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // If no fresh embedding was computed but the index exists, reuse the
    // old frame's vector so the updated frame stays searchable.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the new frame for display: by URI when we have a non-empty
    // one, otherwise via the newest timeline entry (the frame just written),
    // with the original id as a last-resort fallback.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
2169
2170fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2171 if let Some(uri) = provided {
2172 let sanitized = sanitize_uri(uri, true);
2173 return format!("mv2://{}", sanitized);
2174 }
2175
2176 if let Some(path) = source {
2177 let raw = path.to_string_lossy();
2178 let sanitized = sanitize_uri(&raw, false);
2179 return format!("mv2://{}", sanitized);
2180 }
2181
2182 format!("mv2://frames/{}", Uuid::new_v4())
2183}
2184
2185pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2186 let digest = hash(payload).to_hex();
2187 let short = &digest[..32];
2188 let ext_from_path = source
2189 .and_then(|path| path.extension())
2190 .and_then(|ext| ext.to_str())
2191 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2192 .filter(|ext| !ext.is_empty());
2193 let ext = ext_from_path
2194 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2195 .unwrap_or_else(|| "bin".to_string());
2196 format!("mv2://video/{short}.{ext}")
2197}
2198
2199fn derive_title(
2200 provided: Option<String>,
2201 source: Option<&Path>,
2202 payload_utf8: Option<&str>,
2203) -> Option<String> {
2204 if let Some(title) = provided {
2205 let trimmed = title.trim();
2206 if trimmed.is_empty() {
2207 None
2208 } else {
2209 Some(trimmed.to_string())
2210 }
2211 } else {
2212 if let Some(text) = payload_utf8 {
2213 if let Some(markdown_title) = extract_markdown_title(text) {
2214 return Some(markdown_title);
2215 }
2216 }
2217 source
2218 .and_then(|path| path.file_stem())
2219 .and_then(|stem| stem.to_str())
2220 .map(to_title_case)
2221 }
2222}
2223
/// Returns the text of the first non-empty ATX-style Markdown heading
/// (a line whose trimmed form starts with `#`), or `None` if there is none.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let stripped = line.trim();
        if !stripped.starts_with('#') {
            return None;
        }
        // Drop all leading '#' markers and surrounding whitespace; a bare
        // "###" line yields nothing and the scan keeps going.
        let heading = stripped.trim_start_matches('#').trim();
        (!heading.is_empty()).then(|| heading.to_string())
    })
}
2236
/// Upper-cases the first character of `input`, leaving the rest untouched.
///
/// Uppercasing a char may expand to several chars (e.g. 'ß' -> "SS"), which
/// is why the result is collected rather than pushed as a single char.
fn to_title_case(input: &str) -> String {
    let mut chars = input.chars();
    match chars.next() {
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}
2248
/// True for files that are ignored by default during ingest (macOS Finder
/// metadata). Matching is on the file name only, not the full path.
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
2255
/// True for directories skipped during the recursive walk: any whose name
/// starts with a dot (`.git`, `.cache`, ...).
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
2262
/// Where `put` payloads come from: a single payload read from stdin, or a
/// resolved, sorted list of filesystem paths (from a file, glob, or
/// directory walk — see `resolve_inputs`).
enum InputSet {
    /// Read one payload from standard input.
    Stdin,
    /// Ingest each listed file in order.
    Files(Vec<PathBuf>),
}
2267
#[cfg(feature = "clip")]
/// True when the path's extension names a raster image format that the CLIP
/// pipeline can encode directly. Comparison is case-insensitive; paths
/// without an extension are never images.
fn is_image_file(path: &std::path::Path) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .map_or(false, |ext| {
            matches!(
                ext.to_ascii_lowercase().as_str(),
                "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
            )
        })
}
2279
2280fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2281 let Some(raw) = input else {
2282 return Ok(InputSet::Stdin);
2283 };
2284
2285 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2286 let mut files = Vec::new();
2287 let mut matched_any = false;
2288 for entry in glob(raw)? {
2289 let path = entry?;
2290 if path.is_file() {
2291 matched_any = true;
2292 if should_skip_input(&path) {
2293 continue;
2294 }
2295 files.push(path);
2296 }
2297 }
2298 files.sort();
2299 if files.is_empty() {
2300 if matched_any {
2301 anyhow::bail!(
2302 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2303 );
2304 }
2305 anyhow::bail!("pattern '{raw}' did not match any files");
2306 }
2307 return Ok(InputSet::Files(files));
2308 }
2309
2310 let path = PathBuf::from(raw);
2311 if path.is_dir() {
2312 let (mut files, skipped_any) = collect_directory_files(&path)?;
2313 files.sort();
2314 if files.is_empty() {
2315 if skipped_any {
2316 anyhow::bail!(
2317 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2318 path.display()
2319 );
2320 }
2321 anyhow::bail!(
2322 "directory '{}' contains no ingestible files",
2323 path.display()
2324 );
2325 }
2326 Ok(InputSet::Files(files))
2327 } else {
2328 Ok(InputSet::Files(vec![path]))
2329 }
2330}
2331
2332fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2333 let mut files = Vec::new();
2334 let mut skipped_any = false;
2335 let mut stack = vec![root.to_path_buf()];
2336 while let Some(dir) = stack.pop() {
2337 for entry in fs::read_dir(&dir)? {
2338 let entry = entry?;
2339 let path = entry.path();
2340 let file_type = entry.file_type()?;
2341 if file_type.is_dir() {
2342 if should_skip_dir(&path) {
2343 skipped_any = true;
2344 continue;
2345 }
2346 stack.push(path);
2347 } else if file_type.is_file() {
2348 if should_skip_input(&path) {
2349 skipped_any = true;
2350 continue;
2351 }
2352 files.push(path);
2353 }
2354 }
2355 }
2356 Ok((files, skipped_any))
2357}
2358
/// Result of ingesting one payload: the per-frame report plus the flags the
/// ingest loop needs for progress accounting.
struct IngestOutcome {
    /// Per-frame ingestion report (URI, sizes, extraction summary, ...).
    report: PutReport,
    /// True when a semantic embedding was generated for this payload.
    embedded: bool,
    /// Deferred segment data, flushed in one batch at commit time.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2365
/// Everything learned about one ingested payload; feeds both the
/// human-readable summary and the `--json` report.
struct PutReport {
    /// Commit sequence number assigned to the frame.
    seq: u64,
    /// Frame id; only read under the `clip`/`logic_mesh` features, hence the allow.
    #[allow(dead_code)] frame_id: u64,
    /// Canonical `mv2://` URI of the stored frame.
    uri: String,
    /// Display title, when one was provided or derived.
    title: Option<String>,
    /// Payload size as read from disk/stdin.
    original_bytes: usize,
    /// Bytes actually stored for this frame.
    stored_bytes: usize,
    /// Whether the stored payload is compressed (set by the ingest path).
    compressed: bool,
    /// True when only extracted text was kept and raw bytes were dropped
    /// (drives the "text extracted" storage summary line).
    no_raw: bool,
    /// Source path, when the payload came from a file.
    source: Option<PathBuf>,
    /// Detected MIME type, when available.
    mime: Option<String>,
    /// Structured document metadata, when extraction produced any.
    metadata: Option<DocMetadata>,
    /// Timing / page-count / warnings summary of the extraction step.
    extraction: ExtractionSummary,
    /// Extracted text retained for the Logic-Mesh NER pass.
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2385
/// Tracks projected memory usage against a fixed capacity so ingestion can
/// stop (and warn) before overflowing a size-limited memory.
struct CapacityGuard {
    /// Tier the capacity came from; kept for context only.
    #[allow(dead_code)]
    tier: Tier,
    /// Hard capacity limit in bytes (non-zero; see `from_stats`).
    capacity: u64,
    /// Memory size as of the last commit.
    current_size: u64,
}
2392
impl CapacityGuard {
    /// Minimum per-ingest overhead estimate, in bytes.
    const MIN_OVERHEAD: u64 = 128 * 1024;
    /// Fractional overhead estimate (5%) applied to the payload size.
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Builds a guard from archive stats. Returns `None` when the archive
    /// reports no capacity limit (`capacity_bytes == 0`).
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Returns a `CapacityExceeded` error when writing `additional` bytes
    /// (plus estimated overhead) would push the archive past capacity.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        // Saturating adds so pathological sizes cannot overflow u64.
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refreshes the cached archive size after a successful commit.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// Capacity limit to surface to callers (always `Some` for a guard).
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    /// Prints a usage warning when usage sits inside a 1%-wide band just
    /// above the 50/75/90% thresholds; the narrow bands act as a
    /// stateless de-duplication so repeated calls don't spam warnings.
    // NOTE(review): a single large ingest that jumps clean over a band
    // (e.g. 40% -> 95%) emits no warning at all — confirm this trade-off
    // is intended.
    fn check_and_warn_capacity(&self) {
        // `from_stats` guarantees capacity > 0; defensive guard anyway.
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️ CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Actions:");
            eprintln!(" 1. Recreate with larger --size");
            eprintln!(" 2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!(" 3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️ WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!(" Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️ INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Estimated storage overhead for `additional` payload bytes: 5% of
    /// the payload, floored at `MIN_OVERHEAD`.
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2474
/// Outcome of `analyze_file`: detected MIME type, optional document
/// metadata, extracted searchable text, and extraction diagnostics.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    pub mime: String,
    pub metadata: Option<DocMetadata>,
    pub search_text: Option<String>,
    pub extraction: ExtractionSummary,
}
2482
/// Tuning knobs for audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    // Analyze as audio even when the MIME type is not `audio/*`.
    force: bool,
    // Target length of generated timeline segments, in seconds.
    segment_secs: u32,
    // Run speech-to-text (effective only with the `whisper` feature).
    transcribe: bool,
}
2489
impl AudioAnalyzeOptions {
    /// Segment length used by `Default::default()`, in seconds.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Clamps the configured segment length to at least one second.
    fn normalised_segment_secs(self) -> u32 {
        self.segment_secs.max(1)
    }
}
2497
impl Default for AudioAnalyzeOptions {
    /// Defaults: no forced analysis, 30-second segments, no transcription.
    fn default() -> Self {
        Self {
            force: false,
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
            transcribe: false,
        }
    }
}
2507
/// Intermediate result of `analyze_audio`.
struct AudioAnalysis {
    metadata: DocAudioMetadata,
    // Caption candidate taken from audio tags (track title).
    caption: Option<String>,
    // Tag-derived terms (title/artist/album/genre) for search indexing.
    search_terms: Vec<String>,
    // Whisper transcript, when transcription was requested and succeeded.
    transcript: Option<String>,
}
2514
/// Intermediate result of `analyze_image`.
struct ImageAnalysis {
    width: u32,
    height: u32,
    // Dominant colours as `#rrggbb` hex strings (may be empty).
    palette: Vec<String>,
    // Caption candidate from the EXIF ImageDescription tag.
    caption: Option<String>,
    exif: Option<DocExifMetadata>,
}
2522
/// Diagnostics describing how (and how well) searchable text was
/// extracted from a payload.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    // Name of the reader/extractor that handled the payload, if any.
    reader: Option<String>,
    status: ExtractionStatus,
    warnings: Vec<String>,
    // Extraction wall-clock time reported by the reader, when available.
    duration_ms: Option<u64>,
    pages_processed: Option<u32>,
}
2531
impl ExtractionSummary {
    /// Appends a warning message to this summary's warning list.
    fn record_warning<S: Into<String>>(&mut self, warning: S) {
        self.warnings.push(warning.into());
    }
}
2537
/// Terminal state of a text-extraction attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// No extraction was attempted for this payload type.
    Skipped,
    /// The primary reader produced usable text.
    Ok,
    /// A fallback extractor produced the text.
    FallbackUsed,
    /// Extraction ran but yielded no searchable content.
    Empty,
    /// Extraction failed outright.
    Failed,
}
2546
impl ExtractionStatus {
    /// Stable lowercase identifier for this status.
    fn label(self) -> &'static str {
        match self {
            Self::Skipped => "skipped",
            Self::Ok => "ok",
            Self::FallbackUsed => "fallback_used",
            Self::Empty => "empty",
            Self::Failed => "failed",
        }
    }
}
2558
2559fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2560 let filename = source_path
2561 .and_then(|path| path.file_name())
2562 .and_then(|name| name.to_str())
2563 .map(|value| value.to_string());
2564 MediaManifest {
2565 kind: "video".to_string(),
2566 mime: mime.to_string(),
2567 bytes,
2568 filename,
2569 duration_ms: None,
2570 width: None,
2571 height: None,
2572 codec: None,
2573 }
2574}
2575
/// Analyzes a raw payload prior to storage: detects its MIME type,
/// records hash/size metadata, extracts searchable text via the best
/// available reader (with PDF fallbacks), and enriches image/audio/video
/// payloads with format-specific metadata.
///
/// Returns a `FileAnalysis` carrying the detected MIME, optional
/// metadata, optional search text, and extraction diagnostics.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    // Video payloads are never treated as text, even if detection said so.
    let is_video = force_video || mime.starts_with("video/");
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Leading bytes used for format sniffing; `None` when the payload is
    // shorter than the sniff window or empty.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Normalizes extracted text, derives a caption if one is still
    // missing, and records the reader/status on success; returns false
    // when the text normalizes to nothing.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Text payloads: index the raw UTF-8 bytes directly.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf"
        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
        || mime == "application/vnd.ms-excel"
        || mime == "application/msword"
    {
        // Document formats: try the registered reader first, then fall
        // back to the direct PDF extraction chain if nothing was applied.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            // NOTE(review): this branch also covers Office documents, but
            // the message mentions only PDF payloads — consider rewording.
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        if !applied {
            // Fallback: direct PDF text-extraction chain.
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image payloads: dimensions, colour palette, EXIF, caption.
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio payloads (or forced audio analysis): tags, segments, optional
    // transcript; tag-derived terms are merged into the search text.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
            transcript,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }

            if let Some(ref text) = transcript {
                // A transcript replaces any previously extracted text.
                extraction.reader = Some("whisper".to_string());
                extraction.status = ExtractionStatus::Ok;
                search_text = Some(text.clone());
            } else if !search_terms.is_empty() {
                match &mut search_text {
                    Some(existing) => {
                        // Append only terms not already present.
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the inferred document title.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2860
2861fn default_reader_registry() -> &'static ReaderRegistry {
2862 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2863 REGISTRY.get_or_init(ReaderRegistry::default)
2864}
2865
2866fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2867 if detect_pdf_magic(magic) {
2868 return Some(DocumentFormat::Pdf);
2869 }
2870
2871 let mime = mime?.trim().to_ascii_lowercase();
2872 match mime.as_str() {
2873 "application/pdf" => Some(DocumentFormat::Pdf),
2874 "text/plain" => Some(DocumentFormat::PlainText),
2875 "text/markdown" => Some(DocumentFormat::Markdown),
2876 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2877 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2878 Some(DocumentFormat::Docx)
2879 }
2880 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2881 Some(DocumentFormat::Pptx)
2882 }
2883 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2884 Some(DocumentFormat::Xlsx)
2885 }
2886 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2887 _ => None,
2888 }
2889}
2890
/// Returns true when the sniffed bytes begin with a `%PDF` marker,
/// tolerating an optional UTF-8 BOM and leading ASCII whitespace.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let bytes = match magic {
        Some(bytes) if !bytes.is_empty() => bytes,
        _ => return false,
    };
    // Drop a UTF-8 BOM if present, then skip leading ASCII whitespace.
    let body = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let start = body
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(body.len());
    body[start..].starts_with(b"%PDF")
}
2908
/// Extracts searchable text from a PDF by trying three extractors in
/// order (pdf_extract, lopdf, external pdftotext), keeping the longest
/// result seen so far. Returns early once an extractor produces a
/// "good" amount of text; when every extractor fails, returns an empty
/// string rather than an error so ingestion can continue.
fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
    let mut best_text = String::new();
    let mut best_source = "none";

    // Heuristic for "enough text": scales with payload size, clamped to
    // the [100, 5000] character range.
    let min_good_chars = (payload.len() / 10000).max(100).min(5000);

    // Attempt 1: pure-Rust pdf_extract.
    match pdf_extract::extract_text_from_mem(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "pdf_extract";
            }
            if trimmed.len() >= min_good_chars {
                info!("pdf_extract extracted {} chars (good)", trimmed.len());
                return Ok(best_text);
            } else if !trimmed.is_empty() {
                warn!("pdf_extract returned only {} chars, trying other extractors", trimmed.len());
            }
        }
        Err(err) => {
            warn!("pdf_extract failed: {err}");
        }
    }

    // Attempt 2: lopdf-based extraction.
    match extract_pdf_with_lopdf(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "lopdf";
            }
            if trimmed.len() >= min_good_chars {
                info!("lopdf extracted {} chars (good)", trimmed.len());
                return Ok(best_text);
            } else if !trimmed.is_empty() {
                warn!("lopdf returned only {} chars, trying pdftotext", trimmed.len());
            }
        }
        Err(err) => {
            warn!("lopdf failed: {err}");
        }
    }

    // Attempt 3: external pdftotext binary, when installed.
    match fallback_pdftotext(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "pdftotext";
            }
            if !trimmed.is_empty() {
                info!("pdftotext extracted {} chars", trimmed.len());
            }
        }
        Err(err) => {
            warn!("pdftotext failed: {err}");
        }
    }

    // Use whichever extractor produced the most text, even if short.
    if !best_text.is_empty() {
        info!("using {} extraction ({} chars)", best_source, best_text.len());
        Ok(best_text)
    } else {
        warn!("all PDF extraction methods failed, storing without searchable text");
        Ok(String::new())
    }
}
2985
/// Extracts PDF text with `lopdf`. Attempts an empty-password decrypt on
/// encrypted files and decompresses streams before reading; extraction
/// is capped at the first 4096 pages.
fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
    use lopdf::Document;

    let mut document = Document::load_mem(payload)
        .map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;

    // Best-effort: many "encrypted" PDFs use an empty owner password.
    if document.is_encrypted() {
        let _ = document.decrypt("");
    }

    // Best-effort stream decompression; failure is non-fatal.
    let _ = document.decompress();

    let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
    if page_numbers.is_empty() {
        return Err(anyhow!("PDF has no pages"));
    }
    page_numbers.sort_unstable();

    // Guard against pathological page counts.
    if page_numbers.len() > 4096 {
        page_numbers.truncate(4096);
        warn!("PDF has more than 4096 pages, truncating extraction");
    }

    let text = document
        .extract_text(&page_numbers)
        .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;

    Ok(text.trim().to_string())
}
3021
3022fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
3023 use std::io::Write;
3024 use std::process::{Command, Stdio};
3025
3026 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
3027
3028 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
3029 temp_pdf
3030 .write_all(payload)
3031 .context("failed to write pdf payload to temp file")?;
3032 temp_pdf.flush().context("failed to flush temp pdf file")?;
3033
3034 let child = Command::new(pdftotext)
3035 .arg("-layout")
3036 .arg(temp_pdf.path())
3037 .arg("-")
3038 .stdout(Stdio::piped())
3039 .spawn()
3040 .context("failed to spawn pdftotext")?;
3041
3042 let output = child
3043 .wait_with_output()
3044 .context("failed to read pdftotext output")?;
3045
3046 if !output.status.success() {
3047 return Err(anyhow!("pdftotext exited with status {}", output.status));
3048 }
3049
3050 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
3051 Ok(text)
3052}
3053
/// Detects a payload's MIME type and whether its bytes can be indexed as
/// text directly.
///
/// Resolution order:
/// 1. Office extensions (docx/xlsx/pptx/doc/xls/ppt) — checked before
///    content sniffing so the specific Office MIME wins for these
///    container formats.
/// 2. Content sniffing via `infer`.
/// 3. Content sniffing via tree_magic (generic octet-stream ignored).
/// 4. Any other known file extension.
/// 5. `text/plain` when the payload is valid UTF-8.
/// 6. `application/octet-stream` otherwise.
fn detect_mime(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
) -> (String, bool) {
    // Office formats first (see doc comment).
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            let ext_lower = ext.to_ascii_lowercase();
            if matches!(
                ext_lower.as_str(),
                "docx" | "xlsx" | "pptx" | "doc" | "xls" | "ppt"
            ) {
                if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
                    return (mime.to_string(), treat_as_text);
                }
            }
        }
    }

    // Content sniffing via `infer`.
    if let Some(kind) = infer::get(payload) {
        let mime = kind.mime_type().to_string();
        let treat_as_text = mime.starts_with("text/")
            || matches!(
                mime.as_str(),
                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
            );
        return (mime, treat_as_text);
    }

    // Content sniffing via tree_magic; the generic octet-stream answer is
    // ignored so later, more specific fallbacks can run.
    let magic = magic_from_u8(payload);
    if !magic.is_empty() && magic != "application/octet-stream" {
        let treat_as_text = magic.starts_with("text/")
            || matches!(
                magic,
                "application/json"
                    | "application/xml"
                    | "application/javascript"
                    | "image/svg+xml"
                    | "text/plain"
            );
        return (magic.to_string(), treat_as_text);
    }

    // Any other known extension.
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
                return (mime.to_string(), treat_as_text);
            }
        }
    }

    // Valid UTF-8 with no stronger signal: treat as plain text.
    if payload_utf8.is_some() {
        return ("text/plain".to_string(), true);
    }

    ("application/octet-stream".to_string(), false)
}
3114
/// Maps a file extension (case-insensitive) to a `(mime, treat_as_text)`
/// pair, where the flag marks payloads whose bytes can be indexed as
/// text directly. Unknown extensions yield `None`.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let lowered = ext.to_ascii_lowercase();
    let (mime, treat_as_text) = match lowered.as_str() {
        // Source code and plain-text configuration formats.
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => ("text/plain", true),
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        // Office documents are binary containers handled by readers.
        "docx" => (
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            false,
        ),
        "xlsx" => (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            false,
        ),
        "pptx" => (
            "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            false,
        ),
        "doc" => ("application/msword", false),
        "xls" => ("application/vnd.ms-excel", false),
        "ppt" => ("application/vnd.ms-powerpoint", false),
        // Raster image formats.
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some((mime, treat_as_text))
}
3159
3160fn caption_from_text(text: &str) -> Option<String> {
3161 for line in text.lines() {
3162 let trimmed = line.trim();
3163 if !trimmed.is_empty() {
3164 return Some(truncate_to_boundary(trimmed, 240));
3165 }
3166 }
3167 None
3168}
3169
/// Returns `text` unchanged when it fits in `max_len` bytes; otherwise
/// cuts at the nearest UTF-8 char boundary at or below `max_len`, trims
/// trailing whitespace, and appends an ellipsis. A zero-byte budget on
/// oversized input yields an empty string.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Walk back from max_len to the closest valid char boundary
    // (index 0 is always a boundary, so this always succeeds).
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }
    let mut shortened = text[..cut].trim_end().to_string();
    shortened.push('…');
    shortened
}
3185
3186fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
3187 let reader = ImageReader::new(Cursor::new(payload))
3188 .with_guessed_format()
3189 .map_err(|err| warn!("failed to guess image format: {err}"))
3190 .ok()?;
3191 let image = reader
3192 .decode()
3193 .map_err(|err| warn!("failed to decode image: {err}"))
3194 .ok()?;
3195 let width = image.width();
3196 let height = image.height();
3197 let rgb = image.to_rgb8();
3198 let palette = match get_palette(
3199 rgb.as_raw(),
3200 ColorFormat::Rgb,
3201 COLOR_PALETTE_SIZE,
3202 COLOR_PALETTE_QUALITY,
3203 ) {
3204 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3205 Err(err) => {
3206 warn!("failed to compute colour palette: {err}");
3207 Vec::new()
3208 }
3209 };
3210
3211 let (exif, exif_caption) = extract_exif_metadata(payload);
3212
3213 Some(ImageAnalysis {
3214 width,
3215 height,
3216 palette,
3217 caption: exif_caption,
3218 exif,
3219 })
3220}
3221
3222fn color_to_hex(color: Color) -> String {
3223 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3224}
3225
/// Probes and fully decodes an audio payload with symphonia to measure
/// duration, sample rate, and channel count; estimates bitrate from the
/// payload size; reads tags via lofty; partitions the duration into
/// fixed-length segments; and optionally transcribes via Whisper.
/// Returns `None` when the payload cannot be probed/decoded as audio.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Give the probe extension hints from both the path and the MIME
    // subtype.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode every packet to count frames; duration is then computed as
    // frames / sample_rate.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            // An IoError from next_packet signals end-of-stream here.
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Only count frames for the default track.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Rough average bitrate from container size over measured duration.
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    // Collect tag values worth indexing for search.
    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Partition the measured duration into fixed-length labelled segments.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Only record fields we actually measured.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    let transcript = if options.transcribe {
        transcribe_audio(source_path)
    } else {
        None
    };

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
        transcript,
    })
}
3408
/// Runs Whisper speech-to-text over the audio file at `source_path`.
/// Returns `None` (after logging) when no path is available, the
/// transcriber cannot be initialised, or transcription fails.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let path = source_path?;

    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| tracing::warn!(error = %e, "Failed to initialize Whisper transcriber"))
        .ok()?;

    match transcriber.transcribe_file(path) {
        Ok(result) => {
            tracing::info!(
                duration = result.duration_secs,
                text_len = result.text.len(),
                "Audio transcription complete"
            );
            Some(result.text)
        }
        Err(e) => {
            tracing::warn!(error = %e, "Failed to transcribe audio");
            None
        }
    }
}
3442
/// Stub used when the `whisper` feature is disabled: logs a hint and
/// returns no transcript.
#[cfg(not(feature = "whisper"))]
fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
    None
}
3448
3449fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3450 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3451
3452 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3453 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3454 out.entry("title".into())
3455 .or_insert_with(|| value.to_string());
3456 out.entry("tracktitle".into())
3457 .or_insert_with(|| value.to_string());
3458 }
3459 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3460 out.entry("artist".into())
3461 .or_insert_with(|| value.to_string());
3462 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3463 out.entry("artist".into())
3464 .or_insert_with(|| value.to_string());
3465 }
3466 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3467 out.entry("album".into())
3468 .or_insert_with(|| value.to_string());
3469 }
3470 if let Some(value) = tag.get_string(&ItemKey::Genre) {
3471 out.entry("genre".into())
3472 .or_insert_with(|| value.to_string());
3473 }
3474 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3475 out.entry("track_number".into())
3476 .or_insert_with(|| value.to_string());
3477 }
3478 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3479 out.entry("disc_number".into())
3480 .or_insert_with(|| value.to_string());
3481 }
3482 }
3483
3484 let mut tags = HashMap::new();
3485 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3486 Ok(probe) => probe,
3487 Err(err) => {
3488 warn!("failed to guess audio tag file type: {err}");
3489 return tags;
3490 }
3491 };
3492
3493 let tagged_file = match probe.read() {
3494 Ok(file) => file,
3495 Err(err) => {
3496 warn!("failed to read audio tags: {err}");
3497 return tags;
3498 }
3499 };
3500
3501 if let Some(primary) = tagged_file.primary_tag() {
3502 collect_tag(primary, &mut tags);
3503 }
3504 for tag in tagged_file.tags() {
3505 collect_tag(tag, &mut tags);
3506 }
3507
3508 tags
3509}
3510
3511fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3512 let mut cursor = Cursor::new(payload);
3513 let exif = match ExifReader::new().read_from_container(&mut cursor) {
3514 Ok(exif) => exif,
3515 Err(err) => {
3516 warn!("failed to read EXIF metadata: {err}");
3517 return (None, None);
3518 }
3519 };
3520
3521 let mut doc = DocExifMetadata::default();
3522 let mut has_data = false;
3523
3524 if let Some(make) = exif_string(&exif, Tag::Make) {
3525 doc.make = Some(make);
3526 has_data = true;
3527 }
3528 if let Some(model) = exif_string(&exif, Tag::Model) {
3529 doc.model = Some(model);
3530 has_data = true;
3531 }
3532 if let Some(lens) =
3533 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3534 {
3535 doc.lens = Some(lens);
3536 has_data = true;
3537 }
3538 if let Some(dt) =
3539 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3540 {
3541 doc.datetime = Some(dt);
3542 has_data = true;
3543 }
3544
3545 if let Some(gps) = extract_gps_metadata(&exif) {
3546 doc.gps = Some(gps);
3547 has_data = true;
3548 }
3549
3550 let caption = exif_string(&exif, Tag::ImageDescription);
3551
3552 let metadata = if has_data { Some(doc) } else { None };
3553 (metadata, caption)
3554}
3555
3556fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3557 exif.fields()
3558 .find(|field| field.tag == tag)
3559 .and_then(|field| field_to_string(field, exif))
3560}
3561
3562fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3563 let value = field.display_value().with_unit(exif).to_string();
3564 let trimmed = value.trim_matches('\0').trim();
3565 if trimmed.is_empty() {
3566 None
3567 } else {
3568 Some(trimmed.to_string())
3569 }
3570}
3571
3572fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3573 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3574 let latitude_ref_field = exif
3575 .fields()
3576 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3577 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3578 let longitude_ref_field = exif
3579 .fields()
3580 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3581
3582 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3583 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3584
3585 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3586 if reference.eq_ignore_ascii_case("S") {
3587 latitude = -latitude;
3588 }
3589 }
3590 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3591 if reference.eq_ignore_ascii_case("W") {
3592 longitude = -longitude;
3593 }
3594 }
3595
3596 Some(DocGpsMetadata {
3597 latitude,
3598 longitude,
3599 })
3600}
3601
3602fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3603 match value {
3604 ExifValue::Rational(values) if !values.is_empty() => {
3605 let deg = rational_to_f64_u(values.get(0)?)?;
3606 let min = values
3607 .get(1)
3608 .and_then(|v| rational_to_f64_u(v))
3609 .unwrap_or(0.0);
3610 let sec = values
3611 .get(2)
3612 .and_then(|v| rational_to_f64_u(v))
3613 .unwrap_or(0.0);
3614 Some(deg + (min / 60.0) + (sec / 3600.0))
3615 }
3616 ExifValue::SRational(values) if !values.is_empty() => {
3617 let deg = rational_to_f64_i(values.get(0)?)?;
3618 let min = values
3619 .get(1)
3620 .and_then(|v| rational_to_f64_i(v))
3621 .unwrap_or(0.0);
3622 let sec = values
3623 .get(2)
3624 .and_then(|v| rational_to_f64_i(v))
3625 .unwrap_or(0.0);
3626 Some(deg + (min / 60.0) + (sec / 3600.0))
3627 }
3628 _ => None,
3629 }
3630}
3631
3632fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3633 if value.denom == 0 {
3634 None
3635 } else {
3636 Some(value.num as f64 / value.denom as f64)
3637 }
3638}
3639
3640fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3641 if value.denom == 0 {
3642 None
3643 } else {
3644 Some(value.num as f64 / value.denom as f64)
3645 }
3646}
3647
3648fn value_to_ascii(value: &ExifValue) -> Option<String> {
3649 if let ExifValue::Ascii(values) = value {
3650 values
3651 .first()
3652 .and_then(|bytes| std::str::from_utf8(bytes).ok())
3653 .map(|s| s.trim_matches('\0').trim().to_string())
3654 .filter(|s| !s.is_empty())
3655 } else {
3656 None
3657 }
3658}
3659
/// Ingest a single payload into the archive.
///
/// Orchestrates the whole put pipeline: capacity checks, content analysis
/// (MIME / metadata / search text), `PutOptions` assembly from CLI flags,
/// duplicate-vs-update resolution by URI, optional embedding generation
/// (pre-computed vector, single passage, or per-chunk), and the final store
/// or update call. With `parallel_mode` and the `parallel_segments` feature,
/// it instead prepares embeddings up front and returns a `ParallelInput`
/// for the caller's batch writer without writing here.
///
/// Returns an `IngestOutcome` with a `PutReport` describing what was (or, in
/// parallel mode, will be) stored, plus whether an embedding was attached.
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    config: &CliConfig,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    // Size accounting: bytes received vs. bytes canonical storage will hold
    // (text payloads are compressed; see canonical_storage_len).
    let original_bytes = payload.len();
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    // Global plan/capacity check before doing any expensive analysis.
    let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
    crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;

    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    // Audio analysis is forced when --audio or --transcribe was passed.
    let audio_opts = AudioAnalyzeOptions {
        force: args.audio || args.transcribe,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
        transcribe: args.transcribe,
    };

    // Content analysis: MIME detection, metadata, extraction, search text.
    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    // --video is rejected unless the detected MIME type is actually video/*.
    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    // Cap search text at MAX_SEARCH_TEXT_LEN, truncating on a safe boundary.
    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    // URI precedence: explicit --uri, then content-derived video URI,
    // then one derived from the source path.
    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    // Assemble PutOptions from CLI flags: extraction toggles, timestamp,
    // track/kind, tags, labels, metadata, search text, raw/dedup behavior.
    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates)
        .extract_triplets(!args.no_extract_triplets);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    if args.video {
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    // Each --tag key=value also registers the key and value as flat tags
    // (skipping empties and value==key duplicates).
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    // --metadata entries: structured DocMetadata preferred; otherwise kept as
    // arbitrary JSON under a synthetic key; invalid JSON only warns.
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    // no-raw is the default; --raw opts back into storing the original bytes.
    let effective_no_raw = !args.raw;
    if effective_no_raw {
        options_builder = options_builder.no_raw(true);
        if let Some(path) = source_path {
            options_builder = options_builder.source_path(path.display().to_string());
        }
    }
    if args.dedup {
        options_builder = options_builder.dedup(true);
    }
    let mut options = options_builder.build();

    if let Some(budget_ms) = args.extraction_budget_ms {
        options.extraction_budget_ms = budget_ms;
    }

    // Look up an existing frame by URI when updating or when duplicates are
    // disallowed; "not found" is a normal outcome, other errors propagate.
    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    // NOTE(review): for new frames this reports the current frame_count as
    // the prospective id — assumes ids are assigned sequentially; confirm.
    let frame_id = if let Some(ref existing) = existing_frame {
        existing.id
    } else {
        mem.stats()?.frame_count
    };

    let mut embedded = false;
    // Video payloads never get semantic embeddings, even when enabled.
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            // Update path: rewrite the existing frame in place, optionally
            // attaching a fresh or pre-computed embedding.
            let payload_for_update = payload.clone();
            if allow_embedding {
                if let Some(path) = args.embedding_vec.as_deref() {
                    // Pre-computed embedding supplied on the CLI; validate its
                    // dimension against --embedding-vec-model when given.
                    let embedding = crate::utils::read_embedding(path)?;
                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
                        let expected = choice.dimensions();
                        if expected != 0 && embedding.len() != expected {
                            anyhow::bail!(
                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
                                embedding.len(),
                                choice.name(),
                                expected
                            );
                        }
                        apply_embedding_identity_metadata_from_choice(
                            &mut options,
                            choice,
                            embedding.len(),
                            Some(model_str),
                        );
                    } else {
                        options.extra_metadata.insert(
                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
                            embedding.len().to_string(),
                        );
                    }
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                } else {
                    // Locally-generated embedding: prefer search text, fall
                    // back to the raw UTF-8 payload.
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                        mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                            .map_err(|err| {
                                map_put_error(
                                    err,
                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                                )
                            })?
                    } else {
                        let truncated_text = truncate_for_embedding(&embed_text);
                        if truncated_text.len() < embed_text.len() {
                            info!(
                                "Truncated text from {} to {} chars for embedding",
                                embed_text.len(),
                                truncated_text.len()
                            );
                        }
                        let embedding = runtime.embed_passage(&truncated_text)?;
                        embedded = true;
                        apply_embedding_identity_metadata(&mut options, runtime);
                        mem.update_frame(
                            existing.id,
                            Some(payload_for_update),
                            options.clone(),
                            Some(embedding),
                        )
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                    }
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            // Frame exists but neither --update-existing nor --allow-duplicate
            // was passed: refuse the duplicate URI.
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        // New-frame path. The per-run capacity guard (if any) is consulted in
        // addition to the global check above.
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            #[cfg(feature = "parallel_segments")]
            {
                // Parallel mode: compute embeddings here but defer the write,
                // returning a ParallelInput for the caller's batch builder.
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    if let Some(path) = args.embedding_vec.as_deref() {
                        // Pre-computed embedding; same dimension validation as
                        // the update path.
                        let embedding = crate::utils::read_embedding(path)?;
                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
                            let expected = choice.dimensions();
                            if expected != 0 && embedding.len() != expected {
                                anyhow::bail!(
                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
                                    embedding.len(),
                                    choice.name(),
                                    expected
                                );
                            }
                            apply_embedding_identity_metadata_from_choice(
                                &mut options,
                                choice,
                                embedding.len(),
                                Some(model_str),
                            );
                        } else {
                            options.extra_metadata.insert(
                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
                                embedding.len().to_string(),
                            );
                        }
                        parent_embedding = Some(embedding);
                        embedded = true;
                    } else {
                        let runtime =
                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                        let embed_text = search_text
                            .clone()
                            .or_else(|| payload_text.map(|text| text.to_string()))
                            .unwrap_or_default();
                        if embed_text.trim().is_empty() {
                            warn!("no textual content available; embedding skipped");
                        } else {
                            info!(
                                "parallel mode: checking for chunks on {} bytes payload",
                                payload.len()
                            );
                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                                // Optional temporal enrichment rewrites chunks
                                // with resolved relative-date phrases.
                                #[cfg(feature = "temporal_enrich")]
                                let enriched_chunks = if args.temporal_enrich {
                                    info!(
                                        "Temporal enrichment enabled, processing {} chunks",
                                        chunk_texts.len()
                                    );
                                    let today = chrono::Local::now().date_naive();
                                    let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                    let enriched: Vec<String> =
                                        results.iter().map(|(text, _)| text.clone()).collect();
                                    let resolved_count = results
                                        .iter()
                                        .filter(|(_, e)| {
                                            e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                        })
                                        .count();
                                    info!(
                                        "Temporal enrichment: resolved {} chunks with temporal context",
                                        resolved_count
                                    );
                                    enriched
                                } else {
                                    chunk_texts.clone()
                                };
                                #[cfg(not(feature = "temporal_enrich"))]
                                let enriched_chunks = {
                                    if args.temporal_enrich {
                                        warn!(
                                            "Temporal enrichment requested but feature not compiled in"
                                        );
                                    }
                                    chunk_texts.clone()
                                };

                                // Optional contextual retrieval prefixes each
                                // chunk with generated document context.
                                let embed_chunks = if args.contextual {
                                    info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                    let engine = if args.contextual_model.as_deref() == Some("local") {
                                        #[cfg(feature = "llama-cpp")]
                                        {
                                            let model_path = get_local_contextual_model_path()?;
                                            ContextualEngine::local(model_path)
                                        }
                                        #[cfg(not(feature = "llama-cpp"))]
                                        {
                                            anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                        }
                                    } else if let Some(model) = &args.contextual_model {
                                        ContextualEngine::openai_with_model(model)?
                                    } else {
                                        ContextualEngine::openai()?
                                    };

                                    match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
                                    {
                                        Ok(contexts) => {
                                            info!("Generated {} contextual prefixes", contexts.len());
                                            apply_contextual_prefixes(
                                                &embed_text,
                                                &enriched_chunks,
                                                &contexts,
                                            )
                                        }
                                        Err(e) => {
                                            warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                            enriched_chunks.clone()
                                        }
                                    }
                                } else {
                                    enriched_chunks
                                };

                                // Embed every chunk (truncated to the model's
                                // budget) plus one parent embedding.
                                let truncated_chunks: Vec<String> = embed_chunks
                                    .iter()
                                    .map(|chunk_text| {
                                        let truncated_chunk = truncate_for_embedding(chunk_text);
                                        if truncated_chunk.len() < chunk_text.len() {
                                            info!(
                                                "parallel mode: Truncated chunk from {} to {} chars for embedding",
                                                chunk_text.len(),
                                                truncated_chunk.len()
                                            );
                                        }
                                        truncated_chunk
                                    })
                                    .collect();
                                let truncated_refs: Vec<&str> =
                                    truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
                                let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
                                let num_chunks = chunk_embeddings.len();
                                chunk_embeddings_vec = Some(chunk_embeddings);
                                let parent_text = truncate_for_embedding(&embed_text);
                                if parent_text.len() < embed_text.len() {
                                    info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                                }
                                parent_embedding = Some(runtime.embed_passage(&parent_text)?);
                                embedded = true;
                                info!(
                                    "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
                                    num_chunks
                                );
                            } else {
                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                                let truncated_text = truncate_for_embedding(&embed_text);
                                if truncated_text.len() < embed_text.len() {
                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                                }
                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
                                embedded = true;
                            }
                        }
                    }
                }
                if embedded {
                    if let Some(runtime) = runtime {
                        apply_embedding_identity_metadata(&mut options, runtime);
                    }
                }
                // Prefer passing the on-disk path so the batch writer can
                // stream it; otherwise hand over the bytes we already hold.
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                // seq/frame_id are placeholders (0) here: the actual write —
                // and therefore the real ids — happen later in the batch.
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        frame_id: 0,
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        no_raw: effective_no_raw,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                        #[cfg(feature = "logic_mesh")]
                        search_text: search_text.clone(),
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            // Without the feature, parallel mode degrades to a plain put.
            #[cfg(not(feature = "parallel_segments"))]
            {
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            if let Some(path) = args.embedding_vec.as_deref() {
                // Pre-computed embedding for a new frame; same validation as
                // the other branches.
                let embedding = crate::utils::read_embedding(path)?;
                if let Some(model_str) = args.embedding_vec_model.as_deref() {
                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
                    let expected = choice.dimensions();
                    if expected != 0 && embedding.len() != expected {
                        anyhow::bail!(
                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
                            embedding.len(),
                            choice.name(),
                            expected
                        );
                    }
                    apply_embedding_identity_metadata_from_choice(
                        &mut options,
                        choice,
                        embedding.len(),
                        Some(model_str),
                    );
                } else {
                    options.extra_metadata.insert(
                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
                        embedding.len().to_string(),
                    );
                }
                embedded = true;
                mem.put_with_embedding_and_options(&payload, embedding, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.put_bytes_with_options(&payload, options)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                        info!(
                            "Document will be chunked into {} chunks, generating embeddings for each",
                            chunk_texts.len()
                        );

                        // Same temporal-enrichment step as the parallel path.
                        #[cfg(feature = "temporal_enrich")]
                        let enriched_chunks = if args.temporal_enrich {
                            info!(
                                "Temporal enrichment enabled, processing {} chunks",
                                chunk_texts.len()
                            );
                            let today = chrono::Local::now().date_naive();
                            let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                            let enriched: Vec<String> =
                                results.iter().map(|(text, _)| text.clone()).collect();
                            let resolved_count = results
                                .iter()
                                .filter(|(_, e)| {
                                    e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                })
                                .count();
                            info!(
                                "Temporal enrichment: resolved {} chunks with temporal context",
                                resolved_count
                            );
                            enriched
                        } else {
                            chunk_texts.clone()
                        };
                        #[cfg(not(feature = "temporal_enrich"))]
                        let enriched_chunks = {
                            if args.temporal_enrich {
                                warn!("Temporal enrichment requested but feature not compiled in");
                            }
                            chunk_texts.clone()
                        };

                        // Same contextual-retrieval step as the parallel path.
                        let embed_chunks = if args.contextual {
                            info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                            let engine = if args.contextual_model.as_deref() == Some("local") {
                                #[cfg(feature = "llama-cpp")]
                                {
                                    let model_path = get_local_contextual_model_path()?;
                                    ContextualEngine::local(model_path)
                                }
                                #[cfg(not(feature = "llama-cpp"))]
                                {
                                    anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                }
                            } else if let Some(model) = &args.contextual_model {
                                ContextualEngine::openai_with_model(model)?
                            } else {
                                ContextualEngine::openai()?
                            };

                            match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                                Ok(contexts) => {
                                    info!("Generated {} contextual prefixes", contexts.len());
                                    apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                                }
                                Err(e) => {
                                    warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                    enriched_chunks.clone()
                                }
                            }
                        } else {
                            enriched_chunks
                        };

                        let truncated_chunks: Vec<String> = embed_chunks
                            .iter()
                            .map(|chunk_text| {
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!(
                                        "Truncated chunk from {} to {} chars for embedding",
                                        chunk_text.len(),
                                        truncated_chunk.len()
                                    );
                                }
                                truncated_chunk
                            })
                            .collect();
                        let truncated_refs: Vec<&str> =
                            truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
                        let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
                        let parent_text = truncate_for_embedding(&embed_text);
                        if parent_text.len() < embed_text.len() {
                            info!(
                                "Truncated parent text from {} to {} chars for embedding",
                                embed_text.len(),
                                parent_text.len()
                            );
                        }
                        let parent_embedding = runtime.embed_passage(&parent_text)?;
                        embedded = true;
                        apply_embedding_identity_metadata(&mut options, runtime);
                        info!(
                            "Calling put_with_chunk_embeddings with {} chunk embeddings",
                            chunk_embeddings.len()
                        );
                        mem.put_with_chunk_embeddings(
                            &payload,
                            Some(parent_embedding),
                            chunk_embeddings,
                            options,
                        )
                        .map_err(|err| {
                            map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                        })?
                    } else {
                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                        let truncated_text = truncate_for_embedding(&embed_text);
                        if truncated_text.len() < embed_text.len() {
                            info!(
                                "Truncated text from {} to {} chars for embedding",
                                embed_text.len(),
                                truncated_text.len()
                            );
                        }
                        let embedding = runtime.embed_passage(&truncated_text)?;
                        embedded = true;
                        apply_embedding_identity_metadata(&mut options, runtime);
                        mem.put_with_embedding_and_options(&payload, embedding, options)
                            .map_err(|err| {
                                map_put_error(
                                    err,
                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                                )
                            })?
                    }
                }
            }
        } else {
            // Embeddings disabled (or video): plain put.
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            frame_id,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            no_raw: effective_no_raw,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
            #[cfg(feature = "logic_mesh")]
            search_text,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}
4355
4356fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4357 if std::str::from_utf8(payload).is_ok() {
4358 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4359 Ok((compressed.len(), true))
4360 } else {
4361 Ok((payload.len(), false))
4362 }
4363}
4364
4365fn print_report(report: &PutReport) {
4366 let name = report
4367 .source
4368 .as_ref()
4369 .map(|path| {
4370 let pretty = env::current_dir()
4371 .ok()
4372 .as_ref()
4373 .and_then(|cwd| diff_paths(path, cwd))
4374 .unwrap_or_else(|| path.to_path_buf());
4375 pretty.to_string_lossy().into_owned()
4376 })
4377 .unwrap_or_else(|| "stdin".to_string());
4378
4379 let ratio = if report.original_bytes > 0 {
4380 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4381 } else {
4382 100.0
4383 };
4384
4385 println!("• {name} → {}", report.uri);
4386 println!(" seq: {}", report.seq);
4387 if report.no_raw {
4388 println!(
4389 " size: {} B (--no-raw: text only, hash stored)",
4390 report.original_bytes
4391 );
4392 } else if report.compressed {
4393 println!(
4394 " size: {} B → {} B ({:.1}% of original, compressed)",
4395 report.original_bytes, report.stored_bytes, ratio
4396 );
4397 } else {
4398 println!(" size: {} B (stored as-is)", report.original_bytes);
4399 }
4400 if let Some(mime) = &report.mime {
4401 println!(" mime: {mime}");
4402 }
4403 if let Some(title) = &report.title {
4404 println!(" title: {title}");
4405 }
4406 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4407 println!(
4408 " extraction: status={} reader={}",
4409 report.extraction.status.label(),
4410 extraction_reader
4411 );
4412 if let Some(ms) = report.extraction.duration_ms {
4413 println!(" extraction-duration: {} ms", ms);
4414 }
4415 if let Some(pages) = report.extraction.pages_processed {
4416 println!(" extraction-pages: {}", pages);
4417 }
4418 for warning in &report.extraction.warnings {
4419 println!(" warning: {warning}");
4420 }
4421 if let Some(metadata) = &report.metadata {
4422 if let Some(caption) = &metadata.caption {
4423 println!(" caption: {caption}");
4424 }
4425 if let Some(audio) = metadata.audio.as_ref() {
4426 if audio.duration_secs.is_some()
4427 || audio.sample_rate_hz.is_some()
4428 || audio.channels.is_some()
4429 {
4430 let duration = audio
4431 .duration_secs
4432 .map(|secs| format!("{secs:.1}s"))
4433 .unwrap_or_else(|| "unknown".into());
4434 let rate = audio
4435 .sample_rate_hz
4436 .map(|hz| format!("{} Hz", hz))
4437 .unwrap_or_else(|| "? Hz".into());
4438 let channels = audio
4439 .channels
4440 .map(|ch| ch.to_string())
4441 .unwrap_or_else(|| "?".into());
4442 println!(" audio: duration {duration}, {channels} ch, {rate}");
4443 }
4444 }
4445 }
4446
4447 println!();
4448}
4449
4450fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4451 let source = report
4452 .source
4453 .as_ref()
4454 .map(|path| path.to_string_lossy().into_owned());
4455 let extraction = json!({
4456 "status": report.extraction.status.label(),
4457 "reader": report.extraction.reader,
4458 "duration_ms": report.extraction.duration_ms,
4459 "pages_processed": report.extraction.pages_processed,
4460 "warnings": report.extraction.warnings,
4461 });
4462 json!({
4463 "seq": report.seq,
4464 "uri": report.uri,
4465 "title": report.title,
4466 "original_bytes": report.original_bytes,
4467 "original_bytes_human": format_bytes(report.original_bytes as u64),
4468 "stored_bytes": report.stored_bytes,
4469 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4470 "compressed": report.compressed,
4471 "mime": report.mime,
4472 "source": source,
4473 "metadata": report.metadata,
4474 "extraction": extraction,
4475 })
4476}
4477
4478fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4479 match err {
4480 MemvidError::CapacityExceeded {
4481 current,
4482 limit,
4483 required,
4484 } => anyhow!(CapacityExceededMessage {
4485 current,
4486 limit,
4487 required,
4488 }),
4489 other => other.into(),
4490 }
4491}
4492
4493fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4494 for (idx, entry) in entries.iter().enumerate() {
4495 match serde_json::from_str::<DocMetadata>(entry) {
4496 Ok(meta) => {
4497 options.metadata = Some(meta);
4498 }
4499 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4500 Ok(value) => {
4501 options
4502 .extra_metadata
4503 .insert(format!("custom_metadata_{idx}"), value.to_string());
4504 }
4505 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4506 },
4507 }
4508 }
4509}
4510
4511fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4512 let stats = mem.stats()?;
4513 let message = match stats.tier {
4514 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4515 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4516 };
4517 Ok(message)
4518}
4519
/// Sanitize a raw URI/path into a safe `mv2://`-style identifier.
///
/// Strips the `mv2://` scheme prefix, normalizes backslashes to `/`, and
/// resolves the path lexically: empty and `.` segments are dropped, and `..`
/// pops the previous segment (never escaping the root). With `keep_path` the
/// full joined path is returned; otherwise only the last segment.
///
/// Fix over the previous version: when nothing survives normalization (e.g.
/// input `".."` or `"mv2://."`), the fallback used to echo the last raw
/// component verbatim, so the sanitizer could emit the traversal tokens `.`
/// or `..`. Those are now rejected and `"document"` is returned instead.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    // Lexical resolution of the segment list.
    let mut segments: Vec<&str> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                segments.pop();
            }
            other => segments.push(other),
        }
    }

    if segments.is_empty() {
        // Nothing survived; fall back to the last raw component, but never
        // emit an empty string or a traversal token.
        return normalized
            .split('/')
            .next_back()
            .filter(|s| !s.is_empty() && *s != "." && *s != "..")
            .unwrap_or("document")
            .to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        // Non-empty by the check above; last() is always Some here.
        segments
            .last()
            .map(|s| s.to_string())
            .unwrap_or_else(|| "document".to_string())
    }
}
4553
4554fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4555 if quiet {
4556 let pb = ProgressBar::hidden();
4557 pb.set_message(message.to_string());
4558 return pb;
4559 }
4560
4561 match total {
4562 Some(len) => {
4563 let pb = ProgressBar::new(len);
4564 pb.set_draw_target(ProgressDrawTarget::stderr());
4565 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4566 .unwrap_or_else(|_| ProgressStyle::default_bar());
4567 pb.set_style(style);
4568 pb.set_message(message.to_string());
4569 pb
4570 }
4571 None => {
4572 let pb = ProgressBar::new_spinner();
4573 pb.set_draw_target(ProgressDrawTarget::stderr());
4574 pb.set_message(message.to_string());
4575 pb.enable_steady_tick(Duration::from_millis(120));
4576 pb
4577 }
4578 }
4579}
4580
4581fn create_spinner(message: &str) -> ProgressBar {
4582 let pb = ProgressBar::new_spinner();
4583 pb.set_draw_target(ProgressDrawTarget::stderr());
4584 let style = ProgressStyle::with_template("{spinner} {msg}")
4585 .unwrap_or_else(|_| ProgressStyle::default_spinner())
4586 .tick_strings(&["-", "\\", "|", "/"]);
4587 pb.set_style(style);
4588 pb.set_message(message.to_string());
4589 pb.enable_steady_tick(Duration::from_millis(120));
4590 pb
4591}
4592
#[cfg(feature = "parallel_segments")]
// CLI arguments for the `put-many` batch-ingest subcommand.
// NOTE: plain `//` comments are used deliberately — clap's derive turns `///`
// doc comments into help text, which would change CLI output.
#[derive(Args)]
pub struct PutManyArgs {
    // Target .mv2 archive to ingest into.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON output.
    #[arg(long)]
    pub json: bool,
    // Read the batch JSON from this file (presumably stdin otherwise — the
    // fallback branch is outside this view; confirm in handle_put_many).
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    // Compression level, default 3.
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Store raw payload bytes (NOTE(review): presumably opts out of the
    // default no-raw behavior, mirroring PutArgs — confirm).
    #[arg(long)]
    pub raw: bool,
    // Hidden from --help (`hide = true`); presumably retained for
    // backward compatibility — confirm.
    #[arg(long, hide = true)]
    pub no_raw: bool,
    // Enable content deduplication for the batch.
    #[arg(long)]
    pub dedup: bool,
    // Shared archive-lock handling flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4629
#[cfg(feature = "parallel_segments")]
/// One document in a `put-many` batch, deserialized from JSON.
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required).
    pub title: String,
    /// Free-form label; defaults to empty.
    /// NOTE(review): not read by `handle_put_many` — confirm whether it is
    /// consumed elsewhere or vestigial.
    #[serde(default)]
    pub label: String,
    /// Full document text to ingest (stored as UTF-8 bytes).
    pub text: String,
    /// Optional source URI; when absent, `mv2://batch/{index}` is synthesized.
    #[serde(default)]
    pub uri: Option<String>,
    /// Search tags applied to the stored frame.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels applied to the stored frame.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Structured metadata; parsed as `DocMetadata` when possible, otherwise
    /// stashed verbatim under the `memvid.metadata.json` extra-metadata key.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Arbitrary string key/value metadata merged into the frame.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Optional precomputed document-level embedding.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Optional precomputed per-chunk embeddings.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Per-request override of the global raw-storage decision.
    #[serde(default)]
    pub no_raw: Option<bool>,
    /// Per-request override of the global dedup flag.
    #[serde(default)]
    pub dedup: Option<bool>,
}
4667
#[cfg(feature = "parallel_segments")]
/// Wrapper for a `put-many` batch. `#[serde(transparent)]` lets it
/// deserialize directly from a bare JSON array of requests.
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The individual put requests, in input order.
    pub requests: Vec<PutManyRequest>,
}
4675
#[cfg(feature = "parallel_segments")]
/// Handles the `put-many` CLI command: reads a JSON batch of documents
/// (from `--input` or stdin), converts each request into a
/// `ParallelInput`, and ingests the whole batch in one parallel build.
///
/// # Errors
/// Returns an error if the plan/capacity checks fail, the memvid file
/// cannot be opened or mutated, the batch JSON cannot be read or parsed,
/// or the parallel ingest itself fails.
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    // Gate on an active plan before doing any work.
    crate::utils::require_active_plan(config, "put-many")?;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Make sure both search indexes exist before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Capacity check uses the pre-ingest size snapshot.
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Batch JSON comes from --input when given, otherwise from stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    // An empty batch is a success no-op, reported in the requested format.
    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    // Global defaults; individual requests may override them.
    // NOTE(review): the hidden `args.no_raw` flag is never consulted here —
    // only `--raw` drives the global default. Confirm that flag is
    // intentionally a no-op.
    let global_no_raw = !args.raw;
    let global_dedup = args.dedup;
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Synthesize a batch URI when the request did not provide one.
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title.clone());
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;
            options.extra_metadata = req.extra_metadata;
            options.no_raw = req.no_raw.unwrap_or(global_no_raw);
            options.dedup = req.dedup.unwrap_or(global_dedup);
            // Structured metadata: use it as DocMetadata when it parses;
            // otherwise stash the raw JSON under a reserved metadata key
            // (without clobbering a caller-supplied value).
            if !req.metadata.is_null() {
                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
                    Ok(meta) => options.metadata = Some(meta),
                    Err(_) => {
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| req.metadata.to_string());
                    }
                }
            }

            // Record the embedding dimension (from the document embedding,
            // or the first chunk embedding) as extra metadata.
            if let Some(embedding) = req
                .embedding
                .as_ref()
                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
            {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| embedding.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Clamp the CLI-provided compression level into the supported range.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    // Report results: machine-readable JSON, or a human summary that lists
    // individual ids only for small batches.
    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}
4813
// CLI arguments for the `correct` command.
// Plain `//` comments are used deliberately: `///` doc comments on clap
// derive fields would become user-visible --help text.
#[derive(Args)]
pub struct CorrectArgs {
    // Target memvid file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // The correction statement to store.
    #[arg(value_name = "STATEMENT")]
    pub statement: String,
    // Comma-separated topics; stored as tags on the correction frame.
    #[arg(long, value_name = "TOPICS", value_delimiter = ',')]
    pub topics: Option<Vec<String>>,
    // Optional provenance, recorded as memvid.correction.source metadata.
    #[arg(long, value_name = "SOURCE")]
    pub source: Option<String>,
    // Boost multiplier recorded as memvid.correction.boost metadata.
    #[arg(long, default_value = "2.0")]
    pub boost: f64,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,
    // Shared locking flags (flattened into this command's CLI surface).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4838
/// Handles the `correct` CLI command: stores a correction statement as a
/// dedicated frame tagged with `memvid.correction.*` metadata so that
/// downstream consumers can recognize and boost it.
///
/// # Errors
/// Returns an error if the plan check fails, the memvid file cannot be
/// opened or mutated, or the write fails.
pub fn handle_correct(config: &CliConfig, args: CorrectArgs) -> Result<()> {
    crate::utils::require_active_plan(config, "correct")?;

    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);

    let stats = mem.stats()?;
    // NOTE(review): lex is only enabled for brand-new (empty) files here,
    // whereas handle_put_many enables it whenever the index is missing —
    // confirm that an existing file without a lex index is intentionally
    // left unindexed for corrections.
    if stats.frame_count == 0 && !stats.has_lex_index {
        mem.enable_lex()?;
    }

    // Correction markers attached as extra metadata on the frame.
    let mut extra_metadata = BTreeMap::new();
    extra_metadata.insert("memvid.correction".to_string(), "true".to_string());
    extra_metadata.insert("memvid.correction.boost".to_string(), args.boost.to_string());
    if let Some(source) = &args.source {
        extra_metadata.insert("memvid.correction.source".to_string(), source.clone());
    }

    // Topics double as search tags on the stored frame.
    let tags: Vec<String> = args.topics.clone().unwrap_or_default();

    let mut options = PutOptions::default();
    // Title is the (possibly truncated) statement; URI is a fresh UUID.
    options.title = Some(format!("Correction: {}", truncate_title(&args.statement, 50)));
    options.uri = Some(format!("mv2://correction/{}", Uuid::new_v4()));
    options.labels = vec!["correction".to_string()];
    options.tags = tags;
    options.extra_metadata = extra_metadata;
    // Corrections are small text-only frames; raw byte storage is skipped.
    options.no_raw = true;
    let frame_id = mem.put_bytes_with_options(args.statement.as_bytes(), options)?;

    if args.json {
        let output = json!({
            "frame_id": frame_id,
            "type": "correction",
            "boost": args.boost,
            "topics": args.topics.unwrap_or_default(),
            "source": args.source,
        });
        println!("{}", serde_json::to_string_pretty(&output)?);
    } else {
        println!("✓ Correction stored (frame_id: {})", frame_id);
        if let Some(topics) = &args.topics {
            println!("  Topics: {}", topics.join(", "));
        }
        if let Some(source) = &args.source {
            println!("  Source: {}", source);
        }
        println!("  Boost: {}x", args.boost);
    }

    Ok(())
}
4901
/// Truncates `s` to at most `max_len` bytes for display, appending "..."
/// when the string was shortened.
///
/// The original sliced at an arbitrary byte offset (`&s[..max_len - 3]`),
/// which panics when that offset lands inside a multi-byte UTF-8 character
/// — the input here is a user-supplied statement, so accented or CJK text
/// would crash. This version walks back to the nearest char boundary
/// before slicing, so it never panics and never produces invalid UTF-8.
fn truncate_title(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Reserve 3 bytes for the ellipsis, then back up to a char boundary
    // so the slice below is always valid. `s.len() > max_len` guarantees
    // `end < s.len()`.
    let mut end = max_len.saturating_sub(3);
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}