1use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30#[cfg(feature = "clip")]
31use memvid_core::FrameRole;
32use memvid_core::{
33 normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
34 DocMetadata, DocumentFormat, EnrichmentEngine, MediaManifest, Memvid, MemvidError, PutOptions,
35 ReaderHint, ReaderRegistry, RulesEngine, Stats, Tier, TimelineQueryBuilder,
36 MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
37};
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
46const MAX_SEARCH_TEXT_LEN: usize = 32_768;
47const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
50
51fn parse_timestamp(s: &str) -> Result<i64, String> {
58 let s = s.trim();
59
60 if s.chars().all(|c| c.is_ascii_digit())
62 || (s.starts_with('-') && s[1..].chars().all(|c| c.is_ascii_digit()))
63 {
64 return s
65 .parse::<i64>()
66 .map_err(|e| format!("Invalid epoch timestamp: {e}"));
67 }
68
69 use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
71
72 fn date_to_epoch(date: NaiveDate) -> i64 {
74 let datetime = NaiveDateTime::new(date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
75 Utc.from_utc_datetime(&datetime).timestamp()
76 }
77
78 if let Ok(date) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
80 return Ok(date_to_epoch(date));
81 }
82
83 if let Ok(date) = NaiveDate::parse_from_str(s, "%b %d, %Y") {
85 return Ok(date_to_epoch(date));
86 }
87 if let Ok(date) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
88 return Ok(date_to_epoch(date));
89 }
90
91 if let Ok(date) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
93 return Ok(date_to_epoch(date));
94 }
95
96 if let Ok(date) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
98 return Ok(date_to_epoch(date));
99 }
100 if let Ok(date) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
101 return Ok(date_to_epoch(date));
102 }
103
104 Err(format!(
105 "Invalid timestamp format: '{}'. Use epoch seconds (1673740800) or date format (Jan 15, 2023 / 2023-01-15 / 01/15/2023)",
106 s
107 ))
108}
109const COLOR_PALETTE_SIZE: u8 = 5;
110const COLOR_PALETTE_QUALITY: u8 = 8;
111const MAGIC_SNIFF_BYTES: usize = 16;
112#[cfg(feature = "clip")]
115const CLIP_PDF_MAX_IMAGES: usize = 100;
116#[cfg(feature = "clip")]
117const CLIP_PDF_TARGET_PX: u32 = 768;
118
119fn truncate_for_embedding(text: &str) -> String {
122 if text.len() <= MAX_EMBEDDING_TEXT_LEN {
123 text.to_string()
124 } else {
125 let truncated = &text[..MAX_EMBEDDING_TEXT_LEN];
127 let end = truncated
129 .char_indices()
130 .rev()
131 .next()
132 .map(|(i, c)| i + c.len_utf8())
133 .unwrap_or(MAX_EMBEDDING_TEXT_LEN);
134 text[..end].to_string()
135 }
136}
137
138fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
139 options.extra_metadata.insert(
140 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
141 runtime.provider_kind().to_string(),
142 );
143 options.extra_metadata.insert(
144 MEMVID_EMBEDDING_MODEL_KEY.to_string(),
145 runtime.provider_model_id(),
146 );
147 options.extra_metadata.insert(
148 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
149 runtime.dimension().to_string(),
150 );
151}
152
153fn apply_embedding_identity_metadata_from_choice(
154 options: &mut PutOptions,
155 model: EmbeddingModelChoice,
156 dimension: usize,
157 model_id_override: Option<&str>,
158) {
159 let provider = match model {
160 EmbeddingModelChoice::OpenAILarge
161 | EmbeddingModelChoice::OpenAISmall
162 | EmbeddingModelChoice::OpenAIAda => "openai",
163 EmbeddingModelChoice::Nvidia => "nvidia",
164 _ => "fastembed",
165 };
166
167 let model_id = match model {
168 EmbeddingModelChoice::Nvidia => model_id_override
169 .map(str::trim)
170 .filter(|value| !value.is_empty())
171 .map(|value| {
172 value
173 .trim_start_matches("nvidia:")
174 .trim_start_matches("nv:")
175 })
176 .filter(|value| !value.is_empty())
177 .map(|value| {
178 if value.contains('/') {
179 value.to_string()
180 } else {
181 format!("nvidia/{value}")
182 }
183 })
184 .unwrap_or_else(|| model.canonical_model_id().to_string()),
185 _ => model.canonical_model_id().to_string(),
186 };
187
188 options.extra_metadata.insert(
189 MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
190 provider.to_string(),
191 );
192 options
193 .extra_metadata
194 .insert(MEMVID_EMBEDDING_MODEL_KEY.to_string(), model_id);
195 options.extra_metadata.insert(
196 MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
197 dimension.to_string(),
198 );
199}
200
#[cfg(feature = "llama-cpp")]
/// Locate the local Phi-3.5-mini GGUF model on disk.
///
/// The models directory is `MEMVID_MODELS_DIR` when set (a leading `~/`
/// is expanded against `HOME` when available), otherwise
/// `$HOME/.memvid/models`.
///
/// # Errors
/// Fails when `HOME` is unset (and no override is given) or when the
/// model file does not exist.
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            // Expand "~/" against $HOME; keep the literal path if HOME is unset.
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(&custom),
        },
        Err(_) => {
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if model_path.exists() {
        return Ok(model_path);
    }
    Err(anyhow!(
        "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
        model_path.display()
    ))
}
237
238use pathdiff::diff_paths;
239
240use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
241use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
242use crate::config::{
243 load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime,
244};
245use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
246use crate::error::{CapacityExceededMessage, DuplicateUriError};
247use crate::utils::{
248 apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
249 select_frame,
250};
251#[cfg(feature = "temporal_enrich")]
252use memvid_core::enrich_chunks as temporal_enrich_chunks;
253
/// Interpret common truthy/falsy string spellings.
///
/// Case-insensitive and whitespace-tolerant: returns `Some(true)` for
/// `1`/`true`/`yes`/`on`, `Some(false)` for `0`/`false`/`no`/`off`,
/// and `None` for anything else.
fn parse_bool_flag(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let normalized = value.trim().to_ascii_lowercase();
    if TRUTHY.contains(&normalized.as_str()) {
        Some(true)
    } else if FALSY.contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
262
#[cfg(feature = "parallel_segments")]
/// Read the `MEMVID_PARALLEL_SEGMENTS` environment override, if it is
/// set to a recognized boolean spelling.
fn parallel_env_override() -> Option<bool> {
    let raw = std::env::var("MEMVID_PARALLEL_SEGMENTS").ok()?;
    parse_bool_flag(&raw)
}
270
#[cfg(feature = "clip")]
#[allow(dead_code)]
/// Report whether both ONNX halves (vision + text) of the configured
/// CLIP model are present on disk.
///
/// Directory resolution: `MEMVID_MODELS_DIR` override, then
/// `$HOME/.memvid/models`, then the CWD-relative `.memvid/models`.
/// The model name comes from `MEMVID_CLIP_MODEL` (default `mobileclip-s2`).
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name = env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    ["vision", "text"]
        .iter()
        .all(|half| models_dir.join(format!("{model_name}_{half}.onnx")).exists())
}
290
291#[cfg(feature = "logic_mesh")]
294const NER_MIN_CONFIDENCE: f32 = 0.50;
295
296#[cfg(feature = "logic_mesh")]
298const NER_MIN_ENTITY_LEN: usize = 2;
299
300#[cfg(feature = "logic_mesh")]
303const MAX_MERGE_GAP: usize = 3;
304
#[cfg(feature = "logic_mesh")]
/// Merge adjacent NER entities of the same type that are separated only
/// by small "joiner" gaps (spaces, tabs, `&`, `-`, `'`, `.`), e.g.
/// `"Johnson" + "Johnson"` across `" & "` becomes `"Johnson & Johnson"`.
///
/// Entities are sorted by start offset; a merge keeps the earlier
/// entity's start, extends its end, re-reads the merged surface text
/// from `original_text`, and averages the two confidences.
///
/// Fix: the previous version indexed `original_text` directly with the
/// entities' byte offsets after only a length check. NER byte offsets
/// are not guaranteed to land on UTF-8 char boundaries, and `&str[a..b]`
/// panics in that case. Checked slicing via `str::get` is used instead:
/// a non-sliceable gap is simply treated as non-mergeable, and a
/// non-sliceable merged span falls back to naive concatenation (the same
/// fallback the out-of-bounds path already used).
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            // A gap is mergeable when empty, or when it is short and the
            // in-between text consists only of joiner characters on a
            // single line. `get` returns None for out-of-bounds or
            // non-char-boundary ranges, which we treat as non-mergeable.
            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                match original_text.get(last.byte_end..entity.byte_start) {
                    Some(gap_text) if !gap_text.contains('\n') && !gap_text.contains('\r') => {
                        gap_text.chars().all(|c| {
                            c == ' ' || c == '\t' || c == '&' || c == '-' || c == '\'' || c == '.'
                        })
                    }
                    _ => false,
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Prefer the exact surface form from the source text;
                // fall back to concatenation when the span cannot be sliced.
                let merged_text = original_text
                    .get(last.byte_start..entity.byte_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text,
                    entity.text,
                    merged_text,
                    gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                // Averaged confidence for the combined span.
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
384
#[cfg(feature = "logic_mesh")]
/// Heuristic filter for raw NER output.
///
/// Rejects candidates that are low-confidence, too short, pure
/// whitespace/punctuation, multi-line, WordPiece fragments (`##`),
/// trailing-dot abbreviations, known stop tokens, short stray all-caps
/// prefixes, or lowercase-initial spans. Two-character strings are only
/// accepted from a small allow-list of well-known abbreviations.
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    // Too short, or nothing but whitespace/punctuation.
    if trimmed.len() < NER_MIN_ENTITY_LEN
        || trimmed
            .chars()
            .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
    {
        return false;
    }

    // Spans crossing line breaks are extraction noise.
    if trimmed.contains('\n') || trimmed.contains('\r') {
        return false;
    }

    // WordPiece continuation markers and trailing periods indicate a
    // truncated / partial token.
    if trimmed.starts_with("##") || trimmed.ends_with("##") || trimmed.ends_with('.') {
        return false;
    }

    let lower = trimmed.to_lowercase();

    // Length-specific screening: single characters never pass; two-byte
    // strings only pass when they are well-known abbreviations.
    match trimmed.len() {
        1 => return false,
        2 => return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Frequent NER false positives.
    if matches!(
        lower.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    // Multi-word spans that open with a short all-caps fragment are
    // usually stray abbreviations, unless the fragment is a known
    // country/org code.
    let words: Vec<&str> = trimmed.split_whitespace().collect();
    if words.len() >= 2 {
        let first_word = words[0];
        let short_all_caps =
            first_word.len() <= 2 && first_word.chars().all(|c| c.is_uppercase());
        let allowed_code = matches!(
            first_word.to_lowercase().as_str(),
            "us" | "uk" | "eu" | "un"
        );
        if short_all_caps && !allowed_code {
            return false;
        }
    }

    // Entities are expected to be capitalized.
    match trimmed.chars().next() {
        Some(first_char) if first_char.is_lowercase() => false,
        _ => true,
    }
}
475
476pub fn parse_key_val(s: &str) -> Result<(String, String)> {
478 let (key, value) = s
479 .split_once('=')
480 .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
481 Ok((key.to_string(), value.to_string()))
482}
483
// Shared lock-handling CLI flags, flattened into every mutating command
// and forwarded to `apply_lock_cli`.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // Milliseconds to wait for the file lock before giving up.
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // Force flag forwarded to lock acquisition (see apply_lock_cli).
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
494
// Arguments for `memvid put`: ingest one file, a directory/glob of files,
// or stdin into a .mv2 memory. Plain `//` comments are used here (not
// `///`) so clap's generated help text is unchanged.
#[derive(Args)]
pub struct PutArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,
    // Input path/glob; when absent the payload is read from stdin
    // (resolved by `resolve_inputs`).
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    // Explicit URI for the ingested frame (single-input only).
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Explicit title for the ingested frame (single-input only).
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Timestamp as epoch seconds or a date string (see `parse_timestamp`).
    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
    pub timestamp: Option<i64>,
    // Optional track label for the frame.
    #[arg(long)]
    pub track: Option<String>,
    // Optional kind override; conflicts with --video unless equal to "video".
    #[arg(long)]
    pub kind: Option<String>,
    // Treat the input as video (requires --input; incompatible with stdin).
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    // Transcript mode; triggers a notice message after ingest.
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable KEY=VALUE tags (parsed by `parse_key_val`).
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    // Repeatable free-form labels.
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Repeatable raw JSON metadata blobs.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Opt-out switches for automatic enrichment steps.
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,
    // Audio ingest and transcription toggles.
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    // Segment length for audio splitting, in seconds.
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Compute text embeddings at ingest time (mutually exclusive with
    // --no-embedding and with --embedding-vec, enforced in handle_put).
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // Pre-computed embedding vector loaded from a JSON file; requires
    // exactly one input file (validated in handle_put).
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // Model identifier recorded alongside a pre-computed vector.
    #[arg(
        long = "embedding-vec-model",
        value_name = "EMB_MODEL",
        requires = "embedding_vec"
    )]
    pub embedding_vec_model: Option<String>,
    // Enable PQ96 vector compression (see handle_put's capacity notes).
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // Contextual prefixing of chunks, with an optional model override.
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // Table extraction on/off switches.
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // Time budget for content extraction, in milliseconds.
    #[arg(long = "extraction-budget-ms", value_name = "MS")]
    pub extraction_budget_ms: Option<u64>,
    // CLIP visual embeddings (feature-gated); --no-clip is hidden.
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue, hide = true)]
    pub no_clip: bool,

    // Duplicate-URI policy: update the existing frame, or allow a
    // duplicate; the two are mutually exclusive.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    #[arg(long)]
    pub allow_duplicate: bool,

    // Temporal enrichment pass (used with the temporal_enrich feature).
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // Dashboard memory to bind this .mv2 to before ingesting.
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    // Logic-mesh entity extraction (used with the logic_mesh feature).
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // Parallel segmented ingest controls (feature-gated). Flag precedence
    // is resolved in `wants_parallel`; numeric overrides feed
    // `sanitized_parallel_opts`.
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Raw-payload storage toggles; --no-raw is hidden.
    #[arg(long = "raw", action = ArgAction::SetTrue)]
    pub raw: bool,

    #[arg(long = "no-raw", action = ArgAction::SetTrue, hide = true)]
    pub no_raw: bool,

    // Deduplicate content on ingest.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dedup: bool,

    // Enrichment is on by default; --no-enrich disables it.
    #[arg(long, action = ArgAction::SetTrue, default_value_t = true)]
    pub enrich: bool,

    #[arg(long = "no-enrich", action = ArgAction::SetTrue)]
    pub no_enrich: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
