memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
26use memvid_core::{
27    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
28    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions, ReaderHint,
29    ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
30};
31#[cfg(feature = "parallel_segments")]
32use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
33use pdf_extract::OutputError as PdfExtractError;
34use serde_json::json;
35use tracing::{info, warn};
36use tree_magic_mini::from_u8 as magic_from_u8;
37use uuid::Uuid;
38
/// Upper bound on search text length.
/// NOTE(review): not referenced in the visible part of this file; presumably caps the
/// text kept for lexical search — confirm against its use sites.
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum length (in bytes, as compared against `str::len`) of text handed to the
/// embedder, chosen to avoid exceeding OpenAI's 8192 token limit.
/// Using a ~4 chars/token estimate, 20K chars ≈ 5K tokens (safe margin).
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
/// NOTE(review): presumably the palette size requested from `color_thief::get_palette`
/// (imported above) — confirm at the call site.
const COLOR_PALETTE_SIZE: u8 = 5;
/// NOTE(review): presumably the quality argument for `color_thief::get_palette` —
/// confirm at the call site.
const COLOR_PALETTE_QUALITY: u8 = 8;
/// NOTE(review): presumably the number of leading payload bytes inspected when
/// sniffing the content type — confirm at the call site.
const MAGIC_SNIFF_BYTES: usize = 16;
46
47/// Truncate text for embedding to fit within OpenAI token limits.
48/// Returns a truncated string that fits within MAX_EMBEDDING_TEXT_LEN.
49fn truncate_for_embedding(text: &str) -> String {
50    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
51        text.to_string()
52    } else {
53        // Find a char boundary to avoid splitting UTF-8
54        let truncated = &text[..MAX_EMBEDDING_TEXT_LEN];
55        // Try to find the last complete character
56        let end = truncated
57            .char_indices()
58            .rev()
59            .next()
60            .map(|(i, c)| i + c.len_utf8())
61            .unwrap_or(MAX_EMBEDDING_TEXT_LEN);
62        text[..end].to_string()
63    }
64}
65
/// Locate the local phi-3.5-mini model used for contextual retrieval.
///
/// The models directory is taken from `MEMVID_MODELS_DIR` when that variable is set
/// (a leading `~/` is expanded using `HOME` if `HOME` is available, otherwise the value
/// is used verbatim); when it is not set the directory is `$HOME/.memvid/models`.
/// The model file is expected at
/// `<models dir>/llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf`.
///
/// # Errors
/// Fails when `MEMVID_MODELS_DIR` is unset and `HOME` cannot be read, or when the
/// model file does not exist at the computed path.
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_root = match env::var("MEMVID_MODELS_DIR") {
        // Expand `~/` by hand; fall back to the raw value when there is nothing to
        // expand or HOME is unavailable.
        Ok(configured) => configured
            .strip_prefix("~/")
            .and_then(|rest| {
                env::var("HOME")
                    .ok()
                    .map(|home| PathBuf::from(home).join(rest))
            })
            .unwrap_or_else(|| PathBuf::from(&configured)),
        // No override: default to ~/.memvid/models
        Err(_) => {
            let home = env::var("HOME")
                .map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let mut model_file = models_root;
    model_file.extend(["llm", "phi-3-mini-q4", "Phi-3.5-mini-instruct-Q4_K_M.gguf"]);

    if !model_file.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_file.display()
        ));
    }
    Ok(model_file)
}
103
104use pathdiff::diff_paths;
105
106use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
107use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
108use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingRuntime};
109use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
110use crate::error::{CapacityExceededMessage, DuplicateUriError};
111#[cfg(feature = "temporal_enrich")]
112use memvid_core::enrich_chunks as temporal_enrich_chunks;
113use crate::utils::{
114    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
115    select_frame,
116};
117
/// Interpret a textual on/off switch, such as the value of an environment variable.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive.
/// `1`, `true`, `yes`, `on` give `Some(true)`; `0`, `false`, `no`, `off` give
/// `Some(false)`; anything else gives `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let token = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
126
/// Read the `MEMVID_PARALLEL_SEGMENTS` environment variable as a boolean override.
///
/// Returns `None` when the variable cannot be read or when its value is not one of
/// the spellings accepted by `parse_bool_flag`.
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    match std::env::var("MEMVID_PARALLEL_SEGMENTS") {
        Ok(raw) => parse_bool_flag(&raw),
        Err(_) => None,
    }
}
134
135/// Parse key=value pairs for tags
136pub fn parse_key_val(s: &str) -> Result<(String, String)> {
137    let (key, value) = s
138        .split_once('=')
139        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
140    Ok((key.to_string(), value.to_string()))
141}
142
/// Lock-related CLI arguments (shared across commands)
// The `///` comments on the fields are what clap shows as per-flag help, so they are
// left exactly as written; review notes are kept in plain `//` comments.
// This struct is flattened into the put/api-fetch/update/delete argument structs via
// `#[command(flatten)]`, and handlers pass it to `apply_lock_cli` or copy its fields.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // `--lock-timeout MS`; defaults to 250 when the flag is omitted.
    /// Maximum time to wait for an active writer before failing (ms)
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // `--force`; a plain switch (false unless given).
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
153
/// Arguments for the `put` subcommand
// The `///` comments on the fields are what clap shows as per-flag help, so they are
// left exactly as written; review notes are kept in plain `//` comments.
#[derive(Args)]
pub struct PutArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    // `handle_put` hands this string to `resolve_inputs`, which may yield several
    // files; when it does, `--uri` and `--title` are rejected.
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional POSIX timestamp to store on the frame
    #[arg(long)]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    // `handle_put` rejects `--video` combined with a `--kind` other than "video"
    // (case-insensitive), and rejects `--video` when the payload comes from stdin.
    /// Store the payload as a binary video without transcoding
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable `--tag KEY=VALUE`; each occurrence is parsed by `parse_key_val`.
    /// Tags to attach in key=value form
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    /// Additional metadata payloads expressed as JSON
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Also accepted as `--embeddings`; mutually exclusive with `--no-embedding`.
    // `handle_put` only loads an embedding runtime when this flag is set.
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // `handle_put` switches the memory to `VectorCompression::Pq96` whenever this flag
    // is set; that call happens before (and independently of) the `--embedding` check.
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // Mutually exclusive with `--allow-duplicate` (enforced by clap).
    /// Replace any existing frame with the same URI instead of inserting a duplicate
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    // NOTE(review): this flag is always present, whereas the `temporal_enrich_chunks`
    // import in this file is gated on the `temporal_enrich` feature — confirm how the
    // flag behaves when that feature is off.
    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // The remaining `parallel_*` fields exist only when the crate is built with the
    // `parallel_segments` feature; they are consumed by `wants_parallel` and
    // `sanitized_parallel_opts`.
    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
280
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Decide whether this invocation should use the parallel segment builder.
    ///
    /// Precedence: `--parallel-segments` wins, then `--no-parallel-segments`, then the
    /// `MEMVID_PARALLEL_SEGMENTS` environment override; with none of them the answer
    /// is `false`.
    pub fn wants_parallel(&self) -> bool {
        match (self.parallel_segments, self.no_parallel_segments) {
            (true, _) => true,
            (false, true) => false,
            // Neither flag given: defer to the environment, defaulting to off.
            (false, false) => parallel_env_override().unwrap_or(false),
        }
    }

    /// Build the parallel builder options from the CLI flags.
    ///
    /// Starts from `BuildOpts::default()`, replaces each setting for which a flag was
    /// supplied, then calls `sanitize()` on the result before returning it.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut build = memvid_core::BuildOpts::default();
        // Each field keeps its default unless the matching flag was provided.
        build.segment_tokens = self
            .parallel_segment_tokens
            .unwrap_or(build.segment_tokens);
        build.segment_pages = self.parallel_segment_pages.unwrap_or(build.segment_pages);
        build.threads = self.parallel_threads.unwrap_or(build.threads);
        build.queue_depth = self.parallel_queue_depth.unwrap_or(build.queue_depth);
        build.sanitize();
        build
    }
}
314
315#[cfg(not(feature = "parallel_segments"))]
316impl PutArgs {
317    pub fn wants_parallel(&self) -> bool {
318        false
319    }
320}
321
/// Arguments for the `api-fetch` subcommand
// The `///` comments on the fields are what clap shows as per-flag help, so they are
// left exactly as written; review notes are kept in plain `//` comments.
// `handle_api_fetch` copies every field below into an `ApiFetchCommand` one-to-one.
#[derive(Args)]
pub struct ApiFetchArgs {
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Second positional argument; forwarded as `config_path`.
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Forwarded as `mode_override`; `None` leaves the configured mode in effect.
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Forwarded as `uri_override`.
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    // Forwarded as `output_json`.
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Forwarded as `lock_timeout_ms` / `force_lock`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
347
/// Arguments for the `update` subcommand
// NOTE(review): unlike `PutArgs`, none of the fields below carry `///` doc comments or
// a `help` attribute, so clap has no description to show for them. The handler that
// consumes this struct is not visible from here, so the notes below are plain `//`
// comments and are hedged wherever the meaning is inferred from names only.
#[derive(Args)]
pub struct UpdateArgs {
    // Positional FILE; presumably the memory file to modify — confirm in the handler.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // `--frame-id ID`; mutually exclusive with `--uri` (enforced by clap).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // `--uri URI`; mutually exclusive with `--frame-id` (enforced by clap).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // `--input PATH`; presumably the replacement payload — TODO confirm.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // `--set-uri URI`; presumably the new URI to assign, as opposed to `--uri` which
    // looks like a selector — TODO confirm.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable `--tag KEY=VALUE`; each occurrence is parsed by `parse_key_val`.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // NOTE(review): the field name already yields the long flag `--embeddings`, so the
    // alias "embeddings" duplicates it. `PutArgs` uses `--embedding` with alias
    // `--embeddings`; possibly the alias here was meant to be "embedding" — confirm.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    #[arg(long)]
    pub json: bool,

