memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33    ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34};
35#[cfg(feature = "clip")]
36use memvid_core::FrameRole;
37#[cfg(feature = "parallel_segments")]
38use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
39use pdf_extract::OutputError as PdfExtractError;
40use serde_json::json;
41use tracing::{info, warn};
42use tree_magic_mini::from_u8 as magic_from_u8;
43use uuid::Uuid;
44
45const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum characters for embedding text to avoid exceeding OpenAI's 8192 token limit.
/// Using ~4 chars/token estimate, 20K chars ≈ 5K tokens (safe margin).
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
/// Maximum number of images to extract from a PDF for CLIP visual search.
/// Set high to capture all images; processing stops when no more images found.
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncate text for embedding to fit within OpenAI token limits.
///
/// Returns `text` unchanged when it already fits, otherwise the longest
/// byte-prefix not exceeding `MAX_EMBEDDING_TEXT_LEN` that ends on a UTF-8
/// character boundary (so we never split a multi-byte character).
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Walk back to the nearest char boundary at or below the limit.
    // The previous implementation sliced `&text[..MAX_EMBEDDING_TEXT_LEN]`
    // up front, which panics whenever the limit falls inside a multi-byte
    // UTF-8 sequence; `is_char_boundary` lets us find a safe cut first.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
77
/// Get the default local model path for contextual retrieval (phi-3.5-mini).
/// Uses MEMVID_MODELS_DIR or defaults to ~/.memvid/models/llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    // Resolve the models root: an explicit MEMVID_MODELS_DIR override (with
    // manual `~/` expansion), falling back to ~/.memvid/models.
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => match custom.strip_prefix("~/") {
            Some(rest) => match env::var("HOME") {
                Ok(home) => PathBuf::from(home).join(rest),
                // No HOME to expand against: use the literal value as-is.
                Err(_) => PathBuf::from(&custom),
            },
            None => PathBuf::from(custom),
        },
        Err(_) => {
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    // Fail with an actionable message when the model has not been installed.
    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
114
115use pathdiff::diff_paths;
116
117use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
118use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
119use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingRuntime};
120use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
121use crate::error::{CapacityExceededMessage, DuplicateUriError};
122use crate::utils::{
123    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
124    select_frame,
125};
126#[cfg(feature = "temporal_enrich")]
127use memvid_core::enrich_chunks as temporal_enrich_chunks;
128
/// Interpret a truthy/falsy textual flag (as found in environment variables).
/// Returns `None` when the value is not a recognized boolean spelling.
fn parse_bool_flag(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];
    let normalized = value.trim().to_ascii_lowercase();
    if TRUTHY.contains(&normalized.as_str()) {
        Some(true)
    } else if FALSY.contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