698
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Resolve whether parallel segmented ingest should run.
    ///
    /// Precedence: explicit `--parallel-segments` wins, then explicit
    /// `--no-parallel-segments`, then the `MEMVID_PARALLEL_SEGMENTS`
    /// environment override; the default is off.
    pub fn wants_parallel(&self) -> bool {
        if self.parallel_segments {
            true
        } else if self.no_parallel_segments {
            false
        } else {
            parallel_env_override().unwrap_or(false)
        }
    }

    /// Build sanitized `BuildOpts` from the CLI overrides, keeping the
    /// defaults for anything the user did not set.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        opts.segment_tokens = self.parallel_segment_tokens.unwrap_or(opts.segment_tokens);
        opts.segment_pages = self.parallel_segment_pages.unwrap_or(opts.segment_pages);
        opts.threads = self.parallel_threads.unwrap_or(opts.threads);
        opts.queue_depth = self.parallel_queue_depth.unwrap_or(opts.queue_depth);
        // Clamp/validate the combined options before use.
        opts.sanitize();
        opts
    }
}
732
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    // Parallel segmented ingest is compiled out; always report "off".
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
739
// Arguments for `memvid api-fetch`; translated into an `ApiFetchCommand`
// by `handle_api_fetch`. Plain `//` comments keep clap help unchanged.
#[derive(Args)]
pub struct ApiFetchArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Path to the fetch configuration file.
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    // Validate/plan without writing anything.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Optional fetch-mode override (see ApiFetchMode).
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Optional URI override for the stored result.
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Emit machine-readable JSON output.
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
765
// Arguments for `memvid update`: modify an existing frame selected by
// id or URI. Plain `//` comments keep clap help unchanged.
#[derive(Args)]
pub struct UpdateArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selector: by numeric id or by URI (mutually exclusive).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Replacement payload read from this path.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // New URI to assign to the frame.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    // Metadata fields to overwrite.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // Epoch seconds or a date string (see `parse_timestamp`).
    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable KEY=VALUE tags (parsed by `parse_key_val`).
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    // Repeatable labels and raw JSON metadata blobs.
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit machine-readable JSON output.
    #[arg(long)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
802
// Arguments for `memvid delete`: remove a frame selected by id or URI,
// with an interactive confirmation unless --yes is passed.
#[derive(Args)]
pub struct DeleteArgs {
    // Target .mv2 memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selector: by numeric id or by URI (mutually exclusive).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit machine-readable JSON output.
    #[arg(long)]
    pub json: bool,