    // Shared `--lock-timeout` / `--force` flags.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
383
384/// Arguments for the `delete` subcommand
385#[derive(Args)]
386pub struct DeleteArgs {
387    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
388    pub file: PathBuf,
389    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
390    pub frame_id: Option<u64>,
391    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
392    pub uri: Option<String>,
393    #[arg(long, action = ArgAction::SetTrue)]
394    pub yes: bool,
395    #[arg(long)]
396    pub json: bool,
397
398    #[command(flatten)]
399    pub lock: LockCliArgs,
400}
401
402/// Handler for `memvid api-fetch`
403pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
404    let command = ApiFetchCommand {
405        file: args.file,
406        config_path: args.config,
407        dry_run: args.dry_run,
408        mode_override: args.mode,
409        uri_override: args.uri,
410        output_json: args.json,
411        lock_timeout_ms: args.lock.lock_timeout,
412        force_lock: args.lock.force,
413    };
414    run_api_fetch(config, command)
415}
416
417/// Handler for `memvid delete`
418pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
419    let mut mem = Memvid::open(&args.file)?;
420    ensure_cli_mutation_allowed(&mem)?;
421    apply_lock_cli(&mut mem, &args.lock);
422    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
423
424    if !args.yes {
425        if let Some(uri) = &frame.uri {
426            println!("About to delete frame {} ({uri})", frame.id);
427        } else {
428            println!("About to delete frame {}", frame.id);
429        }
430        print!("Confirm? [y/N]: ");
431        io::stdout().flush()?;
432        let mut input = String::new();
433        io::stdin().read_line(&mut input)?;
434        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
435        if !confirmed {
436            println!("Aborted");
437            return Ok(());
438        }
439    }
440
441    let seq = mem.delete_frame(frame.id)?;
442    mem.commit()?;
443    let deleted = mem.frame_by_id(frame.id)?;
444
445    if args.json {
446        let json = json!({
447            "frame_id": frame.id,
448            "sequence": seq,
449            "status": frame_status_str(deleted.status),
450        });
451        println!("{}", serde_json::to_string_pretty(&json)?);
452    } else {
453        println!("Deleted frame {} (seq {})", frame.id, seq);
454    }
455
456    Ok(())
457}
458pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
459    let mut mem = Memvid::open(&args.file)?;
460    ensure_cli_mutation_allowed(&mem)?;
461    apply_lock_cli(&mut mem, &args.lock);
462    let mut stats = mem.stats()?;
463    if stats.frame_count == 0 && !stats.has_lex_index {
464        mem.enable_lex()?;
465        stats = mem.stats()?;
466    }
467
468    // Set vector compression mode if requested
469    if args.vector_compression {
470        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
471    }
472
473    let mut capacity_guard = CapacityGuard::from_stats(&stats);
474    let use_parallel = args.wants_parallel();
475    #[cfg(feature = "parallel_segments")]
476    let mut parallel_opts = if use_parallel {
477        Some(args.sanitized_parallel_opts())
478    } else {
479        None
480    };
481    #[cfg(feature = "parallel_segments")]
482    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
483    #[cfg(feature = "parallel_segments")]
484    let mut pending_parallel_indices: Vec<usize> = Vec::new();
485    let input_set = resolve_inputs(args.input.as_deref())?;
486
487    if args.video {
488        if let Some(kind) = &args.kind {
489            if !kind.eq_ignore_ascii_case("video") {
490                anyhow::bail!("--video conflicts with explicit --kind={kind}");
491            }
492        }
493        if matches!(input_set, InputSet::Stdin) {
494            anyhow::bail!("--video requires --input <file>");
495        }
496    }
497
498    #[allow(dead_code)]
499    enum EmbeddingMode {
500        None,
501        Auto,
502        Explicit,
503    }
504
505    // PHASE 1: Make embeddings opt-in, not opt-out
506    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
507    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
508    let mv2_dimension = mem.vec_index_dimension();
509    let (embedding_mode, embedding_runtime) = if args.embedding {
510        (
511            EmbeddingMode::Explicit,
512            Some(load_embedding_runtime_for_mv2(config, None, mv2_dimension)?),
513        )
514    } else {
515        (EmbeddingMode::None, None)
516    };
517    let embedding_enabled = embedding_runtime.is_some();
518    let runtime_ref = embedding_runtime.as_ref();
519
520    // Warn user about vector storage overhead (PHASE 2)
521    if embedding_enabled && !args.json {
522        let capacity = stats.capacity_bytes;
523        if args.vector_compression {
524            // PHASE 3: PQ compression reduces storage 16x
525            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
526            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
527            eprintln!("   Storage: ~20 KB per document (16x compressed)");
528            eprintln!("   Accuracy: ~95% recall@10 maintained");
529            eprintln!(
530                "   Capacity: ~{} documents max for {:.1} GB",
531                max_docs,
532                capacity as f64 / 1e9
533            );
534            eprintln!();
535        } else {
536            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
537            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
538            eprintln!("   Storage: ~270 KB per document");
539            eprintln!(
540                "   Capacity: ~{} documents max for {:.1} GB",
541                max_docs,
542                capacity as f64 / 1e9
543            );
544            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
545            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
546            eprintln!();
547        }
548    }
549
550    let embed_progress = if embedding_enabled {
551        let total = match &input_set {
552            InputSet::Files(paths) => Some(paths.len() as u64),
553            InputSet::Stdin => None,
554        };
555        Some(create_task_progress_bar(total, "embed", false))
556    } else {
557        None
558    };
559    let mut embedded_docs = 0usize;
560
561    if let InputSet::Files(paths) = &input_set {
562        if paths.len() > 1 {
563            if args.uri.is_some() || args.title.is_some() {
564                anyhow::bail!(
565                    "--uri/--title apply to a single file; omit them when using a directory or glob"
566                );
567            }
568        }
569    }
570
571    let mut reports = Vec::new();
572    let mut processed = 0usize;
573    let mut skipped = 0usize;
574    let mut bytes_added = 0u64;
575    let mut summary_warnings: Vec<String> = Vec::new();
576
577    let mut capacity_reached = false;
578    match input_set {
579        InputSet::Stdin => {
580            processed += 1;
581            match read_payload(None) {
582                Ok(payload) => {
583                    let mut analyze_spinner = if args.json {
584                        None
585                    } else {
586                        Some(create_spinner("Analyzing stdin payload..."))
587                    };
588                    match ingest_payload(
589                        &mut mem,
590                        &args,
591                        payload,
592                        None,
593                        capacity_guard.as_mut(),
594                        embedding_enabled,
595                        runtime_ref,
596                        use_parallel,
597                    ) {
598                        Ok(outcome) => {
599                            if let Some(pb) = analyze_spinner.take() {
600                                pb.finish_and_clear();
601                            }
602                            bytes_added += outcome.report.stored_bytes as u64;
603                            if args.json {
604                                summary_warnings
605                                    .extend(outcome.report.extraction.warnings.iter().cloned());
606                            }
607                            reports.push(outcome.report);
608                            let report_index = reports.len() - 1;
609                            if !args.json && !use_parallel {
610                                if let Some(report) = reports.get(report_index) {
611                                    print_report(report);
612                                }
613                            }
614                            if args.transcript {
615                                let notice = transcript_notice_message(&mut mem)?;
616                                if args.json {
617                                    summary_warnings.push(notice);
618                                } else {
619                                    println!("{notice}");
620                                }
621                            }
622                            if outcome.embedded {
623                                if let Some(pb) = embed_progress.as_ref() {
624                                    pb.inc(1);
625                                }
626                                embedded_docs += 1;
627                            }
628                            if use_parallel {
629                                #[cfg(feature = "parallel_segments")]
630                                if let Some(input) = outcome.parallel_input {
631                                    pending_parallel_indices.push(report_index);
632                                    pending_parallel_inputs.push(input);
633                                }
634                            } else if let Some(guard) = capacity_guard.as_mut() {
635                                mem.commit()?;
636                                let stats = mem.stats()?;
637                                guard.update_after_commit(&stats);
638                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
639                            }
640                        }
641                        Err(err) => {
642                            if let Some(pb) = analyze_spinner.take() {
643                                pb.finish_and_clear();
644                            }
645                            if err.downcast_ref::<DuplicateUriError>().is_some() {
646                                return Err(err);
647                            }
648                            warn!("skipped stdin payload: {err}");
649                            skipped += 1;
650                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
651                                capacity_reached = true;
652                            }
653                        }
654                    }
655                }
656                Err(err) => {
657                    warn!("failed to read stdin: {err}");
658                    skipped += 1;
659                }
660            }
661        }
662        InputSet::Files(ref paths) => {
663            let mut transcript_notice_printed = false;
664            for path in paths {
665                processed += 1;
666                match read_payload(Some(&path)) {
667                    Ok(payload) => {
668                        let label = path.display().to_string();
669                        let mut analyze_spinner = if args.json {
670                            None
671                        } else {
672                            Some(create_spinner(&format!("Analyzing {label}...")))
673                        };
674                        match ingest_payload(
675                            &mut mem,
676                            &args,
677                            payload,
678                            Some(&path),
679                            capacity_guard.as_mut(),
680                            embedding_enabled,
681                            runtime_ref,
682                            use_parallel,
683                        ) {
684                            Ok(outcome) => {
685                                if let Some(pb) = analyze_spinner.take() {
686                                    pb.finish_and_clear();
687                                }
688                                bytes_added += outcome.report.stored_bytes as u64;
689                                if args.json {
690                                    summary_warnings
691                                        .extend(outcome.report.extraction.warnings.iter().cloned());
692                                }
693                                reports.push(outcome.report);
694                                let report_index = reports.len() - 1;
695                                if !args.json && !use_parallel {
696                                    if let Some(report) = reports.get(report_index) {
697                                        print_report(report);
698                                    }
699                                }
700                                if args.transcript && !transcript_notice_printed {
701                                    let notice = transcript_notice_message(&mut mem)?;
702                                    if args.json {
703                                        summary_warnings.push(notice);
704                                    } else {
705                                        println!("{notice}");
706                                    }
707                                    transcript_notice_printed = true;
708                                }
709                                if outcome.embedded {
710                                    if let Some(pb) = embed_progress.as_ref() {
711                                        pb.inc(1);
712                                    }
713                                    embedded_docs += 1;
714                                }
715                                if use_parallel {
716                                    #[cfg(feature = "parallel_segments")]
717                                    if let Some(input) = outcome.parallel_input {
718                                        pending_parallel_indices.push(report_index);
719                                        pending_parallel_inputs.push(input);
720                                    }
721                                } else if let Some(guard) = capacity_guard.as_mut() {
722                                    mem.commit()?;
723                                    let stats = mem.stats()?;
724                                    guard.update_after_commit(&stats);
725                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
726                                }
727                            }
728                            Err(err) => {
729                                if let Some(pb) = analyze_spinner.take() {
730                                    pb.finish_and_clear();
731                                }
732                                if err.downcast_ref::<DuplicateUriError>().is_some() {
733                                    return Err(err);
734                                }
735                                warn!("skipped {}: {err}", path.display());
736                                skipped += 1;
737                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
738                                    capacity_reached = true;
739                                }
740                            }
741                        }
742                    }
743                    Err(err) => {
744                        warn!("failed to read {}: {err}", path.display());
745                        skipped += 1;
746                    }
747                }
748                if capacity_reached {
749                    break;
750                }
751            }
752        }
753    }
754
755    if capacity_reached {
756        warn!("capacity limit reached; remaining inputs were not processed");
757    }
758
759    if let Some(pb) = embed_progress.as_ref() {
760        pb.finish_with_message("embed");
761    }
762
763    if let Some(runtime) = embedding_runtime.as_ref() {
764        if embedded_docs > 0 {
765            match embedding_mode {
766                EmbeddingMode::Explicit => println!(
767                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
768                    embedded_docs,
769                    runtime.dimension()
770                ),
771                EmbeddingMode::Auto => println!(
772                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
773                    embedded_docs,
774                    runtime.dimension()
775                ),
776                EmbeddingMode::None => {}
777            }
778        } else {
779            match embedding_mode {
780                EmbeddingMode::Explicit => {
781                    println!("No embeddings were generated (no textual content available)");
782                }
783                EmbeddingMode::Auto => {
784                    println!(
785                        "Semantic runtime available, but no textual content produced embeddings"
786                    );
787                }
788                EmbeddingMode::None => {}
789            }
790        }
791    }
792
793    if reports.is_empty() {
794        if args.json {
795            if capacity_reached {
796                summary_warnings.push("capacity_limit_reached".to_string());
797            }
798            let summary_json = json!({
799                "processed": processed,
800                "ingested": 0,
801                "skipped": skipped,
802                "bytes_added": bytes_added,
803                "bytes_added_human": format_bytes(bytes_added),
804                "embedded_documents": embedded_docs,
805                "capacity_reached": capacity_reached,
806                "warnings": summary_warnings,
807                "reports": Vec::<serde_json::Value>::new(),
808            });
809            println!("{}", serde_json::to_string_pretty(&summary_json)?);
810        } else {
811            println!(
812                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
813                processed, skipped
814            );
815        }
816        return Ok(());
817    }
818
819    #[cfg(feature = "parallel_segments")]
820    let mut parallel_flush_spinner = if use_parallel && !args.json {
821        Some(create_spinner("Flushing parallel segments..."))
822    } else {
823        None
824    };
825
826    if use_parallel {
827        #[cfg(feature = "parallel_segments")]
828        {
829            let mut opts = parallel_opts.take().unwrap_or_else(|| {
830                let mut opts = BuildOpts::default();
831                opts.sanitize();
832                opts
833            });
834            // Set vector compression mode from mem state
835            opts.vec_compression = mem.vector_compression().clone();
836            let seqs = if pending_parallel_inputs.is_empty() {
837                mem.commit_parallel(opts)?;
838                Vec::new()
839            } else {
840                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
841            };
842            if let Some(pb) = parallel_flush_spinner.take() {
843                pb.finish_with_message("Flushed parallel segments");
844            }
845            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
846                if let Some(report) = reports.get_mut(idx) {
847                    report.seq = seq;
848                }
849            }
850        }
851        #[cfg(not(feature = "parallel_segments"))]
852        {
853            mem.commit()?;
854        }
855    } else {
856        mem.commit()?;
857    }
858
859    if !args.json && use_parallel {
860        for report in &reports {
861            print_report(report);
862        }
863    }
864
865    // Table extraction for PDF files
866    let mut tables_extracted = 0usize;
867    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
868    if args.tables && !args.no_tables {
869        if let InputSet::Files(ref paths) = input_set {
870            let pdf_paths: Vec<_> = paths
871                .iter()
872                .filter(|p| {
873                    p.extension()
874                        .and_then(|e| e.to_str())
875                        .map(|e| e.eq_ignore_ascii_case("pdf"))
876                        .unwrap_or(false)
877                })
878                .collect();
879
880            if !pdf_paths.is_empty() {
881                let table_spinner = if args.json {
882                    None
883                } else {
884                    Some(create_spinner(&format!(
885                        "Extracting tables from {} PDF(s)...",
886                        pdf_paths.len()
887                    )))
888                };
889
890                // Use aggressive mode with auto fallback for best results
891                let table_options = TableExtractionOptions::builder()
892                    .mode(memvid_core::table::ExtractionMode::Aggressive)
893                    .min_rows(2)
894                    .min_cols(2)
895                    .min_quality(memvid_core::table::TableQuality::Low)
896                    .merge_multi_page(true)
897                    .build();
898
899                for pdf_path in pdf_paths {
900                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
901                        let filename = pdf_path
902                            .file_name()
903                            .and_then(|s| s.to_str())
904                            .unwrap_or("unknown.pdf");
905
906                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
907                            for table in &result.tables {
908                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
909                                {
910                                    tables_extracted += 1;
911                                    table_summaries.push(json!({
912                                        "table_id": table.table_id,
913                                        "source_file": table.source_file,
914                                        "meta_frame_id": meta_id,
915                                        "rows": table.n_rows,
916                                        "cols": table.n_cols,
917                                        "quality": format!("{:?}", table.quality),
918                                        "pages": format!("{}-{}", table.page_start, table.page_end),
919                                    }));
920                                }
921                            }
922                        }
923                    }
924                }
925
926                if let Some(pb) = table_spinner {
927                    if tables_extracted > 0 {
928                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
929                    } else {
930                        pb.finish_with_message("No tables found");
931                    }
932                }
933
934                // Commit table frames
935                if tables_extracted > 0 {
936                    mem.commit()?;
937                }
938            }
939        }
940    }
941
942    let stats = mem.stats()?;
943    if args.json {
944        if capacity_reached {
945            summary_warnings.push("capacity_limit_reached".to_string());
946        }
947        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
948        let total_duration_ms: Option<u64> = {
949            let sum: u64 = reports
950                .iter()
951                .filter_map(|r| r.extraction.duration_ms)
952                .sum();
953            if sum > 0 {
954                Some(sum)
955            } else {
956                None
957            }
958        };
959        let total_pages: Option<u32> = {
960            let sum: u32 = reports
961                .iter()
962                .filter_map(|r| r.extraction.pages_processed)
963                .sum();
964            if sum > 0 {
965                Some(sum)
966            } else {
967                None
968            }
969        };
970        let embedding_mode_str = match embedding_mode {
971            EmbeddingMode::None => "none",
972            EmbeddingMode::Auto => "auto",
973            EmbeddingMode::Explicit => "explicit",
974        };
975        let summary_json = json!({
976            "processed": processed,
977            "ingested": reports.len(),
978            "skipped": skipped,
979            "bytes_added": bytes_added,
980            "bytes_added_human": format_bytes(bytes_added),
981            "embedded_documents": embedded_docs,
982            "embedding": {
983                "enabled": embedding_enabled,
984                "mode": embedding_mode_str,
985                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
986            },
987            "tables": {
988                "enabled": args.tables && !args.no_tables,
989                "extracted": tables_extracted,
990                "tables": table_summaries,
991            },
992            "capacity_reached": capacity_reached,
993            "warnings": summary_warnings,
994            "extraction": {
995                "total_duration_ms": total_duration_ms,
996                "total_pages_processed": total_pages,
997            },
998            "memory": {
999                "path": args.file.display().to_string(),
1000                "frame_count": stats.frame_count,
1001                "size_bytes": stats.size_bytes,
1002                "tier": format!("{:?}", stats.tier),
1003            },
1004            "reports": reports_json,
1005        });
1006        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1007    } else {
1008        println!(
1009            "Committed {} frame(s); total frames {}",
1010            reports.len(),
1011            stats.frame_count
1012        );
1013        println!(
1014            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1015            processed,
1016            reports.len(),
1017            skipped,
1018            format_bytes(bytes_added)
1019        );
1020        if tables_extracted > 0 {
1021            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1022        }
1023    }
1024    Ok(())
1025}
1026
/// Handle the `update` command: replace an existing frame with a revised one.
///
/// Opens the memory at `args.file`, selects the target frame by `args.frame_id` or
/// `args.uri`, and builds a fresh `PutOptions` seeded from that frame. Command-line
/// overrides are layered on top, and when `args.input` is given the new payload is
/// re-analysed. The result is written with `update_frame` and committed, then the
/// resulting frame is printed as JSON (`args.json`) or as a human-readable summary.
///
/// # Errors
///
/// Propagates any error from opening the memory, the mutation check, frame
/// selection, reading the payload, loading or running the embedding runtime,
/// `update_frame`, `commit`, the follow-up frame lookup, or JSON serialisation.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload. `payload_slice` and `payload_utf8` borrow from
    // `payload_bytes`, so their last use must come before `payload_bytes` is moved
    // into `update_frame` further down.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    // `None` when there is no payload or when it is not valid UTF-8.
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Start from the existing frame's attributes so anything not overridden below
    // carries over unchanged.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // An explicit new URI wins over the inherited one.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // No URI at all but a new payload: derive one from the input path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels given on the command line replace the inherited set wholesale.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags given on the command line also replace the inherited tags. Each pair
    // contributes its key and (when distinct and non-empty) its value as tags, and
    // is additionally recorded in `extra_metadata`.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for re-embedding; replaced below if the new payload yields search text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // With a new payload the title is re-derived (explicit title, else a Markdown
        // heading, else the file stem) and replaces the inherited one when found.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        // Only overwrite inherited metadata / search text when analysis produced some.
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // Payload is reused: nothing new to tag or scan for dates.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    // Auto-detect embedding model from existing MV2 dimension
    let mv2_dimension = mem.vec_index_dimension();
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        let runtime = load_embedding_runtime_for_mv2(config, None, mv2_dimension)?;
        // Prefer the search text; fall back to the raw UTF-8 payload.
        // NOTE(review): the text is passed to `embed` without truncation here, while
        // the file defines MAX_EMBEDDING_TEXT_LEN for that purpose — confirm whether
        // this path should truncate as well.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
    }

    // If no new embedding was computed but the memory has a vector index, carry the
    // existing frame's embedding over to the updated frame.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    // Capture what is needed for reporting before `options` and `payload_bytes` are moved.
    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Locate the frame that now represents the update: by URI when there is a
    // non-empty one, otherwise via a one-entry reversed timeline query, falling back
    // to the original frame id if the timeline is empty.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
