memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32    DocMetadata, DocumentFormat, EnrichmentEngine, MediaManifest, Memvid, MemvidError, PutOptions,
33    ReaderHint, ReaderRegistry, RulesEngine, Stats, Tier, TimelineQueryBuilder,
34    MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
/// Upper bound on search text length. Its consumer is outside this section —
/// presumably the text kept for lexical search is capped at this size (TODO confirm).
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum length of embedding text, chosen to avoid exceeding OpenAI's 8192 token limit.
/// Using a ~4 chars/token estimate, 20K chars ≈ 5K tokens (safe margin).
/// Note: `truncate_for_embedding` compares this against `str::len`, i.e. UTF-8 bytes,
/// so multi-byte text is cut at fewer characters than this number suggests.
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
50
51/// Parse a timestamp from either epoch seconds or human-readable date formats.
52/// Accepts:
53/// - Epoch seconds: "1673740800"
54/// - ISO format: "2023-01-15"
55/// - US format: "Jan 15, 2023" or "January 15, 2023"
56/// - Slash format: "01/15/2023" or "1/15/2023"
57fn parse_timestamp(s: &str) -> Result<i64, String> {
58    let s = s.trim();
59
60    // Try parsing as epoch timestamp first (pure digits, optionally negative)
61    if s.chars().all(|c| c.is_ascii_digit()) ||
62       (s.starts_with('-') && s[1..].chars().all(|c| c.is_ascii_digit())) {
63        return s.parse::<i64>().map_err(|e| format!("Invalid epoch timestamp: {e}"));
64    }
65
66    // Try various date formats
67    use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
68
69    // Parse date and return midnight UTC
70    fn date_to_epoch(date: NaiveDate) -> i64 {
71        let datetime = NaiveDateTime::new(date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
72        Utc.from_utc_datetime(&datetime).timestamp()
73    }
74
75    // Try ISO format: "2023-01-15"
76    if let Ok(date) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
77        return Ok(date_to_epoch(date));
78    }
79
80    // Try US formats: "Jan 15, 2023" or "January 15, 2023"
81    if let Ok(date) = NaiveDate::parse_from_str(s, "%b %d, %Y") {
82        return Ok(date_to_epoch(date));
83    }
84    if let Ok(date) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
85        return Ok(date_to_epoch(date));
86    }
87
88    // Try slash format: "01/15/2023" or "1/15/2023"
89    if let Ok(date) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
90        return Ok(date_to_epoch(date));
91    }
92
93    // Try European format: "15-01-2023" or "15/01/2023"
94    if let Ok(date) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
95        return Ok(date_to_epoch(date));
96    }
97    if let Ok(date) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
98        return Ok(date_to_epoch(date));
99    }
100
101    Err(format!(
102        "Invalid timestamp format: '{}'. Use epoch seconds (1673740800) or date format (Jan 15, 2023 / 2023-01-15 / 01/15/2023)",
103        s
104    ))
105}
// Palette size constant. The consumer is outside this section — presumably the number
// of dominant colors requested when building an image palette (TODO confirm).
const COLOR_PALETTE_SIZE: u8 = 5;
// Palette quality constant. Presumably the sampling quality passed alongside the
// palette size (TODO confirm against the call site).
const COLOR_PALETTE_QUALITY: u8 = 8;
// Byte count for magic sniffing. Presumably the number of leading payload bytes
// inspected when detecting content type (TODO confirm against the call site).
const MAGIC_SNIFF_BYTES: usize = 16;
/// Maximum number of images to extract from a PDF for CLIP visual search.
/// Set high to capture all images; processing stops when no more images found.
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
// Target pixel size for CLIP PDF rendering. NOTE(review): which dimension this bounds
// is decided at the call site, which is not visible here — confirm.
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;
115
116/// Truncate text for embedding to fit within OpenAI token limits.
117/// Returns a truncated string that fits within MAX_EMBEDDING_TEXT_LEN.
118fn truncate_for_embedding(text: &str) -> String {
119    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
120        text.to_string()
121    } else {
122        // Find a char boundary to avoid splitting UTF-8
123        let truncated = &text[..MAX_EMBEDDING_TEXT_LEN];
124        // Try to find the last complete character
125        let end = truncated
126            .char_indices()
127            .rev()
128            .next()
129            .map(|(i, c)| i + c.len_utf8())
130            .unwrap_or(MAX_EMBEDDING_TEXT_LEN);
131        text[..end].to_string()
132    }
133}
134
135fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
136    options.extra_metadata.insert(
137        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
138        runtime.provider_kind().to_string(),
139    );
140    options.extra_metadata.insert(
141        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
142        runtime.provider_model_id(),
143    );
144    options.extra_metadata.insert(
145        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
146        runtime.dimension().to_string(),
147    );
148}
149
150fn apply_embedding_identity_metadata_from_choice(
151    options: &mut PutOptions,
152    model: EmbeddingModelChoice,
153    dimension: usize,
154    model_id_override: Option<&str>,
155) {
156    let provider = match model {
157        EmbeddingModelChoice::OpenAILarge
158        | EmbeddingModelChoice::OpenAISmall
159        | EmbeddingModelChoice::OpenAIAda => "openai",
160        EmbeddingModelChoice::Nvidia => "nvidia",
161        _ => "fastembed",
162    };
163
164    let model_id = match model {
165        EmbeddingModelChoice::Nvidia => model_id_override
166            .map(str::trim)
167            .filter(|value| !value.is_empty())
168            .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
169            .filter(|value| !value.is_empty())
170            .map(|value| {
171                if value.contains('/') {
172                    value.to_string()
173                } else {
174                    format!("nvidia/{value}")
175                }
176            })
177            .unwrap_or_else(|| model.canonical_model_id().to_string()),
178        _ => model.canonical_model_id().to_string(),
179    };
180
181    options.extra_metadata.insert(
182        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
183        provider.to_string(),
184    );
185    options.extra_metadata.insert(
186        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
187        model_id,
188    );
189    options.extra_metadata.insert(
190        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
191        dimension.to_string(),
192    );
193}
194
/// Locate the local contextual-retrieval model (phi-3.5-mini) on disk.
///
/// The models directory is `MEMVID_MODELS_DIR` when that variable is set (a
/// leading `~/` is expanded using `HOME` if `HOME` is available), otherwise
/// `$HOME/.memvid/models`. The model is expected at
/// `llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf` beneath it.
///
/// # Errors
/// Fails when `MEMVID_MODELS_DIR` is unset and `HOME` is not set, or when the
/// model file does not exist at the resolved path.
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => {
            // Manual `~/` expansion; without HOME the value is used verbatim.
            let expanded = custom.strip_prefix("~/").and_then(|rest| {
                env::var("HOME")
                    .ok()
                    .map(|home| PathBuf::from(home).join(rest))
            });
            expanded.unwrap_or_else(|| PathBuf::from(&custom))
        }
        Err(_) => {
            let home =
                env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }

    Ok(model_path)
}
231
232use pathdiff::diff_paths;
233
234use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
235use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
236use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
237use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
238use crate::error::{CapacityExceededMessage, DuplicateUriError};
239use crate::utils::{
240    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
241    select_frame,
242};
243#[cfg(feature = "temporal_enrich")]
244use memvid_core::enrich_chunks as temporal_enrich_chunks;
245
/// Interpret a textual boolean flag, such as an environment variable value.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive.
/// `1`, `true`, `yes`, `on` yield `Some(true)`; `0`, `false`, `no`, `off`
/// yield `Some(false)`; anything else yields `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let token = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
254
/// Read the `MEMVID_PARALLEL_SEGMENTS` environment override.
///
/// Returns `None` when the variable is unset, unreadable, or holds a value
/// that `parse_bool_flag` does not recognise; otherwise the parsed boolean.
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    let raw = std::env::var("MEMVID_PARALLEL_SEGMENTS").ok()?;
    parse_bool_flag(&raw)
}
262
/// Report whether both CLIP model files are present on disk.
///
/// The models directory is `MEMVID_MODELS_DIR` if set, else
/// `$HOME/.memvid/models`, else the relative path `.memvid/models`. The model
/// name comes from `MEMVID_CLIP_MODEL` and defaults to `mobileclip-s2`.
/// Returns `true` only when `<name>_vision.onnx` and `<name>_text.onnx` both
/// exist in that directory.
#[cfg(feature = "clip")]
#[allow(dead_code)]
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name = match env::var("MEMVID_CLIP_MODEL") {
        Ok(name) => name,
        Err(_) => "mobileclip-s2".to_string(),
    };

    // Vision is checked first, then text; both must exist.
    ["vision", "text"]
        .iter()
        .all(|part| models_dir.join(format!("{}_{}.onnx", model_name, part)).exists())
}
283
/// Minimum confidence threshold for NER entities (0.0-1.0)
/// Note: Lower threshold captures more entities but also more noise
/// `is_valid_entity` rejects any entity whose confidence is below this value.
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

/// Minimum length for entity names.
/// `is_valid_entity` compares this against `str::len` of the trimmed text, so the
/// unit is UTF-8 bytes, which equals characters only for ASCII names.
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