    // Shared lock flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
820
821pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
823 let command = ApiFetchCommand {
824 file: args.file,
825 config_path: args.config,
826 dry_run: args.dry_run,
827 mode_override: args.mode,
828 uri_override: args.uri,
829 output_json: args.json,
830 lock_timeout_ms: args.lock.lock_timeout,
831 force_lock: args.lock.force,
832 };
833 run_api_fetch(config, command)
834}
835
836pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
838 let mut mem = Memvid::open(&args.file)?;
839 ensure_cli_mutation_allowed(&mem)?;
840 apply_lock_cli(&mut mem, &args.lock);
841 let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
842
843 if !args.yes {
844 if let Some(uri) = &frame.uri {
845 println!("About to delete frame {} ({uri})", frame.id);
846 } else {
847 println!("About to delete frame {}", frame.id);
848 }
849 print!("Confirm? [y/N]: ");
850 io::stdout().flush()?;
851 let mut input = String::new();
852 io::stdin().read_line(&mut input)?;
853 let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
854 if !confirmed {
855 println!("Aborted");
856 return Ok(());
857 }
858 }
859
860 let seq = mem.delete_frame(frame.id)?;
861 mem.commit()?;
862 let deleted = mem.frame_by_id(frame.id)?;
863
864 if args.json {
865 let json = json!({
866 "frame_id": frame.id,
867 "sequence": seq,
868 "status": frame_status_str(deleted.status),
869 });
870 println!("{}", serde_json::to_string_pretty(&json)?);
871 } else {
872 println!("Deleted frame {} (seq {})", frame.id, seq);
873 }
874
875 Ok(())
876}
877pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
878 crate::utils::require_active_plan(config, "put")?;
880
881 let mut mem = Memvid::open(&args.file)?;
882 ensure_cli_mutation_allowed(&mem)?;
883 apply_lock_cli(&mut mem, &args.lock);
884
885 if let Some(memory_id) = &args.memory_id {
887 let needs_binding = match mem.get_memory_binding() {
888 Some(binding) => binding.memory_id != **memory_id,
889 None => true,
890 };
891 if needs_binding {
892 match crate::commands::creation::bind_to_dashboard_memory(
893 config, &mut mem, &args.file, memory_id,
894 ) {
895 Ok((bound_id, capacity)) => {
896 eprintln!(
897 "✓ Bound to dashboard memory: {} (capacity: {})",
898 bound_id,
899 crate::utils::format_bytes(capacity)
900 );
901 }
902 Err(e) => {
903 eprintln!("⚠️ Failed to bind to dashboard memory: {}", e);
904 eprintln!(" Continuing with existing capacity. Bind manually with:");
905 eprintln!(
906 " memvid tickets sync {} --memory-id {}",
907 args.file.display(),
908 memory_id
909 );
910 }
911 }
912 }
913 }
914
915 #[cfg(feature = "replay")]
917 let _ = mem.load_active_session();
918 let mut stats = mem.stats()?;
919 if stats.frame_count == 0 && !stats.has_lex_index {
920 mem.enable_lex()?;
921 stats = mem.stats()?;
922 }
923
924 crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
927
928 if args.vector_compression {
930 mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
931 }
932
933 let mut capacity_guard = CapacityGuard::from_stats(&stats);
934 let use_parallel = args.wants_parallel();
935 #[cfg(feature = "parallel_segments")]
936 let mut parallel_opts = if use_parallel {
937 Some(args.sanitized_parallel_opts())
938 } else {
939 None
940 };
941 #[cfg(feature = "parallel_segments")]
942 let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
943 #[cfg(feature = "parallel_segments")]
944 let mut pending_parallel_indices: Vec<usize> = Vec::new();
945 let input_set = resolve_inputs(args.input.as_deref())?;
946 if args.embedding && args.embedding_vec.is_some() {
947 anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
948 }
949 if args.embedding_vec.is_some() {
950 match &input_set {
951 InputSet::Files(paths) if paths.len() != 1 => {
952 anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
953 }
954 _ => {}
955 }
956 }
957
958 if args.video {
959 if let Some(kind) = &args.kind {
960 if !kind.eq_ignore_ascii_case("video") {
961 anyhow::bail!("--video conflicts with explicit --kind={kind}");
962 }
963 }
964 if matches!(input_set, InputSet::Stdin) {
965 anyhow::bail!("--video requires --input <file>");
966 }
967 }
968
969 #[allow(dead_code)]
970 enum EmbeddingMode {
971 None,
972 Auto,
973 Explicit,
974 }
975
976 let mv2_dimension = mem.effective_vec_index_dimension()?;
980 let inferred_embedding_model = if args.embedding {
981 match mem.embedding_identity_summary(10_000) {
982 memvid_core::EmbeddingIdentitySummary::Single(identity) => {
983 identity.model.map(String::from)
984 }
985 memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
986 let models: Vec<_> = identities
987 .iter()
988 .filter_map(|entry| entry.identity.model.as_deref())
989 .collect();
990 anyhow::bail!(
991 "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
992 Detected models: {:?}\n\n\
993 Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
994 models
995 );
996 }
997 memvid_core::EmbeddingIdentitySummary::Unknown => None,
998 }
999 } else {
1000 None
1001 };
1002 let (embedding_mode, embedding_runtime) = if args.embedding {
1003 (
1004 EmbeddingMode::Explicit,
1005 Some(load_embedding_runtime_for_mv2(
1006 config,
1007 inferred_embedding_model.as_deref(),
1008 mv2_dimension,
1009 )?),
1010 )
1011 } else if args.embedding_vec.is_some() {
1012 (EmbeddingMode::Explicit, None)
1013 } else {
1014 (EmbeddingMode::None, None)
1015 };
1016 let embedding_enabled = args.embedding || args.embedding_vec.is_some();
1017 let runtime_ref = embedding_runtime.as_ref();
1018
1019 #[cfg(feature = "clip")]
1022 let clip_enabled = args.clip;
1023 #[cfg(feature = "clip")]
1024 let clip_model = if clip_enabled {
1025 mem.enable_clip()?;
1026 if !args.json {
1027 eprintln!("ℹ️ CLIP visual embeddings enabled");
1028 eprintln!(" Model: MobileCLIP-S2 (512 dimensions)");
1029 eprintln!(" Use 'memvid find --mode clip' to search with natural language");
1030 eprintln!();
1031 }
1032 match memvid_core::ClipModel::default_model() {
1034 Ok(model) => Some(model),
1035 Err(e) => {
1036 warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
1037 None
1038 }
1039 }
1040 } else {
1041 None
1042 };
1043
1044 if embedding_enabled && !args.json {
1046 let capacity = stats.capacity_bytes;
1047 if args.vector_compression {
1048 let max_docs = capacity / 20_000; eprintln!("ℹ️ Vector embeddings enabled with Product Quantization compression");
1051 eprintln!(" Storage: ~20 KB per document (16x compressed)");
1052 eprintln!(" Accuracy: ~95% recall@10 maintained");
1053 eprintln!(
1054 " Capacity: ~{} documents max for {:.1} GB",
1055 max_docs,
1056 capacity as f64 / 1e9
1057 );
1058 eprintln!();
1059 } else {
1060 let max_docs = capacity / 270_000; eprintln!("⚠️ WARNING: Vector embeddings enabled (uncompressed)");
1062 eprintln!(" Storage: ~270 KB per document");
1063 eprintln!(
1064 " Capacity: ~{} documents max for {:.1} GB",
1065 max_docs,
1066 capacity as f64 / 1e9
1067 );
1068 eprintln!(" Use --vector-compression for 16x storage savings (~20 KB/doc)");
1069 eprintln!(" Use --no-embedding for lexical search only (~5 KB/doc)");
1070 eprintln!();
1071 }
1072 }
1073
1074 let embed_progress = if embedding_enabled {
1075 let total = match &input_set {
1076 InputSet::Files(paths) => Some(paths.len() as u64),
1077 InputSet::Stdin => None,
1078 };
1079 Some(create_task_progress_bar(total, "embed", false))
1080 } else {
1081 None
1082 };
1083 let mut embedded_docs = 0usize;
1084
1085 if let InputSet::Files(paths) = &input_set {
1086 if paths.len() > 1 {
1087 if args.uri.is_some() || args.title.is_some() {
1088 anyhow::bail!(
1089 "--uri/--title apply to a single file; omit them when using a directory or glob"
1090 );
1091 }
1092 }
1093 }
1094
1095 let mut reports = Vec::new();
1096 let mut processed = 0usize;
1097 let mut skipped = 0usize;
1098 let mut bytes_added = 0u64;
1099 let mut bytes_input = 0u64;
1100 let mut no_raw_count = 0usize;
1101 let mut summary_warnings: Vec<String> = Vec::new();
1102 #[cfg(feature = "clip")]
1103 let mut clip_embeddings_added = false;
1104
1105 let mut capacity_reached = false;
1106 match input_set {
1107 InputSet::Stdin => {
1108 processed += 1;
1109 match read_payload(None) {
1110 Ok(payload) => {
1111 let mut analyze_spinner = if args.json {
1112 None
1113 } else {
1114 Some(create_spinner("Analyzing stdin payload..."))
1115 };
1116 match ingest_payload(
1117 &mut mem,
1118 &args,
1119 config,
1120 payload,
1121 None,
1122 capacity_guard.as_mut(),
1123 embedding_enabled,
1124 runtime_ref,
1125 use_parallel,
1126 ) {
1127 Ok(outcome) => {
1128 if let Some(pb) = analyze_spinner.take() {
1129 pb.finish_and_clear();
1130 }
1131 bytes_added += outcome.report.stored_bytes as u64;
1132 bytes_input += outcome.report.original_bytes as u64;
1133 if outcome.report.no_raw {
1134 no_raw_count += 1;
1135 }
1136 if args.json {
1137 summary_warnings
1138 .extend(outcome.report.extraction.warnings.iter().cloned());
1139 }
1140 reports.push(outcome.report);
1141 let report_index = reports.len() - 1;
1142 if !args.json && !use_parallel {
1143 if let Some(report) = reports.get(report_index) {
1144 print_report(report);
1145 }
1146 }
1147 if args.transcript {
1148 let notice = transcript_notice_message(&mut mem)?;
1149 if args.json {
1150 summary_warnings.push(notice);
1151 } else {
1152 println!("{notice}");
1153 }
1154 }
1155 if outcome.embedded {
1156 if let Some(pb) = embed_progress.as_ref() {
1157 pb.inc(1);
1158 }
1159 embedded_docs += 1;
1160 }
1161 if use_parallel {
1162 #[cfg(feature = "parallel_segments")]
1163 if let Some(input) = outcome.parallel_input {
1164 pending_parallel_indices.push(report_index);
1165 pending_parallel_inputs.push(input);
1166 }
1167 } else if let Some(guard) = capacity_guard.as_mut() {
1168 mem.commit()?;
1169 let stats = mem.stats()?;
1170 guard.update_after_commit(&stats);
1171 guard.check_and_warn_capacity(); }
1173 }
1174 Err(err) => {
1175 if let Some(pb) = analyze_spinner.take() {
1176 pb.finish_and_clear();
1177 }
1178 if err.downcast_ref::<DuplicateUriError>().is_some() {
1179 return Err(err);
1180 }
1181 warn!("skipped stdin payload: {err}");
1182 skipped += 1;
1183 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1184 capacity_reached = true;
1185 }
1186 }
1187 }
1188 }
1189 Err(err) => {
1190 warn!("failed to read stdin: {err}");
1191 skipped += 1;
1192 }
1193 }
1194 }
1195 InputSet::Files(ref paths) => {
1196 let mut transcript_notice_printed = false;
1197 for path in paths {
1198 processed += 1;
1199 match read_payload(Some(&path)) {
1200 Ok(payload) => {
1201 let label = path.display().to_string();
1202 let mut analyze_spinner = if args.json {
1203 None
1204 } else {
1205 Some(create_spinner(&format!("Analyzing {label}...")))
1206 };
1207 match ingest_payload(
1208 &mut mem,
1209 &args,
1210 config,
1211 payload,
1212 Some(&path),
1213 capacity_guard.as_mut(),
1214 embedding_enabled,
1215 runtime_ref,
1216 use_parallel,
1217 ) {
1218 Ok(outcome) => {
1219 if let Some(pb) = analyze_spinner.take() {
1220 pb.finish_and_clear();
1221 }
1222 bytes_added += outcome.report.stored_bytes as u64;
1223 bytes_input += outcome.report.original_bytes as u64;
1224 if outcome.report.no_raw {
1225 no_raw_count += 1;
1226 }
1227
1228 #[cfg(feature = "clip")]
1230 if let Some(ref model) = clip_model {
1231 let mime = outcome.report.mime.as_deref();
1232 let clip_frame_id = outcome.report.frame_id;
1233
1234 if is_image_file(&path) {
1235 match model.encode_image_file(&path) {
1236 Ok(embedding) => {
1237 if let Err(e) = mem.add_clip_embedding_with_page(
1238 clip_frame_id,
1239 None,
1240 embedding,
1241 ) {
1242 warn!(
1243 "Failed to add CLIP embedding for {}: {e}",
1244 path.display()
1245 );
1246 } else {
1247 info!(
1248 "Added CLIP embedding for frame {}",
1249 clip_frame_id
1250 );
1251 clip_embeddings_added = true;
1252 }
1253 }
1254 Err(e) => {
1255 warn!(
1256 "Failed to encode CLIP embedding for {}: {e}",
1257 path.display()
1258 );
1259 }
1260 }
1261 } else if matches!(mime, Some(m) if m == "application/pdf") {
1262 if let Err(e) = mem.commit() {
1264 warn!("Failed to commit before CLIP extraction: {e}");
1265 }
1266
1267 match render_pdf_pages_for_clip(
1268 &path,
1269 CLIP_PDF_MAX_IMAGES,
1270 CLIP_PDF_TARGET_PX,
1271 ) {
1272 Ok(rendered_pages) => {
1273 use rayon::prelude::*;
1274
1275 let path_for_closure = path.clone();
1278
1279 let prev_hook = std::panic::take_hook();
1282 std::panic::set_hook(Box::new(|_| {
1283 }));
1285
1286 let encoded_images: Vec<_> = rendered_pages
1287 .into_par_iter()
1288 .filter_map(|(page_num, image)| {
1289 let image_info = get_image_info(&image);
1291 if !image_info.should_embed() {
1292 info!(
1293 "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1294 path_for_closure.display(),
1295 page_num,
1296 image_info.color_variance,
1297 image_info.width,
1298 image_info.height
1299 );
1300 return None;
1301 }
1302
1303 let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1307 let rgb_image = image.to_rgb8();
1308 let mut png_bytes = Vec::new();
1309 image::DynamicImage::ImageRgb8(rgb_image).write_to(
1310 &mut Cursor::new(&mut png_bytes),
1311 image::ImageFormat::Png,
1312 ).map(|()| png_bytes)
1313 }));
1314
1315 match encode_result {
1316 Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1317 Ok(Err(e)) => {
1318 info!(
1319 "Skipping image for {} page {}: {e}",
1320 path_for_closure.display(),
1321 page_num
1322 );
1323 None
1324 }
1325 Err(_) => {
1326 info!(
1328 "Skipping malformed image for {} page {}",
1329 path_for_closure.display(),
1330 page_num
1331 );
1332 None
1333 }
1334 }
1335 })
1336 .collect();
1337
1338 std::panic::set_hook(prev_hook);
1340
1341 let mut child_frame_offset = 1u64;
1344 let parent_uri = outcome.report.uri.clone();
1345 let parent_title = outcome
1346 .report
1347 .title
1348 .clone()
1349 .unwrap_or_else(|| "Image".to_string());
1350 let total_images = encoded_images.len();
1351
1352 let clip_pb = if !args.json && total_images > 0 {
1354 let pb = ProgressBar::new(total_images as u64);
1355 pb.set_style(
1356 ProgressStyle::with_template(
1357 " CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1358 )
1359 .unwrap_or_else(|_| ProgressStyle::default_bar())
1360 .progress_chars("█▓░"),
1361 );
1362 Some(pb)
1363 } else {
1364 None
1365 };
1366
1367 for (page_num, image, png_bytes) in encoded_images {
1368 let child_uri = format!(
1369 "{}/image/{}",
1370 parent_uri, child_frame_offset
1371 );
1372 let child_title = format!(
1373 "{} - Image {} (page {})",
1374 parent_title, child_frame_offset, page_num
1375 );
1376
1377 let child_options = PutOptions::builder()
1378 .uri(&child_uri)
1379 .title(&child_title)
1380 .parent_id(clip_frame_id)
1381 .role(FrameRole::ExtractedImage)
1382 .auto_tag(false)
1383 .extract_dates(false)
1384 .build();
1385
1386 match mem.put_bytes_with_options(&png_bytes, child_options) {
1387 Ok(_child_seq) => {
1388 let child_frame_id = clip_frame_id + child_frame_offset;
1389 child_frame_offset += 1;
1390
1391 match model.encode_image(&image) {
1392 Ok(embedding) => {
1393 if let Err(e) = mem
1394 .add_clip_embedding_with_page(
1395 child_frame_id,
1396 Some(page_num),
1397 embedding,
1398 )
1399 {
1400 warn!(
1401 "Failed to add CLIP embedding for {} page {}: {e}",
1402 path.display(),
1403 page_num
1404 );
1405 } else {
1406 info!(
1407 "Added CLIP image frame {} for {} page {}",
1408 child_frame_id,
1409 clip_frame_id,
1410 page_num
1411 );
1412 clip_embeddings_added = true;
1413 }
1414 }
1415 Err(e) => warn!(
1416 "Failed to encode CLIP embedding for {} page {}: {e}",
1417 path.display(),
1418 page_num
1419 ),
1420 }
1421
1422 if let Err(e) = mem.commit() {
1424 warn!("Failed to commit after image {}: {e}", child_frame_offset);
1425 }
1426 }
1427 Err(e) => warn!(
1428 "Failed to store image frame for {} page {}: {e}",
1429 path.display(),
1430 page_num
1431 ),
1432 }
1433
1434 if let Some(ref pb) = clip_pb {
1435 pb.inc(1);
1436 }
1437 }
1438
1439 if let Some(pb) = clip_pb {
1440 pb.finish_and_clear();
1441 }
1442 }
1443 Err(err) => warn!(
1444 "Failed to render PDF pages for CLIP {}: {err}",
1445 path.display()
1446 ),
1447 }
1448 }
1449 }
1450
1451 if args.json {
1452 summary_warnings
1453 .extend(outcome.report.extraction.warnings.iter().cloned());
1454 }
1455 reports.push(outcome.report);
1456 let report_index = reports.len() - 1;
1457 if !args.json && !use_parallel {
1458 if let Some(report) = reports.get(report_index) {
1459 print_report(report);
1460 }
1461 }
1462 if args.transcript && !transcript_notice_printed {
1463 let notice = transcript_notice_message(&mut mem)?;
1464 if args.json {
1465 summary_warnings.push(notice);
1466 } else {
1467 println!("{notice}");
1468 }
1469 transcript_notice_printed = true;
1470 }
1471 if outcome.embedded {
1472 if let Some(pb) = embed_progress.as_ref() {
1473 pb.inc(1);
1474 }
1475 embedded_docs += 1;
1476 }
1477 if use_parallel {
1478 #[cfg(feature = "parallel_segments")]
1479 if let Some(input) = outcome.parallel_input {
1480 pending_parallel_indices.push(report_index);
1481 pending_parallel_inputs.push(input);
1482 }
1483 } else if let Some(guard) = capacity_guard.as_mut() {
1484 mem.commit()?;
1485 let stats = mem.stats()?;
1486 guard.update_after_commit(&stats);
1487 guard.check_and_warn_capacity(); }
1489 }
1490 Err(err) => {
1491 if let Some(pb) = analyze_spinner.take() {
1492 pb.finish_and_clear();
1493 }
1494 if err.downcast_ref::<DuplicateUriError>().is_some() {
1495 return Err(err);
1496 }
1497 warn!("skipped {}: {err}", path.display());
1498 skipped += 1;
1499 if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1500 capacity_reached = true;
1501 }
1502 }
1503 }
1504 }
1505 Err(err) => {
1506 warn!("failed to read {}: {err}", path.display());
1507 skipped += 1;
1508 }
1509 }
1510 if capacity_reached {
1511 break;
1512 }
1513 }
1514 }
1515 }
1516
1517 if capacity_reached {
1518 warn!("capacity limit reached; remaining inputs were not processed");
1519 }
1520
1521 if let Some(pb) = embed_progress.as_ref() {
1522 pb.finish_with_message("embed");
1523 }
1524
1525 if let Some(runtime) = embedding_runtime.as_ref() {
1526 if embedded_docs > 0 {
1527 match embedding_mode {
1528 EmbeddingMode::Explicit => println!(
1529 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200)",
1530 embedded_docs,
1531 runtime.dimension()
1532 ),
1533 EmbeddingMode::Auto => println!(
1534 "\u{2713} vectors={} dim={} hnsw(M=16, efC=200) (auto)",
1535 embedded_docs,
1536 runtime.dimension()
1537 ),
1538 EmbeddingMode::None => {}
1539 }
1540 } else {
1541 match embedding_mode {
1542 EmbeddingMode::Explicit => {
1543 println!("No embeddings were generated (no textual content available)");
1544 }
1545 EmbeddingMode::Auto => {
1546 println!(
1547 "Semantic runtime available, but no textual content produced embeddings"
1548 );
1549 }
1550 EmbeddingMode::None => {}
1551 }
1552 }
1553 }
1554
1555 if reports.is_empty() {
1556 if args.json {
1557 if capacity_reached {
1558 summary_warnings.push("capacity_limit_reached".to_string());
1559 }
1560 let summary_json = json!({
1561 "processed": processed,
1562 "ingested": 0,
1563 "skipped": skipped,
1564 "bytes_added": bytes_added,
1565 "bytes_added_human": format_bytes(bytes_added),
1566 "embedded_documents": embedded_docs,
1567 "capacity_reached": capacity_reached,
1568 "warnings": summary_warnings,
1569 "reports": Vec::<serde_json::Value>::new(),
1570 });
1571 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1572 } else {
1573 println!(
1574 "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1575 processed, skipped
1576 );
1577 }
1578 return Ok(());
1579 }
1580
1581 #[cfg(feature = "parallel_segments")]
1582 let mut parallel_flush_spinner = if use_parallel && !args.json {
1583 Some(create_spinner("Flushing parallel segments..."))
1584 } else {
1585 None
1586 };
1587
1588 if use_parallel {
1589 #[cfg(feature = "parallel_segments")]
1590 {
1591 let mut opts = parallel_opts.take().unwrap_or_else(|| {
1592 let mut opts = BuildOpts::default();
1593 opts.sanitize();
1594 opts
1595 });
1596 opts.vec_compression = mem.vector_compression().clone();
1598 let seqs = if pending_parallel_inputs.is_empty() {
1599 mem.commit_parallel(opts)?;
1600 Vec::new()
1601 } else {
1602 mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1603 };
1604 if let Some(pb) = parallel_flush_spinner.take() {
1605 pb.finish_with_message("Flushed parallel segments");
1606 }
1607 for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1608 if let Some(report) = reports.get_mut(idx) {
1609 report.seq = seq;
1610 }
1611 }
1612 }
1613 #[cfg(not(feature = "parallel_segments"))]
1614 {
1615 mem.commit()?;
1616 }
1617 } else {
1618 mem.commit()?;
1619 }
1620
1621 #[cfg(feature = "clip")]
1623 if clip_embeddings_added {
1624 mem.commit()?;
1625 }
1626
1627 if !args.json && use_parallel {
1628 for report in &reports {
1629 print_report(report);
1630 }
1631 }
1632
1633 let mut tables_extracted = 0usize;
1635 let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1636 if args.tables && !args.no_tables {
1637 if let InputSet::Files(ref paths) = input_set {
1638 let pdf_paths: Vec<_> = paths
1639 .iter()
1640 .filter(|p| {
1641 p.extension()
1642 .and_then(|e| e.to_str())
1643 .map(|e| e.eq_ignore_ascii_case("pdf"))
1644 .unwrap_or(false)
1645 })
1646 .collect();
1647
1648 if !pdf_paths.is_empty() {
1649 let table_spinner = if args.json {
1650 None
1651 } else {
1652 Some(create_spinner(&format!(
1653 "Extracting tables from {} PDF(s)...",
1654 pdf_paths.len()
1655 )))
1656 };
1657
1658 let table_options = TableExtractionOptions::builder()
1660 .mode(memvid_core::table::ExtractionMode::Aggressive)
1661 .min_rows(2)
1662 .min_cols(2)
1663 .min_quality(memvid_core::table::TableQuality::Low)
1664 .merge_multi_page(true)
1665 .build();
1666
1667 for pdf_path in pdf_paths {
1668 if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1669 let filename = pdf_path
1670 .file_name()
1671 .and_then(|s| s.to_str())
1672 .unwrap_or("unknown.pdf");
1673
1674 if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1675 for table in &result.tables {
1676 if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1677 {
1678 tables_extracted += 1;
1679 table_summaries.push(json!({
1680 "table_id": table.table_id,
1681 "source_file": table.source_file,
1682 "meta_frame_id": meta_id,
1683 "rows": table.n_rows,
1684 "cols": table.n_cols,
1685 "quality": format!("{:?}", table.quality),
1686 "pages": format!("{}-{}", table.page_start, table.page_end),
1687 }));
1688 }
1689 }
1690 }
1691 }
1692 }
1693
1694 if let Some(pb) = table_spinner {
1695 if tables_extracted > 0 {
1696 pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1697 } else {
1698 pb.finish_with_message("No tables found");
1699 }
1700 }
1701
1702 if tables_extracted > 0 {
1704 mem.commit()?;
1705 }
1706 }
1707 }
1708 }
1709
1710 #[cfg(feature = "logic_mesh")]
1713 let mut logic_mesh_entities = 0usize;
1714 #[cfg(feature = "logic_mesh")]
1715 {
1716 use memvid_core::{ner_model_path, ner_tokenizer_path, LogicMesh, MeshNode, NerModel};
1717
1718 let models_dir = config.models_dir.clone();
1719 let model_path = ner_model_path(&models_dir);
1720 let tokenizer_path = ner_tokenizer_path(&models_dir);
1721
1722 let ner_available = model_path.exists() && tokenizer_path.exists();
1724 let should_run_ner = args.logic_mesh;
1725
1726 if should_run_ner && !ner_available {
1727 if args.json {
1729 summary_warnings.push(
1730 "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1731 );
1732 } else {
1733 eprintln!("⚠️ Logic-Mesh skipped: NER model not installed.");
1734 eprintln!(" Run: memvid models install --ner distilbert-ner");
1735 }
1736 } else if should_run_ner {
1737 let ner_spinner = if args.json {
1738 None
1739 } else {
1740 Some(create_spinner("Building Logic-Mesh entity graph..."))
1741 };
1742
1743 match NerModel::load(&model_path, &tokenizer_path, None) {
1744 Ok(mut ner_model) => {
1745 let mut mesh = LogicMesh::new();
1746 let mut filtered_count = 0usize;
1747
1748 for report in &reports {
1750 let frame_id = report.frame_id;
1751 if let Some(ref content) = report.search_text {
1753 if !content.is_empty() {
1754 match ner_model.extract(content) {
1755 Ok(raw_entities) => {
1756 let merged_entities =
1759 merge_adjacent_entities(raw_entities, content);
1760
1761 for entity in &merged_entities {
1762 if !is_valid_entity(&entity.text, entity.confidence) {
1764 tracing::debug!(
1765 "Filtered entity: '{}' (confidence: {:.0}%)",
1766 entity.text,
1767 entity.confidence * 100.0
1768 );
1769 filtered_count += 1;
1770 continue;
1771 }
1772
1773 let kind = entity.to_entity_kind();
1775 let node = MeshNode::new(
1777 entity.text.to_lowercase(),
1778 entity.text.trim().to_string(),
1779 kind,
1780 entity.confidence,
1781 frame_id,
1782 entity.byte_start as u32,
1783 (entity.byte_end - entity.byte_start)
1784 .min(u16::MAX as usize)
1785 as u16,
1786 );
1787 mesh.merge_node(node);
1788 logic_mesh_entities += 1;
1789 }
1790 }
1791 Err(e) => {
1792 warn!("NER extraction failed for frame {frame_id}: {e}");
1793 }
1794 }
1795 }
1796 }
1797 }
1798
1799 if logic_mesh_entities > 0 {
1800 mesh.finalize();
1801 let node_count = mesh.stats().node_count;
1802 mem.set_logic_mesh(mesh);
1804 mem.commit()?;
1805 info!(
1806 "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1807 logic_mesh_entities, node_count, filtered_count
1808 );
1809 }
1810
1811 if let Some(pb) = ner_spinner {
1812 pb.finish_with_message(format!(
1813 "Logic-Mesh: {} entities ({} unique)",
1814 logic_mesh_entities,
1815 mem.mesh_node_count()
1816 ));
1817 }
1818 }
1819 Err(e) => {
1820 if let Some(pb) = ner_spinner {
1821 pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1822 }
1823 if args.json {
1824 summary_warnings.push(format!("logic_mesh_failed: {e}"));
1825 }
1826 }
1827 }
1828 }
1829 }
1830
1831 let mut enrichment_cards = 0usize;
1833 if args.enrich && !args.no_enrich && !reports.is_empty() {
1834 let enrich_spinner = if args.json {
1835 None
1836 } else {
1837 Some(create_spinner(&format!(
1838 "Extracting memories from {} frame(s)...",
1839 reports.len()
1840 )))
1841 };
1842
1843 let mut rules_engine = RulesEngine::new();
1844 rules_engine.init().ok(); let frame_ids: Vec<u64> = reports.iter().map(|r| r.frame_id).collect();
1847 for frame_id in &frame_ids {
1848 if let Ok(frame) = mem.frame_by_id(*frame_id) {
1849 let text = frame.search_text.as_deref().unwrap_or("");
1850 if !text.is_empty() {
1851 let ctx = memvid_core::enrich::EnrichmentContext::new(
1852 *frame_id,
1853 frame.uri.clone().unwrap_or_default(),
1854 text.to_string(),
1855 frame.title.clone(),
1856 frame.timestamp,
1857 None,
1858 );
1859 let result = rules_engine.enrich(&ctx);
1860 if !result.cards.is_empty() {
1861 for card in result.cards {
1862 if mem.put_memory_card(card).is_ok() {
1863 enrichment_cards += 1;
1864 }
1865 }
1866 }
1867 }
1868 }
1869 }
1870
1871 if enrichment_cards > 0 {
1872 mem.commit()?;
1873 }
1874
1875 if let Some(pb) = enrich_spinner {
1876 if enrichment_cards > 0 {
1877 pb.finish_with_message(format!("Extracted {} memory card(s)", enrichment_cards));
1878 } else {
1879 pb.finish_with_message("No memories extracted");
1880 }
1881 }
1882 }
1883
1884 let stats = mem.stats()?;
1885 if args.json {
1886 if capacity_reached {
1887 summary_warnings.push("capacity_limit_reached".to_string());
1888 }
1889 let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1890 let total_duration_ms: Option<u64> = {
1891 let sum: u64 = reports
1892 .iter()
1893 .filter_map(|r| r.extraction.duration_ms)
1894 .sum();
1895 if sum > 0 {
1896 Some(sum)
1897 } else {
1898 None
1899 }
1900 };
1901 let total_pages: Option<u32> = {
1902 let sum: u32 = reports
1903 .iter()
1904 .filter_map(|r| r.extraction.pages_processed)
1905 .sum();
1906 if sum > 0 {
1907 Some(sum)
1908 } else {
1909 None
1910 }
1911 };
1912 let embedding_mode_str = match embedding_mode {
1913 EmbeddingMode::None => "none",
1914 EmbeddingMode::Auto => "auto",
1915 EmbeddingMode::Explicit => "explicit",
1916 };
1917 let reduction_percent = if bytes_input > 0 && bytes_input > bytes_added {
1918 Some(((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0)
1919 } else {
1920 None
1921 };
1922 let summary_json = json!({
1923 "processed": processed,
1924 "ingested": reports.len(),
1925 "skipped": skipped,
1926 "bytes_input": bytes_input,
1927 "bytes_input_human": format_bytes(bytes_input),
1928 "bytes_stored": bytes_added,
1929 "bytes_stored_human": format_bytes(bytes_added),
1930 "reduction_percent": reduction_percent,
1931 "embedded_documents": embedded_docs,
1932 "embedding": {
1933 "enabled": embedding_enabled,
1934 "mode": embedding_mode_str,
1935 "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1936 },
1937 "tables": {
1938 "enabled": args.tables && !args.no_tables,
1939 "extracted": tables_extracted,
1940 "tables": table_summaries,
1941 },
1942 "enrichment": {
1943 "enabled": args.enrich && !args.no_enrich,
1944 "cards_extracted": enrichment_cards,
1945 },
1946 "capacity_reached": capacity_reached,
1947 "warnings": summary_warnings,
1948 "extraction": {
1949 "total_duration_ms": total_duration_ms,
1950 "total_pages_processed": total_pages,
1951 },
1952 "memory": {
1953 "path": args.file.display().to_string(),
1954 "frame_count": stats.frame_count,
1955 "size_bytes": stats.size_bytes,
1956 "tier": format!("{:?}", stats.tier),
1957 },
1958 "reports": reports_json,
1959 });
1960 println!("{}", serde_json::to_string_pretty(&summary_json)?);
1961 } else {
1962 println!(
1963 "Committed {} frame(s); total frames {}",
1964 reports.len(),
1965 stats.frame_count
1966 );
1967 println!(
1968 "Summary: processed {}, ingested {}, skipped {}",
1969 processed,
1970 reports.len(),
1971 skipped
1972 );
1973 if bytes_input > 0 {
1976 let actual_stored = stats.payload_bytes;
1977 if actual_stored < bytes_input && no_raw_count > 0 {
1978 let reduction = ((bytes_input - actual_stored) as f64 / bytes_input as f64) * 100.0;
1980 println!(
1981 "Storage: {} input → {} stored ({:.1}% smaller, text extracted)",
1982 format_bytes(bytes_input),
1983 format_bytes(actual_stored),
1984 reduction
1985 );
1986 } else if bytes_added < bytes_input {
1987 let reduction = ((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0;
1989 println!(
1990 "Storage: {} input → {} stored ({:.1}% smaller)",
1991 format_bytes(bytes_input),
1992 format_bytes(bytes_added),
1993 reduction
1994 );
1995 } else {
1996 println!("Storage: {} added", format_bytes(bytes_added));
1997 }
1998 }
1999 if tables_extracted > 0 {
2000 println!("Tables: {} extracted from PDF(s)", tables_extracted);
2001 }
2002 if enrichment_cards > 0 {
2003 println!("Enrichment: {} memory card(s) extracted", enrichment_cards);
2004 }
2005 }
2006
2007 #[cfg(feature = "replay")]
2009 let _ = mem.save_active_session();
2010
2011 Ok(())
2012}
2013
/// Handle `memvid update`: replace an existing frame's payload and/or
/// metadata in place, producing a new frame that supersedes the old one.
///
/// The frame is selected by `--frame-id` or `--uri`. Options are seeded from
/// the existing frame, then selectively overridden by CLI flags, then (when a
/// new payload is supplied) refined by re-analyzing the file. Embeddings are
/// recomputed only when `--embeddings` is passed; otherwise the previous
/// vector (if any) is carried forward.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Locate the frame to update (by id or URI) before reading any payload.
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload; `None` means "keep the stored bytes".
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    // Best-effort UTF-8 view of the payload for title/embedding derivation.
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Seed options from the existing frame so unspecified fields survive.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    // Auto-tagging and date extraction only make sense with fresh content.
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // Explicit --set-uri wins over the inherited URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // If the frame had no URI and we have new content, derive one from the path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // CLI overrides for the simple scalar fields.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels replace wholesale when provided (no merge with existing labels).
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags also replace wholesale: both key and value become searchable tags
    // (value skipped when empty or identical to the key), and each pair is
    // recorded in extra_metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for recomputing embeddings; starts as the inherited search text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // Fresh payload: re-derive title, URI, metadata, and search text.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // No new content: nothing to auto-tag or extract dates from.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    // Only emit an embedding if the memory actually has a vector index.
    options.enable_embedding = stats.has_vec_index;

    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Infer the embedding model already used by this memory so the
        // recomputed vector is compatible; refuse on mixed models.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => {
                identity.model.map(String::from)
            }
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                     Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer the extracted/inherited search text; fall back to raw UTF-8 payload.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // If we did not recompute a vector but the index exists, carry the old one.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the frame that superseded the old one: by URI when we have a
    // non-empty one, otherwise via the newest timeline entry (falling back to
    // the original id if the timeline is empty).
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    // Report the outcome in the requested format.
    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
2218
2219fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2220 if let Some(uri) = provided {
2221 let sanitized = sanitize_uri(uri, true);
2222 return format!("mv2://{}", sanitized);
2223 }
2224
2225 if let Some(path) = source {
2226 let raw = path.to_string_lossy();
2227 let sanitized = sanitize_uri(&raw, false);
2228 return format!("mv2://{}", sanitized);
2229 }
2230
2231 format!("mv2://frames/{}", Uuid::new_v4())
2232}
2233
2234pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2235 let digest = hash(payload).to_hex();
2236 let short = &digest[..32];
2237 let ext_from_path = source
2238 .and_then(|path| path.extension())
2239 .and_then(|ext| ext.to_str())
2240 .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2241 .filter(|ext| !ext.is_empty());
2242 let ext = ext_from_path
2243 .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2244 .unwrap_or_else(|| "bin".to_string());
2245 format!("mv2://video/{short}.{ext}")
2246}
2247
2248fn derive_title(
2249 provided: Option<String>,
2250 source: Option<&Path>,
2251 payload_utf8: Option<&str>,
2252) -> Option<String> {
2253 if let Some(title) = provided {
2254 let trimmed = title.trim();
2255 if trimmed.is_empty() {
2256 None
2257 } else {
2258 Some(trimmed.to_string())
2259 }
2260 } else {
2261 if let Some(text) = payload_utf8 {
2262 if let Some(markdown_title) = extract_markdown_title(text) {
2263 return Some(markdown_title);
2264 }
2265 }
2266 source
2267 .and_then(|path| path.file_stem())
2268 .and_then(|stem| stem.to_str())
2269 .map(to_title_case)
2270 }
2271}
2272
/// Return the text of the first Markdown heading in `text`, if any.
///
/// A heading is any line whose trimmed form starts with `#`; all leading
/// `#` characters are stripped and the remainder trimmed. Lines that are
/// only `#`s are skipped rather than returned as an empty title.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines()
        .map(str::trim)
        .filter(|line| line.starts_with('#'))
        .map(|line| line.trim_start_matches('#').trim())
        .find(|title| !title.is_empty())
        .map(str::to_string)
}
2285
/// Upper-case the first character of `input`, leaving the rest untouched.
///
/// The explicit `is_empty` early return in the previous version was
/// redundant with the `None` arm of `chars.next()`; a single `match`
/// covers both cases.
fn to_title_case(input: &str) -> String {
    let mut chars = input.chars();
    match chars.next() {
        // `to_uppercase()` can expand to multiple chars (e.g. 'ß' → "SS"),
        // so collect it rather than assuming a single character.
        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
        None => String::new(),
    }
}
2297
/// Whether a file should be ignored during ingestion (currently only the
/// macOS `.DS_Store` metadata file, matched case-sensitively).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
2304
/// Whether a directory should be skipped during recursive ingestion:
/// any dot-directory (`.git`, `.cache`, ...) is excluded.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
2311
/// Where ingestion payloads come from: the process's standard input, or a
/// resolved list of filesystem paths (from a literal path, glob pattern, or
/// directory walk).
enum InputSet {
    // Read a single payload from stdin.
    Stdin,
    // Ingest each of these files in order.
    Files(Vec<PathBuf>),
}
2316
2317#[cfg(feature = "clip")]
2319fn is_image_file(path: &std::path::Path) -> bool {
2320 let Some(ext) = path.extension().and_then(|e| e.to_str()) else {
2321 return false;
2322 };
2323 matches!(
2324 ext.to_ascii_lowercase().as_str(),
2325 "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
2326 )
2327}
2328
2329fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2330 let Some(raw) = input else {
2331 return Ok(InputSet::Stdin);
2332 };
2333
2334 if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2335 let mut files = Vec::new();
2336 let mut matched_any = false;
2337 for entry in glob(raw)? {
2338 let path = entry?;
2339 if path.is_file() {
2340 matched_any = true;
2341 if should_skip_input(&path) {
2342 continue;
2343 }
2344 files.push(path);
2345 }
2346 }
2347 files.sort();
2348 if files.is_empty() {
2349 if matched_any {
2350 anyhow::bail!(
2351 "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2352 );
2353 }
2354 anyhow::bail!("pattern '{raw}' did not match any files");
2355 }
2356 return Ok(InputSet::Files(files));
2357 }
2358
2359 let path = PathBuf::from(raw);
2360 if path.is_dir() {
2361 let (mut files, skipped_any) = collect_directory_files(&path)?;
2362 files.sort();
2363 if files.is_empty() {
2364 if skipped_any {
2365 anyhow::bail!(
2366 "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2367 path.display()
2368 );
2369 }
2370 anyhow::bail!(
2371 "directory '{}' contains no ingestible files",
2372 path.display()
2373 );
2374 }
2375 Ok(InputSet::Files(files))
2376 } else {
2377 Ok(InputSet::Files(vec![path]))
2378 }
2379}
2380
2381fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2382 let mut files = Vec::new();
2383 let mut skipped_any = false;
2384 let mut stack = vec![root.to_path_buf()];
2385 while let Some(dir) = stack.pop() {
2386 for entry in fs::read_dir(&dir)? {
2387 let entry = entry?;
2388 let path = entry.path();
2389 let file_type = entry.file_type()?;
2390 if file_type.is_dir() {
2391 if should_skip_dir(&path) {
2392 skipped_any = true;
2393 continue;
2394 }
2395 stack.push(path);
2396 } else if file_type.is_file() {
2397 if should_skip_input(&path) {
2398 skipped_any = true;
2399 continue;
2400 }
2401 files.push(path);
2402 }
2403 }
2404 }
2405 Ok((files, skipped_any))
2406}
2407
/// Result of ingesting a single payload: the per-document report plus the
/// bookkeeping the surrounding ingest loop needs.
struct IngestOutcome {
    // Report printed (or serialized to JSON) for this document.
    report: PutReport,
    // True when a text embedding was generated for this document.
    embedded: bool,
    // Deferred payload for the batched parallel-segment commit path, when
    // the feature is enabled and parallel ingestion is in use.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2414
/// Per-document ingestion summary, used for both console and JSON output.
struct PutReport {
    // Sequence number returned by the store for this put.
    seq: u64,
    // Frame id assigned to the stored document.
    #[allow(dead_code)]
    frame_id: u64,
    // Final mv2:// URI of the frame.
    uri: String,
    // Derived or user-supplied title, if any.
    title: Option<String>,
    // Size of the input payload before storage.
    original_bytes: usize,
    // Bytes actually written to the store.
    stored_bytes: usize,
    // Whether the stored payload was compressed.
    compressed: bool,
    // True when only extracted text was kept and the raw payload dropped.
    no_raw: bool,
    // Source path the payload was read from (None for stdin).
    source: Option<PathBuf>,
    // Detected MIME type, when available.
    mime: Option<String>,
    // Rich document metadata produced by analysis, if any.
    metadata: Option<DocMetadata>,
    // Timing/page-count/warning summary from text extraction.
    extraction: ExtractionSummary,
    // Extracted search text, retained so the Logic-Mesh NER pass can
    // re-read it after ingestion.
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2434
/// Tracks memory-file size against its configured capacity so ingestion can
/// refuse writes that would exceed the limit and warn as usage grows.
struct CapacityGuard {
    // Storage tier the capacity was derived from (kept for diagnostics).
    #[allow(dead_code)]
    tier: Tier,
    // Hard capacity limit in bytes (non-zero; see `from_stats`).
    capacity: u64,
    // Size of the memory file as of the last commit.
    current_size: u64,
}
2441
impl CapacityGuard {
    // Minimum per-write overhead allowance: 128 KiB.
    const MIN_OVERHEAD: u64 = 128 * 1024;
    // Additional overhead estimated as 5% of the incoming payload.
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Build a guard from store stats; returns `None` when the memory has
    /// no capacity limit (`capacity_bytes == 0`), i.e. nothing to guard.
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Error (as a `CapacityExceeded` put error) if writing `additional`
    /// bytes plus estimated overhead would push the file past capacity.
    /// Uses saturating arithmetic so huge inputs cannot overflow.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refresh the tracked size from post-commit stats.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// The configured capacity, exposed for error reporting.
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    /// Print tiered usage warnings to stderr at 50%, 75%, and 90% usage.
    ///
    /// Each warning only fires while usage sits inside a one-percentage-point
    /// window (e.g. 90.0%..91.0%) — NOTE(review): presumably so the message
    /// prints roughly once as usage crosses each threshold instead of on
    /// every subsequent commit; a large single write can jump over a window
    /// and skip its warning entirely.
    fn check_and_warn_capacity(&self) {
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️  CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Actions:");
            eprintln!("   1. Recreate with larger --size");
            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️  WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️  INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Overhead estimate for a write: the larger of 5% of the payload
    /// (rounded up) and the 128 KiB floor.
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2523
/// Everything `analyze_file` learned about one payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type; falls back to "application/octet-stream" when
    /// nothing else matched.
    pub mime: String,
    /// Extracted document metadata, or `None` when nothing was gathered.
    pub metadata: Option<DocMetadata>,
    /// Normalized text usable for search indexing, when any could be extracted.
    pub search_text: Option<String>,
    /// Diagnostics describing how text extraction went.
    pub extraction: ExtractionSummary,
}
2531
/// Knobs controlling audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Run audio analysis even when the MIME type is not audio/*.
    force: bool,
    /// Requested timeline segment length in seconds (0 is coerced to 1).
    segment_secs: u32,
    /// Attempt Whisper transcription of the audio stream.
    transcribe: bool,
}

impl AudioAnalyzeOptions {
    /// Default timeline segment length, in seconds.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length clamped to at least one second, so segmentation
    /// can never loop on a zero-length window.
    fn normalised_segment_secs(self) -> u32 {
        if self.segment_secs == 0 {
            1
        } else {
            self.segment_secs
        }
    }
}

impl Default for AudioAnalyzeOptions {
    /// No forced analysis, no transcription, 30-second segments.
    fn default() -> Self {
        AudioAnalyzeOptions {
            force: false,
            segment_secs: AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS,
            transcribe: false,
        }
    }
}
2556
/// Intermediate result of `analyze_audio`.
struct AudioAnalysis {
    /// Stream-level audio properties (duration, sample rate, tags, segments).
    metadata: DocAudioMetadata,
    /// Caption candidate taken from the title/tracktitle tag, if present.
    caption: Option<String>,
    /// Tag values (title, artist, album, genre) merged into the search text.
    search_terms: Vec<String>,
    /// Whisper transcript when transcription was requested and succeeded.
    transcript: Option<String>,
}
2563
/// Intermediate result of `analyze_image`.
struct ImageAnalysis {
    /// Pixel width of the decoded image.
    width: u32,
    /// Pixel height of the decoded image.
    height: u32,
    /// Dominant colours as "#rrggbb" hex strings; empty when palette
    /// extraction failed.
    palette: Vec<String>,
    /// Caption candidate taken from the EXIF ImageDescription tag, if present.
    caption: Option<String>,
    /// Camera/lens/GPS EXIF metadata, when the payload contained any.
    exif: Option<DocExifMetadata>,
}
2571
/// Diagnostics about a single text-extraction attempt.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Name of the reader that produced (or failed to produce) the text.
    reader: Option<String>,
    /// Final outcome of the extraction attempt.
    status: ExtractionStatus,
    /// Accumulated non-fatal warnings, also logged via `warn!`.
    warnings: Vec<String>,
    /// Extraction time in milliseconds, when the reader reported it.
    duration_ms: Option<u64>,
    /// Number of pages processed, for paged formats that report it.
    pages_processed: Option<u32>,
}
2580
2581impl ExtractionSummary {
2582 fn record_warning<S: Into<String>>(&mut self, warning: S) {
2583 self.warnings.push(warning.into());
2584 }
2585}
2586
/// Outcome of a text-extraction attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// Extraction was not attempted for this payload.
    Skipped,
    /// The primary reader produced usable text.
    Ok,
    /// A fallback extractor produced the text.
    FallbackUsed,
    /// Extraction ran but yielded no searchable content.
    Empty,
    /// Extraction failed outright.
    Failed,
}