1206
1207fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1208    if let Some(uri) = provided {
1209        let sanitized = sanitize_uri(uri, true);
1210        return format!("mv2://{}", sanitized);
1211    }
1212
1213    if let Some(path) = source {
1214        let raw = path.to_string_lossy();
1215        let sanitized = sanitize_uri(&raw, false);
1216        return format!("mv2://{}", sanitized);
1217    }
1218
1219    format!("mv2://frames/{}", Uuid::new_v4())
1220}
1221
1222pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1223    let digest = hash(payload).to_hex();
1224    let short = &digest[..32];
1225    let ext_from_path = source
1226        .and_then(|path| path.extension())
1227        .and_then(|ext| ext.to_str())
1228        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1229        .filter(|ext| !ext.is_empty());
1230    let ext = ext_from_path
1231        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1232        .unwrap_or_else(|| "bin".to_string());
1233    format!("mv2://video/{short}.{ext}")
1234}
1235
1236fn derive_title(
1237    provided: Option<String>,
1238    source: Option<&Path>,
1239    payload_utf8: Option<&str>,
1240) -> Option<String> {
1241    if let Some(title) = provided {
1242        let trimmed = title.trim();
1243        if trimmed.is_empty() {
1244            None
1245        } else {
1246            Some(trimmed.to_string())
1247        }
1248    } else {
1249        if let Some(text) = payload_utf8 {
1250            if let Some(markdown_title) = extract_markdown_title(text) {
1251                return Some(markdown_title);
1252            }
1253        }
1254        source
1255            .and_then(|path| path.file_stem())
1256            .and_then(|stem| stem.to_str())
1257            .map(to_title_case)
1258    }
1259}
1260
/// Return the text of the first non-empty `#`-prefixed line in `text`.
///
/// Each line is trimmed, and lines not starting with `#` are ignored. For the rest,
/// every leading `#` is stripped and the remainder trimmed; the first one with any
/// text left is returned. Yields `None` when no such line exists.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines()
        .map(str::trim)
        .filter(|line| line.starts_with('#'))
        .map(|line| line.trim_start_matches('#').trim())
        // Lines made only of `#` characters (e.g. "###") carry no title; keep looking.
        .find(|heading| !heading.is_empty())
        .map(str::to_string)
}
1273
/// Upper-case the first character of `input`, leaving the rest untouched.
///
/// Only the leading character is changed (using its full Unicode upper-case
/// mapping, which may be more than one character); an empty input gives an empty
/// string.
fn to_title_case(input: &str) -> String {
    let mut rest = input.chars();
    match rest.next() {
        None => String::new(),
        Some(head) => {
            let mut titled: String = head.to_uppercase().collect();
            titled.push_str(rest.as_str());
            titled
        }
    }
}
1285
/// True when `path` names a file that ingestion ignores by default.
///
/// Currently that is exactly a final component of `.DS_Store` (case-sensitive).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
1292
/// True when the final component of `path` is a dot-prefixed name (e.g. `.git`).
///
/// Paths with no final name component, or whose name is not valid UTF-8, are
/// never skipped.
fn should_skip_dir(path: &Path) -> bool {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.starts_with('.'),
        None => false,
    }
}
1299
/// The resolved source of ingestion input, as produced by `resolve_inputs`.
enum InputSet {
    /// No input argument was supplied (`resolve_inputs` returns this for `None`).
    Stdin,
    /// One or more filesystem paths: a single path as given, or the sorted matches
    /// of a glob pattern or directory walk.
    Files(Vec<PathBuf>),
}
1304
1305fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
1306    let Some(raw) = input else {
1307        return Ok(InputSet::Stdin);
1308    };
1309
1310    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
1311        let mut files = Vec::new();
1312        let mut matched_any = false;
1313        for entry in glob(raw)? {
1314            let path = entry?;
1315            if path.is_file() {
1316                matched_any = true;
1317                if should_skip_input(&path) {
1318                    continue;
1319                }
1320                files.push(path);
1321            }
1322        }
1323        files.sort();
1324        if files.is_empty() {
1325            if matched_any {
1326                anyhow::bail!(
1327                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
1328                );
1329            }
1330            anyhow::bail!("pattern '{raw}' did not match any files");
1331        }
1332        return Ok(InputSet::Files(files));
1333    }
1334
1335    let path = PathBuf::from(raw);
1336    if path.is_dir() {
1337        let (mut files, skipped_any) = collect_directory_files(&path)?;
1338        files.sort();
1339        if files.is_empty() {
1340            if skipped_any {
1341                anyhow::bail!(
1342                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
1343                    path.display()
1344                );
1345            }
1346            anyhow::bail!(
1347                "directory '{}' contains no ingestible files",
1348                path.display()
1349            );
1350        }
1351        Ok(InputSet::Files(files))
1352    } else {
1353        Ok(InputSet::Files(vec![path]))
1354    }
1355}
1356
1357fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
1358    let mut files = Vec::new();
1359    let mut skipped_any = false;
1360    let mut stack = vec![root.to_path_buf()];
1361    while let Some(dir) = stack.pop() {
1362        for entry in fs::read_dir(&dir)? {
1363            let entry = entry?;
1364            let path = entry.path();
1365            let file_type = entry.file_type()?;
1366            if file_type.is_dir() {
1367                if should_skip_dir(&path) {
1368                    skipped_any = true;
1369                    continue;
1370                }
1371                stack.push(path);
1372            } else if file_type.is_file() {
1373                if should_skip_input(&path) {
1374                    skipped_any = true;
1375                    continue;
1376                }
1377                files.push(path);
1378            }
1379        }
1380    }
1381    Ok((files, skipped_any))
1382}
1383
/// Result of ingesting a single input, consumed by the ingestion loop.
struct IngestOutcome {
    /// Per-input report that ends up in the command's output.
    report: PutReport,
    /// Whether an embedding was produced; the caller uses it to advance the embed
    /// progress bar and the embedded-document counter.
    embedded: bool,
    /// Deferred input for the parallel path; when present the caller queues it for
    /// the later parallel flush.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
1390
/// Per-input ingestion report, printed or serialised at the end of the command.
struct PutReport {
    /// Sequence number of the stored frame; in parallel mode it is overwritten with
    /// the value returned by the parallel flush.
    seq: u64,
    /// URI the frame was stored under.
    uri: String,
    /// Title of the frame, if one was determined.
    title: Option<String>,
    // NOTE(review): presumably payload size before / after storage encoding and
    // whether compression was applied — confirm against where the report is built.
    original_bytes: usize,
    stored_bytes: usize,
    compressed: bool,
    /// Path the input was read from, if any.
    source: Option<PathBuf>,
    /// MIME type recorded for the input, if any.
    mime: Option<String>,
    /// Document metadata recorded for the input, if any.
    metadata: Option<DocMetadata>,
    /// Outcome of text extraction; its duration and page counts are summed into the
    /// JSON summary.
    extraction: ExtractionSummary,
}
1403
1404struct CapacityGuard {
1405    #[allow(dead_code)]
1406    tier: Tier,
1407    capacity: u64,
1408    current_size: u64,
1409}
1410
1411impl CapacityGuard {
1412    const MIN_OVERHEAD: u64 = 128 * 1024;
1413    const RELATIVE_OVERHEAD: f64 = 0.05;
1414
1415    fn from_stats(stats: &Stats) -> Option<Self> {
1416        if stats.capacity_bytes == 0 {
1417            return None;
1418        }
1419        Some(Self {
1420            tier: stats.tier,
1421            capacity: stats.capacity_bytes,
1422            current_size: stats.size_bytes,
1423        })
1424    }
1425
1426    fn ensure_capacity(&self, additional: u64) -> Result<()> {
1427        let projected = self
1428            .current_size
1429            .saturating_add(additional)
1430            .saturating_add(Self::estimate_overhead(additional));
1431        if projected > self.capacity {
1432            return Err(map_put_error(
1433                MemvidError::CapacityExceeded {
1434                    current: self.current_size,
1435                    limit: self.capacity,
1436                    required: projected,
1437                },
1438                Some(self.capacity),
1439            ));
1440        }
1441        Ok(())
1442    }
1443
1444    fn update_after_commit(&mut self, stats: &Stats) {
1445        self.current_size = stats.size_bytes;
1446    }
1447
1448    fn capacity_hint(&self) -> Option<u64> {
1449        Some(self.capacity)
1450    }
1451
1452    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
1453    fn check_and_warn_capacity(&self) {
1454        if self.capacity == 0 {
1455            return;
1456        }
1457
1458        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
1459
1460        // Warn at key thresholds
1461        if usage_pct >= 90.0 && usage_pct < 91.0 {
1462            eprintln!(
1463                "⚠️  CRITICAL: 90% capacity used ({} / {})",
1464                format_bytes(self.current_size),
1465                format_bytes(self.capacity)
1466            );
1467            eprintln!("   Actions:");
1468            eprintln!("   1. Recreate with larger --size");
1469            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
1470            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
1471        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
1472            eprintln!(
1473                "⚠️  WARNING: 75% capacity used ({} / {})",
1474                format_bytes(self.current_size),
1475                format_bytes(self.capacity)
1476            );
1477            eprintln!("   Consider planning for capacity expansion soon");
1478        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
1479            eprintln!(
1480                "ℹ️  INFO: 50% capacity used ({} / {})",
1481                format_bytes(self.current_size),
1482                format_bytes(self.capacity)
1483            );
1484        }
1485    }
1486
1487    fn estimate_overhead(additional: u64) -> u64 {
1488        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
1489        fractional.max(Self::MIN_OVERHEAD)
1490    }
1491}
1492
/// Everything `analyze_file` learned about a payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type of the payload.
    pub mime: String,
    /// Document metadata derived from the payload, if any.
    pub metadata: Option<DocMetadata>,
    /// Normalised text to index for search, if any could be extracted.
    pub search_text: Option<String>,
    /// Which reader ran, how it went, and any warnings it raised.
    pub extraction: ExtractionSummary,
}
1500
/// Knobs for audio analysis during ingestion.
///
/// The default is `force = false` with 30-second segments.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    force: bool,
    segment_secs: u32,
}

