// memvid_cli/commands/data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33    ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34};
35#[cfg(feature = "clip")]
36use memvid_core::FrameRole;
37#[cfg(feature = "parallel_segments")]
38use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
39use pdf_extract::OutputError as PdfExtractError;
40use serde_json::json;
41use tracing::{info, warn};
42use tree_magic_mini::from_u8 as magic_from_u8;
43use uuid::Uuid;
44
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum characters for embedding text to avoid exceeding OpenAI's 8192 token limit.
/// Using ~4 chars/token estimate, 20K chars ≈ 5K tokens (safe margin).
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
/// Maximum number of images to extract from a PDF for CLIP visual search.
/// Set high to capture all images; processing stops when no more images found.
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncate text for embedding to fit within OpenAI token limits.
///
/// Returns a string whose byte length is at most `MAX_EMBEDDING_TEXT_LEN` and
/// which always ends on a UTF-8 character boundary, so no character is split.
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Walk backwards from the byte limit until we land on a char boundary.
    // Slicing `&text[..i]` at a non-boundary index panics; the previous
    // implementation sliced first and therefore could panic on multi-byte
    // UTF-8 input that straddled the limit.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
77
/// Get the default local model path for contextual retrieval (phi-3.5-mini).
///
/// Resolution order for the models directory: `MEMVID_MODELS_DIR` (with a
/// leading `~/` expanded via `$HOME`), then `~/.memvid/models`.
///
/// # Errors
/// Fails when `HOME` is unset (and no override is given) or when the model
/// file does not exist on disk.
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            // Expand a leading "~/" manually using $HOME when available.
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(&custom),
        },
        Err(_) => {
            // Default to ~/.memvid/models
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
114
115use pathdiff::diff_paths;
116
117use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
118use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
119use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingRuntime};
120use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
121use crate::error::{CapacityExceededMessage, DuplicateUriError};
122use crate::utils::{
123    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
124    select_frame,
125};
126#[cfg(feature = "temporal_enrich")]
127use memvid_core::enrich_chunks as temporal_enrich_chunks;
128
/// Interpret a string as a boolean flag (used for environment variables).
///
/// Accepts `1`/`true`/`yes`/`on` as `Some(true)` and `0`/`false`/`no`/`off`
/// as `Some(false)`, case-insensitively and ignoring surrounding whitespace;
/// anything else yields `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if ["1", "true", "yes", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "false", "no", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
137
/// Read the `MEMVID_PARALLEL_SEGMENTS` environment override, if any.
///
/// Returns `None` when the variable is unset or not a recognized boolean.
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    match std::env::var("MEMVID_PARALLEL_SEGMENTS") {
        Ok(value) => parse_bool_flag(&value),
        Err(_) => None,
    }
}
145
/// Check whether the CLIP model files are installed on disk.
///
/// The models directory is resolved from `MEMVID_MODELS_DIR`, then
/// `$HOME/.memvid/models`, then a CWD-relative `.memvid/models`. The model
/// name defaults to `mobileclip-s2` unless `MEMVID_CLIP_MODEL` is set.
#[cfg(feature = "clip")]
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| {
            env::var("HOME")
                .map(|home| PathBuf::from(home).join(".memvid/models"))
                .unwrap_or_else(|_| PathBuf::from(".memvid/models"))
        });

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    // Both the vision and text ONNX files must be present.
    ["vision", "text"]
        .iter()
        .all(|part| models_dir.join(format!("{model_name}_{part}.onnx")).exists())
}
165
/// Minimum confidence threshold for NER entities (0.0-1.0)
/// Note: Lower threshold captures more entities but also more noise
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

/// Minimum length for entity names (bytes — compared against `str::len()`)
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