137
/// Helper function to check for parallel segments environment override
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    // Missing/unreadable variable or an unrecognized value both yield None.
    let raw = std::env::var("MEMVID_PARALLEL_SEGMENTS").ok()?;
    parse_bool_flag(&raw)
}
145
/// Check if CLIP model files are installed
#[cfg(feature = "clip")]
fn is_clip_model_installed() -> bool {
    // Resolve the models directory: explicit override, then the user's home,
    // then a relative fallback in the working directory.
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    // Both the vision and text towers must be present on disk.
    ["vision", "text"]
        .iter()
        .all(|part| models_dir.join(format!("{model_name}_{part}.onnx")).exists())
}
165
166/// Minimum confidence threshold for NER entities (0.0-1.0)
167/// Note: Lower threshold captures more entities but also more noise
168#[cfg(feature = "logic_mesh")]
169const NER_MIN_CONFIDENCE: f32 = 0.50;
170
171/// Minimum length for entity names (characters)
172#[cfg(feature = "logic_mesh")]
173const NER_MIN_ENTITY_LEN: usize = 2;
174
175/// Maximum gap (in characters) between adjacent entities to merge
176/// Allows merging "S" + "&" + "P Global" when they're close together
177#[cfg(feature = "logic_mesh")]
178const MAX_MERGE_GAP: usize = 3;
179
/// Merge adjacent NER entities that appear to be tokenization artifacts.
///
/// The BERT tokenizer splits names at special characters like `&`, producing
/// separate entities like "S", "&", "P Global" instead of "S&P Global".
/// This function merges adjacent entities of the same type when they're
/// contiguous or nearly contiguous in the original text.
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Sort by byte position so "adjacent" means adjacent in the source text.
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            // Check if this entity is adjacent to the previous one and same type
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            // Check what's in the gap (if any). Use `str::get` instead of
            // direct slicing: NER byte offsets are not guaranteed to land on
            // UTF-8 char boundaries, and `&text[a..b]` would panic there.
            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                match original_text.get(last.byte_end..entity.byte_start) {
                    // Don't merge across newlines - that indicates separate entities
                    Some(gap_text) if gap_text.contains('\n') || gap_text.contains('\r') => false,
                    // Gap content should be whitespace (but not newlines),
                    // punctuation, or connectors
                    Some(gap_text) => gap_text
                        .chars()
                        .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.')),
                    // Out-of-range or mid-char offsets: treat as non-mergeable.
                    None => false,
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Merge: extend the previous entity to include this one.
                // Prefer the original text span; fall back to concatenation
                // when the span cannot be sliced safely.
                let merged_text = original_text
                    .get(last.byte_start..entity.byte_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                // Average the confidence
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
251
/// Filter and clean extracted NER entities to remove noise.
/// Returns true if the entity should be kept.
#[cfg(feature = "logic_mesh")]
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    // Low-confidence predictions are discarded outright.
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let trimmed = text.trim();

    // Too short, pure whitespace/punctuation, or spanning multiple lines
    // (malformed extraction) -> noise.
    if trimmed.len() < NER_MIN_ENTITY_LEN
        || trimmed
            .chars()
            .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
        || trimmed.contains('\n')
        || trimmed.contains('\r')
    {
        return false;
    }

    // WordPiece continuation markers (##) or a trailing period indicate
    // partial words / sentence fragments.
    if trimmed.starts_with("##") || trimmed.ends_with("##") || trimmed.ends_with('.') {
        return false;
    }

    let lower = trimmed.to_lowercase();

    match trimmed.len() {
        // Single characters are never meaningful entities.
        1 => return false,
        // Keep well-known 2-letter entities; drop every other 2-letter token.
        2 => return matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Common noise words.
    if matches!(
        lower.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    // Multi-word entities whose first word is a short all-caps fragment are
    // usually tokenization artifacts ("P Global" from "S&P Global") —
    // except legitimate abbreviations like "US Bank" or "UK Office".
    let mut words = trimmed.split_whitespace();
    if let (Some(first_word), Some(_second)) = (words.next(), words.next()) {
        if first_word.len() <= 2
            && first_word.chars().all(char::is_uppercase)
            && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    // Entities beginning with a lowercase letter are likely partial words.
    !trimmed.chars().next().map_or(false, |c| c.is_lowercase())
}
333
334/// Parse key=value pairs for tags
335pub fn parse_key_val(s: &str) -> Result<(String, String)> {
336    let (key, value) = s
337        .split_once('=')
338        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
339    Ok((key.to_string(), value.to_string()))
340}
341
/// Lock-related CLI arguments (shared across commands)
// NOTE: the `///` field docs below double as clap help text; editing them
// changes the rendered `--help` output.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    /// Maximum time to wait for an active writer before failing (ms)
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
352
/// Arguments for the `put` subcommand
// NOTE: the `///` field docs double as clap help text; editing them changes
// the rendered `--help` output. Plain `//` comments below are safe.
#[derive(Args)]
pub struct PutArgs {
    // --- Target memory and basic frame metadata ---
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional POSIX timestamp to store on the frame
    #[arg(long)]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    /// Store the payload as a binary video without transcoding
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // --- Tags, labels, and free-form metadata ---
    /// Tags to attach in key=value form
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    /// Additional metadata payloads expressed as JSON
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    // --- Audio handling ---
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // --- Embeddings (opt-in; see handle_put for storage warnings) ---
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // --- Table extraction ---
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    // --- CLIP visual search (feature-gated) ---
    /// Enable CLIP visual embeddings for images and PDF pages
    /// Allows text-to-image search using natural language queries
    /// Auto-enabled when CLIP model is installed; use --no-clip to disable
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    /// Disable CLIP visual embeddings even when model is available
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    // --- Duplicate-URI handling ---
    /// Replace any existing frame with the same URI instead of inserting a duplicate
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // --- Logic-Mesh entity graph ---
    /// Build Logic-Mesh entity graph during ingestion
    /// Extracts entities (people, organizations, locations, etc.) and their relationships
    /// Enables graph traversal with `memvid follow`
    /// Auto-enabled when NER model is installed; use --no-logic-mesh to disable
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    /// Disable Logic-Mesh even when NER model is available
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_logic_mesh: bool,

    // --- Parallel segment builder (feature-gated); see wants_parallel() ---
    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
502
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Decide whether the parallel segment builder should run.
    /// Precedence: explicit `--parallel-segments`, then `--no-parallel-segments`,
    /// then the MEMVID_PARALLEL_SEGMENTS environment override, else off.
    pub fn wants_parallel(&self) -> bool {
        match (self.parallel_segments, self.no_parallel_segments) {
            (true, _) => true,
            (_, true) => false,
            _ => parallel_env_override().unwrap_or(false),
        }
    }

    /// Build sanitized `BuildOpts` from the CLI overrides, leaving any field
    /// the user did not set at its default.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();
        // Apply each optional override in place.
        let assign = |slot: &mut usize, value: Option<usize>| {
            if let Some(v) = value {
                *slot = v;
            }
        };
        assign(&mut opts.segment_tokens, self.parallel_segment_tokens);
        assign(&mut opts.segment_pages, self.parallel_segment_pages);
        assign(&mut opts.threads, self.parallel_threads);
        assign(&mut opts.queue_depth, self.parallel_queue_depth);
        // Clamp/validate the combined options before use.
        opts.sanitize();
        opts
    }
}
536
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    // Without the `parallel_segments` feature the parallel builder can never
    // be selected; mirror the feature-gated impl with a constant `false`.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
543
/// Arguments for the `api-fetch` subcommand
// NOTE: the `///` field docs double as clap help text; plain `//` comments
// here do not affect `--help` output.
#[derive(Args)]
pub struct ApiFetchArgs {
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
569
/// Arguments for the `update` subcommand
// Fields deliberately carry no `///` docs (clap would surface them as help
// text); `//` comments below document intent without changing CLI output.
#[derive(Args)]
pub struct UpdateArgs {
    // Target memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selectors: exactly one of --frame-id / --uri (mutually exclusive).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Optional replacement payload file.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Metadata overrides applied to the selected frame.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable tag/label/metadata attachments.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
605
/// Arguments for the `delete` subcommand
// Fields deliberately carry no `///` docs (clap would surface them as help
// text); `//` comments below document intent without changing CLI output.
#[derive(Args)]
pub struct DeleteArgs {
    // Target memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selectors: exactly one of --frame-id / --uri (mutually exclusive).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
623
624/// Handler for `memvid api-fetch`
625pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
626    let command = ApiFetchCommand {
627        file: args.file,
628        config_path: args.config,
629        dry_run: args.dry_run,
630        mode_override: args.mode,
631        uri_override: args.uri,
632        output_json: args.json,
633        lock_timeout_ms: args.lock.lock_timeout,
634        force_lock: args.lock.force,
635    };
636    run_api_fetch(config, command)
637}
638
639/// Handler for `memvid delete`
640pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
641    let mut mem = Memvid::open(&args.file)?;
642    ensure_cli_mutation_allowed(&mem)?;
643    apply_lock_cli(&mut mem, &args.lock);
644    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
645
646    if !args.yes {
647        if let Some(uri) = &frame.uri {
648            println!("About to delete frame {} ({uri})", frame.id);
649        } else {
650            println!("About to delete frame {}", frame.id);
651        }
652        print!("Confirm? [y/N]: ");
653        io::stdout().flush()?;
654        let mut input = String::new();
655        io::stdin().read_line(&mut input)?;
656        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
657        if !confirmed {
658            println!("Aborted");
659            return Ok(());
660        }
661    }
662
663    let seq = mem.delete_frame(frame.id)?;
664    mem.commit()?;
665    let deleted = mem.frame_by_id(frame.id)?;
666
667    if args.json {
668        let json = json!({
669            "frame_id": frame.id,
670            "sequence": seq,
671            "status": frame_status_str(deleted.status),
672        });
673        println!("{}", serde_json::to_string_pretty(&json)?);
674    } else {
675        println!("Deleted frame {} (seq {})", frame.id, seq);
676    }
677
678    Ok(())
679}
680pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
681    let mut mem = Memvid::open(&args.file)?;
682    ensure_cli_mutation_allowed(&mem)?;
683    apply_lock_cli(&mut mem, &args.lock);
684    let mut stats = mem.stats()?;
685    if stats.frame_count == 0 && !stats.has_lex_index {
686        mem.enable_lex()?;
687        stats = mem.stats()?;
688    }
689
690    // Set vector compression mode if requested
691    if args.vector_compression {
692        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
693    }
694
695    let mut capacity_guard = CapacityGuard::from_stats(&stats);
696    let use_parallel = args.wants_parallel();
697    #[cfg(feature = "parallel_segments")]
698    let mut parallel_opts = if use_parallel {
699        Some(args.sanitized_parallel_opts())
700    } else {
701        None
702    };
703    #[cfg(feature = "parallel_segments")]
704    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
705    #[cfg(feature = "parallel_segments")]
706    let mut pending_parallel_indices: Vec<usize> = Vec::new();
707    let input_set = resolve_inputs(args.input.as_deref())?;
708
709    if args.video {
710        if let Some(kind) = &args.kind {
711            if !kind.eq_ignore_ascii_case("video") {
712                anyhow::bail!("--video conflicts with explicit --kind={kind}");
713            }
714        }
715        if matches!(input_set, InputSet::Stdin) {
716            anyhow::bail!("--video requires --input <file>");
717        }
718    }
719
720    #[allow(dead_code)]
721    enum EmbeddingMode {
722        None,
723        Auto,
724        Explicit,
725    }
726
727    // PHASE 1: Make embeddings opt-in, not opt-out
728    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
729    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
730    let mv2_dimension = mem.vec_index_dimension();
731    let (embedding_mode, embedding_runtime) = if args.embedding {
732        (
733            EmbeddingMode::Explicit,
734            Some(load_embedding_runtime_for_mv2(config, None, mv2_dimension)?),
735        )
736    } else {
737        (EmbeddingMode::None, None)
738    };
739    let embedding_enabled = embedding_runtime.is_some();
740    let runtime_ref = embedding_runtime.as_ref();
741
742    // Enable CLIP index if:
743    // 1. --clip flag is set explicitly, OR
744    // 2. Input contains image files and model is available, OR
745    // 3. Model is installed (auto-detect) and --no-clip is not set
746    #[cfg(feature = "clip")]
747    let clip_enabled = {
748        if args.no_clip {
749            false
750        } else if args.clip {
751            true // Explicitly requested
752        } else {
753            // Auto-detect: check if model is installed
754            let model_available = is_clip_model_installed();
755            if model_available {
756                // Model is installed, check if input has images or PDFs
757                match &input_set {
758                    InputSet::Files(paths) => paths.iter().any(|p| {
759                        is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
760                    }),
761                    InputSet::Stdin => false,
762                }
763            } else {
764                false
765            }
766        }
767    };
768    #[cfg(feature = "clip")]
769    let clip_model = if clip_enabled {
770        mem.enable_clip()?;
771        if !args.json {
772            let mode = if args.clip {
773                "explicit"
774            } else {
775                "auto-detected"
776            };
777            eprintln!("ℹ️  CLIP visual embeddings enabled ({mode})");
778            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
779            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
780            eprintln!("   Use --no-clip to disable auto-detection");
781            eprintln!();
782        }
783        // Initialize CLIP model for generating image embeddings
784        match memvid_core::ClipModel::default_model() {
785            Ok(model) => Some(model),
786            Err(e) => {
787                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
788                None
789            }
790        }
791    } else {
792        None
793    };
794
795    // Warn user about vector storage overhead (PHASE 2)
796    if embedding_enabled && !args.json {
797        let capacity = stats.capacity_bytes;
798        if args.vector_compression {
799            // PHASE 3: PQ compression reduces storage 16x
800            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
801            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
802            eprintln!("   Storage: ~20 KB per document (16x compressed)");
803            eprintln!("   Accuracy: ~95% recall@10 maintained");
804            eprintln!(
805                "   Capacity: ~{} documents max for {:.1} GB",
806                max_docs,
807                capacity as f64 / 1e9
808            );
809            eprintln!();
810        } else {
811            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
812            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
813            eprintln!("   Storage: ~270 KB per document");
814            eprintln!(
815                "   Capacity: ~{} documents max for {:.1} GB",
816                max_docs,
817                capacity as f64 / 1e9
818            );
819            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
820            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
821            eprintln!();
822        }
823    }
824
825    let embed_progress = if embedding_enabled {
826        let total = match &input_set {
827            InputSet::Files(paths) => Some(paths.len() as u64),
828            InputSet::Stdin => None,
829        };
830        Some(create_task_progress_bar(total, "embed", false))
831    } else {
832        None
833    };
834    let mut embedded_docs = 0usize;
835
836    if let InputSet::Files(paths) = &input_set {
837        if paths.len() > 1 {
838            if args.uri.is_some() || args.title.is_some() {
839                anyhow::bail!(
840                    "--uri/--title apply to a single file; omit them when using a directory or glob"
841                );
842            }
843        }
844    }
845
846    let mut reports = Vec::new();
847    let mut processed = 0usize;
848    let mut skipped = 0usize;
849    let mut bytes_added = 0u64;
850    let mut summary_warnings: Vec<String> = Vec::new();
851    #[cfg(feature = "clip")]
852    let mut clip_embeddings_added = false;
853
854    let mut capacity_reached = false;
855    match input_set {
856        InputSet::Stdin => {
857            processed += 1;
858            match read_payload(None) {
859                Ok(payload) => {
860                    let mut analyze_spinner = if args.json {
861                        None
862                    } else {
863                        Some(create_spinner("Analyzing stdin payload..."))
864                    };
865                    match ingest_payload(
866                        &mut mem,
867                        &args,
868                        payload,
869                        None,
870                        capacity_guard.as_mut(),
871                        embedding_enabled,
872                        runtime_ref,
873                        use_parallel,
874                    ) {
875                        Ok(outcome) => {
876                            if let Some(pb) = analyze_spinner.take() {
877                                pb.finish_and_clear();
878                            }
879                            bytes_added += outcome.report.stored_bytes as u64;
880                            if args.json {
881                                summary_warnings
882                                    .extend(outcome.report.extraction.warnings.iter().cloned());
883                            }
884                            reports.push(outcome.report);
885                            let report_index = reports.len() - 1;
886                            if !args.json && !use_parallel {
887                                if let Some(report) = reports.get(report_index) {
888                                    print_report(report);
889                                }
890                            }
891                            if args.transcript {
892                                let notice = transcript_notice_message(&mut mem)?;
893                                if args.json {
894                                    summary_warnings.push(notice);
895                                } else {
896                                    println!("{notice}");
897                                }
898                            }
899                            if outcome.embedded {
900                                if let Some(pb) = embed_progress.as_ref() {
901                                    pb.inc(1);
902                                }
903                                embedded_docs += 1;
904                            }
905                            if use_parallel {
906                                #[cfg(feature = "parallel_segments")]
907                                if let Some(input) = outcome.parallel_input {
908                                    pending_parallel_indices.push(report_index);
909                                    pending_parallel_inputs.push(input);
910                                }
911                            } else if let Some(guard) = capacity_guard.as_mut() {
912                                mem.commit()?;
913                                let stats = mem.stats()?;
914                                guard.update_after_commit(&stats);
915                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
916                            }
917                        }
918                        Err(err) => {
919                            if let Some(pb) = analyze_spinner.take() {
920                                pb.finish_and_clear();
921                            }
922                            if err.downcast_ref::<DuplicateUriError>().is_some() {
923                                return Err(err);
924                            }
925                            warn!("skipped stdin payload: {err}");
926                            skipped += 1;
927                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
928                                capacity_reached = true;
929                            }
930                        }
931                    }
932                }
933                Err(err) => {
934                    warn!("failed to read stdin: {err}");
935                    skipped += 1;
936                }
937            }
938        }
939        InputSet::Files(ref paths) => {
940            let mut transcript_notice_printed = false;
941            for path in paths {
942                processed += 1;
943                match read_payload(Some(&path)) {
944                    Ok(payload) => {
945                        let label = path.display().to_string();
946                        let mut analyze_spinner = if args.json {
947                            None
948                        } else {
949                            Some(create_spinner(&format!("Analyzing {label}...")))
950                        };
951                        match ingest_payload(
952                            &mut mem,
953                            &args,
954                            payload,
955                            Some(&path),
956                            capacity_guard.as_mut(),
957                            embedding_enabled,
958                            runtime_ref,
959                            use_parallel,
960                        ) {
961                            Ok(outcome) => {
962                                if let Some(pb) = analyze_spinner.take() {
963                                    pb.finish_and_clear();
964                                }
965                                bytes_added += outcome.report.stored_bytes as u64;
966
967                                // Generate CLIP embedding for image files
968                                #[cfg(feature = "clip")]
969                                if let Some(ref model) = clip_model {
970                                    let mime = outcome.report.mime.as_deref();
971                                    let clip_frame_id = outcome.report.frame_id;
972
973                                    if is_image_file(&path) {
974                                        match model.encode_image_file(&path) {
975                                            Ok(embedding) => {
976                                                if let Err(e) = mem.add_clip_embedding_with_page(
977                                                    clip_frame_id,
978                                                    None,
979                                                    embedding,
980                                                ) {
981                                                    warn!(
982                                                        "Failed to add CLIP embedding for {}: {e}",
983                                                        path.display()
984                                                    );
985                                                } else {
986                                                    info!(
987                                                        "Added CLIP embedding for frame {}",
988                                                        clip_frame_id
989                                                    );
990                                                    clip_embeddings_added = true;
991                                                }
992                                            }
993                                            Err(e) => {
994                                                warn!(
995                                                    "Failed to encode CLIP embedding for {}: {e}",
996                                                    path.display()
997                                                );
998                                            }
999                                        }
1000                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
1001                                        // Commit before adding child frames to ensure WAL has space
1002                                        if let Err(e) = mem.commit() {
1003                                            warn!("Failed to commit before CLIP extraction: {e}");
1004                                        }
1005
1006                                        match render_pdf_pages_for_clip(
1007                                            &path,
1008                                            CLIP_PDF_MAX_IMAGES,
1009                                            CLIP_PDF_TARGET_PX,
1010                                        ) {
1011                                            Ok(rendered_pages) => {
1012                                                use rayon::prelude::*;
1013
1014                                                // Pre-encode all images to PNG in parallel for better performance
1015                                                // Filter out junk images (solid colors, black, too small)
1016                                                let path_for_closure = path.clone();
1017
1018                                                // Temporarily suppress panic output during image encoding
1019                                                // (some PDFs have malformed images that cause panics in the image crate)
1020                                                let prev_hook = std::panic::take_hook();
1021                                                std::panic::set_hook(Box::new(|_| {
1022                                                    // Silently ignore panics - we handle them with catch_unwind
1023                                                }));
1024
1025                                                let encoded_images: Vec<_> = rendered_pages
1026                                                    .into_par_iter()
1027                                                    .filter_map(|(page_num, image)| {
1028                                                        // Check if image is worth embedding (not junk)
1029                                                        let image_info = get_image_info(&image);
1030                                                        if !image_info.should_embed() {
1031                                                            info!(
1032                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1033                                                                path_for_closure.display(),
1034                                                                page_num,
1035                                                                image_info.color_variance,
1036                                                                image_info.width,
1037                                                                image_info.height
1038                                                            );
1039                                                            return None;
1040                                                        }
1041
1042                                                        // Convert to RGB8 first to ensure consistent buffer size
1043                                                        // (some PDFs have images with alpha or other formats that cause panics)
1044                                                        // Use catch_unwind to handle any panics from corrupted image data
1045                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1046                                                            let rgb_image = image.to_rgb8();
1047                                                            let mut png_bytes = Vec::new();
1048                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1049                                                                &mut Cursor::new(&mut png_bytes),
1050                                                                image::ImageFormat::Png,
1051                                                            ).map(|()| png_bytes)
1052                                                        }));
1053
1054                                                        match encode_result {
1055                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1056                                                            Ok(Err(e)) => {
1057                                                                info!(
1058                                                                    "Skipping image for {} page {}: {e}",
1059                                                                    path_for_closure.display(),
1060                                                                    page_num
1061                                                                );
1062                                                                None
1063                                                            }
1064                                                            Err(_) => {
1065                                                                // Panic occurred - corrupted image data, silently skip
1066                                                                info!(
1067                                                                    "Skipping malformed image for {} page {}",
1068                                                                    path_for_closure.display(),
1069                                                                    page_num
1070                                                                );
1071                                                                None
1072                                                            }
1073                                                        }
1074                                                    })
1075                                                    .collect();
1076
1077                                                // Restore the original panic hook
1078                                                std::panic::set_hook(prev_hook);
1079
1080                                                // Process encoded images sequentially (storage + CLIP encoding)
1081                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1082                                                let mut child_frame_offset = 1u64;
1083                                                let parent_uri = outcome.report.uri.clone();
1084                                                let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1085                                                let total_images = encoded_images.len();
1086
1087                                                // Show progress bar for CLIP extraction
1088                                                let clip_pb = if !args.json && total_images > 0 {
1089                                                    let pb = ProgressBar::new(total_images as u64);
1090                                                    pb.set_style(
1091                                                        ProgressStyle::with_template(
1092                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1093                                                        )
1094                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1095                                                        .progress_chars("█▓░"),
1096                                                    );
1097                                                    Some(pb)
1098                                                } else {
1099                                                    None
1100                                                };
1101
1102                                                for (page_num, image, png_bytes) in encoded_images {
1103                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1104                                                    let child_title = format!(
1105                                                        "{} - Image {} (page {})",
1106                                                        parent_title,
1107                                                        child_frame_offset,
1108                                                        page_num
1109                                                    );
1110
1111                                                    let child_options = PutOptions::builder()
1112                                                        .uri(&child_uri)
1113                                                        .title(&child_title)
1114                                                        .parent_id(clip_frame_id)
1115                                                        .role(FrameRole::ExtractedImage)
1116                                                        .auto_tag(false)
1117                                                        .extract_dates(false)
1118                                                        .build();
1119
1120                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1121                                                        Ok(_child_seq) => {
1122                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1123                                                            child_frame_offset += 1;
1124
1125                                                            match model.encode_image(&image) {
1126                                                                Ok(embedding) => {
1127                                                                    if let Err(e) = mem
1128                                                                        .add_clip_embedding_with_page(
1129                                                                            child_frame_id,
1130                                                                            Some(page_num),
1131                                                                            embedding,
1132                                                                        )
1133                                                                    {
1134                                                                        warn!(
1135                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1136                                                                            path.display(),
1137                                                                            page_num
1138                                                                        );
1139                                                                    } else {
1140                                                                        info!(
1141                                                                            "Added CLIP image frame {} for {} page {}",
1142                                                                            child_frame_id,
1143                                                                            clip_frame_id,
1144                                                                            page_num
1145                                                                        );
1146                                                                        clip_embeddings_added = true;
1147                                                                    }
1148                                                                }
1149                                                                Err(e) => warn!(
1150                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1151                                                                    path.display(),
1152                                                                    page_num
1153                                                                ),
1154                                                            }
1155
1156                                                            // Commit after each image to checkpoint WAL
1157                                                            if let Err(e) = mem.commit() {
1158                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1159                                                            }
1160                                                        }
1161                                                        Err(e) => warn!(
1162                                                            "Failed to store image frame for {} page {}: {e}",
1163                                                            path.display(),
1164                                                            page_num
1165                                                        ),
1166                                                    }
1167
1168                                                    if let Some(ref pb) = clip_pb {
1169                                                        pb.inc(1);
1170                                                    }
1171                                                }
1172
1173                                                if let Some(pb) = clip_pb {
1174                                                    pb.finish_and_clear();
1175                                                }
1176                                            }
1177                                            Err(err) => warn!(
1178                                                "Failed to render PDF pages for CLIP {}: {err}",
1179                                                path.display()
1180                                            ),
1181                                        }
1182                                    }
1183                                }
1184
1185                                if args.json {
1186                                    summary_warnings
1187                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1188                                }
1189                                reports.push(outcome.report);
1190                                let report_index = reports.len() - 1;
1191                                if !args.json && !use_parallel {
1192                                    if let Some(report) = reports.get(report_index) {
1193                                        print_report(report);
1194                                    }
1195                                }
1196                                if args.transcript && !transcript_notice_printed {
1197                                    let notice = transcript_notice_message(&mut mem)?;
1198                                    if args.json {
1199                                        summary_warnings.push(notice);
1200                                    } else {
1201                                        println!("{notice}");
1202                                    }
1203                                    transcript_notice_printed = true;
1204                                }
1205                                if outcome.embedded {
1206                                    if let Some(pb) = embed_progress.as_ref() {
1207                                        pb.inc(1);
1208                                    }
1209                                    embedded_docs += 1;
1210                                }
1211                                if use_parallel {
1212                                    #[cfg(feature = "parallel_segments")]
1213                                    if let Some(input) = outcome.parallel_input {
1214                                        pending_parallel_indices.push(report_index);
1215                                        pending_parallel_inputs.push(input);
1216                                    }
1217                                } else if let Some(guard) = capacity_guard.as_mut() {
1218                                    mem.commit()?;
1219                                    let stats = mem.stats()?;
1220                                    guard.update_after_commit(&stats);
1221                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1222                                }
1223                            }
1224                            Err(err) => {
1225                                if let Some(pb) = analyze_spinner.take() {
1226                                    pb.finish_and_clear();
1227                                }
1228                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1229                                    return Err(err);
1230                                }
1231                                warn!("skipped {}: {err}", path.display());
1232                                skipped += 1;
1233                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1234                                    capacity_reached = true;
1235                                }
1236                            }
1237                        }
1238                    }
1239                    Err(err) => {
1240                        warn!("failed to read {}: {err}", path.display());
1241                        skipped += 1;
1242                    }
1243                }
1244                if capacity_reached {
1245                    break;
1246                }
1247            }
1248        }
1249    }
1250
1251    if capacity_reached {
1252        warn!("capacity limit reached; remaining inputs were not processed");
1253    }
1254
1255    if let Some(pb) = embed_progress.as_ref() {
1256        pb.finish_with_message("embed");
1257    }
1258
1259    if let Some(runtime) = embedding_runtime.as_ref() {
1260        if embedded_docs > 0 {
1261            match embedding_mode {
1262                EmbeddingMode::Explicit => println!(
1263                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1264                    embedded_docs,
1265                    runtime.dimension()
1266                ),
1267                EmbeddingMode::Auto => println!(
1268                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1269                    embedded_docs,
1270                    runtime.dimension()
1271                ),
1272                EmbeddingMode::None => {}
1273            }
1274        } else {
1275            match embedding_mode {
1276                EmbeddingMode::Explicit => {
1277                    println!("No embeddings were generated (no textual content available)");
1278                }
1279                EmbeddingMode::Auto => {
1280                    println!(
1281                        "Semantic runtime available, but no textual content produced embeddings"
1282                    );
1283                }
1284                EmbeddingMode::None => {}
1285            }
1286        }
1287    }
1288
1289    if reports.is_empty() {
1290        if args.json {
1291            if capacity_reached {
1292                summary_warnings.push("capacity_limit_reached".to_string());
1293            }
1294            let summary_json = json!({
1295                "processed": processed,
1296                "ingested": 0,
1297                "skipped": skipped,
1298                "bytes_added": bytes_added,
1299                "bytes_added_human": format_bytes(bytes_added),
1300                "embedded_documents": embedded_docs,
1301                "capacity_reached": capacity_reached,
1302                "warnings": summary_warnings,
1303                "reports": Vec::<serde_json::Value>::new(),
1304            });
1305            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1306        } else {
1307            println!(
1308                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1309                processed, skipped
1310            );
1311        }
1312        return Ok(());
1313    }
1314
1315    #[cfg(feature = "parallel_segments")]
1316    let mut parallel_flush_spinner = if use_parallel && !args.json {
1317        Some(create_spinner("Flushing parallel segments..."))
1318    } else {
1319        None
1320    };
1321
1322    if use_parallel {
1323        #[cfg(feature = "parallel_segments")]
1324        {
1325            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1326                let mut opts = BuildOpts::default();
1327                opts.sanitize();
1328                opts
1329            });
1330            // Set vector compression mode from mem state
1331            opts.vec_compression = mem.vector_compression().clone();
1332            let seqs = if pending_parallel_inputs.is_empty() {
1333                mem.commit_parallel(opts)?;
1334                Vec::new()
1335            } else {
1336                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1337            };
1338            if let Some(pb) = parallel_flush_spinner.take() {
1339                pb.finish_with_message("Flushed parallel segments");
1340            }
1341            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1342                if let Some(report) = reports.get_mut(idx) {
1343                    report.seq = seq;
1344                }
1345            }
1346        }
1347        #[cfg(not(feature = "parallel_segments"))]
1348        {
1349            mem.commit()?;
1350        }
1351    } else {
1352        mem.commit()?;
1353    }
1354
1355    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1356    #[cfg(feature = "clip")]
1357    if clip_embeddings_added {
1358        mem.commit()?;
1359    }
1360
1361    if !args.json && use_parallel {
1362        for report in &reports {
1363            print_report(report);
1364        }
1365    }
1366
1367    // Table extraction for PDF files
1368    let mut tables_extracted = 0usize;
1369    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1370    if args.tables && !args.no_tables {
1371        if let InputSet::Files(ref paths) = input_set {
1372            let pdf_paths: Vec<_> = paths
1373                .iter()
1374                .filter(|p| {
1375                    p.extension()
1376                        .and_then(|e| e.to_str())
1377                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1378                        .unwrap_or(false)
1379                })
1380                .collect();
1381
1382            if !pdf_paths.is_empty() {
1383                let table_spinner = if args.json {
1384                    None
1385                } else {
1386                    Some(create_spinner(&format!(
1387                        "Extracting tables from {} PDF(s)...",
1388                        pdf_paths.len()
1389                    )))
1390                };
1391
1392                // Use aggressive mode with auto fallback for best results
1393                let table_options = TableExtractionOptions::builder()
1394                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1395                    .min_rows(2)
1396                    .min_cols(2)
1397                    .min_quality(memvid_core::table::TableQuality::Low)
1398                    .merge_multi_page(true)
1399                    .build();
1400
1401                for pdf_path in pdf_paths {
1402                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1403                        let filename = pdf_path
1404                            .file_name()
1405                            .and_then(|s| s.to_str())
1406                            .unwrap_or("unknown.pdf");
1407
1408                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1409                            for table in &result.tables {
1410                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1411                                {
1412                                    tables_extracted += 1;
1413                                    table_summaries.push(json!({
1414                                        "table_id": table.table_id,
1415                                        "source_file": table.source_file,
1416                                        "meta_frame_id": meta_id,
1417                                        "rows": table.n_rows,
1418                                        "cols": table.n_cols,
1419                                        "quality": format!("{:?}", table.quality),
1420                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1421                                    }));
1422                                }
1423                            }
1424                        }
1425                    }
1426                }
1427
1428                if let Some(pb) = table_spinner {
1429                    if tables_extracted > 0 {
1430                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1431                    } else {
1432                        pb.finish_with_message("No tables found");
1433                    }
1434                }
1435
1436                // Commit table frames
1437                if tables_extracted > 0 {
1438                    mem.commit()?;
1439                }
1440            }
1441        }
1442    }
1443
1444    // Logic-Mesh: Extract entities and build relationship graph
1445    // Auto-enabled when NER model is installed, unless --no-logic-mesh is set
1446    #[cfg(feature = "logic_mesh")]
1447    let mut logic_mesh_entities = 0usize;
1448    #[cfg(feature = "logic_mesh")]
1449    {
1450        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1451
1452        let models_dir = config.models_dir.clone();
1453        let model_path = ner_model_path(&models_dir);
1454        let tokenizer_path = ner_tokenizer_path(&models_dir);
1455
1456        // Auto-detect: enable Logic-Mesh if model exists, unless explicitly disabled
1457        let ner_available = model_path.exists() && tokenizer_path.exists();
1458        let should_run_ner = if args.no_logic_mesh {
1459            false
1460        } else if args.logic_mesh {
1461            true  // Explicitly requested
1462        } else {
1463            ner_available  // Auto-detect
1464        };
1465
1466        if should_run_ner && !ner_available {
1467            // User explicitly requested --logic-mesh but model not installed
1468            if args.json {
1469                summary_warnings.push(
1470                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1471                );
1472            } else {
1473                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1474                eprintln!("   Run: memvid models install --ner distilbert-ner");
1475            }
1476        } else if should_run_ner {
1477            let ner_spinner = if args.json {
1478                None
1479            } else {
1480                Some(create_spinner("Building Logic-Mesh entity graph..."))
1481            };
1482
1483            match NerModel::load(&model_path, &tokenizer_path, None) {
1484                Ok(mut ner_model) => {
1485                    let mut mesh = LogicMesh::new();
1486                    let mut filtered_count = 0usize;
1487
1488                    // Process each ingested frame for entity extraction using cached search_text
1489                    for report in &reports {
1490                        let frame_id = report.frame_id;
1491                        // Use the cached search_text from ingestion
1492                        if let Some(ref content) = report.search_text {
1493                            if !content.is_empty() {
1494                                match ner_model.extract(content) {
1495                                    Ok(raw_entities) => {
1496                                        // Merge adjacent entities that were split by tokenization
1497                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1498                                        let merged_entities = merge_adjacent_entities(raw_entities, content);
1499
1500                                        for entity in &merged_entities {
1501                                            // Apply post-processing filter to remove noisy entities
1502                                            if !is_valid_entity(&entity.text, entity.confidence) {
1503                                                tracing::debug!(
1504                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1505                                                    entity.text, entity.confidence * 100.0
1506                                                );
1507                                                filtered_count += 1;
1508                                                continue;
1509                                            }
1510
1511                                            // Convert NER entity type to EntityKind
1512                                            let kind = entity.to_entity_kind();
1513                                            // Create mesh node with proper structure
1514                                            let node = MeshNode::new(
1515                                                entity.text.to_lowercase(),
1516                                                entity.text.trim().to_string(),
1517                                                kind,
1518                                                entity.confidence,
1519                                                frame_id,
1520                                                entity.byte_start as u32,
1521                                                (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1522                                            );
1523                                            mesh.merge_node(node);
1524                                            logic_mesh_entities += 1;
1525                                        }
1526                                    }
1527                                    Err(e) => {
1528                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1529                                    }
1530                                }
1531                            }
1532                        }
1533                    }
1534
1535                    if logic_mesh_entities > 0 {
1536                        mesh.finalize();
1537                        let node_count = mesh.stats().node_count;
1538                        // Persist mesh to MV2 file
1539                        mem.set_logic_mesh(mesh);
1540                        mem.commit()?;
1541                        info!(
1542                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1543                            logic_mesh_entities,
1544                            node_count,
1545                            filtered_count
1546                        );
1547                    }
1548
1549                    if let Some(pb) = ner_spinner {
1550                        pb.finish_with_message(format!(
1551                            "Logic-Mesh: {} entities ({} unique)",
1552                            logic_mesh_entities,
1553                            mem.mesh_node_count()
1554                        ));
1555                    }
1556                }
1557                Err(e) => {
1558                    if let Some(pb) = ner_spinner {
1559                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1560                    }
1561                    if args.json {
1562                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1563                    }
1564                }
1565            }
1566        }
1567    }
1568
1569    let stats = mem.stats()?;
1570    if args.json {
1571        if capacity_reached {
1572            summary_warnings.push("capacity_limit_reached".to_string());
1573        }
1574        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1575        let total_duration_ms: Option<u64> = {
1576            let sum: u64 = reports
1577                .iter()
1578                .filter_map(|r| r.extraction.duration_ms)
1579                .sum();
1580            if sum > 0 {
1581                Some(sum)
1582            } else {
1583                None
1584            }
1585        };
1586        let total_pages: Option<u32> = {
1587            let sum: u32 = reports
1588                .iter()
1589                .filter_map(|r| r.extraction.pages_processed)
1590                .sum();
1591            if sum > 0 {
1592                Some(sum)
1593            } else {
1594                None
1595            }
1596        };
1597        let embedding_mode_str = match embedding_mode {
1598            EmbeddingMode::None => "none",
1599            EmbeddingMode::Auto => "auto",
1600            EmbeddingMode::Explicit => "explicit",
1601        };
1602        let summary_json = json!({
1603            "processed": processed,
1604            "ingested": reports.len(),
1605            "skipped": skipped,
1606            "bytes_added": bytes_added,
1607            "bytes_added_human": format_bytes(bytes_added),
1608            "embedded_documents": embedded_docs,
1609            "embedding": {
1610                "enabled": embedding_enabled,
1611                "mode": embedding_mode_str,
1612                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1613            },
1614            "tables": {
1615                "enabled": args.tables && !args.no_tables,
1616                "extracted": tables_extracted,
1617                "tables": table_summaries,
1618            },
1619            "capacity_reached": capacity_reached,
1620            "warnings": summary_warnings,
1621            "extraction": {
1622                "total_duration_ms": total_duration_ms,
1623                "total_pages_processed": total_pages,
1624            },
1625            "memory": {
1626                "path": args.file.display().to_string(),
1627                "frame_count": stats.frame_count,
1628                "size_bytes": stats.size_bytes,
1629                "tier": format!("{:?}", stats.tier),
1630            },
1631            "reports": reports_json,
1632        });
1633        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1634    } else {
1635        println!(
1636            "Committed {} frame(s); total frames {}",
1637            reports.len(),
1638            stats.frame_count
1639        );
1640        println!(
1641            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1642            processed,
1643            reports.len(),
1644            skipped,
1645            format_bytes(bytes_added)
1646        );
1647        if tables_extracted > 0 {
1648            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1649        }
1650    }
1651    Ok(())
1652}
1653
/// Handle `memvid update`: replace a frame's payload and/or metadata in place.
///
/// Selects the target frame by id or URI, seeds `PutOptions` from the frame's
/// existing fields so an update without overrides preserves everything,
/// applies CLI overrides, optionally re-analyzes a replacement payload,
/// recomputes or reuses the embedding, writes the update + commit, and prints
/// a summary (JSON or human-readable).
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload; `None` keeps the frame's current bytes.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Carry the existing frame's fields forward as defaults. Auto-tagging and
    // date extraction only make sense when a new payload is supplied.
    let mut options = PutOptions::default();
    options.enable_embedding = false; // decided later from the vec-index state
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // Explicit --set-uri wins over the inherited URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // If the frame never had a URI and we got a new payload, derive one from
    // the source path (or mint a random one).
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // Per-field CLI overrides.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // --tag key=value pairs replace (not append to) the inherited tag set:
    // both key and value become searchable tags, and the pair is recorded in
    // extra_metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for embedding recomputation; starts as the inherited search
    // text and is replaced by freshly extracted text when a payload is given.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // Re-derive title/URI/metadata/search text from the new payload.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // No payload: nothing new to tag or date-extract.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    // Auto-detect embedding model from existing MV2 dimension
    let mv2_dimension = mem.vec_index_dimension();
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        let runtime = load_embedding_runtime_for_mv2(config, None, mv2_dimension)?;
        // Prefer extracted/inherited search text; fall back to the raw UTF-8
        // payload when no search text is available.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
    }

    // If no fresh embedding was computed but the file has a vector index,
    // reuse the frame's previous embedding so the vector stays populated.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the updated frame for display: by URI when one exists,
    // otherwise by the newest timeline entry (the update appends a new
    // frame), falling back to the original id.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