/// Maximum gap between adjacent entities to merge.
/// `merge_adjacent_entities` computes the gap from byte offsets, so the unit is
/// bytes, which equals characters only for ASCII gap content.
/// Allows merging "S" + "&" + "P Global" when they're close together
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;
297
/// Merge adjacent NER entities that appear to be tokenization artifacts.
///
/// The BERT tokenizer splits names at special characters like `&`, producing
/// separate entities like "S", "&", "P Global" instead of "S&P Global".
/// This function merges neighbouring entities of the same type when they are
/// contiguous, overlapping, or separated by a short, harmless gap.
///
/// * `entities` - extracted entities in any order; they are sorted by
///   `byte_start` before merging.
/// * `original_text` - the text the byte offsets refer to. It is used to
///   inspect gap content and to rebuild the merged entity text.
///
/// Two entities merge when they share an `entity_type` and either touch or
/// overlap (gap of 0), or are at most `MAX_MERGE_GAP` bytes apart with a gap
/// made up solely of spaces, tabs, `&`, `-`, `'` or `.`. A gap containing a
/// newline therefore never merges. The merged confidence is the mean of the
/// running confidence and the newly merged entity's confidence.
///
/// Offsets that fall outside `original_text`, or not on a UTF-8 character
/// boundary, never panic: an unreadable gap is treated as not mergeable, and
/// an unreadable merged span falls back to concatenating the entity texts.
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Sort by byte position so only the most recently kept entity needs checking.
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::with_capacity(sorted.len());

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            // saturating_sub makes overlapping entities report a gap of 0.
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                // `get` returns None for out-of-range or non-boundary offsets
                // instead of panicking. Newlines are not in the allowed set,
                // so entities on separate lines stay separate.
                original_text
                    .get(last.byte_end..entity.byte_start)
                    .map_or(false, |gap_text| {
                        gap_text
                            .chars()
                            .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.'))
                    })
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // An entity nested inside the previous one must not shrink the
                // merged span, so keep whichever end reaches further.
                let merged_end = last.byte_end.max(entity.byte_end);
                let merged_text = original_text
                    .get(last.byte_start..merged_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = merged_end;
                // Average the confidence
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
369
/// Decide whether an extracted NER entity is worth keeping.
///
/// Returns `true` if the entity should be kept. An entity is rejected when any
/// of the following holds (lengths are byte lengths of the trimmed text):
/// - `confidence` is below `NER_MIN_CONFIDENCE`
/// - the trimmed text is shorter than `NER_MIN_ENTITY_LEN`
/// - it consists only of whitespace and ASCII punctuation
/// - it contains a line break
/// - it starts or ends with `##` (word-piece fragment)
/// - it ends with a period
/// - it is two bytes long and not one of eu/un/uk/us/ai/it (case-insensitive)
/// - it is one of the noise words the/api/url/pdf/inc/ltd/llc (case-insensitive)
/// - it has several words and the first is an all-uppercase token of at most
///   two bytes other than US/UK/EU/UN (e.g. "P Global" left over from "S&P Global")
/// - its first character is lowercase
#[cfg(feature = "logic_mesh")]
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let candidate = text.trim();
    let byte_len = candidate.len();
    if byte_len < NER_MIN_ENTITY_LEN {
        return false;
    }

    // Shape checks on the raw text: punctuation-only, multi-line, word-piece
    // fragments, and sentence fragments are all noise.
    let only_noise = candidate
        .chars()
        .all(|c| c.is_whitespace() || c.is_ascii_punctuation());
    let spans_lines = candidate.contains('\n') || candidate.contains('\r');
    let wordpiece_fragment = candidate.starts_with("##") || candidate.ends_with("##");
    let trailing_period = candidate.ends_with('.');
    if only_noise || spans_lines || wordpiece_fragment || trailing_period {
        return false;
    }

    let lowered = candidate.to_lowercase();

    match byte_len {
        // Single-letter false positives.
        1 => return false,
        // Only a short allowlist of two-letter entities survives.
        2 => return matches!(lowered.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Common noise words
    if matches!(
        lowered.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    // Partial tokenization artifacts: a multi-word entity whose leading word is
    // a short uppercase fragment, unless it is a known prefix such as "US Bank".
    let mut words = candidate.split_whitespace();
    if let (Some(lead), Some(_)) = (words.next(), words.next()) {
        let looks_like_fragment = lead.len() <= 2 && lead.chars().all(|c| c.is_uppercase());
        if looks_like_fragment
            && !matches!(lead.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    // Entities starting with a lowercase letter are likely partial words.
    candidate
        .chars()
        .next()
        .map_or(true, |first| !first.is_lowercase())
}
451
452/// Parse key=value pairs for tags
453pub fn parse_key_val(s: &str) -> Result<(String, String)> {
454    let (key, value) = s
455        .split_once('=')
456        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
457    Ok((key.to_string(), value.to_string()))
458}
459
/// Lock-related CLI arguments (shared across commands)
// Flattened into the put, api-fetch, update and delete argument structs below via
// `#[command(flatten)]`. The `///` comments on the fields double as clap help text,
// so rewording them changes `--help` output.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    /// Maximum time to wait for an active writer before failing (ms)
    // Defaults to 250 when --lock-timeout is not given.
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
470
/// Arguments for the `put` subcommand
// The `///` comments on the fields double as clap help text, so rewording them
// changes `--help` output; reviewer notes in this struct therefore use `//`.
// Fields gated by `#[cfg(feature = ...)]` only exist when that feature is compiled in.
#[derive(Args)]
pub struct PutArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional timestamp to store on the frame.
    /// Accepts epoch seconds (1673740800) or human-readable dates:
    /// "Jan 15, 2023", "2023-01-15", "01/15/2023"
    // Parsed by `parse_timestamp`, which stores dates as midnight UTC epoch seconds.
    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    /// Store the payload as a binary video without transcoding
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    /// Tags to attach in key=value form
    // Repeatable; each occurrence is split at the first `=` by `parse_key_val`.
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    /// Additional metadata payloads expressed as JSON
    // Kept as raw strings here; JSON parsing happens outside this struct.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    /// Disable automatic triplet extraction (SPO facts as MemoryCards)
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Transcribe audio using Whisper and use the transcript for text embedding
    /// Requires the 'whisper' feature to be enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    // Also reachable as --embeddings; rejected together with --no-embedding.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    /// Embedding model identity for the provided `--embedding-vec` vector.
    ///
    /// This is written into per-frame `extra_metadata` (`memvid.embedding.*`) so that
    /// semantic queries can auto-select the correct runtime across CLI/SDKs.
    ///
    /// Options: bge-small, bge-base, nomic, gte-large, openai, openai-small, openai-ada, nvidia
    /// (also accepts canonical IDs like `text-embedding-3-small` and `BAAI/bge-small-en-v1.5`).
    // Only accepted together with --embedding-vec (`requires`).
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    // NOTE(review): no `conflicts_with` ties --tables and --no-tables together, so
    // precedence is decided by the handler, which is outside this section — confirm.
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    /// Time budget for text extraction per document (milliseconds)
    /// When set, extraction stops early and queues the frame for background enrichment
    /// Use `memvid process-queue` to complete extraction later
    #[arg(long = "extraction-budget-ms", value_name = "MS")]
    pub extraction_budget_ms: Option<u64>,
    /// Enable CLIP visual embeddings for images and PDF pages
    /// Allows text-to-image search using natural language queries
    /// Disabled by default; use --clip to enable
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    /// Disable CLIP visual embeddings (no-op, CLIP is disabled by default)
    // `hide = true` keeps this flag out of the generated help.
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue, hide = true)]
    pub no_clip: bool,

    /// Replace any existing frame with the same URI instead of inserting a duplicate
    // Rejected together with --allow-duplicate.
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    // The flag itself is not feature-gated, so it parses even without the feature.
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    /// Bind to a dashboard memory ID (UUID or 24-char ObjectId) for capacity and tracking
    /// If the file is not already bound, this will sync the capacity ticket from the dashboard
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    /// Build Logic-Mesh entity graph during ingestion (opt-in)
    /// Extracts entities (people, organizations, locations, etc.) and their relationships
    /// Enables graph traversal with `memvid follow`
    /// Requires NER model: `memvid models install --ner distilbert-ner`
    // The flag itself is not feature-gated, so it parses even without `logic_mesh`.
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    // `wants_parallel` checks this first, so it wins over --no-parallel-segments and
    // over the MEMVID_PARALLEL_SEGMENTS environment override.
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    /// Store raw binary content along with extracted text.
    /// By default, only extracted text + BLAKE3 hash is stored (--no-raw behavior).
    /// Use --raw when you need to retrieve the original file later.
    #[arg(long = "raw", action = ArgAction::SetTrue)]
    pub raw: bool,

    /// Don't store raw binary content (DEFAULT behavior).
    /// Only extracted text + BLAKE3 hash is stored.
    /// This flag is kept for backwards compatibility but is now the default.
    // `hide = true` keeps this flag out of the generated help.
    #[arg(long = "no-raw", action = ArgAction::SetTrue, hide = true)]
    pub no_raw: bool,

    /// Skip ingestion if identical content (by BLAKE3 hash) already exists in the memory.
    /// Returns the existing frame's ID instead of creating a duplicate.
    #[arg(long, action = ArgAction::SetTrue)]
    pub dedup: bool,

    /// Run rules-based memory extraction after ingestion (default: enabled)
    /// Extracts facts, preferences, events, relationships from ingested content
    // NOTE(review): with `SetTrue` plus `default_value_t = true` this field reads true
    // whether or not --enrich is passed; presumably handlers consult `no_enrich` to
    // turn enrichment off — confirm at the call site.
    #[arg(long, action = ArgAction::SetTrue, default_value_t = true)]
    pub enrich: bool,

    /// Disable automatic rules-based enrichment
    #[arg(long = "no-enrich", action = ArgAction::SetTrue)]
    pub no_enrich: bool,

    // Shared --lock-timeout / --force options.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
670
671#[cfg(feature = "parallel_segments")]
672impl PutArgs {
673    pub fn wants_parallel(&self) -> bool {
674        if self.parallel_segments {
675            return true;
676        }
677        if self.no_parallel_segments {
678            return false;
679        }
680        if let Some(env_flag) = parallel_env_override() {
681            return env_flag;
682        }
683        false
684    }
685
686    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
687        let mut opts = memvid_core::BuildOpts::default();
688        if let Some(tokens) = self.parallel_segment_tokens {
689            opts.segment_tokens = tokens;
690        }
691        if let Some(pages) = self.parallel_segment_pages {
692            opts.segment_pages = pages;
693        }
694        if let Some(threads) = self.parallel_threads {
695            opts.threads = threads;
696        }
697        if let Some(depth) = self.parallel_queue_depth {
698            opts.queue_depth = depth;
699        }
700        opts.sanitize();
701        opts
702    }
703}
704
705#[cfg(not(feature = "parallel_segments"))]
706impl PutArgs {
707    pub fn wants_parallel(&self) -> bool {
708        false
709    }
710}
711
/// Arguments for the `api-fetch` subcommand
// `handle_api_fetch` copies every field into an `ApiFetchCommand` one-to-one.
// The `///` comments on the fields double as clap help text.
#[derive(Args)]
pub struct ApiFetchArgs {
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Shared --lock-timeout / --force options.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
737
/// Arguments for the `update` subcommand
// Most fields carry no `///` comment, so clap shows them without help text. The
// notes below are `//` comments on purpose: they do not alter `--help` output.
// The update handler is outside this section; field semantics beyond what the
// attributes state are presumed from the names.
#[derive(Args)]
pub struct UpdateArgs {
    // Positional path to the memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selectors: --frame-id and --uri are mutually exclusive.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Presumably a replacement payload read from this path — confirm in the handler.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Distinct from `uri` above: presumably the new URI to store — confirm in the handler.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Timestamp for the frame. Accepts epoch seconds or human-readable dates.
    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable; each occurrence is split at the first `=` by `parse_key_val`.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // NOTE(review): the alias "embeddings" repeats the long name already derived from
    // the field, so it adds nothing. `PutArgs` exposes --embedding with alias
    // --embeddings; presumably "embedding" was the intended alias here — confirm.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    #[arg(long)]
    pub json: bool,

    // Shared --lock-timeout / --force options.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
774
/// Arguments for the `delete` subcommand
// Field notes below deliberately use `//` instead of `///`: clap's derive turns `///`
// comments on fields into `--help` text, which would change the CLI output.
#[derive(Args)]
pub struct DeleteArgs {
    // Positional path argument; `handle_delete` opens it with `Memvid::open`.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // `--frame-id` and `--uri` are mutually exclusive (each declares `conflicts_with` the other);
    // `handle_delete` passes both to `select_frame` to resolve the frame to delete.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // When set, `handle_delete` skips the interactive "Confirm? [y/N]" prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // When set, `handle_delete` prints a JSON object instead of the human-readable line.
    #[arg(long)]
    pub json: bool,

    // Shared lock options, flattened into this command's flags; applied via `apply_lock_cli`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
792
793/// Handler for `memvid api-fetch`
794pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
795    let command = ApiFetchCommand {
796        file: args.file,
797        config_path: args.config,
798        dry_run: args.dry_run,
799        mode_override: args.mode,
800        uri_override: args.uri,
801        output_json: args.json,
802        lock_timeout_ms: args.lock.lock_timeout,
803        force_lock: args.lock.force,
804    };
805    run_api_fetch(config, command)
806}
807
808/// Handler for `memvid delete`
809pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
810    let mut mem = Memvid::open(&args.file)?;
811    ensure_cli_mutation_allowed(&mem)?;
812    apply_lock_cli(&mut mem, &args.lock);
813    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
814
815    if !args.yes {
816        if let Some(uri) = &frame.uri {
817            println!("About to delete frame {} ({uri})", frame.id);
818        } else {
819            println!("About to delete frame {}", frame.id);
820        }
821        print!("Confirm? [y/N]: ");
822        io::stdout().flush()?;
823        let mut input = String::new();
824        io::stdin().read_line(&mut input)?;
825        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
826        if !confirmed {
827            println!("Aborted");
828            return Ok(());
829        }
830    }
831
832    let seq = mem.delete_frame(frame.id)?;
833    mem.commit()?;
834    let deleted = mem.frame_by_id(frame.id)?;
835
836    if args.json {
837        let json = json!({
838            "frame_id": frame.id,
839            "sequence": seq,
840            "status": frame_status_str(deleted.status),
841        });
842        println!("{}", serde_json::to_string_pretty(&json)?);
843    } else {
844        println!("Deleted frame {} (seq {})", frame.id, seq);
845    }
846
847    Ok(())
848}
849pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
850    // Check if plan allows write operations (blocks expired subscriptions)
851    crate::utils::require_active_plan(config, "put")?;
852
853    let mut mem = Memvid::open(&args.file)?;
854    ensure_cli_mutation_allowed(&mem)?;
855    apply_lock_cli(&mut mem, &args.lock);
856
857    // If memory-id is provided and file isn't already bound, bind it now
858    if let Some(memory_id) = &args.memory_id {
859        let needs_binding = match mem.get_memory_binding() {
860            Some(binding) => binding.memory_id != **memory_id,
861            None => true,
862        };
863        if needs_binding {
864            match crate::commands::creation::bind_to_dashboard_memory(config, &mut mem, &args.file, memory_id) {
865                Ok((bound_id, capacity)) => {
866                    eprintln!("✓ Bound to dashboard memory: {} (capacity: {})", bound_id, crate::utils::format_bytes(capacity));
867                }
868                Err(e) => {
869                    eprintln!("⚠️  Failed to bind to dashboard memory: {}", e);
870                    eprintln!("   Continuing with existing capacity. Bind manually with:");
871                    eprintln!("   memvid tickets sync {} --memory-id {}", args.file.display(), memory_id);
872                }
873            }
874        }
875    }
876
877    // Load active replay session if one exists
878    #[cfg(feature = "replay")]
879    let _ = mem.load_active_session();
880    let mut stats = mem.stats()?;
881    if stats.frame_count == 0 && !stats.has_lex_index {
882        mem.enable_lex()?;
883        stats = mem.stats()?;
884    }
885
886    // Check if current memory size exceeds free tier limit
887    // If so, require API key to continue
888    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
889
890    // Set vector compression mode if requested
891    if args.vector_compression {
892        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
893    }
894
895    let mut capacity_guard = CapacityGuard::from_stats(&stats);
896    let use_parallel = args.wants_parallel();
897    #[cfg(feature = "parallel_segments")]
898    let mut parallel_opts = if use_parallel {
899        Some(args.sanitized_parallel_opts())
900    } else {
901        None
902    };
903    #[cfg(feature = "parallel_segments")]
904    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
905    #[cfg(feature = "parallel_segments")]
906    let mut pending_parallel_indices: Vec<usize> = Vec::new();
907    let input_set = resolve_inputs(args.input.as_deref())?;
908    if args.embedding && args.embedding_vec.is_some() {
909        anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
910    }
911    if args.embedding_vec.is_some() {
912        match &input_set {
913            InputSet::Files(paths) if paths.len() != 1 => {
914                anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
915            }
916            _ => {}
917        }
918    }
919
920    if args.video {
921        if let Some(kind) = &args.kind {
922            if !kind.eq_ignore_ascii_case("video") {
923                anyhow::bail!("--video conflicts with explicit --kind={kind}");
924            }
925        }
926        if matches!(input_set, InputSet::Stdin) {
927            anyhow::bail!("--video requires --input <file>");
928        }
929    }
930
931    #[allow(dead_code)]
932    enum EmbeddingMode {
933        None,
934        Auto,
935        Explicit,
936    }
937
938    // PHASE 1: Make embeddings opt-in, not opt-out
939    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
940    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
941    let mv2_dimension = mem.effective_vec_index_dimension()?;
942    let inferred_embedding_model = if args.embedding {
943        match mem.embedding_identity_summary(10_000) {
944            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
945            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
946                let models: Vec<_> = identities
947                    .iter()
948                    .filter_map(|entry| entry.identity.model.as_deref())
949                    .collect();
950                anyhow::bail!(
951                    "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
952                    Detected models: {:?}\n\n\
953                    Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
954                    models
955                );
956            }
957            memvid_core::EmbeddingIdentitySummary::Unknown => None,
958        }
959    } else {
960        None
961    };
962    let (embedding_mode, embedding_runtime) = if args.embedding {
963        (
964            EmbeddingMode::Explicit,
965            Some(load_embedding_runtime_for_mv2(
966                config,
967                inferred_embedding_model.as_deref(),
968                mv2_dimension,
969            )?),
970        )
971    } else if args.embedding_vec.is_some() {
972        (EmbeddingMode::Explicit, None)
973    } else {
974        (EmbeddingMode::None, None)
975    };
976    let embedding_enabled = args.embedding || args.embedding_vec.is_some();
977    let runtime_ref = embedding_runtime.as_ref();
978
979    // Enable CLIP index only if --clip flag is set explicitly
980    // CLIP is disabled by default to avoid unexpected overhead
981    #[cfg(feature = "clip")]
982    let clip_enabled = args.clip;
983    #[cfg(feature = "clip")]
984    let clip_model = if clip_enabled {
985        mem.enable_clip()?;
986        if !args.json {
987            eprintln!("ℹ️  CLIP visual embeddings enabled");
988            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
989            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
990            eprintln!();
991        }
992        // Initialize CLIP model for generating image embeddings
993        match memvid_core::ClipModel::default_model() {
994            Ok(model) => Some(model),
995            Err(e) => {
996                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
997                None
998            }
999        }
1000    } else {
1001        None
1002    };
1003
1004    // Warn user about vector storage overhead (PHASE 2)
1005    if embedding_enabled && !args.json {
1006        let capacity = stats.capacity_bytes;
1007        if args.vector_compression {
1008            // PHASE 3: PQ compression reduces storage 16x
1009            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
1010            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
1011            eprintln!("   Storage: ~20 KB per document (16x compressed)");
1012            eprintln!("   Accuracy: ~95% recall@10 maintained");
1013            eprintln!(
1014                "   Capacity: ~{} documents max for {:.1} GB",
1015                max_docs,
1016                capacity as f64 / 1e9
1017            );
1018            eprintln!();
1019        } else {
1020            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
1021            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
1022            eprintln!("   Storage: ~270 KB per document");
1023            eprintln!(
1024                "   Capacity: ~{} documents max for {:.1} GB",
1025                max_docs,
1026                capacity as f64 / 1e9
1027            );
1028            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
1029            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
1030            eprintln!();
1031        }
1032    }
1033
1034    let embed_progress = if embedding_enabled {
1035        let total = match &input_set {
1036            InputSet::Files(paths) => Some(paths.len() as u64),
1037            InputSet::Stdin => None,
1038        };
1039        Some(create_task_progress_bar(total, "embed", false))
1040    } else {
1041        None
1042    };
1043    let mut embedded_docs = 0usize;
1044
1045    if let InputSet::Files(paths) = &input_set {
1046        if paths.len() > 1 {
1047            if args.uri.is_some() || args.title.is_some() {
1048                anyhow::bail!(
1049                    "--uri/--title apply to a single file; omit them when using a directory or glob"
1050                );
1051            }
1052        }
1053    }
1054
1055    let mut reports = Vec::new();
1056    let mut processed = 0usize;
1057    let mut skipped = 0usize;
1058    let mut bytes_added = 0u64;
1059    let mut bytes_input = 0u64;
1060    let mut no_raw_count = 0usize;
1061    let mut summary_warnings: Vec<String> = Vec::new();
1062    #[cfg(feature = "clip")]
1063    let mut clip_embeddings_added = false;
1064
1065    let mut capacity_reached = false;
1066    match input_set {
1067        InputSet::Stdin => {
1068            processed += 1;
1069            match read_payload(None) {
1070                Ok(payload) => {
1071                    let mut analyze_spinner = if args.json {
1072                        None
1073                    } else {
1074                        Some(create_spinner("Analyzing stdin payload..."))
1075                    };
1076                    match ingest_payload(
1077                        &mut mem,
1078                        &args,
1079                        config,
1080                        payload,
1081                        None,
1082                        capacity_guard.as_mut(),
1083                        embedding_enabled,
1084                        runtime_ref,
1085                        use_parallel,
1086                    ) {
1087                        Ok(outcome) => {
1088                            if let Some(pb) = analyze_spinner.take() {
1089                                pb.finish_and_clear();
1090                            }
1091                            bytes_added += outcome.report.stored_bytes as u64;
1092                            bytes_input += outcome.report.original_bytes as u64;
1093                            if outcome.report.no_raw {
1094                                no_raw_count += 1;
1095                            }
1096                            if args.json {
1097                                summary_warnings
1098                                    .extend(outcome.report.extraction.warnings.iter().cloned());
1099                            }
1100                            reports.push(outcome.report);
1101                            let report_index = reports.len() - 1;
1102                            if !args.json && !use_parallel {
1103                                if let Some(report) = reports.get(report_index) {
1104                                    print_report(report);
1105                                }
1106                            }
1107                            if args.transcript {
1108                                let notice = transcript_notice_message(&mut mem)?;
1109                                if args.json {
1110                                    summary_warnings.push(notice);
1111                                } else {
1112                                    println!("{notice}");
1113                                }
1114                            }
1115                            if outcome.embedded {
1116                                if let Some(pb) = embed_progress.as_ref() {
1117                                    pb.inc(1);
1118                                }
1119                                embedded_docs += 1;
1120                            }
1121                            if use_parallel {
1122                                #[cfg(feature = "parallel_segments")]
1123                                if let Some(input) = outcome.parallel_input {
1124                                    pending_parallel_indices.push(report_index);
1125                                    pending_parallel_inputs.push(input);
1126                                }
1127                            } else if let Some(guard) = capacity_guard.as_mut() {
1128                                mem.commit()?;
1129                                let stats = mem.stats()?;
1130                                guard.update_after_commit(&stats);
1131                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1132                            }
1133                        }
1134                        Err(err) => {
1135                            if let Some(pb) = analyze_spinner.take() {
1136                                pb.finish_and_clear();
1137                            }
1138                            if err.downcast_ref::<DuplicateUriError>().is_some() {
1139                                return Err(err);
1140                            }
1141                            warn!("skipped stdin payload: {err}");
1142                            skipped += 1;
1143                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1144                                capacity_reached = true;
1145                            }
1146                        }
1147                    }
1148                }
1149                Err(err) => {
1150                    warn!("failed to read stdin: {err}");
1151                    skipped += 1;
1152                }
1153            }
1154        }
1155        InputSet::Files(ref paths) => {
1156            let mut transcript_notice_printed = false;
1157            for path in paths {
1158                processed += 1;
1159                match read_payload(Some(&path)) {
1160                    Ok(payload) => {
1161                        let label = path.display().to_string();
1162                        let mut analyze_spinner = if args.json {
1163                            None
1164                        } else {
1165                            Some(create_spinner(&format!("Analyzing {label}...")))
1166                        };
1167                        match ingest_payload(
1168                            &mut mem,
1169                            &args,
1170                            config,
1171                            payload,
1172                            Some(&path),
1173                            capacity_guard.as_mut(),
1174                            embedding_enabled,
1175                            runtime_ref,
1176                            use_parallel,
1177                        ) {
1178                            Ok(outcome) => {
1179                                if let Some(pb) = analyze_spinner.take() {
1180                                    pb.finish_and_clear();
1181                                }
1182                                bytes_added += outcome.report.stored_bytes as u64;
1183                                bytes_input += outcome.report.original_bytes as u64;
1184                                if outcome.report.no_raw {
1185                                    no_raw_count += 1;
1186                                }
1187
1188                                // Generate CLIP embedding for image files
1189                                #[cfg(feature = "clip")]
1190                                if let Some(ref model) = clip_model {
1191                                    let mime = outcome.report.mime.as_deref();
1192                                    let clip_frame_id = outcome.report.frame_id;
1193
1194                                    if is_image_file(&path) {
1195                                        match model.encode_image_file(&path) {
1196                                            Ok(embedding) => {
1197                                                if let Err(e) = mem.add_clip_embedding_with_page(
1198                                                    clip_frame_id,
1199                                                    None,
1200                                                    embedding,
1201                                                ) {
1202                                                    warn!(
1203                                                        "Failed to add CLIP embedding for {}: {e}",
1204                                                        path.display()
1205                                                    );
1206                                                } else {
1207                                                    info!(
1208                                                        "Added CLIP embedding for frame {}",
1209                                                        clip_frame_id
1210                                                    );
1211                                                    clip_embeddings_added = true;
1212                                                }
1213                                            }
1214                                            Err(e) => {
1215                                                warn!(
1216                                                    "Failed to encode CLIP embedding for {}: {e}",
1217                                                    path.display()
1218                                                );
1219                                            }
1220                                        }
1221                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
1222                                        // Commit before adding child frames to ensure WAL has space
1223                                        if let Err(e) = mem.commit() {
1224                                            warn!("Failed to commit before CLIP extraction: {e}");
1225                                        }
1226
1227                                        match render_pdf_pages_for_clip(
1228                                            &path,
1229                                            CLIP_PDF_MAX_IMAGES,
1230                                            CLIP_PDF_TARGET_PX,
1231                                        ) {
1232                                            Ok(rendered_pages) => {
1233                                                use rayon::prelude::*;
1234
1235                                                // Pre-encode all images to PNG in parallel for better performance
1236                                                // Filter out junk images (solid colors, black, too small)
1237                                                let path_for_closure = path.clone();
1238
1239                                                // Temporarily suppress panic output during image encoding
1240                                                // (some PDFs have malformed images that cause panics in the image crate)
1241                                                let prev_hook = std::panic::take_hook();
1242                                                std::panic::set_hook(Box::new(|_| {
1243                                                    // Silently ignore panics - we handle them with catch_unwind
1244                                                }));
1245
1246                                                let encoded_images: Vec<_> = rendered_pages
1247                                                    .into_par_iter()
1248                                                    .filter_map(|(page_num, image)| {
1249                                                        // Check if image is worth embedding (not junk)
1250                                                        let image_info = get_image_info(&image);
1251                                                        if !image_info.should_embed() {
1252                                                            info!(
1253                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1254                                                                path_for_closure.display(),
1255                                                                page_num,
1256                                                                image_info.color_variance,
1257                                                                image_info.width,
1258                                                                image_info.height
1259                                                            );
1260                                                            return None;
1261                                                        }
1262
1263                                                        // Convert to RGB8 first to ensure consistent buffer size
1264                                                        // (some PDFs have images with alpha or other formats that cause panics)
1265                                                        // Use catch_unwind to handle any panics from corrupted image data
1266                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1267                                                            let rgb_image = image.to_rgb8();
1268                                                            let mut png_bytes = Vec::new();
1269                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1270                                                                &mut Cursor::new(&mut png_bytes),
1271                                                                image::ImageFormat::Png,
1272                                                            ).map(|()| png_bytes)
1273                                                        }));
1274
1275                                                        match encode_result {
1276                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1277                                                            Ok(Err(e)) => {
1278                                                                info!(
1279                                                                    "Skipping image for {} page {}: {e}",
1280                                                                    path_for_closure.display(),
1281                                                                    page_num
1282                                                                );
1283                                                                None
1284                                                            }
1285                                                            Err(_) => {
1286                                                                // Panic occurred - corrupted image data, silently skip
1287                                                                info!(
1288                                                                    "Skipping malformed image for {} page {}",
1289                                                                    path_for_closure.display(),
1290                                                                    page_num
1291                                                                );
1292                                                                None
1293                                                            }
1294                                                        }
1295                                                    })
1296                                                    .collect();
1297
1298                                                // Restore the original panic hook
1299                                                std::panic::set_hook(prev_hook);
1300
1301                                                // Process encoded images sequentially (storage + CLIP encoding)
1302                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1303                                                let mut child_frame_offset = 1u64;
1304                                                let parent_uri = outcome.report.uri.clone();
1305                                                let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1306                                                let total_images = encoded_images.len();
1307
1308                                                // Show progress bar for CLIP extraction
1309                                                let clip_pb = if !args.json && total_images > 0 {
1310                                                    let pb = ProgressBar::new(total_images as u64);
1311                                                    pb.set_style(
1312                                                        ProgressStyle::with_template(
1313                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1314                                                        )
1315                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1316                                                        .progress_chars("█▓░"),
1317                                                    );
1318                                                    Some(pb)
1319                                                } else {
1320                                                    None
1321                                                };
1322
1323                                                for (page_num, image, png_bytes) in encoded_images {
1324                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1325                                                    let child_title = format!(
1326                                                        "{} - Image {} (page {})",
1327                                                        parent_title,
1328                                                        child_frame_offset,
1329                                                        page_num
1330                                                    );
1331
1332                                                    let child_options = PutOptions::builder()
1333                                                        .uri(&child_uri)
1334                                                        .title(&child_title)
1335                                                        .parent_id(clip_frame_id)
1336                                                        .role(FrameRole::ExtractedImage)
1337                                                        .auto_tag(false)
1338                                                        .extract_dates(false)
1339                                                        .build();
1340
1341                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1342                                                        Ok(_child_seq) => {
1343                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1344                                                            child_frame_offset += 1;
1345
1346                                                            match model.encode_image(&image) {
1347                                                                Ok(embedding) => {
1348                                                                    if let Err(e) = mem
1349                                                                        .add_clip_embedding_with_page(
1350                                                                            child_frame_id,
1351                                                                            Some(page_num),
1352                                                                            embedding,
1353                                                                        )
1354                                                                    {
1355                                                                        warn!(
1356                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1357                                                                            path.display(),
1358                                                                            page_num
1359                                                                        );
1360                                                                    } else {
1361                                                                        info!(
1362                                                                            "Added CLIP image frame {} for {} page {}",
1363                                                                            child_frame_id,
1364                                                                            clip_frame_id,
1365                                                                            page_num
1366                                                                        );
1367                                                                        clip_embeddings_added = true;
1368                                                                    }
1369                                                                }
1370                                                                Err(e) => warn!(
1371                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1372                                                                    path.display(),
1373                                                                    page_num
1374                                                                ),
1375                                                            }
1376
1377                                                            // Commit after each image to checkpoint WAL
1378                                                            if let Err(e) = mem.commit() {
1379                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1380                                                            }
1381                                                        }
1382                                                        Err(e) => warn!(
1383                                                            "Failed to store image frame for {} page {}: {e}",
1384                                                            path.display(),
1385                                                            page_num
1386                                                        ),
1387                                                    }
1388
1389                                                    if let Some(ref pb) = clip_pb {
1390                                                        pb.inc(1);
1391                                                    }
1392                                                }
1393
1394                                                if let Some(pb) = clip_pb {
1395                                                    pb.finish_and_clear();
1396                                                }
1397                                            }
1398                                            Err(err) => warn!(
1399                                                "Failed to render PDF pages for CLIP {}: {err}",
1400                                                path.display()
1401                                            ),
1402                                        }
1403                                    }
1404                                }
1405
1406                                if args.json {
1407                                    summary_warnings
1408                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1409                                }
1410                                reports.push(outcome.report);
1411                                let report_index = reports.len() - 1;
1412                                if !args.json && !use_parallel {
1413                                    if let Some(report) = reports.get(report_index) {
1414                                        print_report(report);
1415                                    }
1416                                }
1417                                if args.transcript && !transcript_notice_printed {
1418                                    let notice = transcript_notice_message(&mut mem)?;
1419                                    if args.json {
1420                                        summary_warnings.push(notice);
1421                                    } else {
1422                                        println!("{notice}");
1423                                    }
1424                                    transcript_notice_printed = true;
1425                                }
1426                                if outcome.embedded {
1427                                    if let Some(pb) = embed_progress.as_ref() {
1428                                        pb.inc(1);
1429                                    }
1430                                    embedded_docs += 1;
1431                                }
1432                                if use_parallel {
1433                                    #[cfg(feature = "parallel_segments")]
1434                                    if let Some(input) = outcome.parallel_input {
1435                                        pending_parallel_indices.push(report_index);
1436                                        pending_parallel_inputs.push(input);
1437                                    }
1438                                } else if let Some(guard) = capacity_guard.as_mut() {
1439                                    mem.commit()?;
1440                                    let stats = mem.stats()?;
1441                                    guard.update_after_commit(&stats);
1442                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1443                                }
1444                            }
1445                            Err(err) => {
1446                                if let Some(pb) = analyze_spinner.take() {
1447                                    pb.finish_and_clear();
1448                                }
1449                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1450                                    return Err(err);
1451                                }
1452                                warn!("skipped {}: {err}", path.display());
1453                                skipped += 1;
1454                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1455                                    capacity_reached = true;
1456                                }
1457                            }
1458                        }
1459                    }
1460                    Err(err) => {
1461                        warn!("failed to read {}: {err}", path.display());
1462                        skipped += 1;
1463                    }
1464                }
1465                if capacity_reached {
1466                    break;
1467                }
1468            }
1469        }
1470    }
1471
1472    if capacity_reached {
1473        warn!("capacity limit reached; remaining inputs were not processed");
1474    }
1475
1476    if let Some(pb) = embed_progress.as_ref() {
1477        pb.finish_with_message("embed");
1478    }
1479
1480    if let Some(runtime) = embedding_runtime.as_ref() {
1481        if embedded_docs > 0 {
1482            match embedding_mode {
1483                EmbeddingMode::Explicit => println!(
1484                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1485                    embedded_docs,
1486                    runtime.dimension()
1487                ),
1488                EmbeddingMode::Auto => println!(
1489                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1490                    embedded_docs,
1491                    runtime.dimension()
1492                ),
1493                EmbeddingMode::None => {}
1494            }
1495        } else {
1496            match embedding_mode {
1497                EmbeddingMode::Explicit => {
1498                    println!("No embeddings were generated (no textual content available)");
1499                }
1500                EmbeddingMode::Auto => {
1501                    println!(
1502                        "Semantic runtime available, but no textual content produced embeddings"
1503                    );
1504                }
1505                EmbeddingMode::None => {}
1506            }
1507        }
1508    }
1509
1510    if reports.is_empty() {
1511        if args.json {
1512            if capacity_reached {
1513                summary_warnings.push("capacity_limit_reached".to_string());
1514            }
1515            let summary_json = json!({
1516                "processed": processed,
1517                "ingested": 0,
1518                "skipped": skipped,
1519                "bytes_added": bytes_added,
1520                "bytes_added_human": format_bytes(bytes_added),
1521                "embedded_documents": embedded_docs,
1522                "capacity_reached": capacity_reached,
1523                "warnings": summary_warnings,
1524                "reports": Vec::<serde_json::Value>::new(),
1525            });
1526            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1527        } else {
1528            println!(
1529                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1530                processed, skipped
1531            );
1532        }
1533        return Ok(());
1534    }
1535
1536    #[cfg(feature = "parallel_segments")]
1537    let mut parallel_flush_spinner = if use_parallel && !args.json {
1538        Some(create_spinner("Flushing parallel segments..."))
1539    } else {
1540        None
1541    };
1542
1543    if use_parallel {
1544        #[cfg(feature = "parallel_segments")]
1545        {
1546            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1547                let mut opts = BuildOpts::default();
1548                opts.sanitize();
1549                opts
1550            });
1551            // Set vector compression mode from mem state
1552            opts.vec_compression = mem.vector_compression().clone();
1553            let seqs = if pending_parallel_inputs.is_empty() {
1554                mem.commit_parallel(opts)?;
1555                Vec::new()
1556            } else {
1557                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1558            };
1559            if let Some(pb) = parallel_flush_spinner.take() {
1560                pb.finish_with_message("Flushed parallel segments");
1561            }
1562            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1563                if let Some(report) = reports.get_mut(idx) {
1564                    report.seq = seq;
1565                }
1566            }
1567        }
1568        #[cfg(not(feature = "parallel_segments"))]
1569        {
1570            mem.commit()?;
1571        }
1572    } else {
1573        mem.commit()?;
1574    }
1575
1576    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1577    #[cfg(feature = "clip")]
1578    if clip_embeddings_added {
1579        mem.commit()?;
1580    }
1581
1582    if !args.json && use_parallel {
1583        for report in &reports {
1584            print_report(report);
1585        }
1586    }
1587
1588    // Table extraction for PDF files
1589    let mut tables_extracted = 0usize;
1590    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1591    if args.tables && !args.no_tables {
1592        if let InputSet::Files(ref paths) = input_set {
1593            let pdf_paths: Vec<_> = paths
1594                .iter()
1595                .filter(|p| {
1596                    p.extension()
1597                        .and_then(|e| e.to_str())
1598                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1599                        .unwrap_or(false)
1600                })
1601                .collect();
1602
1603            if !pdf_paths.is_empty() {
1604                let table_spinner = if args.json {
1605                    None
1606                } else {
1607                    Some(create_spinner(&format!(
1608                        "Extracting tables from {} PDF(s)...",
1609                        pdf_paths.len()
1610                    )))
1611                };
1612
1613                // Use aggressive mode with auto fallback for best results
1614                let table_options = TableExtractionOptions::builder()
1615                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1616                    .min_rows(2)
1617                    .min_cols(2)
1618                    .min_quality(memvid_core::table::TableQuality::Low)
1619                    .merge_multi_page(true)
1620                    .build();
1621
1622                for pdf_path in pdf_paths {
1623                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1624                        let filename = pdf_path
1625                            .file_name()
1626                            .and_then(|s| s.to_str())
1627                            .unwrap_or("unknown.pdf");
1628
1629                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1630                            for table in &result.tables {
1631                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1632                                {
1633                                    tables_extracted += 1;
1634                                    table_summaries.push(json!({
1635                                        "table_id": table.table_id,
1636                                        "source_file": table.source_file,
1637                                        "meta_frame_id": meta_id,
1638                                        "rows": table.n_rows,
1639                                        "cols": table.n_cols,
1640                                        "quality": format!("{:?}", table.quality),
1641                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1642                                    }));
1643                                }
1644                            }
1645                        }
1646                    }
1647                }
1648
1649                if let Some(pb) = table_spinner {
1650                    if tables_extracted > 0 {
1651                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1652                    } else {
1653                        pb.finish_with_message("No tables found");
1654                    }
1655                }
1656
1657                // Commit table frames
1658                if tables_extracted > 0 {
1659                    mem.commit()?;
1660                }
1661            }
1662        }
1663    }
1664
1665    // Logic-Mesh: Extract entities and build relationship graph
1666    // Opt-in only: requires --logic-mesh flag
1667    #[cfg(feature = "logic_mesh")]
1668    let mut logic_mesh_entities = 0usize;
1669    #[cfg(feature = "logic_mesh")]
1670    {
1671        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1672
1673        let models_dir = config.models_dir.clone();
1674        let model_path = ner_model_path(&models_dir);
1675        let tokenizer_path = ner_tokenizer_path(&models_dir);
1676
1677        // Opt-in: only enable Logic-Mesh when explicitly requested with --logic-mesh
1678        let ner_available = model_path.exists() && tokenizer_path.exists();
1679        let should_run_ner = args.logic_mesh;
1680
1681        if should_run_ner && !ner_available {
1682            // User explicitly requested --logic-mesh but model not installed
1683            if args.json {
1684                summary_warnings.push(
1685                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1686                );
1687            } else {
1688                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1689                eprintln!("   Run: memvid models install --ner distilbert-ner");
1690            }
1691        } else if should_run_ner {
1692            let ner_spinner = if args.json {
1693                None
1694            } else {
1695                Some(create_spinner("Building Logic-Mesh entity graph..."))
1696            };
1697
1698            match NerModel::load(&model_path, &tokenizer_path, None) {
1699                Ok(mut ner_model) => {
1700                    let mut mesh = LogicMesh::new();
1701                    let mut filtered_count = 0usize;
1702
1703                    // Process each ingested frame for entity extraction using cached search_text
1704                    for report in &reports {
1705                        let frame_id = report.frame_id;
1706                        // Use the cached search_text from ingestion
1707                        if let Some(ref content) = report.search_text {
1708                            if !content.is_empty() {
1709                                match ner_model.extract(content) {
1710                                    Ok(raw_entities) => {
1711                                        // Merge adjacent entities that were split by tokenization
1712                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1713                                        let merged_entities = merge_adjacent_entities(raw_entities, content);
1714
1715                                        for entity in &merged_entities {
1716                                            // Apply post-processing filter to remove noisy entities
1717                                            if !is_valid_entity(&entity.text, entity.confidence) {
1718                                                tracing::debug!(
1719                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1720                                                    entity.text, entity.confidence * 100.0
1721                                                );
1722                                                filtered_count += 1;
1723                                                continue;
1724                                            }
1725
1726                                            // Convert NER entity type to EntityKind
1727                                            let kind = entity.to_entity_kind();
1728                                            // Create mesh node with proper structure
1729                                            let node = MeshNode::new(
1730                                                entity.text.to_lowercase(),
1731                                                entity.text.trim().to_string(),
1732                                                kind,
1733                                                entity.confidence,
1734                                                frame_id,
1735                                                entity.byte_start as u32,
1736                                                (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1737                                            );
1738                                            mesh.merge_node(node);
1739                                            logic_mesh_entities += 1;
1740                                        }
1741                                    }
1742                                    Err(e) => {
1743                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1744                                    }
1745                                }
1746                            }
1747                        }
1748                    }
1749
1750                    if logic_mesh_entities > 0 {
1751                        mesh.finalize();
1752                        let node_count = mesh.stats().node_count;
1753                        // Persist mesh to MV2 file
1754                        mem.set_logic_mesh(mesh);
1755                        mem.commit()?;
1756                        info!(
1757                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1758                            logic_mesh_entities,
1759                            node_count,
1760                            filtered_count
1761                        );
1762                    }
1763
1764                    if let Some(pb) = ner_spinner {
1765                        pb.finish_with_message(format!(
1766                            "Logic-Mesh: {} entities ({} unique)",
1767                            logic_mesh_entities,
1768                            mem.mesh_node_count()
1769                        ));
1770                    }
1771                }
1772                Err(e) => {
1773                    if let Some(pb) = ner_spinner {
1774                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1775                    }
1776                    if args.json {
1777                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1778                    }
1779                }
1780            }
1781        }
1782    }
1783
1784    // Rules-based enrichment: extract memory cards from ingested frames
1785    let mut enrichment_cards = 0usize;
1786    if args.enrich && !args.no_enrich && !reports.is_empty() {
1787        let enrich_spinner = if args.json {
1788            None
1789        } else {
1790            Some(create_spinner(&format!(
1791                "Extracting memories from {} frame(s)...",
1792                reports.len()
1793            )))
1794        };
1795
1796        let mut rules_engine = RulesEngine::new();
1797        rules_engine.init().ok(); // Rules engine doesn't need external init
1798
1799        let frame_ids: Vec<u64> = reports.iter().map(|r| r.frame_id).collect();
1800        for frame_id in &frame_ids {
1801            if let Ok(frame) = mem.frame_by_id(*frame_id) {
1802                let text = frame.search_text.as_deref().unwrap_or("");
1803                if !text.is_empty() {
1804                    let ctx = memvid_core::enrich::EnrichmentContext::new(
1805                        *frame_id,
1806                        frame.uri.clone().unwrap_or_default(),
1807                        text.to_string(),
1808                        frame.title.clone(),
1809                        frame.timestamp,
1810                        None,
1811                    );
1812                    let result = rules_engine.enrich(&ctx);
1813                    if !result.cards.is_empty() {
1814                        for card in result.cards {
1815                            if mem.put_memory_card(card).is_ok() {
1816                                enrichment_cards += 1;
1817                            }
1818                        }
1819                    }
1820                }
1821            }
1822        }
1823
1824        if enrichment_cards > 0 {
1825            mem.commit()?;
1826        }
1827
1828        if let Some(pb) = enrich_spinner {
1829            if enrichment_cards > 0 {
1830                pb.finish_with_message(format!("Extracted {} memory card(s)", enrichment_cards));
1831            } else {
1832                pb.finish_with_message("No memories extracted");
1833            }
1834        }
1835    }
1836
1837    let stats = mem.stats()?;
1838    if args.json {
1839        if capacity_reached {
1840            summary_warnings.push("capacity_limit_reached".to_string());
1841        }
1842        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1843        let total_duration_ms: Option<u64> = {
1844            let sum: u64 = reports
1845                .iter()
1846                .filter_map(|r| r.extraction.duration_ms)
1847                .sum();
1848            if sum > 0 {
1849                Some(sum)
1850            } else {
1851                None
1852            }
1853        };
1854        let total_pages: Option<u32> = {
1855            let sum: u32 = reports
1856                .iter()
1857                .filter_map(|r| r.extraction.pages_processed)
1858                .sum();
1859            if sum > 0 {
1860                Some(sum)
1861            } else {
1862                None
1863            }
1864        };
1865        let embedding_mode_str = match embedding_mode {
1866            EmbeddingMode::None => "none",
1867            EmbeddingMode::Auto => "auto",
1868            EmbeddingMode::Explicit => "explicit",
1869        };
1870        let reduction_percent = if bytes_input > 0 && bytes_input > bytes_added {
1871            Some(((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0)
1872        } else {
1873            None
1874        };
1875        let summary_json = json!({
1876            "processed": processed,
1877            "ingested": reports.len(),
1878            "skipped": skipped,
1879            "bytes_input": bytes_input,
1880            "bytes_input_human": format_bytes(bytes_input),
1881            "bytes_stored": bytes_added,
1882            "bytes_stored_human": format_bytes(bytes_added),
1883            "reduction_percent": reduction_percent,
1884            "embedded_documents": embedded_docs,
1885            "embedding": {
1886                "enabled": embedding_enabled,
1887                "mode": embedding_mode_str,
1888                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1889            },
1890            "tables": {
1891                "enabled": args.tables && !args.no_tables,
1892                "extracted": tables_extracted,
1893                "tables": table_summaries,
1894            },
1895            "enrichment": {
1896                "enabled": args.enrich && !args.no_enrich,
1897                "cards_extracted": enrichment_cards,
1898            },
1899            "capacity_reached": capacity_reached,
1900            "warnings": summary_warnings,
1901            "extraction": {
1902                "total_duration_ms": total_duration_ms,
1903                "total_pages_processed": total_pages,
1904            },
1905            "memory": {
1906                "path": args.file.display().to_string(),
1907                "frame_count": stats.frame_count,
1908                "size_bytes": stats.size_bytes,
1909                "tier": format!("{:?}", stats.tier),
1910            },
1911            "reports": reports_json,
1912        });
1913        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1914    } else {
1915        println!(
1916            "Committed {} frame(s); total frames {}",
1917            reports.len(),
1918            stats.frame_count
1919        );
1920        println!(
1921            "Summary: processed {}, ingested {}, skipped {}",
1922            processed,
1923            reports.len(),
1924            skipped
1925        );
1926        // Show storage efficiency - use actual payload bytes for accuracy
1927        // When --no-raw is used, stored_bytes in reports doesn't reflect actual storage
1928        if bytes_input > 0 {
1929            let actual_stored = stats.payload_bytes;
1930            if actual_stored < bytes_input && no_raw_count > 0 {
1931                // --no-raw mode: show the dramatic reduction (raw files → extracted text)
1932                let reduction = ((bytes_input - actual_stored) as f64 / bytes_input as f64) * 100.0;
1933                println!(
1934                    "Storage: {} input → {} stored ({:.1}% smaller, text extracted)",
1935                    format_bytes(bytes_input),
1936                    format_bytes(actual_stored),
1937                    reduction
1938                );
1939            } else if bytes_added < bytes_input {
1940                // Normal compression
1941                let reduction = ((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0;
1942                println!(
1943                    "Storage: {} input → {} stored ({:.1}% smaller)",
1944                    format_bytes(bytes_input),
1945                    format_bytes(bytes_added),
1946                    reduction
1947                );
1948            } else {
1949                println!("Storage: {} added", format_bytes(bytes_added));
1950            }
1951        }
1952        if tables_extracted > 0 {
1953            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1954        }
1955        if enrichment_cards > 0 {
1956            println!("Enrichment: {} memory card(s) extracted", enrichment_cards);
1957        }
1958    }
1959
1960    // Save active replay session if one exists
1961    #[cfg(feature = "replay")]
1962    let _ = mem.save_active_session();
1963
1964    Ok(())
1965}
1966
1967pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
1968    let mut mem = Memvid::open(&args.file)?;
1969    ensure_cli_mutation_allowed(&mem)?;
1970    apply_lock_cli(&mut mem, &args.lock);
1971    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
1972    let frame_id = existing.id;
1973
1974    let payload_bytes = match args.input.as_ref() {
1975        Some(path) => Some(read_payload(Some(path))?),
1976        None => None,
1977    };
1978    let payload_slice = payload_bytes.as_deref();
1979    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
1980    let source_path = args.input.as_deref();
1981
1982    let mut options = PutOptions::default();
1983    options.enable_embedding = false;
1984    options.auto_tag = payload_slice.is_some();
1985    options.extract_dates = payload_slice.is_some();
1986    options.timestamp = Some(existing.timestamp);
1987    options.track = existing.track.clone();
1988    options.kind = existing.kind.clone();
1989    options.uri = existing.uri.clone();
1990    options.title = existing.title.clone();
1991    options.metadata = existing.metadata.clone();
1992    options.search_text = existing.search_text.clone();
1993    options.tags = existing.tags.clone();
1994    options.labels = existing.labels.clone();
1995    options.extra_metadata = existing.extra_metadata.clone();
1996
1997    if let Some(new_uri) = &args.set_uri {
1998        options.uri = Some(derive_uri(Some(new_uri), None));
1999    }
2000
2001    if options.uri.is_none() && payload_slice.is_some() {
2002        options.uri = Some(derive_uri(None, source_path));
2003    }
2004
2005    if let Some(title) = &args.title {
2006        options.title = Some(title.clone());
2007    }
2008    if let Some(ts) = args.timestamp {
2009        options.timestamp = Some(ts);
2010    }
2011    if let Some(track) = &args.track {
2012        options.track = Some(track.clone());
2013    }
2014    if let Some(kind) = &args.kind {
2015        options.kind = Some(kind.clone());
2016    }
2017
2018    if !args.labels.is_empty() {
2019        options.labels = args.labels.clone();
2020    }
2021
2022    if !args.tags.is_empty() {
2023        options.tags.clear();
2024        for (key, value) in &args.tags {
2025            if !key.is_empty() {
2026                options.tags.push(key.clone());
2027            }
2028            if !value.is_empty() && value != key {
2029                options.tags.push(value.clone());
2030            }
2031            options.extra_metadata.insert(key.clone(), value.clone());
2032        }
2033    }
2034
2035    apply_metadata_overrides(&mut options, &args.metadata);
2036
2037    let mut search_source = options.search_text.clone();
2038
2039    if let Some(payload) = payload_slice {
2040        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
2041        if let Some(title) = inferred_title {
2042            options.title = Some(title);
2043        }
2044
2045        if options.uri.is_none() {
2046            options.uri = Some(derive_uri(None, source_path));
2047        }
2048
2049        let analysis = analyze_file(
2050            source_path,
2051            payload,
2052            payload_utf8,
2053            options.title.as_deref(),
2054            AudioAnalyzeOptions::default(),
2055            false,
2056        );
2057        if let Some(meta) = analysis.metadata {
2058            options.metadata = Some(meta);
2059        }
2060        if let Some(text) = analysis.search_text {
2061            search_source = Some(text.clone());
2062            options.search_text = Some(text);
2063        }
2064    } else {
2065        options.auto_tag = false;
2066        options.extract_dates = false;
2067    }
2068
2069    let stats = mem.stats()?;
2070    options.enable_embedding = stats.has_vec_index;
2071
2072    // Auto-detect embedding model from existing MV2 dimension
2073    let mv2_dimension = mem.effective_vec_index_dimension()?;
2074    let mut embedding: Option<Vec<f32>> = None;
2075    if args.embeddings {
2076        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
2077            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
2078            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
2079                let models: Vec<_> = identities
2080                    .iter()
2081                    .filter_map(|entry| entry.identity.model.as_deref())
2082                    .collect();
2083                anyhow::bail!(
2084                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
2085                    Detected models: {:?}",
2086                    models
2087                );
2088            }
2089            memvid_core::EmbeddingIdentitySummary::Unknown => None,
2090        };
2091
2092        let runtime = load_embedding_runtime_for_mv2(
2093            config,
2094            inferred_embedding_model.as_deref(),
2095            mv2_dimension,
2096        )?;
2097        if let Some(text) = search_source.as_ref() {
2098            if !text.trim().is_empty() {
2099                embedding = Some(runtime.embed_passage(text)?);
2100            }
2101        }
2102        if embedding.is_none() {
2103            if let Some(text) = payload_utf8 {
2104                if !text.trim().is_empty() {
2105                    embedding = Some(runtime.embed_passage(text)?);
2106                }
2107            }
2108        }
2109        if embedding.is_none() {
2110            warn!("no textual content available; embeddings not recomputed");
2111        }
2112        if embedding.is_some() {
2113            apply_embedding_identity_metadata(&mut options, &runtime);
2114        }
2115    }
2116
2117    let mut effective_embedding = embedding;
2118    if effective_embedding.is_none() && stats.has_vec_index {
2119        effective_embedding = mem.frame_embedding(frame_id)?;
2120    }
2121
2122    let final_uri = options.uri.clone();
2123    let replaced_payload = payload_slice.is_some();
2124    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
2125    mem.commit()?;
2126
2127    let updated_frame =
2128        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
2129            mem.frame_by_uri(&uri)?
2130        } else {
2131            let mut query = TimelineQueryBuilder::default();
2132            if let Some(limit) = NonZeroU64::new(1) {
2133                query = query.limit(limit).reverse(true);
2134            }
2135            let latest = mem.timeline(query.build())?;
2136            if let Some(entry) = latest.first() {
2137                mem.frame_by_id(entry.frame_id)?
2138            } else {
2139                mem.frame_by_id(frame_id)?
2140            }
2141        };
2142
2143    if args.json {
2144        let json = json!({
2145            "sequence": seq,
2146            "previous_frame_id": frame_id,
2147            "frame": frame_to_json(&updated_frame),
2148            "replaced_payload": replaced_payload,
2149        });
2150        println!("{}", serde_json::to_string_pretty(&json)?);
2151    } else {
2152        println!(
2153            "Updated frame {} → new frame {} (seq {})",
2154            frame_id, updated_frame.id, seq
2155        );
2156        println!(
2157            "Payload: {}",
2158            if replaced_payload {
2159                "replaced"
2160            } else {
2161                "reused"
2162            }
2163        );
2164        print_frame_summary(&mut mem, &updated_frame)?;
2165    }
2166
2167    Ok(())
2168}
2169
2170fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2171    if let Some(uri) = provided {
2172        let sanitized = sanitize_uri(uri, true);
2173        return format!("mv2://{}", sanitized);
2174    }
2175
2176    if let Some(path) = source {
2177        let raw = path.to_string_lossy();
2178        let sanitized = sanitize_uri(&raw, false);
2179        return format!("mv2://{}", sanitized);
2180    }
2181
2182    format!("mv2://frames/{}", Uuid::new_v4())
2183}
2184
2185pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2186    let digest = hash(payload).to_hex();
2187    let short = &digest[..32];
2188    let ext_from_path = source
2189        .and_then(|path| path.extension())
2190        .and_then(|ext| ext.to_str())
2191        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2192        .filter(|ext| !ext.is_empty());
2193    let ext = ext_from_path
2194        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2195        .unwrap_or_else(|| "bin".to_string());
2196    format!("mv2://video/{short}.{ext}")
2197}
2198
2199fn derive_title(
2200    provided: Option<String>,
2201    source: Option<&Path>,
2202    payload_utf8: Option<&str>,
2203) -> Option<String> {
2204    if let Some(title) = provided {
2205        let trimmed = title.trim();
2206        if trimmed.is_empty() {
2207            None
2208        } else {
2209            Some(trimmed.to_string())
2210        }
2211    } else {
2212        if let Some(text) = payload_utf8 {
2213            if let Some(markdown_title) = extract_markdown_title(text) {
2214                return Some(markdown_title);
2215            }
2216        }
2217        source
2218            .and_then(|path| path.file_stem())
2219            .and_then(|stem| stem.to_str())
2220            .map(to_title_case)
2221    }
2222}
2223
/// Return the text of the first Markdown ATX heading in `text`, if any.
///
/// A line counts as a heading only when, after trimming, it starts with one
/// to six `#` characters followed by whitespace and some non-blank text.
/// This keeps lines such as `#!/bin/bash`, `#include <stdio.h>` or `#hashtag`
/// from being mistaken for titles. Headings with an empty body are skipped
/// and the scan continues with the next line.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let trimmed = line.trim();
        let rest = trimmed.trim_start_matches('#');
        let level = trimmed.len() - rest.len();
        // ATX headings use between one and six leading `#` characters.
        if !(1..=6).contains(&level) {
            return None;
        }
        // The marker must be separated from the heading text by whitespace.
        if !rest.starts_with(char::is_whitespace) {
            return None;
        }
        let title = rest.trim();
        if title.is_empty() {
            None
        } else {
            Some(title.to_string())
        }
    })
}
2236
/// Upper-case the first character of `input` and leave the rest untouched.
///
/// Returns an empty string for empty input. The first character is expanded
/// with `char::to_uppercase`, so it may become more than one character.
fn to_title_case(input: &str) -> String {
    let mut remainder = input.chars();
    match remainder.next() {
        None => String::new(),
        Some(head) => {
            let mut titled: String = head.to_uppercase().collect();
            titled.push_str(remainder.as_str());
            titled
        }
    }
}
2248
/// Report whether `path` names a file that ingestion ignores by default.
///
/// Currently only a final component of exactly `.DS_Store` is ignored.
fn should_skip_input(path: &Path) -> bool {
    let file_name = path.file_name().and_then(|raw| raw.to_str());
    file_name == Some(".DS_Store")
}
2255
/// Report whether a directory should be skipped while walking an input tree.
///
/// A directory is skipped when its final path component is valid UTF-8 and
/// begins with a dot (for example `.git`).
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|raw| raw.to_str())
        .map_or(false, |dir_name| dir_name.starts_with('.'))
}
2262
/// The resolved source(s) of an ingestion request, as produced by `resolve_inputs`.
enum InputSet {
    /// No input argument was supplied; content is expected on standard input.
    Stdin,
    /// One or more filesystem paths: a single path, the files matched by a
    /// glob pattern, or the files found while walking a directory.
    Files(Vec<PathBuf>),
}
2267
/// Decide from the file extension alone whether `path` looks like an image.
///
/// The comparison is ASCII case-insensitive; paths without a UTF-8 extension
/// are never treated as images.
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    const IMAGE_EXTENSIONS: [&str; 9] = [
        "jpg", "jpeg", "png", "gif", "bmp", "webp", "tiff", "tif", "ico",
    ];

    match path.extension().and_then(|raw| raw.to_str()) {
        Some(ext) => IMAGE_EXTENSIONS
            .iter()
            .any(|known| ext.eq_ignore_ascii_case(known)),
        None => false,
    }
}
2279
2280fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2281    let Some(raw) = input else {
2282        return Ok(InputSet::Stdin);
2283    };
2284
2285    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2286        let mut files = Vec::new();
2287        let mut matched_any = false;
2288        for entry in glob(raw)? {
2289            let path = entry?;
2290            if path.is_file() {
2291                matched_any = true;
2292                if should_skip_input(&path) {
2293                    continue;
2294                }
2295                files.push(path);
2296            }
2297        }
2298        files.sort();
2299        if files.is_empty() {
2300            if matched_any {
2301                anyhow::bail!(
2302                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2303                );
2304            }
2305            anyhow::bail!("pattern '{raw}' did not match any files");
2306        }
2307        return Ok(InputSet::Files(files));
2308    }
2309
2310    let path = PathBuf::from(raw);
2311    if path.is_dir() {
2312        let (mut files, skipped_any) = collect_directory_files(&path)?;
2313        files.sort();
2314        if files.is_empty() {
2315            if skipped_any {
2316                anyhow::bail!(
2317                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2318                    path.display()
2319                );
2320            }
2321            anyhow::bail!(
2322                "directory '{}' contains no ingestible files",
2323                path.display()
2324            );
2325        }
2326        Ok(InputSet::Files(files))
2327    } else {
2328        Ok(InputSet::Files(vec![path]))
2329    }
2330}
2331
2332fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2333    let mut files = Vec::new();
2334    let mut skipped_any = false;
2335    let mut stack = vec![root.to_path_buf()];
2336    while let Some(dir) = stack.pop() {
2337        for entry in fs::read_dir(&dir)? {
2338            let entry = entry?;
2339            let path = entry.path();
2340            let file_type = entry.file_type()?;
2341            if file_type.is_dir() {
2342                if should_skip_dir(&path) {
2343                    skipped_any = true;
2344                    continue;
2345                }
2346                stack.push(path);
2347            } else if file_type.is_file() {
2348                if should_skip_input(&path) {
2349                    skipped_any = true;
2350                    continue;
2351                }
2352                files.push(path);
2353            }
2354        }
2355    }
2356    Ok((files, skipped_any))
2357}
2358
/// Bundles the [`PutReport`] for an ingested item with extra bookkeeping.
// NOTE(review): produced outside this section; field meanings below are
// inferred from names and should be confirmed against the producer.
struct IngestOutcome {
    /// Summary of what was stored.
    report: PutReport,
    /// Presumably whether an embedding was attached to the frame — TODO confirm.
    embedded: bool,
    /// Only present when the `parallel_segments` feature is enabled.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2365
/// Per-item summary of a put operation.
// NOTE(review): populated outside this section; undocumented field meanings
// are inferred from names and types only.
struct PutReport {
    /// Sequence number associated with the write.
    seq: u64,
    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
    frame_id: u64,
    /// URI under which the frame was stored.
    uri: String,
    /// Title recorded for the frame, if any.
    title: Option<String>,
    /// Payload size before storage, in bytes.
    original_bytes: usize,
    /// Payload size as stored, in bytes.
    stored_bytes: usize,
    /// Presumably whether the stored payload was compressed — TODO confirm.
    compressed: bool,
    /// --no-raw mode: raw binary not stored, only extracted text + hash
    no_raw: bool,
    /// Filesystem path the payload came from, when there was one.
    source: Option<PathBuf>,
    /// MIME type recorded for the payload, if known.
    mime: Option<String>,
    /// Document metadata recorded for the payload, if any.
    metadata: Option<DocMetadata>,
    /// Outcome of text extraction for this payload.
    extraction: ExtractionSummary,
    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2385
2386struct CapacityGuard {
2387    #[allow(dead_code)]
2388    tier: Tier,
2389    capacity: u64,
2390    current_size: u64,
2391}
2392
2393impl CapacityGuard {
2394    const MIN_OVERHEAD: u64 = 128 * 1024;
2395    const RELATIVE_OVERHEAD: f64 = 0.05;
2396
2397    fn from_stats(stats: &Stats) -> Option<Self> {
2398        if stats.capacity_bytes == 0 {
2399            return None;
2400        }
2401        Some(Self {
2402            tier: stats.tier,
2403            capacity: stats.capacity_bytes,
2404            current_size: stats.size_bytes,
2405        })
2406    }
2407
2408    fn ensure_capacity(&self, additional: u64) -> Result<()> {
2409        let projected = self
2410            .current_size
2411            .saturating_add(additional)
2412            .saturating_add(Self::estimate_overhead(additional));
2413        if projected > self.capacity {
2414            return Err(map_put_error(
2415                MemvidError::CapacityExceeded {
2416                    current: self.current_size,
2417                    limit: self.capacity,
2418                    required: projected,
2419                },
2420                Some(self.capacity),
2421            ));
2422        }
2423        Ok(())
2424    }
2425
2426    fn update_after_commit(&mut self, stats: &Stats) {
2427        self.current_size = stats.size_bytes;
2428    }
2429
2430    fn capacity_hint(&self) -> Option<u64> {
2431        Some(self.capacity)
2432    }
2433
2434    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
2435    fn check_and_warn_capacity(&self) {
2436        if self.capacity == 0 {
2437            return;
2438        }
2439
2440        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
2441
2442        // Warn at key thresholds
2443        if usage_pct >= 90.0 && usage_pct < 91.0 {
2444            eprintln!(
2445                "⚠️  CRITICAL: 90% capacity used ({} / {})",
2446                format_bytes(self.current_size),
2447                format_bytes(self.capacity)
2448            );
2449            eprintln!("   Actions:");
2450            eprintln!("   1. Recreate with larger --size");
2451            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
2452            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
2453        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
2454            eprintln!(
2455                "⚠️  WARNING: 75% capacity used ({} / {})",
2456                format_bytes(self.current_size),
2457                format_bytes(self.capacity)
2458            );
2459            eprintln!("   Consider planning for capacity expansion soon");
2460        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
2461            eprintln!(
2462                "ℹ️  INFO: 50% capacity used ({} / {})",
2463                format_bytes(self.current_size),
2464                format_bytes(self.capacity)
2465            );
2466        }
2467    }
2468
2469    fn estimate_overhead(additional: u64) -> u64 {
2470        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
2471        fractional.max(Self::MIN_OVERHEAD)
2472    }
2473}
2474
/// Result of `analyze_file`: what was learned about a payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// MIME type detected for the payload.
    pub mime: String,
    /// Collected document metadata, or `None` when the metadata is empty.
    pub metadata: Option<DocMetadata>,
    /// Searchable text extracted from the payload, if any.
    pub search_text: Option<String>,
    /// Which reader ran and how the extraction went.
    pub extraction: ExtractionSummary,
}
2482
/// Switches controlling how audio payloads are analysed.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Run audio analysis even when the MIME type is not `audio/*`.
    force: bool,
    /// Requested segment length in seconds.
    segment_secs: u32,
    /// Whether transcription was requested.
    transcribe: bool,
}