impl ExtractionStatus {
    /// Stable machine-readable label for reports and JSON output.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Skipped => "skipped",
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
        }
    }
}
2607
2608fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2609 let filename = source_path
2610 .and_then(|path| path.file_name())
2611 .and_then(|name| name.to_str())
2612 .map(|value| value.to_string());
2613 MediaManifest {
2614 kind: "video".to_string(),
2615 mime: mime.to_string(),
2616 bytes,
2617 filename,
2618 duration_ms: None,
2619 width: None,
2620 height: None,
2621 codec: None,
2622 }
2623}
2624
/// Analyzes one payload end-to-end: detects its MIME type, hashes it,
/// extracts searchable text (plain text, PDF/Office readers, audio
/// transcript/tags), and gathers media metadata (image dimensions and
/// palette, EXIF, audio stream properties, video manifest).
///
/// Returns a `FileAnalysis` whose `metadata` is `None` when nothing beyond
/// the defaults was gathered, and whose `extraction` records which reader
/// ran and how it fared.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    // Video always wins over "textual" handling, even if forced by the caller.
    let is_video = force_video || mime.starts_with("video/");
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    // Baseline metadata that is always available: mime, byte count, blake3 hash.
    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Magic prefix used for format sniffing by the reader hint.
    // NOTE(review): `get(..MAGIC_SNIFF_BYTES)` returns None for payloads
    // shorter than the window, so short files carry no magic at all — confirm
    // this is intended rather than passing the full (short) payload.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Normalizes extracted text, derives a caption from its first non-blank
    // line (unless one is already set), stores it as the search text, and
    // records which reader produced it. Returns false when normalization
    // yields nothing usable, leaving all outputs untouched.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Textual payloads are used verbatim — no reader involved.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            // MIME said text but the bytes were not valid UTF-8.
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf"
        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
        || mime == "application/vnd.ms-excel"
        || mime == "application/msword"
    {
        // Document formats: try the registered reader first, then the
        // PDF-specific fallback chain if nothing usable was produced.
        // NOTE(review): "application/vnd.ms-powerpoint" is mapped by
        // mime_from_extension but is absent from this list — confirm .ppt is
        // intentionally excluded from reader extraction.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    // The reader may refine the MIME type it detected.
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            // NOTE(review): this warning says "PDF" but the branch also
            // covers Office MIME types — the message is misleading for them.
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        // Fallback chain (pdf_extract -> lopdf -> pdftotext) when the primary
        // reader produced nothing usable.
        if !applied {
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image metadata: dimensions, colour palette, EXIF (with caption fallback).
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio metadata and search terms; `force` allows analysis for
    // non-audio MIME types.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
            transcript,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            // NOTE(review): the `is_none()` arm is redundant — `map_or(true, ...)`
            // already covers None.
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }

            // A transcript replaces any previously extracted search text;
            // otherwise tag-derived terms are appended (deduplicated by
            // substring containment).
            if let Some(ref text) = transcript {
                extraction.reader = Some("whisper".to_string());
                extraction.status = ExtractionStatus::Ok;
                search_text = Some(text.clone());
            } else if !search_terms.is_empty() {
                match &mut search_text {
                    Some(existing) => {
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the (truncated) inferred title.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2909
2910fn default_reader_registry() -> &'static ReaderRegistry {
2911 static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2912 REGISTRY.get_or_init(ReaderRegistry::default)
2913}
2914
2915fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2916 if detect_pdf_magic(magic) {
2917 return Some(DocumentFormat::Pdf);
2918 }
2919
2920 let mime = mime?.trim().to_ascii_lowercase();
2921 match mime.as_str() {
2922 "application/pdf" => Some(DocumentFormat::Pdf),
2923 "text/plain" => Some(DocumentFormat::PlainText),
2924 "text/markdown" => Some(DocumentFormat::Markdown),
2925 "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2926 "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2927 Some(DocumentFormat::Docx)
2928 }
2929 "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2930 Some(DocumentFormat::Pptx)
2931 }
2932 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2933 Some(DocumentFormat::Xlsx)
2934 }
2935 other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2936 _ => None,
2937 }
2938}
2939
/// True when the sniffed prefix starts with "%PDF", tolerating a UTF-8 BOM
/// and leading ASCII whitespace before the marker.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let bytes = match magic {
        Some(bytes) if !bytes.is_empty() => bytes,
        _ => return false,
    };
    // Drop a UTF-8 BOM if present, then skip ASCII whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF][..]).unwrap_or(bytes);
    let content_start = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[content_start..].starts_with(b"%PDF")
}
2957
/// Extracts text from a PDF via a three-stage fallback chain:
/// pdf_extract, then lopdf, then the external `pdftotext` binary.
///
/// Each stage short-circuits with its result when it yields at least
/// `min_good_chars`; otherwise the longest text seen so far is kept and the
/// next extractor is tried. As written this function always returns `Ok` —
/// when every extractor fails it returns `Ok(String::new())` so the document
/// is still stored, just without searchable text.
fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
    let mut best_text = String::new();
    let mut best_source = "none";

    // "Good enough" threshold scales with file size (~1 char per 10 KB),
    // clamped to the range [100, 5000].
    let min_good_chars = (payload.len() / 10000).max(100).min(5000);

    // Stage 1: pdf_extract (pure Rust, in-memory).
    match pdf_extract::extract_text_from_mem(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "pdf_extract";
            }
            if trimmed.len() >= min_good_chars {
                info!("pdf_extract extracted {} chars (good)", trimmed.len());
                return Ok(best_text);
            } else if !trimmed.is_empty() {
                warn!(
                    "pdf_extract returned only {} chars, trying other extractors",
                    trimmed.len()
                );
            }
        }
        Err(err) => {
            warn!("pdf_extract failed: {err}");
        }
    }

    // Stage 2: lopdf-based extraction.
    match extract_pdf_with_lopdf(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "lopdf";
            }
            if trimmed.len() >= min_good_chars {
                info!("lopdf extracted {} chars (good)", trimmed.len());
                return Ok(best_text);
            } else if !trimmed.is_empty() {
                warn!(
                    "lopdf returned only {} chars, trying pdftotext",
                    trimmed.len()
                );
            }
        }
        Err(err) => {
            warn!("lopdf failed: {err}");
        }
    }

    // Stage 3: external pdftotext binary (last resort; never early-returns,
    // just contributes to the best-so-far result).
    match fallback_pdftotext(payload) {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.len() > best_text.len() {
                best_text = trimmed.to_string();
                best_source = "pdftotext";
            }
            if !trimmed.is_empty() {
                info!("pdftotext extracted {} chars", trimmed.len());
            }
        }
        Err(err) => {
            warn!("pdftotext failed: {err}");
        }
    }

    // Return whichever extractor produced the most text, or an empty string
    // when all of them came up dry.
    if !best_text.is_empty() {
        info!(
            "using {} extraction ({} chars)",
            best_source,
            best_text.len()
        );
        Ok(best_text)
    } else {
        warn!("all PDF extraction methods failed, storing without searchable text");
        Ok(String::new())
    }
}
3044
3045fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
3047 use lopdf::Document;
3048
3049 let mut document =
3050 Document::load_mem(payload).map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
3051
3052 if document.is_encrypted() {
3054 let _ = document.decrypt("");
3055 }
3056
3057 let _ = document.decompress();
3059
3060 let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
3062 if page_numbers.is_empty() {
3063 return Err(anyhow!("PDF has no pages"));
3064 }
3065 page_numbers.sort_unstable();
3066
3067 if page_numbers.len() > 4096 {
3069 page_numbers.truncate(4096);
3070 warn!("PDF has more than 4096 pages, truncating extraction");
3071 }
3072
3073 let text = document
3075 .extract_text(&page_numbers)
3076 .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
3077
3078 Ok(text.trim().to_string())
3079}
3080
3081fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
3082 use std::io::Write;
3083 use std::process::{Command, Stdio};
3084
3085 let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
3086
3087 let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
3088 temp_pdf
3089 .write_all(payload)
3090 .context("failed to write pdf payload to temp file")?;
3091 temp_pdf.flush().context("failed to flush temp pdf file")?;
3092
3093 let child = Command::new(pdftotext)
3094 .arg("-layout")
3095 .arg(temp_pdf.path())
3096 .arg("-")
3097 .stdout(Stdio::piped())
3098 .spawn()
3099 .context("failed to spawn pdftotext")?;
3100
3101 let output = child
3102 .wait_with_output()
3103 .context("failed to read pdftotext output")?;
3104
3105 if !output.status.success() {
3106 return Err(anyhow!("pdftotext exited with status {}", output.status));
3107 }
3108
3109 let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
3110 Ok(text)
3111}
3112
3113fn detect_mime(
3114 source_path: Option<&Path>,
3115 payload: &[u8],
3116 payload_utf8: Option<&str>,
3117) -> (String, bool) {
3118 if let Some(path) = source_path {
3121 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3122 let ext_lower = ext.to_ascii_lowercase();
3123 if matches!(
3125 ext_lower.as_str(),
3126 "docx" | "xlsx" | "pptx" | "doc" | "xls" | "ppt"
3127 ) {
3128 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
3129 return (mime.to_string(), treat_as_text);
3130 }
3131 }
3132 }
3133 }
3134
3135 if let Some(kind) = infer::get(payload) {
3136 let mime = kind.mime_type().to_string();
3137 let treat_as_text = mime.starts_with("text/")
3138 || matches!(
3139 mime.as_str(),
3140 "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
3141 );
3142 return (mime, treat_as_text);
3143 }
3144
3145 let magic = magic_from_u8(payload);
3146 if !magic.is_empty() && magic != "application/octet-stream" {
3147 let treat_as_text = magic.starts_with("text/")
3148 || matches!(
3149 magic,
3150 "application/json"
3151 | "application/xml"
3152 | "application/javascript"
3153 | "image/svg+xml"
3154 | "text/plain"
3155 );
3156 return (magic.to_string(), treat_as_text);
3157 }
3158
3159 if let Some(path) = source_path {
3160 if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3161 if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
3162 return (mime.to_string(), treat_as_text);
3163 }
3164 }
3165 }
3166
3167 if payload_utf8.is_some() {
3168 return ("text/plain".to_string(), true);
3169 }
3170
3171 ("application/octet-stream".to_string(), false)
3172}
3173
/// Maps a file extension (case-insensitive) to a `(mime, treat_as_text)`
/// pair, or `None` for unrecognised extensions. Source-code extensions all
/// collapse to "text/plain".
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let lowered = ext.to_ascii_lowercase();
    let mapping = match lowered.as_str() {
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => ("text/plain", true),
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        "docx" => (
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            false,
        ),
        "xlsx" => (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            false,
        ),
        "pptx" => (
            "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            false,
        ),
        "doc" => ("application/msword", false),
        "xls" => ("application/vnd.ms-excel", false),
        "ppt" => ("application/vnd.ms-powerpoint", false),
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some(mapping)
}
3218
3219fn caption_from_text(text: &str) -> Option<String> {
3220 for line in text.lines() {
3221 let trimmed = line.trim();
3222 if !trimmed.is_empty() {
3223 return Some(truncate_to_boundary(trimmed, 240));
3224 }
3225 }
3226 None
3227}
3228
/// Shortens `text` to at most `max_len` bytes, cutting on a UTF-8 char
/// boundary, trimming trailing whitespace, and appending an ellipsis.
/// Returns the input unchanged when it already fits, and an empty string
/// when `max_len` is 0.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Largest index <= max_len that is a valid char boundary (0 always is).
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }
    format!("{}…", text[..cut].trim_end())
}
3244
3245fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
3246 let reader = ImageReader::new(Cursor::new(payload))
3247 .with_guessed_format()
3248 .map_err(|err| warn!("failed to guess image format: {err}"))
3249 .ok()?;
3250 let image = reader
3251 .decode()
3252 .map_err(|err| warn!("failed to decode image: {err}"))
3253 .ok()?;
3254 let width = image.width();
3255 let height = image.height();
3256 let rgb = image.to_rgb8();
3257 let palette = match get_palette(
3258 rgb.as_raw(),
3259 ColorFormat::Rgb,
3260 COLOR_PALETTE_SIZE,
3261 COLOR_PALETTE_QUALITY,
3262 ) {
3263 Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3264 Err(err) => {
3265 warn!("failed to compute colour palette: {err}");
3266 Vec::new()
3267 }
3268 };
3269
3270 let (exif, exif_caption) = extract_exif_metadata(payload);
3271
3272 Some(ImageAnalysis {
3273 width,
3274 height,
3275 palette,
3276 caption: exif_caption,
3277 exif,
3278 })
3279}
3280
3281fn color_to_hex(color: Color) -> String {
3282 format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3283}
3284
/// Probes and fully decodes an audio payload with symphonia to measure its
/// duration, then assembles `DocAudioMetadata` (sample rate, channels,
/// estimated bitrate, codec, fixed-length timeline segments, tags) plus a
/// caption candidate, tag-derived search terms, and an optional Whisper
/// transcript. Returns `None` when probing or decoder creation fails.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // The payload is copied so the media source owns its bytes.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Hint the probe with the file extension and the MIME subtype.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode the whole stream, counting frames to derive the duration.
    // IoError is treated as end-of-stream; ResetRequired resets the decoder.
    // NOTE(review): a persistently failing `next_packet` that keeps returning
    // DecodeError would keep this loop spinning — confirm the demuxer always
    // makes progress in that case.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Only count frames belonging to the default track.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    // Duration from decoded frame count; 0.0 when the sample rate is unknown.
    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Average bitrate estimated from the container size, in kbit/s.
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    // Tag values worth surfacing in search text.
    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Fixed-length timeline segments covering the whole stream; the final
    // segment is clipped to the stream end.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble the metadata, leaving fields unset when nothing was learned.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    // Transcription is opt-in and requires the whisper feature (the stub
    // variant of transcribe_audio just warns and returns None).
    let transcript = if options.transcribe {
        transcribe_audio(source_path)
    } else {
        None
    };

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
        transcript,
    })
}
3467
/// Transcribes the audio file at `source_path` with Whisper.
/// Returns `None` when no path is available, the transcriber cannot be
/// initialised, or transcription fails — each failure is logged, never fatal.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let path = source_path?;

    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| tracing::warn!(error = %e, "Failed to initialize Whisper transcriber"))
        .ok()?;

    let outcome = transcriber
        .transcribe_file(path)
        .map_err(|e| tracing::warn!(error = %e, "Failed to transcribe audio"))
        .ok()?;

    tracing::info!(
        duration = outcome.duration_secs,
        text_len = outcome.text.len(),
        "Audio transcription complete"
    );
    Some(outcome.text)
}
3501
3502#[cfg(not(feature = "whisper"))]
3503fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
3504 tracing::warn!(
3505 "Whisper feature not enabled. Rebuild with --features whisper to enable transcription."
3506 );
3507 None
3508}
3509
3510fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3511 use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3512
3513 fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3514 if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3515 out.entry("title".into())
3516 .or_insert_with(|| value.to_string());
3517 out.entry("tracktitle".into())
3518 .or_insert_with(|| value.to_string());
3519 }
3520 if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3521 out.entry("artist".into())
3522 .or_insert_with(|| value.to_string());
3523 } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3524 out.entry("artist".into())
3525 .or_insert_with(|| value.to_string());
3526 }
3527 if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3528 out.entry("album".into())
3529 .or_insert_with(|| value.to_string());
3530 }
3531 if let Some(value) = tag.get_string(&ItemKey::Genre) {
3532 out.entry("genre".into())
3533 .or_insert_with(|| value.to_string());
3534 }
3535 if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3536 out.entry("track_number".into())
3537 .or_insert_with(|| value.to_string());
3538 }
3539 if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3540 out.entry("disc_number".into())
3541 .or_insert_with(|| value.to_string());
3542 }
3543 }
3544
3545 let mut tags = HashMap::new();
3546 let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3547 Ok(probe) => probe,
3548 Err(err) => {
3549 warn!("failed to guess audio tag file type: {err}");
3550 return tags;
3551 }
3552 };
3553
3554 let tagged_file = match probe.read() {
3555 Ok(file) => file,
3556 Err(err) => {
3557 warn!("failed to read audio tags: {err}");
3558 return tags;
3559 }
3560 };
3561
3562 if let Some(primary) = tagged_file.primary_tag() {
3563 collect_tag(primary, &mut tags);
3564 }
3565 for tag in tagged_file.tags() {
3566 collect_tag(tag, &mut tags);
3567 }
3568
3569 tags
3570}
3571
3572fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3573 let mut cursor = Cursor::new(payload);
3574 let exif = match ExifReader::new().read_from_container(&mut cursor) {
3575 Ok(exif) => exif,
3576 Err(err) => {
3577 warn!("failed to read EXIF metadata: {err}");
3578 return (None, None);
3579 }
3580 };
3581
3582 let mut doc = DocExifMetadata::default();
3583 let mut has_data = false;
3584
3585 if let Some(make) = exif_string(&exif, Tag::Make) {
3586 doc.make = Some(make);
3587 has_data = true;
3588 }
3589 if let Some(model) = exif_string(&exif, Tag::Model) {
3590 doc.model = Some(model);
3591 has_data = true;
3592 }
3593 if let Some(lens) =
3594 exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3595 {
3596 doc.lens = Some(lens);
3597 has_data = true;
3598 }
3599 if let Some(dt) =
3600 exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3601 {
3602 doc.datetime = Some(dt);
3603 has_data = true;
3604 }
3605
3606 if let Some(gps) = extract_gps_metadata(&exif) {
3607 doc.gps = Some(gps);
3608 has_data = true;
3609 }
3610
3611 let caption = exif_string(&exif, Tag::ImageDescription);
3612
3613 let metadata = if has_data { Some(doc) } else { None };
3614 (metadata, caption)
3615}
3616
3617fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3618 exif.fields()
3619 .find(|field| field.tag == tag)
3620 .and_then(|field| field_to_string(field, exif))
3621}
3622
3623fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3624 let value = field.display_value().with_unit(exif).to_string();
3625 let trimmed = value.trim_matches('\0').trim();
3626 if trimmed.is_empty() {
3627 None
3628 } else {
3629 Some(trimmed.to_string())
3630 }
3631}
3632
3633fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3634 let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3635 let latitude_ref_field = exif
3636 .fields()
3637 .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3638 let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3639 let longitude_ref_field = exif
3640 .fields()
3641 .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3642
3643 let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3644 let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3645
3646 if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3647 if reference.eq_ignore_ascii_case("S") {
3648 latitude = -latitude;
3649 }
3650 }
3651 if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3652 if reference.eq_ignore_ascii_case("W") {
3653 longitude = -longitude;
3654 }
3655 }
3656
3657 Some(DocGpsMetadata {
3658 latitude,
3659 longitude,
3660 })
3661}
3662
3663fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3664 match value {
3665 ExifValue::Rational(values) if !values.is_empty() => {
3666 let deg = rational_to_f64_u(values.get(0)?)?;
3667 let min = values
3668 .get(1)
3669 .and_then(|v| rational_to_f64_u(v))
3670 .unwrap_or(0.0);
3671 let sec = values
3672 .get(2)
3673 .and_then(|v| rational_to_f64_u(v))
3674 .unwrap_or(0.0);
3675 Some(deg + (min / 60.0) + (sec / 3600.0))
3676 }
3677 ExifValue::SRational(values) if !values.is_empty() => {
3678 let deg = rational_to_f64_i(values.get(0)?)?;
3679 let min = values
3680 .get(1)
3681 .and_then(|v| rational_to_f64_i(v))
3682 .unwrap_or(0.0);
3683 let sec = values
3684 .get(2)
3685 .and_then(|v| rational_to_f64_i(v))
3686 .unwrap_or(0.0);
3687 Some(deg + (min / 60.0) + (sec / 3600.0))
3688 }
3689 _ => None,
3690 }
3691}
3692
3693fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3694 if value.denom == 0 {
3695 None
3696 } else {
3697 Some(value.num as f64 / value.denom as f64)
3698 }
3699}
3700
3701fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3702 if value.denom == 0 {
3703 None
3704 } else {
3705 Some(value.num as f64 / value.denom as f64)
3706 }
3707}
3708
3709fn value_to_ascii(value: &ExifValue) -> Option<String> {
3710 if let ExifValue::Ascii(values) = value {
3711 values
3712 .first()
3713 .and_then(|bytes| std::str::from_utf8(bytes).ok())
3714 .map(|s| s.trim_matches('\0').trim().to_string())
3715 .filter(|s| !s.is_empty())
3716 } else {
3717 None
3718 }
3719}
3720
/// Ingest a single payload (file bytes or stdin) into the memvid store.
///
/// Pipeline: capacity pre-check → content analysis (MIME/audio/video) →
/// search-text truncation → `PutOptions` assembly from CLI flags → optional
/// semantic embedding (pre-computed vector or runtime-generated, with
/// per-chunk embeddings for large documents) → write. An existing frame with
/// the same URI is either updated (`--update-existing`) or rejected as a
/// duplicate. In `parallel_mode` (with the `parallel_segments` feature) the
/// write is deferred: a `ParallelInput` is returned in the outcome instead.
///
/// # Errors
/// Fails on capacity exhaustion, `--video` with a non-video payload,
/// embedding-dimension mismatches, missing semantic runtime, duplicate URIs
/// without `--update-existing`, and underlying store errors.
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    config: &CliConfig,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    let original_bytes = payload.len();
    // stored_bytes reflects post-compression size for UTF-8 payloads.
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
    crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;

    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    let audio_opts = AudioAnalyzeOptions {
        force: args.audio || args.transcribe,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
        transcribe: args.transcribe,
    };

    // Analyze content: MIME sniffing, text extraction, audio/EXIF metadata.
    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    // Cap the indexed search text; truncation happens on a boundary so we
    // never split inside a UTF-8 sequence or word.
    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    // URI precedence: explicit --uri > content-derived video URI > path.
    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    // --- assemble PutOptions from CLI flags and analysis results ---
    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates)
        .extract_triplets(!args.no_extract_triplets);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    if args.video {
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    // Each key=value tag also becomes plain tags for key and value (when
    // non-empty and distinct) so both sides are searchable.
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    // --metadata entries: structured DocMetadata when it parses as one,
    // otherwise stored as a generic JSON value under an indexed key.
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    // no-raw (text only, hash stored) is the default; --raw opts out.
    let effective_no_raw = !args.raw;
    if effective_no_raw {
        options_builder = options_builder.no_raw(true);
        if let Some(path) = source_path {
            options_builder = options_builder.source_path(path.display().to_string());
        }
    }
    if args.dedup {
        options_builder = options_builder.dedup(true);
    }
    let mut options = options_builder.build();

    if let Some(budget_ms) = args.extraction_budget_ms {
        options.extraction_budget_ms = budget_ms;
    }

    // Resolve an existing frame for this URI (needed both for
    // --update-existing and for duplicate rejection).
    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    // New frames are assigned the next id; presumably frame ids are dense
    // so frame_count is the next id — TODO confirm against store semantics.
    let frame_id = if let Some(ref existing) = existing_frame {
        existing.id
    } else {
        mem.stats()?.frame_count
    };

    let mut embedded = false;
    // Video payloads never get semantic embeddings.
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            // --- update path: replace payload (and embedding) in place ---
            let payload_for_update = payload.clone();
            if allow_embedding {
                if let Some(path) = args.embedding_vec.as_deref() {
                    // Pre-computed embedding supplied by the caller;
                    // validate its dimension against the declared model.
                    let embedding = crate::utils::read_embedding(path)?;
                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
                        let expected = choice.dimensions();
                        if expected != 0 && embedding.len() != expected {
                            anyhow::bail!(
                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
                                embedding.len(),
                                choice.name(),
                                expected
                            );
                        }
                        apply_embedding_identity_metadata_from_choice(
                            &mut options,
                            choice,
                            embedding.len(),
                            Some(model_str),
                        );
                    } else {
                        options.extra_metadata.insert(
                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
                            embedding.len().to_string(),
                        );
                    }
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                } else {
                    // Generate the embedding locally via the runtime.
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                        mem.update_frame(
                            existing.id,
                            Some(payload_for_update),
                            options.clone(),
                            None,
                        )
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                    } else {
                        let truncated_text = truncate_for_embedding(&embed_text);
                        if truncated_text.len() < embed_text.len() {
                            info!(
                                "Truncated text from {} to {} chars for embedding",
                                embed_text.len(),
                                truncated_text.len()
                            );
                        }
                        let embedding = runtime.embed_passage(&truncated_text)?;
                        embedded = true;
                        apply_embedding_identity_metadata(&mut options, runtime);
                        mem.update_frame(
                            existing.id,
                            Some(payload_for_update),
                            options.clone(),
                            Some(embedding),
                        )
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                    }
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            // URI collision without --update-existing is an error.
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        // --- insert path: brand-new frame ---
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            #[cfg(feature = "parallel_segments")]
            {
                // Parallel mode: compute embeddings now, but defer the
                // actual write by returning a ParallelInput.
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    if let Some(path) = args.embedding_vec.as_deref() {
                        // Pre-computed embedding; same dimension validation
                        // as the update path above.
                        let embedding = crate::utils::read_embedding(path)?;
                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
                            let expected = choice.dimensions();
                            if expected != 0 && embedding.len() != expected {
                                anyhow::bail!(
                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
                                    embedding.len(),
                                    choice.name(),
                                    expected
                                );
                            }
                            apply_embedding_identity_metadata_from_choice(
                                &mut options,
                                choice,
                                embedding.len(),
                                Some(model_str),
                            );
                        } else {
                            options.extra_metadata.insert(
                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
                                embedding.len().to_string(),
                            );
                        }
                        parent_embedding = Some(embedding);
                        embedded = true;
                    } else {
                        let runtime =
                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                        let embed_text = search_text
                            .clone()
                            .or_else(|| payload_text.map(|text| text.to_string()))
                            .unwrap_or_default();
                        if embed_text.trim().is_empty() {
                            warn!("no textual content available; embedding skipped");
                        } else {
                            info!(
                                "parallel mode: checking for chunks on {} bytes payload",
                                payload.len()
                            );
                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                                // Optional temporal enrichment: resolve
                                // relative date phrases in each chunk.
                                #[cfg(feature = "temporal_enrich")]
                                let enriched_chunks = if args.temporal_enrich {
                                    info!(
                                        "Temporal enrichment enabled, processing {} chunks",
                                        chunk_texts.len()
                                    );
                                    let today = chrono::Local::now().date_naive();
                                    let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                    let enriched: Vec<String> =
                                        results.iter().map(|(text, _)| text.clone()).collect();
                                    let resolved_count = results
                                        .iter()
                                        .filter(|(_, e)| {
                                            e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                        })
                                        .count();
                                    info!(
                                        "Temporal enrichment: resolved {} chunks with temporal context",
                                        resolved_count
                                    );
                                    enriched
                                } else {
                                    chunk_texts.clone()
                                };
                                #[cfg(not(feature = "temporal_enrich"))]
                                let enriched_chunks = {
                                    if args.temporal_enrich {
                                        warn!(
                                            "Temporal enrichment requested but feature not compiled in"
                                        );
                                    }
                                    chunk_texts.clone()
                                };

                                // Optional contextual retrieval: prepend an
                                // LLM-generated context to each chunk;
                                // failures fall back to the plain chunks.
                                let embed_chunks = if args.contextual {
                                    info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                    let engine = if args.contextual_model.as_deref()
                                        == Some("local")
                                    {
                                        #[cfg(feature = "llama-cpp")]
                                        {
                                            let model_path = get_local_contextual_model_path()?;
                                            ContextualEngine::local(model_path)
                                        }
                                        #[cfg(not(feature = "llama-cpp"))]
                                        {
                                            anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                        }
                                    } else if let Some(model) = &args.contextual_model {
                                        ContextualEngine::openai_with_model(model)?
                                    } else {
                                        ContextualEngine::openai()?
                                    };

                                    match engine
                                        .generate_contexts_batch(&embed_text, &enriched_chunks)
                                    {
                                        Ok(contexts) => {
                                            info!(
                                                "Generated {} contextual prefixes",
                                                contexts.len()
                                            );
                                            apply_contextual_prefixes(
                                                &embed_text,
                                                &enriched_chunks,
                                                &contexts,
                                            )
                                        }
                                        Err(e) => {
                                            warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                            enriched_chunks.clone()
                                        }
                                    }
                                } else {
                                    enriched_chunks
                                };

                                // Embed every chunk (truncated to the model
                                // limit) plus one parent embedding for the
                                // whole document.
                                let truncated_chunks: Vec<String> = embed_chunks
                                    .iter()
                                    .map(|chunk_text| {
                                        let truncated_chunk = truncate_for_embedding(chunk_text);
                                        if truncated_chunk.len() < chunk_text.len() {
                                            info!(
                                                "parallel mode: Truncated chunk from {} to {} chars for embedding",
                                                chunk_text.len(),
                                                truncated_chunk.len()
                                            );
                                        }
                                        truncated_chunk
                                    })
                                    .collect();
                                let truncated_refs: Vec<&str> = truncated_chunks
                                    .iter()
                                    .map(|chunk| chunk.as_str())
                                    .collect();
                                let chunk_embeddings =
                                    runtime.embed_batch_passages(&truncated_refs)?;
                                let num_chunks = chunk_embeddings.len();
                                chunk_embeddings_vec = Some(chunk_embeddings);
                                let parent_text = truncate_for_embedding(&embed_text);
                                if parent_text.len() < embed_text.len() {
                                    info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                                }
                                parent_embedding = Some(runtime.embed_passage(&parent_text)?);
                                embedded = true;
                                info!(
                                    "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
                                    num_chunks
                                );
                            } else {
                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                                let truncated_text = truncate_for_embedding(&embed_text);
                                if truncated_text.len() < embed_text.len() {
                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                                }
                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
                                embedded = true;
                            }
                        }
                    }
                }
                if embedded {
                    if let Some(runtime) = runtime {
                        apply_embedding_identity_metadata(&mut options, runtime);
                    }
                }
                // Prefer streaming from disk over holding bytes in memory.
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                // seq/frame_id are placeholders (0): the real write happens
                // later when the ParallelInput is consumed.
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        frame_id: 0,
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        no_raw: effective_no_raw,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                        #[cfg(feature = "logic_mesh")]
                        search_text: search_text.clone(),
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            #[cfg(not(feature = "parallel_segments"))]
            {
                // parallel_mode requested but the feature is compiled out:
                // fall back to a plain synchronous write.
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            if let Some(path) = args.embedding_vec.as_deref() {
                // Pre-computed embedding for a synchronous insert.
                let embedding = crate::utils::read_embedding(path)?;
                if let Some(model_str) = args.embedding_vec_model.as_deref() {
                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
                    let expected = choice.dimensions();
                    if expected != 0 && embedding.len() != expected {
                        anyhow::bail!(
                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
                            embedding.len(),
                            choice.name(),
                            expected
                        );
                    }
                    apply_embedding_identity_metadata_from_choice(
                        &mut options,
                        choice,
                        embedding.len(),
                        Some(model_str),
                    );
                } else {
                    options.extra_metadata.insert(
                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
                        embedding.len().to_string(),
                    );
                }
                embedded = true;
                mem.put_with_embedding_and_options(&payload, embedding, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.put_bytes_with_options(&payload, options)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    // Same chunk/enrich/contextual flow as the parallel
                    // branch, but writing synchronously at the end.
                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                        info!(
                            "Document will be chunked into {} chunks, generating embeddings for each",
                            chunk_texts.len()
                        );

                        #[cfg(feature = "temporal_enrich")]
                        let enriched_chunks = if args.temporal_enrich {
                            info!(
                                "Temporal enrichment enabled, processing {} chunks",
                                chunk_texts.len()
                            );
                            let today = chrono::Local::now().date_naive();
                            let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                            let enriched: Vec<String> =
                                results.iter().map(|(text, _)| text.clone()).collect();
                            let resolved_count = results
                                .iter()
                                .filter(|(_, e)| {
                                    e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                })
                                .count();
                            info!(
                                "Temporal enrichment: resolved {} chunks with temporal context",
                                resolved_count
                            );
                            enriched
                        } else {
                            chunk_texts.clone()
                        };
                        #[cfg(not(feature = "temporal_enrich"))]
                        let enriched_chunks = {
                            if args.temporal_enrich {
                                warn!("Temporal enrichment requested but feature not compiled in");
                            }
                            chunk_texts.clone()
                        };

                        let embed_chunks = if args.contextual {
                            info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                            let engine = if args.contextual_model.as_deref() == Some("local") {
                                #[cfg(feature = "llama-cpp")]
                                {
                                    let model_path = get_local_contextual_model_path()?;
                                    ContextualEngine::local(model_path)
                                }
                                #[cfg(not(feature = "llama-cpp"))]
                                {
                                    anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                }
                            } else if let Some(model) = &args.contextual_model {
                                ContextualEngine::openai_with_model(model)?
                            } else {
                                ContextualEngine::openai()?
                            };

                            match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                                Ok(contexts) => {
                                    info!("Generated {} contextual prefixes", contexts.len());
                                    apply_contextual_prefixes(
                                        &embed_text,
                                        &enriched_chunks,
                                        &contexts,
                                    )
                                }
                                Err(e) => {
                                    warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                    enriched_chunks.clone()
                                }
                            }
                        } else {
                            enriched_chunks
                        };

                        let truncated_chunks: Vec<String> = embed_chunks
                            .iter()
                            .map(|chunk_text| {
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!(
                                        "Truncated chunk from {} to {} chars for embedding",
                                        chunk_text.len(),
                                        truncated_chunk.len()
                                    );
                                }
                                truncated_chunk
                            })
                            .collect();
                        let truncated_refs: Vec<&str> = truncated_chunks
                            .iter()
                            .map(|chunk| chunk.as_str())
                            .collect();
                        let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
                        let parent_text = truncate_for_embedding(&embed_text);
                        if parent_text.len() < embed_text.len() {
                            info!(
                                "Truncated parent text from {} to {} chars for embedding",
                                embed_text.len(),
                                parent_text.len()
                            );
                        }
                        let parent_embedding = runtime.embed_passage(&parent_text)?;
                        embedded = true;
                        apply_embedding_identity_metadata(&mut options, runtime);
                        info!(
                            "Calling put_with_chunk_embeddings with {} chunk embeddings",
                            chunk_embeddings.len()
                        );
                        mem.put_with_chunk_embeddings(
                            &payload,
                            Some(parent_embedding),
                            chunk_embeddings,
                            options,
                        )
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                    } else {
                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                        let truncated_text = truncate_for_embedding(&embed_text);
                        if truncated_text.len() < embed_text.len() {
                            info!(
                                "Truncated text from {} to {} chars for embedding",
                                embed_text.len(),
                                truncated_text.len()
                            );
                        }
                        let embedding = runtime.embed_passage(&truncated_text)?;
                        embedded = true;
                        apply_embedding_identity_metadata(&mut options, runtime);
                        mem.put_with_embedding_and_options(&payload, embedding, options)
                            .map_err(|err| {
                                map_put_error(
                                    err,
                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                                )
                            })?
                    }
                }
            }
        } else {
            // Embeddings disabled entirely: plain store of the payload.
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            frame_id,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            no_raw: effective_no_raw,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
            #[cfg(feature = "logic_mesh")]
            search_text,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}