impl AudioAnalyzeOptions {
    /// Segment length used by `Default`, in seconds.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length in seconds, with zero bumped up to one so it is never zero.
    fn normalised_segment_secs(self) -> u32 {
        match self.segment_secs {
            0 => 1,
            secs => secs,
        }
    }
}

impl Default for AudioAnalyzeOptions {
    fn default() -> Self {
        let segment_secs = Self::DEFAULT_SEGMENT_SECS;
        Self {
            force: false,
            segment_secs,
        }
    }
}
1523
/// Intermediate result of analysing an audio payload.
struct AudioAnalysis {
    /// Audio metadata to attach to the document.
    metadata: DocAudioMetadata,
    /// Optional caption for the audio.
    caption: Option<String>,
    /// Terms to contribute to the document's search text.
    // NOTE(review): exact origin of these terms is not visible here — confirm
    // against the audio analysis routine.
    search_terms: Vec<String>,
}
1529
/// Intermediate result of analysing an image payload.
struct ImageAnalysis {
    /// Image width (presumably pixels — confirm against the image analysis routine).
    width: u32,
    /// Image height (same unit as `width`).
    height: u32,
    /// Colour palette entries as strings.
    // NOTE(review): string format (e.g. hex) is not visible here — confirm.
    palette: Vec<String>,
    /// Optional caption for the image.
    caption: Option<String>,
    /// EXIF metadata, when present.
    exif: Option<DocExifMetadata>,
}
1537
/// Record of how text extraction went for one payload.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Label of the reader that handled the payload, if any ran.
    reader: Option<String>,
    /// Overall outcome of the extraction.
    status: ExtractionStatus,
    /// Human-readable warnings collected along the way, in insertion order.
    warnings: Vec<String>,
    /// Extraction time in milliseconds, when measured.
    duration_ms: Option<u64>,
    /// Number of pages processed, when applicable.
    pages_processed: Option<u32>,
}

impl ExtractionSummary {
    /// Append `warning` to the list of collected warnings.
    fn record_warning<S>(&mut self, warning: S)
    where
        S: Into<String>,
    {
        let message: String = warning.into();
        self.warnings.push(message);
    }
}

/// Outcome categories for text extraction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    Skipped,
    Ok,
    FallbackUsed,
    Empty,
    Failed,
}