impl AudioAnalyzeOptions {
    /// Segment length, in seconds, used by [`Default`].
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length clamped so it is never below one second.
    fn normalised_segment_secs(self) -> u32 {
        match self.segment_secs {
            0 => 1,
            secs => secs,
        }
    }
}

impl Default for AudioAnalyzeOptions {
    /// Nothing forced, no transcription, default segment length.
    fn default() -> Self {
        AudioAnalyzeOptions {
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
            force: false,
            transcribe: false,
        }
    }
}
2507
/// What `analyze_audio` reports about an audio payload, as consumed by `analyze_file`.
struct AudioAnalysis {
    /// Audio metadata to store on the document when none is present yet.
    metadata: DocAudioMetadata,
    /// Caption used when the document has no caption yet.
    caption: Option<String>,
    /// Extra terms merged into the search text when there is no transcript.
    search_terms: Vec<String>,
    /// Transcript; when present it becomes the document's search text.
    transcript: Option<String>,
}
2514
/// What `analyze_image` reports about an image payload, as consumed by `analyze_file`.
struct ImageAnalysis {
    /// Image width (presumably in pixels — TODO confirm).
    width: u32,
    /// Image height (presumably in pixels — TODO confirm).
    height: u32,
    /// Colour palette entries; stored on the document only when non-empty.
    palette: Vec<String>,
    /// Caption used when the document has no caption yet.
    caption: Option<String>,
    /// EXIF metadata, when available.
    exif: Option<DocExifMetadata>,
}
2522
2523#[derive(Debug, Clone)]
2524pub struct ExtractionSummary {
2525    reader: Option<String>,
2526    status: ExtractionStatus,
2527    warnings: Vec<String>,
2528    duration_ms: Option<u64>,
2529    pages_processed: Option<u32>,
2530}
2531
2532impl ExtractionSummary {
2533    fn record_warning<S: Into<String>>(&mut self, warning: S) {
2534        self.warnings.push(warning.into());
2535    }
2536}
2537
/// Outcome of a text-extraction attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// No extraction was attempted.
    Skipped,
    /// The primary reader produced usable text.
    Ok,
    /// Usable text came from a fallback path.
    FallbackUsed,
    /// Extraction ran but yielded no usable text.
    Empty,
    /// Extraction failed.
    Failed,
}