/// Maximum gap (in bytes between byte offsets) between adjacent entities to merge
/// Allows merging "S" + "&" + "P Global" when they're close together
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;
179
/// Merge adjacent NER entities that appear to be tokenization artifacts.
///
/// The BERT tokenizer splits names at special characters like `&`, producing
/// separate entities like "S", "&", "P Global" instead of "S&P Global".
/// This function merges adjacent entities of the same type when they're
/// contiguous or nearly contiguous in the original text.
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Sort by byte position so each entity only needs to be compared against
    // the most recently emitted (i.e. textually preceding) one.
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            // Check if this entity is adjacent to the previous one and same type.
            // saturating_sub yields gap == 0 for overlapping spans.
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            // Check what's in the gap (if any)
            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP && entity.byte_start <= original_text.len() && last.byte_end <= original_text.len() {
                // Gap content should be whitespace (but not newlines), punctuation, or connectors
                // NOTE(review): this slice assumes the NER offsets fall on UTF-8
                // char boundaries; indexing would panic otherwise — confirm the
                // NER model always reports boundary-aligned byte offsets.
                let gap_text = &original_text[last.byte_end..entity.byte_start];
                // Don't merge across newlines - that indicates separate entities
                if gap_text.contains('\n') || gap_text.contains('\r') {
                    false
                } else {
                    gap_text.chars().all(|c| c == ' ' || c == '\t' || c == '&' || c == '-' || c == '\'' || c == '.')
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Merge: extend the previous entity to include this one.
                // Prefer re-slicing the original text so the gap characters
                // ("&", "-", ...) are preserved in the merged surface form.
                let merged_text = if entity.byte_end <= original_text.len() {
                    original_text[last.byte_start..entity.byte_end].to_string()
                } else {
                    // Offsets overrun the text — fall back to naive concatenation.
                    format!("{}{}", last.text, entity.text)
                };

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                // Average the confidence
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
251
/// Filter and clean extracted NER entities to remove noise.
///
/// Applies a series of rejection heuristics (confidence, length, punctuation,
/// tokenization artifacts, noise words). Returns `true` if the entity should
/// be kept.
#[cfg(feature = "logic_mesh")]
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    // Too short, or composed entirely of whitespace/punctuation.
    let only_noise_chars = trimmed
        .chars()
        .all(|c| c.is_whitespace() || c.is_ascii_punctuation());
    if trimmed.len() < NER_MIN_ENTITY_LEN || only_noise_chars {
        return false;
    }

    // Embedded newlines indicate a malformed extraction span.
    if trimmed.contains('\n') || trimmed.contains('\r') {
        return false;
    }

    // Leading/trailing "##" are WordPiece continuation markers (partial words).
    if trimmed.starts_with("##") || trimmed.ends_with("##") {
        return false;
    }

    // A trailing period usually marks a sentence fragment, not a name.
    if trimmed.ends_with('.') {
        return false;
    }

    let lower = trimmed.to_lowercase();

    match trimmed.len() {
        // Single characters are almost always false positives.
        1 => return false,
        // Keep a short allow-list of legitimate two-letter entities
        // ("EU", "UN", ...); everything else two letters long is noise.
        // Note: this early return deliberately bypasses the later checks.
        2 => return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Common noise words that slip through NER.
    if matches!(lower.as_str(), "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc") {
        return false;
    }

    // Multi-word entities whose first word is a short all-caps fragment are
    // usually tokenization artifacts (e.g. "P Global" from "S&P Global",
    // "HS Markit" from "IHS Markit") — except recognized region codes.
    let mut word_iter = trimmed.split_whitespace();
    if let (Some(first_word), Some(_second)) = (word_iter.next(), word_iter.next()) {
        if first_word.len() <= 2
            && first_word.chars().all(char::is_uppercase)
            && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    // A leading lowercase letter suggests a partial word, not a proper noun.
    trimmed.chars().next().map_or(true, |c| !c.is_lowercase())
}
333
334/// Parse key=value pairs for tags
335pub fn parse_key_val(s: &str) -> Result<(String, String)> {
336    let (key, value) = s
337        .split_once('=')
338        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
339    Ok((key.to_string(), value.to_string()))
340}
341
/// Lock-related CLI arguments (shared across commands)
// Flattened into each command's Args struct via #[command(flatten)] and
// applied to an open memory through `apply_lock_cli`. The `///` field docs
// below are surfaced by clap as --help text, so they are user-facing.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    /// Maximum time to wait for an active writer before failing (ms)
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
352
/// Arguments for the `put` subcommand
// NOTE(review): clap surfaces each field's `///` comment verbatim as --help
// text, so those lines are user-facing strings; only `//` comments are
// purely internal.
#[derive(Args)]
pub struct PutArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional POSIX timestamp to store on the frame
    #[arg(long)]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    /// Store the payload as a binary video without transcoding
    // Conflicts with an explicit --kind other than "video" (checked in handle_put).
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    /// Tags to attach in key=value form
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    /// Additional metadata payloads expressed as JSON
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    // Embeddings are opt-in; see handle_put's EmbeddingMode handling.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    /// Enable CLIP visual embeddings for images and PDF pages
    /// Allows text-to-image search using natural language queries
    /// Auto-enabled when CLIP model is installed; use --no-clip to disable
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    /// Disable CLIP visual embeddings even when model is available
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    /// Replace any existing frame with the same URI instead of inserting a duplicate
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    /// Build Logic-Mesh entity graph during ingestion (opt-in)
    /// Extracts entities (people, organizations, locations, etc.) and their relationships
    /// Enables graph traversal with `memvid follow`
    /// Requires NER model: `memvid models install --ner distilbert-ner`
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // The parallel-builder flags below are combined by `wants_parallel` /
    // `sanitized_parallel_opts` (see the cfg(parallel_segments) impl).
    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Shared writer-lock options (--lock-timeout, --force).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
498
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Decide whether the parallel segment builder should be used.
    ///
    /// Precedence: an explicit `--parallel-segments` wins, then an explicit
    /// `--no-parallel-segments`, then the `MEMVID_PARALLEL_SEGMENTS`
    /// environment override; otherwise parallel ingestion stays off.
    pub fn wants_parallel(&self) -> bool {
        if self.parallel_segments {
            true
        } else if self.no_parallel_segments {
            false
        } else {
            parallel_env_override().unwrap_or(false)
        }
    }

    /// Build sanitized `BuildOpts` from the CLI overrides.
    ///
    /// Unset flags keep the core defaults; `sanitize()` is always applied
    /// before returning.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        // For each knob, prefer the CLI override and fall back to the default.
        opts.segment_tokens = self.parallel_segment_tokens.unwrap_or(opts.segment_tokens);
        opts.segment_pages = self.parallel_segment_pages.unwrap_or(opts.segment_pages);
        opts.threads = self.parallel_threads.unwrap_or(opts.threads);
        opts.queue_depth = self.parallel_queue_depth.unwrap_or(opts.queue_depth);
        opts.sanitize();
        opts
    }
}
532
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    /// Parallel ingestion requires the `parallel_segments` feature; without
    /// it this always reports `false` so callers take the legacy path.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
539
/// Arguments for the `api-fetch` subcommand
// Mapped 1:1 onto `ApiFetchCommand` by `handle_api_fetch`. The `///` field
// docs below are surfaced by clap as --help text.
#[derive(Args)]
pub struct ApiFetchArgs {
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared writer-lock options (--lock-timeout, --force).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
565
/// Arguments for the `update` subcommand
// NOTE(review): `//` comments are used on fields below (instead of `///`)
// so clap's --help output stays unchanged.
#[derive(Args)]
pub struct UpdateArgs {
    // Path to the memory file containing the frame to update.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select the target frame by numeric id (mutually exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select the target frame by URI (mutually exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // File to read from — presumably the replacement payload; verify against handler.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // New URI to assign to the frame.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    // New title for the frame.
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    // New POSIX timestamp for the frame.
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    // New track name for the frame.
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    // New kind metadata for the frame.
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Tags given as KEY=VALUE pairs (repeatable).
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    // Free-form labels (repeatable).
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Additional metadata payloads expressed as JSON (repeatable).
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Embedding toggle — presumably recomputes embeddings for the updated
    // content; confirm against the update handler.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,

    // Shared writer-lock options (--lock-timeout, --force).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
601
/// Arguments for the `delete` subcommand
// NOTE(review): `//` comments are used on fields below (instead of `///`)
// so clap's --help output stays unchanged.
#[derive(Args)]
pub struct DeleteArgs {
    // Path to the memory file containing the frame to delete.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Select the target frame by numeric id (mutually exclusive with --uri).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    // Select the target frame by URI (mutually exclusive with --frame-id).
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt (see handle_delete).
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,

    // Shared writer-lock options (--lock-timeout, --force).
    #[command(flatten)]
    pub lock: LockCliArgs,
}
619
620/// Handler for `memvid api-fetch`
621pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
622    let command = ApiFetchCommand {
623        file: args.file,
624        config_path: args.config,
625        dry_run: args.dry_run,
626        mode_override: args.mode,
627        uri_override: args.uri,
628        output_json: args.json,
629        lock_timeout_ms: args.lock.lock_timeout,
630        force_lock: args.lock.force,
631    };
632    run_api_fetch(config, command)
633}
634
635/// Handler for `memvid delete`
636pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
637    let mut mem = Memvid::open(&args.file)?;
638    ensure_cli_mutation_allowed(&mem)?;
639    apply_lock_cli(&mut mem, &args.lock);
640    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
641
642    if !args.yes {
643        if let Some(uri) = &frame.uri {
644            println!("About to delete frame {} ({uri})", frame.id);
645        } else {
646            println!("About to delete frame {}", frame.id);
647        }
648        print!("Confirm? [y/N]: ");
649        io::stdout().flush()?;
650        let mut input = String::new();
651        io::stdin().read_line(&mut input)?;
652        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
653        if !confirmed {
654            println!("Aborted");
655            return Ok(());
656        }
657    }
658
659    let seq = mem.delete_frame(frame.id)?;
660    mem.commit()?;
661    let deleted = mem.frame_by_id(frame.id)?;
662
663    if args.json {
664        let json = json!({
665            "frame_id": frame.id,
666            "sequence": seq,
667            "status": frame_status_str(deleted.status),
668        });
669        println!("{}", serde_json::to_string_pretty(&json)?);
670    } else {
671        println!("Deleted frame {} (seq {})", frame.id, seq);
672    }
673
674    Ok(())
675}
676pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
677    let mut mem = Memvid::open(&args.file)?;
678    ensure_cli_mutation_allowed(&mem)?;
679    apply_lock_cli(&mut mem, &args.lock);
680    let mut stats = mem.stats()?;
681    if stats.frame_count == 0 && !stats.has_lex_index {
682        mem.enable_lex()?;
683        stats = mem.stats()?;
684    }
685
686    // Set vector compression mode if requested
687    if args.vector_compression {
688        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
689    }
690
691    let mut capacity_guard = CapacityGuard::from_stats(&stats);
692    let use_parallel = args.wants_parallel();
693    #[cfg(feature = "parallel_segments")]
694    let mut parallel_opts = if use_parallel {
695        Some(args.sanitized_parallel_opts())
696    } else {
697        None
698    };
699    #[cfg(feature = "parallel_segments")]
700    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
701    #[cfg(feature = "parallel_segments")]
702    let mut pending_parallel_indices: Vec<usize> = Vec::new();
703    let input_set = resolve_inputs(args.input.as_deref())?;
704
705    if args.video {
706        if let Some(kind) = &args.kind {
707            if !kind.eq_ignore_ascii_case("video") {
708                anyhow::bail!("--video conflicts with explicit --kind={kind}");
709            }
710        }
711        if matches!(input_set, InputSet::Stdin) {
712            anyhow::bail!("--video requires --input <file>");
713        }
714    }
715
716    #[allow(dead_code)]
717    enum EmbeddingMode {
718        None,
719        Auto,
720        Explicit,
721    }
722
723    // PHASE 1: Make embeddings opt-in, not opt-out
724    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
725    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
726    let mv2_dimension = mem.vec_index_dimension();
727    let (embedding_mode, embedding_runtime) = if args.embedding {
728        (
729            EmbeddingMode::Explicit,
730            Some(load_embedding_runtime_for_mv2(config, None, mv2_dimension)?),
731        )
732    } else {
733        (EmbeddingMode::None, None)
734    };
735    let embedding_enabled = embedding_runtime.is_some();
736    let runtime_ref = embedding_runtime.as_ref();
737
738    // Enable CLIP index if:
739    // 1. --clip flag is set explicitly, OR
740    // 2. Input contains image files and model is available, OR
741    // 3. Model is installed (auto-detect) and --no-clip is not set
742    #[cfg(feature = "clip")]
743    let clip_enabled = {
744        if args.no_clip {
745            false
746        } else if args.clip {
747            true // Explicitly requested
748        } else {
749            // Auto-detect: check if model is installed
750            let model_available = is_clip_model_installed();
751            if model_available {
752                // Model is installed, check if input has images or PDFs
753                match &input_set {
754                    InputSet::Files(paths) => paths.iter().any(|p| {
755                        is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
756                    }),
757                    InputSet::Stdin => false,
758                }
759            } else {
760                false
761            }
762        }
763    };
764    #[cfg(feature = "clip")]
765    let clip_model = if clip_enabled {
766        mem.enable_clip()?;
767        if !args.json {
768            let mode = if args.clip {
769                "explicit"
770            } else {
771                "auto-detected"
772            };
773            eprintln!("ℹ️  CLIP visual embeddings enabled ({mode})");
774            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
775            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
776            eprintln!("   Use --no-clip to disable auto-detection");
777            eprintln!();
778        }
779        // Initialize CLIP model for generating image embeddings
780        match memvid_core::ClipModel::default_model() {
781            Ok(model) => Some(model),
782            Err(e) => {
783                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
784                None
785            }
786        }
787    } else {
788        None
789    };
790
791    // Warn user about vector storage overhead (PHASE 2)
792    if embedding_enabled && !args.json {
793        let capacity = stats.capacity_bytes;
794        if args.vector_compression {
795            // PHASE 3: PQ compression reduces storage 16x
796            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
797            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
798            eprintln!("   Storage: ~20 KB per document (16x compressed)");
799            eprintln!("   Accuracy: ~95% recall@10 maintained");
800            eprintln!(
801                "   Capacity: ~{} documents max for {:.1} GB",
802                max_docs,
803                capacity as f64 / 1e9
804            );
805            eprintln!();
806        } else {
807            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
808            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
809            eprintln!("   Storage: ~270 KB per document");
810            eprintln!(
811                "   Capacity: ~{} documents max for {:.1} GB",
812                max_docs,
813                capacity as f64 / 1e9
814            );
815            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
816            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
817            eprintln!();
818        }
819    }
820
821    let embed_progress = if embedding_enabled {
822        let total = match &input_set {
823            InputSet::Files(paths) => Some(paths.len() as u64),
824            InputSet::Stdin => None,
825        };
826        Some(create_task_progress_bar(total, "embed", false))
827    } else {
828        None
829    };
830    let mut embedded_docs = 0usize;
831
832    if let InputSet::Files(paths) = &input_set {
833        if paths.len() > 1 {
834            if args.uri.is_some() || args.title.is_some() {
835                anyhow::bail!(
836                    "--uri/--title apply to a single file; omit them when using a directory or glob"
837                );
838            }
839        }
840    }
841
842    let mut reports = Vec::new();
843    let mut processed = 0usize;
844    let mut skipped = 0usize;
845    let mut bytes_added = 0u64;
846    let mut summary_warnings: Vec<String> = Vec::new();
847    #[cfg(feature = "clip")]
848    let mut clip_embeddings_added = false;
849
850    let mut capacity_reached = false;
851    match input_set {
852        InputSet::Stdin => {
853            processed += 1;
854            match read_payload(None) {
855                Ok(payload) => {
856                    let mut analyze_spinner = if args.json {
857                        None
858                    } else {
859                        Some(create_spinner("Analyzing stdin payload..."))
860                    };
861                    match ingest_payload(
862                        &mut mem,
863                        &args,
864                        payload,
865                        None,
866                        capacity_guard.as_mut(),
867                        embedding_enabled,
868                        runtime_ref,
869                        use_parallel,
870                    ) {
871                        Ok(outcome) => {
872                            if let Some(pb) = analyze_spinner.take() {
873                                pb.finish_and_clear();
874                            }
875                            bytes_added += outcome.report.stored_bytes as u64;
876                            if args.json {
877                                summary_warnings
878                                    .extend(outcome.report.extraction.warnings.iter().cloned());
879                            }
880                            reports.push(outcome.report);
881                            let report_index = reports.len() - 1;
882                            if !args.json && !use_parallel {
883                                if let Some(report) = reports.get(report_index) {
884                                    print_report(report);
885                                }
886                            }
887                            if args.transcript {
888                                let notice = transcript_notice_message(&mut mem)?;
889                                if args.json {
890                                    summary_warnings.push(notice);
891                                } else {
892                                    println!("{notice}");
893                                }
894                            }
895                            if outcome.embedded {
896                                if let Some(pb) = embed_progress.as_ref() {
897                                    pb.inc(1);
898                                }
899                                embedded_docs += 1;
900                            }
901                            if use_parallel {
902                                #[cfg(feature = "parallel_segments")]
903                                if let Some(input) = outcome.parallel_input {
904                                    pending_parallel_indices.push(report_index);
905                                    pending_parallel_inputs.push(input);
906                                }
907                            } else if let Some(guard) = capacity_guard.as_mut() {
908                                mem.commit()?;
909                                let stats = mem.stats()?;
910                                guard.update_after_commit(&stats);
911                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
912                            }
913                        }
914                        Err(err) => {
915                            if let Some(pb) = analyze_spinner.take() {
916                                pb.finish_and_clear();
917                            }
918                            if err.downcast_ref::<DuplicateUriError>().is_some() {
919                                return Err(err);
920                            }
921                            warn!("skipped stdin payload: {err}");
922                            skipped += 1;
923                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
924                                capacity_reached = true;
925                            }
926                        }
927                    }
928                }
929                Err(err) => {
930                    warn!("failed to read stdin: {err}");
931                    skipped += 1;
932                }
933            }
934        }
935        InputSet::Files(ref paths) => {
936            let mut transcript_notice_printed = false;
937            for path in paths {
938                processed += 1;
939                match read_payload(Some(&path)) {
940                    Ok(payload) => {
941                        let label = path.display().to_string();
942                        let mut analyze_spinner = if args.json {
943                            None
944                        } else {
945                            Some(create_spinner(&format!("Analyzing {label}...")))
946                        };
947                        match ingest_payload(
948                            &mut mem,
949                            &args,
950                            payload,
951                            Some(&path),
952                            capacity_guard.as_mut(),
953                            embedding_enabled,
954                            runtime_ref,
955                            use_parallel,
956                        ) {
957                            Ok(outcome) => {
958                                if let Some(pb) = analyze_spinner.take() {
959                                    pb.finish_and_clear();
960                                }
961                                bytes_added += outcome.report.stored_bytes as u64;
962
963                                // Generate CLIP embedding for image files
964                                #[cfg(feature = "clip")]
965                                if let Some(ref model) = clip_model {
966                                    let mime = outcome.report.mime.as_deref();
967                                    let clip_frame_id = outcome.report.frame_id;
968
969                                    if is_image_file(&path) {
970                                        match model.encode_image_file(&path) {
971                                            Ok(embedding) => {
972                                                if let Err(e) = mem.add_clip_embedding_with_page(
973                                                    clip_frame_id,
974                                                    None,
975                                                    embedding,
976                                                ) {
977                                                    warn!(
978                                                        "Failed to add CLIP embedding for {}: {e}",
979                                                        path.display()
980                                                    );
981                                                } else {
982                                                    info!(
983                                                        "Added CLIP embedding for frame {}",
984                                                        clip_frame_id
985                                                    );
986                                                    clip_embeddings_added = true;
987                                                }
988                                            }
989                                            Err(e) => {
990                                                warn!(
991                                                    "Failed to encode CLIP embedding for {}: {e}",
992                                                    path.display()
993                                                );
994                                            }
995                                        }
996                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
997                                        // Commit before adding child frames to ensure WAL has space
998                                        if let Err(e) = mem.commit() {
999                                            warn!("Failed to commit before CLIP extraction: {e}");
1000                                        }
1001
1002                                        match render_pdf_pages_for_clip(
1003                                            &path,
1004                                            CLIP_PDF_MAX_IMAGES,
1005                                            CLIP_PDF_TARGET_PX,
1006                                        ) {
1007                                            Ok(rendered_pages) => {
1008                                                use rayon::prelude::*;
1009
1010                                                // Pre-encode all images to PNG in parallel for better performance
1011                                                // Filter out junk images (solid colors, black, too small)
1012                                                let path_for_closure = path.clone();
1013
1014                                                // Temporarily suppress panic output during image encoding
1015                                                // (some PDFs have malformed images that cause panics in the image crate)
1016                                                let prev_hook = std::panic::take_hook();
1017                                                std::panic::set_hook(Box::new(|_| {
1018                                                    // Silently ignore panics - we handle them with catch_unwind
1019                                                }));
1020
1021                                                let encoded_images: Vec<_> = rendered_pages
1022                                                    .into_par_iter()
1023                                                    .filter_map(|(page_num, image)| {
1024                                                        // Check if image is worth embedding (not junk)
1025                                                        let image_info = get_image_info(&image);
1026                                                        if !image_info.should_embed() {
1027                                                            info!(
1028                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1029                                                                path_for_closure.display(),
1030                                                                page_num,
1031                                                                image_info.color_variance,
1032                                                                image_info.width,
1033                                                                image_info.height
1034                                                            );
1035                                                            return None;
1036                                                        }
1037
1038                                                        // Convert to RGB8 first to ensure consistent buffer size
1039                                                        // (some PDFs have images with alpha or other formats that cause panics)
1040                                                        // Use catch_unwind to handle any panics from corrupted image data
1041                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1042                                                            let rgb_image = image.to_rgb8();
1043                                                            let mut png_bytes = Vec::new();
1044                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1045                                                                &mut Cursor::new(&mut png_bytes),
1046                                                                image::ImageFormat::Png,
1047                                                            ).map(|()| png_bytes)
1048                                                        }));
1049
1050                                                        match encode_result {
1051                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1052                                                            Ok(Err(e)) => {
1053                                                                info!(
1054                                                                    "Skipping image for {} page {}: {e}",
1055                                                                    path_for_closure.display(),
1056                                                                    page_num
1057                                                                );
1058                                                                None
1059                                                            }
1060                                                            Err(_) => {
1061                                                                // Panic occurred - corrupted image data, silently skip
1062                                                                info!(
1063                                                                    "Skipping malformed image for {} page {}",
1064                                                                    path_for_closure.display(),
1065                                                                    page_num
1066                                                                );
1067                                                                None
1068                                                            }
1069                                                        }
1070                                                    })
1071                                                    .collect();
1072
1073                                                // Restore the original panic hook
1074                                                std::panic::set_hook(prev_hook);
1075
1076                                                // Process encoded images sequentially (storage + CLIP encoding)
1077                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1078                                                let mut child_frame_offset = 1u64;
1079                                                let parent_uri = outcome.report.uri.clone();
1080                                                let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1081                                                let total_images = encoded_images.len();
1082
1083                                                // Show progress bar for CLIP extraction
1084                                                let clip_pb = if !args.json && total_images > 0 {
1085                                                    let pb = ProgressBar::new(total_images as u64);
1086                                                    pb.set_style(
1087                                                        ProgressStyle::with_template(
1088                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1089                                                        )
1090                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1091                                                        .progress_chars("█▓░"),
1092                                                    );
1093                                                    Some(pb)
1094                                                } else {
1095                                                    None
1096                                                };
1097
1098                                                for (page_num, image, png_bytes) in encoded_images {
1099                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1100                                                    let child_title = format!(
1101                                                        "{} - Image {} (page {})",
1102                                                        parent_title,
1103                                                        child_frame_offset,
1104                                                        page_num
1105                                                    );
1106
1107                                                    let child_options = PutOptions::builder()
1108                                                        .uri(&child_uri)
1109                                                        .title(&child_title)
1110                                                        .parent_id(clip_frame_id)
1111                                                        .role(FrameRole::ExtractedImage)
1112                                                        .auto_tag(false)
1113                                                        .extract_dates(false)
1114                                                        .build();
1115
1116                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1117                                                        Ok(_child_seq) => {
1118                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1119                                                            child_frame_offset += 1;
1120
1121                                                            match model.encode_image(&image) {
1122                                                                Ok(embedding) => {
1123                                                                    if let Err(e) = mem
1124                                                                        .add_clip_embedding_with_page(
1125                                                                            child_frame_id,
1126                                                                            Some(page_num),
1127                                                                            embedding,
1128                                                                        )
1129                                                                    {
1130                                                                        warn!(
1131                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1132                                                                            path.display(),
1133                                                                            page_num
1134                                                                        );
1135                                                                    } else {
1136                                                                        info!(
1137                                                                            "Added CLIP image frame {} for {} page {}",
1138                                                                            child_frame_id,
1139                                                                            clip_frame_id,
1140                                                                            page_num
1141                                                                        );
1142                                                                        clip_embeddings_added = true;
1143                                                                    }
1144                                                                }
1145                                                                Err(e) => warn!(
1146                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1147                                                                    path.display(),
1148                                                                    page_num
1149                                                                ),
1150                                                            }
1151
1152                                                            // Commit after each image to checkpoint WAL
1153                                                            if let Err(e) = mem.commit() {
1154                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1155                                                            }
1156                                                        }
1157                                                        Err(e) => warn!(
1158                                                            "Failed to store image frame for {} page {}: {e}",
1159                                                            path.display(),
1160                                                            page_num
1161                                                        ),
1162                                                    }
1163
1164                                                    if let Some(ref pb) = clip_pb {
1165                                                        pb.inc(1);
1166                                                    }
1167                                                }
1168
1169                                                if let Some(pb) = clip_pb {
1170                                                    pb.finish_and_clear();
1171                                                }
1172                                            }
1173                                            Err(err) => warn!(
1174                                                "Failed to render PDF pages for CLIP {}: {err}",
1175                                                path.display()
1176                                            ),
1177                                        }
1178                                    }
1179                                }
1180
1181                                if args.json {
1182                                    summary_warnings
1183                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1184                                }
1185                                reports.push(outcome.report);
1186                                let report_index = reports.len() - 1;
1187                                if !args.json && !use_parallel {
1188                                    if let Some(report) = reports.get(report_index) {
1189                                        print_report(report);
1190                                    }
1191                                }
1192                                if args.transcript && !transcript_notice_printed {
1193                                    let notice = transcript_notice_message(&mut mem)?;
1194                                    if args.json {
1195                                        summary_warnings.push(notice);
1196                                    } else {
1197                                        println!("{notice}");
1198                                    }
1199                                    transcript_notice_printed = true;
1200                                }
1201                                if outcome.embedded {
1202                                    if let Some(pb) = embed_progress.as_ref() {
1203                                        pb.inc(1);
1204                                    }
1205                                    embedded_docs += 1;
1206                                }
1207                                if use_parallel {
1208                                    #[cfg(feature = "parallel_segments")]
1209                                    if let Some(input) = outcome.parallel_input {
1210                                        pending_parallel_indices.push(report_index);
1211                                        pending_parallel_inputs.push(input);
1212                                    }
1213                                } else if let Some(guard) = capacity_guard.as_mut() {
1214                                    mem.commit()?;
1215                                    let stats = mem.stats()?;
1216                                    guard.update_after_commit(&stats);
1217                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1218                                }
1219                            }
1220                            Err(err) => {
1221                                if let Some(pb) = analyze_spinner.take() {
1222                                    pb.finish_and_clear();
1223                                }
1224                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1225                                    return Err(err);
1226                                }
1227                                warn!("skipped {}: {err}", path.display());
1228                                skipped += 1;
1229                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1230                                    capacity_reached = true;
1231                                }
1232                            }
1233                        }
1234                    }
1235                    Err(err) => {
1236                        warn!("failed to read {}: {err}", path.display());
1237                        skipped += 1;
1238                    }
1239                }
1240                if capacity_reached {
1241                    break;
1242                }
1243            }
1244        }
1245    }
1246
1247    if capacity_reached {
1248        warn!("capacity limit reached; remaining inputs were not processed");
1249    }
1250
1251    if let Some(pb) = embed_progress.as_ref() {
1252        pb.finish_with_message("embed");
1253    }
1254
1255    if let Some(runtime) = embedding_runtime.as_ref() {
1256        if embedded_docs > 0 {
1257            match embedding_mode {
1258                EmbeddingMode::Explicit => println!(
1259                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1260                    embedded_docs,
1261                    runtime.dimension()
1262                ),
1263                EmbeddingMode::Auto => println!(
1264                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1265                    embedded_docs,
1266                    runtime.dimension()
1267                ),
1268                EmbeddingMode::None => {}
1269            }
1270        } else {
1271            match embedding_mode {
1272                EmbeddingMode::Explicit => {
1273                    println!("No embeddings were generated (no textual content available)");
1274                }
1275                EmbeddingMode::Auto => {
1276                    println!(
1277                        "Semantic runtime available, but no textual content produced embeddings"
1278                    );
1279                }
1280                EmbeddingMode::None => {}
1281            }
1282        }
1283    }
1284
1285    if reports.is_empty() {
1286        if args.json {
1287            if capacity_reached {
1288                summary_warnings.push("capacity_limit_reached".to_string());
1289            }
1290            let summary_json = json!({
1291                "processed": processed,
1292                "ingested": 0,
1293                "skipped": skipped,
1294                "bytes_added": bytes_added,
1295                "bytes_added_human": format_bytes(bytes_added),
1296                "embedded_documents": embedded_docs,
1297                "capacity_reached": capacity_reached,
1298                "warnings": summary_warnings,
1299                "reports": Vec::<serde_json::Value>::new(),
1300            });
1301            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1302        } else {
1303            println!(
1304                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1305                processed, skipped
1306            );
1307        }
1308        return Ok(());
1309    }
1310
1311    #[cfg(feature = "parallel_segments")]
1312    let mut parallel_flush_spinner = if use_parallel && !args.json {
1313        Some(create_spinner("Flushing parallel segments..."))
1314    } else {
1315        None
1316    };
1317
1318    if use_parallel {
1319        #[cfg(feature = "parallel_segments")]
1320        {
1321            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1322                let mut opts = BuildOpts::default();
1323                opts.sanitize();
1324                opts
1325            });
1326            // Set vector compression mode from mem state
1327            opts.vec_compression = mem.vector_compression().clone();
1328            let seqs = if pending_parallel_inputs.is_empty() {
1329                mem.commit_parallel(opts)?;
1330                Vec::new()
1331            } else {
1332                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1333            };
1334            if let Some(pb) = parallel_flush_spinner.take() {
1335                pb.finish_with_message("Flushed parallel segments");
1336            }
1337            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1338                if let Some(report) = reports.get_mut(idx) {
1339                    report.seq = seq;
1340                }
1341            }
1342        }
1343        #[cfg(not(feature = "parallel_segments"))]
1344        {
1345            mem.commit()?;
1346        }
1347    } else {
1348        mem.commit()?;
1349    }
1350
1351    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1352    #[cfg(feature = "clip")]
1353    if clip_embeddings_added {
1354        mem.commit()?;
1355    }
1356
1357    if !args.json && use_parallel {
1358        for report in &reports {
1359            print_report(report);
1360        }
1361    }
1362
1363    // Table extraction for PDF files
1364    let mut tables_extracted = 0usize;
1365    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1366    if args.tables && !args.no_tables {
1367        if let InputSet::Files(ref paths) = input_set {
1368            let pdf_paths: Vec<_> = paths
1369                .iter()
1370                .filter(|p| {
1371                    p.extension()
1372                        .and_then(|e| e.to_str())
1373                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1374                        .unwrap_or(false)
1375                })
1376                .collect();
1377
1378            if !pdf_paths.is_empty() {
1379                let table_spinner = if args.json {
1380                    None
1381                } else {
1382                    Some(create_spinner(&format!(
1383                        "Extracting tables from {} PDF(s)...",
1384                        pdf_paths.len()
1385                    )))
1386                };
1387
1388                // Use aggressive mode with auto fallback for best results
1389                let table_options = TableExtractionOptions::builder()
1390                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1391                    .min_rows(2)
1392                    .min_cols(2)
1393                    .min_quality(memvid_core::table::TableQuality::Low)
1394                    .merge_multi_page(true)
1395                    .build();
1396
1397                for pdf_path in pdf_paths {
1398                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1399                        let filename = pdf_path
1400                            .file_name()
1401                            .and_then(|s| s.to_str())
1402                            .unwrap_or("unknown.pdf");
1403
1404                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1405                            for table in &result.tables {
1406                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1407                                {
1408                                    tables_extracted += 1;
1409                                    table_summaries.push(json!({
1410                                        "table_id": table.table_id,
1411                                        "source_file": table.source_file,
1412                                        "meta_frame_id": meta_id,
1413                                        "rows": table.n_rows,
1414                                        "cols": table.n_cols,
1415                                        "quality": format!("{:?}", table.quality),
1416                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1417                                    }));
1418                                }
1419                            }
1420                        }
1421                    }
1422                }
1423
1424                if let Some(pb) = table_spinner {
1425                    if tables_extracted > 0 {
1426                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1427                    } else {
1428                        pb.finish_with_message("No tables found");
1429                    }
1430                }
1431
1432                // Commit table frames
1433                if tables_extracted > 0 {
1434                    mem.commit()?;
1435                }
1436            }
1437        }
1438    }
1439
1440    // Logic-Mesh: Extract entities and build relationship graph
1441    // Opt-in only: requires --logic-mesh flag
1442    #[cfg(feature = "logic_mesh")]
1443    let mut logic_mesh_entities = 0usize;
1444    #[cfg(feature = "logic_mesh")]
1445    {
1446        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1447
1448        let models_dir = config.models_dir.clone();
1449        let model_path = ner_model_path(&models_dir);
1450        let tokenizer_path = ner_tokenizer_path(&models_dir);
1451
1452        // Opt-in: only enable Logic-Mesh when explicitly requested with --logic-mesh
1453        let ner_available = model_path.exists() && tokenizer_path.exists();
1454        let should_run_ner = args.logic_mesh;
1455
1456        if should_run_ner && !ner_available {
1457            // User explicitly requested --logic-mesh but model not installed
1458            if args.json {
1459                summary_warnings.push(
1460                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1461                );
1462            } else {
1463                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1464                eprintln!("   Run: memvid models install --ner distilbert-ner");
1465            }
1466        } else if should_run_ner {
1467            let ner_spinner = if args.json {
1468                None
1469            } else {
1470                Some(create_spinner("Building Logic-Mesh entity graph..."))
1471            };
1472
1473            match NerModel::load(&model_path, &tokenizer_path, None) {
1474                Ok(mut ner_model) => {
1475                    let mut mesh = LogicMesh::new();
1476                    let mut filtered_count = 0usize;
1477
1478                    // Process each ingested frame for entity extraction using cached search_text
1479                    for report in &reports {
1480                        let frame_id = report.frame_id;
1481                        // Use the cached search_text from ingestion
1482                        if let Some(ref content) = report.search_text {
1483                            if !content.is_empty() {
1484                                match ner_model.extract(content) {
1485                                    Ok(raw_entities) => {
1486                                        // Merge adjacent entities that were split by tokenization
1487                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1488                                        let merged_entities = merge_adjacent_entities(raw_entities, content);
1489
1490                                        for entity in &merged_entities {
1491                                            // Apply post-processing filter to remove noisy entities
1492                                            if !is_valid_entity(&entity.text, entity.confidence) {
1493                                                tracing::debug!(
1494                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1495                                                    entity.text, entity.confidence * 100.0
1496                                                );
1497                                                filtered_count += 1;
1498                                                continue;
1499                                            }
1500
1501                                            // Convert NER entity type to EntityKind
1502                                            let kind = entity.to_entity_kind();
1503                                            // Create mesh node with proper structure
1504                                            let node = MeshNode::new(
1505                                                entity.text.to_lowercase(),
1506                                                entity.text.trim().to_string(),
1507                                                kind,
1508                                                entity.confidence,
1509                                                frame_id,
1510                                                entity.byte_start as u32,
1511                                                (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1512                                            );
1513                                            mesh.merge_node(node);
1514                                            logic_mesh_entities += 1;
1515                                        }
1516                                    }
1517                                    Err(e) => {
1518                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1519                                    }
1520                                }
1521                            }
1522                        }
1523                    }
1524
1525                    if logic_mesh_entities > 0 {
1526                        mesh.finalize();
1527                        let node_count = mesh.stats().node_count;
1528                        // Persist mesh to MV2 file
1529                        mem.set_logic_mesh(mesh);
1530                        mem.commit()?;
1531                        info!(
1532                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1533                            logic_mesh_entities,
1534                            node_count,
1535                            filtered_count
1536                        );
1537                    }
1538
1539                    if let Some(pb) = ner_spinner {
1540                        pb.finish_with_message(format!(
1541                            "Logic-Mesh: {} entities ({} unique)",
1542                            logic_mesh_entities,
1543                            mem.mesh_node_count()
1544                        ));
1545                    }
1546                }
1547                Err(e) => {
1548                    if let Some(pb) = ner_spinner {
1549                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1550                    }
1551                    if args.json {
1552                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1553                    }
1554                }
1555            }
1556        }
1557    }
1558
1559    let stats = mem.stats()?;
1560    if args.json {
1561        if capacity_reached {
1562            summary_warnings.push("capacity_limit_reached".to_string());
1563        }
1564        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1565        let total_duration_ms: Option<u64> = {
1566            let sum: u64 = reports
1567                .iter()
1568                .filter_map(|r| r.extraction.duration_ms)
1569                .sum();
1570            if sum > 0 {
1571                Some(sum)
1572            } else {
1573                None
1574            }
1575        };
1576        let total_pages: Option<u32> = {
1577            let sum: u32 = reports
1578                .iter()
1579                .filter_map(|r| r.extraction.pages_processed)
1580                .sum();
1581            if sum > 0 {
1582                Some(sum)
1583            } else {
1584                None
1585            }
1586        };
1587        let embedding_mode_str = match embedding_mode {
1588            EmbeddingMode::None => "none",
1589            EmbeddingMode::Auto => "auto",
1590            EmbeddingMode::Explicit => "explicit",
1591        };
1592        let summary_json = json!({
1593            "processed": processed,
1594            "ingested": reports.len(),
1595            "skipped": skipped,
1596            "bytes_added": bytes_added,
1597            "bytes_added_human": format_bytes(bytes_added),
1598            "embedded_documents": embedded_docs,
1599            "embedding": {
1600                "enabled": embedding_enabled,
1601                "mode": embedding_mode_str,
1602                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1603            },
1604            "tables": {
1605                "enabled": args.tables && !args.no_tables,
1606                "extracted": tables_extracted,
1607                "tables": table_summaries,
1608            },
1609            "capacity_reached": capacity_reached,
1610            "warnings": summary_warnings,
1611            "extraction": {
1612                "total_duration_ms": total_duration_ms,
1613                "total_pages_processed": total_pages,
1614            },
1615            "memory": {
1616                "path": args.file.display().to_string(),
1617                "frame_count": stats.frame_count,
1618                "size_bytes": stats.size_bytes,
1619                "tier": format!("{:?}", stats.tier),
1620            },
1621            "reports": reports_json,
1622        });
1623        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1624    } else {
1625        println!(
1626            "Committed {} frame(s); total frames {}",
1627            reports.len(),
1628            stats.frame_count
1629        );
1630        println!(
1631            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1632            processed,
1633            reports.len(),
1634            skipped,
1635            format_bytes(bytes_added)
1636        );
1637        if tables_extracted > 0 {
1638            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1639        }
1640    }
1641    Ok(())
1642}
1643
/// Handle `memvid update`: replace a frame's payload and/or metadata.
///
/// The target frame is selected by id or URI. All of its existing metadata is
/// copied into `PutOptions` first, then CLI overrides are layered on top; when
/// a new payload is supplied it is re-analyzed (MIME, metadata, search text).
/// Embeddings are recomputed on request (`--embeddings`) or carried over from
/// the previous frame when a vector index exists. Ends with a commit and a
/// JSON or human-readable report of the replacement frame.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Resolve the frame being updated (by explicit id or URI lookup).
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload; `None` keeps the frame's current payload.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Seed options from the existing frame so unspecified fields carry over.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    // Auto-tagging/date extraction only run when new content was provided.
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // An explicit --set-uri wins over the inherited URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // A frame gaining a payload needs a URI; derive one from the source path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // --tag overrides replace the inherited tag set entirely: each key and
    // each distinct value become searchable tags, and the (key, value) pair
    // is also recorded in extra_metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for embedding: starts as the inherited search text and may be
    // replaced by freshly extracted text below.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        // Re-analyze the new payload for MIME type, metadata, and search text.
        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // Metadata-only update: nothing new to tag or extract dates from.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    // Auto-detect embedding model from existing MV2 dimension
    let mv2_dimension = mem.vec_index_dimension();
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        let runtime = load_embedding_runtime_for_mv2(config, None, mv2_dimension)?;
        // Prefer extracted/inherited search text; fall back to the raw
        // UTF-8 payload when no search text is available.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
    }

    // Without a freshly computed embedding, reuse the previous frame's vector
    // so the updated frame remains searchable.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the frame that replaced `frame_id`: by URI when one is set,
    // otherwise by taking the newest timeline entry.
    // NOTE(review): the timeline fallback assumes this update is the most
    // recent write — confirm behavior under concurrent writers.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            // NonZeroU64::new(1) is always Some; the `if let` only satisfies the API.
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
1823
1824fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1825    if let Some(uri) = provided {
1826        let sanitized = sanitize_uri(uri, true);
1827        return format!("mv2://{}", sanitized);
1828    }
1829
1830    if let Some(path) = source {
1831        let raw = path.to_string_lossy();
1832        let sanitized = sanitize_uri(&raw, false);
1833        return format!("mv2://{}", sanitized);
1834    }
1835
1836    format!("mv2://frames/{}", Uuid::new_v4())
1837}
1838
1839pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1840    let digest = hash(payload).to_hex();
1841    let short = &digest[..32];
1842    let ext_from_path = source
1843        .and_then(|path| path.extension())
1844        .and_then(|ext| ext.to_str())
1845        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1846        .filter(|ext| !ext.is_empty());
1847    let ext = ext_from_path
1848        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1849        .unwrap_or_else(|| "bin".to_string());
1850    format!("mv2://video/{short}.{ext}")
1851}
1852
1853fn derive_title(
1854    provided: Option<String>,
1855    source: Option<&Path>,
1856    payload_utf8: Option<&str>,
1857) -> Option<String> {
1858    if let Some(title) = provided {
1859        let trimmed = title.trim();
1860        if trimmed.is_empty() {
1861            None
1862        } else {
1863            Some(trimmed.to_string())
1864        }
1865    } else {
1866        if let Some(text) = payload_utf8 {
1867            if let Some(markdown_title) = extract_markdown_title(text) {
1868                return Some(markdown_title);
1869            }
1870        }
1871        source
1872            .and_then(|path| path.file_stem())
1873            .and_then(|stem| stem.to_str())
1874            .map(to_title_case)
1875    }
1876}
1877
/// Return the text of the first non-empty markdown heading (`#` of any
/// level) in `text`, if one exists.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let trimmed = line.trim();
        if !trimmed.starts_with('#') {
            return None;
        }
        let title = trimmed.trim_start_matches('#').trim();
        (!title.is_empty()).then(|| title.to_string())
    })
}
1890
/// Upper-case the first character of `input`, leaving the rest untouched.
///
/// Note: `char::to_uppercase` may expand to multiple characters (e.g. `ß` →
/// `SS`), hence the iterator-based construction.
///
/// Improvement over the previous version: the explicit `is_empty` pre-check
/// was redundant — the `chars.next()` `None` arm already covers the empty
/// string — so the function is now a single `match`.
fn to_title_case(input: &str) -> String {
    let mut chars = input.chars();
    match chars.next() {
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}
1902
/// Files that are never ingested (currently only macOS Finder metadata).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name == ".DS_Store")
}
1909
/// Hidden directories (leading dot, e.g. `.git`) are skipped during traversal.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
1916
/// Resolved ingestion input: either standard input or a concrete file list.
enum InputSet {
    /// Read a single payload from stdin.
    Stdin,
    /// Ingest each listed file (from a plain path, glob expansion, or
    /// recursive directory walk).
    Files(Vec<PathBuf>),
}
1921
/// Check if a file is an image based on extension
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    path.extension()
        .and_then(|e| e.to_str())
        .map(|e| e.to_ascii_lowercase())
        .map_or(false, |e| {
            matches!(
                e.as_str(),
                "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
            )
        })
}
1933
1934fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
1935    let Some(raw) = input else {
1936        return Ok(InputSet::Stdin);
1937    };
1938
1939    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
1940        let mut files = Vec::new();
1941        let mut matched_any = false;
1942        for entry in glob(raw)? {
1943            let path = entry?;
1944            if path.is_file() {
1945                matched_any = true;
1946                if should_skip_input(&path) {
1947                    continue;
1948                }
1949                files.push(path);
1950            }
1951        }
1952        files.sort();
1953        if files.is_empty() {
1954            if matched_any {
1955                anyhow::bail!(
1956                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
1957                );
1958            }
1959            anyhow::bail!("pattern '{raw}' did not match any files");
1960        }
1961        return Ok(InputSet::Files(files));
1962    }
1963
1964    let path = PathBuf::from(raw);
1965    if path.is_dir() {
1966        let (mut files, skipped_any) = collect_directory_files(&path)?;
1967        files.sort();
1968        if files.is_empty() {
1969            if skipped_any {
1970                anyhow::bail!(
1971                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
1972                    path.display()
1973                );
1974            }
1975            anyhow::bail!(
1976                "directory '{}' contains no ingestible files",
1977                path.display()
1978            );
1979        }
1980        Ok(InputSet::Files(files))
1981    } else {
1982        Ok(InputSet::Files(vec![path]))
1983    }
1984}
1985
1986fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
1987    let mut files = Vec::new();
1988    let mut skipped_any = false;
1989    let mut stack = vec![root.to_path_buf()];
1990    while let Some(dir) = stack.pop() {
1991        for entry in fs::read_dir(&dir)? {
1992            let entry = entry?;
1993            let path = entry.path();
1994            let file_type = entry.file_type()?;
1995            if file_type.is_dir() {
1996                if should_skip_dir(&path) {
1997                    skipped_any = true;
1998                    continue;
1999                }
2000                stack.push(path);
2001            } else if file_type.is_file() {
2002                if should_skip_input(&path) {
2003                    skipped_any = true;
2004                    continue;
2005                }
2006                files.push(path);
2007            }
2008        }
2009    }
2010    Ok((files, skipped_any))
2011}
2012
/// Result of ingesting one input file/payload.
struct IngestOutcome {
    /// Per-file report surfaced in CLI/JSON output.
    report: PutReport,
    /// True when an embedding was generated for this input.
    embedded: bool,
    /// Deferred payload for the batched parallel-segment commit path.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2019
/// Per-input ingestion report used for CLI and JSON output.
struct PutReport {
    /// Commit sequence number assigned to this frame.
    seq: u64,
    /// Frame id of the stored frame.
    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
    frame_id: u64,
    /// Final `mv2://` URI of the stored frame.
    uri: String,
    /// Title assigned to the frame, if any.
    title: Option<String>,
    /// Payload size as read from the source, in bytes.
    original_bytes: usize,
    /// Bytes actually stored (after compression, when applied).
    stored_bytes: usize,
    /// Whether the payload was stored compressed.
    compressed: bool,
    /// Source path on disk (absent for stdin input).
    source: Option<PathBuf>,
    /// Detected MIME type, when known.
    mime: Option<String>,
    /// Extracted document metadata, when any was produced.
    metadata: Option<DocMetadata>,
    /// Extraction status, warnings, and timing for this input.
    extraction: ExtractionSummary,
    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2037
/// Tracks the memory's size against its configured capacity so puts can be
/// rejected before overflowing the file.
struct CapacityGuard {
    #[allow(dead_code)]
    tier: Tier,
    /// Configured capacity limit in bytes. Always non-zero here: a zero
    /// capacity means "uncapped" and no guard is constructed (see
    /// `from_stats`).
    capacity: u64,
    /// Last observed file size in bytes; refreshed after each commit.
    current_size: u64,
}
2044
impl CapacityGuard {
    /// Fixed floor for the per-put overhead estimate (128 KiB of index and
    /// metadata growth).
    const MIN_OVERHEAD: u64 = 128 * 1024;
    /// Fractional overhead applied on top of the payload size (5%).
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Build a guard from current stats. Returns `None` when the memory is
    /// uncapped (`capacity_bytes == 0`), in which case no checks apply.
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Fail with a capacity error if adding `additional` bytes (plus the
    /// estimated overhead) would push the file past its capacity.
    /// Uses saturating arithmetic so huge inputs cannot overflow the check.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refresh the tracked size from post-commit stats.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// Capacity limit to include in error messages/JSON output.
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
    //
    // NOTE(review): each threshold only fires inside a 1% window (e.g.
    // [90%, 91%)), presumably to avoid repeating the same warning on every
    // call — but a single large ingest that jumps past a window emits no
    // warning at all. Confirm this is intended.
    fn check_and_warn_capacity(&self) {
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        // Warn at key thresholds
        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️  CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Actions:");
            eprintln!("   1. Recreate with larger --size");
            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️  WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️  INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Estimated non-payload growth for a put: max(5% of payload, 128 KiB).
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2126
/// Outcome of analyzing one input file prior to ingestion.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type.
    pub mime: String,
    /// Extracted document metadata, when analysis produced any.
    pub metadata: Option<DocMetadata>,
    /// Text extracted for search/embedding, when available.
    pub search_text: Option<String>,
    /// Reader name, status, warnings, and timing for the extraction step.
    pub extraction: ExtractionSummary,
}
2134
/// Options controlling audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Force analysis even when it would otherwise be skipped.
    force: bool,
    /// Requested segment length in seconds; clamped to at least 1 via
    /// `normalised_segment_secs`.
    segment_secs: u32,
}
2140
2141impl AudioAnalyzeOptions {
2142    const DEFAULT_SEGMENT_SECS: u32 = 30;
2143
2144    fn normalised_segment_secs(self) -> u32 {
2145        self.segment_secs.max(1)
2146    }
2147}
2148
2149impl Default for AudioAnalyzeOptions {
2150    fn default() -> Self {
2151        Self {
2152            force: false,
2153            segment_secs: Self::DEFAULT_SEGMENT_SECS,
2154        }
2155    }
2156}
2157
/// Result of analyzing an audio payload.
struct AudioAnalysis {
    /// Structured audio metadata produced by the analyzer.
    metadata: DocAudioMetadata,
    /// Optional human-readable caption for the audio.
    caption: Option<String>,
    /// Terms contributed to the frame's search text.
    search_terms: Vec<String>,
}
2163
/// Result of analyzing an image payload.
struct ImageAnalysis {
    /// Pixel width.
    width: u32,
    /// Pixel height.
    height: u32,
    /// Dominant color palette entries (presumably hex color strings — confirm
    /// against the palette extractor).
    palette: Vec<String>,
    /// Optional caption for the image.
    caption: Option<String>,
    /// EXIF metadata, when present in the file.
    exif: Option<DocExifMetadata>,
}
2171
/// Summary of the text-extraction step for one input.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Name of the reader that handled the input, if one ran.
    reader: Option<String>,
    /// Overall outcome of extraction.
    status: ExtractionStatus,
    /// Non-fatal issues accumulated during extraction.
    warnings: Vec<String>,
    /// Wall-clock extraction time in milliseconds, when measured.
    duration_ms: Option<u64>,
    /// Number of pages processed (paged formats such as PDF), when known.
    pages_processed: Option<u32>,
}
2180
2181impl ExtractionSummary {
2182    fn record_warning<S: Into<String>>(&mut self, warning: S) {
2183        self.warnings.push(warning.into());
2184    }
2185}
2186
/// Outcome of a text-extraction attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    Skipped,
    Ok,
    FallbackUsed,
    Empty,
    Failed,
}

impl ExtractionStatus {
    /// Stable snake_case label for this status.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Skipped => "skipped",
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
        }
    }
}
2207
2208fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2209    let filename = source_path
2210        .and_then(|path| path.file_name())
2211        .and_then(|name| name.to_str())
2212        .map(|value| value.to_string());
2213    MediaManifest {
2214        kind: "video".to_string(),
2215        mime: mime.to_string(),
2216        bytes,
2217        filename,
2218        duration_ms: None,
2219        width: None,
2220        height: None,
2221        codec: None,
2222    }
2223}
2224
/// Analyze a payload for ingestion: detect its MIME type, gather
/// [`DocMetadata`], derive normalized search text, and report how text
/// extraction went.
///
/// * `payload_utf8` — the caller's already-validated UTF-8 view of `payload`,
///   when it has one.
/// * `inferred_title` — used as a caption of last resort.
/// * `audio_opts` — tuning for the audio branch; its `force` flag runs audio
///   analysis even for non-`audio/*` MIME types.
/// * `force_video` — record a video manifest and skip the text/image/audio
///   paths.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    let is_video = force_video || mime.starts_with("video/");
    // Video payloads are never run through the text pipeline.
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    // Seed metadata with facts derivable from the raw bytes alone.
    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Leading bytes for format hinting; `None` when the payload is shorter
    // than the sniff window or the slice would be empty.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Normalize extracted text, derive a caption from it when none exists yet,
    // and record the reader/status on success. Returns false when
    // normalization leaves nothing searchable, so callers can mark the attempt
    // Empty or try a fallback.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Text path: index the UTF-8 payload directly.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            // MIME sniffing said "text" but the caller had no UTF-8 view.
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf" {
        // PDF path: try a registered reader first; fall back to pdf_extract below.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        // Fallback: pdf_extract (and its own pdftotext fallback) when no
        // registered reader produced usable text.
        if !applied {
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image path: dimensions, dominant colours, EXIF, and a caption candidate.
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio path: runs for audio/* MIME types, or for anything when forced.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            // Only replace audio metadata when what we have is absent or empty.
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }
            if !search_terms.is_empty() {
                match &mut search_text {
                    Some(existing) => {
                        // Append tag terms not already present (substring check),
                        // space-separated.
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Caption of last resort: the inferred title, trimmed and length-bounded.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2495
2496fn default_reader_registry() -> &'static ReaderRegistry {
2497    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2498    REGISTRY.get_or_init(ReaderRegistry::default)
2499}
2500
2501fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2502    if detect_pdf_magic(magic) {
2503        return Some(DocumentFormat::Pdf);
2504    }
2505
2506    let mime = mime?.trim().to_ascii_lowercase();
2507    match mime.as_str() {
2508        "application/pdf" => Some(DocumentFormat::Pdf),
2509        "text/plain" => Some(DocumentFormat::PlainText),
2510        "text/markdown" => Some(DocumentFormat::Markdown),
2511        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2512        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2513            Some(DocumentFormat::Docx)
2514        }
2515        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2516            Some(DocumentFormat::Pptx)
2517        }
2518        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2519            Some(DocumentFormat::Xlsx)
2520        }
2521        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2522        _ => None,
2523    }
2524}
2525
/// True when the sniffed bytes start with a `%PDF` header, tolerating an
/// optional UTF-8 BOM and leading ASCII whitespace before the signature.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(raw) = magic else {
        return false;
    };
    let body = raw.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(raw);
    let start = body
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(body.len());
    body[start..].starts_with(b"%PDF")
}
2543
2544fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2545    match pdf_extract::extract_text_from_mem(payload) {
2546        Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
2547            Ok(fallback) => Ok(fallback),
2548            Err(err) => {
2549                warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
2550                Ok(text)
2551            }
2552        },
2553        Err(err) => {
2554            warn!("pdf_extract failed: {err}");
2555            match fallback_pdftotext(payload) {
2556                Ok(fallback) => Ok(fallback),
2557                Err(fallback_err) => {
2558                    warn!("pdftotext fallback failed: {fallback_err}");
2559                    Err(err)
2560                }
2561            }
2562        }
2563        other => other,
2564    }
2565}
2566
2567fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2568    use std::io::Write;
2569    use std::process::{Command, Stdio};
2570
2571    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2572
2573    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2574    temp_pdf
2575        .write_all(payload)
2576        .context("failed to write pdf payload to temp file")?;
2577    temp_pdf.flush().context("failed to flush temp pdf file")?;
2578
2579    let child = Command::new(pdftotext)
2580        .arg("-layout")
2581        .arg(temp_pdf.path())
2582        .arg("-")
2583        .stdout(Stdio::piped())
2584        .spawn()
2585        .context("failed to spawn pdftotext")?;
2586
2587    let output = child
2588        .wait_with_output()
2589        .context("failed to read pdftotext output")?;
2590
2591    if !output.status.success() {
2592        return Err(anyhow!("pdftotext exited with status {}", output.status));
2593    }
2594
2595    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2596    Ok(text)
2597}
2598
2599fn detect_mime(
2600    source_path: Option<&Path>,
2601    payload: &[u8],
2602    payload_utf8: Option<&str>,
2603) -> (String, bool) {
2604    if let Some(kind) = infer::get(payload) {
2605        let mime = kind.mime_type().to_string();
2606        let treat_as_text = mime.starts_with("text/")
2607            || matches!(
2608                mime.as_str(),
2609                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2610            );
2611        return (mime, treat_as_text);
2612    }
2613
2614    let magic = magic_from_u8(payload);
2615    if !magic.is_empty() && magic != "application/octet-stream" {
2616        let treat_as_text = magic.starts_with("text/")
2617            || matches!(
2618                magic,
2619                "application/json"
2620                    | "application/xml"
2621                    | "application/javascript"
2622                    | "image/svg+xml"
2623                    | "text/plain"
2624            );
2625        return (magic.to_string(), treat_as_text);
2626    }
2627
2628    if let Some(path) = source_path {
2629        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2630            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2631                return (mime.to_string(), treat_as_text);
2632            }
2633        }
2634    }
2635
2636    if payload_utf8.is_some() {
2637        return ("text/plain".to_string(), true);
2638    }
2639
2640    ("application/octet-stream".to_string(), false)
2641}
2642
/// Map a file extension (case-insensitive) to a MIME type plus a flag saying
/// whether the payload should be treated as text. Unknown extensions yield
/// `None`.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let lowered = ext.to_ascii_lowercase();
    let (mime, textual) = match lowered.as_str() {
        // Markup and structured-text formats.
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        "json" => ("application/json", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        // Source code and plain-text config all map to text/plain.
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => ("text/plain", true),
        // Raster images are binary payloads.
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some((mime, textual))
}
2670
/// Derive a short caption from the first non-blank line of `text`,
/// length-bounded to 240 bytes.
fn caption_from_text(text: &str) -> Option<String> {
    text.lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .map(|line| truncate_to_boundary(line, 240))
}

/// Truncate `text` to at most `max_len` bytes on a UTF-8 char boundary,
/// trimming trailing whitespace and appending an ellipsis. Text that already
/// fits is returned unchanged.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Index 0 is always a char boundary, so this search cannot fail.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }
    format!("{}…", text[..cut].trim_end())
}
2696
2697fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2698    let reader = ImageReader::new(Cursor::new(payload))
2699        .with_guessed_format()
2700        .map_err(|err| warn!("failed to guess image format: {err}"))
2701        .ok()?;
2702    let image = reader
2703        .decode()
2704        .map_err(|err| warn!("failed to decode image: {err}"))
2705        .ok()?;
2706    let width = image.width();
2707    let height = image.height();
2708    let rgb = image.to_rgb8();
2709    let palette = match get_palette(
2710        rgb.as_raw(),
2711        ColorFormat::Rgb,
2712        COLOR_PALETTE_SIZE,
2713        COLOR_PALETTE_QUALITY,
2714    ) {
2715        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2716        Err(err) => {
2717            warn!("failed to compute colour palette: {err}");
2718            Vec::new()
2719        }
2720    };
2721
2722    let (exif, exif_caption) = extract_exif_metadata(payload);
2723
2724    Some(ImageAnalysis {
2725        width,
2726        height,
2727        palette,
2728        caption: exif_caption,
2729        exif,
2730    })
2731}
2732
2733fn color_to_hex(color: Color) -> String {
2734    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
2735}
2736
/// Probe and fully decode an audio payload to measure its duration, then
/// enrich the result with container tags (via lofty) and fixed-length
/// segment markers.
///
/// The duration comes from counting decoded frames over the sample rate, so
/// the whole stream is walked. Returns `None` when the stream cannot be
/// probed, has no default track, or no decoder can be created for its codec.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // The boxed media source must own its data, hence the copy of the payload.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Hint the probe with the file extension and the MIME subtype, if known.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode every packet of the selected track, counting frames so the
    // duration can be computed afterwards. I/O errors end the loop
    // (treated as end-of-stream); decode errors skip the packet;
    // ResetRequired resets the decoder and continues.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Ignore packets from other tracks in the container.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Average bitrate estimated from the container size over the measured
    // duration (not from codec headers).
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    // Container tags feed the caption and the search terms below.
    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Slice the stream into fixed-length segments; the last one may be shorter.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble the metadata, only filling fields we actually learned.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
    })
}
2911
/// Read container tags (title/artist/album/genre/track/disc numbers) from an
/// audio payload via lofty, keyed by lowercase names.
///
/// The primary tag is collected first and `or_insert_with` keeps the first
/// value seen, so primary-tag values take precedence over secondary tags.
/// Returns an empty map when the file type cannot be guessed or reading fails.
fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};

    // Copy the interesting items of one tag into `out` without overwriting
    // values collected from an earlier (higher-precedence) tag.
    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
            // Stored under both keys; callers look up either spelling.
            out.entry("title".into())
                .or_insert_with(|| value.to_string());
            out.entry("tracktitle".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
            out.entry("artist".into())
                .or_insert_with(|| value.to_string());
        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
            // Fall back to the album artist when no track artist is present.
            out.entry("artist".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
            out.entry("album".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::Genre) {
            out.entry("genre".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
            out.entry("track_number".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
            out.entry("disc_number".into())
                .or_insert_with(|| value.to_string());
        }
    }

    let mut tags = HashMap::new();
    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
        Ok(probe) => probe,
        Err(err) => {
            warn!("failed to guess audio tag file type: {err}");
            return tags;
        }
    };

    let tagged_file = match probe.read() {
        Ok(file) => file,
        Err(err) => {
            warn!("failed to read audio tags: {err}");
            return tags;
        }
    };

    // Primary tag first so its values win; secondary tags only fill gaps.
    if let Some(primary) = tagged_file.primary_tag() {
        collect_tag(primary, &mut tags);
    }
    for tag in tagged_file.tags() {
        collect_tag(tag, &mut tags);
    }

    tags
}
2973
2974fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
2975    let mut cursor = Cursor::new(payload);
2976    let exif = match ExifReader::new().read_from_container(&mut cursor) {
2977        Ok(exif) => exif,
2978        Err(err) => {
2979            warn!("failed to read EXIF metadata: {err}");
2980            return (None, None);
2981        }
2982    };
2983
2984    let mut doc = DocExifMetadata::default();
2985    let mut has_data = false;
2986
2987    if let Some(make) = exif_string(&exif, Tag::Make) {
2988        doc.make = Some(make);
2989        has_data = true;
2990    }
2991    if let Some(model) = exif_string(&exif, Tag::Model) {
2992        doc.model = Some(model);
2993        has_data = true;
2994    }
2995    if let Some(lens) =
2996        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
2997    {
2998        doc.lens = Some(lens);
2999        has_data = true;
3000    }
3001    if let Some(dt) =
3002        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3003    {
3004        doc.datetime = Some(dt);
3005        has_data = true;
3006    }
3007
3008    if let Some(gps) = extract_gps_metadata(&exif) {
3009        doc.gps = Some(gps);
3010        has_data = true;
3011    }
3012
3013    let caption = exif_string(&exif, Tag::ImageDescription);
3014
3015    let metadata = if has_data { Some(doc) } else { None };
3016    (metadata, caption)
3017}
3018
3019fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3020    exif.fields()
3021        .find(|field| field.tag == tag)
3022        .and_then(|field| field_to_string(field, exif))
3023}
3024
3025fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3026    let value = field.display_value().with_unit(exif).to_string();
3027    let trimmed = value.trim_matches('\0').trim();
3028    if trimmed.is_empty() {
3029        None
3030    } else {
3031        Some(trimmed.to_string())
3032    }
3033}
3034
3035fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3036    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3037    let latitude_ref_field = exif
3038        .fields()
3039        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3040    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3041    let longitude_ref_field = exif
3042        .fields()
3043        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3044
3045    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3046    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3047
3048    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3049        if reference.eq_ignore_ascii_case("S") {
3050            latitude = -latitude;
3051        }
3052    }
3053    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3054        if reference.eq_ignore_ascii_case("W") {
3055            longitude = -longitude;
3056        }
3057    }
3058
3059    Some(DocGpsMetadata {
3060        latitude,
3061        longitude,
3062    })
3063}
3064
3065fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3066    match value {
3067        ExifValue::Rational(values) if !values.is_empty() => {
3068            let deg = rational_to_f64_u(values.get(0)?)?;
3069            let min = values
3070                .get(1)
3071                .and_then(|v| rational_to_f64_u(v))
3072                .unwrap_or(0.0);
3073            let sec = values
3074                .get(2)
3075                .and_then(|v| rational_to_f64_u(v))
3076                .unwrap_or(0.0);
3077            Some(deg + (min / 60.0) + (sec / 3600.0))
3078        }
3079        ExifValue::SRational(values) if !values.is_empty() => {
3080            let deg = rational_to_f64_i(values.get(0)?)?;
3081            let min = values
3082                .get(1)
3083                .and_then(|v| rational_to_f64_i(v))
3084                .unwrap_or(0.0);
3085            let sec = values
3086                .get(2)
3087                .and_then(|v| rational_to_f64_i(v))
3088                .unwrap_or(0.0);
3089            Some(deg + (min / 60.0) + (sec / 3600.0))
3090        }
3091        _ => None,
3092    }
3093}
3094
3095fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3096    if value.denom == 0 {
3097        None
3098    } else {
3099        Some(value.num as f64 / value.denom as f64)
3100    }
3101}
3102
3103fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3104    if value.denom == 0 {
3105        None
3106    } else {
3107        Some(value.num as f64 / value.denom as f64)
3108    }
3109}
3110
3111fn value_to_ascii(value: &ExifValue) -> Option<String> {
3112    if let ExifValue::Ascii(values) = value {
3113        values
3114            .first()
3115            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3116            .map(|s| s.trim_matches('\0').trim().to_string())
3117            .filter(|s| !s.is_empty())
3118    } else {
3119        None
3120    }
3121}
3122
/// Ingests a single payload (file bytes or stdin) into the memory as one frame.
///
/// Responsibilities, in order: size/compression accounting, title and URI
/// derivation, content analysis (MIME, search text, audio/video metadata),
/// `PutOptions` assembly from CLI flags, duplicate/update resolution by URI,
/// capacity checking, and the actual write — with optional embedding
/// generation (whole-document or per-chunk, plus temporal enrichment and
/// contextual-retrieval prefixes when those features/flags are enabled).
///
/// In `parallel_mode` (feature `parallel_segments`) nothing is written here;
/// a `ParallelInput` is returned for a later batched commit and the report's
/// `seq`/`frame_id` are placeholder zeros.
///
/// # Errors
/// Fails on capacity exhaustion, duplicate URI (without `--allow-duplicate`
/// or `--update-existing`), missing embedding runtime when embeddings are
/// required, `--video` on a non-video payload, and engine-level put/update
/// errors (mapped through `map_put_error` with a capacity hint).
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    // Size accounting up front: text payloads report their zstd-compressed size.
    let original_bytes = payload.len();
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    let audio_opts = AudioAnalyzeOptions {
        force: args.audio,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
    };

    // Detect MIME and extract search text / media metadata from the raw bytes.
    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    // --video is an explicit claim; reject payloads that don't look like video.
    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    // Cap search text so the index entry stays bounded (MAX_SEARCH_TEXT_LEN),
    // truncating on a safe boundary rather than mid-character.
    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    // URI precedence: explicit --uri, then content-derived video URI, then source path.
    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    // Assemble PutOptions from CLI flags. Key/value tags are additionally
    // flattened into plain push-tags for tag-based lookup.
    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    if args.video {
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    // --metadata accepts either a full DocMetadata document or arbitrary JSON;
    // the latter is stored under a synthetic custom_metadata_<idx> key.
    // Unparseable entries are logged and skipped, never fatal.
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    let options = options_builder.build();

    // Look up an existing frame by URI when we must either update in place or
    // detect duplicates; only an unrelated engine error is propagated.
    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    // Compute frame_id: for updates it's the existing frame's id,
    // for new frames it's the current frame count (which becomes the next id)
    let frame_id = if let Some(ref existing) = existing_frame {
        existing.id
    } else {
        mem.stats()?.frame_count
    };

    let mut embedded = false;
    // Video payloads never get semantic embeddings, even when requested.
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    // Three write paths below: in-place update, parallel (deferred) ingest,
    // or a direct put — each optionally with embeddings.
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            let payload_for_update = payload.clone();
            if allow_embedding {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                // Embed from search text if available, else the raw UTF-8 payload.
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    // Truncate to avoid token limits
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!(
                            "Truncated text from {} to {} chars for embedding",
                            embed_text.len(),
                            truncated_text.len()
                        );
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            // URI already exists and updates weren't requested: duplicate error.
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        // New frame: enforce capacity before any write is attempted.
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            #[cfg(feature = "parallel_segments")]
            {
                // Parallel mode: compute embeddings here but defer the write;
                // everything is handed back as a ParallelInput for batched commit.
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                    } else {
                        // Check if document will be chunked - if so, embed each chunk
                        info!(
                            "parallel mode: checking for chunks on {} bytes payload",
                            payload.len()
                        );
                        if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                            // Document will be chunked - embed each chunk for full semantic coverage
                            info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                            // Apply temporal enrichment if enabled (before contextual)
                            #[cfg(feature = "temporal_enrich")]
                            let enriched_chunks = if args.temporal_enrich {
                                info!(
                                    "Temporal enrichment enabled, processing {} chunks",
                                    chunk_texts.len()
                                );
                                // Use today's date as fallback document date
                                let today = chrono::Local::now().date_naive();
                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                // Results are tuples of (enriched_text, TemporalEnrichment)
                                let enriched: Vec<String> =
                                    results.iter().map(|(text, _)| text.clone()).collect();
                                let resolved_count = results
                                    .iter()
                                    .filter(|(_, e)| {
                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                    })
                                    .count();
                                info!(
                                    "Temporal enrichment: resolved {} chunks with temporal context",
                                    resolved_count
                                );
                                enriched
                            } else {
                                chunk_texts.clone()
                            };
                            #[cfg(not(feature = "temporal_enrich"))]
                            let enriched_chunks = {
                                if args.temporal_enrich {
                                    warn!(
                                        "Temporal enrichment requested but feature not compiled in"
                                    );
                                }
                                chunk_texts.clone()
                            };

                            // Apply contextual retrieval if enabled
                            let embed_chunks = if args.contextual {
                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                // Engine selection: "local" needs the llama-cpp feature;
                                // any other explicit model name (or none) routes to OpenAI.
                                let engine = if args.contextual_model.as_deref() == Some("local") {
                                    #[cfg(feature = "llama-cpp")]
                                    {
                                        let model_path = get_local_contextual_model_path()?;
                                        ContextualEngine::local(model_path)
                                    }
                                    #[cfg(not(feature = "llama-cpp"))]
                                    {
                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                    }
                                } else if let Some(model) = &args.contextual_model {
                                    ContextualEngine::openai_with_model(model)?
                                } else {
                                    ContextualEngine::openai()?
                                };

                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
                                {
                                    Ok(contexts) => {
                                        info!("Generated {} contextual prefixes", contexts.len());
                                        apply_contextual_prefixes(
                                            &embed_text,
                                            &enriched_chunks,
                                            &contexts,
                                        )
                                    }
                                    Err(e) => {
                                        // Best-effort: fall back to the unprefixed chunks.
                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                        enriched_chunks.clone()
                                    }
                                }
                            } else {
                                enriched_chunks
                            };

                            let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                            for chunk_text in &embed_chunks {
                                // Truncate chunk to avoid OpenAI token limits (contextual prefixes can make chunks large)
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!("parallel mode: Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                                }
                                let chunk_emb = runtime.embed(&truncated_chunk)?;
                                chunk_embeddings.push(chunk_emb);
                            }
                            let num_chunks = chunk_embeddings.len();
                            chunk_embeddings_vec = Some(chunk_embeddings);
                            // Also embed the parent for the parent frame (truncate to avoid token limits)
                            let parent_text = truncate_for_embedding(&embed_text);
                            if parent_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&parent_text)?);
                            embedded = true;
                            info!(
                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
                                num_chunks
                            );
                        } else {
                            // No chunking - just embed the whole document (truncate to avoid token limits)
                            info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                            let truncated_text = truncate_for_embedding(&embed_text);
                            if truncated_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&truncated_text)?);
                            embedded = true;
                        }
                    }
                }
                // Prefer handing the path to the parallel builder so it can
                // re-read the file; otherwise move the bytes we already hold.
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                // Early return: seq/frame_id are placeholders until the
                // parallel commit assigns real ids.
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        frame_id: 0, // Will be populated after parallel commit
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                        #[cfg(feature = "logic_mesh")]
                        search_text: search_text.clone(),
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            // Without the feature, parallel_mode degrades to a plain put.
            #[cfg(not(feature = "parallel_segments"))]
            {
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
            let embed_text = search_text
                .clone()
                .or_else(|| payload_text.map(|text| text.to_string()))
                .unwrap_or_default();
            if embed_text.trim().is_empty() {
                warn!("no textual content available; embedding skipped");
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                // Check if document will be chunked - if so, embed each chunk
                if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                    // Document will be chunked - embed each chunk for full semantic coverage
                    info!(
                        "Document will be chunked into {} chunks, generating embeddings for each",
                        chunk_texts.len()
                    );

                    // Apply temporal enrichment if enabled (before contextual)
                    #[cfg(feature = "temporal_enrich")]
                    let enriched_chunks = if args.temporal_enrich {
                        info!(
                            "Temporal enrichment enabled, processing {} chunks",
                            chunk_texts.len()
                        );
                        // Use today's date as fallback document date
                        let today = chrono::Local::now().date_naive();
                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                        // Results are tuples of (enriched_text, TemporalEnrichment)
                        let enriched: Vec<String> =
                            results.iter().map(|(text, _)| text.clone()).collect();
                        let resolved_count = results
                            .iter()
                            .filter(|(_, e)| {
                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
                            })
                            .count();
                        info!(
                            "Temporal enrichment: resolved {} chunks with temporal context",
                            resolved_count
                        );
                        enriched
                    } else {
                        chunk_texts.clone()
                    };
                    #[cfg(not(feature = "temporal_enrich"))]
                    let enriched_chunks = {
                        if args.temporal_enrich {
                            warn!("Temporal enrichment requested but feature not compiled in");
                        }
                        chunk_texts.clone()
                    };

                    // Apply contextual retrieval if enabled
                    let embed_chunks = if args.contextual {
                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                        // Engine selection mirrors the parallel path above.
                        let engine = if args.contextual_model.as_deref() == Some("local") {
                            #[cfg(feature = "llama-cpp")]
                            {
                                let model_path = get_local_contextual_model_path()?;
                                ContextualEngine::local(model_path)
                            }
                            #[cfg(not(feature = "llama-cpp"))]
                            {
                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                            }
                        } else if let Some(model) = &args.contextual_model {
                            ContextualEngine::openai_with_model(model)?
                        } else {
                            ContextualEngine::openai()?
                        };

                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                            Ok(contexts) => {
                                info!("Generated {} contextual prefixes", contexts.len());
                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                            }
                            Err(e) => {
                                // Best-effort: fall back to the unprefixed chunks.
                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                enriched_chunks.clone()
                            }
                        }
                    } else {
                        enriched_chunks
                    };

                    let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                    for chunk_text in &embed_chunks {
                        // Truncate chunk to avoid OpenAI token limits (contextual prefixes can make chunks large)
                        let truncated_chunk = truncate_for_embedding(chunk_text);
                        if truncated_chunk.len() < chunk_text.len() {
                            info!(
                                "Truncated chunk from {} to {} chars for embedding",
                                chunk_text.len(),
                                truncated_chunk.len()
                            );
                        }
                        let chunk_emb = runtime.embed(&truncated_chunk)?;
                        chunk_embeddings.push(chunk_emb);
                    }
                    // Truncate parent text to avoid token limits
                    let parent_text = truncate_for_embedding(&embed_text);
                    if parent_text.len() < embed_text.len() {
                        info!(
                            "Truncated parent text from {} to {} chars for embedding",
                            embed_text.len(),
                            parent_text.len()
                        );
                    }
                    let parent_embedding = runtime.embed(&parent_text)?;
                    embedded = true;
                    info!(
                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
                        chunk_embeddings.len()
                    );
                    mem.put_with_chunk_embeddings(
                        &payload,
                        Some(parent_embedding),
                        chunk_embeddings,
                        options,
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                } else {
                    // No chunking - just embed the whole document (truncate to avoid token limits)
                    info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!(
                            "Truncated text from {} to {} chars for embedding",
                            embed_text.len(),
                            truncated_text.len()
                        );
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.put_with_embedding_and_options(&payload, embedding, options)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                }
            }
        } else {
            // Plain put without embeddings.
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            frame_id,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
            #[cfg(feature = "logic_mesh")]
            search_text,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}
3665
3666fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
3667    if std::str::from_utf8(payload).is_ok() {
3668        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
3669        Ok((compressed.len(), true))
3670    } else {
3671        Ok((payload.len(), false))
3672    }
3673}
3674
3675fn print_report(report: &PutReport) {
3676    let name = report
3677        .source
3678        .as_ref()
3679        .map(|path| {
3680            let pretty = env::current_dir()
3681                .ok()
3682                .as_ref()
3683                .and_then(|cwd| diff_paths(path, cwd))
3684                .unwrap_or_else(|| path.to_path_buf());
3685            pretty.to_string_lossy().into_owned()
3686        })
3687        .unwrap_or_else(|| "stdin".to_string());
3688
3689    let ratio = if report.original_bytes > 0 {
3690        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
3691    } else {
3692        100.0
3693    };
3694
3695    println!("• {name} → {}", report.uri);
3696    println!("  seq: {}", report.seq);
3697    if report.compressed {
3698        println!(
3699            "  size: {} B → {} B ({:.1}% of original, compressed)",
3700            report.original_bytes, report.stored_bytes, ratio
3701        );
3702    } else {
3703        println!("  size: {} B (stored as-is)", report.original_bytes);
3704    }
3705    if let Some(mime) = &report.mime {
3706        println!("  mime: {mime}");
3707    }
3708    if let Some(title) = &report.title {
3709        println!("  title: {title}");
3710    }
3711    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
3712    println!(
3713        "  extraction: status={} reader={}",
3714        report.extraction.status.label(),
3715        extraction_reader
3716    );
3717    if let Some(ms) = report.extraction.duration_ms {
3718        println!("  extraction-duration: {} ms", ms);
3719    }
3720    if let Some(pages) = report.extraction.pages_processed {
3721        println!("  extraction-pages: {}", pages);
3722    }
3723    for warning in &report.extraction.warnings {
3724        println!("  warning: {warning}");
3725    }
3726    if let Some(metadata) = &report.metadata {
3727        if let Some(caption) = &metadata.caption {
3728            println!("  caption: {caption}");
3729        }
3730        if let Some(audio) = metadata.audio.as_ref() {
3731            if audio.duration_secs.is_some()
3732                || audio.sample_rate_hz.is_some()
3733                || audio.channels.is_some()
3734            {
3735                let duration = audio
3736                    .duration_secs
3737                    .map(|secs| format!("{secs:.1}s"))
3738                    .unwrap_or_else(|| "unknown".into());
3739                let rate = audio
3740                    .sample_rate_hz
3741                    .map(|hz| format!("{} Hz", hz))
3742                    .unwrap_or_else(|| "? Hz".into());
3743                let channels = audio
3744                    .channels
3745                    .map(|ch| ch.to_string())
3746                    .unwrap_or_else(|| "?".into());
3747                println!("  audio: duration {duration}, {channels} ch, {rate}");
3748            }
3749        }
3750    }
3751
3752    println!();
3753}
3754
3755fn put_report_to_json(report: &PutReport) -> serde_json::Value {
3756    let source = report
3757        .source
3758        .as_ref()
3759        .map(|path| path.to_string_lossy().into_owned());
3760    let extraction = json!({
3761        "status": report.extraction.status.label(),
3762        "reader": report.extraction.reader,
3763        "duration_ms": report.extraction.duration_ms,
3764        "pages_processed": report.extraction.pages_processed,
3765        "warnings": report.extraction.warnings,
3766    });
3767    json!({
3768        "seq": report.seq,
3769        "uri": report.uri,
3770        "title": report.title,
3771        "original_bytes": report.original_bytes,
3772        "original_bytes_human": format_bytes(report.original_bytes as u64),
3773        "stored_bytes": report.stored_bytes,
3774        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
3775        "compressed": report.compressed,
3776        "mime": report.mime,
3777        "source": source,
3778        "metadata": report.metadata,
3779        "extraction": extraction,
3780    })
3781}
3782
3783fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
3784    match err {
3785        MemvidError::CapacityExceeded {
3786            current,
3787            limit,
3788            required,
3789        } => anyhow!(CapacityExceededMessage {
3790            current,
3791            limit,
3792            required,
3793        }),
3794        other => other.into(),
3795    }
3796}
3797
3798fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
3799    for (idx, entry) in entries.iter().enumerate() {
3800        match serde_json::from_str::<DocMetadata>(entry) {
3801            Ok(meta) => {
3802                options.metadata = Some(meta);
3803            }
3804            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3805                Ok(value) => {
3806                    options
3807                        .extra_metadata
3808                        .insert(format!("custom_metadata_{idx}"), value.to_string());
3809                }
3810                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
3811            },
3812        }
3813    }
3814}
3815
3816fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
3817    let stats = mem.stats()?;
3818    let message = match stats.tier {
3819        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
3820        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
3821    };
3822    Ok(message)
3823}
3824
/// Normalize a user-supplied URI into a safe relative identifier.
///
/// Strips any leading `mv2://` scheme prefixes, converts backslashes to
/// forward slashes, and resolves `.`/`..` segments so the result can never
/// escape upward. With `keep_path` the full sanitized path is returned;
/// otherwise only the final segment. Inputs that sanitize to nothing
/// (e.g. `""`, `"."`, `"a/.."`) fall back to `"document"`.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            // Empty segments (duplicate slashes) and current-dir markers drop out.
            "" | "." => {}
            // Parent markers pop the previous segment but never escape the root.
            ".." => {
                segments.pop();
            }
            other => segments.push(other.to_string()),
        }
    }

    if segments.is_empty() {
        // Previously this fell back to the raw trailing segment of the input,
        // which at this point can only be "." or ".." — leaking the very
        // traversal markers this function removes. Always use the safe default.
        return "document".to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
3858
3859fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
3860    if quiet {
3861        let pb = ProgressBar::hidden();
3862        pb.set_message(message.to_string());
3863        return pb;
3864    }
3865
3866    match total {
3867        Some(len) => {
3868            let pb = ProgressBar::new(len);
3869            pb.set_draw_target(ProgressDrawTarget::stderr());
3870            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
3871                .unwrap_or_else(|_| ProgressStyle::default_bar());
3872            pb.set_style(style);
3873            pb.set_message(message.to_string());
3874            pb
3875        }
3876        None => {
3877            let pb = ProgressBar::new_spinner();
3878            pb.set_draw_target(ProgressDrawTarget::stderr());
3879            pb.set_message(message.to_string());
3880            pb.enable_steady_tick(Duration::from_millis(120));
3881            pb
3882        }
3883    }
3884}
3885
3886fn create_spinner(message: &str) -> ProgressBar {
3887    let pb = ProgressBar::new_spinner();
3888    pb.set_draw_target(ProgressDrawTarget::stderr());
3889    let style = ProgressStyle::with_template("{spinner} {msg}")
3890        .unwrap_or_else(|_| ProgressStyle::default_spinner())
3891        .tick_strings(&["-", "\\", "|", "/"]);
3892    pb.set_style(style);
3893    pb.set_message(message.to_string());
3894    pb.enable_steady_tick(Duration::from_millis(120));
3895    pb
3896}
3897
3898// ============================================================================
3899// put-many command - batch ingestion with pre-computed embeddings
3900// ============================================================================
3901
/// Arguments for the `put-many` subcommand
// NOTE(review): the `///` doc comments on the fields below double as clap
// help text shown to users — do not turn them into implementation notes.
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    /// Zstd compression level (1-9, default: 3)
    // Values outside 1-9 are accepted by clap and clamped in handle_put_many.
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared lock-related flags; applied via crate::utils::apply_lock_cli.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
3922
/// JSON input format for put-many requests
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    // Document title, stored as PutOptions::title.
    pub title: String,
    // NOTE(review): `label` and `metadata` are deserialized but not consumed
    // by handle_put_many in this file — confirm whether they are read elsewhere.
    #[serde(default)]
    pub label: String,
    // Document body; ingested as UTF-8 bytes.
    pub text: String,
    // Optional URI; defaults to "mv2://batch/{index}" when omitted.
    #[serde(default)]
    pub uri: Option<String>,
    #[serde(default)]
    pub tags: Vec<String>,
    #[serde(default)]
    pub labels: Vec<String>,
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI)
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Pre-computed embeddings for each chunk (matched by index)
    /// If the document gets chunked, these embeddings are assigned to each chunk frame.
    /// The number of chunk embeddings should match the number of chunks that will be created.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
3948
/// Batch input format (array of requests)
// `#[serde(transparent)]` makes this deserialize directly from a bare JSON
// array (`[...]`) rather than an object wrapping a "requests" key.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    pub requests: Vec<PutManyRequest>,
}
3956
/// Handle the `put-many` subcommand: batch-ingest documents (optionally with
/// pre-computed embeddings) into a memory file.
///
/// Reads a JSON array of [`PutManyRequest`] from `--input` or stdin, converts
/// each entry into a `ParallelInput`, and ingests the whole batch through
/// `put_parallel_inputs`. Emits the resulting frame ids as JSON (`--json`)
/// or human-readable text.
///
/// # Errors
/// Fails when the memory cannot be opened or mutated, the input cannot be
/// read or parsed as JSON, or batch ingestion fails.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Batch ingestion needs both the lexical and vector indexes enabled.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Read batch input from the given file, or from stdin until EOF.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        // BUGFIX: previously only the first line of stdin was read
        // (`read_line`), truncating any multi-line / pretty-printed JSON
        // array. Read the whole stream instead.
        io::read_to_string(io::stdin()).context("Failed to read from stdin")?
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        // Progress note goes to stderr so stdout stays machine-parseable.
        eprintln!("Processing {} documents...", count);
    }

    // Convert each request into a ParallelInput, defaulting the URI to a
    // positional "mv2://batch/{index}" when none is supplied.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Clamp the user-supplied compression level to the supported 1-9 range.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Print every id for small batches; summarize large ones.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last:  {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}