4439
4440fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4441 if std::str::from_utf8(payload).is_ok() {
4442 let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4443 Ok((compressed.len(), true))
4444 } else {
4445 Ok((payload.len(), false))
4446 }
4447}
4448
4449fn print_report(report: &PutReport) {
4450 let name = report
4451 .source
4452 .as_ref()
4453 .map(|path| {
4454 let pretty = env::current_dir()
4455 .ok()
4456 .as_ref()
4457 .and_then(|cwd| diff_paths(path, cwd))
4458 .unwrap_or_else(|| path.to_path_buf());
4459 pretty.to_string_lossy().into_owned()
4460 })
4461 .unwrap_or_else(|| "stdin".to_string());
4462
4463 let ratio = if report.original_bytes > 0 {
4464 (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4465 } else {
4466 100.0
4467 };
4468
4469 println!("• {name} → {}", report.uri);
4470 println!(" seq: {}", report.seq);
4471 if report.no_raw {
4472 println!(
4473 " size: {} B (--no-raw: text only, hash stored)",
4474 report.original_bytes
4475 );
4476 } else if report.compressed {
4477 println!(
4478 " size: {} B → {} B ({:.1}% of original, compressed)",
4479 report.original_bytes, report.stored_bytes, ratio
4480 );
4481 } else {
4482 println!(" size: {} B (stored as-is)", report.original_bytes);
4483 }
4484 if let Some(mime) = &report.mime {
4485 println!(" mime: {mime}");
4486 }
4487 if let Some(title) = &report.title {
4488 println!(" title: {title}");
4489 }
4490 let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4491 println!(
4492 " extraction: status={} reader={}",
4493 report.extraction.status.label(),
4494 extraction_reader
4495 );
4496 if let Some(ms) = report.extraction.duration_ms {
4497 println!(" extraction-duration: {} ms", ms);
4498 }
4499 if let Some(pages) = report.extraction.pages_processed {
4500 println!(" extraction-pages: {}", pages);
4501 }
4502 for warning in &report.extraction.warnings {
4503 println!(" warning: {warning}");
4504 }
4505 if let Some(metadata) = &report.metadata {
4506 if let Some(caption) = &metadata.caption {
4507 println!(" caption: {caption}");
4508 }
4509 if let Some(audio) = metadata.audio.as_ref() {
4510 if audio.duration_secs.is_some()
4511 || audio.sample_rate_hz.is_some()
4512 || audio.channels.is_some()
4513 {
4514 let duration = audio
4515 .duration_secs
4516 .map(|secs| format!("{secs:.1}s"))
4517 .unwrap_or_else(|| "unknown".into());
4518 let rate = audio
4519 .sample_rate_hz
4520 .map(|hz| format!("{} Hz", hz))
4521 .unwrap_or_else(|| "? Hz".into());
4522 let channels = audio
4523 .channels
4524 .map(|ch| ch.to_string())
4525 .unwrap_or_else(|| "?".into());
4526 println!(" audio: duration {duration}, {channels} ch, {rate}");
4527 }
4528 }
4529 }
4530
4531 println!();
4532}
4533
4534fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4535 let source = report
4536 .source
4537 .as_ref()
4538 .map(|path| path.to_string_lossy().into_owned());
4539 let extraction = json!({
4540 "status": report.extraction.status.label(),
4541 "reader": report.extraction.reader,
4542 "duration_ms": report.extraction.duration_ms,
4543 "pages_processed": report.extraction.pages_processed,
4544 "warnings": report.extraction.warnings,
4545 });
4546 json!({
4547 "seq": report.seq,
4548 "uri": report.uri,
4549 "title": report.title,
4550 "original_bytes": report.original_bytes,
4551 "original_bytes_human": format_bytes(report.original_bytes as u64),
4552 "stored_bytes": report.stored_bytes,
4553 "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4554 "compressed": report.compressed,
4555 "mime": report.mime,
4556 "source": source,
4557 "metadata": report.metadata,
4558 "extraction": extraction,
4559 })
4560}
4561
4562fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4563 match err {
4564 MemvidError::CapacityExceeded {
4565 current,
4566 limit,
4567 required,
4568 } => anyhow!(CapacityExceededMessage {
4569 current,
4570 limit,
4571 required,
4572 }),
4573 other => other.into(),
4574 }
4575}
4576
4577fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4578 for (idx, entry) in entries.iter().enumerate() {
4579 match serde_json::from_str::<DocMetadata>(entry) {
4580 Ok(meta) => {
4581 options.metadata = Some(meta);
4582 }
4583 Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4584 Ok(value) => {
4585 options
4586 .extra_metadata
4587 .insert(format!("custom_metadata_{idx}"), value.to_string());
4588 }
4589 Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4590 },
4591 }
4592 }
4593}
4594
4595fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4596 let stats = mem.stats()?;
4597 let message = match stats.tier {
4598 Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4599 Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4600 };
4601 Ok(message)
4602}
4603
/// Normalize a raw URI/path into a safe `mv2://`-style identifier.
///
/// Strips a leading `mv2://` scheme, converts backslashes to forward
/// slashes, drops empty and `.` segments, and resolves `..` by popping the
/// previous segment. With `keep_path` the cleaned segments are re-joined;
/// otherwise only the final segment is kept. Falls back to `"document"`
/// when nothing usable remains.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");
    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => continue,
            ".." => {
                segments.pop();
            }
            other => segments.push(other.to_string()),
        }
    }

    if segments.is_empty() {
        // Fix: the previous fallback returned the last raw split piece
        // unfiltered, so inputs like ".." or "a/.." leaked a literal
        // "."/".." segment past the sanitizer. Reject those here as well.
        return normalized
            .split('/')
            .last()
            .filter(|s| !s.is_empty() && *s != "." && *s != "..")
            .unwrap_or("document")
            .to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
4637
4638fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4639 if quiet {
4640 let pb = ProgressBar::hidden();
4641 pb.set_message(message.to_string());
4642 return pb;
4643 }
4644
4645 match total {
4646 Some(len) => {
4647 let pb = ProgressBar::new(len);
4648 pb.set_draw_target(ProgressDrawTarget::stderr());
4649 let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4650 .unwrap_or_else(|_| ProgressStyle::default_bar());
4651 pb.set_style(style);
4652 pb.set_message(message.to_string());
4653 pb
4654 }
4655 None => {
4656 let pb = ProgressBar::new_spinner();
4657 pb.set_draw_target(ProgressDrawTarget::stderr());
4658 pb.set_message(message.to_string());
4659 pb.enable_steady_tick(Duration::from_millis(120));
4660 pb
4661 }
4662 }
4663}
4664
4665fn create_spinner(message: &str) -> ProgressBar {
4666 let pb = ProgressBar::new_spinner();
4667 pb.set_draw_target(ProgressDrawTarget::stderr());
4668 let style = ProgressStyle::with_template("{spinner} {msg}")
4669 .unwrap_or_else(|_| ProgressStyle::default_spinner())
4670 .tick_strings(&["-", "\\", "|", "/"]);
4671 pb.set_style(style);
4672 pb.set_message(message.to_string());
4673 pb.enable_steady_tick(Duration::from_millis(120));
4674 pb
4675}
4676
#[cfg(feature = "parallel_segments")]
// CLI arguments for the `put-many` batch-ingest subcommand.
// NOTE: plain `//` comments are used on purpose — `///` doc comments on
// clap-derive fields would become user-visible help text.
#[derive(Args)]
pub struct PutManyArgs {
    // Target .mv2 archive to ingest into.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Emit machine-readable JSON output instead of human-readable text.
    #[arg(long)]
    pub json: bool,
    // JSON batch file; when absent, the batch is read from stdin.
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    // zstd compression level (clamped to 1..=9 by the handler).
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Keep raw payload bytes; the handler derives its default no-raw flag as `!raw`.
    #[arg(long)]
    pub raw: bool,
    // Hidden legacy flag. NOTE(review): `handle_put_many` never reads this
    // field — presumably a deprecated no-op kept for CLI compatibility; confirm.
    #[arg(long, hide = true)]
    pub no_raw: bool,
    // Enable deduplication for all requests unless a request overrides it.
    #[arg(long)]
    pub dedup: bool,
    // Shared lock-related CLI options (flattened into this command).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4713
#[cfg(feature = "parallel_segments")]
/// One document in a `put-many` batch, deserialized from the JSON input.
/// Fields marked `#[serde(default)]` may be omitted in the input JSON.
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required).
    pub title: String,
    /// Single label; defaults to the empty string.
    /// NOTE(review): not consumed by `handle_put_many` (only `labels` is) — confirm intent.
    #[serde(default)]
    pub label: String,
    /// Document body; ingested as UTF-8 bytes (required).
    pub text: String,
    /// Optional URI; the handler falls back to `mv2://batch/{index}`.
    #[serde(default)]
    pub uri: Option<String>,
    /// Free-form tags attached to the document.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels attached to the document.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Structured metadata; parsed as `DocMetadata` when possible, otherwise
    /// stored verbatim under the `memvid.metadata.json` extra-metadata key.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Arbitrary string key/value metadata merged into the put options.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Optional precomputed document-level embedding.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Optional precomputed per-chunk embeddings; the first one is also used
    /// to record the embedding dimension when `embedding` is absent.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Per-request override of the batch-wide no-raw setting.
    #[serde(default)]
    pub no_raw: Option<bool>,
    /// Per-request override of the batch-wide dedup setting.
    #[serde(default)]
    pub dedup: Option<bool>,
}
4751
#[cfg(feature = "parallel_segments")]
/// A `put-many` batch: deserialized transparently from a bare JSON array of
/// request objects (no wrapping object required).
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    pub requests: Vec<PutManyRequest>,
}
4759
#[cfg(feature = "parallel_segments")]
/// Handles the `put-many` subcommand: reads a JSON batch (from `--input` or
/// stdin), converts each request into a `ParallelInput`, and ingests them all
/// via `put_parallel_inputs`. Prints the resulting frame ids as JSON or text.
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    crate::utils::require_active_plan(config, "put-many")?;

    // Open the archive and verify this CLI is allowed to mutate it.
    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Make sure both search indexes exist before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Capacity check uses the pre-ingest size with 0 additional bytes.
    // NOTE(review): the incoming batch size is not included here — confirm
    // whether the check is meant to account for the new payload.
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Batch JSON comes from --input when given, otherwise from stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    // An empty batch is not an error: report and exit successfully.
    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    // Batch-wide defaults; individual requests may override both.
    // NOTE(review): the hidden --no-raw flag (args.no_raw) is never consulted
    // here — the default is derived solely from `!args.raw`. Confirm whether
    // that flag is an intentional no-op.
    let global_no_raw = !args.raw; let global_dedup = args.dedup;
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Synthesize a deterministic URI from the batch index when absent.
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title.clone());
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;
            options.extra_metadata = req.extra_metadata;
            options.no_raw = req.no_raw.unwrap_or(global_no_raw);
            options.dedup = req.dedup.unwrap_or(global_dedup);
            // Structured metadata: use it if it parses as DocMetadata,
            // otherwise preserve the raw JSON under a reserved metadata key
            // (without clobbering a caller-supplied value).
            if !req.metadata.is_null() {
                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
                    Ok(meta) => options.metadata = Some(meta),
                    Err(_) => {
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| req.metadata.to_string());
                    }
                }
            }

            // Record the embedding dimension from the document embedding, or
            // from the first chunk embedding as a fallback.
            if let Some(embedding) = req
                .embedding
                .as_ref()
                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
            {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| embedding.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // NOTE(review): zstd levels go higher than 9; clamping to 1..=9 appears
    // to be a deliberate CLI limit — confirm.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Small batches list every frame id; large ones show only the range.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!(" frame_id: {}", fid);
            }
        } else {
            println!(" first: {}", frame_ids.first().unwrap_or(&0));
            println!(" last: {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}
4897
// CLI arguments for the `correct` subcommand, which stores a correction
// statement as a boosted document.
// NOTE: plain `//` comments are used on purpose — `///` doc comments on
// clap-derive fields would become user-visible help text.
#[derive(Args)]
pub struct CorrectArgs {
    // Target .mv2 archive to store the correction in.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // The correction text itself; ingested as the document body.
    #[arg(value_name = "STATEMENT")]
    pub statement: String,
    // Comma-separated topics, stored as tags on the correction document.
    #[arg(long, value_name = "TOPICS", value_delimiter = ',')]
    pub topics: Option<Vec<String>>,
    // Optional provenance string, recorded in correction metadata.
    #[arg(long, value_name = "SOURCE")]
    pub source: Option<String>,
    // Ranking boost multiplier stored with the correction (default 2.0).
    #[arg(long, default_value = "2.0")]
    pub boost: f64,
    // Emit machine-readable JSON output instead of human-readable text.
    #[arg(long)]
    pub json: bool,
    // Shared lock-related CLI options (flattened into this command).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4922
4923pub fn handle_correct(config: &CliConfig, args: CorrectArgs) -> Result<()> {
4929 crate::utils::require_active_plan(config, "correct")?;
4930
4931 let mut mem = Memvid::open(&args.file)?;
4932 ensure_cli_mutation_allowed(&mem)?;
4933 apply_lock_cli(&mut mem, &args.lock);
4934
4935 let stats = mem.stats()?;
4936 if stats.frame_count == 0 && !stats.has_lex_index {
4937 mem.enable_lex()?;
4938 }
4939
4940 let mut extra_metadata = BTreeMap::new();
4942 extra_metadata.insert("memvid.correction".to_string(), "true".to_string());
4943 extra_metadata.insert(
4944 "memvid.correction.boost".to_string(),
4945 args.boost.to_string(),
4946 );
4947 if let Some(source) = &args.source {
4948 extra_metadata.insert("memvid.correction.source".to_string(), source.clone());
4949 }
4950
4951 let tags: Vec<String> = args.topics.clone().unwrap_or_default();
4953
4954 let mut options = PutOptions::default();
4956 options.title = Some(format!(
4957 "Correction: {}",
4958 truncate_title(&args.statement, 50)
4959 ));
4960 options.uri = Some(format!("mv2://correction/{}", Uuid::new_v4()));
4961 options.labels = vec!["correction".to_string()];
4962 options.tags = tags;
4963 options.extra_metadata = extra_metadata;
4964 options.no_raw = true; let frame_id = mem.put_bytes_with_options(args.statement.as_bytes(), options)?;
4968
4969 if args.json {
4970 let output = json!({
4971 "frame_id": frame_id,
4972 "type": "correction",
4973 "boost": args.boost,
4974 "topics": args.topics.unwrap_or_default(),
4975 "source": args.source,
4976 });
4977 println!("{}", serde_json::to_string_pretty(&output)?);
4978 } else {
4979 println!("✓ Correction stored (frame_id: {})", frame_id);
4980 if let Some(topics) = &args.topics {
4981 println!(" Topics: {}", topics.join(", "));
4982 }
4983 if let Some(source) = &args.source {
4984 println!(" Source: {}", source);
4985 }
4986 println!(" Boost: {}x", args.boost);
4987 }
4988
4989 Ok(())
4990}
4991
/// Truncates `s` to at most `max_len` characters, appending `"..."` when the
/// string was shortened (the ellipsis counts toward the budget).
///
/// BUGFIX: the previous version byte-sliced with `&s[..max_len - 3]`, which
/// panics when the cut point falls inside a multi-byte UTF-8 character, and it
/// compared the byte length rather than the character count. This version is
/// character-based and cannot panic on any input.
fn truncate_title(s: &str, max_len: usize) -> String {
    if s.chars().count() <= max_len {
        s.to_string()
    } else {
        // Leave room for the three-dot ellipsis (saturating for tiny budgets).
        let keep = max_len.saturating_sub(3);
        let head: String = s.chars().take(keep).collect();
        format!("{head}...")
    }
}