1833
1834fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1835    if let Some(uri) = provided {
1836        let sanitized = sanitize_uri(uri, true);
1837        return format!("mv2://{}", sanitized);
1838    }
1839
1840    if let Some(path) = source {
1841        let raw = path.to_string_lossy();
1842        let sanitized = sanitize_uri(&raw, false);
1843        return format!("mv2://{}", sanitized);
1844    }
1845
1846    format!("mv2://frames/{}", Uuid::new_v4())
1847}
1848
1849pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1850    let digest = hash(payload).to_hex();
1851    let short = &digest[..32];
1852    let ext_from_path = source
1853        .and_then(|path| path.extension())
1854        .and_then(|ext| ext.to_str())
1855        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1856        .filter(|ext| !ext.is_empty());
1857    let ext = ext_from_path
1858        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1859        .unwrap_or_else(|| "bin".to_string());
1860    format!("mv2://video/{short}.{ext}")
1861}
1862
1863fn derive_title(
1864    provided: Option<String>,
1865    source: Option<&Path>,
1866    payload_utf8: Option<&str>,
1867) -> Option<String> {
1868    if let Some(title) = provided {
1869        let trimmed = title.trim();
1870        if trimmed.is_empty() {
1871            None
1872        } else {
1873            Some(trimmed.to_string())
1874        }
1875    } else {
1876        if let Some(text) = payload_utf8 {
1877            if let Some(markdown_title) = extract_markdown_title(text) {
1878                return Some(markdown_title);
1879            }
1880        }
1881        source
1882            .and_then(|path| path.file_stem())
1883            .and_then(|stem| stem.to_str())
1884            .map(to_title_case)
1885    }
1886}
1887
/// Return the text of the first non-empty ATX-style markdown heading
/// (`# ...`, `## ...`, ...) in `text`, if any.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let line = line.trim();
        if !line.starts_with('#') {
            return None;
        }
        let title = line.trim_start_matches('#').trim();
        (!title.is_empty()).then(|| title.to_string())
    })
}
1900
/// Upper-case the first character of `input`, leaving the rest untouched.
///
/// Returns an empty string for empty input. Multi-character uppercase
/// expansions (e.g. 'ß' → "SS") are preserved in full.
fn to_title_case(input: &str) -> String {
    // The original had both an explicit `is_empty` check and a `let else`
    // fallback for the same condition; a single `match` on the first char
    // covers both paths without the unreachable branch.
    let mut chars = input.chars();
    match chars.next() {
        // `to_uppercase` yields an iterator because some characters expand
        // to more than one uppercase character.
        Some(first) => first.to_uppercase().chain(chars).collect(),
        None => String::new(),
    }
}
1912
/// Files that are never worth ingesting (macOS Finder metadata).
fn should_skip_input(path: &Path) -> bool {
    path.file_name().and_then(|name| name.to_str()) == Some(".DS_Store")
}
1919
/// Hidden directories (".git", ".cache", ...) are excluded from recursion.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .is_some_and(|name| name.starts_with('.'))
}
1926
/// Where `put` input comes from: standard input or a resolved list of files.
enum InputSet {
    /// Payload is read from stdin (no input argument given).
    Stdin,
    /// Concrete file paths resolved from a path, directory, or glob pattern.
    Files(Vec<PathBuf>),
}
1931
/// Check whether a file looks like an image, judging by extension alone
/// (case-insensitive); no content sniffing is done here.
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    const IMAGE_EXTS: [&str; 9] = [
        "jpg", "jpeg", "png", "gif", "bmp", "webp", "tiff", "tif", "ico",
    ];
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| IMAGE_EXTS.contains(&ext.to_ascii_lowercase().as_str()))
}
1943
1944fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
1945    let Some(raw) = input else {
1946        return Ok(InputSet::Stdin);
1947    };
1948
1949    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
1950        let mut files = Vec::new();
1951        let mut matched_any = false;
1952        for entry in glob(raw)? {
1953            let path = entry?;
1954            if path.is_file() {
1955                matched_any = true;
1956                if should_skip_input(&path) {
1957                    continue;
1958                }
1959                files.push(path);
1960            }
1961        }
1962        files.sort();
1963        if files.is_empty() {
1964            if matched_any {
1965                anyhow::bail!(
1966                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
1967                );
1968            }
1969            anyhow::bail!("pattern '{raw}' did not match any files");
1970        }
1971        return Ok(InputSet::Files(files));
1972    }
1973
1974    let path = PathBuf::from(raw);
1975    if path.is_dir() {
1976        let (mut files, skipped_any) = collect_directory_files(&path)?;
1977        files.sort();
1978        if files.is_empty() {
1979            if skipped_any {
1980                anyhow::bail!(
1981                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
1982                    path.display()
1983                );
1984            }
1985            anyhow::bail!(
1986                "directory '{}' contains no ingestible files",
1987                path.display()
1988            );
1989        }
1990        Ok(InputSet::Files(files))
1991    } else {
1992        Ok(InputSet::Files(vec![path]))
1993    }
1994}
1995
1996fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
1997    let mut files = Vec::new();
1998    let mut skipped_any = false;
1999    let mut stack = vec![root.to_path_buf()];
2000    while let Some(dir) = stack.pop() {
2001        for entry in fs::read_dir(&dir)? {
2002            let entry = entry?;
2003            let path = entry.path();
2004            let file_type = entry.file_type()?;
2005            if file_type.is_dir() {
2006                if should_skip_dir(&path) {
2007                    skipped_any = true;
2008                    continue;
2009                }
2010                stack.push(path);
2011            } else if file_type.is_file() {
2012                if should_skip_input(&path) {
2013                    skipped_any = true;
2014                    continue;
2015                }
2016                files.push(path);
2017            }
2018        }
2019    }
2020    Ok((files, skipped_any))
2021}
2022
/// Result of ingesting a single input: the per-frame report plus embedding
/// status and, when the parallel path is active, the deferred segment input.
struct IngestOutcome {
    report: PutReport,
    /// True when an embedding vector was computed for this frame.
    embedded: bool,
    /// Payload deferred for a batched `put_parallel_inputs` flush;
    /// `None` when the frame was written directly.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2029
/// Per-frame summary of a single `put` ingestion, used for CLI/JSON reporting.
struct PutReport {
    /// Commit sequence number; patched after a parallel-segment flush.
    seq: u64,
    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
    frame_id: u64,
    /// Canonical `mv2://` URI of the stored frame.
    uri: String,
    /// Human-readable title, when one could be derived.
    title: Option<String>,
    /// Payload size as supplied by the caller, in bytes.
    original_bytes: usize,
    /// Bytes actually stored; differs from `original_bytes` when `compressed`.
    stored_bytes: usize,
    compressed: bool,
    /// Originating file path, if the input came from disk (not stdin).
    source: Option<PathBuf>,
    /// Detected MIME type, if any.
    mime: Option<String>,
    /// Structured document metadata extracted during analysis.
    metadata: Option<DocMetadata>,
    /// Text-extraction outcome (reader, status, warnings, timing).
    extraction: ExtractionSummary,
    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2047
/// Tracks the memory file's size against its tier capacity so ingestion can
/// fail fast before exceeding the limit. Only constructed for files that
/// report a non-zero capacity (see `CapacityGuard::from_stats`).
struct CapacityGuard {
    #[allow(dead_code)]
    tier: Tier,
    /// Hard capacity limit in bytes (non-zero by construction).
    capacity: u64,
    /// File size in bytes as of the last commit.
    current_size: u64,
}
2054
impl CapacityGuard {
    /// Flat minimum overhead reserved per ingest for index/metadata growth.
    const MIN_OVERHEAD: u64 = 128 * 1024;
    /// Proportional overhead assumed on top of the raw payload (5%).
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Build a guard from current stats; `None` when the file reports no
    /// capacity limit (`capacity_bytes == 0`), i.e. unlimited.
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Fail with a mapped `CapacityExceeded` error if writing `additional`
    /// payload bytes (plus estimated overhead) would exceed the capacity.
    /// Saturating arithmetic prevents overflow on pathological sizes.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refresh the tracked size from post-commit stats.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// The configured capacity limit, for error reporting.
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
    //
    // Each warning only fires while usage sits inside a 1% window above its
    // threshold, which keeps repeated per-file checks from spamming stderr.
    // NOTE(review): a single large commit that jumps across a window (e.g.
    // 85% -> 95%) will skip that threshold's warning entirely.
    fn check_and_warn_capacity(&self) {
        // Defensive: `from_stats` already rejects zero capacity.
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        // Warn at key thresholds
        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️  CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Actions:");
            eprintln!("   1. Recreate with larger --size");
            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️  WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️  INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Estimated bookkeeping overhead for `additional` payload bytes: the
    /// larger of the flat minimum and the proportional share.
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2136
/// Outcome of analyzing one input payload prior to storage (see `analyze_file`).
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type of the payload.
    pub mime: String,
    /// Structured document metadata, when extraction produced any.
    pub metadata: Option<DocMetadata>,
    /// Extracted text used for search indexing, when available.
    pub search_text: Option<String>,
    /// Details about the text-extraction attempt (reader, status, warnings).
    pub extraction: ExtractionSummary,
}
2144
/// Options controlling audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Force audio analysis even when it would normally be skipped.
    force: bool,
    /// Requested audio segment length, in seconds.
    segment_secs: u32,
}

impl AudioAnalyzeOptions {
    /// Default segment length when none is supplied.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length clamped to at least one second (zero is invalid).
    fn normalised_segment_secs(self) -> u32 {
        if self.segment_secs == 0 {
            1
        } else {
            self.segment_secs
        }
    }
}

impl Default for AudioAnalyzeOptions {
    fn default() -> Self {
        Self {
            force: false,
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
        }
    }
}
2167
/// Result of analyzing an audio payload.
struct AudioAnalysis {
    /// Structured audio metadata extracted from the payload.
    metadata: DocAudioMetadata,
    /// Caption text, if one was produced.
    caption: Option<String>,
    /// Extra keywords to fold into the frame's search text.
    search_terms: Vec<String>,
}
2173
/// Result of analyzing an image payload, used to enrich frame metadata.
struct ImageAnalysis {
    /// Pixel width of the decoded image.
    width: u32,
    /// Pixel height of the decoded image.
    height: u32,
    /// Dominant color palette entries (up to `COLOR_PALETTE_SIZE`).
    palette: Vec<String>,
    /// Caption text, if one was produced.
    caption: Option<String>,
    /// EXIF metadata parsed from the image, when present.
    exif: Option<DocExifMetadata>,
}
2181
/// Diagnostics describing a single text-extraction attempt.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    // Name of the reader that produced (or failed to produce) the text.
    reader: Option<String>,
    // Final outcome of the extraction attempt.
    status: ExtractionStatus,
    // Human-readable warnings accumulated along the way.
    warnings: Vec<String>,
    // Extraction time reported by the reader's diagnostics, if any.
    duration_ms: Option<u64>,
    // Number of pages processed, for paginated formats such as PDF.
    pages_processed: Option<u32>,
}
2190
2191impl ExtractionSummary {
2192    fn record_warning<S: Into<String>>(&mut self, warning: S) {
2193        self.warnings.push(warning.into());
2194    }
2195}
2196
/// Outcome of a text-extraction attempt; rendered via `label()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// No extraction was attempted for this payload.
    Skipped,
    /// The primary reader produced usable text.
    Ok,
    /// A fallback path produced the text instead of the primary reader.
    FallbackUsed,
    /// Extraction ran but yielded no searchable content.
    Empty,
    /// Extraction failed outright.
    Failed,
}
2205
2206impl ExtractionStatus {
2207    fn label(self) -> &'static str {
2208        match self {
2209            Self::Skipped => "skipped",
2210            Self::Ok => "ok",
2211            Self::FallbackUsed => "fallback_used",
2212            Self::Empty => "empty",
2213            Self::Failed => "failed",
2214        }
2215    }
2216}
2217
2218fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2219    let filename = source_path
2220        .and_then(|path| path.file_name())
2221        .and_then(|name| name.to_str())
2222        .map(|value| value.to_string());
2223    MediaManifest {
2224        kind: "video".to_string(),
2225        mime: mime.to_string(),
2226        bytes,
2227        filename,
2228        duration_ms: None,
2229        width: None,
2230        height: None,
2231        codec: None,
2232    }
2233}
2234
/// Analyze a payload prior to ingestion: detect its MIME type, gather document
/// metadata (hash, size, image/audio/video specifics), and extract searchable
/// text where possible.
///
/// * `source_path` — original file path, used for extension hints and filenames.
/// * `payload` — raw bytes of the document.
/// * `payload_utf8` — the payload re-interpreted as UTF-8, when valid.
/// * `inferred_title` — caption fallback when no better caption was found.
/// * `audio_opts` — controls forced audio analysis and segment length.
/// * `force_video` — treat the payload as video regardless of sniffed MIME.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    // Video wins over text: a forced-video payload is never treated as text.
    let is_video = force_video || mime.starts_with("video/");
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    // Baseline metadata recorded for every payload regardless of type.
    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Leading bytes used for magic-number sniffing; None when the payload is
    // empty or shorter than the sniff window.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Shared helper: normalize extracted text, derive a caption from its first
    // non-empty line, and record the reader/status. Returns false (leaving
    // `extraction` untouched) when normalization yields nothing searchable.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    if treat_as_text {
        // Text path: the payload itself is the searchable content.
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            // detect_mime said "text" but the bytes are not valid UTF-8.
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf" {
        // PDF path: try the registered reader first, then pdf_extract fallback.
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    // Copy reader diagnostics into the summary before deciding
                    // whether the text itself is usable.
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        // Fallback: pdf_extract (and its own pdftotext fallback) when the
        // registry path produced nothing usable.
        if !applied {
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image path: dimensions, palette, caption, and EXIF metadata.
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio path: runs for audio/* MIME types or when explicitly forced.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            // Only install audio metadata when nothing non-empty is present.
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }
            // Merge tag-derived terms into the search text, skipping duplicates.
            if !search_terms.is_empty() {
                match &mut search_text {
                    Some(existing) => {
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the caller-supplied title, truncated to 240 bytes.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2505
2506fn default_reader_registry() -> &'static ReaderRegistry {
2507    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2508    REGISTRY.get_or_init(ReaderRegistry::default)
2509}
2510
2511fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2512    if detect_pdf_magic(magic) {
2513        return Some(DocumentFormat::Pdf);
2514    }
2515
2516    let mime = mime?.trim().to_ascii_lowercase();
2517    match mime.as_str() {
2518        "application/pdf" => Some(DocumentFormat::Pdf),
2519        "text/plain" => Some(DocumentFormat::PlainText),
2520        "text/markdown" => Some(DocumentFormat::Markdown),
2521        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2522        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2523            Some(DocumentFormat::Docx)
2524        }
2525        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2526            Some(DocumentFormat::Pptx)
2527        }
2528        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2529            Some(DocumentFormat::Xlsx)
2530        }
2531        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2532        _ => None,
2533    }
2534}
2535
/// Return true when the sniffed bytes look like a PDF header: an optional
/// UTF-8 BOM, optional leading ASCII whitespace, then `%PDF`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let bytes = match magic {
        Some(bytes) if !bytes.is_empty() => bytes,
        _ => return false,
    };
    // Drop a UTF-8 BOM if present, then skip any ASCII whitespace.
    let without_bom = bytes.strip_prefix(&[0xEF, 0xBB, 0xBF]).unwrap_or(bytes);
    let start = without_bom
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[start..].starts_with(b"%PDF")
}
2553
2554fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2555    match pdf_extract::extract_text_from_mem(payload) {
2556        Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
2557            Ok(fallback) => Ok(fallback),
2558            Err(err) => {
2559                warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
2560                Ok(text)
2561            }
2562        },
2563        Err(err) => {
2564            warn!("pdf_extract failed: {err}");
2565            match fallback_pdftotext(payload) {
2566                Ok(fallback) => Ok(fallback),
2567                Err(fallback_err) => {
2568                    warn!("pdftotext fallback failed: {fallback_err}");
2569                    Err(err)
2570                }
2571            }
2572        }
2573        other => other,
2574    }
2575}
2576
2577fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2578    use std::io::Write;
2579    use std::process::{Command, Stdio};
2580
2581    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2582
2583    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2584    temp_pdf
2585        .write_all(payload)
2586        .context("failed to write pdf payload to temp file")?;
2587    temp_pdf.flush().context("failed to flush temp pdf file")?;
2588
2589    let child = Command::new(pdftotext)
2590        .arg("-layout")
2591        .arg(temp_pdf.path())
2592        .arg("-")
2593        .stdout(Stdio::piped())
2594        .spawn()
2595        .context("failed to spawn pdftotext")?;
2596
2597    let output = child
2598        .wait_with_output()
2599        .context("failed to read pdftotext output")?;
2600
2601    if !output.status.success() {
2602        return Err(anyhow!("pdftotext exited with status {}", output.status));
2603    }
2604
2605    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2606    Ok(text)
2607}
2608
2609fn detect_mime(
2610    source_path: Option<&Path>,
2611    payload: &[u8],
2612    payload_utf8: Option<&str>,
2613) -> (String, bool) {
2614    if let Some(kind) = infer::get(payload) {
2615        let mime = kind.mime_type().to_string();
2616        let treat_as_text = mime.starts_with("text/")
2617            || matches!(
2618                mime.as_str(),
2619                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2620            );
2621        return (mime, treat_as_text);
2622    }
2623
2624    let magic = magic_from_u8(payload);
2625    if !magic.is_empty() && magic != "application/octet-stream" {
2626        let treat_as_text = magic.starts_with("text/")
2627            || matches!(
2628                magic,
2629                "application/json"
2630                    | "application/xml"
2631                    | "application/javascript"
2632                    | "image/svg+xml"
2633                    | "text/plain"
2634            );
2635        return (magic.to_string(), treat_as_text);
2636    }
2637
2638    if let Some(path) = source_path {
2639        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2640            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2641                return (mime.to_string(), treat_as_text);
2642            }
2643        }
2644    }
2645
2646    if payload_utf8.is_some() {
2647        return ("text/plain".to_string(), true);
2648    }
2649
2650    ("application/octet-stream".to_string(), false)
2651}
2652
/// Case-insensitive extension → `(mime, treat_as_text)` lookup used when
/// content sniffing could not classify the payload.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    match ext.to_ascii_lowercase().as_str() {
        // Markup and structured text formats.
        "md" | "markdown" => Some(("text/markdown", true)),
        "rst" => Some(("text/x-rst", true)),
        "html" | "htm" => Some(("text/html", true)),
        "xml" => Some(("application/xml", true)),
        "svg" => Some(("image/svg+xml", true)),
        "json" => Some(("application/json", true)),
        "yaml" | "yml" => Some(("application/yaml", true)),
        "toml" => Some(("application/toml", true)),
        "csv" => Some(("text/csv", true)),
        "tsv" => Some(("text/tab-separated-values", true)),
        // Plain text, config, and source-code extensions.
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => Some(("text/plain", true)),
        // Raster images: binary, never treated as text.
        "gif" => Some(("image/gif", false)),
        "jpg" | "jpeg" => Some(("image/jpeg", false)),
        "png" => Some(("image/png", false)),
        "bmp" => Some(("image/bmp", false)),
        "ico" => Some(("image/x-icon", false)),
        "tif" | "tiff" => Some(("image/tiff", false)),
        "webp" => Some(("image/webp", false)),
        _ => None,
    }
}
2680
2681fn caption_from_text(text: &str) -> Option<String> {
2682    for line in text.lines() {
2683        let trimmed = line.trim();
2684        if !trimmed.is_empty() {
2685            return Some(truncate_to_boundary(trimmed, 240));
2686        }
2687    }
2688    None
2689}
2690
/// Truncate `text` to at most `max_len` bytes on a UTF-8 char boundary,
/// appending an ellipsis when truncation occurred. Inputs that already fit
/// are returned unchanged; an empty string is returned when no character
/// fits inside `max_len`.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Largest char boundary not exceeding max_len (index 0 is always one).
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }
    format!("{}…", text[..cut].trim_end())
}
2706
2707fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2708    let reader = ImageReader::new(Cursor::new(payload))
2709        .with_guessed_format()
2710        .map_err(|err| warn!("failed to guess image format: {err}"))
2711        .ok()?;
2712    let image = reader
2713        .decode()
2714        .map_err(|err| warn!("failed to decode image: {err}"))
2715        .ok()?;
2716    let width = image.width();
2717    let height = image.height();
2718    let rgb = image.to_rgb8();
2719    let palette = match get_palette(
2720        rgb.as_raw(),
2721        ColorFormat::Rgb,
2722        COLOR_PALETTE_SIZE,
2723        COLOR_PALETTE_QUALITY,
2724    ) {
2725        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2726        Err(err) => {
2727            warn!("failed to compute colour palette: {err}");
2728            Vec::new()
2729        }
2730    };
2731
2732    let (exif, exif_caption) = extract_exif_metadata(payload);
2733
2734    Some(ImageAnalysis {
2735        width,
2736        height,
2737        palette,
2738        caption: exif_caption,
2739        exif,
2740    })
2741}
2742
/// Format an RGB colour as a lowercase `#rrggbb` hex string.
fn color_to_hex(color: Color) -> String {
    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
}
2746
/// Analyze an audio payload with symphonia: probe the container, decode every
/// packet of the default track to count frames (for duration/bitrate), read
/// tags via lofty, and split the timeline into fixed-length segments.
/// Returns `None` when the stream cannot be probed or no decoder is available.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Hint the probe with both the file extension and the MIME subtype.
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode the whole stream, counting frames; an IoError marks end-of-stream,
    // decode errors skip the packet, and ResetRequired resets the decoder.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Ignore packets belonging to other tracks.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    // Duration from counted frames; bitrate derived from payload size.
    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    // Tag lookup: caption prefers "title", then "tracktitle".
    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Split the timeline into fixed-length segments, the last one clamped to
    // the actual duration.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble metadata, leaving fields unset when nothing was measured.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
    })
}
2921
/// Read audio tags via lofty into a flat key → value map. Keys are
/// "title"/"tracktitle", "artist", "album", "genre", "track_number", and
/// "disc_number". The primary tag is read first and `or_insert_with` keeps
/// the first value seen, so primary-tag values win over secondary tags.
/// Probe/read failures are logged and yield an empty map.
fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};

    // Merge one tag's known items into `out` without overwriting earlier values.
    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
            // Stored under both keys; downstream lookups check either.
            out.entry("title".into())
                .or_insert_with(|| value.to_string());
            out.entry("tracktitle".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
            out.entry("artist".into())
                .or_insert_with(|| value.to_string());
        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
            // Album artist only fills in when no track artist exists.
            out.entry("artist".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
            out.entry("album".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::Genre) {
            out.entry("genre".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
            out.entry("track_number".into())
                .or_insert_with(|| value.to_string());
        }
        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
            out.entry("disc_number".into())
                .or_insert_with(|| value.to_string());
        }
    }

    let mut tags = HashMap::new();
    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
        Ok(probe) => probe,
        Err(err) => {
            warn!("failed to guess audio tag file type: {err}");
            return tags;
        }
    };

    let tagged_file = match probe.read() {
        Ok(file) => file,
        Err(err) => {
            warn!("failed to read audio tags: {err}");
            return tags;
        }
    };

    // Primary tag first so its values take precedence over secondary tags.
    if let Some(primary) = tagged_file.primary_tag() {
        collect_tag(primary, &mut tags);
    }
    for tag in tagged_file.tags() {
        collect_tag(tag, &mut tags);
    }

    tags
}
2983
2984fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
2985    let mut cursor = Cursor::new(payload);
2986    let exif = match ExifReader::new().read_from_container(&mut cursor) {
2987        Ok(exif) => exif,
2988        Err(err) => {
2989            warn!("failed to read EXIF metadata: {err}");
2990            return (None, None);
2991        }
2992    };
2993
2994    let mut doc = DocExifMetadata::default();
2995    let mut has_data = false;
2996
2997    if let Some(make) = exif_string(&exif, Tag::Make) {
2998        doc.make = Some(make);
2999        has_data = true;
3000    }
3001    if let Some(model) = exif_string(&exif, Tag::Model) {
3002        doc.model = Some(model);
3003        has_data = true;
3004    }
3005    if let Some(lens) =
3006        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3007    {
3008        doc.lens = Some(lens);
3009        has_data = true;
3010    }
3011    if let Some(dt) =
3012        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3013    {
3014        doc.datetime = Some(dt);
3015        has_data = true;
3016    }
3017
3018    if let Some(gps) = extract_gps_metadata(&exif) {
3019        doc.gps = Some(gps);
3020        has_data = true;
3021    }
3022
3023    let caption = exif_string(&exif, Tag::ImageDescription);
3024
3025    let metadata = if has_data { Some(doc) } else { None };
3026    (metadata, caption)
3027}
3028
3029fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3030    exif.fields()
3031        .find(|field| field.tag == tag)
3032        .and_then(|field| field_to_string(field, exif))
3033}
3034
3035fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3036    let value = field.display_value().with_unit(exif).to_string();
3037    let trimmed = value.trim_matches('\0').trim();
3038    if trimmed.is_empty() {
3039        None
3040    } else {
3041        Some(trimmed.to_string())
3042    }
3043}
3044
3045fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3046    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3047    let latitude_ref_field = exif
3048        .fields()
3049        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3050    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3051    let longitude_ref_field = exif
3052        .fields()
3053        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3054
3055    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3056    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3057
3058    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3059        if reference.eq_ignore_ascii_case("S") {
3060            latitude = -latitude;
3061        }
3062    }
3063    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3064        if reference.eq_ignore_ascii_case("W") {
3065            longitude = -longitude;
3066        }
3067    }
3068
3069    Some(DocGpsMetadata {
3070        latitude,
3071        longitude,
3072    })
3073}
3074
3075fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3076    match value {
3077        ExifValue::Rational(values) if !values.is_empty() => {
3078            let deg = rational_to_f64_u(values.get(0)?)?;
3079            let min = values
3080                .get(1)
3081                .and_then(|v| rational_to_f64_u(v))
3082                .unwrap_or(0.0);
3083            let sec = values
3084                .get(2)
3085                .and_then(|v| rational_to_f64_u(v))
3086                .unwrap_or(0.0);
3087            Some(deg + (min / 60.0) + (sec / 3600.0))
3088        }
3089        ExifValue::SRational(values) if !values.is_empty() => {
3090            let deg = rational_to_f64_i(values.get(0)?)?;
3091            let min = values
3092                .get(1)
3093                .and_then(|v| rational_to_f64_i(v))
3094                .unwrap_or(0.0);
3095            let sec = values
3096                .get(2)
3097                .and_then(|v| rational_to_f64_i(v))
3098                .unwrap_or(0.0);
3099            Some(deg + (min / 60.0) + (sec / 3600.0))
3100        }
3101        _ => None,
3102    }
3103}
3104
3105fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3106    if value.denom == 0 {
3107        None
3108    } else {
3109        Some(value.num as f64 / value.denom as f64)
3110    }
3111}
3112
3113fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3114    if value.denom == 0 {
3115        None
3116    } else {
3117        Some(value.num as f64 / value.denom as f64)
3118    }
3119}
3120
3121fn value_to_ascii(value: &ExifValue) -> Option<String> {
3122    if let ExifValue::Ascii(values) = value {
3123        values
3124            .first()
3125            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3126            .map(|s| s.trim_matches('\0').trim().to_string())
3127            .filter(|s| !s.is_empty())
3128    } else {
3129        None
3130    }
3131}
3132
/// Ingests a single payload (file bytes or stdin) into the memory.
///
/// Responsibilities, in order: size/compression estimation, content analysis
/// (MIME, metadata, searchable text), URI derivation, `PutOptions` assembly,
/// duplicate-URI detection (update vs. reject), capacity enforcement, and
/// storage — optionally with whole-document or per-chunk semantic embeddings.
///
/// In `parallel_mode` (with the `parallel_segments` feature) the frame is NOT
/// committed here: the returned outcome carries a `ParallelInput` for a later
/// batched commit, and `seq`/`frame_id` in the report are placeholder zeros.
///
/// Errors surface as `anyhow::Error`; put failures are translated through
/// `map_put_error` so capacity problems carry the guard's capacity hint.
fn ingest_payload(
    mem: &mut Memvid,
    args: &PutArgs,
    payload: Vec<u8>,
    source_path: Option<&Path>,
    capacity_guard: Option<&mut CapacityGuard>,
    enable_embedding: bool,
    runtime: Option<&EmbeddingRuntime>,
    parallel_mode: bool,
) -> Result<IngestOutcome> {
    let original_bytes = payload.len();
    // Estimate canonical stored size (zstd-compressed for UTF-8 text) so
    // capacity checks and reporting agree with the engine's storage model.
    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;

    let payload_text = std::str::from_utf8(&payload).ok();
    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);

    let audio_opts = AudioAnalyzeOptions {
        force: args.audio,
        segment_secs: args
            .audio_segment_seconds
            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
    };

    // Content analysis: MIME sniffing, metadata extraction, search text.
    let mut analysis = analyze_file(
        source_path,
        &payload,
        payload_text,
        inferred_title.as_deref(),
        audio_opts,
        args.video,
    );

    if args.video && !analysis.mime.starts_with("video/") {
        anyhow::bail!(
            "--video requires a video input; detected MIME type {}",
            analysis.mime
        );
    }

    // Cap search text at MAX_SEARCH_TEXT_LEN, truncating on a safe boundary.
    let mut search_text = analysis.search_text.clone();
    if let Some(ref mut text) = search_text {
        if text.len() > MAX_SEARCH_TEXT_LEN {
            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
            *text = truncated;
        }
    }
    analysis.search_text = search_text.clone();

    // URI precedence: explicit --uri > content-hash video URI > path-derived.
    let uri = if let Some(ref explicit) = args.uri {
        derive_uri(Some(explicit), None)
    } else if args.video {
        derive_video_uri(&payload, source_path, &analysis.mime)
    } else {
        derive_uri(None, source_path)
    };

    // Assemble PutOptions from CLI flags, analysis metadata, and search text.
    let mut options_builder = PutOptions::builder()
        .enable_embedding(enable_embedding)
        .auto_tag(!args.no_auto_tag)
        .extract_dates(!args.no_extract_dates);
    if let Some(ts) = args.timestamp {
        options_builder = options_builder.timestamp(ts);
    }
    if let Some(track) = &args.track {
        options_builder = options_builder.track(track.clone());
    }
    if args.video {
        options_builder = options_builder.kind("video");
        options_builder = options_builder.tag("kind", "video");
        options_builder = options_builder.push_tag("video");
    } else if let Some(kind) = &args.kind {
        options_builder = options_builder.kind(kind.clone());
    }
    options_builder = options_builder.uri(uri.clone());
    if let Some(ref title) = inferred_title {
        options_builder = options_builder.title(title.clone());
    }
    // Key/value tags are also pushed as plain tags (key and, when distinct,
    // value) so they remain discoverable by simple tag search.
    for (key, value) in &args.tags {
        options_builder = options_builder.tag(key.clone(), value.clone());
        if !key.is_empty() {
            options_builder = options_builder.push_tag(key.clone());
        }
        if !value.is_empty() && value != key {
            options_builder = options_builder.push_tag(value.clone());
        }
    }
    for label in &args.labels {
        options_builder = options_builder.label(label.clone());
    }
    if let Some(metadata) = analysis.metadata.clone() {
        if !metadata.is_empty() {
            options_builder = options_builder.metadata(metadata);
        }
    }
    // --metadata entries: try structured DocMetadata first, then fall back to
    // arbitrary JSON under an indexed custom key; bad JSON only warns.
    for (idx, entry) in args.metadata.iter().enumerate() {
        match serde_json::from_str::<DocMetadata>(entry) {
            Ok(meta) => {
                options_builder = options_builder.metadata(meta);
            }
            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
                Ok(value) => {
                    options_builder =
                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
                }
                Err(err) => {
                    warn!("failed to parse --metadata JSON: {err}");
                }
            },
        }
    }
    if let Some(text) = search_text.clone() {
        if !text.trim().is_empty() {
            options_builder = options_builder.search_text(text);
        }
    }
    let options = options_builder.build();

    // Only look up an existing frame when we might update it or must reject
    // duplicates; --allow-duplicate without --update-existing skips the check.
    let existing_frame = if args.update_existing || !args.allow_duplicate {
        match mem.frame_by_uri(&uri) {
            Ok(frame) => Some(frame),
            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
            Err(err) => return Err(err.into()),
        }
    } else {
        None
    };

    // Compute frame_id: for updates it's the existing frame's id,
    // for new frames it's the current frame count (which becomes the next id)
    let frame_id = if let Some(ref existing) = existing_frame {
        existing.id
    } else {
        mem.stats()?.frame_count
    };

    let mut embedded = false;
    // Video payloads never get semantic embeddings.
    let allow_embedding = enable_embedding && !args.video;
    if enable_embedding && !allow_embedding && args.video {
        warn!("semantic embeddings are not generated for video payloads");
    }
    let seq = if let Some(existing) = existing_frame {
        if args.update_existing {
            // Update path: replace the existing frame's payload (and
            // embedding, when text is available).
            let payload_for_update = payload.clone();
            if allow_embedding {
                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                // Embedding text preference: extracted search text, else raw UTF-8 payload.
                let embed_text = search_text
                    .clone()
                    .or_else(|| payload_text.map(|text| text.to_string()))
                    .unwrap_or_default();
                if embed_text.trim().is_empty() {
                    warn!("no textual content available; embedding skipped");
                    mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                } else {
                    // Truncate to avoid token limits
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!(
                            "Truncated text from {} to {} chars for embedding",
                            embed_text.len(),
                            truncated_text.len()
                        );
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.update_frame(
                        existing.id,
                        Some(payload_for_update),
                        options.clone(),
                        Some(embedding),
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                }
            } else {
                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else {
            // Duplicate URI without --update-existing: hard error.
            return Err(DuplicateUriError::new(uri.as_str()).into());
        }
    } else {
        // New-frame path: enforce capacity before any storage work.
        if let Some(guard) = capacity_guard.as_ref() {
            guard.ensure_capacity(stored_bytes as u64)?;
        }
        if parallel_mode {
            #[cfg(feature = "parallel_segments")]
            {
                // Parallel path: pre-compute embeddings here, defer the actual
                // commit to the batched parallel builder.
                let mut parent_embedding = None;
                let mut chunk_embeddings_vec = None;
                if allow_embedding {
                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
                    let embed_text = search_text
                        .clone()
                        .or_else(|| payload_text.map(|text| text.to_string()))
                        .unwrap_or_default();
                    if embed_text.trim().is_empty() {
                        warn!("no textual content available; embedding skipped");
                    } else {
                        // Check if document will be chunked - if so, embed each chunk
                        info!(
                            "parallel mode: checking for chunks on {} bytes payload",
                            payload.len()
                        );
                        if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                            // Document will be chunked - embed each chunk for full semantic coverage
                            info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());

                            // Apply temporal enrichment if enabled (before contextual)
                            #[cfg(feature = "temporal_enrich")]
                            let enriched_chunks = if args.temporal_enrich {
                                info!(
                                    "Temporal enrichment enabled, processing {} chunks",
                                    chunk_texts.len()
                                );
                                // Use today's date as fallback document date
                                let today = chrono::Local::now().date_naive();
                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                                // Results are tuples of (enriched_text, TemporalEnrichment)
                                let enriched: Vec<String> =
                                    results.iter().map(|(text, _)| text.clone()).collect();
                                let resolved_count = results
                                    .iter()
                                    .filter(|(_, e)| {
                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
                                    })
                                    .count();
                                info!(
                                    "Temporal enrichment: resolved {} chunks with temporal context",
                                    resolved_count
                                );
                                enriched
                            } else {
                                chunk_texts.clone()
                            };
                            #[cfg(not(feature = "temporal_enrich"))]
                            let enriched_chunks = {
                                if args.temporal_enrich {
                                    warn!(
                                        "Temporal enrichment requested but feature not compiled in"
                                    );
                                }
                                chunk_texts.clone()
                            };

                            // Apply contextual retrieval if enabled
                            let embed_chunks = if args.contextual {
                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                                let engine = if args.contextual_model.as_deref() == Some("local") {
                                    #[cfg(feature = "llama-cpp")]
                                    {
                                        let model_path = get_local_contextual_model_path()?;
                                        ContextualEngine::local(model_path)
                                    }
                                    #[cfg(not(feature = "llama-cpp"))]
                                    {
                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                                    }
                                } else if let Some(model) = &args.contextual_model {
                                    ContextualEngine::openai_with_model(model)?
                                } else {
                                    ContextualEngine::openai()?
                                };

                                // Contextual failure is non-fatal: fall back to raw chunks.
                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
                                {
                                    Ok(contexts) => {
                                        info!("Generated {} contextual prefixes", contexts.len());
                                        apply_contextual_prefixes(
                                            &embed_text,
                                            &enriched_chunks,
                                            &contexts,
                                        )
                                    }
                                    Err(e) => {
                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                        enriched_chunks.clone()
                                    }
                                }
                            } else {
                                enriched_chunks
                            };

                            let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                            for chunk_text in &embed_chunks {
                                // Truncate chunk to avoid OpenAI token limits (contextual prefixes can make chunks large)
                                let truncated_chunk = truncate_for_embedding(chunk_text);
                                if truncated_chunk.len() < chunk_text.len() {
                                    info!("parallel mode: Truncated chunk from {} to {} chars for embedding", chunk_text.len(), truncated_chunk.len());
                                }
                                let chunk_emb = runtime.embed(&truncated_chunk)?;
                                chunk_embeddings.push(chunk_emb);
                            }
                            let num_chunks = chunk_embeddings.len();
                            chunk_embeddings_vec = Some(chunk_embeddings);
                            // Also embed the parent for the parent frame (truncate to avoid token limits)
                            let parent_text = truncate_for_embedding(&embed_text);
                            if parent_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&parent_text)?);
                            embedded = true;
                            info!(
                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
                                num_chunks
                            );
                        } else {
                            // No chunking - just embed the whole document (truncate to avoid token limits)
                            info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
                            let truncated_text = truncate_for_embedding(&embed_text);
                            if truncated_text.len() < embed_text.len() {
                                info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
                            }
                            parent_embedding = Some(runtime.embed(&truncated_text)?);
                            embedded = true;
                        }
                    }
                }
                // Prefer handing the builder a path (lets it re-read lazily);
                // otherwise move the in-memory bytes.
                let payload_variant = if let Some(path) = source_path {
                    ParallelPayload::Path(path.to_path_buf())
                } else {
                    ParallelPayload::Bytes(payload)
                };
                let input = ParallelInput {
                    payload: payload_variant,
                    options: options.clone(),
                    embedding: parent_embedding,
                    chunk_embeddings: chunk_embeddings_vec,
                };
                // Early return: commit is deferred, so seq/frame_id are placeholders.
                return Ok(IngestOutcome {
                    report: PutReport {
                        seq: 0,
                        frame_id: 0, // Will be populated after parallel commit
                        uri,
                        title: inferred_title,
                        original_bytes,
                        stored_bytes,
                        compressed,
                        source: source_path.map(Path::to_path_buf),
                        mime: Some(analysis.mime),
                        metadata: analysis.metadata,
                        extraction: analysis.extraction,
                        #[cfg(feature = "logic_mesh")]
                        search_text: search_text.clone(),
                    },
                    embedded,
                    parallel_input: Some(input),
                });
            }
            #[cfg(not(feature = "parallel_segments"))]
            {
                // Feature not compiled in: degrade to a plain synchronous put.
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            }
        } else if allow_embedding {
            let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
            let embed_text = search_text
                .clone()
                .or_else(|| payload_text.map(|text| text.to_string()))
                .unwrap_or_default();
            if embed_text.trim().is_empty() {
                warn!("no textual content available; embedding skipped");
                mem.put_bytes_with_options(&payload, options)
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
            } else {
                // Check if document will be chunked - if so, embed each chunk
                if let Some(chunk_texts) = mem.preview_chunks(&payload) {
                    // Document will be chunked - embed each chunk for full semantic coverage
                    info!(
                        "Document will be chunked into {} chunks, generating embeddings for each",
                        chunk_texts.len()
                    );

                    // Apply temporal enrichment if enabled (before contextual)
                    #[cfg(feature = "temporal_enrich")]
                    let enriched_chunks = if args.temporal_enrich {
                        info!(
                            "Temporal enrichment enabled, processing {} chunks",
                            chunk_texts.len()
                        );
                        // Use today's date as fallback document date
                        let today = chrono::Local::now().date_naive();
                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
                        // Results are tuples of (enriched_text, TemporalEnrichment)
                        let enriched: Vec<String> =
                            results.iter().map(|(text, _)| text.clone()).collect();
                        let resolved_count = results
                            .iter()
                            .filter(|(_, e)| {
                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
                            })
                            .count();
                        info!(
                            "Temporal enrichment: resolved {} chunks with temporal context",
                            resolved_count
                        );
                        enriched
                    } else {
                        chunk_texts.clone()
                    };
                    #[cfg(not(feature = "temporal_enrich"))]
                    let enriched_chunks = {
                        if args.temporal_enrich {
                            warn!("Temporal enrichment requested but feature not compiled in");
                        }
                        chunk_texts.clone()
                    };

                    // Apply contextual retrieval if enabled
                    let embed_chunks = if args.contextual {
                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
                        let engine = if args.contextual_model.as_deref() == Some("local") {
                            #[cfg(feature = "llama-cpp")]
                            {
                                let model_path = get_local_contextual_model_path()?;
                                ContextualEngine::local(model_path)
                            }
                            #[cfg(not(feature = "llama-cpp"))]
                            {
                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
                            }
                        } else if let Some(model) = &args.contextual_model {
                            ContextualEngine::openai_with_model(model)?
                        } else {
                            ContextualEngine::openai()?
                        };

                        // Contextual failure is non-fatal: fall back to raw chunks.
                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
                            Ok(contexts) => {
                                info!("Generated {} contextual prefixes", contexts.len());
                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
                            }
                            Err(e) => {
                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
                                enriched_chunks.clone()
                            }
                        }
                    } else {
                        enriched_chunks
                    };

                    let mut chunk_embeddings = Vec::with_capacity(embed_chunks.len());
                    for chunk_text in &embed_chunks {
                        // Truncate chunk to avoid OpenAI token limits (contextual prefixes can make chunks large)
                        let truncated_chunk = truncate_for_embedding(chunk_text);
                        if truncated_chunk.len() < chunk_text.len() {
                            info!(
                                "Truncated chunk from {} to {} chars for embedding",
                                chunk_text.len(),
                                truncated_chunk.len()
                            );
                        }
                        let chunk_emb = runtime.embed(&truncated_chunk)?;
                        chunk_embeddings.push(chunk_emb);
                    }
                    // Truncate parent text to avoid token limits
                    let parent_text = truncate_for_embedding(&embed_text);
                    if parent_text.len() < embed_text.len() {
                        info!(
                            "Truncated parent text from {} to {} chars for embedding",
                            embed_text.len(),
                            parent_text.len()
                        );
                    }
                    let parent_embedding = runtime.embed(&parent_text)?;
                    embedded = true;
                    info!(
                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
                        chunk_embeddings.len()
                    );
                    mem.put_with_chunk_embeddings(
                        &payload,
                        Some(parent_embedding),
                        chunk_embeddings,
                        options,
                    )
                    .map_err(|err| {
                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                    })?
                } else {
                    // No chunking - just embed the whole document (truncate to avoid token limits)
                    info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
                    let truncated_text = truncate_for_embedding(&embed_text);
                    if truncated_text.len() < embed_text.len() {
                        info!(
                            "Truncated text from {} to {} chars for embedding",
                            embed_text.len(),
                            truncated_text.len()
                        );
                    }
                    let embedding = runtime.embed(&truncated_text)?;
                    embedded = true;
                    mem.put_with_embedding_and_options(&payload, embedding, options)
                        .map_err(|err| {
                            map_put_error(
                                err,
                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
                            )
                        })?
                }
            }
        } else {
            // Embedding disabled (or video): plain put.
            mem.put_bytes_with_options(&payload, options)
                .map_err(|err| {
                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
                })?
        }
    };

    Ok(IngestOutcome {
        report: PutReport {
            seq,
            frame_id,
            uri,
            title: inferred_title,
            original_bytes,
            stored_bytes,
            compressed,
            source: source_path.map(Path::to_path_buf),
            mime: Some(analysis.mime),
            metadata: analysis.metadata,
            extraction: analysis.extraction,
            #[cfg(feature = "logic_mesh")]
            search_text,
        },
        embedded,
        #[cfg(feature = "parallel_segments")]
        parallel_input: None,
    })
}
3675
3676fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
3677    if std::str::from_utf8(payload).is_ok() {
3678        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
3679        Ok((compressed.len(), true))
3680    } else {
3681        Ok((payload.len(), false))
3682    }
3683}
3684
3685fn print_report(report: &PutReport) {
3686    let name = report
3687        .source
3688        .as_ref()
3689        .map(|path| {
3690            let pretty = env::current_dir()
3691                .ok()
3692                .as_ref()
3693                .and_then(|cwd| diff_paths(path, cwd))
3694                .unwrap_or_else(|| path.to_path_buf());
3695            pretty.to_string_lossy().into_owned()
3696        })
3697        .unwrap_or_else(|| "stdin".to_string());
3698
3699    let ratio = if report.original_bytes > 0 {
3700        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
3701    } else {
3702        100.0
3703    };
3704
3705    println!("• {name} → {}", report.uri);
3706    println!("  seq: {}", report.seq);
3707    if report.compressed {
3708        println!(
3709            "  size: {} B → {} B ({:.1}% of original, compressed)",
3710            report.original_bytes, report.stored_bytes, ratio
3711        );
3712    } else {
3713        println!("  size: {} B (stored as-is)", report.original_bytes);
3714    }
3715    if let Some(mime) = &report.mime {
3716        println!("  mime: {mime}");
3717    }
3718    if let Some(title) = &report.title {
3719        println!("  title: {title}");
3720    }
3721    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
3722    println!(
3723        "  extraction: status={} reader={}",
3724        report.extraction.status.label(),
3725        extraction_reader
3726    );
3727    if let Some(ms) = report.extraction.duration_ms {
3728        println!("  extraction-duration: {} ms", ms);
3729    }
3730    if let Some(pages) = report.extraction.pages_processed {
3731        println!("  extraction-pages: {}", pages);
3732    }
3733    for warning in &report.extraction.warnings {
3734        println!("  warning: {warning}");
3735    }
3736    if let Some(metadata) = &report.metadata {
3737        if let Some(caption) = &metadata.caption {
3738            println!("  caption: {caption}");
3739        }
3740        if let Some(audio) = metadata.audio.as_ref() {
3741            if audio.duration_secs.is_some()
3742                || audio.sample_rate_hz.is_some()
3743                || audio.channels.is_some()
3744            {
3745                let duration = audio
3746                    .duration_secs
3747                    .map(|secs| format!("{secs:.1}s"))
3748                    .unwrap_or_else(|| "unknown".into());
3749                let rate = audio
3750                    .sample_rate_hz
3751                    .map(|hz| format!("{} Hz", hz))
3752                    .unwrap_or_else(|| "? Hz".into());
3753                let channels = audio
3754                    .channels
3755                    .map(|ch| ch.to_string())
3756                    .unwrap_or_else(|| "?".into());
3757                println!("  audio: duration {duration}, {channels} ch, {rate}");
3758            }
3759        }
3760    }
3761
3762    println!();
3763}
3764
3765fn put_report_to_json(report: &PutReport) -> serde_json::Value {
3766    let source = report
3767        .source
3768        .as_ref()
3769        .map(|path| path.to_string_lossy().into_owned());
3770    let extraction = json!({
3771        "status": report.extraction.status.label(),
3772        "reader": report.extraction.reader,
3773        "duration_ms": report.extraction.duration_ms,
3774        "pages_processed": report.extraction.pages_processed,
3775        "warnings": report.extraction.warnings,
3776    });
3777    json!({
3778        "seq": report.seq,
3779        "uri": report.uri,
3780        "title": report.title,
3781        "original_bytes": report.original_bytes,
3782        "original_bytes_human": format_bytes(report.original_bytes as u64),
3783        "stored_bytes": report.stored_bytes,
3784        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
3785        "compressed": report.compressed,
3786        "mime": report.mime,
3787        "source": source,
3788        "metadata": report.metadata,
3789        "extraction": extraction,
3790    })
3791}
3792
3793fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
3794    match err {
3795        MemvidError::CapacityExceeded {
3796            current,
3797            limit,
3798            required,
3799        } => anyhow!(CapacityExceededMessage {
3800            current,
3801            limit,
3802            required,
3803        }),
3804        other => other.into(),
3805    }
3806}
3807
3808fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
3809    for (idx, entry) in entries.iter().enumerate() {
3810        match serde_json::from_str::<DocMetadata>(entry) {
3811            Ok(meta) => {
3812                options.metadata = Some(meta);
3813            }
3814            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3815                Ok(value) => {
3816                    options
3817                        .extra_metadata
3818                        .insert(format!("custom_metadata_{idx}"), value.to_string());
3819                }
3820                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
3821            },
3822        }
3823    }
3824}
3825
3826fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
3827    let stats = mem.stats()?;
3828    let message = match stats.tier {
3829        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
3830        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
3831    };
3832    Ok(message)
3833}
3834
/// Normalize a raw URI/path into a safe document path.
///
/// Strips an optional `mv2://` scheme and surrounding whitespace, converts
/// backslashes to `/`, and resolves `.`/`..` segments so the result cannot
/// escape the memory's namespace. Returns the full sanitized path when
/// `keep_path` is true, otherwise only the final segment. Falls back to
/// `"document"` when no usable segment remains (empty input, `"."`, or a
/// path consumed entirely by `..`).
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    let mut segments: Vec<&str> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            // Empty segments (doubled/trailing slashes) and `.` are no-ops.
            "" | "." => {}
            // `..` pops at most one accepted segment; it can never climb
            // above the root of the sanitized path.
            ".." => {
                segments.pop();
            }
            other => segments.push(other),
        }
    }

    // Bug fix: the old fallback returned the last *raw* segment, which could
    // leak `.` or `..` back into the supposedly sanitized result (e.g. for
    // input `".."`). Always use the safe placeholder instead.
    if segments.is_empty() {
        return "document".to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        // `segments` is non-empty here, so `last()` always yields a value;
        // the fallback is purely defensive.
        segments.last().copied().unwrap_or("document").to_string()
    }
}
3868
3869fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
3870    if quiet {
3871        let pb = ProgressBar::hidden();
3872        pb.set_message(message.to_string());
3873        return pb;
3874    }
3875
3876    match total {
3877        Some(len) => {
3878            let pb = ProgressBar::new(len);
3879            pb.set_draw_target(ProgressDrawTarget::stderr());
3880            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
3881                .unwrap_or_else(|_| ProgressStyle::default_bar());
3882            pb.set_style(style);
3883            pb.set_message(message.to_string());
3884            pb
3885        }
3886        None => {
3887            let pb = ProgressBar::new_spinner();
3888            pb.set_draw_target(ProgressDrawTarget::stderr());
3889            pb.set_message(message.to_string());
3890            pb.enable_steady_tick(Duration::from_millis(120));
3891            pb
3892        }
3893    }
3894}
3895
3896fn create_spinner(message: &str) -> ProgressBar {
3897    let pb = ProgressBar::new_spinner();
3898    pb.set_draw_target(ProgressDrawTarget::stderr());
3899    let style = ProgressStyle::with_template("{spinner} {msg}")
3900        .unwrap_or_else(|_| ProgressStyle::default_spinner())
3901        .tick_strings(&["-", "\\", "|", "/"]);
3902    pb.set_style(style);
3903    pb.set_message(message.to_string());
3904    pb.enable_steady_tick(Duration::from_millis(120));
3905    pb
3906}
3907
3908// ============================================================================
3909// put-many command - batch ingestion with pre-computed embeddings
3910// ============================================================================
3911
/// Arguments for the `put-many` subcommand
// NOTE(review): the `///` field docs below double as clap help text, so new
// explanatory notes are added as `//` comments to keep CLI help unchanged.
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    /// Zstd compression level (1-9, default: 3)
    // The handler clamps this to 1..=9 before building.
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared lock flags, flattened into this command's CLI surface.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
3932
/// JSON input format for put-many requests
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Document title (required).
    pub title: String,
    /// Optional label; defaults to an empty string when absent.
    #[serde(default)]
    pub label: String,
    /// Full document text to ingest (required).
    pub text: String,
    /// Explicit destination URI; when absent the handler generates
    /// `mv2://batch/{index}`.
    #[serde(default)]
    pub uri: Option<String>,
    /// Free-form tags attached to the stored frame.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels attached to the stored frame.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Arbitrary metadata blob. NOTE(review): accepted by the parser but not
    /// forwarded during ingestion in `handle_put_many` — confirm intent.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI)
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Pre-computed embeddings for each chunk (matched by index)
    /// If the document gets chunked, these embeddings are assigned to each chunk frame.
    /// The number of chunk embeddings should match the number of chunks that will be created.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
3958
/// Batch input format (array of requests)
///
/// `#[serde(transparent)]` makes this deserialize directly from a bare JSON
/// array (e.g. `[{...}, {...}]`) rather than an object wrapping a
/// `requests` key.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    pub requests: Vec<PutManyRequest>,
}
3966
/// Handle the `put-many` subcommand: batch-ingest text documents, optionally
/// with pre-computed embeddings, through the parallel segment builder.
///
/// Batch requests are read as a JSON array from `--input`, or from stdin when
/// no input file is given.
///
/// # Errors
/// Returns an error when the memory cannot be opened or mutated, the batch
/// input cannot be read or parsed, or ingestion fails.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Both lexical and vector indexes must exist before parallel ingestion.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Read batch input from the file if given, otherwise consume all of stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        // Bug fix: the previous `read_line` call consumed only the FIRST line
        // of stdin, silently truncating multi-line (pretty-printed) JSON.
        // Read the entire stream instead.
        io::read_to_string(io::stdin()).context("Failed to read from stdin")?
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    // Convert each request into a ParallelInput; requests without an explicit
    // URI get a deterministic synthetic one based on their batch position.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let options = PutOptions {
                title: Some(req.title),
                uri: Some(uri),
                tags: req.tags,
                labels: req.labels,
                ..PutOptions::default()
            };

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Clamp to the range documented on `PutManyArgs::compression_level`.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Keep human output compact for large batches: list at most 10 ids.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last:  {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}