impl ExtractionStatus {
    /// Stable snake_case name of the status.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
            ExtractionStatus::Skipped => "skipped",
        }
    }
}
1573
1574fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
1575    let filename = source_path
1576        .and_then(|path| path.file_name())
1577        .and_then(|name| name.to_str())
1578        .map(|value| value.to_string());
1579    MediaManifest {
1580        kind: "video".to_string(),
1581        mime: mime.to_string(),
1582        bytes,
1583        filename,
1584        duration_ms: None,
1585        width: None,
1586        height: None,
1587        codec: None,
1588    }
1589}
1590
1591pub fn analyze_file(
1592    source_path: Option<&Path>,
1593    payload: &[u8],
1594    payload_utf8: Option<&str>,
1595    inferred_title: Option<&str>,
1596    audio_opts: AudioAnalyzeOptions,
1597    force_video: bool,
1598) -> FileAnalysis {
1599    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
1600    let is_video = force_video || mime.starts_with("video/");
1601    let treat_as_text = if is_video { false } else { treat_as_text_base };
1602
1603    let mut metadata = DocMetadata::default();
1604    metadata.mime = Some(mime.clone());
1605    metadata.bytes = Some(payload.len() as u64);
1606    metadata.hash = Some(hash(payload).to_hex().to_string());
1607
1608    let mut search_text = None;
1609
1610    let mut extraction = ExtractionSummary {
1611        reader: None,
1612        status: ExtractionStatus::Skipped,
1613        warnings: Vec::new(),
1614        duration_ms: None,
1615        pages_processed: None,
1616    };
1617
1618    let reader_registry = default_reader_registry();
1619    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
1620        if slice.is_empty() {
1621            None
1622        } else {
1623            Some(slice)
1624        }
1625    });
1626
1627    let apply_extracted_text = |text: &str,
1628                                reader_label: &str,
1629                                status_if_applied: ExtractionStatus,
1630                                extraction: &mut ExtractionSummary,
1631                                metadata: &mut DocMetadata,
1632                                search_text: &mut Option<String>|
1633     -> bool {
1634        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
1635            let mut value = normalized.text;
1636            if normalized.truncated {
1637                value.push('…');
1638            }
1639            if metadata.caption.is_none() {
1640                if let Some(caption) = caption_from_text(&value) {
1641                    metadata.caption = Some(caption);
1642                }
1643            }
1644            *search_text = Some(value);
1645            extraction.reader = Some(reader_label.to_string());
1646            extraction.status = status_if_applied;
1647            true
1648        } else {
1649            false
1650        }
1651    };
1652
1653    if is_video {
1654        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
1655    }
1656
1657    if treat_as_text {
1658        if let Some(text) = payload_utf8 {
1659            if !apply_extracted_text(
1660                text,
1661                "bytes",
1662                ExtractionStatus::Ok,
1663                &mut extraction,
1664                &mut metadata,
1665                &mut search_text,
1666            ) {
1667                extraction.status = ExtractionStatus::Empty;
1668                extraction.reader = Some("bytes".to_string());
1669                let msg = "text payload contained no searchable content after normalization";
1670                extraction.record_warning(msg);
1671                warn!("{msg}");
1672            }
1673        } else {
1674            extraction.reader = Some("bytes".to_string());
1675            extraction.status = ExtractionStatus::Failed;
1676            let msg = "payload reported as text but was not valid UTF-8";
1677            extraction.record_warning(msg);
1678            warn!("{msg}");
1679        }
1680    } else if mime == "application/pdf" {
1681        let mut applied = false;
1682
1683        let format_hint = infer_document_format(Some(mime.as_str()), magic);
1684        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
1685            .with_uri(source_path.and_then(|p| p.to_str()))
1686            .with_magic(magic);
1687
1688        if let Some(reader) = reader_registry.find_reader(&hint) {
1689            match reader.extract(payload, &hint) {
1690                Ok(output) => {
1691                    extraction.reader = Some(output.reader_name.clone());
1692                    if let Some(ms) = output.diagnostics.duration_ms {
1693                        extraction.duration_ms = Some(ms);
1694                    }
1695                    if let Some(pages) = output.diagnostics.pages_processed {
1696                        extraction.pages_processed = Some(pages);
1697                    }
1698                    if let Some(mime_type) = output.document.mime_type.clone() {
1699                        metadata.mime = Some(mime_type);
1700                    }
1701
1702                    for warning in output.diagnostics.warnings.iter() {
1703                        extraction.record_warning(warning);
1704                        warn!("{warning}");
1705                    }
1706
1707                    let status = if output.diagnostics.fallback {
1708                        ExtractionStatus::FallbackUsed
1709                    } else {
1710                        ExtractionStatus::Ok
1711                    };
1712
1713                    if let Some(doc_text) = output.document.text.as_ref() {
1714                        applied = apply_extracted_text(
1715                            doc_text,
1716                            output.reader_name.as_str(),
1717                            status,
1718                            &mut extraction,
1719                            &mut metadata,
1720                            &mut search_text,
1721                        );
1722                        if !applied {
1723                            extraction.reader = Some(output.reader_name);
1724                            extraction.status = ExtractionStatus::Empty;
1725                            let msg = "primary reader returned no usable text";
1726                            extraction.record_warning(msg);
1727                            warn!("{msg}");
1728                        }
1729                    } else {
1730                        extraction.reader = Some(output.reader_name);
1731                        extraction.status = ExtractionStatus::Empty;
1732                        let msg = "primary reader returned empty text";
1733                        extraction.record_warning(msg);
1734                        warn!("{msg}");
1735                    }
1736                }
1737                Err(err) => {
1738                    let name = reader.name();
1739                    extraction.reader = Some(name.to_string());
1740                    extraction.status = ExtractionStatus::Failed;
1741                    let msg = format!("primary reader {name} failed: {err}");
1742                    extraction.record_warning(&msg);
1743                    warn!("{msg}");
1744                }
1745            }
1746        } else {
1747            extraction.record_warning("no registered reader matched this PDF payload");
1748            warn!("no registered reader matched this PDF payload");
1749        }
1750
1751        if !applied {
1752            match extract_pdf_text(payload) {
1753                Ok(pdf_text) => {
1754                    if !apply_extracted_text(
1755                        &pdf_text,
1756                        "pdf_extract",
1757                        ExtractionStatus::FallbackUsed,
1758                        &mut extraction,
1759                        &mut metadata,
1760                        &mut search_text,
1761                    ) {
1762                        extraction.reader = Some("pdf_extract".to_string());
1763                        extraction.status = ExtractionStatus::Empty;
1764                        let msg = "PDF text extraction yielded no searchable content";
1765                        extraction.record_warning(msg);
1766                        warn!("{msg}");
1767                    }
1768                }
1769                Err(err) => {
1770                    extraction.reader = Some("pdf_extract".to_string());
1771                    extraction.status = ExtractionStatus::Failed;
1772                    let msg = format!("failed to extract PDF text via fallback: {err}");
1773                    extraction.record_warning(&msg);
1774                    warn!("{msg}");
1775                }
1776            }
1777        }
1778    }
1779
1780    if !is_video && mime.starts_with("image/") {
1781        if let Some(image_meta) = analyze_image(payload) {
1782            let ImageAnalysis {
1783                width,
1784                height,
1785                palette,
1786                caption,
1787                exif,
1788            } = image_meta;
1789            metadata.width = Some(width);
1790            metadata.height = Some(height);
1791            if !palette.is_empty() {
1792                metadata.colors = Some(palette);
1793            }
1794            if metadata.caption.is_none() {
1795                metadata.caption = caption;
1796            }
1797            if let Some(exif) = exif {
1798                metadata.exif = Some(exif);
1799            }
1800        }
1801    }
1802
1803    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
1804        if let Some(AudioAnalysis {
1805            metadata: audio_metadata,
1806            caption: audio_caption,
1807            mut search_terms,
1808        }) = analyze_audio(payload, source_path, &mime, audio_opts)
1809        {
1810            if metadata.audio.is_none()
1811                || metadata
1812                    .audio
1813                    .as_ref()
1814                    .map_or(true, DocAudioMetadata::is_empty)
1815            {
1816                metadata.audio = Some(audio_metadata);
1817            }
1818            if metadata.caption.is_none() {
1819                metadata.caption = audio_caption;
1820            }
1821            if !search_terms.is_empty() {
1822                match &mut search_text {
1823                    Some(existing) => {
1824                        for term in search_terms.drain(..) {
1825                            if !existing.contains(&term) {
1826                                if !existing.ends_with(' ') {
1827                                    existing.push(' ');
1828                                }
1829                                existing.push_str(&term);
1830                            }
1831                        }
1832                    }
1833                    None => {
1834                        search_text = Some(search_terms.join(" "));
1835                    }
1836                }
1837            }
1838        }
1839    }
1840
1841    if metadata.caption.is_none() {
1842        if let Some(title) = inferred_title {
1843            let trimmed = title.trim();
1844            if !trimmed.is_empty() {
1845                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
1846            }
1847        }
1848    }
1849
1850    FileAnalysis {
1851        mime,
1852        metadata: if metadata.is_empty() {
1853            None
1854        } else {
1855            Some(metadata)
1856        },
1857        search_text,
1858        extraction,
1859    }
1860}
1861
1862fn default_reader_registry() -> &'static ReaderRegistry {
1863    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
1864    REGISTRY.get_or_init(ReaderRegistry::default)
1865}
1866
1867fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
1868    if detect_pdf_magic(magic) {
1869        return Some(DocumentFormat::Pdf);
1870    }
1871
1872    let mime = mime?.trim().to_ascii_lowercase();
1873    match mime.as_str() {
1874        "application/pdf" => Some(DocumentFormat::Pdf),
1875        "text/plain" => Some(DocumentFormat::PlainText),
1876        "text/markdown" => Some(DocumentFormat::Markdown),
1877        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
1878        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
1879            Some(DocumentFormat::Docx)
1880        }
1881        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
1882            Some(DocumentFormat::Pptx)
1883        }
1884        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
1885            Some(DocumentFormat::Xlsx)
1886        }
1887        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
1888        _ => None,
1889    }
1890}
1891
/// Reports whether the sniffed bytes begin with the `%PDF` signature.
///
/// One leading UTF-8 byte-order mark and any run of ASCII whitespace after it
/// are ignored. `None` or an empty slice is never a PDF.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    const UTF8_BOM: &[u8] = &[0xEF, 0xBB, 0xBF];
    const PDF_SIGNATURE: &[u8] = b"%PDF";

    let bytes = match magic {
        Some(bytes) => bytes,
        None => return false,
    };
    let without_bom = bytes.strip_prefix(UTF8_BOM).unwrap_or(bytes);
    let first_visible = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[first_visible..].starts_with(PDF_SIGNATURE)
}
1909
1910fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
1911    match pdf_extract::extract_text_from_mem(payload) {
1912        Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
1913            Ok(fallback) => Ok(fallback),
1914            Err(err) => {
1915                warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
1916                Ok(text)
1917            }
1918        },
1919        Err(err) => {
1920            warn!("pdf_extract failed: {err}");
1921            match fallback_pdftotext(payload) {
1922                Ok(fallback) => Ok(fallback),
1923                Err(fallback_err) => {
1924                    warn!("pdftotext fallback failed: {fallback_err}");
1925                    Err(err)
1926                }
1927            }
1928        }
1929        other => other,
1930    }
1931}
1932
1933fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
1934    use std::io::Write;
1935    use std::process::{Command, Stdio};
1936
1937    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
1938
1939    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
1940    temp_pdf
1941        .write_all(payload)
1942        .context("failed to write pdf payload to temp file")?;
1943    temp_pdf.flush().context("failed to flush temp pdf file")?;
1944
1945    let child = Command::new(pdftotext)
1946        .arg("-layout")
1947        .arg(temp_pdf.path())
1948        .arg("-")
1949        .stdout(Stdio::piped())
1950        .spawn()
1951        .context("failed to spawn pdftotext")?;
1952
1953    let output = child
1954        .wait_with_output()
1955        .context("failed to read pdftotext output")?;
1956
1957    if !output.status.success() {
1958        return Err(anyhow!("pdftotext exited with status {}", output.status));
1959    }
1960
1961    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
1962    Ok(text)
1963}
1964
1965fn detect_mime(
1966    source_path: Option<&Path>,
1967    payload: &[u8],
1968    payload_utf8: Option<&str>,
1969) -> (String, bool) {
1970    if let Some(kind) = infer::get(payload) {
1971        let mime = kind.mime_type().to_string();
1972        let treat_as_text = mime.starts_with("text/")
1973            || matches!(
1974                mime.as_str(),
1975                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
1976            );
1977        return (mime, treat_as_text);
1978    }
1979
1980    let magic = magic_from_u8(payload);
1981    if !magic.is_empty() && magic != "application/octet-stream" {
1982        let treat_as_text = magic.starts_with("text/")
1983            || matches!(
1984                magic,
1985                "application/json"
1986                    | "application/xml"
1987                    | "application/javascript"
1988                    | "image/svg+xml"
1989                    | "text/plain"
1990            );
1991        return (magic.to_string(), treat_as_text);
1992    }
1993
1994    if let Some(path) = source_path {
1995        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
1996            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
1997                return (mime.to_string(), treat_as_text);
1998            }
1999        }
2000    }
2001
2002    if payload_utf8.is_some() {
2003        return ("text/plain".to_string(), true);
2004    }
2005
2006    ("application/octet-stream".to_string(), false)
2007}
2008
/// Maps a file extension (without the dot, any letter case) to a MIME type and
/// a flag telling whether files of that type are treated as text.
///
/// Returns `None` for extensions that are not in the table.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    // Source code, config and log extensions that all map to plain text.
    const PLAIN_TEXT_EXTENSIONS: &[&str] = &[
        "txt", "text", "log", "cfg", "conf", "ini", "properties", "sql", "rs", "py", "js", "ts",
        "tsx", "jsx", "c", "h", "cpp", "hpp", "go", "rb", "php", "css", "scss", "sass", "sh",
        "bash", "zsh", "ps1", "swift", "kt", "java", "scala", "lua", "pl", "pm", "r", "erl", "ex",
        "exs", "dart",
    ];

    let normalized = ext.to_ascii_lowercase();
    let key = normalized.as_str();
    if PLAIN_TEXT_EXTENSIONS.contains(&key) {
        return Some(("text/plain", true));
    }

    let entry = match key {
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some(entry)
}
2036
2037fn caption_from_text(text: &str) -> Option<String> {
2038    for line in text.lines() {
2039        let trimmed = line.trim();
2040        if !trimmed.is_empty() {
2041            return Some(truncate_to_boundary(trimmed, 240));
2042        }
2043    }
2044    None
2045}
2046
/// Shortens `text` to at most `max_len` bytes without splitting a character.
///
/// Text that already fits is returned unchanged. Otherwise the text is cut at
/// the last character boundary at or below `max_len`, trailing whitespace is
/// removed and `…` is appended, so the result may be slightly longer than
/// `max_len` bytes. If not even the first character fits, the result is an
/// empty string with no ellipsis.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_owned();
    }

    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }

    let mut shortened = String::from(text[..cut].trim_end());
    shortened.push('…');
    shortened
}
2062
2063fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2064    let reader = ImageReader::new(Cursor::new(payload))
2065        .with_guessed_format()
2066        .map_err(|err| warn!("failed to guess image format: {err}"))
2067        .ok()?;
2068    let image = reader
2069        .decode()
2070        .map_err(|err| warn!("failed to decode image: {err}"))
2071        .ok()?;
2072    let width = image.width();
2073    let height = image.height();
2074    let rgb = image.to_rgb8();
2075    let palette = match get_palette(
2076        rgb.as_raw(),
2077        ColorFormat::Rgb,
2078        COLOR_PALETTE_SIZE,
2079        COLOR_PALETTE_QUALITY,
2080    ) {
2081        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2082        Err(err) => {
2083            warn!("failed to compute colour palette: {err}");
2084            Vec::new()
2085        }
2086    };
2087
2088    let (exif, exif_caption) = extract_exif_metadata(payload);
2089
2090    Some(ImageAnalysis {
2091        width,
2092        height,
2093        palette,
2094        caption: exif_caption,
2095        exif,
2096    })
2097}
2098
2099fn color_to_hex(color: Color) -> String {
2100    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
2101}
2102
2103fn analyze_audio(
2104    payload: &[u8],
2105    source_path: Option<&Path>,
2106    mime: &str,
2107    options: AudioAnalyzeOptions,
2108) -> Option<AudioAnalysis> {
2109    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
2110    use symphonia::core::errors::Error as SymphoniaError;
2111    use symphonia::core::formats::FormatOptions;
2112    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
2113    use symphonia::core::meta::MetadataOptions;
2114    use symphonia::core::probe::Hint;
2115
2116    let cursor = Cursor::new(payload.to_vec());
2117    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());
2118
2119    let mut hint = Hint::new();
2120    if let Some(path) = source_path {
2121        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2122            hint.with_extension(ext);
2123        }
2124    }
2125    if let Some(suffix) = mime.split('/').nth(1) {
2126        hint.with_extension(suffix);
2127    }
2128
2129    let probed = symphonia::default::get_probe()
2130        .format(
2131            &hint,
2132            mss,
2133            &FormatOptions::default(),
2134            &MetadataOptions::default(),
2135        )
2136        .map_err(|err| warn!("failed to probe audio stream: {err}"))
2137        .ok()?;
2138
2139    let mut format = probed.format;
2140    let track = format.default_track()?;
2141    let track_id = track.id;
2142    let codec_params: CodecParameters = track.codec_params.clone();
2143    let sample_rate = codec_params.sample_rate;
2144    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);
2145
2146    let mut decoder = symphonia::default::get_codecs()
2147        .make(&codec_params, &DecoderOptions::default())
2148        .map_err(|err| warn!("failed to create audio decoder: {err}"))
2149        .ok()?;
2150
2151    let mut decoded_frames: u64 = 0;
2152    loop {
2153        let packet = match format.next_packet() {
2154            Ok(packet) => packet,
2155            Err(SymphoniaError::IoError(_)) => break,
2156            Err(SymphoniaError::DecodeError(err)) => {
2157                warn!("skipping undecodable audio packet: {err}");
2158                continue;
2159            }
2160            Err(SymphoniaError::ResetRequired) => {
2161                decoder.reset();
2162                continue;
2163            }
2164            Err(other) => {
2165                warn!("stopping audio analysis due to error: {other}");
2166                break;
2167            }
2168        };
2169
2170        if packet.track_id() != track_id {
2171            continue;
2172        }
2173
2174        match decoder.decode(&packet) {
2175            Ok(audio_buf) => {
2176                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
2177            }
2178            Err(SymphoniaError::DecodeError(err)) => {
2179                warn!("failed to decode audio packet: {err}");
2180            }
2181            Err(SymphoniaError::IoError(_)) => break,
2182            Err(SymphoniaError::ResetRequired) => {
2183                decoder.reset();
2184            }
2185            Err(other) => {
2186                warn!("decoder error: {other}");
2187                break;
2188            }
2189        }
2190    }
2191
2192    let duration_secs = match sample_rate {
2193        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
2194        _ => 0.0,
2195    };
2196
2197    let bitrate_kbps = if duration_secs > 0.0 {
2198        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
2199    } else {
2200        None
2201    };
2202
2203    let tags = extract_lofty_tags(payload);
2204    let caption = tags
2205        .get("title")
2206        .cloned()
2207        .or_else(|| tags.get("tracktitle").cloned());
2208
2209    let mut search_terms = Vec::new();
2210    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
2211        search_terms.push(title.clone());
2212    }
2213    if let Some(artist) = tags.get("artist") {
2214        search_terms.push(artist.clone());
2215    }
2216    if let Some(album) = tags.get("album") {
2217        search_terms.push(album.clone());
2218    }
2219    if let Some(genre) = tags.get("genre") {
2220        search_terms.push(genre.clone());
2221    }
2222
2223    let mut segments = Vec::new();
2224    if duration_secs > 0.0 {
2225        let segment_len = options.normalised_segment_secs() as f64;
2226        if segment_len > 0.0 {
2227            let mut start = 0.0;
2228            let mut idx = 1usize;
2229            while start < duration_secs {
2230                let end = (start + segment_len).min(duration_secs);
2231                segments.push(AudioSegmentMetadata {
2232                    start_seconds: start as f32,
2233                    end_seconds: end as f32,
2234                    label: Some(format!("Segment {}", idx)),
2235                });
2236                if end >= duration_secs {
2237                    break;
2238                }
2239                start = end;
2240                idx += 1;
2241            }
2242        }
2243    }
2244
2245    let mut metadata = DocAudioMetadata::default();
2246    if duration_secs > 0.0 {
2247        metadata.duration_secs = Some(duration_secs as f32);
2248    }
2249    if let Some(rate) = sample_rate {
2250        metadata.sample_rate_hz = Some(rate);
2251    }
2252    if let Some(channels) = channel_count {
2253        metadata.channels = Some(channels);
2254    }
2255    if let Some(bitrate) = bitrate_kbps {
2256        metadata.bitrate_kbps = Some(bitrate);
2257    }
2258    if codec_params.codec != CODEC_TYPE_NULL {
2259        metadata.codec = Some(format!("{:?}", codec_params.codec));
2260    }
2261    if !segments.is_empty() {
2262        metadata.segments = segments;
2263    }
2264    if !tags.is_empty() {
2265        metadata.tags = tags
2266            .iter()
2267            .map(|(k, v)| (k.clone(), v.clone()))
2268            .collect::<BTreeMap<_, _>>();
2269    }
2270
2271    Some(AudioAnalysis {
2272        metadata,
2273        caption,
2274        search_terms,
2275    })
2276}
2277
2278fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
2279    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
2280
2281    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
2282        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
2283            out.entry("title".into())
2284                .or_insert_with(|| value.to_string());
2285            out.entry("tracktitle".into())
2286                .or_insert_with(|| value.to_string());
2287        }
2288        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
2289            out.entry("artist".into())
2290                .or_insert_with(|| value.to_string());
2291        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
2292            out.entry("artist".into())
2293                .or_insert_with(|| value.to_string());
2294        }
2295        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
2296            out.entry("album".into())
2297                .or_insert_with(|| value.to_string());
2298        }
2299        if let Some(value) = tag.get_string(&ItemKey::Genre) {
2300            out.entry("genre".into())
2301                .or_insert_with(|| value.to_string());
2302        }
2303        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
2304            out.entry("track_number".into())
2305                .or_insert_with(|| value.to_string());
2306        }
2307        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
2308            out.entry("disc_number".into())
2309                .or_insert_with(|| value.to_string());
2310        }
2311    }
2312
2313    let mut tags = HashMap::new();
2314    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
2315        Ok(probe) => probe,
2316        Err(err) => {
2317            warn!("failed to guess audio tag file type: {err}");
2318            return tags;
2319        }
2320    };
2321
2322    let tagged_file = match probe.read() {
2323        Ok(file) => file,
2324        Err(err) => {
2325            warn!("failed to read audio tags: {err}");
2326            return tags;
2327        }
2328    };
2329
2330    if let Some(primary) = tagged_file.primary_tag() {
2331        collect_tag(primary, &mut tags);
2332    }
2333    for tag in tagged_file.tags() {
2334        collect_tag(tag, &mut tags);
2335    }
2336
2337    tags
2338}
2339
2340fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
2341    let mut cursor = Cursor::new(payload);
2342    let exif = match ExifReader::new().read_from_container(&mut cursor) {
2343        Ok(exif) => exif,
2344        Err(err) => {
2345            warn!("failed to read EXIF metadata: {err}");
2346            return (None, None);
2347        }
2348    };
2349
2350    let mut doc = DocExifMetadata::default();
2351    let mut has_data = false;
2352
2353    if let Some(make) = exif_string(&exif, Tag::Make) {
2354        doc.make = Some(make);
2355        has_data = true;
2356    }
2357    if let Some(model) = exif_string(&exif, Tag::Model) {
2358        doc.model = Some(model);
2359        has_data = true;
2360    }
2361    if let Some(lens) =
2362        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
2363    {
2364        doc.lens = Some(lens);
2365        has_data = true;
2366    }
2367    if let Some(dt) =
2368        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
2369    {
2370        doc.datetime = Some(dt);
2371        has_data = true;
2372    }
2373
2374    if let Some(gps) = extract_gps_metadata(&exif) {
2375        doc.gps = Some(gps);
2376        has_data = true;
2377    }
2378
2379    let caption = exif_string(&exif, Tag::ImageDescription);
2380
2381    let metadata = if has_data { Some(doc) } else { None };
2382    (metadata, caption)
2383}
2384
2385fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
2386    exif.fields()
2387        .find(|field| field.tag == tag)
2388        .and_then(|field| field_to_string(field, exif))
2389}
2390
2391fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
2392    let value = field.display_value().with_unit(exif).to_string();
2393    let trimmed = value.trim_matches('\0').trim();
2394    if trimmed.is_empty() {
2395        None
2396    } else {
2397        Some(trimmed.to_string())
2398    }
2399}
2400
2401fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
2402    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
2403    let latitude_ref_field = exif
2404        .fields()
2405        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
2406    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
2407    let longitude_ref_field = exif
2408        .fields()
2409        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
2410
2411    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
2412    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
2413
2414    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
2415        if reference.eq_ignore_ascii_case("S") {
2416            latitude = -latitude;
2417        }
2418    }
2419    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
2420        if reference.eq_ignore_ascii_case("W") {
2421            longitude = -longitude;
2422        }
2423    }
2424
2425    Some(DocGpsMetadata {
2426        latitude,
2427        longitude,
2428    })
2429}
2430
2431fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
2432    match value {
2433        ExifValue::Rational(values) if !values.is_empty() => {
2434            let deg = rational_to_f64_u(values.get(0)?)?;
2435            let min = values
2436                .get(1)
2437                .and_then(|v| rational_to_f64_u(v))
2438                .unwrap_or(0.0);
2439            let sec = values
2440                .get(2)
2441                .and_then(|v| rational_to_f64_u(v))
2442                .unwrap_or(0.0);
2443            Some(deg + (min / 60.0) + (sec / 3600.0))
2444        }
2445        ExifValue::SRational(values) if !values.is_empty() => {
2446            let deg = rational_to_f64_i(values.get(0)?)?;
2447            let min = values
2448                .get(1)
2449                .and_then(|v| rational_to_f64_i(v))
2450                .unwrap_or(0.0);
2451            let sec = values
2452                .get(2)
2453                .and_then(|v| rational_to_f64_i(v))
2454                .unwrap_or(0.0);
2455            Some(deg + (min / 60.0) + (sec / 3600.0))
2456        }
2457        _ => None,
2458    }
2459}
2460
2461fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
2462    if value.denom == 0 {
2463        None
2464    } else {
2465        Some(value.num as f64 / value.denom as f64)
2466    }
2467}
2468
2469fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
2470    if value.denom == 0 {
2471        None
2472    } else {
2473        Some(value.num as f64 / value.denom as f64)
2474    }
2475}
2476
2477fn value_to_ascii(value: &ExifValue) -> Option<String> {
2478    if let ExifValue::Ascii(values) = value {
2479        values
2480            .first()
2481            .and_then(|bytes| std::str::from_utf8(bytes).ok())
2482            .map(|s| s.trim_matches('\0').trim().to_string())
2483            .filter(|s| !s.is_empty())
2484    } else {
2485        None
2486    }
2487}
2488
/// Ingests a single payload into `mem` and returns a report describing what was stored.
///
/// The function proceeds in this order:
/// 1. Measures the stored size with `canonical_storage_len` and derives a title.
/// 2. Runs `analyze_file`; fails if `--video` was requested but the detected MIME type does
///    not start with `video/`.
/// 3. Truncates the search text with `truncate_to_boundary` when it is longer than
///    `MAX_SEARCH_TEXT_LEN`, then derives the frame URI.
/// 4. Builds `PutOptions` from the CLI arguments and the analysis results.
/// 5. Looks up an existing frame with the same URI (skipped only when duplicates are allowed
///    and `update_existing` is off), then updates it, rejects the duplicate, or inserts a
///    new frame.
///
/// Embeddings are generated only when `enable_embedding` is set and `args.video` is not;
/// in that case `runtime` must be `Some`, otherwise the call fails with
/// "semantic runtime unavailable". All text passed to `runtime.embed` goes through
/// `truncate_for_embedding` first.
///
/// When `parallel_mode` is set and the `parallel_segments` feature is compiled in, this
/// function does not call any `put_*` method: it returns early with `seq: 0` and a
/// `ParallelInput` in the outcome. Without that feature, `parallel_mode` stores the payload
/// through `put_bytes_with_options` and no embedding is generated on that path.
///
/// # Errors
/// Returns an error when the `--video` MIME check fails, when the URI lookup fails for a
/// reason other than `FrameNotFoundByUri`, when a frame with the same URI exists and
/// `update_existing` is off (`DuplicateUriError`), when the capacity guard rejects the
/// payload, when embedding or contextual-engine setup fails, or when the underlying
/// put/update call fails (mapped through `map_put_error`).
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    let original_bytes = payload.len();
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    // `payload_text` is `Some` only when the whole payload is valid UTF-8.
    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    let audio_opts = AudioAnalyzeOptions {
        force: args.audio,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
    };

    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    // Cap the search text and write the capped copy back into the analysis.
    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    // URI precedence: explicit --uri, then the video-specific derivation, then the source path.
    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    // `--video` forces the kind to "video" and takes precedence over an explicit `--kind`.
    if args.video {
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    // Each key/value tag is stored as a pair, and its non-empty key and value are also
    // pushed as plain tags (the value only when it differs from the key).
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    // Each --metadata entry is first tried as a full `DocMetadata`; otherwise any valid JSON
    // value is kept under `custom_metadata_<idx>`. Unparseable entries are only logged.
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    let options = options_builder.build();

    // The lookup is skipped only when duplicates are allowed and no update was requested.
    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    let mut embedded = false;
    // Video payloads never get embeddings, even when embedding was requested.
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            // Update path: replace the existing frame's payload, with an embedding if possible.
            let payload_for_update = payload.clone();
            if allow_embedding {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                // Prefer the extracted search text; fall back to the raw UTF-8 payload.
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    // Truncate to avoid token limits
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!("Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            // A frame with this URI exists and no update was requested.
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        // Insert path: the capacity guard is consulted only for new frames.
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            // With `parallel_segments`, embeddings are computed here but the payload is handed
            // back to the caller as a `ParallelInput` instead of being stored.
            #[cfg(feature = "parallel_segments")]
            {
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                    } else {
                        // Check if document will be chunked - if so, embed each chunk
                        info!("parallel mode: checking for chunks on {} bytes payload", payload.len());
                        if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                            // Document will be chunked - embed each chunk for full semantic coverage
                            info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                            // Apply temporal enrichment if enabled (before contextual)
                            #[cfg(feature = "temporal_enrich")]
                            let enriched_chunks = if args.temporal_enrich {
                                info!("Temporal enrichment enabled, processing {} chunks", chunk_texts.len());
                                // Use today's date as fallback document date
                                let today = chrono::Local::now().date_naive();
                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                // Results are tuples of (enriched_text, TemporalEnrichment)
                                let enriched: Vec<String> = results.iter()
                                    .map(|(text, _)| text.clone())
                                    .collect();
                                let resolved_count = results.iter()
                                    .filter(|(_, e)| e.relative_phrases.iter().any(|p| p.resolved.is_some()))
                                    .count();
                                info!("Temporal enrichment: resolved {} chunks with temporal context", resolved_count);
                                enriched
                            } else {
                                chunk_texts.clone()
                            };
                            #[cfg(not(feature = "temporal_enrich"))]
                            let enriched_chunks = {
                                if args.temporal_enrich {
                                    warn!("Temporal enrichment requested but feature not compiled in");
                                }
                                chunk_texts.clone()
                            };

                            // Apply contextual retrieval if enabled
                            let embed_chunks = if args.contextual {
                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                let engine = if args.contextual_model.as_deref() == Some("local") {
                                    #[cfg(feature = "llama-cpp")]
                                    {
                                        let model_path = get_local_contextual_model_path()?;
                                        ContextualEngine::local(model_path)
                                    }
                                    #[cfg(not(feature = "llama-cpp"))]
                                    {
                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                    }
                                } else if let Some(model) = &args.contextual_model {
                                    ContextualEngine::openai_with_model(model)?
                                } else {
                                    ContextualEngine::openai()?
                                };

                                // A failure to generate prefixes is not fatal: the enriched
                                // chunks are embedded without prefixes instead.
                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                                    Ok(contexts) => {
                                        info!("Generated {} contextual prefixes", contexts.len());
                                        apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                                    }
                                    Err(e) => {
                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                        enriched_chunks.clone()
                                    }
                                }
                            } else {
                                enriched_chunks
                            };

                            let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                            for chunk_text in &embed_chunks {
                                // Truncate chunk to avoid OpenAI token limits (contextual prefixes can make chunks large)
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!("parallel mode: Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                                }
                                let chunk_emb = runtime.embed(&truncated_chunk)?;
                                chunk_embeddings.push(chunk_emb);
                            }
                            let num_chunks = chunk_embeddings.len();
                            chunk_embeddings_vec = Some(chunk_embeddings);
                            // Also embed the parent for the parent frame (truncate to avoid token limits)
                            let parent_text = truncate_for_embedding(&embed_text);
                            if parent_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&parent_text)?);
                            embedded = true;
                            info!("parallel mode: Generated {} chunk embeddings + 1 parent embedding", num_chunks);
                        } else {
                            // No chunking - just embed the whole document (truncate to avoid token limits)
                            info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                            let truncated_text = truncate_for_embedding(&embed_text);
                            if truncated_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&truncated_text)?);
                            embedded = true;
                        }
                    }
                }
                // Hand over the source path when there is one; otherwise move the bytes.
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                // Early return: `seq` is reported as 0 because nothing was stored here.
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            // Without `parallel_segments`, parallel mode stores the payload directly and
            // generates no embedding.
            #[cfg(not(feature = "parallel_segments"))]
            {
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
            let embed_text = search_text
                .clone()
                .or_else(|| payload_text.map(|text| text.to_string()))
                .unwrap_or_default();
            if embed_text.trim().is_empty() {
                warn!("no textual content available; embedding skipped");
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                // Check if document will be chunked - if so, embed each chunk
                if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                    // Document will be chunked - embed each chunk for full semantic coverage
                    info!("Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                    // Apply temporal enrichment if enabled (before contextual)
                    #[cfg(feature = "temporal_enrich")]
                    let enriched_chunks = if args.temporal_enrich {
                        info!("Temporal enrichment enabled, processing {} chunks", chunk_texts.len());
                        // Use today's date as fallback document date
                        let today = chrono::Local::now().date_naive();
                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                        // Results are tuples of (enriched_text, TemporalEnrichment)
                        let enriched: Vec<String> = results.iter()
                            .map(|(text, _)| text.clone())
                            .collect();
                        let resolved_count = results.iter()
                            .filter(|(_, e)| e.relative_phrases.iter().any(|p| p.resolved.is_some()))
                            .count();
                        info!("Temporal enrichment: resolved {} chunks with temporal context", resolved_count);
                        enriched
                    } else {
                        chunk_texts.clone()
                    };
                    #[cfg(not(feature = "temporal_enrich"))]
                    let enriched_chunks = {
                        if args.temporal_enrich {
                            warn!("Temporal enrichment requested but feature not compiled in");
                        }
                        chunk_texts.clone()
                    };

                    // Apply contextual retrieval if enabled
                    let embed_chunks = if args.contextual {
                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                        let engine = if args.contextual_model.as_deref() == Some("local") {
                            #[cfg(feature = "llama-cpp")]
                            {
                                let model_path = get_local_contextual_model_path()?;
                                ContextualEngine::local(model_path)
                            }
                            #[cfg(not(feature = "llama-cpp"))]
                            {
                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                            }
                        } else if let Some(model) = &args.contextual_model {
                            ContextualEngine::openai_with_model(model)?
                        } else {
                            ContextualEngine::openai()?
                        };

                        // A failure to generate prefixes is not fatal: the enriched chunks
                        // are embedded without prefixes instead.
                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                            Ok(contexts) => {
                                info!("Generated {} contextual prefixes", contexts.len());
                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                            }
                            Err(e) => {
                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                enriched_chunks.clone()
                            }
                        }
                    } else {
                        enriched_chunks
                    };

                    let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                    for chunk_text in &embed_chunks {
                        // Truncate chunk to avoid OpenAI token limits (contextual prefixes can make chunks large)
                        let truncated_chunk = truncate_for_embedding(chunk_text);
                        if truncated_chunk.len() < chunk_text.len() {
                            info!("Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                        }
                        let chunk_emb = runtime.embed(&truncated_chunk)?;
                        chunk_embeddings.push(chunk_emb);
                    }
                    // Truncate parent text to avoid token limits
                    let parent_text = truncate_for_embedding(&embed_text);
                    if parent_text.len() < embed_text.len() {
                        info!("Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                    }
                    let parent_embedding = runtime.embed(&parent_text)?;
                    embedded = true;
                    info!("Calling put_with_chunk_embeddings with {} chunk embeddings", chunk_embeddings.len());
                    mem.put_with_chunk_embeddings(&payload, Some(parent_embedding), chunk_embeddings, options)
                        .map_err(|err| {
                            map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                        })?
                } else {
                    // No chunking - just embed the whole document (truncate to avoid token limits)
                    info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!("Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.put_with_embedding_and_options(&payload, embedding, options)
                        .map_err(|err| {
                            map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                        })?
                }
            }
        } else {
            // Embedding disabled (or video input): store the payload without an embedding.
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}
2958
2959fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
2960    if std::str::from_utf8(payload).is_ok() {
2961        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
2962        Ok((compressed.len(), true))
2963    } else {
2964        Ok((payload.len(), false))
2965    }
2966}
2967
2968fn print_report(report: &PutReport) {
2969    let name = report
2970        .source
2971        .as_ref()
2972        .map(|path| {
2973            let pretty = env::current_dir()
2974                .ok()
2975                .as_ref()
2976                .and_then(|cwd| diff_paths(path, cwd))
2977                .unwrap_or_else(|| path.to_path_buf());
2978            pretty.to_string_lossy().into_owned()
2979        })
2980        .unwrap_or_else(|| "stdin".to_string());
2981
2982    let ratio = if report.original_bytes > 0 {
2983        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
2984    } else {
2985        100.0
2986    };
2987
2988    println!("• {name} → {}", report.uri);
2989    println!("  seq: {}", report.seq);
2990    if report.compressed {
2991        println!(
2992            "  size: {} B → {} B ({:.1}% of original, compressed)",
2993            report.original_bytes, report.stored_bytes, ratio
2994        );
2995    } else {
2996        println!("  size: {} B (stored as-is)", report.original_bytes);
2997    }
2998    if let Some(mime) = &report.mime {
2999        println!("  mime: {mime}");
3000    }
3001    if let Some(title) = &report.title {
3002        println!("  title: {title}");
3003    }
3004    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
3005    println!(
3006        "  extraction: status={} reader={}",
3007        report.extraction.status.label(),
3008        extraction_reader
3009    );
3010    if let Some(ms) = report.extraction.duration_ms {
3011        println!("  extraction-duration: {} ms", ms);
3012    }
3013    if let Some(pages) = report.extraction.pages_processed {
3014        println!("  extraction-pages: {}", pages);
3015    }
3016    for warning in &report.extraction.warnings {
3017        println!("  warning: {warning}");
3018    }
3019    if let Some(metadata) = &report.metadata {
3020        if let Some(caption) = &metadata.caption {
3021            println!("  caption: {caption}");
3022        }
3023        if let Some(audio) = metadata.audio.as_ref() {
3024            if audio.duration_secs.is_some()
3025                || audio.sample_rate_hz.is_some()
3026                || audio.channels.is_some()
3027            {
3028                let duration = audio
3029                    .duration_secs
3030                    .map(|secs| format!("{secs:.1}s"))
3031                    .unwrap_or_else(|| "unknown".into());
3032                let rate = audio
3033                    .sample_rate_hz
3034                    .map(|hz| format!("{} Hz", hz))
3035                    .unwrap_or_else(|| "? Hz".into());
3036                let channels = audio
3037                    .channels
3038                    .map(|ch| ch.to_string())
3039                    .unwrap_or_else(|| "?".into());
3040                println!("  audio: duration {duration}, {channels} ch, {rate}");
3041            }
3042        }
3043    }
3044
3045    println!();
3046}
3047
3048fn put_report_to_json(report: &PutReport) -> serde_json::Value {
3049    let source = report
3050        .source
3051        .as_ref()
3052        .map(|path| path.to_string_lossy().into_owned());
3053    let extraction = json!({
3054        "status": report.extraction.status.label(),
3055        "reader": report.extraction.reader,
3056        "duration_ms": report.extraction.duration_ms,
3057        "pages_processed": report.extraction.pages_processed,
3058        "warnings": report.extraction.warnings,
3059    });
3060    json!({
3061        "seq": report.seq,
3062        "uri": report.uri,
3063        "title": report.title,
3064        "original_bytes": report.original_bytes,
3065        "original_bytes_human": format_bytes(report.original_bytes as u64),
3066        "stored_bytes": report.stored_bytes,
3067        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
3068        "compressed": report.compressed,
3069        "mime": report.mime,
3070        "source": source,
3071        "metadata": report.metadata,
3072        "extraction": extraction,
3073    })
3074}
3075
3076fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
3077    match err {
3078        MemvidError::CapacityExceeded {
3079            current,
3080            limit,
3081            required,
3082        } => anyhow!(CapacityExceededMessage {
3083            current,
3084            limit,
3085            required,
3086        }),
3087        other => other.into(),
3088    }
3089}
3090
3091fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
3092    for (idx, entry) in entries.iter().enumerate() {
3093        match serde_json::from_str::<DocMetadata>(entry) {
3094            Ok(meta) => {
3095                options.metadata = Some(meta);
3096            }
3097            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3098                Ok(value) => {
3099                    options
3100                        .extra_metadata
3101                        .insert(format!("custom_metadata_{idx}"), value.to_string());
3102                }
3103                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
3104            },
3105        }
3106    }
3107}
3108
3109fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
3110    let stats = mem.stats()?;
3111    let message = match stats.tier {
3112        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
3113        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
3114    };
3115    Ok(message)
3116}
3117
/// Normalizes a user-supplied URI or path into a clean, relative form.
///
/// Processing steps:
/// - surrounding whitespace and any leading `mv2://` prefixes are removed;
/// - backslashes are treated as `/` separators;
/// - empty and `.` segments are dropped, and `..` removes the preceding segment (it is
///   ignored when there is nothing left to remove).
///
/// With `keep_path` set, the remaining segments are joined with `/`; otherwise only the last
/// segment is returned. When no segment survives (for example `""`, `"."`, `".."` or
/// `"a/.."`), the result is `"document"`, so a bare `.` or `..` is never returned.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    const FALLBACK: &str = "document";

    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    // Borrow segments from `normalized` instead of allocating a String per segment.
    let mut segments: Vec<&str> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                segments.pop();
            }
            name => segments.push(name),
        }
    }

    if keep_path && !segments.is_empty() {
        segments.join("/")
    } else {
        // If nothing survived, only "", "." or ".." segments were present; fall back to a
        // neutral name rather than echoing one of those back to the caller.
        segments.last().copied().unwrap_or(FALLBACK).to_string()
    }
}
3151
3152fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
3153    if quiet {
3154        let pb = ProgressBar::hidden();
3155        pb.set_message(message.to_string());
3156        return pb;
3157    }
3158
3159    match total {
3160        Some(len) => {
3161            let pb = ProgressBar::new(len);
3162            pb.set_draw_target(ProgressDrawTarget::stderr());
3163            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
3164                .unwrap_or_else(|_| ProgressStyle::default_bar());
3165            pb.set_style(style);
3166            pb.set_message(message.to_string());
3167            pb
3168        }
3169        None => {
3170            let pb = ProgressBar::new_spinner();
3171            pb.set_draw_target(ProgressDrawTarget::stderr());
3172            pb.set_message(message.to_string());
3173            pb.enable_steady_tick(Duration::from_millis(120));
3174            pb
3175        }
3176    }
3177}
3178
3179fn create_spinner(message: &str) -> ProgressBar {
3180    let pb = ProgressBar::new_spinner();
3181    pb.set_draw_target(ProgressDrawTarget::stderr());
3182    let style = ProgressStyle::with_template("{spinner} {msg}")
3183        .unwrap_or_else(|_| ProgressStyle::default_spinner())
3184        .tick_strings(&["-", "\\", "|", "/"]);
3185    pb.set_style(style);
3186    pb.set_message(message.to_string());
3187    pb.enable_steady_tick(Duration::from_millis(120));
3188    pb
3189}
3190
3191// ============================================================================
3192// put-many command - batch ingestion with pre-computed embeddings
3193// ============================================================================
3194
/// Arguments for the `put-many` subcommand
// NOTE: clap's derive turns the `///` comments in this struct into help text, so
// maintainer-only remarks are written as plain `//` comments.
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    /// Path to the memory file to append to
    // Positional argument; opened with `Memvid::open` by `handle_put_many`.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,

    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,

    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,

    /// Zstd compression level (1-9, default: 3)
    // `handle_put_many` clamps the supplied value into 1..=9 before use.
    #[arg(long, default_value_t = 3)]
    pub compression_level: i32,

    // Lock-related flags shared with other commands, merged into this argument set.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