impl ExtractionStatus {
    /// Stable snake_case label for this status.
    fn label(self) -> &'static str {
        use ExtractionStatus::{Empty, Failed, FallbackUsed, Ok, Skipped};

        match self {
            Ok => "ok",
            Empty => "empty",
            Failed => "failed",
            Skipped => "skipped",
            FallbackUsed => "fallback_used",
        }
    }
}
2558
2559fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2560    let filename = source_path
2561        .and_then(|path| path.file_name())
2562        .and_then(|name| name.to_str())
2563        .map(|value| value.to_string());
2564    MediaManifest {
2565        kind: "video".to_string(),
2566        mime: mime.to_string(),
2567        bytes,
2568        filename,
2569        duration_ms: None,
2570        width: None,
2571        height: None,
2572        codec: None,
2573    }
2574}
2575
2576pub fn analyze_file(
2577    source_path: Option<&Path>,
2578    payload: &[u8],
2579    payload_utf8: Option<&str>,
2580    inferred_title: Option<&str>,
2581    audio_opts: AudioAnalyzeOptions,
2582    force_video: bool,
2583) -> FileAnalysis {
2584    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2585    let is_video = force_video || mime.starts_with("video/");
2586    let treat_as_text = if is_video { false } else { treat_as_text_base };
2587
2588    let mut metadata = DocMetadata::default();
2589    metadata.mime = Some(mime.clone());
2590    metadata.bytes = Some(payload.len() as u64);
2591    metadata.hash = Some(hash(payload).to_hex().to_string());
2592
2593    let mut search_text = None;
2594
2595    let mut extraction = ExtractionSummary {
2596        reader: None,
2597        status: ExtractionStatus::Skipped,
2598        warnings: Vec::new(),
2599        duration_ms: None,
2600        pages_processed: None,
2601    };
2602
2603    let reader_registry = default_reader_registry();
2604    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2605        if slice.is_empty() {
2606            None
2607        } else {
2608            Some(slice)
2609        }
2610    });
2611
2612    let apply_extracted_text = |text: &str,
2613                                reader_label: &str,
2614                                status_if_applied: ExtractionStatus,
2615                                extraction: &mut ExtractionSummary,
2616                                metadata: &mut DocMetadata,
2617                                search_text: &mut Option<String>|
2618     -> bool {
2619        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
2620            let mut value = normalized.text;
2621            if normalized.truncated {
2622                value.push('…');
2623            }
2624            if metadata.caption.is_none() {
2625                if let Some(caption) = caption_from_text(&value) {
2626                    metadata.caption = Some(caption);
2627                }
2628            }
2629            *search_text = Some(value);
2630            extraction.reader = Some(reader_label.to_string());
2631            extraction.status = status_if_applied;
2632            true
2633        } else {
2634            false
2635        }
2636    };
2637
2638    if is_video {
2639        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
2640    }
2641
2642    if treat_as_text {
2643        if let Some(text) = payload_utf8 {
2644            if !apply_extracted_text(
2645                text,
2646                "bytes",
2647                ExtractionStatus::Ok,
2648                &mut extraction,
2649                &mut metadata,
2650                &mut search_text,
2651            ) {
2652                extraction.status = ExtractionStatus::Empty;
2653                extraction.reader = Some("bytes".to_string());
2654                let msg = "text payload contained no searchable content after normalization";
2655                extraction.record_warning(msg);
2656                warn!("{msg}");
2657            }
2658        } else {
2659            extraction.reader = Some("bytes".to_string());
2660            extraction.status = ExtractionStatus::Failed;
2661            let msg = "payload reported as text but was not valid UTF-8";
2662            extraction.record_warning(msg);
2663            warn!("{msg}");
2664        }
2665    } else if mime == "application/pdf"
2666        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
2667        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
2668        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
2669        || mime == "application/vnd.ms-excel"
2670        || mime == "application/msword"
2671    {
2672        // Use reader registry for PDFs and Office documents (XLSX, DOCX, PPTX, etc.)
2673        let mut applied = false;
2674
2675        let format_hint = infer_document_format(Some(mime.as_str()), magic);
2676        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
2677            .with_uri(source_path.and_then(|p| p.to_str()))
2678            .with_magic(magic);
2679
2680        if let Some(reader) = reader_registry.find_reader(&hint) {
2681            match reader.extract(payload, &hint) {
2682                Ok(output) => {
2683                    extraction.reader = Some(output.reader_name.clone());
2684                    if let Some(ms) = output.diagnostics.duration_ms {
2685                        extraction.duration_ms = Some(ms);
2686                    }
2687                    if let Some(pages) = output.diagnostics.pages_processed {
2688                        extraction.pages_processed = Some(pages);
2689                    }
2690                    if let Some(mime_type) = output.document.mime_type.clone() {
2691                        metadata.mime = Some(mime_type);
2692                    }
2693
2694                    for warning in output.diagnostics.warnings.iter() {
2695                        extraction.record_warning(warning);
2696                        warn!("{warning}");
2697                    }
2698
2699                    let status = if output.diagnostics.fallback {
2700                        ExtractionStatus::FallbackUsed
2701                    } else {
2702                        ExtractionStatus::Ok
2703                    };
2704
2705                    if let Some(doc_text) = output.document.text.as_ref() {
2706                        applied = apply_extracted_text(
2707                            doc_text,
2708                            output.reader_name.as_str(),
2709                            status,
2710                            &mut extraction,
2711                            &mut metadata,
2712                            &mut search_text,
2713                        );
2714                        if !applied {
2715                            extraction.reader = Some(output.reader_name);
2716                            extraction.status = ExtractionStatus::Empty;
2717                            let msg = "primary reader returned no usable text";
2718                            extraction.record_warning(msg);
2719                            warn!("{msg}");
2720                        }
2721                    } else {
2722                        extraction.reader = Some(output.reader_name);
2723                        extraction.status = ExtractionStatus::Empty;
2724                        let msg = "primary reader returned empty text";
2725                        extraction.record_warning(msg);
2726                        warn!("{msg}");
2727                    }
2728                }
2729                Err(err) => {
2730                    let name = reader.name();
2731                    extraction.reader = Some(name.to_string());
2732                    extraction.status = ExtractionStatus::Failed;
2733                    let msg = format!("primary reader {name} failed: {err}");
2734                    extraction.record_warning(&msg);
2735                    warn!("{msg}");
2736                }
2737            }
2738        } else {
2739            extraction.record_warning("no registered reader matched this PDF payload");
2740            warn!("no registered reader matched this PDF payload");
2741        }
2742
2743        if !applied {
2744            match extract_pdf_text(payload) {
2745                Ok(pdf_text) => {
2746                    if !apply_extracted_text(
2747                        &pdf_text,
2748                        "pdf_extract",
2749                        ExtractionStatus::FallbackUsed,
2750                        &mut extraction,
2751                        &mut metadata,
2752                        &mut search_text,
2753                    ) {
2754                        extraction.reader = Some("pdf_extract".to_string());
2755                        extraction.status = ExtractionStatus::Empty;
2756                        let msg = "PDF text extraction yielded no searchable content";
2757                        extraction.record_warning(msg);
2758                        warn!("{msg}");
2759                    }
2760                }
2761                Err(err) => {
2762                    extraction.reader = Some("pdf_extract".to_string());
2763                    extraction.status = ExtractionStatus::Failed;
2764                    let msg = format!("failed to extract PDF text via fallback: {err}");
2765                    extraction.record_warning(&msg);
2766                    warn!("{msg}");
2767                }
2768            }
2769        }
2770    }
2771
2772    if !is_video && mime.starts_with("image/") {
2773        if let Some(image_meta) = analyze_image(payload) {
2774            let ImageAnalysis {
2775                width,
2776                height,
2777                palette,
2778                caption,
2779                exif,
2780            } = image_meta;
2781            metadata.width = Some(width);
2782            metadata.height = Some(height);
2783            if !palette.is_empty() {
2784                metadata.colors = Some(palette);
2785            }
2786            if metadata.caption.is_none() {
2787                metadata.caption = caption;
2788            }
2789            if let Some(exif) = exif {
2790                metadata.exif = Some(exif);
2791            }
2792        }
2793    }
2794
2795    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
2796        if let Some(AudioAnalysis {
2797            metadata: audio_metadata,
2798            caption: audio_caption,
2799            mut search_terms,
2800            transcript,
2801        }) = analyze_audio(payload, source_path, &mime, audio_opts)
2802        {
2803            if metadata.audio.is_none()
2804                || metadata
2805                    .audio
2806                    .as_ref()
2807                    .map_or(true, DocAudioMetadata::is_empty)
2808            {
2809                metadata.audio = Some(audio_metadata);
2810            }
2811            if metadata.caption.is_none() {
2812                metadata.caption = audio_caption;
2813            }
2814
2815            // Use transcript as primary search text if available
2816            if let Some(ref text) = transcript {
2817                extraction.reader = Some("whisper".to_string());
2818                extraction.status = ExtractionStatus::Ok;
2819                search_text = Some(text.clone());
2820            } else if !search_terms.is_empty() {
2821                match &mut search_text {
2822                    Some(existing) => {
2823                        for term in search_terms.drain(..) {
2824                            if !existing.contains(&term) {
2825                                if !existing.ends_with(' ') {
2826                                    existing.push(' ');
2827                                }
2828                                existing.push_str(&term);
2829                            }
2830                        }
2831                    }
2832                    None => {
2833                        search_text = Some(search_terms.join(" "));
2834                    }
2835                }
2836            }
2837        }
2838    }
2839
2840    if metadata.caption.is_none() {
2841        if let Some(title) = inferred_title {
2842            let trimmed = title.trim();
2843            if !trimmed.is_empty() {
2844                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
2845            }
2846        }
2847    }
2848
2849    FileAnalysis {
2850        mime,
2851        metadata: if metadata.is_empty() {
2852            None
2853        } else {
2854            Some(metadata)
2855        },
2856        search_text,
2857        extraction,
2858    }
2859}
2860
2861fn default_reader_registry() -> &'static ReaderRegistry {
2862    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2863    REGISTRY.get_or_init(ReaderRegistry::default)
2864}
2865
2866fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2867    if detect_pdf_magic(magic) {
2868        return Some(DocumentFormat::Pdf);
2869    }
2870
2871    let mime = mime?.trim().to_ascii_lowercase();
2872    match mime.as_str() {
2873        "application/pdf" => Some(DocumentFormat::Pdf),
2874        "text/plain" => Some(DocumentFormat::PlainText),
2875        "text/markdown" => Some(DocumentFormat::Markdown),
2876        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2877        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2878            Some(DocumentFormat::Docx)
2879        }
2880        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2881            Some(DocumentFormat::Pptx)
2882        }
2883        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2884            Some(DocumentFormat::Xlsx)
2885        }
2886        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2887        _ => None,
2888    }
2889}
2890
/// Report whether the sniffed bytes start with a PDF signature.
///
/// An optional UTF-8 byte-order mark and any leading ASCII whitespace are
/// ignored before looking for `%PDF`. Missing or empty input is never a PDF.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };

    // Drop a UTF-8 BOM when one is present.
    let without_bom = match bytes {
        [0xEF, 0xBB, 0xBF, tail @ ..] => tail,
        other => other,
    };

    let first_significant = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());

    without_bom[first_significant..].starts_with(b"%PDF")
}
2908
2909/// Robust PDF text extraction with multiple fallbacks for cross-platform support.
2910/// Tries all extractors and returns the one with the most text content.
2911fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2912    let mut best_text = String::new();
2913    let mut best_source = "none";
2914
2915    // Minimum chars to consider extraction "good" - scales with PDF size
2916    // For a 12MB PDF, expect at least ~1000 chars; for 1MB, ~100 chars
2917    let min_good_chars = (payload.len() / 10000).max(100).min(5000);
2918
2919    // Try pdf_extract first
2920    match pdf_extract::extract_text_from_mem(payload) {
2921        Ok(text) => {
2922            let trimmed = text.trim();
2923            if trimmed.len() > best_text.len() {
2924                best_text = trimmed.to_string();
2925                best_source = "pdf_extract";
2926            }
2927            if trimmed.len() >= min_good_chars {
2928                info!("pdf_extract extracted {} chars (good)", trimmed.len());
2929                return Ok(best_text);
2930            } else if !trimmed.is_empty() {
2931                warn!("pdf_extract returned only {} chars, trying other extractors", trimmed.len());
2932            }
2933        }
2934        Err(err) => {
2935            warn!("pdf_extract failed: {err}");
2936        }
2937    }
2938
2939    // Try lopdf - pure Rust, works on all platforms
2940    match extract_pdf_with_lopdf(payload) {
2941        Ok(text) => {
2942            let trimmed = text.trim();
2943            if trimmed.len() > best_text.len() {
2944                best_text = trimmed.to_string();
2945                best_source = "lopdf";
2946            }
2947            if trimmed.len() >= min_good_chars {
2948                info!("lopdf extracted {} chars (good)", trimmed.len());
2949                return Ok(best_text);
2950            } else if !trimmed.is_empty() {
2951                warn!("lopdf returned only {} chars, trying pdftotext", trimmed.len());
2952            }
2953        }
2954        Err(err) => {
2955            warn!("lopdf failed: {err}");
2956        }
2957    }
2958
2959    // Try pdftotext binary (requires poppler installed)
2960    match fallback_pdftotext(payload) {
2961        Ok(text) => {
2962            let trimmed = text.trim();
2963            if trimmed.len() > best_text.len() {
2964                best_text = trimmed.to_string();
2965                best_source = "pdftotext";
2966            }
2967            if !trimmed.is_empty() {
2968                info!("pdftotext extracted {} chars", trimmed.len());
2969            }
2970        }
2971        Err(err) => {
2972            warn!("pdftotext failed: {err}");
2973        }
2974    }
2975
2976    // Return the best result we found
2977    if !best_text.is_empty() {
2978        info!("using {} extraction ({} chars)", best_source, best_text.len());
2979        Ok(best_text)
2980    } else {
2981        warn!("all PDF extraction methods failed, storing without searchable text");
2982        Ok(String::new())
2983    }
2984}
2985
2986/// Extract text from PDF using lopdf (pure Rust, no native dependencies)
2987fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
2988    use lopdf::Document;
2989
2990    let mut document = Document::load_mem(payload)
2991        .map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
2992
2993    // Try to decrypt if encrypted (empty password)
2994    if document.is_encrypted() {
2995        let _ = document.decrypt("");
2996    }
2997
2998    // Decompress streams for text extraction
2999    let _ = document.decompress();
3000
3001    // Get sorted page numbers
3002    let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
3003    if page_numbers.is_empty() {
3004        return Err(anyhow!("PDF has no pages"));
3005    }
3006    page_numbers.sort_unstable();
3007
3008    // Limit to 4096 pages max
3009    if page_numbers.len() > 4096 {
3010        page_numbers.truncate(4096);
3011        warn!("PDF has more than 4096 pages, truncating extraction");
3012    }
3013
3014    // Extract text from all pages
3015    let text = document
3016        .extract_text(&page_numbers)
3017        .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
3018
3019    Ok(text.trim().to_string())
3020}
3021
3022fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
3023    use std::io::Write;
3024    use std::process::{Command, Stdio};
3025
3026    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
3027
3028    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
3029    temp_pdf
3030        .write_all(payload)
3031        .context("failed to write pdf payload to temp file")?;
3032    temp_pdf.flush().context("failed to flush temp pdf file")?;
3033
3034    let child = Command::new(pdftotext)
3035        .arg("-layout")
3036        .arg(temp_pdf.path())
3037        .arg("-")
3038        .stdout(Stdio::piped())
3039        .spawn()
3040        .context("failed to spawn pdftotext")?;
3041
3042    let output = child
3043        .wait_with_output()
3044        .context("failed to read pdftotext output")?;
3045
3046    if !output.status.success() {
3047        return Err(anyhow!("pdftotext exited with status {}", output.status));
3048    }
3049
3050    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
3051    Ok(text)
3052}
3053
3054fn detect_mime(
3055    source_path: Option<&Path>,
3056    payload: &[u8],
3057    payload_utf8: Option<&str>,
3058) -> (String, bool) {
3059    // For ZIP-based formats (OOXML: docx, xlsx, pptx), check extension FIRST
3060    // because magic byte detection returns generic "application/zip"
3061    if let Some(path) = source_path {
3062        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3063            let ext_lower = ext.to_ascii_lowercase();
3064            // Office Open XML and legacy Office formats need extension-based detection
3065            if matches!(
3066                ext_lower.as_str(),
3067                "docx" | "xlsx" | "pptx" | "doc" | "xls" | "ppt"
3068            ) {
3069                if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
3070                    return (mime.to_string(), treat_as_text);
3071                }
3072            }
3073        }
3074    }
3075
3076    if let Some(kind) = infer::get(payload) {
3077        let mime = kind.mime_type().to_string();
3078        let treat_as_text = mime.starts_with("text/")
3079            || matches!(
3080                mime.as_str(),
3081                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
3082            );
3083        return (mime, treat_as_text);
3084    }
3085
3086    let magic = magic_from_u8(payload);
3087    if !magic.is_empty() && magic != "application/octet-stream" {
3088        let treat_as_text = magic.starts_with("text/")
3089            || matches!(
3090                magic,
3091                "application/json"
3092                    | "application/xml"
3093                    | "application/javascript"
3094                    | "image/svg+xml"
3095                    | "text/plain"
3096            );
3097        return (magic.to_string(), treat_as_text);
3098    }
3099
3100    if let Some(path) = source_path {
3101        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3102            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
3103                return (mime.to_string(), treat_as_text);
3104            }
3105        }
3106    }
3107
3108    if payload_utf8.is_some() {
3109        return ("text/plain".to_string(), true);
3110    }
3111
3112    ("application/octet-stream".to_string(), false)
3113}
3114
/// Maps a file extension (without the dot, any ASCII case) to a MIME type.
///
/// Returns `(mime, treat_as_text)` for known extensions and `None` otherwise.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    const TEXT: bool = true;
    const BINARY: bool = false;

    // Source code, config and log extensions that are all plain text.
    const PLAIN_TEXT_EXTENSIONS: &[&str] = &[
        "txt", "text", "log", "cfg", "conf", "ini", "properties", "sql", "rs", "py", "js", "ts",
        "tsx", "jsx", "c", "h", "cpp", "hpp", "go", "rb", "php", "css", "scss", "sass", "sh",
        "bash", "zsh", "ps1", "swift", "kt", "java", "scala", "lua", "pl", "pm", "r", "erl", "ex",
        "exs", "dart",
    ];

    let normalized = ext.to_ascii_lowercase();
    let key = normalized.as_str();

    if PLAIN_TEXT_EXTENSIONS.contains(&key) {
        return Some(("text/plain", TEXT));
    }

    let mapping = match key {
        // Structured text formats.
        "md" | "markdown" => ("text/markdown", TEXT),
        "rst" => ("text/x-rst", TEXT),
        "json" => ("application/json", TEXT),
        "csv" => ("text/csv", TEXT),
        "tsv" => ("text/tab-separated-values", TEXT),
        "yaml" | "yml" => ("application/yaml", TEXT),
        "toml" => ("application/toml", TEXT),
        "html" | "htm" => ("text/html", TEXT),
        "xml" => ("application/xml", TEXT),
        "svg" => ("image/svg+xml", TEXT),
        // Office Open XML formats (ZIP-based, need explicit extension mapping).
        "docx" => (
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            BINARY,
        ),
        "xlsx" => (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            BINARY,
        ),
        "pptx" => (
            "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            BINARY,
        ),
        // Legacy Office formats.
        "doc" => ("application/msword", BINARY),
        "xls" => ("application/vnd.ms-excel", BINARY),
        "ppt" => ("application/vnd.ms-powerpoint", BINARY),
        // Raster images.
        "gif" => ("image/gif", BINARY),
        "jpg" | "jpeg" => ("image/jpeg", BINARY),
        "png" => ("image/png", BINARY),
        "bmp" => ("image/bmp", BINARY),
        "ico" => ("image/x-icon", BINARY),
        "tif" | "tiff" => ("image/tiff", BINARY),
        "webp" => ("image/webp", BINARY),
        _ => return None,
    };

    Some(mapping)
}
3159
3160fn caption_from_text(text: &str) -> Option<String> {
3161    for line in text.lines() {
3162        let trimmed = line.trim();
3163        if !trimmed.is_empty() {
3164            return Some(truncate_to_boundary(trimmed, 240));
3165        }
3166    }
3167    None
3168}
3169
/// Shortens `text` to at most `max_len` bytes without splitting a character.
///
/// Text that already fits is returned unchanged. Otherwise the text is cut at
/// a UTF-8 character boundary, trailing whitespace is removed, and an
/// ellipsis (`…`) is appended. Room for the ellipsis is reserved up front so
/// the returned string, ellipsis included, never exceeds `max_len` bytes.
///
/// Returns an empty string when `max_len` is too small to hold at least one
/// character of `text` plus the ellipsis.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    const ELLIPSIS: char = '…';

    if text.len() <= max_len {
        return text.to_string();
    }

    // Reserve the ellipsis' bytes so `max_len` is a true upper bound.
    let mut end = max_len.saturating_sub(ELLIPSIS.len_utf8());
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    if end == 0 {
        return String::new();
    }

    let kept = text[..end].trim_end();
    let mut truncated = String::with_capacity(kept.len() + ELLIPSIS.len_utf8());
    truncated.push_str(kept);
    truncated.push(ELLIPSIS);
    truncated
}
3185
3186fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
3187    let reader = ImageReader::new(Cursor::new(payload))
3188        .with_guessed_format()
3189        .map_err(|err| warn!("failed to guess image format: {err}"))
3190        .ok()?;
3191    let image = reader
3192        .decode()
3193        .map_err(|err| warn!("failed to decode image: {err}"))
3194        .ok()?;
3195    let width = image.width();
3196    let height = image.height();
3197    let rgb = image.to_rgb8();
3198    let palette = match get_palette(
3199        rgb.as_raw(),
3200        ColorFormat::Rgb,
3201        COLOR_PALETTE_SIZE,
3202        COLOR_PALETTE_QUALITY,
3203    ) {
3204        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3205        Err(err) => {
3206            warn!("failed to compute colour palette: {err}");
3207            Vec::new()
3208        }
3209    };
3210
3211    let (exif, exif_caption) = extract_exif_metadata(payload);
3212
3213    Some(ImageAnalysis {
3214        width,
3215        height,
3216        palette,
3217        caption: exif_caption,
3218        exif,
3219    })
3220}
3221
3222fn color_to_hex(color: Color) -> String {
3223    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3224}
3225
3226fn analyze_audio(
3227    payload: &[u8],
3228    source_path: Option<&Path>,
3229    mime: &str,
3230    options: AudioAnalyzeOptions,
3231) -> Option<AudioAnalysis> {
3232    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
3233    use symphonia::core::errors::Error as SymphoniaError;
3234    use symphonia::core::formats::FormatOptions;
3235    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
3236    use symphonia::core::meta::MetadataOptions;
3237    use symphonia::core::probe::Hint;
3238
3239    let cursor = Cursor::new(payload.to_vec());
3240    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());
3241
3242    let mut hint = Hint::new();
3243    if let Some(path) = source_path {
3244        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3245            hint.with_extension(ext);
3246        }
3247    }
3248    if let Some(suffix) = mime.split('/').nth(1) {
3249        hint.with_extension(suffix);
3250    }
3251
3252    let probed = symphonia::default::get_probe()
3253        .format(
3254            &hint,
3255            mss,
3256            &FormatOptions::default(),
3257            &MetadataOptions::default(),
3258        )
3259        .map_err(|err| warn!("failed to probe audio stream: {err}"))
3260        .ok()?;
3261
3262    let mut format = probed.format;
3263    let track = format.default_track()?;
3264    let track_id = track.id;
3265    let codec_params: CodecParameters = track.codec_params.clone();
3266    let sample_rate = codec_params.sample_rate;
3267    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);
3268
3269    let mut decoder = symphonia::default::get_codecs()
3270        .make(&codec_params, &DecoderOptions::default())
3271        .map_err(|err| warn!("failed to create audio decoder: {err}"))
3272        .ok()?;
3273
3274    let mut decoded_frames: u64 = 0;
3275    loop {
3276        let packet = match format.next_packet() {
3277            Ok(packet) => packet,
3278            Err(SymphoniaError::IoError(_)) => break,
3279            Err(SymphoniaError::DecodeError(err)) => {
3280                warn!("skipping undecodable audio packet: {err}");
3281                continue;
3282            }
3283            Err(SymphoniaError::ResetRequired) => {
3284                decoder.reset();
3285                continue;
3286            }
3287            Err(other) => {
3288                warn!("stopping audio analysis due to error: {other}");
3289                break;
3290            }
3291        };
3292
3293        if packet.track_id() != track_id {
3294            continue;
3295        }
3296
3297        match decoder.decode(&packet) {
3298            Ok(audio_buf) => {
3299                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
3300            }
3301            Err(SymphoniaError::DecodeError(err)) => {
3302                warn!("failed to decode audio packet: {err}");
3303            }
3304            Err(SymphoniaError::IoError(_)) => break,
3305            Err(SymphoniaError::ResetRequired) => {
3306                decoder.reset();
3307            }
3308            Err(other) => {
3309                warn!("decoder error: {other}");
3310                break;
3311            }
3312        }
3313    }
3314
3315    let duration_secs = match sample_rate {
3316        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
3317        _ => 0.0,
3318    };
3319
3320    let bitrate_kbps = if duration_secs > 0.0 {
3321        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
3322    } else {
3323        None
3324    };
3325
3326    let tags = extract_lofty_tags(payload);
3327    let caption = tags
3328        .get("title")
3329        .cloned()
3330        .or_else(|| tags.get("tracktitle").cloned());
3331
3332    let mut search_terms = Vec::new();
3333    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
3334        search_terms.push(title.clone());
3335    }
3336    if let Some(artist) = tags.get("artist") {
3337        search_terms.push(artist.clone());
3338    }
3339    if let Some(album) = tags.get("album") {
3340        search_terms.push(album.clone());
3341    }
3342    if let Some(genre) = tags.get("genre") {
3343        search_terms.push(genre.clone());
3344    }
3345
3346    let mut segments = Vec::new();
3347    if duration_secs > 0.0 {
3348        let segment_len = options.normalised_segment_secs() as f64;
3349        if segment_len > 0.0 {
3350            let mut start = 0.0;
3351            let mut idx = 1usize;
3352            while start < duration_secs {
3353                let end = (start + segment_len).min(duration_secs);
3354                segments.push(AudioSegmentMetadata {
3355                    start_seconds: start as f32,
3356                    end_seconds: end as f32,
3357                    label: Some(format!("Segment {}", idx)),
3358                });
3359                if end >= duration_secs {
3360                    break;
3361                }
3362                start = end;
3363                idx += 1;
3364            }
3365        }
3366    }
3367
3368    let mut metadata = DocAudioMetadata::default();
3369    if duration_secs > 0.0 {
3370        metadata.duration_secs = Some(duration_secs as f32);
3371    }
3372    if let Some(rate) = sample_rate {
3373        metadata.sample_rate_hz = Some(rate);
3374    }
3375    if let Some(channels) = channel_count {
3376        metadata.channels = Some(channels);
3377    }
3378    if let Some(bitrate) = bitrate_kbps {
3379        metadata.bitrate_kbps = Some(bitrate);
3380    }
3381    if codec_params.codec != CODEC_TYPE_NULL {
3382        metadata.codec = Some(format!("{:?}", codec_params.codec));
3383    }
3384    if !segments.is_empty() {
3385        metadata.segments = segments;
3386    }
3387    if !tags.is_empty() {
3388        metadata.tags = tags
3389            .iter()
3390            .map(|(k, v)| (k.clone(), v.clone()))
3391            .collect::<BTreeMap<_, _>>();
3392    }
3393
3394    // Transcribe audio if requested
3395    let transcript = if options.transcribe {
3396        transcribe_audio(source_path)
3397    } else {
3398        None
3399    };
3400
3401    Some(AudioAnalysis {
3402        metadata,
3403        caption,
3404        search_terms,
3405        transcript,
3406    })
3407}
3408
/// Transcribes the audio file at `source_path` with Whisper.
///
/// Returns `None` when no path is given, or (after logging a warning) when
/// the transcriber cannot be initialised or the transcription fails.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let audio_path = source_path?;

    tracing::info!(path = %audio_path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| {
            tracing::warn!(error = %e, "Failed to initialize Whisper transcriber");
        })
        .ok()?;

    let outcome = transcriber
        .transcribe_file(audio_path)
        .map_err(|e| {
            tracing::warn!(error = %e, "Failed to transcribe audio");
        })
        .ok()?;

    tracing::info!(
        duration = outcome.duration_secs,
        text_len = outcome.text.len(),
        "Audio transcription complete"
    );
    Some(outcome.text)
}
3442
3443#[cfg(not(feature = "whisper"))]
3444fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
3445    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
3446    None
3447}
3448
3449fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3450    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3451
3452    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3453        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3454            out.entry("title".into())
3455                .or_insert_with(|| value.to_string());
3456            out.entry("tracktitle".into())
3457                .or_insert_with(|| value.to_string());
3458        }
3459        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3460            out.entry("artist".into())
3461                .or_insert_with(|| value.to_string());
3462        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3463            out.entry("artist".into())
3464                .or_insert_with(|| value.to_string());
3465        }
3466        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3467            out.entry("album".into())
3468                .or_insert_with(|| value.to_string());
3469        }
3470        if let Some(value) = tag.get_string(&ItemKey::Genre) {
3471            out.entry("genre".into())
3472                .or_insert_with(|| value.to_string());
3473        }
3474        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3475            out.entry("track_number".into())
3476                .or_insert_with(|| value.to_string());
3477        }
3478        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3479            out.entry("disc_number".into())
3480                .or_insert_with(|| value.to_string());
3481        }
3482    }
3483
3484    let mut tags = HashMap::new();
3485    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3486        Ok(probe) => probe,
3487        Err(err) => {
3488            warn!("failed to guess audio tag file type: {err}");
3489            return tags;
3490        }
3491    };
3492
3493    let tagged_file = match probe.read() {
3494        Ok(file) => file,
3495        Err(err) => {
3496            warn!("failed to read audio tags: {err}");
3497            return tags;
3498        }
3499    };
3500
3501    if let Some(primary) = tagged_file.primary_tag() {
3502        collect_tag(primary, &mut tags);
3503    }
3504    for tag in tagged_file.tags() {
3505        collect_tag(tag, &mut tags);
3506    }
3507
3508    tags
3509}
3510
3511fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3512    let mut cursor = Cursor::new(payload);
3513    let exif = match ExifReader::new().read_from_container(&mut cursor) {
3514        Ok(exif) => exif,
3515        Err(err) => {
3516            warn!("failed to read EXIF metadata: {err}");
3517            return (None, None);
3518        }
3519    };
3520
3521    let mut doc = DocExifMetadata::default();
3522    let mut has_data = false;
3523
3524    if let Some(make) = exif_string(&exif, Tag::Make) {
3525        doc.make = Some(make);
3526        has_data = true;
3527    }
3528    if let Some(model) = exif_string(&exif, Tag::Model) {
3529        doc.model = Some(model);
3530        has_data = true;
3531    }
3532    if let Some(lens) =
3533        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3534    {
3535        doc.lens = Some(lens);
3536        has_data = true;
3537    }
3538    if let Some(dt) =
3539        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3540    {
3541        doc.datetime = Some(dt);
3542        has_data = true;
3543    }
3544
3545    if let Some(gps) = extract_gps_metadata(&exif) {
3546        doc.gps = Some(gps);
3547        has_data = true;
3548    }
3549
3550    let caption = exif_string(&exif, Tag::ImageDescription);
3551
3552    let metadata = if has_data { Some(doc) } else { None };
3553    (metadata, caption)
3554}
3555
3556fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3557    exif.fields()
3558        .find(|field| field.tag == tag)
3559        .and_then(|field| field_to_string(field, exif))
3560}
3561
3562fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3563    let value = field.display_value().with_unit(exif).to_string();
3564    let trimmed = value.trim_matches('\0').trim();
3565    if trimmed.is_empty() {
3566        None
3567    } else {
3568        Some(trimmed.to_string())
3569    }
3570}
3571
3572fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3573    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3574    let latitude_ref_field = exif
3575        .fields()
3576        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3577    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3578    let longitude_ref_field = exif
3579        .fields()
3580        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3581
3582    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3583    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3584
3585    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3586        if reference.eq_ignore_ascii_case("S") {
3587            latitude = -latitude;
3588        }
3589    }
3590    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3591        if reference.eq_ignore_ascii_case("W") {
3592            longitude = -longitude;
3593        }
3594    }
3595
3596    Some(DocGpsMetadata {
3597        latitude,
3598        longitude,
3599    })
3600}
3601
3602fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3603    match value {
3604        ExifValue::Rational(values) if !values.is_empty() => {
3605            let deg = rational_to_f64_u(values.get(0)?)?;
3606            let min = values
3607                .get(1)
3608                .and_then(|v| rational_to_f64_u(v))
3609                .unwrap_or(0.0);
3610            let sec = values
3611                .get(2)
3612                .and_then(|v| rational_to_f64_u(v))
3613                .unwrap_or(0.0);
3614            Some(deg + (min / 60.0) + (sec / 3600.0))
3615        }
3616        ExifValue::SRational(values) if !values.is_empty() => {
3617            let deg = rational_to_f64_i(values.get(0)?)?;
3618            let min = values
3619                .get(1)
3620                .and_then(|v| rational_to_f64_i(v))
3621                .unwrap_or(0.0);
3622            let sec = values
3623                .get(2)
3624                .and_then(|v| rational_to_f64_i(v))
3625                .unwrap_or(0.0);
3626            Some(deg + (min / 60.0) + (sec / 3600.0))
3627        }
3628        _ => None,
3629    }
3630}
3631
3632fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3633    if value.denom == 0 {
3634        None
3635    } else {
3636        Some(value.num as f64 / value.denom as f64)
3637    }
3638}
3639
3640fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3641    if value.denom == 0 {
3642        None
3643    } else {
3644        Some(value.num as f64 / value.denom as f64)
3645    }
3646}
3647
3648fn value_to_ascii(value: &ExifValue) -> Option<String> {
3649    if let ExifValue::Ascii(values) = value {
3650        values
3651            .first()
3652            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3653            .map(|s| s.trim_matches('\0').trim().to_string())
3654            .filter(|s| !s.is_empty())
3655    } else {
3656        None
3657    }
3658}
3659
3660fn ingest_payload(
3661    mem: &mut Memvid,
3662    args: &PutArgs,
3663    config: &CliConfig,
3664    payload: Vec<u8>,
3665    source_path: Option<&Path>,
3666    capacity_guard: Option<&mut CapacityGuard>,
3667    enable_embedding: bool,
3668    runtime: Option<&EmbeddingRuntime>,
3669    parallel_mode: bool,
3670) -> Result<IngestOutcome> {
3671    let original_bytes = payload.len();
3672    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3673
3674    // Check if adding this payload would exceed free tier limit
3675    // Require API key for operations that would exceed 1GB total
3676    let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
3677    crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;
3678
3679    let payload_text = std::str::from_utf8(&payload).ok();
3680    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3681
3682    let audio_opts = AudioAnalyzeOptions {
3683        force: args.audio || args.transcribe,
3684        segment_secs: args
3685            .audio_segment_seconds
3686            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3687        transcribe: args.transcribe,
3688    };
3689
3690    let mut analysis = analyze_file(
3691        source_path,
3692        &payload,
3693        payload_text,
3694        inferred_title.as_deref(),
3695        audio_opts,
3696        args.video,
3697    );
3698
3699    if args.video && !analysis.mime.starts_with("video/") {
3700        anyhow::bail!(
3701            "--video requires a video input; detected MIME type {}",
3702            analysis.mime
3703        );
3704    }
3705
3706    let mut search_text = analysis.search_text.clone();
3707    if let Some(ref mut text) = search_text {
3708        if text.len() > MAX_SEARCH_TEXT_LEN {
3709            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3710            *text = truncated;
3711        }
3712    }
3713    analysis.search_text = search_text.clone();
3714
3715    let uri = if let Some(ref explicit) = args.uri {
3716        derive_uri(Some(explicit), None)
3717    } else if args.video {
3718        derive_video_uri(&payload, source_path, &analysis.mime)
3719    } else {
3720        derive_uri(None, source_path)
3721    };
3722
3723    let mut options_builder = PutOptions::builder()
3724        .enable_embedding(enable_embedding)
3725        .auto_tag(!args.no_auto_tag)
3726        .extract_dates(!args.no_extract_dates)
3727        .extract_triplets(!args.no_extract_triplets);
3728    if let Some(ts) = args.timestamp {
3729        options_builder = options_builder.timestamp(ts);
3730    }
3731    if let Some(track) = &args.track {
3732        options_builder = options_builder.track(track.clone());
3733    }
3734    if args.video {
3735        options_builder = options_builder.kind("video");
3736        options_builder = options_builder.tag("kind", "video");
3737        options_builder = options_builder.push_tag("video");
3738    } else if let Some(kind) = &args.kind {
3739        options_builder = options_builder.kind(kind.clone());
3740    }
3741    options_builder = options_builder.uri(uri.clone());
3742    if let Some(ref title) = inferred_title {
3743        options_builder = options_builder.title(title.clone());
3744    }
3745    for (key, value) in &args.tags {
3746        options_builder = options_builder.tag(key.clone(), value.clone());
3747        if !key.is_empty() {
3748            options_builder = options_builder.push_tag(key.clone());
3749        }
3750        if !value.is_empty() && value != key {
3751            options_builder = options_builder.push_tag(value.clone());
3752        }
3753    }
3754    for label in &args.labels {
3755        options_builder = options_builder.label(label.clone());
3756    }
3757    if let Some(metadata) = analysis.metadata.clone() {
3758        if !metadata.is_empty() {
3759            options_builder = options_builder.metadata(metadata);
3760        }
3761    }
3762    for (idx, entry) in args.metadata.iter().enumerate() {
3763        match serde_json::from_str::<DocMetadata>(entry) {
3764            Ok(meta) => {
3765                options_builder = options_builder.metadata(meta);
3766            }
3767            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3768                Ok(value) => {
3769                    options_builder =
3770                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3771                }
3772                Err(err) => {
3773                    warn!("failed to parse --metadata JSON: {err}");
3774                }
3775            },
3776        }
3777    }
3778    if let Some(text) = search_text.clone() {
3779        if !text.trim().is_empty() {
3780            options_builder = options_builder.search_text(text);
3781        }
3782    }
3783    // Default: no-raw mode (only store extracted text + BLAKE3 hash)
3784    // Use --raw to store raw binary content for later retrieval
3785    let effective_no_raw = !args.raw; // true by default unless --raw is passed
3786    if effective_no_raw {
3787        options_builder = options_builder.no_raw(true);
3788        if let Some(path) = source_path {
3789            options_builder = options_builder.source_path(path.display().to_string());
3790        }
3791    }
3792    // --dedup: skip if identical content already exists
3793    if args.dedup {
3794        options_builder = options_builder.dedup(true);
3795    }
3796    let mut options = options_builder.build();
3797
3798    // Set extraction budget for progressive ingestion
3799    if let Some(budget_ms) = args.extraction_budget_ms {
3800        options.extraction_budget_ms = budget_ms;
3801    }
3802
3803    let existing_frame = if args.update_existing || !args.allow_duplicate {
3804        match mem.frame_by_uri(&uri) {
3805            Ok(frame) => Some(frame),
3806            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3807            Err(err) => return Err(err.into()),
3808        }
3809    } else {
3810        None
3811    };
3812
3813    // Compute frame_id: for updates it's the existing frame's id,
3814    // for new frames it's the current frame count (which becomes the next id)
3815    let frame_id = if let Some(ref existing) = existing_frame {
3816        existing.id
3817    } else {
3818        mem.stats()?.frame_count
3819    };
3820
3821    let mut embedded = false;
3822    let allow_embedding = enable_embedding && !args.video;
3823    if enable_embedding && !allow_embedding && args.video {
3824        warn!("semantic embeddings are not generated for video payloads");
3825    }
3826    let seq = if let Some(existing) = existing_frame {
3827        if args.update_existing {
3828            let payload_for_update = payload.clone();
3829            if allow_embedding {
3830                if let Some(path) = args.embedding_vec.as_deref() {
3831                    let embedding = crate::utils::read_embedding(path)?;
3832                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
3833                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
3834                        let expected = choice.dimensions();
3835                        if expected != 0 && embedding.len() != expected {
3836                            anyhow::bail!(
3837                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3838                                embedding.len(),
3839                                choice.name(),
3840                                expected
3841                            );
3842                        }
3843                        apply_embedding_identity_metadata_from_choice(
3844                            &mut options,
3845                            choice,
3846                            embedding.len(),
3847                            Some(model_str),
3848                        );
3849                    } else {
3850                        options.extra_metadata.insert(
3851                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3852                            embedding.len().to_string(),
3853                        );
3854                    }
3855                    embedded = true;
3856                    mem.update_frame(
3857                        existing.id,
3858                        Some(payload_for_update),
3859                        options.clone(),
3860                        Some(embedding),
3861                    )
3862                    .map_err(|err| {
3863                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3864                    })?
3865                } else {
3866                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3867                    let embed_text = search_text
3868                        .clone()
3869                        .or_else(|| payload_text.map(|text| text.to_string()))
3870                        .unwrap_or_default();
3871                    if embed_text.trim().is_empty() {
3872                        warn!("no textual content available; embedding skipped");
3873                        mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3874                            .map_err(|err| {
3875                                map_put_error(
3876                                    err,
3877                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3878                                )
3879                            })?
3880                    } else {
3881                        // Truncate to avoid token limits
3882                        let truncated_text = truncate_for_embedding(&embed_text);
3883                        if truncated_text.len() < embed_text.len() {
3884                            info!(
3885                                "Truncated text from {} to {} chars for embedding",
3886                                embed_text.len(),
3887                                truncated_text.len()
3888                            );
3889                        }
3890                        let embedding = runtime.embed_passage(&truncated_text)?;
3891                        embedded = true;
3892                        apply_embedding_identity_metadata(&mut options, runtime);
3893                        mem.update_frame(
3894                            existing.id,
3895                            Some(payload_for_update),
3896                            options.clone(),
3897                            Some(embedding),
3898                        )
3899                        .map_err(|err| {
3900                            map_put_error(
3901                                err,
3902                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3903                            )
3904                        })?
3905                    }
3906                }
3907            } else {
3908                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3909                    .map_err(|err| {
3910                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3911                    })?
3912            }
3913        } else {
3914            return Err(DuplicateUriError::new(uri.as_str()).into());
3915        }
3916    } else {
3917        if let Some(guard) = capacity_guard.as_ref() {
3918            guard.ensure_capacity(stored_bytes as u64)?;
3919        }
3920        if parallel_mode {
3921            #[cfg(feature = "parallel_segments")]
3922            {
3923                let mut parent_embedding = None;
3924                let mut chunk_embeddings_vec = None;
3925                if allow_embedding {
3926                    if let Some(path) = args.embedding_vec.as_deref() {
3927                        let embedding = crate::utils::read_embedding(path)?;
3928                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
3929                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
3930                            let expected = choice.dimensions();
3931                            if expected != 0 && embedding.len() != expected {
3932                                anyhow::bail!(
3933                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3934                                    embedding.len(),
3935                                    choice.name(),
3936                                    expected
3937                                );
3938                            }
3939                            apply_embedding_identity_metadata_from_choice(
3940                                &mut options,
3941                                choice,
3942                                embedding.len(),
3943                                Some(model_str),
3944                            );
3945                        } else {
3946                            options.extra_metadata.insert(
3947                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3948                                embedding.len().to_string(),
3949                            );
3950                        }
3951                        parent_embedding = Some(embedding);
3952                        embedded = true;
3953                    } else {
3954                        let runtime =
3955                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3956                        let embed_text = search_text
3957                            .clone()
3958                            .or_else(|| payload_text.map(|text| text.to_string()))
3959                            .unwrap_or_default();
3960                        if embed_text.trim().is_empty() {
3961                            warn!("no textual content available; embedding skipped");
3962                        } else {
3963                            // Check if document will be chunked - if so, embed each chunk
3964                            info!(
3965                                "parallel mode: checking for chunks on {} bytes payload",
3966                                payload.len()
3967                            );
3968                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3969                                // Document will be chunked - embed each chunk for full semantic coverage
3970                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3971
3972                            // Apply temporal enrichment if enabled (before contextual)
3973                            #[cfg(feature = "temporal_enrich")]
3974                            let enriched_chunks = if args.temporal_enrich {
3975                                info!(
3976                                    "Temporal enrichment enabled, processing {} chunks",
3977                                    chunk_texts.len()
3978                                );
3979                                // Use today's date as fallback document date
3980                                let today = chrono::Local::now().date_naive();
3981                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3982                                // Results are tuples of (enriched_text, TemporalEnrichment)
3983                                let enriched: Vec<String> =
3984                                    results.iter().map(|(text, _)| text.clone()).collect();
3985                                let resolved_count = results
3986                                    .iter()
3987                                    .filter(|(_, e)| {
3988                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
3989                                    })
3990                                    .count();
3991                                info!(
3992                                    "Temporal enrichment: resolved {} chunks with temporal context",
3993                                    resolved_count
3994                                );
3995                                enriched
3996                            } else {
3997                                chunk_texts.clone()
3998                            };
3999                            #[cfg(not(feature = "temporal_enrich"))]
4000                            let enriched_chunks = {
4001                                if args.temporal_enrich {
4002                                    warn!(
4003                                        "Temporal enrichment requested but feature not compiled in"
4004                                    );
4005                                }
4006                                chunk_texts.clone()
4007                            };
4008
4009                            // Apply contextual retrieval if enabled
4010                            let embed_chunks = if args.contextual {
4011                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4012                                let engine = if args.contextual_model.as_deref() == Some("local") {
4013                                    #[cfg(feature = "llama-cpp")]
4014                                    {
4015                                        let model_path = get_local_contextual_model_path()?;
4016                                        ContextualEngine::local(model_path)
4017                                    }
4018                                    #[cfg(not(feature = "llama-cpp"))]
4019                                    {
4020                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4021                                    }
4022                                } else if let Some(model) = &args.contextual_model {
4023                                    ContextualEngine::openai_with_model(model)?
4024                                } else {
4025                                    ContextualEngine::openai()?
4026                                };
4027
4028                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
4029                                {
4030                                    Ok(contexts) => {
4031                                        info!("Generated {} contextual prefixes", contexts.len());
4032                                        apply_contextual_prefixes(
4033                                            &embed_text,
4034                                            &enriched_chunks,
4035                                            &contexts,
4036                                        )
4037                                    }
4038                                    Err(e) => {
4039                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4040                                        enriched_chunks.clone()
4041                                    }
4042                                }
4043                            } else {
4044                                enriched_chunks
4045                            };
4046
4047                            let truncated_chunks: Vec<String> = embed_chunks
4048                                .iter()
4049                                .map(|chunk_text| {
4050                                    // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
4051                                    let truncated_chunk = truncate_for_embedding(chunk_text);
4052                                    if truncated_chunk.len() < chunk_text.len() {
4053                                        info!(
4054                                            "parallel mode: Truncated chunk from {} to {} chars for embedding",
4055                                            chunk_text.len(),
4056                                            truncated_chunk.len()
4057                                        );
4058                                    }
4059                                    truncated_chunk
4060                                })
4061                                .collect();
4062                            let truncated_refs: Vec<&str> =
4063                                truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
4064                            let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4065                            let num_chunks = chunk_embeddings.len();
4066                            chunk_embeddings_vec = Some(chunk_embeddings);
4067                            // Also embed the parent for the parent frame (truncate to avoid token limits)
4068                            let parent_text = truncate_for_embedding(&embed_text);
4069                            if parent_text.len() < embed_text.len() {
4070                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
4071                            }
4072                            parent_embedding = Some(runtime.embed_passage(&parent_text)?);
4073                            embedded = true;
4074                            info!(
4075                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
4076                                num_chunks
4077                            );
4078                            } else {
4079                                // No chunking - just embed the whole document (truncate to avoid token limits)
4080                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
4081                                let truncated_text = truncate_for_embedding(&embed_text);
4082                                if truncated_text.len() < embed_text.len() {
4083                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
4084                                }
4085                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
4086                                embedded = true;
4087                            }
4088                        }
4089                    }
4090                }
4091                if embedded {
4092                    if let Some(runtime) = runtime {
4093                        apply_embedding_identity_metadata(&mut options, runtime);
4094                    }
4095                }
4096                let payload_variant = if let Some(path) = source_path {
4097                    ParallelPayload::Path(path.to_path_buf())
4098                } else {
4099                    ParallelPayload::Bytes(payload)
4100                };
4101                let input = ParallelInput {
4102                    payload: payload_variant,
4103                    options: options.clone(),
4104                    embedding: parent_embedding,
4105                    chunk_embeddings: chunk_embeddings_vec,
4106                };
4107                return Ok(IngestOutcome {
4108                    report: PutReport {
4109                        seq: 0,
4110                        frame_id: 0, // Will be populated after parallel commit
4111                        uri,
4112                        title: inferred_title,
4113                        original_bytes,
4114                        stored_bytes,
4115                        compressed,
4116                        no_raw: effective_no_raw,
4117                        source: source_path.map(Path::to_path_buf),
4118                        mime: Some(analysis.mime),
4119                        metadata: analysis.metadata,
4120                        extraction: analysis.extraction,
4121                        #[cfg(feature = "logic_mesh")]
4122                        search_text: search_text.clone(),
4123                    },
4124                    embedded,
4125                    parallel_input: Some(input),
4126                });
4127            }
4128            #[cfg(not(feature = "parallel_segments"))]
4129            {
4130                mem.put_bytes_with_options(&payload, options)
4131                    .map_err(|err| {
4132                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4133                    })?
4134            }
4135        } else if allow_embedding {
4136            if let Some(path) = args.embedding_vec.as_deref() {
4137                let embedding = crate::utils::read_embedding(path)?;
4138                if let Some(model_str) = args.embedding_vec_model.as_deref() {
4139                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
4140                    let expected = choice.dimensions();
4141                    if expected != 0 && embedding.len() != expected {
4142                        anyhow::bail!(
4143                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
4144                            embedding.len(),
4145                            choice.name(),
4146                            expected
4147                        );
4148                    }
4149                    apply_embedding_identity_metadata_from_choice(
4150                        &mut options,
4151                        choice,
4152                        embedding.len(),
4153                        Some(model_str),
4154                    );
4155                } else {
4156                    options.extra_metadata.insert(
4157                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
4158                        embedding.len().to_string(),
4159                    );
4160                }
4161                embedded = true;
4162                mem.put_with_embedding_and_options(&payload, embedding, options)
4163                    .map_err(|err| {
4164                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4165                    })?
4166            } else {
4167                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
4168                let embed_text = search_text
4169                    .clone()
4170                    .or_else(|| payload_text.map(|text| text.to_string()))
4171                    .unwrap_or_default();
4172                if embed_text.trim().is_empty() {
4173                    warn!("no textual content available; embedding skipped");
4174                    mem.put_bytes_with_options(&payload, options)
4175                        .map_err(|err| {
4176                            map_put_error(
4177                                err,
4178                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4179                            )
4180                        })?
4181                } else {
4182                    // Check if document will be chunked - if so, embed each chunk
4183                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
4184                    // Document will be chunked - embed each chunk for full semantic coverage
4185                    info!(
4186                        "Document will be chunked into {} chunks, generating embeddings for each",
4187                        chunk_texts.len()
4188                    );
4189
4190                    // Apply temporal enrichment if enabled (before contextual)
4191                    #[cfg(feature = "temporal_enrich")]
4192                    let enriched_chunks = if args.temporal_enrich {
4193                        info!(
4194                            "Temporal enrichment enabled, processing {} chunks",
4195                            chunk_texts.len()
4196                        );
4197                        // Use today's date as fallback document date
4198                        let today = chrono::Local::now().date_naive();
4199                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
4200                        // Results are tuples of (enriched_text, TemporalEnrichment)
4201                        let enriched: Vec<String> =
4202                            results.iter().map(|(text, _)| text.clone()).collect();
4203                        let resolved_count = results
4204                            .iter()
4205                            .filter(|(_, e)| {
4206                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
4207                            })
4208                            .count();
4209                        info!(
4210                            "Temporal enrichment: resolved {} chunks with temporal context",
4211                            resolved_count
4212                        );
4213                        enriched
4214                    } else {
4215                        chunk_texts.clone()
4216                    };
4217                    #[cfg(not(feature = "temporal_enrich"))]
4218                    let enriched_chunks = {
4219                        if args.temporal_enrich {
4220                            warn!("Temporal enrichment requested but feature not compiled in");
4221                        }
4222                        chunk_texts.clone()
4223                    };
4224
4225                    // Apply contextual retrieval if enabled
4226                    let embed_chunks = if args.contextual {
4227                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4228                        let engine = if args.contextual_model.as_deref() == Some("local") {
4229                            #[cfg(feature = "llama-cpp")]
4230                            {
4231                                let model_path = get_local_contextual_model_path()?;
4232                                ContextualEngine::local(model_path)
4233                            }
4234                            #[cfg(not(feature = "llama-cpp"))]
4235                            {
4236                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4237                            }
4238                        } else if let Some(model) = &args.contextual_model {
4239                            ContextualEngine::openai_with_model(model)?
4240                        } else {
4241                            ContextualEngine::openai()?
4242                        };
4243
4244                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
4245                            Ok(contexts) => {
4246                                info!("Generated {} contextual prefixes", contexts.len());
4247                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
4248                            }
4249                            Err(e) => {
4250                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4251                                enriched_chunks.clone()
4252                            }
4253                        }
4254                    } else {
4255                        enriched_chunks
4256                    };
4257
4258                    let truncated_chunks: Vec<String> = embed_chunks
4259                        .iter()
4260                        .map(|chunk_text| {
4261                            // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
4262                            let truncated_chunk = truncate_for_embedding(chunk_text);
4263                            if truncated_chunk.len() < chunk_text.len() {
4264                                info!(
4265                                    "Truncated chunk from {} to {} chars for embedding",
4266                                    chunk_text.len(),
4267                                    truncated_chunk.len()
4268                                );
4269                            }
4270                            truncated_chunk
4271                        })
4272                        .collect();
4273                    let truncated_refs: Vec<&str> =
4274                        truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
4275                    let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4276                    // Truncate parent text to avoid token limits
4277                    let parent_text = truncate_for_embedding(&embed_text);
4278                    if parent_text.len() < embed_text.len() {
4279                        info!(
4280                            "Truncated parent text from {} to {} chars for embedding",
4281                            embed_text.len(),
4282                            parent_text.len()
4283                        );
4284                    }
4285                    let parent_embedding = runtime.embed_passage(&parent_text)?;
4286                    embedded = true;
4287                    apply_embedding_identity_metadata(&mut options, runtime);
4288                    info!(
4289                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
4290                        chunk_embeddings.len()
4291                    );
4292                    mem.put_with_chunk_embeddings(
4293                        &payload,
4294                        Some(parent_embedding),
4295                        chunk_embeddings,
4296                        options,
4297                    )
4298                    .map_err(|err| {
4299                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4300                    })?
4301                    } else {
4302                        // No chunking - just embed the whole document (truncate to avoid token limits)
4303                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
4304                        let truncated_text = truncate_for_embedding(&embed_text);
4305                        if truncated_text.len() < embed_text.len() {
4306                            info!(
4307                                "Truncated text from {} to {} chars for embedding",
4308                                embed_text.len(),
4309                                truncated_text.len()
4310                            );
4311                        }
4312                        let embedding = runtime.embed_passage(&truncated_text)?;
4313                        embedded = true;
4314                        apply_embedding_identity_metadata(&mut options, runtime);
4315                        mem.put_with_embedding_and_options(&payload, embedding, options)
4316                            .map_err(|err| {
4317                                map_put_error(
4318                                    err,
4319                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4320                                )
4321                            })?
4322                    }
4323                }
4324            }
4325        } else {
4326            mem.put_bytes_with_options(&payload, options)
4327                .map_err(|err| {
4328                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4329                })?
4330        }
4331    };
4332
4333    Ok(IngestOutcome {
4334        report: PutReport {
4335            seq,
4336            frame_id,
4337            uri,
4338            title: inferred_title,
4339            original_bytes,
4340            stored_bytes,
4341            compressed,
4342            no_raw: effective_no_raw,
4343            source: source_path.map(Path::to_path_buf),
4344            mime: Some(analysis.mime),
4345            metadata: analysis.metadata,
4346            extraction: analysis.extraction,
4347            #[cfg(feature = "logic_mesh")]
4348            search_text,
4349        },
4350        embedded,
4351        #[cfg(feature = "parallel_segments")]
4352        parallel_input: None,
4353    })
4354}
4355
4356fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4357    if std::str::from_utf8(payload).is_ok() {
4358        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4359        Ok((compressed.len(), true))
4360    } else {
4361        Ok((payload.len(), false))
4362    }
4363}
4364
4365fn print_report(report: &PutReport) {
4366    let name = report
4367        .source
4368        .as_ref()
4369        .map(|path| {
4370            let pretty = env::current_dir()
4371                .ok()
4372                .as_ref()
4373                .and_then(|cwd| diff_paths(path, cwd))
4374                .unwrap_or_else(|| path.to_path_buf());
4375            pretty.to_string_lossy().into_owned()
4376        })
4377        .unwrap_or_else(|| "stdin".to_string());
4378
4379    let ratio = if report.original_bytes > 0 {
4380        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4381    } else {
4382        100.0
4383    };
4384
4385    println!("• {name} → {}", report.uri);
4386    println!("  seq: {}", report.seq);
4387    if report.no_raw {
4388        println!(
4389            "  size: {} B (--no-raw: text only, hash stored)",
4390            report.original_bytes
4391        );
4392    } else if report.compressed {
4393        println!(
4394            "  size: {} B → {} B ({:.1}% of original, compressed)",
4395            report.original_bytes, report.stored_bytes, ratio
4396        );
4397    } else {
4398        println!("  size: {} B (stored as-is)", report.original_bytes);
4399    }
4400    if let Some(mime) = &report.mime {
4401        println!("  mime: {mime}");
4402    }
4403    if let Some(title) = &report.title {
4404        println!("  title: {title}");
4405    }
4406    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4407    println!(
4408        "  extraction: status={} reader={}",
4409        report.extraction.status.label(),
4410        extraction_reader
4411    );
4412    if let Some(ms) = report.extraction.duration_ms {
4413        println!("  extraction-duration: {} ms", ms);
4414    }
4415    if let Some(pages) = report.extraction.pages_processed {
4416        println!("  extraction-pages: {}", pages);
4417    }
4418    for warning in &report.extraction.warnings {
4419        println!("  warning: {warning}");
4420    }
4421    if let Some(metadata) = &report.metadata {
4422        if let Some(caption) = &metadata.caption {
4423            println!("  caption: {caption}");
4424        }
4425        if let Some(audio) = metadata.audio.as_ref() {
4426            if audio.duration_secs.is_some()
4427                || audio.sample_rate_hz.is_some()
4428                || audio.channels.is_some()
4429            {
4430                let duration = audio
4431                    .duration_secs
4432                    .map(|secs| format!("{secs:.1}s"))
4433                    .unwrap_or_else(|| "unknown".into());
4434                let rate = audio
4435                    .sample_rate_hz
4436                    .map(|hz| format!("{} Hz", hz))
4437                    .unwrap_or_else(|| "? Hz".into());
4438                let channels = audio
4439                    .channels
4440                    .map(|ch| ch.to_string())
4441                    .unwrap_or_else(|| "?".into());
4442                println!("  audio: duration {duration}, {channels} ch, {rate}");
4443            }
4444        }
4445    }
4446
4447    println!();
4448}
4449
4450fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4451    let source = report
4452        .source
4453        .as_ref()
4454        .map(|path| path.to_string_lossy().into_owned());
4455    let extraction = json!({
4456        "status": report.extraction.status.label(),
4457        "reader": report.extraction.reader,
4458        "duration_ms": report.extraction.duration_ms,
4459        "pages_processed": report.extraction.pages_processed,
4460        "warnings": report.extraction.warnings,
4461    });
4462    json!({
4463        "seq": report.seq,
4464        "uri": report.uri,
4465        "title": report.title,
4466        "original_bytes": report.original_bytes,
4467        "original_bytes_human": format_bytes(report.original_bytes as u64),
4468        "stored_bytes": report.stored_bytes,
4469        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4470        "compressed": report.compressed,
4471        "mime": report.mime,
4472        "source": source,
4473        "metadata": report.metadata,
4474        "extraction": extraction,
4475    })
4476}
4477
4478fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4479    match err {
4480        MemvidError::CapacityExceeded {
4481            current,
4482            limit,
4483            required,
4484        } => anyhow!(CapacityExceededMessage {
4485            current,
4486            limit,
4487            required,
4488        }),
4489        other => other.into(),
4490    }
4491}
4492
4493fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4494    for (idx, entry) in entries.iter().enumerate() {
4495        match serde_json::from_str::<DocMetadata>(entry) {
4496            Ok(meta) => {
4497                options.metadata = Some(meta);
4498            }
4499            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4500                Ok(value) => {
4501                    options
4502                        .extra_metadata
4503                        .insert(format!("custom_metadata_{idx}"), value.to_string());
4504                }
4505                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4506            },
4507        }
4508    }
4509}
4510
4511fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4512    let stats = mem.stats()?;
4513    let message = match stats.tier {
4514        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4515        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4516    };
4517    Ok(message)
4518}
4519
/// Normalises a user-supplied URI or path into a safe relative form.
///
/// Processing steps:
/// 1. Surrounding whitespace and any leading `mv2://` prefixes are removed.
/// 2. Backslashes are treated as `/`.
/// 3. Empty and `.` segments are dropped; `..` removes the preceding segment
///    (and is itself dropped when there is nothing left to remove).
///
/// With `keep_path` the surviving segments are joined with `/`; otherwise only
/// the last segment is returned. When no segment survives, the result is
/// `"document"`, so the output is never empty and never `.` or `..`.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    let mut segments: Vec<&str> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                segments.pop();
            }
            other => segments.push(other),
        }
    }

    // If nothing survived, the input consisted solely of separators and dot
    // segments; echoing any of those back would defeat the sanitisation.
    match segments.last() {
        None => "document".to_string(),
        Some(_) if keep_path => segments.join("/"),
        Some(name) => (*name).to_string(),
    }
}
4553
4554fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4555    if quiet {
4556        let pb = ProgressBar::hidden();
4557        pb.set_message(message.to_string());
4558        return pb;
4559    }
4560
4561    match total {
4562        Some(len) => {
4563            let pb = ProgressBar::new(len);
4564            pb.set_draw_target(ProgressDrawTarget::stderr());
4565            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4566                .unwrap_or_else(|_| ProgressStyle::default_bar());
4567            pb.set_style(style);
4568            pb.set_message(message.to_string());
4569            pb
4570        }
4571        None => {
4572            let pb = ProgressBar::new_spinner();
4573            pb.set_draw_target(ProgressDrawTarget::stderr());
4574            pb.set_message(message.to_string());
4575            pb.enable_steady_tick(Duration::from_millis(120));
4576            pb
4577        }
4578    }
4579}
4580
4581fn create_spinner(message: &str) -> ProgressBar {
4582    let pb = ProgressBar::new_spinner();
4583    pb.set_draw_target(ProgressDrawTarget::stderr());
4584    let style = ProgressStyle::with_template("{spinner} {msg}")
4585        .unwrap_or_else(|_| ProgressStyle::default_spinner())
4586        .tick_strings(&["-", "\\", "|", "/"]);
4587    pb.set_style(style);
4588    pb.set_message(message.to_string());
4589    pb.enable_steady_tick(Duration::from_millis(120));
4590    pb
4591}
4592
4593// ============================================================================
4594// put-many command - batch ingestion with pre-computed embeddings
4595// ============================================================================
4596
/// Arguments for the `put-many` subcommand
// NOTE: clap's derive uses the `///` comments on the fields below as `--help`
// text, so they are runtime strings; maintainer notes are kept in plain `//`
// comments to leave the CLI output untouched.
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    // `handle_put_many` reads the whole file (or all of stdin) into memory
    // before parsing.
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    /// Zstd compression level (1-9, default: 3)
    // Out-of-range values are not rejected: `handle_put_many` clamps this to
    // 1..=9 when building `BuildOpts`.
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    /// Store raw binary content along with extracted text.
    /// By default, only extracted text + BLAKE3 hash is stored (--no-raw behavior).
    // `handle_put_many` derives the batch-wide `no_raw` default as `!raw`;
    // individual requests may override it.
    #[arg(long)]
    pub raw: bool,
    /// Don't store raw binary content (DEFAULT behavior).
    /// Kept for backwards compatibility.
    // Parsed but never read by `handle_put_many`; the effective default comes
    // solely from `raw` above.
    #[arg(long, hide = true)]
    pub no_raw: bool,
    /// Skip ingestion if identical content (by BLAKE3 hash) already exists.
    /// Returns the existing frame's ID instead of creating a duplicate.
    // Batch-wide default; a request's own `dedup` field takes precedence.
    #[arg(long)]
    pub dedup: bool,
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4629
/// JSON input format for put-many requests
///
/// One element of the batch array consumed by `handle_put_many`. Only `title`
/// and `text` are required; every other field falls back to its serde default.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Frame title (required).
    pub title: String,
    /// Optional label; defaults to an empty string.
    /// NOTE(review): `handle_put_many` does not read this field.
    #[serde(default)]
    pub label: String,
    /// Document text (required); its UTF-8 bytes become the ingested payload.
    pub text: String,
    /// Frame URI. When absent, `handle_put_many` assigns `mv2://batch/{index}`,
    /// where `index` is the request's position in the batch.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tags copied onto the frame's put options.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels copied onto the frame's put options.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Arbitrary JSON metadata; `null` (the default) is ignored.
    /// If the value deserialises as `DocMetadata` it is stored as the frame's
    /// structured metadata; otherwise its serialised JSON is stored in
    /// `extra_metadata` under `memvid.metadata.json`, unless that key is
    /// already present.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Extra metadata (string key/value) stored on the frame.
    /// Use this to persist embedding identity keys like `memvid.embedding.model`.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI)
    /// Its length (or, when absent, the length of the first chunk embedding)
    /// is recorded under `MEMVID_EMBEDDING_DIMENSION_KEY` unless
    /// `extra_metadata` already contains that key.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Pre-computed embeddings for each chunk (matched by index)
    /// If the document gets chunked, these embeddings are assigned to each chunk frame.
    /// The number of chunk embeddings should match the number of chunks that will be created.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Per-item override for no_raw mode. If not specified, the batch-wide
    /// default applies, which is `true` unless `--raw` was passed.
    /// When true, stores only extracted text + BLAKE3 hash instead of raw content.
    #[serde(default)]
    pub no_raw: Option<bool>,
    /// Per-item override for dedup mode. If not specified, uses the global --dedup flag.
    /// When true, skips ingestion if identical content already exists.
    #[serde(default)]
    pub dedup: Option<bool>,
}
4667
/// Batch input format (array of requests)
///
/// Because of `#[serde(transparent)]`, the JSON input is the bare array
/// itself (`[{...}, {...}]`), not an object with a `requests` key.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The requests, in input order.
    pub requests: Vec<PutManyRequest>,
}
4675
/// Handles the `put-many` subcommand: ingests a JSON batch of documents
/// (optionally with pre-computed embeddings) into an existing memory file.
///
/// Steps, in order:
/// 1. Verify the plan allows writes, open the memory, check that CLI
///    mutation is allowed, and apply the lock options.
/// 2. Enable the lexical and vector indexes if they are missing.
/// 3. Run the capacity check against the current memory size.
/// 4. Read the batch from `--input` or stdin and parse it.
/// 5. Convert each request into a `ParallelInput` and ingest the batch.
/// 6. Print the resulting frame ids as JSON or as text.
///
/// An empty batch is reported and treated as success.
///
/// # Errors
/// Returns an error if any of the checks, the read, the JSON parse, or the
/// ingest fails.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    // Writes are refused when the plan is not active (e.g. expired subscription).
    crate::utils::require_active_plan(config, "put-many")?;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Both indexes must exist before the batch is ingested.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Capacity is checked against the current size only (zero additional bytes).
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Load the raw batch JSON from the input file, or from stdin when none was given.
    let batch_json: String = match args.input.as_ref() {
        Some(input_path) => fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?,
        None => {
            let mut buffer = String::new();
            io::stdin()
                .read_to_string(&mut buffer)
                .context("Failed to read from stdin")?;
            buffer
        }
    };

    let requests = serde_json::from_str::<PutManyBatch>(&batch_json)
        .context("Failed to parse JSON input")?
        .requests;

    if requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    if !args.json {
        eprintln!("Processing {} documents...", requests.len());
    }

    // Batch-wide defaults. Raw content is stored only when --raw is passed;
    // each request may override either setting.
    let global_no_raw = !args.raw;
    let global_dedup = args.dedup;

    let inputs: Vec<ParallelInput> = requests
        .into_iter()
        .enumerate()
        .map(|(index, request)| {
            let PutManyRequest {
                title,
                text,
                uri,
                tags,
                labels,
                metadata,
                extra_metadata,
                embedding,
                chunk_embeddings,
                no_raw,
                dedup,
                ..
            } = request;

            let mut options = PutOptions::default();
            options.title = Some(title);
            options.uri = Some(uri.unwrap_or_else(|| format!("mv2://batch/{}", index)));
            options.tags = tags;
            options.labels = labels;
            options.extra_metadata = extra_metadata;
            options.no_raw = no_raw.unwrap_or(global_no_raw);
            options.dedup = dedup.unwrap_or(global_dedup);

            // Structured metadata when it fits `DocMetadata`; otherwise keep the
            // raw JSON as an extra-metadata entry without clobbering an existing one.
            if !metadata.is_null() {
                match serde_json::from_value::<DocMetadata>(metadata.clone()) {
                    Ok(doc_meta) => options.metadata = Some(doc_meta),
                    Err(_) => {
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| metadata.to_string());
                    }
                }
            }

            // Record the embedding dimension from the document embedding, or
            // failing that from the first chunk embedding.
            let dimension_source = embedding.as_ref().or_else(|| {
                chunk_embeddings
                    .as_ref()
                    .and_then(|chunks| chunks.first())
            });
            if let Some(vector) = dimension_source {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| vector.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(text.into_bytes()),
                options,
                embedding,
                chunk_embeddings,
            }
        })
        .collect();

    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
        return Ok(());
    }

    println!("Ingested {} documents", frame_ids.len());
    if frame_ids.len() <= 10 {
        for fid in &frame_ids {
            println!("  frame_id: {fid}");
        }
    } else {
        // Large batches are summarised by their first and last frame id.
        println!("  first: {}", frame_ids.first().unwrap_or(&0));
        println!("  last:  {}", frame_ids.last().unwrap_or(&0));
    }

    Ok(())
}
4813
/// Arguments for the `correct` subcommand
// NOTE: clap's derive uses the `///` comments on the fields below as `--help`
// text, so they are runtime strings; maintainer notes are kept in plain `//`
// comments to leave the CLI output untouched.
#[derive(Args)]
pub struct CorrectArgs {
    /// Path to the memory file
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// The correction statement (e.g., "Ben Koenig reported to Chloe Nguyen before 2025")
    // Stored verbatim as the frame payload; `handle_correct` also derives the
    // frame title from it via `truncate_title(.., 50)`.
    #[arg(value_name = "STATEMENT")]
    pub statement: String,
    /// Topics for better retrieval matching (comma-separated)
    // `handle_correct` stores these as the frame's tags.
    #[arg(long, value_name = "TOPICS", value_delimiter = ',')]
    pub topics: Option<Vec<String>>,
    /// Source attribution for the correction
    // Stored in extra metadata under `memvid.correction.source` when present.
    #[arg(long, value_name = "SOURCE")]
    pub source: Option<String>,
    /// Retrieval boost factor (default: 2.0)
    // Stored as a string under `memvid.correction.boost`; `handle_correct`
    // performs no range validation on it.
    #[arg(long, default_value = "2.0")]
    pub boost: f64,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4838
4839/// Store a correction with retrieval priority boost.
4840///
4841/// Corrections are stored as frames with special metadata that gives them
4842/// priority during retrieval. This ensures corrected information surfaces
4843/// first when relevant queries are made.
4844pub fn handle_correct(config: &CliConfig, args: CorrectArgs) -> Result<()> {
4845    crate::utils::require_active_plan(config, "correct")?;
4846
4847    let mut mem = Memvid::open(&args.file)?;
4848    ensure_cli_mutation_allowed(&mem)?;
4849    apply_lock_cli(&mut mem, &args.lock);
4850
4851    let stats = mem.stats()?;
4852    if stats.frame_count == 0 && !stats.has_lex_index {
4853        mem.enable_lex()?;
4854    }
4855
4856    // Build extra_metadata for correction
4857    let mut extra_metadata = BTreeMap::new();
4858    extra_metadata.insert("memvid.correction".to_string(), "true".to_string());
4859    extra_metadata.insert("memvid.correction.boost".to_string(), args.boost.to_string());
4860    if let Some(source) = &args.source {
4861        extra_metadata.insert("memvid.correction.source".to_string(), source.clone());
4862    }
4863
4864    // Convert topics to tags for better retrieval matching
4865    let tags: Vec<String> = args.topics.clone().unwrap_or_default();
4866
4867    // Build put options
4868    let mut options = PutOptions::default();
4869    options.title = Some(format!("Correction: {}", truncate_title(&args.statement, 50)));
4870    options.uri = Some(format!("mv2://correction/{}", Uuid::new_v4()));
4871    options.labels = vec!["correction".to_string()];
4872    options.tags = tags;
4873    options.extra_metadata = extra_metadata;
4874    options.no_raw = true; // Corrections are text-only
4875
4876    // Store the correction
4877    let frame_id = mem.put_bytes_with_options(args.statement.as_bytes(), options)?;
4878
4879    if args.json {
4880        let output = json!({
4881            "frame_id": frame_id,
4882            "type": "correction",
4883            "boost": args.boost,
4884            "topics": args.topics.unwrap_or_default(),
4885            "source": args.source,
4886        });
4887        println!("{}", serde_json::to_string_pretty(&output)?);
4888    } else {
4889        println!("✓ Correction stored (frame_id: {})", frame_id);
4890        if let Some(topics) = &args.topics {
4891            println!("  Topics: {}", topics.join(", "));
4892        }
4893        if let Some(source) = &args.source {
4894            println!("  Source: {}", source);
4895        }
4896        println!("  Boost:  {}x", args.boost);
4897    }
4898
4899    Ok(())
4900}
4901
/// Truncate a string to a maximum length, adding "..." if truncated.
///
/// `max_len` is measured in bytes. Strings of at most `max_len` bytes are
/// returned unchanged. Longer strings are cut to at most `max_len - 3` bytes
/// and suffixed with `...`; the cut is moved back to the nearest UTF-8
/// character boundary so multi-byte characters are never split (slicing
/// mid-character would panic).
///
/// When `max_len` is smaller than 3 the prefix is empty and the result is
/// just `...`.
fn truncate_title(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // `end` is strictly less than `s.len()` here, and offset 0 is always a
    // boundary, so this loop terminates without underflow.
    let mut end = max_len.saturating_sub(3);
    while !s.is_char_boundary(end) {
        end -= 1;
    }

    format!("{}...", &s[..end])
}