3215
/// One document in a `put-many` batch, as deserialized from the JSON input.
///
/// Only `title` and `text` are required; every other field falls back to its
/// `Default` value when absent from the JSON object.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required).
    pub title: String,
    /// Optional single label; empty string when omitted.
    /// Not read by `handle_put_many` in this file.
    #[serde(default)]
    pub label: String,
    /// Document body (required); ingested as UTF-8 bytes.
    pub text: String,
    /// Optional URI for the document. When omitted, `handle_put_many` assigns
    /// `mv2://batch/<index>` based on the request's position in the batch.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tags copied onto the document's put options; empty when omitted.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels copied onto the document's put options; empty when omitted.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Free-form JSON metadata; JSON `null` when omitted.
    /// Not read by `handle_put_many` in this file.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI).
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Pre-computed embeddings for the document's chunks, matched to chunks by index.
    /// If the document is chunked, each chunk frame receives the embedding at its index,
    /// so the number of entries should equal the number of chunks that will be created.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
3241
/// Top-level `put-many` input: a JSON array of [`PutManyRequest`] objects.
///
/// `#[serde(transparent)]` makes this wrapper deserialize directly from the array
/// itself rather than from an object with a `requests` key.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The requests, in the order they appear in the input array.
    pub requests: Vec<PutManyRequest>,
}
3249
/// Handles the `put-many` subcommand: ingests a JSON batch of documents in one call.
///
/// Steps:
/// 1. Opens the memory file, checks that CLI mutation is allowed and applies lock flags.
/// 2. Enables the lexical and vector indexes if the memory does not have them yet.
/// 3. Reads the batch JSON from `args.input`, or from stdin (until EOF) when no input
///    path is given, and parses it as a [`PutManyBatch`].
/// 4. Converts each request into a `ParallelInput` and ingests them with
///    `put_parallel_inputs`, using `args.compression_level` clamped to `1..=9`.
/// 5. Prints the resulting frame ids as JSON (`--json`) or as a short human summary.
///
/// An empty batch is not an error: an empty result is printed and nothing is ingested.
///
/// # Errors
/// Returns an error if the memory cannot be opened or mutated, the input cannot be
/// read or parsed as JSON, or ingestion fails.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Ensure indexes are enabled
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Read batch input
    let batch_json: String = match &args.input {
        Some(input_path) => fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?,
        // Consume stdin until EOF. Reading a single line would truncate multi-line
        // (e.g. pretty-printed) JSON and make parsing fail.
        None => io::read_to_string(io::stdin().lock()).context("Failed to read from stdin")?,
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        // Progress goes to stderr so stdout carries only the final result.
        eprintln!("Processing {} documents...", count);
    }

    // Convert to ParallelInput
    // NOTE(review): `req.label` and `req.metadata` are accepted in the JSON but are not
    // forwarded to the put options here — confirm whether that is intentional.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Requests without a URI get a positional one so every frame is addressable.
            let uri = req
                .uri
                .unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Build options
    let build_opts = BuildOpts {
        // Out-of-range levels are clamped rather than rejected.
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    // Ingest batch
    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            // Large batches: show only the range endpoints to keep output short.
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last:  {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}