memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33    ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34    MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum length, in UTF-8 bytes, of text sent for embedding, chosen to stay
/// below OpenAI's 8192 token limit.
/// Using ~4 chars/token estimate, 20K chars ≈ 5K tokens (safe margin).
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
/// Maximum number of images to extract from a PDF for CLIP visual search.
/// Set high to capture all images; processing stops when no more images found.
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncate text for embedding to fit within OpenAI token limits.
///
/// Returns `text` unchanged when it is at most `MAX_EMBEDDING_TEXT_LEN` bytes
/// long. Otherwise returns the longest prefix that is no longer than
/// `MAX_EMBEDDING_TEXT_LEN` bytes and ends on a UTF-8 character boundary, so a
/// multi-byte character straddling the limit is dropped rather than split.
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }

    // Slicing a `str` at a byte offset that falls inside a multi-byte character
    // panics, so walk back to the nearest boundary first. Offset 0 is always a
    // boundary, which guarantees the loop terminates.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
78
79fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
80    options.extra_metadata.insert(
81        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
82        runtime.provider_kind().to_string(),
83    );
84    options.extra_metadata.insert(
85        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
86        runtime.provider_model_id(),
87    );
88    options.extra_metadata.insert(
89        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
90        runtime.dimension().to_string(),
91    );
92}
93
94fn apply_embedding_identity_metadata_from_choice(
95    options: &mut PutOptions,
96    model: EmbeddingModelChoice,
97    dimension: usize,
98    model_id_override: Option<&str>,
99) {
100    let provider = match model {
101        EmbeddingModelChoice::OpenAILarge
102        | EmbeddingModelChoice::OpenAISmall
103        | EmbeddingModelChoice::OpenAIAda => "openai",
104        EmbeddingModelChoice::Nvidia => "nvidia",
105        _ => "fastembed",
106    };
107
108    let model_id = match model {
109        EmbeddingModelChoice::Nvidia => model_id_override
110            .map(str::trim)
111            .filter(|value| !value.is_empty())
112            .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
113            .filter(|value| !value.is_empty())
114            .map(|value| {
115                if value.contains('/') {
116                    value.to_string()
117                } else {
118                    format!("nvidia/{value}")
119                }
120            })
121            .unwrap_or_else(|| model.canonical_model_id().to_string()),
122        _ => model.canonical_model_id().to_string(),
123    };
124
125    options.extra_metadata.insert(
126        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
127        provider.to_string(),
128    );
129    options.extra_metadata.insert(
130        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
131        model_id,
132    );
133    options.extra_metadata.insert(
134        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
135        dimension.to_string(),
136    );
137}
138
/// Locate the local contextual-retrieval model file (phi-3.5-mini).
///
/// The models directory is `MEMVID_MODELS_DIR` when that variable is set (a
/// leading `~/` is expanded using `HOME` when `HOME` is available), otherwise
/// `$HOME/.memvid/models`. The model is expected at
/// `llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf` beneath it.
///
/// # Errors
///
/// Fails when neither `MEMVID_MODELS_DIR` nor `HOME` is set, or when the model
/// file does not exist at the resolved path.
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        // Expand `~/` by hand; if HOME is unavailable the value is used verbatim.
        Ok(custom) => custom
            .strip_prefix("~/")
            .and_then(|rest| {
                env::var("HOME")
                    .ok()
                    .map(|home| PathBuf::from(home).join(rest))
            })
            .unwrap_or_else(|| PathBuf::from(&custom)),
        // No override: fall back to ~/.memvid/models
        Err(_) => {
            let home =
                env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let mut model_path = models_dir;
    model_path.extend([
        "llm",
        "phi-3-mini-q4",
        "Phi-3.5-mini-instruct-Q4_K_M.gguf",
    ]);

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
175
176use pathdiff::diff_paths;
177
178use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
179use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
180use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
181use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
182use crate::error::{CapacityExceededMessage, DuplicateUriError};
183use crate::utils::{
184    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
185    select_frame,
186};
187#[cfg(feature = "temporal_enrich")]
188use memvid_core::enrich_chunks as temporal_enrich_chunks;
189
/// Interpret a textual on/off switch, such as an environment variable value.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive.
/// `1`, `true`, `yes` and `on` yield `Some(true)`; `0`, `false`, `no` and
/// `off` yield `Some(false)`; anything else yields `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let token = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
198
/// Read the `MEMVID_PARALLEL_SEGMENTS` environment variable as a boolean switch.
///
/// Returns `None` when the variable cannot be read or its value is not one of
/// the spellings accepted by `parse_bool_flag`.
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    match std::env::var("MEMVID_PARALLEL_SEGMENTS") {
        Ok(raw) => parse_bool_flag(&raw),
        Err(_) => None,
    }
}
206
/// Report whether both CLIP model files are present on disk.
///
/// The models directory is `MEMVID_MODELS_DIR` if set, else
/// `$HOME/.memvid/models`, else the relative path `.memvid/models`. The model
/// name comes from `MEMVID_CLIP_MODEL` and defaults to `mobileclip-s2`. Returns
/// `true` only when both `<name>_vision.onnx` and `<name>_text.onnx` exist in
/// that directory.
#[cfg(feature = "clip")]
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .or_else(|_| env::var("HOME").map(|home| PathBuf::from(home).join(".memvid/models")))
        .unwrap_or_else(|_| PathBuf::from(".memvid/models"));

    let model_name = match env::var("MEMVID_CLIP_MODEL") {
        Ok(name) => name,
        Err(_) => String::from("mobileclip-s2"),
    };

    // Vision is checked first; the text model is only probed if vision exists.
    ["vision", "text"]
        .iter()
        .all(|part| models_dir.join(format!("{model_name}_{part}.onnx")).exists())
}
226
/// Minimum confidence threshold for NER entities (0.0-1.0)
/// Note: Lower threshold captures more entities but also more noise
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

/// Minimum length for entity names (compared against the trimmed text's byte length)
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

/// Maximum gap (in bytes of the original text) between adjacent entities to merge
/// Allows merging "S" + "&" + "P Global" when they're close together
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;

/// Merge adjacent NER entities that appear to be tokenization artifacts.
///
/// The BERT tokenizer splits names at special characters like `&`, producing
/// separate entities like "S", "&", "P Global" instead of "S&P Global".
/// This function merges adjacent entities of the same type when they're
/// contiguous or nearly contiguous in the original text.
///
/// * `entities` - extracted entities in any order; they are sorted by
///   `byte_start` before merging.
/// * `original_text` - the text the byte offsets refer to. Offsets that are out
///   of range or not on a UTF-8 character boundary never cause a panic: such a
///   gap is treated as non-mergeable, and a merged entity whose span cannot be
///   sliced falls back to concatenating the two entity texts.
///
/// Two neighbours are merged when they share an entity type and either touch
/// or overlap, or are separated by at most `MAX_MERGE_GAP` bytes consisting
/// only of spaces, tabs, `&`, `-`, `'` or `.` (so never across a line break).
/// The merged entity spans from the first entity's start to the furthest end
/// of the two, and its confidence is the mean of the running confidence and
/// the newly merged entity's confidence.
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Sort by byte position
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::with_capacity(sorted.len());

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            // Check if this entity is adjacent to the previous one and same type
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            // Gap content must be connectors only. Newlines are not in the
            // allowed set, so entities on different lines stay separate.
            // `str::get` yields `None` for out-of-range or mid-character
            // offsets instead of panicking like `original_text[a..b]` would.
            let gap_is_mergeable = gap == 0
                || (gap <= MAX_MERGE_GAP
                    && original_text
                        .get(last.byte_end..entity.byte_start)
                        .is_some_and(|gap_text| {
                            gap_text
                                .chars()
                                .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.'))
                        }));

            if same_type && gap_is_mergeable {
                // An overlapping entity may end before the previous one does;
                // never let the merged span shrink.
                let merged_end = last.byte_end.max(entity.byte_end);

                // Merge: extend the previous entity to include this one
                let merged_text = original_text
                    .get(last.byte_start..merged_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = merged_end;
                // Average the confidence
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
312
/// Decide whether an extracted NER entity is worth keeping.
///
/// Returns `true` if the entity should be kept. An entity is rejected when its
/// confidence is below `NER_MIN_CONFIDENCE` or when its trimmed text:
/// - is shorter than `NER_MIN_ENTITY_LEN` bytes, or a single byte long;
/// - consists solely of whitespace / ASCII punctuation;
/// - contains a line break;
/// - starts or ends with the `##` word-piece marker;
/// - ends with a period;
/// - is two bytes long and not one of `eu`, `un`, `uk`, `us`, `ai`, `it`
///   (compared lowercased; those six are accepted immediately);
/// - is one of the noise words `the`, `api`, `url`, `pdf`, `inc`, `ltd`, `llc`;
/// - has several words whose first word is an all-uppercase fragment of at
///   most two bytes other than `US`, `UK`, `EU`, `UN`;
/// - begins with a lowercase character.
#[cfg(feature = "logic_mesh")]
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let candidate = text.trim();
    let byte_len = candidate.len();

    // Shape checks that reject regardless of the actual wording.
    let malformed = byte_len < NER_MIN_ENTITY_LEN
        || byte_len == 1
        || candidate
            .chars()
            .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
        || candidate.contains(|c: char| c == '\n' || c == '\r')
        || candidate.starts_with("##")
        || candidate.ends_with("##")
        || candidate.ends_with('.');
    if malformed {
        return false;
    }

    let lowered = candidate.to_lowercase();

    // Two-letter tokens are noise unless they are well-known abbreviations.
    if byte_len == 2 {
        return matches!(lowered.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it");
    }

    if matches!(
        lowered.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    // Partial tokenization artifacts, e.g. "S&P Global" -> "P Global",
    // "IHS Markit" -> "HS Markit". Only relevant with at least two words.
    let mut words = candidate.split_whitespace();
    if let (Some(head), Some(_)) = (words.next(), words.next()) {
        let short_upper_fragment = head.len() <= 2 && head.chars().all(|c| c.is_uppercase());
        // "US Bank", "UK Office", etc. are legitimate.
        let known_abbreviation =
            matches!(head.to_lowercase().as_str(), "us" | "uk" | "eu" | "un");
        if short_upper_fragment && !known_abbreviation {
            return false;
        }
    }

    // A lowercase first character usually means a partial word.
    !matches!(candidate.chars().next(), Some(first) if first.is_lowercase())
}
394
395/// Parse key=value pairs for tags
396pub fn parse_key_val(s: &str) -> Result<(String, String)> {
397    let (key, value) = s
398        .split_once('=')
399        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
400    Ok((key.to_string(), value.to_string()))
401}
402
/// Lock-related CLI arguments (shared across commands)
// Embedded in the put/api-fetch/update/delete argument structs through
// `#[command(flatten)]`. `handle_put` and `handle_delete` hand the whole struct
// to `apply_lock_cli`; `handle_api_fetch` forwards the two fields individually.
// NOTE: with clap's derive API the `///` field comments double as `--help`
// text, so they are user-facing strings; reviewer notes here use `//`.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    // `--lock-timeout <MS>`; falls back to 250 when the flag is omitted.
    /// Maximum time to wait for an active writer before failing (ms)
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    // `--force`; a plain switch that is false unless given.
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
413
/// Arguments for the `put` subcommand
// NOTE: with clap's derive API the `///` comments on the fields below double as
// the `--help` text of each flag, so they are user-facing strings. Reviewer
// notes in this struct therefore use plain `//` comments and leave the `///`
// text untouched.
#[derive(Args)]
pub struct PutArgs {
    // ---- Target memory, output mode and payload source ----
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    // Kept as a raw string; `handle_put` passes it to `resolve_inputs`, which
    // decides between file inputs and stdin.
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,

    // ---- Per-frame metadata overrides ----
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional POSIX timestamp to store on the frame
    #[arg(long)]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    // `handle_put` rejects `--video` combined with a `--kind` other than
    // "video" (compared ASCII case-insensitively).
    /// Store the payload as a binary video without transcoding
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // Repeatable; each occurrence is parsed by `parse_key_val`.
    /// Tags to attach in key=value form
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Collected as raw strings; JSON parsing is not done at the clap level.
    /// Additional metadata payloads expressed as JSON
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,

    // ---- Automatic extraction opt-outs ----
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    /// Disable automatic triplet extraction (SPO facts as MemoryCards)
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,

    // ---- Audio ----
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Transcribe audio using Whisper and use the transcript for text embedding
    /// Requires the 'whisper' feature to be enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,

    // ---- Embeddings ----
    // `--embedding` (alias `--embeddings`) conflicts with `--no-embedding` at
    // the clap level. `handle_put` additionally bails when `--embedding` and
    // `--embedding-vec` are both supplied.
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // `handle_put` requires exactly one input file (or stdin) when this is set.
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    // Only accepted together with `--embedding-vec` (`requires`).
    /// Embedding model identity for the provided `--embedding-vec` vector.
    ///
    /// This is written into per-frame `extra_metadata` (`memvid.embedding.*`) so that
    /// semantic queries can auto-select the correct runtime across CLI/SDKs.
    ///
    /// Options: bge-small, bge-base, nomic, gte-large, openai, openai-small, openai-ada, nvidia
    /// (also accepts canonical IDs like `text-embedding-3-small` and `BAAI/bge-small-en-v1.5`).
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    // When set, `handle_put` switches the memory to `VectorCompression::Pq96`.
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,

    // ---- Contextual retrieval ----
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,

    // ---- Table extraction ----
    // `--tables` and `--no-tables` carry no clap-level conflict; how the two are
    // reconciled is decided by handler code outside this struct.
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,

    // ---- CLIP (fields exist only when built with the `clip` feature) ----
    /// Enable CLIP visual embeddings for images and PDF pages
    /// Allows text-to-image search using natural language queries
    /// Auto-enabled when CLIP model is installed; use --no-clip to disable
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    /// Disable CLIP visual embeddings even when model is available
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    // ---- Duplicate-URI handling (the two flags are mutually exclusive) ----
    /// Replace any existing frame with the same URI instead of inserting a duplicate
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    // `handle_put` binds the file via `bind_to_dashboard_memory` when it has no
    // binding yet or is bound to a different ID; a failed bind is reported on
    // stderr and ingestion continues.
    /// Bind to a dashboard memory ID (UUID or 24-char ObjectId) for capacity and tracking
    /// If the file is not already bound, this will sync the capacity ticket from the dashboard
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    /// Build Logic-Mesh entity graph during ingestion (opt-in)
    /// Extracts entities (people, organizations, locations, etc.) and their relationships
    /// Enables graph traversal with `memvid follow`
    /// Requires NER model: `memvid models install --ner distilbert-ner`
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // ---- Parallel segment builder (fields exist only with `parallel_segments`) ----
    // `wants_parallel` resolves the two switches plus the environment override;
    // `sanitized_parallel_opts` applies the four tuning values.
    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Adds `--lock-timeout` and `--force` from `LockCliArgs`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
580
581#[cfg(feature = "parallel_segments")]
582impl PutArgs {
583    pub fn wants_parallel(&self) -> bool {
584        if self.parallel_segments {
585            return true;
586        }
587        if self.no_parallel_segments {
588            return false;
589        }
590        if let Some(env_flag) = parallel_env_override() {
591            return env_flag;
592        }
593        false
594    }
595
596    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
597        let mut opts = memvid_core::BuildOpts::default();
598        if let Some(tokens) = self.parallel_segment_tokens {
599            opts.segment_tokens = tokens;
600        }
601        if let Some(pages) = self.parallel_segment_pages {
602            opts.segment_pages = pages;
603        }
604        if let Some(threads) = self.parallel_threads {
605            opts.threads = threads;
606        }
607        if let Some(depth) = self.parallel_queue_depth {
608            opts.queue_depth = depth;
609        }
610        opts.sanitize();
611        opts
612    }
613}
614
615#[cfg(not(feature = "parallel_segments"))]
616impl PutArgs {
617    pub fn wants_parallel(&self) -> bool {
618        false
619    }
620}
621
/// Arguments for the `api-fetch` subcommand
// `handle_api_fetch` copies every field one-to-one into an `ApiFetchCommand`.
// NOTE: with clap's derive API the `///` field comments double as `--help`
// text, so they are user-facing strings; reviewer notes here use `//`.
#[derive(Args)]
pub struct ApiFetchArgs {
    // First positional argument.
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Second positional argument; becomes `config_path` on the command.
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    // Becomes `mode_override`; `None` leaves the configured mode in effect.
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    // Becomes `uri_override` on the command.
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Adds `--lock-timeout` and `--force` from `LockCliArgs`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
647
/// Arguments for the `update` subcommand
// None of the fields carry `///` comments, so clap shows no per-flag help for
// this subcommand. The handler that consumes this struct is not part of this
// section of the file; the notes below describe only what the attributes
// declare, plus hedged guesses marked as such.
#[derive(Args)]
pub struct UpdateArgs {
    // Positional FILE argument (presumably the memory file, as in the other
    // subcommands — confirm against the update handler).
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // `--frame-id` and `--uri` are mutually exclusive ways to name the frame.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Optional `--input <PATH>`.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // `--set-uri` is distinct from `--uri` above: `--uri` takes part in frame
    // selection (see the conflict with `--frame-id`), whereas this one
    // presumably supplies the new URI — confirm against the update handler.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable; each occurrence is parsed by `parse_key_val`.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    // Collected as raw strings; JSON parsing is not done at the clap level.
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // NOTE(review): `long` on a field named `embeddings` already yields
    // `--embeddings`, so the alias below repeats the primary name. `PutArgs`
    // names its field `embedding` and aliases `embeddings`; possibly an
    // `embedding` alias was intended here — confirm.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    #[arg(long)]
    pub json: bool,

    // Adds `--lock-timeout` and `--force` from `LockCliArgs`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
683
/// Arguments for the `delete` subcommand
// Consumed by `handle_delete`. None of the fields carry `///` comments, so
// clap shows no per-flag help for this subcommand.
#[derive(Args)]
pub struct DeleteArgs {
    // Positional FILE argument; opened with `Memvid::open` by the handler.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // `--frame-id` and `--uri` are mutually exclusive; both are passed to
    // `select_frame` to pick the frame to delete.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skips the interactive "Confirm? [y/N]" prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Switches the result output to pretty-printed JSON.
    #[arg(long)]
    pub json: bool,

    // Adds `--lock-timeout` and `--force` from `LockCliArgs`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
701
702/// Handler for `memvid api-fetch`
703pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
704    let command = ApiFetchCommand {
705        file: args.file,
706        config_path: args.config,
707        dry_run: args.dry_run,
708        mode_override: args.mode,
709        uri_override: args.uri,
710        output_json: args.json,
711        lock_timeout_ms: args.lock.lock_timeout,
712        force_lock: args.lock.force,
713    };
714    run_api_fetch(config, command)
715}
716
717/// Handler for `memvid delete`
718pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
719    let mut mem = Memvid::open(&args.file)?;
720    ensure_cli_mutation_allowed(&mem)?;
721    apply_lock_cli(&mut mem, &args.lock);
722    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
723
724    if !args.yes {
725        if let Some(uri) = &frame.uri {
726            println!("About to delete frame {} ({uri})", frame.id);
727        } else {
728            println!("About to delete frame {}", frame.id);
729        }
730        print!("Confirm? [y/N]: ");
731        io::stdout().flush()?;
732        let mut input = String::new();
733        io::stdin().read_line(&mut input)?;
734        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
735        if !confirmed {
736            println!("Aborted");
737            return Ok(());
738        }
739    }
740
741    let seq = mem.delete_frame(frame.id)?;
742    mem.commit()?;
743    let deleted = mem.frame_by_id(frame.id)?;
744
745    if args.json {
746        let json = json!({
747            "frame_id": frame.id,
748            "sequence": seq,
749            "status": frame_status_str(deleted.status),
750        });
751        println!("{}", serde_json::to_string_pretty(&json)?);
752    } else {
753        println!("Deleted frame {} (seq {})", frame.id, seq);
754    }
755
756    Ok(())
757}
758pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
759    let mut mem = Memvid::open(&args.file)?;
760    ensure_cli_mutation_allowed(&mem)?;
761    apply_lock_cli(&mut mem, &args.lock);
762
763    // If memory-id is provided and file isn't already bound, bind it now
764    if let Some(memory_id) = &args.memory_id {
765        let needs_binding = match mem.get_memory_binding() {
766            Some(binding) => binding.memory_id != **memory_id,
767            None => true,
768        };
769        if needs_binding {
770            match crate::commands::creation::bind_to_dashboard_memory(config, &mut mem, &args.file, memory_id) {
771                Ok((bound_id, capacity)) => {
772                    eprintln!("✓ Bound to dashboard memory: {} (capacity: {})", bound_id, crate::utils::format_bytes(capacity));
773                }
774                Err(e) => {
775                    eprintln!("⚠️  Failed to bind to dashboard memory: {}", e);
776                    eprintln!("   Continuing with existing capacity. Bind manually with:");
777                    eprintln!("   memvid tickets sync {} --memory-id {}", args.file.display(), memory_id);
778                }
779            }
780        }
781    }
782
783    // Load active replay session if one exists
784    #[cfg(feature = "replay")]
785    let _ = mem.load_active_session();
786    let mut stats = mem.stats()?;
787    if stats.frame_count == 0 && !stats.has_lex_index {
788        mem.enable_lex()?;
789        stats = mem.stats()?;
790    }
791
792    // Check if current memory size exceeds free tier limit
793    // If so, require API key to continue
794    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
795
796    // Set vector compression mode if requested
797    if args.vector_compression {
798        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
799    }
800
801    let mut capacity_guard = CapacityGuard::from_stats(&stats);
802    let use_parallel = args.wants_parallel();
803    #[cfg(feature = "parallel_segments")]
804    let mut parallel_opts = if use_parallel {
805        Some(args.sanitized_parallel_opts())
806    } else {
807        None
808    };
809    #[cfg(feature = "parallel_segments")]
810    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
811    #[cfg(feature = "parallel_segments")]
812    let mut pending_parallel_indices: Vec<usize> = Vec::new();
813    let input_set = resolve_inputs(args.input.as_deref())?;
814    if args.embedding && args.embedding_vec.is_some() {
815        anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
816    }
817    if args.embedding_vec.is_some() {
818        match &input_set {
819            InputSet::Files(paths) if paths.len() != 1 => {
820                anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
821            }
822            _ => {}
823        }
824    }
825
826    if args.video {
827        if let Some(kind) = &args.kind {
828            if !kind.eq_ignore_ascii_case("video") {
829                anyhow::bail!("--video conflicts with explicit --kind={kind}");
830            }
831        }
832        if matches!(input_set, InputSet::Stdin) {
833            anyhow::bail!("--video requires --input <file>");
834        }
835    }
836
837    #[allow(dead_code)] // NOTE(review): presumably needed because `Auto` is never constructed below — confirm
838    enum EmbeddingMode { // function-local marker: were embeddings requested for this `handle_put` call?
839        None, // selected when neither `args.embedding` nor `args.embedding_vec` is set
840        Auto, // not constructed anywhere in the visible part of `handle_put`
841        Explicit, // selected for `args.embedding` (runtime is loaded) or a caller-supplied `args.embedding_vec`
842    }
843
844    // PHASE 1: Make embeddings opt-in, not opt-out
845    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
846    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
847    let mv2_dimension = mem.effective_vec_index_dimension()?;
848    let inferred_embedding_model = if args.embedding {
849        match mem.embedding_identity_summary(10_000) {
850            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
851            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
852                let models: Vec<_> = identities
853                    .iter()
854                    .filter_map(|entry| entry.identity.model.as_deref())
855                    .collect();
856                anyhow::bail!(
857                    "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
858                    Detected models: {:?}\n\n\
859                    Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
860                    models
861                );
862            }
863            memvid_core::EmbeddingIdentitySummary::Unknown => None,
864        }
865    } else {
866        None
867    };
868    let (embedding_mode, embedding_runtime) = if args.embedding {
869        (
870            EmbeddingMode::Explicit,
871            Some(load_embedding_runtime_for_mv2(
872                config,
873                inferred_embedding_model.as_deref(),
874                mv2_dimension,
875            )?),
876        )
877    } else if args.embedding_vec.is_some() {
878        (EmbeddingMode::Explicit, None)
879    } else {
880        (EmbeddingMode::None, None)
881    };
882    let embedding_enabled = args.embedding || args.embedding_vec.is_some();
883    let runtime_ref = embedding_runtime.as_ref();
884
885    // Enable CLIP index (unless --no-clip is set, which always wins) when either:
886    // 1. --clip flag is set explicitly, OR
887    // 2. the CLIP model is installed (auto-detect) AND the input files include
888    //    at least one image or PDF (stdin input never auto-enables CLIP)
889    #[cfg(feature = "clip")]
890    let clip_enabled = {
891        if args.no_clip {
892            false
893        } else if args.clip {
894            true // Explicitly requested
895        } else {
896            // Auto-detect: check if model is installed
897            let model_available = is_clip_model_installed();
898            if model_available {
899                // Model is installed, check if input has images or PDFs
900                match &input_set {
901                    InputSet::Files(paths) => paths.iter().any(|p| {
902                        is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
903                    }),
904                    InputSet::Stdin => false,
905                }
906            } else {
907                false
908            }
909        }
910    };
911    #[cfg(feature = "clip")]
912    let clip_model = if clip_enabled {
913        mem.enable_clip()?;
914        if !args.json {
915            let mode = if args.clip {
916                "explicit"
917            } else {
918                "auto-detected"
919            };
920            eprintln!("ℹ️  CLIP visual embeddings enabled ({mode})");
921            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
922            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
923            eprintln!("   Use --no-clip to disable auto-detection");
924            eprintln!();
925        }
926        // Initialize CLIP model for generating image embeddings
927        match memvid_core::ClipModel::default_model() {
928            Ok(model) => Some(model),
929            Err(e) => {
930                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
931                None
932            }
933        }
934    } else {
935        None
936    };
937
938    // Warn user about vector storage overhead (PHASE 2)
939    if embedding_enabled && !args.json {
940        let capacity = stats.capacity_bytes;
941        if args.vector_compression {
942            // PHASE 3: PQ compression reduces storage 16x
943            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
944            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
945            eprintln!("   Storage: ~20 KB per document (16x compressed)");
946            eprintln!("   Accuracy: ~95% recall@10 maintained");
947            eprintln!(
948                "   Capacity: ~{} documents max for {:.1} GB",
949                max_docs,
950                capacity as f64 / 1e9
951            );
952            eprintln!();
953        } else {
954            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
955            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
956            eprintln!("   Storage: ~270 KB per document");
957            eprintln!(
958                "   Capacity: ~{} documents max for {:.1} GB",
959                max_docs,
960                capacity as f64 / 1e9
961            );
962            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
963            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
964            eprintln!();
965        }
966    }
967
968    let embed_progress = if embedding_enabled {
969        let total = match &input_set {
970            InputSet::Files(paths) => Some(paths.len() as u64),
971            InputSet::Stdin => None,
972        };
973        Some(create_task_progress_bar(total, "embed", false))
974    } else {
975        None
976    };
977    let mut embedded_docs = 0usize;
978
979    if let InputSet::Files(paths) = &input_set {
980        if paths.len() > 1 {
981            if args.uri.is_some() || args.title.is_some() {
982                anyhow::bail!(
983                    "--uri/--title apply to a single file; omit them when using a directory or glob"
984                );
985            }
986        }
987    }
988
989    let mut reports = Vec::new();
990    let mut processed = 0usize;
991    let mut skipped = 0usize;
992    let mut bytes_added = 0u64;
993    let mut summary_warnings: Vec<String> = Vec::new();
994    #[cfg(feature = "clip")]
995    let mut clip_embeddings_added = false;
996
997    let mut capacity_reached = false;
998    match input_set {
999        InputSet::Stdin => {
1000            processed += 1;
1001            match read_payload(None) {
1002                Ok(payload) => {
1003                    let mut analyze_spinner = if args.json {
1004                        None
1005                    } else {
1006                        Some(create_spinner("Analyzing stdin payload..."))
1007                    };
1008                    match ingest_payload(
1009                        &mut mem,
1010                        &args,
1011                        config,
1012                        payload,
1013                        None,
1014                        capacity_guard.as_mut(),
1015                        embedding_enabled,
1016                        runtime_ref,
1017                        use_parallel,
1018                    ) {
1019                        Ok(outcome) => {
1020                            if let Some(pb) = analyze_spinner.take() {
1021                                pb.finish_and_clear();
1022                            }
1023                            bytes_added += outcome.report.stored_bytes as u64;
1024                            if args.json {
1025                                summary_warnings
1026                                    .extend(outcome.report.extraction.warnings.iter().cloned());
1027                            }
1028                            reports.push(outcome.report);
1029                            let report_index = reports.len() - 1;
1030                            if !args.json && !use_parallel {
1031                                if let Some(report) = reports.get(report_index) {
1032                                    print_report(report);
1033                                }
1034                            }
1035                            if args.transcript {
1036                                let notice = transcript_notice_message(&mut mem)?;
1037                                if args.json {
1038                                    summary_warnings.push(notice);
1039                                } else {
1040                                    println!("{notice}");
1041                                }
1042                            }
1043                            if outcome.embedded {
1044                                if let Some(pb) = embed_progress.as_ref() {
1045                                    pb.inc(1);
1046                                }
1047                                embedded_docs += 1;
1048                            }
1049                            if use_parallel {
1050                                #[cfg(feature = "parallel_segments")]
1051                                if let Some(input) = outcome.parallel_input {
1052                                    pending_parallel_indices.push(report_index);
1053                                    pending_parallel_inputs.push(input);
1054                                }
1055                            } else if let Some(guard) = capacity_guard.as_mut() {
1056                                mem.commit()?;
1057                                let stats = mem.stats()?;
1058                                guard.update_after_commit(&stats);
1059                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1060                            }
1061                        }
1062                        Err(err) => {
1063                            if let Some(pb) = analyze_spinner.take() {
1064                                pb.finish_and_clear();
1065                            }
1066                            if err.downcast_ref::<DuplicateUriError>().is_some() {
1067                                return Err(err);
1068                            }
1069                            warn!("skipped stdin payload: {err}");
1070                            skipped += 1;
1071                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1072                                capacity_reached = true;
1073                            }
1074                        }
1075                    }
1076                }
1077                Err(err) => {
1078                    warn!("failed to read stdin: {err}");
1079                    skipped += 1;
1080                }
1081            }
1082        }
1083        InputSet::Files(ref paths) => {
1084            let mut transcript_notice_printed = false;
1085            for path in paths {
1086                processed += 1;
1087                match read_payload(Some(&path)) {
1088                    Ok(payload) => {
1089                        let label = path.display().to_string();
1090                        let mut analyze_spinner = if args.json {
1091                            None
1092                        } else {
1093                            Some(create_spinner(&format!("Analyzing {label}...")))
1094                        };
1095                        match ingest_payload(
1096                            &mut mem,
1097                            &args,
1098                            config,
1099                            payload,
1100                            Some(&path),
1101                            capacity_guard.as_mut(),
1102                            embedding_enabled,
1103                            runtime_ref,
1104                            use_parallel,
1105                        ) {
1106                            Ok(outcome) => {
1107                                if let Some(pb) = analyze_spinner.take() {
1108                                    pb.finish_and_clear();
1109                                }
1110                                bytes_added += outcome.report.stored_bytes as u64;
1111
1112                                // Generate CLIP embedding for image files
1113                                #[cfg(feature = "clip")]
1114                                if let Some(ref model) = clip_model {
1115                                    let mime = outcome.report.mime.as_deref();
1116                                    let clip_frame_id = outcome.report.frame_id;
1117
1118                                    if is_image_file(&path) {
1119                                        match model.encode_image_file(&path) {
1120                                            Ok(embedding) => {
1121                                                if let Err(e) = mem.add_clip_embedding_with_page(
1122                                                    clip_frame_id,
1123                                                    None,
1124                                                    embedding,
1125                                                ) {
1126                                                    warn!(
1127                                                        "Failed to add CLIP embedding for {}: {e}",
1128                                                        path.display()
1129                                                    );
1130                                                } else {
1131                                                    info!(
1132                                                        "Added CLIP embedding for frame {}",
1133                                                        clip_frame_id
1134                                                    );
1135                                                    clip_embeddings_added = true;
1136                                                }
1137                                            }
1138                                            Err(e) => {
1139                                                warn!(
1140                                                    "Failed to encode CLIP embedding for {}: {e}",
1141                                                    path.display()
1142                                                );
1143                                            }
1144                                        }
1145                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
1146                                        // Commit before adding child frames to ensure WAL has space
1147                                        if let Err(e) = mem.commit() {
1148                                            warn!("Failed to commit before CLIP extraction: {e}");
1149                                        }
1150
1151                                        match render_pdf_pages_for_clip(
1152                                            &path,
1153                                            CLIP_PDF_MAX_IMAGES,
1154                                            CLIP_PDF_TARGET_PX,
1155                                        ) {
1156                                            Ok(rendered_pages) => {
1157                                                use rayon::prelude::*;
1158
1159                                                // Pre-encode all images to PNG in parallel for better performance
1160                                                // Filter out junk images (solid colors, black, too small)
1161                                                let path_for_closure = path.clone();
1162
1163                                                // Temporarily suppress panic output during image encoding
1164                                                // (some PDFs have malformed images that cause panics in the image crate)
1165                                                let prev_hook = std::panic::take_hook();
1166                                                std::panic::set_hook(Box::new(|_| {
1167                                                    // Silently ignore panics - we handle them with catch_unwind
1168                                                }));
1169
1170                                                let encoded_images: Vec<_> = rendered_pages
1171                                                    .into_par_iter()
1172                                                    .filter_map(|(page_num, image)| {
1173                                                        // Check if image is worth embedding (not junk)
1174                                                        let image_info = get_image_info(&image);
1175                                                        if !image_info.should_embed() {
1176                                                            info!(
1177                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1178                                                                path_for_closure.display(),
1179                                                                page_num,
1180                                                                image_info.color_variance,
1181                                                                image_info.width,
1182                                                                image_info.height
1183                                                            );
1184                                                            return None;
1185                                                        }
1186
1187                                                        // Convert to RGB8 first to ensure consistent buffer size
1188                                                        // (some PDFs have images with alpha or other formats that cause panics)
1189                                                        // Use catch_unwind to handle any panics from corrupted image data
1190                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1191                                                            let rgb_image = image.to_rgb8();
1192                                                            let mut png_bytes = Vec::new();
1193                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1194                                                                &mut Cursor::new(&mut png_bytes),
1195                                                                image::ImageFormat::Png,
1196                                                            ).map(|()| png_bytes)
1197                                                        }));
1198
1199                                                        match encode_result {
1200                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1201                                                            Ok(Err(e)) => {
1202                                                                info!(
1203                                                                    "Skipping image for {} page {}: {e}",
1204                                                                    path_for_closure.display(),
1205                                                                    page_num
1206                                                                );
1207                                                                None
1208                                                            }
1209                                                            Err(_) => {
1210                                                                // Panic occurred - corrupted image data, silently skip
1211                                                                info!(
1212                                                                    "Skipping malformed image for {} page {}",
1213                                                                    path_for_closure.display(),
1214                                                                    page_num
1215                                                                );
1216                                                                None
1217                                                            }
1218                                                        }
1219                                                    })
1220                                                    .collect();
1221
1222                                                // Restore the original panic hook
1223                                                std::panic::set_hook(prev_hook);
1224
1225                                                // Process encoded images sequentially (storage + CLIP encoding)
1226                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1227                                                let mut child_frame_offset = 1u64;
1228                                                let parent_uri = outcome.report.uri.clone();
1229                                                let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1230                                                let total_images = encoded_images.len();
1231
1232                                                // Show progress bar for CLIP extraction
1233                                                let clip_pb = if !args.json && total_images > 0 {
1234                                                    let pb = ProgressBar::new(total_images as u64);
1235                                                    pb.set_style(
1236                                                        ProgressStyle::with_template(
1237                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1238                                                        )
1239                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1240                                                        .progress_chars("█▓░"),
1241                                                    );
1242                                                    Some(pb)
1243                                                } else {
1244                                                    None
1245                                                };
1246
1247                                                for (page_num, image, png_bytes) in encoded_images {
1248                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1249                                                    let child_title = format!(
1250                                                        "{} - Image {} (page {})",
1251                                                        parent_title,
1252                                                        child_frame_offset,
1253                                                        page_num
1254                                                    );
1255
1256                                                    let child_options = PutOptions::builder()
1257                                                        .uri(&child_uri)
1258                                                        .title(&child_title)
1259                                                        .parent_id(clip_frame_id)
1260                                                        .role(FrameRole::ExtractedImage)
1261                                                        .auto_tag(false)
1262                                                        .extract_dates(false)
1263                                                        .build();
1264
1265                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1266                                                        Ok(_child_seq) => {
1267                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1268                                                            child_frame_offset += 1;
1269
1270                                                            match model.encode_image(&image) {
1271                                                                Ok(embedding) => {
1272                                                                    if let Err(e) = mem
1273                                                                        .add_clip_embedding_with_page(
1274                                                                            child_frame_id,
1275                                                                            Some(page_num),
1276                                                                            embedding,
1277                                                                        )
1278                                                                    {
1279                                                                        warn!(
1280                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1281                                                                            path.display(),
1282                                                                            page_num
1283                                                                        );
1284                                                                    } else {
1285                                                                        info!(
1286                                                                            "Added CLIP image frame {} for {} page {}",
1287                                                                            child_frame_id,
1288                                                                            clip_frame_id,
1289                                                                            page_num
1290                                                                        );
1291                                                                        clip_embeddings_added = true;
1292                                                                    }
1293                                                                }
1294                                                                Err(e) => warn!(
1295                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1296                                                                    path.display(),
1297                                                                    page_num
1298                                                                ),
1299                                                            }
1300
1301                                                            // Commit after each image to checkpoint WAL
1302                                                            if let Err(e) = mem.commit() {
1303                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1304                                                            }
1305                                                        }
1306                                                        Err(e) => warn!(
1307                                                            "Failed to store image frame for {} page {}: {e}",
1308                                                            path.display(),
1309                                                            page_num
1310                                                        ),
1311                                                    }
1312
1313                                                    if let Some(ref pb) = clip_pb {
1314                                                        pb.inc(1);
1315                                                    }
1316                                                }
1317
1318                                                if let Some(pb) = clip_pb {
1319                                                    pb.finish_and_clear();
1320                                                }
1321                                            }
1322                                            Err(err) => warn!(
1323                                                "Failed to render PDF pages for CLIP {}: {err}",
1324                                                path.display()
1325                                            ),
1326                                        }
1327                                    }
1328                                }
1329
1330                                if args.json {
1331                                    summary_warnings
1332                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1333                                }
1334                                reports.push(outcome.report);
1335                                let report_index = reports.len() - 1;
1336                                if !args.json && !use_parallel {
1337                                    if let Some(report) = reports.get(report_index) {
1338                                        print_report(report);
1339                                    }
1340                                }
1341                                if args.transcript && !transcript_notice_printed {
1342                                    let notice = transcript_notice_message(&mut mem)?;
1343                                    if args.json {
1344                                        summary_warnings.push(notice);
1345                                    } else {
1346                                        println!("{notice}");
1347                                    }
1348                                    transcript_notice_printed = true;
1349                                }
1350                                if outcome.embedded {
1351                                    if let Some(pb) = embed_progress.as_ref() {
1352                                        pb.inc(1);
1353                                    }
1354                                    embedded_docs += 1;
1355                                }
1356                                if use_parallel {
1357                                    #[cfg(feature = "parallel_segments")]
1358                                    if let Some(input) = outcome.parallel_input {
1359                                        pending_parallel_indices.push(report_index);
1360                                        pending_parallel_inputs.push(input);
1361                                    }
1362                                } else if let Some(guard) = capacity_guard.as_mut() {
1363                                    mem.commit()?;
1364                                    let stats = mem.stats()?;
1365                                    guard.update_after_commit(&stats);
1366                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1367                                }
1368                            }
1369                            Err(err) => {
1370                                if let Some(pb) = analyze_spinner.take() {
1371                                    pb.finish_and_clear();
1372                                }
1373                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1374                                    return Err(err);
1375                                }
1376                                warn!("skipped {}: {err}", path.display());
1377                                skipped += 1;
1378                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1379                                    capacity_reached = true;
1380                                }
1381                            }
1382                        }
1383                    }
1384                    Err(err) => {
1385                        warn!("failed to read {}: {err}", path.display());
1386                        skipped += 1;
1387                    }
1388                }
1389                if capacity_reached {
1390                    break;
1391                }
1392            }
1393        }
1394    }
1395
1396    if capacity_reached {
1397        warn!("capacity limit reached; remaining inputs were not processed");
1398    }
1399
1400    if let Some(pb) = embed_progress.as_ref() {
1401        pb.finish_with_message("embed");
1402    }
1403
1404    if let Some(runtime) = embedding_runtime.as_ref() {
1405        if embedded_docs > 0 {
1406            match embedding_mode {
1407                EmbeddingMode::Explicit => println!(
1408                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1409                    embedded_docs,
1410                    runtime.dimension()
1411                ),
1412                EmbeddingMode::Auto => println!(
1413                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1414                    embedded_docs,
1415                    runtime.dimension()
1416                ),
1417                EmbeddingMode::None => {}
1418            }
1419        } else {
1420            match embedding_mode {
1421                EmbeddingMode::Explicit => {
1422                    println!("No embeddings were generated (no textual content available)");
1423                }
1424                EmbeddingMode::Auto => {
1425                    println!(
1426                        "Semantic runtime available, but no textual content produced embeddings"
1427                    );
1428                }
1429                EmbeddingMode::None => {}
1430            }
1431        }
1432    }
1433
1434    if reports.is_empty() {
1435        if args.json {
1436            if capacity_reached {
1437                summary_warnings.push("capacity_limit_reached".to_string());
1438            }
1439            let summary_json = json!({
1440                "processed": processed,
1441                "ingested": 0,
1442                "skipped": skipped,
1443                "bytes_added": bytes_added,
1444                "bytes_added_human": format_bytes(bytes_added),
1445                "embedded_documents": embedded_docs,
1446                "capacity_reached": capacity_reached,
1447                "warnings": summary_warnings,
1448                "reports": Vec::<serde_json::Value>::new(),
1449            });
1450            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1451        } else {
1452            println!(
1453                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1454                processed, skipped
1455            );
1456        }
1457        return Ok(());
1458    }
1459
1460    #[cfg(feature = "parallel_segments")]
1461    let mut parallel_flush_spinner = if use_parallel && !args.json {
1462        Some(create_spinner("Flushing parallel segments..."))
1463    } else {
1464        None
1465    };
1466
1467    if use_parallel {
1468        #[cfg(feature = "parallel_segments")]
1469        {
1470            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1471                let mut opts = BuildOpts::default();
1472                opts.sanitize();
1473                opts
1474            });
1475            // Set vector compression mode from mem state
1476            opts.vec_compression = mem.vector_compression().clone();
1477            let seqs = if pending_parallel_inputs.is_empty() {
1478                mem.commit_parallel(opts)?;
1479                Vec::new()
1480            } else {
1481                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1482            };
1483            if let Some(pb) = parallel_flush_spinner.take() {
1484                pb.finish_with_message("Flushed parallel segments");
1485            }
1486            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1487                if let Some(report) = reports.get_mut(idx) {
1488                    report.seq = seq;
1489                }
1490            }
1491        }
1492        #[cfg(not(feature = "parallel_segments"))]
1493        {
1494            mem.commit()?;
1495        }
1496    } else {
1497        mem.commit()?;
1498    }
1499
1500    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1501    #[cfg(feature = "clip")]
1502    if clip_embeddings_added {
1503        mem.commit()?;
1504    }
1505
1506    if !args.json && use_parallel {
1507        for report in &reports {
1508            print_report(report);
1509        }
1510    }
1511
1512    // Table extraction for PDF files
1513    let mut tables_extracted = 0usize;
1514    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1515    if args.tables && !args.no_tables {
1516        if let InputSet::Files(ref paths) = input_set {
1517            let pdf_paths: Vec<_> = paths
1518                .iter()
1519                .filter(|p| {
1520                    p.extension()
1521                        .and_then(|e| e.to_str())
1522                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1523                        .unwrap_or(false)
1524                })
1525                .collect();
1526
1527            if !pdf_paths.is_empty() {
1528                let table_spinner = if args.json {
1529                    None
1530                } else {
1531                    Some(create_spinner(&format!(
1532                        "Extracting tables from {} PDF(s)...",
1533                        pdf_paths.len()
1534                    )))
1535                };
1536
1537                // Use aggressive mode with auto fallback for best results
1538                let table_options = TableExtractionOptions::builder()
1539                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1540                    .min_rows(2)
1541                    .min_cols(2)
1542                    .min_quality(memvid_core::table::TableQuality::Low)
1543                    .merge_multi_page(true)
1544                    .build();
1545
1546                for pdf_path in pdf_paths {
1547                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1548                        let filename = pdf_path
1549                            .file_name()
1550                            .and_then(|s| s.to_str())
1551                            .unwrap_or("unknown.pdf");
1552
1553                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1554                            for table in &result.tables {
1555                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1556                                {
1557                                    tables_extracted += 1;
1558                                    table_summaries.push(json!({
1559                                        "table_id": table.table_id,
1560                                        "source_file": table.source_file,
1561                                        "meta_frame_id": meta_id,
1562                                        "rows": table.n_rows,
1563                                        "cols": table.n_cols,
1564                                        "quality": format!("{:?}", table.quality),
1565                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1566                                    }));
1567                                }
1568                            }
1569                        }
1570                    }
1571                }
1572
1573                if let Some(pb) = table_spinner {
1574                    if tables_extracted > 0 {
1575                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1576                    } else {
1577                        pb.finish_with_message("No tables found");
1578                    }
1579                }
1580
1581                // Commit table frames
1582                if tables_extracted > 0 {
1583                    mem.commit()?;
1584                }
1585            }
1586        }
1587    }
1588
1589    // Logic-Mesh: Extract entities and build relationship graph
1590    // Opt-in only: requires --logic-mesh flag
1591    #[cfg(feature = "logic_mesh")]
1592    let mut logic_mesh_entities = 0usize;
1593    #[cfg(feature = "logic_mesh")]
1594    {
1595        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1596
1597        let models_dir = config.models_dir.clone();
1598        let model_path = ner_model_path(&models_dir);
1599        let tokenizer_path = ner_tokenizer_path(&models_dir);
1600
1601        // Opt-in: only enable Logic-Mesh when explicitly requested with --logic-mesh
1602        let ner_available = model_path.exists() && tokenizer_path.exists();
1603        let should_run_ner = args.logic_mesh;
1604
1605        if should_run_ner && !ner_available {
1606            // User explicitly requested --logic-mesh but model not installed
1607            if args.json {
1608                summary_warnings.push(
1609                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1610                );
1611            } else {
1612                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1613                eprintln!("   Run: memvid models install --ner distilbert-ner");
1614            }
1615        } else if should_run_ner {
1616            let ner_spinner = if args.json {
1617                None
1618            } else {
1619                Some(create_spinner("Building Logic-Mesh entity graph..."))
1620            };
1621
1622            match NerModel::load(&model_path, &tokenizer_path, None) {
1623                Ok(mut ner_model) => {
1624                    let mut mesh = LogicMesh::new();
1625                    let mut filtered_count = 0usize;
1626
1627                    // Process each ingested frame for entity extraction using cached search_text
1628                    for report in &reports {
1629                        let frame_id = report.frame_id;
1630                        // Use the cached search_text from ingestion
1631                        if let Some(ref content) = report.search_text {
1632                            if !content.is_empty() {
1633                                match ner_model.extract(content) {
1634                                    Ok(raw_entities) => {
1635                                        // Merge adjacent entities that were split by tokenization
1636                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1637                                        let merged_entities = merge_adjacent_entities(raw_entities, content);
1638
1639                                        for entity in &merged_entities {
1640                                            // Apply post-processing filter to remove noisy entities
1641                                            if !is_valid_entity(&entity.text, entity.confidence) {
1642                                                tracing::debug!(
1643                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1644                                                    entity.text, entity.confidence * 100.0
1645                                                );
1646                                                filtered_count += 1;
1647                                                continue;
1648                                            }
1649
1650                                            // Convert NER entity type to EntityKind
1651                                            let kind = entity.to_entity_kind();
1652                                            // Create mesh node with proper structure
1653                                            let node = MeshNode::new(
1654                                                entity.text.to_lowercase(),
1655                                                entity.text.trim().to_string(),
1656                                                kind,
1657                                                entity.confidence,
1658                                                frame_id,
1659                                                entity.byte_start as u32,
1660                                                (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1661                                            );
1662                                            mesh.merge_node(node);
1663                                            logic_mesh_entities += 1;
1664                                        }
1665                                    }
1666                                    Err(e) => {
1667                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1668                                    }
1669                                }
1670                            }
1671                        }
1672                    }
1673
1674                    if logic_mesh_entities > 0 {
1675                        mesh.finalize();
1676                        let node_count = mesh.stats().node_count;
1677                        // Persist mesh to MV2 file
1678                        mem.set_logic_mesh(mesh);
1679                        mem.commit()?;
1680                        info!(
1681                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1682                            logic_mesh_entities,
1683                            node_count,
1684                            filtered_count
1685                        );
1686                    }
1687
1688                    if let Some(pb) = ner_spinner {
1689                        pb.finish_with_message(format!(
1690                            "Logic-Mesh: {} entities ({} unique)",
1691                            logic_mesh_entities,
1692                            mem.mesh_node_count()
1693                        ));
1694                    }
1695                }
1696                Err(e) => {
1697                    if let Some(pb) = ner_spinner {
1698                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1699                    }
1700                    if args.json {
1701                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1702                    }
1703                }
1704            }
1705        }
1706    }
1707
1708    let stats = mem.stats()?;
1709    if args.json {
1710        if capacity_reached {
1711            summary_warnings.push("capacity_limit_reached".to_string());
1712        }
1713        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1714        let total_duration_ms: Option<u64> = {
1715            let sum: u64 = reports
1716                .iter()
1717                .filter_map(|r| r.extraction.duration_ms)
1718                .sum();
1719            if sum > 0 {
1720                Some(sum)
1721            } else {
1722                None
1723            }
1724        };
1725        let total_pages: Option<u32> = {
1726            let sum: u32 = reports
1727                .iter()
1728                .filter_map(|r| r.extraction.pages_processed)
1729                .sum();
1730            if sum > 0 {
1731                Some(sum)
1732            } else {
1733                None
1734            }
1735        };
1736        let embedding_mode_str = match embedding_mode {
1737            EmbeddingMode::None => "none",
1738            EmbeddingMode::Auto => "auto",
1739            EmbeddingMode::Explicit => "explicit",
1740        };
1741        let summary_json = json!({
1742            "processed": processed,
1743            "ingested": reports.len(),
1744            "skipped": skipped,
1745            "bytes_added": bytes_added,
1746            "bytes_added_human": format_bytes(bytes_added),
1747            "embedded_documents": embedded_docs,
1748            "embedding": {
1749                "enabled": embedding_enabled,
1750                "mode": embedding_mode_str,
1751                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1752            },
1753            "tables": {
1754                "enabled": args.tables && !args.no_tables,
1755                "extracted": tables_extracted,
1756                "tables": table_summaries,
1757            },
1758            "capacity_reached": capacity_reached,
1759            "warnings": summary_warnings,
1760            "extraction": {
1761                "total_duration_ms": total_duration_ms,
1762                "total_pages_processed": total_pages,
1763            },
1764            "memory": {
1765                "path": args.file.display().to_string(),
1766                "frame_count": stats.frame_count,
1767                "size_bytes": stats.size_bytes,
1768                "tier": format!("{:?}", stats.tier),
1769            },
1770            "reports": reports_json,
1771        });
1772        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1773    } else {
1774        println!(
1775            "Committed {} frame(s); total frames {}",
1776            reports.len(),
1777            stats.frame_count
1778        );
1779        println!(
1780            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1781            processed,
1782            reports.len(),
1783            skipped,
1784            format_bytes(bytes_added)
1785        );
1786        if tables_extracted > 0 {
1787            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1788        }
1789    }
1790
1791    // Save active replay session if one exists
1792    #[cfg(feature = "replay")]
1793    let _ = mem.save_active_session();
1794
1795    Ok(())
1796}
1797
/// Handle the `update` command: replace the payload and/or attributes of one existing frame.
///
/// The flow is strictly ordered, because later steps override earlier ones:
///
/// 1. Open the memory at `args.file`, verify that CLI mutation is allowed, and apply the
///    lock-related CLI settings.
/// 2. Select the target frame by `args.frame_id` or `args.uri`.
/// 3. Optionally read a replacement payload from `args.input`.
/// 4. Seed `PutOptions` from the existing frame, then layer CLI overrides on top
///    (URI, title, timestamp, track, kind, labels, tags, metadata).
/// 5. When a payload was supplied, re-analyze it to refresh title, metadata and search text.
/// 6. When `args.embeddings` is set, recompute the embedding; otherwise reuse the stored
///    embedding if the memory has a vector index.
/// 7. Call `update_frame`, commit, reload the resulting frame, and print it as JSON
///    (`args.json`) or as a human-readable summary.
///
/// # Errors
///
/// Propagates any error from opening the memory, selecting the frame, reading the payload,
/// loading the embedding runtime, embedding, updating, committing, or reloading the frame.
/// Fails explicitly when `args.embeddings` is set and the memory reports mixed embedding
/// models.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // A replacement payload is optional; without `--input` only attributes are updated.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    // `None` both when there is no payload and when the payload is not valid UTF-8.
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Start from the existing frame's attributes so anything not overridden below is carried
    // over unchanged. Auto-tagging and date extraction are only requested for a new payload.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // An explicit `set_uri` replaces whatever URI the frame had.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // A frame without a URI that receives a new payload gets one derived from the input path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels supplied on the command line replace the existing set wholesale.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags supplied on the command line also replace the existing tag list. Each pair
    // contributes its non-empty key and, when non-empty and different from the key, its value
    // as tags. The pair itself is inserted into `extra_metadata`, which is NOT cleared first,
    // so previously stored entries with other keys remain.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for re-embedding; defaults to the frame's stored search text and is replaced
    // below if analysis of a new payload yields fresh text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // With a new payload the title is re-derived. `derive_title` prefers `args.title`,
        // then a Markdown heading in the payload, then the input file stem.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        // NOTE(review): likely redundant with the identical check above, unless
        // `apply_metadata_overrides` can reset `options.uri` — confirm.
        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        // Analysis results only overwrite the inherited values when they are present.
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        options.auto_tag = false;
        options.extract_dates = false;
    }

    // Embedding is enabled on the options exactly when the memory already has a vector index.
    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    // Auto-detect embedding model from existing MV2 dimension
    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Refuse to recompute when the memory holds embeddings from more than one model.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                    Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer the search text; fall back to the raw UTF-8 payload when that is blank/absent.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        // Only stamp embedding identity metadata when a fresh embedding was actually produced.
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // Without a freshly computed embedding, carry the frame's stored embedding forward
    // (only attempted when the memory has a vector index).
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    // Captured before `options` and `payload_bytes` are moved into `update_frame`.
    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Reload the frame for reporting: by URI when a non-empty one is known, otherwise via a
    // reversed, limit-1 timeline query (presumably the most recent entry — confirm against
    // `TimelineQueryBuilder`), falling back to the original frame id if the timeline is empty.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
2000
2001fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2002    if let Some(uri) = provided {
2003        let sanitized = sanitize_uri(uri, true);
2004        return format!("mv2://{}", sanitized);
2005    }
2006
2007    if let Some(path) = source {
2008        let raw = path.to_string_lossy();
2009        let sanitized = sanitize_uri(&raw, false);
2010        return format!("mv2://{}", sanitized);
2011    }
2012
2013    format!("mv2://frames/{}", Uuid::new_v4())
2014}
2015
2016pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2017    let digest = hash(payload).to_hex();
2018    let short = &digest[..32];
2019    let ext_from_path = source
2020        .and_then(|path| path.extension())
2021        .and_then(|ext| ext.to_str())
2022        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2023        .filter(|ext| !ext.is_empty());
2024    let ext = ext_from_path
2025        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2026        .unwrap_or_else(|| "bin".to_string());
2027    format!("mv2://video/{short}.{ext}")
2028}
2029
2030fn derive_title(
2031    provided: Option<String>,
2032    source: Option<&Path>,
2033    payload_utf8: Option<&str>,
2034) -> Option<String> {
2035    if let Some(title) = provided {
2036        let trimmed = title.trim();
2037        if trimmed.is_empty() {
2038            None
2039        } else {
2040            Some(trimmed.to_string())
2041        }
2042    } else {
2043        if let Some(text) = payload_utf8 {
2044            if let Some(markdown_title) = extract_markdown_title(text) {
2045                return Some(markdown_title);
2046            }
2047        }
2048        source
2049            .and_then(|path| path.file_stem())
2050            .and_then(|stem| stem.to_str())
2051            .map(to_title_case)
2052    }
2053}
2054
/// Return the text of the first Markdown ATX heading in `text`, if any.
///
/// A line counts as a heading when, after trimming surrounding whitespace, it starts with
/// one to six `#` characters followed by whitespace and some non-blank text. This rejects
/// lines that merely begin with `#` but are not headings — shebangs (`#!/bin/sh`),
/// preprocessor directives (`#include <stdio.h>`), hashtags (`#todo`) — which matters because
/// this function is applied to arbitrary UTF-8 payloads, not only Markdown files.
///
/// Heading lines with no text (e.g. a bare `#`) are skipped and the search continues.
/// Returns `None` when no line qualifies.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let trimmed = line.trim();
        let rest = trimmed.trim_start_matches('#');
        // '#' is a single byte, so the byte-length difference is the heading level.
        let level = trimmed.len() - rest.len();
        if !(1..=6).contains(&level) {
            return None;
        }
        // The marker must be separated from the heading text by whitespace.
        if !rest.starts_with(char::is_whitespace) {
            return None;
        }
        let title = rest.trim();
        (!title.is_empty()).then(|| title.to_string())
    })
}
2067
/// Upper-case the first character of `input` and leave the rest untouched.
///
/// Despite the name, only the leading character is changed; later words are not capitalized.
/// The Unicode upper-case mapping is used, so a single character may expand to several
/// (e.g. `ß` becomes `SS`). An empty input yields an empty string.
fn to_title_case(input: &str) -> String {
    let mut rest = input.chars();
    match rest.next() {
        Some(head) => {
            let mut out = String::with_capacity(input.len());
            out.extend(head.to_uppercase());
            out.push_str(rest.as_str());
            out
        }
        None => String::new(),
    }
}
2079
/// Report whether `path` is a file that ingestion ignores by default.
///
/// Currently this is only true when the final path component is exactly `.DS_Store`.
fn should_skip_input(path: &Path) -> bool {
    let file_name = path.file_name().and_then(|name| name.to_str());
    file_name == Some(".DS_Store")
}
2086
/// Report whether `path` names a hidden entry, i.e. its final component starts with `.`.
///
/// Paths with no final component (such as `/` or `..`) or with a non-UTF-8 name are not skipped.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .map_or(false, |name| name.starts_with('.'))
}
2093
/// The resolved source(s) of payload data for an ingest operation, as produced by
/// `resolve_inputs`.
enum InputSet {
    /// No input argument was given.
    Stdin,
    /// One or more filesystem paths: glob matches, the contents of a directory, or a single
    /// path taken as-is.
    Files(Vec<PathBuf>),
}
2098
/// Decide, from the extension alone, whether `path` looks like an image file.
///
/// The comparison is ASCII case-insensitive. Paths without an extension, or whose extension
/// is not valid UTF-8, are not treated as images. File contents are never inspected.
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    const IMAGE_EXTENSIONS: [&str; 9] = [
        "jpg", "jpeg", "png", "gif", "bmp", "webp", "tiff", "tif", "ico",
    ];

    path.extension()
        .and_then(|ext| ext.to_str())
        .map_or(false, |ext| {
            IMAGE_EXTENSIONS
                .iter()
                .any(|known| ext.eq_ignore_ascii_case(known))
        })
}
2110
2111fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2112    let Some(raw) = input else {
2113        return Ok(InputSet::Stdin);
2114    };
2115
2116    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2117        let mut files = Vec::new();
2118        let mut matched_any = false;
2119        for entry in glob(raw)? {
2120            let path = entry?;
2121            if path.is_file() {
2122                matched_any = true;
2123                if should_skip_input(&path) {
2124                    continue;
2125                }
2126                files.push(path);
2127            }
2128        }
2129        files.sort();
2130        if files.is_empty() {
2131            if matched_any {
2132                anyhow::bail!(
2133                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2134                );
2135            }
2136            anyhow::bail!("pattern '{raw}' did not match any files");
2137        }
2138        return Ok(InputSet::Files(files));
2139    }
2140
2141    let path = PathBuf::from(raw);
2142    if path.is_dir() {
2143        let (mut files, skipped_any) = collect_directory_files(&path)?;
2144        files.sort();
2145        if files.is_empty() {
2146            if skipped_any {
2147                anyhow::bail!(
2148                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2149                    path.display()
2150                );
2151            }
2152            anyhow::bail!(
2153                "directory '{}' contains no ingestible files",
2154                path.display()
2155            );
2156        }
2157        Ok(InputSet::Files(files))
2158    } else {
2159        Ok(InputSet::Files(vec![path]))
2160    }
2161}
2162
2163fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2164    let mut files = Vec::new();
2165    let mut skipped_any = false;
2166    let mut stack = vec![root.to_path_buf()];
2167    while let Some(dir) = stack.pop() {
2168        for entry in fs::read_dir(&dir)? {
2169            let entry = entry?;
2170            let path = entry.path();
2171            let file_type = entry.file_type()?;
2172            if file_type.is_dir() {
2173                if should_skip_dir(&path) {
2174                    skipped_any = true;
2175                    continue;
2176                }
2177                stack.push(path);
2178            } else if file_type.is_file() {
2179                if should_skip_input(&path) {
2180                    skipped_any = true;
2181                    continue;
2182                }
2183                files.push(path);
2184            }
2185        }
2186    }
2187    Ok((files, skipped_any))
2188}
2189
/// Outcome of ingesting a single input: the report plus bookkeeping for the caller.
///
/// NOTE(review): the code that builds this struct is outside this section; the field
/// notes below are inferred from names and types and should be confirmed there.
struct IngestOutcome {
    /// Summary of the item that was written.
    report: PutReport,
    /// Presumably whether an embedding was produced for this item — confirm at the caller.
    embedded: bool,
    /// Only compiled in with the `parallel_segments` feature.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2196
/// Per-item summary of a completed put.
///
/// NOTE(review): the code that fills this struct is outside this section; notes marked
/// "presumably" are inferred from names and types and should be confirmed.
struct PutReport {
    /// Presumably the sequence number assigned to this write — confirm.
    seq: u64,
    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
    frame_id: u64,
    /// URI under which the item was stored.
    uri: String,
    /// Title recorded for the item, if one was available.
    title: Option<String>,
    /// Presumably the payload size before storage, in bytes — confirm.
    original_bytes: usize,
    /// Presumably the payload size as stored, in bytes — confirm.
    stored_bytes: usize,
    /// Presumably whether the stored payload was compressed — confirm.
    compressed: bool,
    /// Source path of the item, when it came from a file.
    source: Option<PathBuf>,
    /// MIME type recorded for the item, if known.
    mime: Option<String>,
    /// Document metadata attached to the item, if any.
    metadata: Option<DocMetadata>,
    /// Summary of the text-extraction step for this item.
    extraction: ExtractionSummary,
    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2214
2215struct CapacityGuard {
2216    #[allow(dead_code)]
2217    tier: Tier,
2218    capacity: u64,
2219    current_size: u64,
2220}
2221
2222impl CapacityGuard {
2223    const MIN_OVERHEAD: u64 = 128 * 1024;
2224    const RELATIVE_OVERHEAD: f64 = 0.05;
2225
2226    fn from_stats(stats: &Stats) -> Option<Self> {
2227        if stats.capacity_bytes == 0 {
2228            return None;
2229        }
2230        Some(Self {
2231            tier: stats.tier,
2232            capacity: stats.capacity_bytes,
2233            current_size: stats.size_bytes,
2234        })
2235    }
2236
2237    fn ensure_capacity(&self, additional: u64) -> Result<()> {
2238        let projected = self
2239            .current_size
2240            .saturating_add(additional)
2241            .saturating_add(Self::estimate_overhead(additional));
2242        if projected > self.capacity {
2243            return Err(map_put_error(
2244                MemvidError::CapacityExceeded {
2245                    current: self.current_size,
2246                    limit: self.capacity,
2247                    required: projected,
2248                },
2249                Some(self.capacity),
2250            ));
2251        }
2252        Ok(())
2253    }
2254
2255    fn update_after_commit(&mut self, stats: &Stats) {
2256        self.current_size = stats.size_bytes;
2257    }
2258
2259    fn capacity_hint(&self) -> Option<u64> {
2260        Some(self.capacity)
2261    }
2262
2263    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
2264    fn check_and_warn_capacity(&self) {
2265        if self.capacity == 0 {
2266            return;
2267        }
2268
2269        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
2270
2271        // Warn at key thresholds
2272        if usage_pct >= 90.0 && usage_pct < 91.0 {
2273            eprintln!(
2274                "⚠️  CRITICAL: 90% capacity used ({} / {})",
2275                format_bytes(self.current_size),
2276                format_bytes(self.capacity)
2277            );
2278            eprintln!("   Actions:");
2279            eprintln!("   1. Recreate with larger --size");
2280            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
2281            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
2282        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
2283            eprintln!(
2284                "⚠️  WARNING: 75% capacity used ({} / {})",
2285                format_bytes(self.current_size),
2286                format_bytes(self.capacity)
2287            );
2288            eprintln!("   Consider planning for capacity expansion soon");
2289        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
2290            eprintln!(
2291                "ℹ️  INFO: 50% capacity used ({} / {})",
2292                format_bytes(self.current_size),
2293                format_bytes(self.capacity)
2294            );
2295        }
2296    }
2297
2298    fn estimate_overhead(additional: u64) -> u64 {
2299        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
2300        fractional.max(Self::MIN_OVERHEAD)
2301    }
2302}
2303
/// Result of `analyze_file` for a single payload.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// MIME type detected for the payload.
    pub mime: String,
    /// Collected document metadata, or `None` when the metadata ended up empty.
    pub metadata: Option<DocMetadata>,
    /// Text to index for search, when any could be extracted.
    pub search_text: Option<String>,
    /// Which reader ran, how it fared, and any warnings it produced.
    pub extraction: ExtractionSummary,
}
2311
/// Switches that control audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Run audio analysis even when the detected MIME type is not `audio/*`.
    force: bool,
    /// Requested segment length in seconds; read through `normalised_segment_secs`.
    segment_secs: u32,
    /// Whether transcription was requested.
    transcribe: bool,
}

impl AudioAnalyzeOptions {
    /// Segment length used by `Default`.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length in seconds, with zero bumped up to one second.
    fn normalised_segment_secs(self) -> u32 {
        if self.segment_secs == 0 {
            1
        } else {
            self.segment_secs
        }
    }
}

impl Default for AudioAnalyzeOptions {
    /// No forcing, no transcription, and the default segment length.
    fn default() -> Self {
        let segment_secs = Self::DEFAULT_SEGMENT_SECS;
        Self {
            force: false,
            segment_secs,
            transcribe: false,
        }
    }
}
2336
/// Result of `analyze_audio`, consumed by `analyze_file`.
struct AudioAnalysis {
    /// Audio metadata; stored on the document when it has none (or only empty) audio
    /// metadata yet.
    metadata: DocAudioMetadata,
    /// Caption used only when the document has no caption so far.
    caption: Option<String>,
    /// Terms merged into the search text when no transcript is available.
    search_terms: Vec<String>,
    /// Transcript; when present it replaces the search text entirely.
    transcript: Option<String>,
}
2343
/// Result of `analyze_image` for a decoded image payload.
struct ImageAnalysis {
    /// Width of the decoded image.
    width: u32,
    /// Height of the decoded image.
    height: u32,
    /// Palette colours as lowercase `#rrggbb` strings; empty when the palette could not
    /// be computed.
    palette: Vec<String>,
    /// Caption obtained from the EXIF extraction step, if any.
    caption: Option<String>,
    /// EXIF metadata, if any was extracted.
    exif: Option<DocExifMetadata>,
}
2351
2352#[derive(Debug, Clone)]
2353pub struct ExtractionSummary {
2354    reader: Option<String>,
2355    status: ExtractionStatus,
2356    warnings: Vec<String>,
2357    duration_ms: Option<u64>,
2358    pages_processed: Option<u32>,
2359}
2360
2361impl ExtractionSummary {
2362    fn record_warning<S: Into<String>>(&mut self, warning: S) {
2363        self.warnings.push(warning.into());
2364    }
2365}
2366
/// Outcome of a text-extraction attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    /// No extraction was attempted.
    Skipped,
    /// The reader produced usable text.
    Ok,
    /// Usable text came from a fallback path.
    FallbackUsed,
    /// Extraction ran but produced no usable text.
    Empty,
    /// Extraction failed.
    Failed,
}

impl ExtractionStatus {
    /// Stable snake_case name of the status.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Skipped => "skipped",
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
        }
    }
}
2387
2388fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2389    let filename = source_path
2390        .and_then(|path| path.file_name())
2391        .and_then(|name| name.to_str())
2392        .map(|value| value.to_string());
2393    MediaManifest {
2394        kind: "video".to_string(),
2395        mime: mime.to_string(),
2396        bytes,
2397        filename,
2398        duration_ms: None,
2399        width: None,
2400        height: None,
2401        codec: None,
2402    }
2403}
2404
2405pub fn analyze_file(
2406    source_path: Option<&Path>,
2407    payload: &[u8],
2408    payload_utf8: Option<&str>,
2409    inferred_title: Option<&str>,
2410    audio_opts: AudioAnalyzeOptions,
2411    force_video: bool,
2412) -> FileAnalysis {
2413    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2414    let is_video = force_video || mime.starts_with("video/");
2415    let treat_as_text = if is_video { false } else { treat_as_text_base };
2416
2417    let mut metadata = DocMetadata::default();
2418    metadata.mime = Some(mime.clone());
2419    metadata.bytes = Some(payload.len() as u64);
2420    metadata.hash = Some(hash(payload).to_hex().to_string());
2421
2422    let mut search_text = None;
2423
2424    let mut extraction = ExtractionSummary {
2425        reader: None,
2426        status: ExtractionStatus::Skipped,
2427        warnings: Vec::new(),
2428        duration_ms: None,
2429        pages_processed: None,
2430    };
2431
2432    let reader_registry = default_reader_registry();
2433    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2434        if slice.is_empty() {
2435            None
2436        } else {
2437            Some(slice)
2438        }
2439    });
2440
2441    let apply_extracted_text = |text: &str,
2442                                reader_label: &str,
2443                                status_if_applied: ExtractionStatus,
2444                                extraction: &mut ExtractionSummary,
2445                                metadata: &mut DocMetadata,
2446                                search_text: &mut Option<String>|
2447     -> bool {
2448        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
2449            let mut value = normalized.text;
2450            if normalized.truncated {
2451                value.push('…');
2452            }
2453            if metadata.caption.is_none() {
2454                if let Some(caption) = caption_from_text(&value) {
2455                    metadata.caption = Some(caption);
2456                }
2457            }
2458            *search_text = Some(value);
2459            extraction.reader = Some(reader_label.to_string());
2460            extraction.status = status_if_applied;
2461            true
2462        } else {
2463            false
2464        }
2465    };
2466
2467    if is_video {
2468        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
2469    }
2470
2471    if treat_as_text {
2472        if let Some(text) = payload_utf8 {
2473            if !apply_extracted_text(
2474                text,
2475                "bytes",
2476                ExtractionStatus::Ok,
2477                &mut extraction,
2478                &mut metadata,
2479                &mut search_text,
2480            ) {
2481                extraction.status = ExtractionStatus::Empty;
2482                extraction.reader = Some("bytes".to_string());
2483                let msg = "text payload contained no searchable content after normalization";
2484                extraction.record_warning(msg);
2485                warn!("{msg}");
2486            }
2487        } else {
2488            extraction.reader = Some("bytes".to_string());
2489            extraction.status = ExtractionStatus::Failed;
2490            let msg = "payload reported as text but was not valid UTF-8";
2491            extraction.record_warning(msg);
2492            warn!("{msg}");
2493        }
2494    } else if mime == "application/pdf"
2495        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
2496        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
2497        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
2498        || mime == "application/vnd.ms-excel"
2499        || mime == "application/msword"
2500    {
2501        // Use reader registry for PDFs and Office documents (XLSX, DOCX, PPTX, etc.)
2502        let mut applied = false;
2503
2504        let format_hint = infer_document_format(Some(mime.as_str()), magic);
2505        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
2506            .with_uri(source_path.and_then(|p| p.to_str()))
2507            .with_magic(magic);
2508
2509        if let Some(reader) = reader_registry.find_reader(&hint) {
2510            match reader.extract(payload, &hint) {
2511                Ok(output) => {
2512                    extraction.reader = Some(output.reader_name.clone());
2513                    if let Some(ms) = output.diagnostics.duration_ms {
2514                        extraction.duration_ms = Some(ms);
2515                    }
2516                    if let Some(pages) = output.diagnostics.pages_processed {
2517                        extraction.pages_processed = Some(pages);
2518                    }
2519                    if let Some(mime_type) = output.document.mime_type.clone() {
2520                        metadata.mime = Some(mime_type);
2521                    }
2522
2523                    for warning in output.diagnostics.warnings.iter() {
2524                        extraction.record_warning(warning);
2525                        warn!("{warning}");
2526                    }
2527
2528                    let status = if output.diagnostics.fallback {
2529                        ExtractionStatus::FallbackUsed
2530                    } else {
2531                        ExtractionStatus::Ok
2532                    };
2533
2534                    if let Some(doc_text) = output.document.text.as_ref() {
2535                        applied = apply_extracted_text(
2536                            doc_text,
2537                            output.reader_name.as_str(),
2538                            status,
2539                            &mut extraction,
2540                            &mut metadata,
2541                            &mut search_text,
2542                        );
2543                        if !applied {
2544                            extraction.reader = Some(output.reader_name);
2545                            extraction.status = ExtractionStatus::Empty;
2546                            let msg = "primary reader returned no usable text";
2547                            extraction.record_warning(msg);
2548                            warn!("{msg}");
2549                        }
2550                    } else {
2551                        extraction.reader = Some(output.reader_name);
2552                        extraction.status = ExtractionStatus::Empty;
2553                        let msg = "primary reader returned empty text";
2554                        extraction.record_warning(msg);
2555                        warn!("{msg}");
2556                    }
2557                }
2558                Err(err) => {
2559                    let name = reader.name();
2560                    extraction.reader = Some(name.to_string());
2561                    extraction.status = ExtractionStatus::Failed;
2562                    let msg = format!("primary reader {name} failed: {err}");
2563                    extraction.record_warning(&msg);
2564                    warn!("{msg}");
2565                }
2566            }
2567        } else {
2568            extraction.record_warning("no registered reader matched this PDF payload");
2569            warn!("no registered reader matched this PDF payload");
2570        }
2571
2572        if !applied {
2573            match extract_pdf_text(payload) {
2574                Ok(pdf_text) => {
2575                    if !apply_extracted_text(
2576                        &pdf_text,
2577                        "pdf_extract",
2578                        ExtractionStatus::FallbackUsed,
2579                        &mut extraction,
2580                        &mut metadata,
2581                        &mut search_text,
2582                    ) {
2583                        extraction.reader = Some("pdf_extract".to_string());
2584                        extraction.status = ExtractionStatus::Empty;
2585                        let msg = "PDF text extraction yielded no searchable content";
2586                        extraction.record_warning(msg);
2587                        warn!("{msg}");
2588                    }
2589                }
2590                Err(err) => {
2591                    extraction.reader = Some("pdf_extract".to_string());
2592                    extraction.status = ExtractionStatus::Failed;
2593                    let msg = format!("failed to extract PDF text via fallback: {err}");
2594                    extraction.record_warning(&msg);
2595                    warn!("{msg}");
2596                }
2597            }
2598        }
2599    }
2600
2601    if !is_video && mime.starts_with("image/") {
2602        if let Some(image_meta) = analyze_image(payload) {
2603            let ImageAnalysis {
2604                width,
2605                height,
2606                palette,
2607                caption,
2608                exif,
2609            } = image_meta;
2610            metadata.width = Some(width);
2611            metadata.height = Some(height);
2612            if !palette.is_empty() {
2613                metadata.colors = Some(palette);
2614            }
2615            if metadata.caption.is_none() {
2616                metadata.caption = caption;
2617            }
2618            if let Some(exif) = exif {
2619                metadata.exif = Some(exif);
2620            }
2621        }
2622    }
2623
2624    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
2625        if let Some(AudioAnalysis {
2626            metadata: audio_metadata,
2627            caption: audio_caption,
2628            mut search_terms,
2629            transcript,
2630        }) = analyze_audio(payload, source_path, &mime, audio_opts)
2631        {
2632            if metadata.audio.is_none()
2633                || metadata
2634                    .audio
2635                    .as_ref()
2636                    .map_or(true, DocAudioMetadata::is_empty)
2637            {
2638                metadata.audio = Some(audio_metadata);
2639            }
2640            if metadata.caption.is_none() {
2641                metadata.caption = audio_caption;
2642            }
2643
2644            // Use transcript as primary search text if available
2645            if let Some(ref text) = transcript {
2646                extraction.reader = Some("whisper".to_string());
2647                extraction.status = ExtractionStatus::Ok;
2648                search_text = Some(text.clone());
2649            } else if !search_terms.is_empty() {
2650                match &mut search_text {
2651                    Some(existing) => {
2652                        for term in search_terms.drain(..) {
2653                            if !existing.contains(&term) {
2654                                if !existing.ends_with(' ') {
2655                                    existing.push(' ');
2656                                }
2657                                existing.push_str(&term);
2658                            }
2659                        }
2660                    }
2661                    None => {
2662                        search_text = Some(search_terms.join(" "));
2663                    }
2664                }
2665            }
2666        }
2667    }
2668
2669    if metadata.caption.is_none() {
2670        if let Some(title) = inferred_title {
2671            let trimmed = title.trim();
2672            if !trimmed.is_empty() {
2673                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
2674            }
2675        }
2676    }
2677
2678    FileAnalysis {
2679        mime,
2680        metadata: if metadata.is_empty() {
2681            None
2682        } else {
2683            Some(metadata)
2684        },
2685        search_text,
2686        extraction,
2687    }
2688}
2689
2690fn default_reader_registry() -> &'static ReaderRegistry {
2691    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2692    REGISTRY.get_or_init(ReaderRegistry::default)
2693}
2694
2695fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2696    if detect_pdf_magic(magic) {
2697        return Some(DocumentFormat::Pdf);
2698    }
2699
2700    let mime = mime?.trim().to_ascii_lowercase();
2701    match mime.as_str() {
2702        "application/pdf" => Some(DocumentFormat::Pdf),
2703        "text/plain" => Some(DocumentFormat::PlainText),
2704        "text/markdown" => Some(DocumentFormat::Markdown),
2705        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2706        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2707            Some(DocumentFormat::Docx)
2708        }
2709        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2710            Some(DocumentFormat::Pptx)
2711        }
2712        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2713            Some(DocumentFormat::Xlsx)
2714        }
2715        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2716        _ => None,
2717    }
2718}
2719
/// Reports whether the sniffed bytes begin with a PDF signature.
///
/// A leading UTF-8 byte-order mark and any ASCII whitespace after it are skipped before
/// looking for `%PDF`. Missing or empty input is not a PDF.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    const UTF8_BOM: &[u8] = &[0xEF, 0xBB, 0xBF];

    let Some(bytes) = magic else {
        return false;
    };
    let after_bom = bytes.strip_prefix(UTF8_BOM).unwrap_or(bytes);
    let content_start = after_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(after_bom.len());
    after_bom[content_start..].starts_with(b"%PDF")
}
2737
2738/// Robust PDF text extraction with multiple fallbacks for cross-platform support.
2739/// Tries all extractors and returns the one with the most text content.
2740fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2741    let mut best_text = String::new();
2742    let mut best_source = "none";
2743
2744    // Minimum chars to consider extraction "good" - scales with PDF size
2745    // For a 12MB PDF, expect at least ~1000 chars; for 1MB, ~100 chars
2746    let min_good_chars = (payload.len() / 10000).max(100).min(5000);
2747
2748    // Try pdf_extract first
2749    match pdf_extract::extract_text_from_mem(payload) {
2750        Ok(text) => {
2751            let trimmed = text.trim();
2752            if trimmed.len() > best_text.len() {
2753                best_text = trimmed.to_string();
2754                best_source = "pdf_extract";
2755            }
2756            if trimmed.len() >= min_good_chars {
2757                info!("pdf_extract extracted {} chars (good)", trimmed.len());
2758                return Ok(best_text);
2759            } else if !trimmed.is_empty() {
2760                warn!("pdf_extract returned only {} chars, trying other extractors", trimmed.len());
2761            }
2762        }
2763        Err(err) => {
2764            warn!("pdf_extract failed: {err}");
2765        }
2766    }
2767
2768    // Try lopdf - pure Rust, works on all platforms
2769    match extract_pdf_with_lopdf(payload) {
2770        Ok(text) => {
2771            let trimmed = text.trim();
2772            if trimmed.len() > best_text.len() {
2773                best_text = trimmed.to_string();
2774                best_source = "lopdf";
2775            }
2776            if trimmed.len() >= min_good_chars {
2777                info!("lopdf extracted {} chars (good)", trimmed.len());
2778                return Ok(best_text);
2779            } else if !trimmed.is_empty() {
2780                warn!("lopdf returned only {} chars, trying pdftotext", trimmed.len());
2781            }
2782        }
2783        Err(err) => {
2784            warn!("lopdf failed: {err}");
2785        }
2786    }
2787
2788    // Try pdftotext binary (requires poppler installed)
2789    match fallback_pdftotext(payload) {
2790        Ok(text) => {
2791            let trimmed = text.trim();
2792            if trimmed.len() > best_text.len() {
2793                best_text = trimmed.to_string();
2794                best_source = "pdftotext";
2795            }
2796            if !trimmed.is_empty() {
2797                info!("pdftotext extracted {} chars", trimmed.len());
2798            }
2799        }
2800        Err(err) => {
2801            warn!("pdftotext failed: {err}");
2802        }
2803    }
2804
2805    // Return the best result we found
2806    if !best_text.is_empty() {
2807        info!("using {} extraction ({} chars)", best_source, best_text.len());
2808        Ok(best_text)
2809    } else {
2810        warn!("all PDF extraction methods failed, storing without searchable text");
2811        Ok(String::new())
2812    }
2813}
2814
2815/// Extract text from PDF using lopdf (pure Rust, no native dependencies)
2816fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
2817    use lopdf::Document;
2818
2819    let mut document = Document::load_mem(payload)
2820        .map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
2821
2822    // Try to decrypt if encrypted (empty password)
2823    if document.is_encrypted() {
2824        let _ = document.decrypt("");
2825    }
2826
2827    // Decompress streams for text extraction
2828    let _ = document.decompress();
2829
2830    // Get sorted page numbers
2831    let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
2832    if page_numbers.is_empty() {
2833        return Err(anyhow!("PDF has no pages"));
2834    }
2835    page_numbers.sort_unstable();
2836
2837    // Limit to 4096 pages max
2838    if page_numbers.len() > 4096 {
2839        page_numbers.truncate(4096);
2840        warn!("PDF has more than 4096 pages, truncating extraction");
2841    }
2842
2843    // Extract text from all pages
2844    let text = document
2845        .extract_text(&page_numbers)
2846        .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
2847
2848    Ok(text.trim().to_string())
2849}
2850
2851fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2852    use std::io::Write;
2853    use std::process::{Command, Stdio};
2854
2855    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2856
2857    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2858    temp_pdf
2859        .write_all(payload)
2860        .context("failed to write pdf payload to temp file")?;
2861    temp_pdf.flush().context("failed to flush temp pdf file")?;
2862
2863    let child = Command::new(pdftotext)
2864        .arg("-layout")
2865        .arg(temp_pdf.path())
2866        .arg("-")
2867        .stdout(Stdio::piped())
2868        .spawn()
2869        .context("failed to spawn pdftotext")?;
2870
2871    let output = child
2872        .wait_with_output()
2873        .context("failed to read pdftotext output")?;
2874
2875    if !output.status.success() {
2876        return Err(anyhow!("pdftotext exited with status {}", output.status));
2877    }
2878
2879    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2880    Ok(text)
2881}
2882
2883fn detect_mime(
2884    source_path: Option<&Path>,
2885    payload: &[u8],
2886    payload_utf8: Option<&str>,
2887) -> (String, bool) {
2888    if let Some(kind) = infer::get(payload) {
2889        let mime = kind.mime_type().to_string();
2890        let treat_as_text = mime.starts_with("text/")
2891            || matches!(
2892                mime.as_str(),
2893                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2894            );
2895        return (mime, treat_as_text);
2896    }
2897
2898    let magic = magic_from_u8(payload);
2899    if !magic.is_empty() && magic != "application/octet-stream" {
2900        let treat_as_text = magic.starts_with("text/")
2901            || matches!(
2902                magic,
2903                "application/json"
2904                    | "application/xml"
2905                    | "application/javascript"
2906                    | "image/svg+xml"
2907                    | "text/plain"
2908            );
2909        return (magic.to_string(), treat_as_text);
2910    }
2911
2912    if let Some(path) = source_path {
2913        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2914            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2915                return (mime.to_string(), treat_as_text);
2916            }
2917        }
2918    }
2919
2920    if payload_utf8.is_some() {
2921        return ("text/plain".to_string(), true);
2922    }
2923
2924    ("application/octet-stream".to_string(), false)
2925}
2926
/// Maps a file extension (without the dot, any ASCII case) to a MIME type and a flag
/// telling whether files of that type are indexed as text.
///
/// Returns `None` for extensions that are not in the table.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    // (extensions, MIME type, treat as text)
    const KNOWN_EXTENSIONS: &[(&[&str], &str, bool)] = &[
        (
            &[
                "txt", "text", "log", "cfg", "conf", "ini", "properties", "sql", "rs", "py",
                "js", "ts", "tsx", "jsx", "c", "h", "cpp", "hpp", "go", "rb", "php", "css",
                "scss", "sass", "sh", "bash", "zsh", "ps1", "swift", "kt", "java", "scala",
                "lua", "pl", "pm", "r", "erl", "ex", "exs", "dart",
            ],
            "text/plain",
            true,
        ),
        (&["md", "markdown"], "text/markdown", true),
        (&["rst"], "text/x-rst", true),
        (&["json"], "application/json", true),
        (&["csv"], "text/csv", true),
        (&["tsv"], "text/tab-separated-values", true),
        (&["yaml", "yml"], "application/yaml", true),
        (&["toml"], "application/toml", true),
        (&["html", "htm"], "text/html", true),
        (&["xml"], "application/xml", true),
        (&["svg"], "image/svg+xml", true),
        (&["gif"], "image/gif", false),
        (&["jpg", "jpeg"], "image/jpeg", false),
        (&["png"], "image/png", false),
        (&["bmp"], "image/bmp", false),
        (&["ico"], "image/x-icon", false),
        (&["tif", "tiff"], "image/tiff", false),
        (&["webp"], "image/webp", false),
    ];

    let wanted = ext.to_ascii_lowercase();
    KNOWN_EXTENSIONS
        .iter()
        .find(|(extensions, _, _)| extensions.iter().any(|&known| known == wanted.as_str()))
        .map(|&(_, mime, as_text)| (mime, as_text))
}
2954
2955fn caption_from_text(text: &str) -> Option<String> {
2956    for line in text.lines() {
2957        let trimmed = line.trim();
2958        if !trimmed.is_empty() {
2959            return Some(truncate_to_boundary(trimmed, 240));
2960        }
2961    }
2962    None
2963}
2964
/// Shortens `text` to at most `max_len` bytes of original content, cutting on a UTF-8
/// character boundary.
///
/// Text that already fits is returned unchanged. Otherwise the cut prefix has trailing
/// whitespace removed and an ellipsis (`…`) appended; note the ellipsis is added on top of
/// `max_len`, so the result may be a few bytes longer than the limit. When no character fits
/// inside `max_len` bytes an empty string is returned.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_owned();
    }

    // Largest char boundary that does not exceed `max_len`; index 0 always qualifies.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }

    let mut clipped = String::from(text[..cut].trim_end());
    clipped.push('…');
    clipped
}
2980
2981fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2982    let reader = ImageReader::new(Cursor::new(payload))
2983        .with_guessed_format()
2984        .map_err(|err| warn!("failed to guess image format: {err}"))
2985        .ok()?;
2986    let image = reader
2987        .decode()
2988        .map_err(|err| warn!("failed to decode image: {err}"))
2989        .ok()?;
2990    let width = image.width();
2991    let height = image.height();
2992    let rgb = image.to_rgb8();
2993    let palette = match get_palette(
2994        rgb.as_raw(),
2995        ColorFormat::Rgb,
2996        COLOR_PALETTE_SIZE,
2997        COLOR_PALETTE_QUALITY,
2998    ) {
2999        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3000        Err(err) => {
3001            warn!("failed to compute colour palette: {err}");
3002            Vec::new()
3003        }
3004    };
3005
3006    let (exif, exif_caption) = extract_exif_metadata(payload);
3007
3008    Some(ImageAnalysis {
3009        width,
3010        height,
3011        palette,
3012        caption: exif_caption,
3013        exif,
3014    })
3015}
3016
3017fn color_to_hex(color: Color) -> String {
3018    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3019}
3020
3021fn analyze_audio(
3022    payload: &[u8],
3023    source_path: Option<&Path>,
3024    mime: &str,
3025    options: AudioAnalyzeOptions,
3026) -> Option<AudioAnalysis> {
3027    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
3028    use symphonia::core::errors::Error as SymphoniaError;
3029    use symphonia::core::formats::FormatOptions;
3030    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
3031    use symphonia::core::meta::MetadataOptions;
3032    use symphonia::core::probe::Hint;
3033
3034    let cursor = Cursor::new(payload.to_vec());
3035    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());
3036
3037    let mut hint = Hint::new();
3038    if let Some(path) = source_path {
3039        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3040            hint.with_extension(ext);
3041        }
3042    }
3043    if let Some(suffix) = mime.split('/').nth(1) {
3044        hint.with_extension(suffix);
3045    }
3046
3047    let probed = symphonia::default::get_probe()
3048        .format(
3049            &hint,
3050            mss,
3051            &FormatOptions::default(),
3052            &MetadataOptions::default(),
3053        )
3054        .map_err(|err| warn!("failed to probe audio stream: {err}"))
3055        .ok()?;
3056
3057    let mut format = probed.format;
3058    let track = format.default_track()?;
3059    let track_id = track.id;
3060    let codec_params: CodecParameters = track.codec_params.clone();
3061    let sample_rate = codec_params.sample_rate;
3062    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);
3063
3064    let mut decoder = symphonia::default::get_codecs()
3065        .make(&codec_params, &DecoderOptions::default())
3066        .map_err(|err| warn!("failed to create audio decoder: {err}"))
3067        .ok()?;
3068
3069    let mut decoded_frames: u64 = 0;
3070    loop {
3071        let packet = match format.next_packet() {
3072            Ok(packet) => packet,
3073            Err(SymphoniaError::IoError(_)) => break,
3074            Err(SymphoniaError::DecodeError(err)) => {
3075                warn!("skipping undecodable audio packet: {err}");
3076                continue;
3077            }
3078            Err(SymphoniaError::ResetRequired) => {
3079                decoder.reset();
3080                continue;
3081            }
3082            Err(other) => {
3083                warn!("stopping audio analysis due to error: {other}");
3084                break;
3085            }
3086        };
3087
3088        if packet.track_id() != track_id {
3089            continue;
3090        }
3091
3092        match decoder.decode(&packet) {
3093            Ok(audio_buf) => {
3094                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
3095            }
3096            Err(SymphoniaError::DecodeError(err)) => {
3097                warn!("failed to decode audio packet: {err}");
3098            }
3099            Err(SymphoniaError::IoError(_)) => break,
3100            Err(SymphoniaError::ResetRequired) => {
3101                decoder.reset();
3102            }
3103            Err(other) => {
3104                warn!("decoder error: {other}");
3105                break;
3106            }
3107        }
3108    }
3109
3110    let duration_secs = match sample_rate {
3111        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
3112        _ => 0.0,
3113    };
3114
3115    let bitrate_kbps = if duration_secs > 0.0 {
3116        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
3117    } else {
3118        None
3119    };
3120
3121    let tags = extract_lofty_tags(payload);
3122    let caption = tags
3123        .get("title")
3124        .cloned()
3125        .or_else(|| tags.get("tracktitle").cloned());
3126
3127    let mut search_terms = Vec::new();
3128    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
3129        search_terms.push(title.clone());
3130    }
3131    if let Some(artist) = tags.get("artist") {
3132        search_terms.push(artist.clone());
3133    }
3134    if let Some(album) = tags.get("album") {
3135        search_terms.push(album.clone());
3136    }
3137    if let Some(genre) = tags.get("genre") {
3138        search_terms.push(genre.clone());
3139    }
3140
3141    let mut segments = Vec::new();
3142    if duration_secs > 0.0 {
3143        let segment_len = options.normalised_segment_secs() as f64;
3144        if segment_len > 0.0 {
3145            let mut start = 0.0;
3146            let mut idx = 1usize;
3147            while start < duration_secs {
3148                let end = (start + segment_len).min(duration_secs);
3149                segments.push(AudioSegmentMetadata {
3150                    start_seconds: start as f32,
3151                    end_seconds: end as f32,
3152                    label: Some(format!("Segment {}", idx)),
3153                });
3154                if end >= duration_secs {
3155                    break;
3156                }
3157                start = end;
3158                idx += 1;
3159            }
3160        }
3161    }
3162
3163    let mut metadata = DocAudioMetadata::default();
3164    if duration_secs > 0.0 {
3165        metadata.duration_secs = Some(duration_secs as f32);
3166    }
3167    if let Some(rate) = sample_rate {
3168        metadata.sample_rate_hz = Some(rate);
3169    }
3170    if let Some(channels) = channel_count {
3171        metadata.channels = Some(channels);
3172    }
3173    if let Some(bitrate) = bitrate_kbps {
3174        metadata.bitrate_kbps = Some(bitrate);
3175    }
3176    if codec_params.codec != CODEC_TYPE_NULL {
3177        metadata.codec = Some(format!("{:?}", codec_params.codec));
3178    }
3179    if !segments.is_empty() {
3180        metadata.segments = segments;
3181    }
3182    if !tags.is_empty() {
3183        metadata.tags = tags
3184            .iter()
3185            .map(|(k, v)| (k.clone(), v.clone()))
3186            .collect::<BTreeMap<_, _>>();
3187    }
3188
3189    // Transcribe audio if requested
3190    let transcript = if options.transcribe {
3191        transcribe_audio(source_path)
3192    } else {
3193        None
3194    };
3195
3196    Some(AudioAnalysis {
3197        metadata,
3198        caption,
3199        search_terms,
3200        transcript,
3201    })
3202}
3203
/// Transcribes the audio file at `source_path` with Whisper.
///
/// Returns `None` when no path is given, when the transcriber cannot be initialised, or when
/// transcription fails; the two failure cases are logged as warnings.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let audio_path = source_path?;

    tracing::info!(path = %audio_path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|err| {
            tracing::warn!(error = %err, "Failed to initialize Whisper transcriber");
        })
        .ok()?;

    let outcome = transcriber
        .transcribe_file(audio_path)
        .map_err(|err| {
            tracing::warn!(error = %err, "Failed to transcribe audio");
        })
        .ok()?;

    tracing::info!(
        duration = outcome.duration_secs,
        text_len = outcome.text.len(),
        "Audio transcription complete"
    );
    Some(outcome.text)
}
3237
3238#[cfg(not(feature = "whisper"))]
3239fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
3240    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
3241    None
3242}
3243
3244fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3245    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3246
3247    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3248        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3249            out.entry("title".into())
3250                .or_insert_with(|| value.to_string());
3251            out.entry("tracktitle".into())
3252                .or_insert_with(|| value.to_string());
3253        }
3254        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3255            out.entry("artist".into())
3256                .or_insert_with(|| value.to_string());
3257        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3258            out.entry("artist".into())
3259                .or_insert_with(|| value.to_string());
3260        }
3261        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3262            out.entry("album".into())
3263                .or_insert_with(|| value.to_string());
3264        }
3265        if let Some(value) = tag.get_string(&ItemKey::Genre) {
3266            out.entry("genre".into())
3267                .or_insert_with(|| value.to_string());
3268        }
3269        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3270            out.entry("track_number".into())
3271                .or_insert_with(|| value.to_string());
3272        }
3273        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3274            out.entry("disc_number".into())
3275                .or_insert_with(|| value.to_string());
3276        }
3277    }
3278
3279    let mut tags = HashMap::new();
3280    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3281        Ok(probe) => probe,
3282        Err(err) => {
3283            warn!("failed to guess audio tag file type: {err}");
3284            return tags;
3285        }
3286    };
3287
3288    let tagged_file = match probe.read() {
3289        Ok(file) => file,
3290        Err(err) => {
3291            warn!("failed to read audio tags: {err}");
3292            return tags;
3293        }
3294    };
3295
3296    if let Some(primary) = tagged_file.primary_tag() {
3297        collect_tag(primary, &mut tags);
3298    }
3299    for tag in tagged_file.tags() {
3300        collect_tag(tag, &mut tags);
3301    }
3302
3303    tags
3304}
3305
3306fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3307    let mut cursor = Cursor::new(payload);
3308    let exif = match ExifReader::new().read_from_container(&mut cursor) {
3309        Ok(exif) => exif,
3310        Err(err) => {
3311            warn!("failed to read EXIF metadata: {err}");
3312            return (None, None);
3313        }
3314    };
3315
3316    let mut doc = DocExifMetadata::default();
3317    let mut has_data = false;
3318
3319    if let Some(make) = exif_string(&exif, Tag::Make) {
3320        doc.make = Some(make);
3321        has_data = true;
3322    }
3323    if let Some(model) = exif_string(&exif, Tag::Model) {
3324        doc.model = Some(model);
3325        has_data = true;
3326    }
3327    if let Some(lens) =
3328        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3329    {
3330        doc.lens = Some(lens);
3331        has_data = true;
3332    }
3333    if let Some(dt) =
3334        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3335    {
3336        doc.datetime = Some(dt);
3337        has_data = true;
3338    }
3339
3340    if let Some(gps) = extract_gps_metadata(&exif) {
3341        doc.gps = Some(gps);
3342        has_data = true;
3343    }
3344
3345    let caption = exif_string(&exif, Tag::ImageDescription);
3346
3347    let metadata = if has_data { Some(doc) } else { None };
3348    (metadata, caption)
3349}
3350
3351fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3352    exif.fields()
3353        .find(|field| field.tag == tag)
3354        .and_then(|field| field_to_string(field, exif))
3355}
3356
3357fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3358    let value = field.display_value().with_unit(exif).to_string();
3359    let trimmed = value.trim_matches('\0').trim();
3360    if trimmed.is_empty() {
3361        None
3362    } else {
3363        Some(trimmed.to_string())
3364    }
3365}
3366
3367fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3368    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3369    let latitude_ref_field = exif
3370        .fields()
3371        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3372    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3373    let longitude_ref_field = exif
3374        .fields()
3375        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3376
3377    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3378    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3379
3380    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3381        if reference.eq_ignore_ascii_case("S") {
3382            latitude = -latitude;
3383        }
3384    }
3385    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3386        if reference.eq_ignore_ascii_case("W") {
3387            longitude = -longitude;
3388        }
3389    }
3390
3391    Some(DocGpsMetadata {
3392        latitude,
3393        longitude,
3394    })
3395}
3396
3397fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3398    match value {
3399        ExifValue::Rational(values) if !values.is_empty() => {
3400            let deg = rational_to_f64_u(values.get(0)?)?;
3401            let min = values
3402                .get(1)
3403                .and_then(|v| rational_to_f64_u(v))
3404                .unwrap_or(0.0);
3405            let sec = values
3406                .get(2)
3407                .and_then(|v| rational_to_f64_u(v))
3408                .unwrap_or(0.0);
3409            Some(deg + (min / 60.0) + (sec / 3600.0))
3410        }
3411        ExifValue::SRational(values) if !values.is_empty() => {
3412            let deg = rational_to_f64_i(values.get(0)?)?;
3413            let min = values
3414                .get(1)
3415                .and_then(|v| rational_to_f64_i(v))
3416                .unwrap_or(0.0);
3417            let sec = values
3418                .get(2)
3419                .and_then(|v| rational_to_f64_i(v))
3420                .unwrap_or(0.0);
3421            Some(deg + (min / 60.0) + (sec / 3600.0))
3422        }
3423        _ => None,
3424    }
3425}
3426
3427fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3428    if value.denom == 0 {
3429        None
3430    } else {
3431        Some(value.num as f64 / value.denom as f64)
3432    }
3433}
3434
3435fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3436    if value.denom == 0 {
3437        None
3438    } else {
3439        Some(value.num as f64 / value.denom as f64)
3440    }
3441}
3442
3443fn value_to_ascii(value: &ExifValue) -> Option<String> {
3444    if let ExifValue::Ascii(values) = value {
3445        values
3446            .first()
3447            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3448            .map(|s| s.trim_matches('\0').trim().to_string())
3449            .filter(|s| !s.is_empty())
3450    } else {
3451        None
3452    }
3453}
3454
3455fn ingest_payload(
3456    mem: &mut Memvid,
3457    args: &PutArgs,
3458    config: &CliConfig,
3459    payload: Vec<u8>,
3460    source_path: Option<&Path>,
3461    capacity_guard: Option<&mut CapacityGuard>,
3462    enable_embedding: bool,
3463    runtime: Option<&EmbeddingRuntime>,
3464    parallel_mode: bool,
3465) -> Result<IngestOutcome> {
3466    let original_bytes = payload.len();
3467    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3468
3469    // Check if adding this payload would exceed free tier limit
3470    // Require API key for operations that would exceed 1GB total
3471    let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
3472    crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;
3473
3474    let payload_text = std::str::from_utf8(&payload).ok();
3475    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3476
3477    let audio_opts = AudioAnalyzeOptions {
3478        force: args.audio || args.transcribe,
3479        segment_secs: args
3480            .audio_segment_seconds
3481            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3482        transcribe: args.transcribe,
3483    };
3484
3485    let mut analysis = analyze_file(
3486        source_path,
3487        &payload,
3488        payload_text,
3489        inferred_title.as_deref(),
3490        audio_opts,
3491        args.video,
3492    );
3493
3494    if args.video && !analysis.mime.starts_with("video/") {
3495        anyhow::bail!(
3496            "--video requires a video input; detected MIME type {}",
3497            analysis.mime
3498        );
3499    }
3500
3501    let mut search_text = analysis.search_text.clone();
3502    if let Some(ref mut text) = search_text {
3503        if text.len() > MAX_SEARCH_TEXT_LEN {
3504            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3505            *text = truncated;
3506        }
3507    }
3508    analysis.search_text = search_text.clone();
3509
3510    let uri = if let Some(ref explicit) = args.uri {
3511        derive_uri(Some(explicit), None)
3512    } else if args.video {
3513        derive_video_uri(&payload, source_path, &analysis.mime)
3514    } else {
3515        derive_uri(None, source_path)
3516    };
3517
3518    let mut options_builder = PutOptions::builder()
3519        .enable_embedding(enable_embedding)
3520        .auto_tag(!args.no_auto_tag)
3521        .extract_dates(!args.no_extract_dates)
3522        .extract_triplets(!args.no_extract_triplets);
3523    if let Some(ts) = args.timestamp {
3524        options_builder = options_builder.timestamp(ts);
3525    }
3526    if let Some(track) = &args.track {
3527        options_builder = options_builder.track(track.clone());
3528    }
3529    if args.video {
3530        options_builder = options_builder.kind("video");
3531        options_builder = options_builder.tag("kind", "video");
3532        options_builder = options_builder.push_tag("video");
3533    } else if let Some(kind) = &args.kind {
3534        options_builder = options_builder.kind(kind.clone());
3535    }
3536    options_builder = options_builder.uri(uri.clone());
3537    if let Some(ref title) = inferred_title {
3538        options_builder = options_builder.title(title.clone());
3539    }
3540    for (key, value) in &args.tags {
3541        options_builder = options_builder.tag(key.clone(), value.clone());
3542        if !key.is_empty() {
3543            options_builder = options_builder.push_tag(key.clone());
3544        }
3545        if !value.is_empty() && value != key {
3546            options_builder = options_builder.push_tag(value.clone());
3547        }
3548    }
3549    for label in &args.labels {
3550        options_builder = options_builder.label(label.clone());
3551    }
3552    if let Some(metadata) = analysis.metadata.clone() {
3553        if !metadata.is_empty() {
3554            options_builder = options_builder.metadata(metadata);
3555        }
3556    }
3557    for (idx, entry) in args.metadata.iter().enumerate() {
3558        match serde_json::from_str::<DocMetadata>(entry) {
3559            Ok(meta) => {
3560                options_builder = options_builder.metadata(meta);
3561            }
3562            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3563                Ok(value) => {
3564                    options_builder =
3565                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3566                }
3567                Err(err) => {
3568                    warn!("failed to parse --metadata JSON: {err}");
3569                }
3570            },
3571        }
3572    }
3573    if let Some(text) = search_text.clone() {
3574        if !text.trim().is_empty() {
3575            options_builder = options_builder.search_text(text);
3576        }
3577    }
3578    let mut options = options_builder.build();
3579
3580    let existing_frame = if args.update_existing || !args.allow_duplicate {
3581        match mem.frame_by_uri(&uri) {
3582            Ok(frame) => Some(frame),
3583            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3584            Err(err) => return Err(err.into()),
3585        }
3586    } else {
3587        None
3588    };
3589
3590    // Compute frame_id: for updates it's the existing frame's id,
3591    // for new frames it's the current frame count (which becomes the next id)
3592    let frame_id = if let Some(ref existing) = existing_frame {
3593        existing.id
3594    } else {
3595        mem.stats()?.frame_count
3596    };
3597
3598    let mut embedded = false;
3599    let allow_embedding = enable_embedding && !args.video;
3600    if enable_embedding && !allow_embedding && args.video {
3601        warn!("semantic embeddings are not generated for video payloads");
3602    }
3603    let seq = if let Some(existing) = existing_frame {
3604        if args.update_existing {
3605            let payload_for_update = payload.clone();
3606            if allow_embedding {
3607                if let Some(path) = args.embedding_vec.as_deref() {
3608                    let embedding = crate::utils::read_embedding(path)?;
3609                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
3610                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
3611                        let expected = choice.dimensions();
3612                        if expected != 0 && embedding.len() != expected {
3613                            anyhow::bail!(
3614                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3615                                embedding.len(),
3616                                choice.name(),
3617                                expected
3618                            );
3619                        }
3620                        apply_embedding_identity_metadata_from_choice(
3621                            &mut options,
3622                            choice,
3623                            embedding.len(),
3624                            Some(model_str),
3625                        );
3626                    } else {
3627                        options.extra_metadata.insert(
3628                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3629                            embedding.len().to_string(),
3630                        );
3631                    }
3632                    embedded = true;
3633                    mem.update_frame(
3634                        existing.id,
3635                        Some(payload_for_update),
3636                        options.clone(),
3637                        Some(embedding),
3638                    )
3639                    .map_err(|err| {
3640                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3641                    })?
3642                } else {
3643                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3644                    let embed_text = search_text
3645                        .clone()
3646                        .or_else(|| payload_text.map(|text| text.to_string()))
3647                        .unwrap_or_default();
3648                    if embed_text.trim().is_empty() {
3649                        warn!("no textual content available; embedding skipped");
3650                        mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3651                            .map_err(|err| {
3652                                map_put_error(
3653                                    err,
3654                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3655                                )
3656                            })?
3657                    } else {
3658                        // Truncate to avoid token limits
3659                        let truncated_text = truncate_for_embedding(&embed_text);
3660                        if truncated_text.len() < embed_text.len() {
3661                            info!(
3662                                "Truncated text from {} to {} chars for embedding",
3663                                embed_text.len(),
3664                                truncated_text.len()
3665                            );
3666                        }
3667                        let embedding = runtime.embed_passage(&truncated_text)?;
3668                        embedded = true;
3669                        apply_embedding_identity_metadata(&mut options, runtime);
3670                        mem.update_frame(
3671                            existing.id,
3672                            Some(payload_for_update),
3673                            options.clone(),
3674                            Some(embedding),
3675                        )
3676                        .map_err(|err| {
3677                            map_put_error(
3678                                err,
3679                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3680                            )
3681                        })?
3682                    }
3683                }
3684            } else {
3685                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3686                    .map_err(|err| {
3687                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3688                    })?
3689            }
3690        } else {
3691            return Err(DuplicateUriError::new(uri.as_str()).into());
3692        }
3693    } else {
3694        if let Some(guard) = capacity_guard.as_ref() {
3695            guard.ensure_capacity(stored_bytes as u64)?;
3696        }
3697        if parallel_mode {
3698            #[cfg(feature = "parallel_segments")]
3699            {
3700                let mut parent_embedding = None;
3701                let mut chunk_embeddings_vec = None;
3702                if allow_embedding {
3703                    if let Some(path) = args.embedding_vec.as_deref() {
3704                        let embedding = crate::utils::read_embedding(path)?;
3705                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
3706                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
3707                            let expected = choice.dimensions();
3708                            if expected != 0 && embedding.len() != expected {
3709                                anyhow::bail!(
3710                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3711                                    embedding.len(),
3712                                    choice.name(),
3713                                    expected
3714                                );
3715                            }
3716                            apply_embedding_identity_metadata_from_choice(
3717                                &mut options,
3718                                choice,
3719                                embedding.len(),
3720                                Some(model_str),
3721                            );
3722                        } else {
3723                            options.extra_metadata.insert(
3724                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3725                                embedding.len().to_string(),
3726                            );
3727                        }
3728                        parent_embedding = Some(embedding);
3729                        embedded = true;
3730                    } else {
3731                        let runtime =
3732                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3733                        let embed_text = search_text
3734                            .clone()
3735                            .or_else(|| payload_text.map(|text| text.to_string()))
3736                            .unwrap_or_default();
3737                        if embed_text.trim().is_empty() {
3738                            warn!("no textual content available; embedding skipped");
3739                        } else {
3740                            // Check if document will be chunked - if so, embed each chunk
3741                            info!(
3742                                "parallel mode: checking for chunks on {} bytes payload",
3743                                payload.len()
3744                            );
3745                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3746                                // Document will be chunked - embed each chunk for full semantic coverage
3747                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3748
3749                            // Apply temporal enrichment if enabled (before contextual)
3750                            #[cfg(feature = "temporal_enrich")]
3751                            let enriched_chunks = if args.temporal_enrich {
3752                                info!(
3753                                    "Temporal enrichment enabled, processing {} chunks",
3754                                    chunk_texts.len()
3755                                );
3756                                // Use today's date as fallback document date
3757                                let today = chrono::Local::now().date_naive();
3758                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3759                                // Results are tuples of (enriched_text, TemporalEnrichment)
3760                                let enriched: Vec<String> =
3761                                    results.iter().map(|(text, _)| text.clone()).collect();
3762                                let resolved_count = results
3763                                    .iter()
3764                                    .filter(|(_, e)| {
3765                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
3766                                    })
3767                                    .count();
3768                                info!(
3769                                    "Temporal enrichment: resolved {} chunks with temporal context",
3770                                    resolved_count
3771                                );
3772                                enriched
3773                            } else {
3774                                chunk_texts.clone()
3775                            };
3776                            #[cfg(not(feature = "temporal_enrich"))]
3777                            let enriched_chunks = {
3778                                if args.temporal_enrich {
3779                                    warn!(
3780                                        "Temporal enrichment requested but feature not compiled in"
3781                                    );
3782                                }
3783                                chunk_texts.clone()
3784                            };
3785
3786                            // Apply contextual retrieval if enabled
3787                            let embed_chunks = if args.contextual {
3788                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3789                                let engine = if args.contextual_model.as_deref() == Some("local") {
3790                                    #[cfg(feature = "llama-cpp")]
3791                                    {
3792                                        let model_path = get_local_contextual_model_path()?;
3793                                        ContextualEngine::local(model_path)
3794                                    }
3795                                    #[cfg(not(feature = "llama-cpp"))]
3796                                    {
3797                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3798                                    }
3799                                } else if let Some(model) = &args.contextual_model {
3800                                    ContextualEngine::openai_with_model(model)?
3801                                } else {
3802                                    ContextualEngine::openai()?
3803                                };
3804
3805                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3806                                {
3807                                    Ok(contexts) => {
3808                                        info!("Generated {} contextual prefixes", contexts.len());
3809                                        apply_contextual_prefixes(
3810                                            &embed_text,
3811                                            &enriched_chunks,
3812                                            &contexts,
3813                                        )
3814                                    }
3815                                    Err(e) => {
3816                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3817                                        enriched_chunks.clone()
3818                                    }
3819                                }
3820                            } else {
3821                                enriched_chunks
3822                            };
3823
3824                            let truncated_chunks: Vec<String> = embed_chunks
3825                                .iter()
3826                                .map(|chunk_text| {
3827                                    // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
3828                                    let truncated_chunk = truncate_for_embedding(chunk_text);
3829                                    if truncated_chunk.len() < chunk_text.len() {
3830                                        info!(
3831                                            "parallel mode: Truncated chunk from {} to {} chars for embedding",
3832                                            chunk_text.len(),
3833                                            truncated_chunk.len()
3834                                        );
3835                                    }
3836                                    truncated_chunk
3837                                })
3838                                .collect();
3839                            let truncated_refs: Vec<&str> =
3840                                truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3841                            let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3842                            let num_chunks = chunk_embeddings.len();
3843                            chunk_embeddings_vec = Some(chunk_embeddings);
3844                            // Also embed the parent for the parent frame (truncate to avoid token limits)
3845                            let parent_text = truncate_for_embedding(&embed_text);
3846                            if parent_text.len() < embed_text.len() {
3847                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3848                            }
3849                            parent_embedding = Some(runtime.embed_passage(&parent_text)?);
3850                            embedded = true;
3851                            info!(
3852                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3853                                num_chunks
3854                            );
3855                            } else {
3856                                // No chunking - just embed the whole document (truncate to avoid token limits)
3857                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3858                                let truncated_text = truncate_for_embedding(&embed_text);
3859                                if truncated_text.len() < embed_text.len() {
3860                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3861                                }
3862                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
3863                                embedded = true;
3864                            }
3865                        }
3866                    }
3867                }
3868                if embedded {
3869                    if let Some(runtime) = runtime {
3870                        apply_embedding_identity_metadata(&mut options, runtime);
3871                    }
3872                }
3873                let payload_variant = if let Some(path) = source_path {
3874                    ParallelPayload::Path(path.to_path_buf())
3875                } else {
3876                    ParallelPayload::Bytes(payload)
3877                };
3878                let input = ParallelInput {
3879                    payload: payload_variant,
3880                    options: options.clone(),
3881                    embedding: parent_embedding,
3882                    chunk_embeddings: chunk_embeddings_vec,
3883                };
3884                return Ok(IngestOutcome {
3885                    report: PutReport {
3886                        seq: 0,
3887                        frame_id: 0, // Will be populated after parallel commit
3888                        uri,
3889                        title: inferred_title,
3890                        original_bytes,
3891                        stored_bytes,
3892                        compressed,
3893                        source: source_path.map(Path::to_path_buf),
3894                        mime: Some(analysis.mime),
3895                        metadata: analysis.metadata,
3896                        extraction: analysis.extraction,
3897                        #[cfg(feature = "logic_mesh")]
3898                        search_text: search_text.clone(),
3899                    },
3900                    embedded,
3901                    parallel_input: Some(input),
3902                });
3903            }
3904            #[cfg(not(feature = "parallel_segments"))]
3905            {
3906                mem.put_bytes_with_options(&payload, options)
3907                    .map_err(|err| {
3908                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3909                    })?
3910            }
3911        } else if allow_embedding {
3912            if let Some(path) = args.embedding_vec.as_deref() {
3913                let embedding = crate::utils::read_embedding(path)?;
3914                if let Some(model_str) = args.embedding_vec_model.as_deref() {
3915                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
3916                    let expected = choice.dimensions();
3917                    if expected != 0 && embedding.len() != expected {
3918                        anyhow::bail!(
3919                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3920                            embedding.len(),
3921                            choice.name(),
3922                            expected
3923                        );
3924                    }
3925                    apply_embedding_identity_metadata_from_choice(
3926                        &mut options,
3927                        choice,
3928                        embedding.len(),
3929                        Some(model_str),
3930                    );
3931                } else {
3932                    options.extra_metadata.insert(
3933                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3934                        embedding.len().to_string(),
3935                    );
3936                }
3937                embedded = true;
3938                mem.put_with_embedding_and_options(&payload, embedding, options)
3939                    .map_err(|err| {
3940                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3941                    })?
3942            } else {
3943                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3944                let embed_text = search_text
3945                    .clone()
3946                    .or_else(|| payload_text.map(|text| text.to_string()))
3947                    .unwrap_or_default();
3948                if embed_text.trim().is_empty() {
3949                    warn!("no textual content available; embedding skipped");
3950                    mem.put_bytes_with_options(&payload, options)
3951                        .map_err(|err| {
3952                            map_put_error(
3953                                err,
3954                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3955                            )
3956                        })?
3957                } else {
3958                    // Check if document will be chunked - if so, embed each chunk
3959                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3960                    // Document will be chunked - embed each chunk for full semantic coverage
3961                    info!(
3962                        "Document will be chunked into {} chunks, generating embeddings for each",
3963                        chunk_texts.len()
3964                    );
3965
3966                    // Apply temporal enrichment if enabled (before contextual)
3967                    #[cfg(feature = "temporal_enrich")]
3968                    let enriched_chunks = if args.temporal_enrich {
3969                        info!(
3970                            "Temporal enrichment enabled, processing {} chunks",
3971                            chunk_texts.len()
3972                        );
3973                        // Use today's date as fallback document date
3974                        let today = chrono::Local::now().date_naive();
3975                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3976                        // Results are tuples of (enriched_text, TemporalEnrichment)
3977                        let enriched: Vec<String> =
3978                            results.iter().map(|(text, _)| text.clone()).collect();
3979                        let resolved_count = results
3980                            .iter()
3981                            .filter(|(_, e)| {
3982                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
3983                            })
3984                            .count();
3985                        info!(
3986                            "Temporal enrichment: resolved {} chunks with temporal context",
3987                            resolved_count
3988                        );
3989                        enriched
3990                    } else {
3991                        chunk_texts.clone()
3992                    };
3993                    #[cfg(not(feature = "temporal_enrich"))]
3994                    let enriched_chunks = {
3995                        if args.temporal_enrich {
3996                            warn!("Temporal enrichment requested but feature not compiled in");
3997                        }
3998                        chunk_texts.clone()
3999                    };
4000
4001                    // Apply contextual retrieval if enabled
4002                    let embed_chunks = if args.contextual {
4003                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4004                        let engine = if args.contextual_model.as_deref() == Some("local") {
4005                            #[cfg(feature = "llama-cpp")]
4006                            {
4007                                let model_path = get_local_contextual_model_path()?;
4008                                ContextualEngine::local(model_path)
4009                            }
4010                            #[cfg(not(feature = "llama-cpp"))]
4011                            {
4012                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4013                            }
4014                        } else if let Some(model) = &args.contextual_model {
4015                            ContextualEngine::openai_with_model(model)?
4016                        } else {
4017                            ContextualEngine::openai()?
4018                        };
4019
4020                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
4021                            Ok(contexts) => {
4022                                info!("Generated {} contextual prefixes", contexts.len());
4023                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
4024                            }
4025                            Err(e) => {
4026                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4027                                enriched_chunks.clone()
4028                            }
4029                        }
4030                    } else {
4031                        enriched_chunks
4032                    };
4033
4034                    let truncated_chunks: Vec<String> = embed_chunks
4035                        .iter()
4036                        .map(|chunk_text| {
4037                            // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
4038                            let truncated_chunk = truncate_for_embedding(chunk_text);
4039                            if truncated_chunk.len() < chunk_text.len() {
4040                                info!(
4041                                    "Truncated chunk from {} to {} chars for embedding",
4042                                    chunk_text.len(),
4043                                    truncated_chunk.len()
4044                                );
4045                            }
4046                            truncated_chunk
4047                        })
4048                        .collect();
4049                    let truncated_refs: Vec<&str> =
4050                        truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
4051                    let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4052                    // Truncate parent text to avoid token limits
4053                    let parent_text = truncate_for_embedding(&embed_text);
4054                    if parent_text.len() < embed_text.len() {
4055                        info!(
4056                            "Truncated parent text from {} to {} chars for embedding",
4057                            embed_text.len(),
4058                            parent_text.len()
4059                        );
4060                    }
4061                    let parent_embedding = runtime.embed_passage(&parent_text)?;
4062                    embedded = true;
4063                    apply_embedding_identity_metadata(&mut options, runtime);
4064                    info!(
4065                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
4066                        chunk_embeddings.len()
4067                    );
4068                    mem.put_with_chunk_embeddings(
4069                        &payload,
4070                        Some(parent_embedding),
4071                        chunk_embeddings,
4072                        options,
4073                    )
4074                    .map_err(|err| {
4075                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4076                    })?
4077                    } else {
4078                        // No chunking - just embed the whole document (truncate to avoid token limits)
4079                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
4080                        let truncated_text = truncate_for_embedding(&embed_text);
4081                        if truncated_text.len() < embed_text.len() {
4082                            info!(
4083                                "Truncated text from {} to {} chars for embedding",
4084                                embed_text.len(),
4085                                truncated_text.len()
4086                            );
4087                        }
4088                        let embedding = runtime.embed_passage(&truncated_text)?;
4089                        embedded = true;
4090                        apply_embedding_identity_metadata(&mut options, runtime);
4091                        mem.put_with_embedding_and_options(&payload, embedding, options)
4092                            .map_err(|err| {
4093                                map_put_error(
4094                                    err,
4095                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4096                                )
4097                            })?
4098                    }
4099                }
4100            }
4101        } else {
4102            mem.put_bytes_with_options(&payload, options)
4103                .map_err(|err| {
4104                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4105                })?
4106        }
4107    };
4108
4109    Ok(IngestOutcome {
4110        report: PutReport {
4111            seq,
4112            frame_id,
4113            uri,
4114            title: inferred_title,
4115            original_bytes,
4116            stored_bytes,
4117            compressed,
4118            source: source_path.map(Path::to_path_buf),
4119            mime: Some(analysis.mime),
4120            metadata: analysis.metadata,
4121            extraction: analysis.extraction,
4122            #[cfg(feature = "logic_mesh")]
4123            search_text,
4124        },
4125        embedded,
4126        #[cfg(feature = "parallel_segments")]
4127        parallel_input: None,
4128    })
4129}
4130
4131fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4132    if std::str::from_utf8(payload).is_ok() {
4133        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4134        Ok((compressed.len(), true))
4135    } else {
4136        Ok((payload.len(), false))
4137    }
4138}
4139
4140fn print_report(report: &PutReport) {
4141    let name = report
4142        .source
4143        .as_ref()
4144        .map(|path| {
4145            let pretty = env::current_dir()
4146                .ok()
4147                .as_ref()
4148                .and_then(|cwd| diff_paths(path, cwd))
4149                .unwrap_or_else(|| path.to_path_buf());
4150            pretty.to_string_lossy().into_owned()
4151        })
4152        .unwrap_or_else(|| "stdin".to_string());
4153
4154    let ratio = if report.original_bytes > 0 {
4155        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4156    } else {
4157        100.0
4158    };
4159
4160    println!("• {name} → {}", report.uri);
4161    println!("  seq: {}", report.seq);
4162    if report.compressed {
4163        println!(
4164            "  size: {} B → {} B ({:.1}% of original, compressed)",
4165            report.original_bytes, report.stored_bytes, ratio
4166        );
4167    } else {
4168        println!("  size: {} B (stored as-is)", report.original_bytes);
4169    }
4170    if let Some(mime) = &report.mime {
4171        println!("  mime: {mime}");
4172    }
4173    if let Some(title) = &report.title {
4174        println!("  title: {title}");
4175    }
4176    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4177    println!(
4178        "  extraction: status={} reader={}",
4179        report.extraction.status.label(),
4180        extraction_reader
4181    );
4182    if let Some(ms) = report.extraction.duration_ms {
4183        println!("  extraction-duration: {} ms", ms);
4184    }
4185    if let Some(pages) = report.extraction.pages_processed {
4186        println!("  extraction-pages: {}", pages);
4187    }
4188    for warning in &report.extraction.warnings {
4189        println!("  warning: {warning}");
4190    }
4191    if let Some(metadata) = &report.metadata {
4192        if let Some(caption) = &metadata.caption {
4193            println!("  caption: {caption}");
4194        }
4195        if let Some(audio) = metadata.audio.as_ref() {
4196            if audio.duration_secs.is_some()
4197                || audio.sample_rate_hz.is_some()
4198                || audio.channels.is_some()
4199            {
4200                let duration = audio
4201                    .duration_secs
4202                    .map(|secs| format!("{secs:.1}s"))
4203                    .unwrap_or_else(|| "unknown".into());
4204                let rate = audio
4205                    .sample_rate_hz
4206                    .map(|hz| format!("{} Hz", hz))
4207                    .unwrap_or_else(|| "? Hz".into());
4208                let channels = audio
4209                    .channels
4210                    .map(|ch| ch.to_string())
4211                    .unwrap_or_else(|| "?".into());
4212                println!("  audio: duration {duration}, {channels} ch, {rate}");
4213            }
4214        }
4215    }
4216
4217    println!();
4218}
4219
4220fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4221    let source = report
4222        .source
4223        .as_ref()
4224        .map(|path| path.to_string_lossy().into_owned());
4225    let extraction = json!({
4226        "status": report.extraction.status.label(),
4227        "reader": report.extraction.reader,
4228        "duration_ms": report.extraction.duration_ms,
4229        "pages_processed": report.extraction.pages_processed,
4230        "warnings": report.extraction.warnings,
4231    });
4232    json!({
4233        "seq": report.seq,
4234        "uri": report.uri,
4235        "title": report.title,
4236        "original_bytes": report.original_bytes,
4237        "original_bytes_human": format_bytes(report.original_bytes as u64),
4238        "stored_bytes": report.stored_bytes,
4239        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4240        "compressed": report.compressed,
4241        "mime": report.mime,
4242        "source": source,
4243        "metadata": report.metadata,
4244        "extraction": extraction,
4245    })
4246}
4247
4248fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4249    match err {
4250        MemvidError::CapacityExceeded {
4251            current,
4252            limit,
4253            required,
4254        } => anyhow!(CapacityExceededMessage {
4255            current,
4256            limit,
4257            required,
4258        }),
4259        other => other.into(),
4260    }
4261}
4262
4263fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4264    for (idx, entry) in entries.iter().enumerate() {
4265        match serde_json::from_str::<DocMetadata>(entry) {
4266            Ok(meta) => {
4267                options.metadata = Some(meta);
4268            }
4269            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4270                Ok(value) => {
4271                    options
4272                        .extra_metadata
4273                        .insert(format!("custom_metadata_{idx}"), value.to_string());
4274                }
4275                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4276            },
4277        }
4278    }
4279}
4280
4281fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4282    let stats = mem.stats()?;
4283    let message = match stats.tier {
4284        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4285        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4286    };
4287    Ok(message)
4288}
4289
/// Normalizes a user-supplied URI or path into a clean, relative form.
///
/// Processing steps:
/// 1. Surrounding whitespace and any leading `mv2://` scheme prefix are removed.
/// 2. Backslashes are treated as `/` separators.
/// 3. Empty and `.` segments are dropped; each `..` segment removes the segment
///    before it (and is itself dropped when there is nothing to remove).
///
/// With `keep_path == true` the surviving segments are joined with `/`;
/// otherwise only the final segment is returned.
///
/// When no segment survives (for example `""`, `"/"`, `"."`, `".."` or
/// `"a/.."`) the result is the placeholder `"document"`. The result is therefore
/// never `"."` or `".."` and never contains such a segment.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    const FALLBACK: &str = "document";

    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    // Borrow segments from `normalized`; nothing is allocated until the final result.
    let mut segments: Vec<&str> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                segments.pop();
            }
            name => segments.push(name),
        }
    }

    match segments.last() {
        // Everything was dropped or cancelled out: never echo a raw `.`/`..` back.
        None => FALLBACK.to_string(),
        Some(_) if keep_path => segments.join("/"),
        Some(name) => (*name).to_string(),
    }
}
4323
4324fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4325    if quiet {
4326        let pb = ProgressBar::hidden();
4327        pb.set_message(message.to_string());
4328        return pb;
4329    }
4330
4331    match total {
4332        Some(len) => {
4333            let pb = ProgressBar::new(len);
4334            pb.set_draw_target(ProgressDrawTarget::stderr());
4335            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4336                .unwrap_or_else(|_| ProgressStyle::default_bar());
4337            pb.set_style(style);
4338            pb.set_message(message.to_string());
4339            pb
4340        }
4341        None => {
4342            let pb = ProgressBar::new_spinner();
4343            pb.set_draw_target(ProgressDrawTarget::stderr());
4344            pb.set_message(message.to_string());
4345            pb.enable_steady_tick(Duration::from_millis(120));
4346            pb
4347        }
4348    }
4349}
4350
4351fn create_spinner(message: &str) -> ProgressBar {
4352    let pb = ProgressBar::new_spinner();
4353    pb.set_draw_target(ProgressDrawTarget::stderr());
4354    let style = ProgressStyle::with_template("{spinner} {msg}")
4355        .unwrap_or_else(|_| ProgressStyle::default_spinner())
4356        .tick_strings(&["-", "\\", "|", "/"]);
4357    pb.set_style(style);
4358    pb.set_message(message.to_string());
4359    pb.enable_steady_tick(Duration::from_millis(120));
4360    pb
4361}
4362
4363// ============================================================================
4364// put-many command - batch ingestion with pre-computed embeddings
4365// ============================================================================
4366
/// Arguments for the `put-many` subcommand
// The `///` comments in this struct feed clap's generated help output, so their
// wording is kept verbatim; explanatory notes use plain `//` comments instead.
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    /// Path to the memory file to append to
    // Positional argument; clap infers the path value parser from the `PathBuf` type.
    #[arg(value_name = "FILE")]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    /// Zstd compression level (1-9, default: 3)
    // Typed default; renders as `3`, the same as the string form would.
    #[arg(long, default_value_t = 3)]
    pub compression_level: i32,
    // Flags declared on `LockCliArgs` are merged into this command's own flag set.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4387
/// One document entry in the JSON payload accepted by `put-many`.
///
/// `title` and `text` have no serde default and must be present in each entry;
/// every other field falls back to its type's default when omitted.
// Field order and serde attributes are left untouched: they define the accepted input.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Title of the document (required).
    pub title: String,
    /// Single label for the document; an empty string when omitted.
    #[serde(default)]
    pub label: String,
    /// Text content of the document (required).
    pub text: String,
    /// Optional URI to associate with the document.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tags attached to the document; empty when omitted.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Additional labels attached to the document; empty when omitted.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Free-form JSON metadata; JSON `null` when omitted.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// String key/value pairs stored on the frame as extra metadata.
    /// Intended for persisting embedding identity keys such as
    /// `memvid.embedding.model`.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Embedding vector computed ahead of time for the parent document
    /// (for example by OpenAI).
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Embedding vectors computed ahead of time for the document's chunks,
    /// matched to chunks by index.
    /// When the document is chunked, each vector is assigned to the chunk frame
    /// at the same position, so the list length should equal the number of
    /// chunks that will be created.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
4417
/// Top-level `put-many` payload: a JSON array of [`PutManyRequest`] entries.
///
/// `#[serde(transparent)]` makes this wrapper deserialize exactly like its only
/// field, so the input is a bare array rather than an object with a `requests`
/// key.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The entries to ingest, in input order.
    pub requests: Vec<PutManyRequest>,
}
4425
/// Handle `put-many`: ingest a JSON batch of documents into a memory in one parallel pass.
///
/// Steps, in order:
/// 1. Open `args.file`, verify CLI mutations are allowed, and apply the lock options.
/// 2. Enable the lexical and vector indexes if the memory's stats report them missing.
/// 3. Run the capacity check against the memory's current size.
/// 4. Read the batch JSON from `args.input`, or from stdin when no path is given, and
///    parse it as a [`PutManyBatch`].
/// 5. Convert every request into a `ParallelInput` and ingest them with
///    `put_parallel_inputs`, using `args.compression_level` clamped to `1..=9`.
/// 6. Print the resulting frame ids, as JSON when `args.json` is set.
///
/// An empty batch is not an error: an empty result is printed and nothing is ingested.
///
/// # Errors
///
/// Returns an error when the memory cannot be opened or mutated, the capacity check
/// fails, the input cannot be read or is not a valid batch, or ingestion fails.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Ensure indexes are enabled
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Check if current memory size exceeds free tier limit
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Read batch input: an explicit --input file wins, otherwise consume stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        // Progress goes to stderr so stdout stays reserved for the result.
        eprintln!("Processing {} documents...", count);
    }

    // Convert to ParallelInput
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| {
            // Requests without a URI get a synthetic one derived from their batch position.
            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
            let mut options = PutOptions::default();
            options.title = Some(req.title);
            options.uri = Some(uri);
            options.tags = req.tags;
            options.labels = req.labels;
            // A non-empty `label` used to be parsed and then silently discarded. Store it
            // alongside `labels` (first, skipping duplicates) so the value the caller
            // supplied is not lost.
            if !req.label.is_empty() && !options.labels.contains(&req.label) {
                options.labels.insert(0, req.label);
            }
            options.extra_metadata = req.extra_metadata;
            if !req.metadata.is_null() {
                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
                    Ok(meta) => options.metadata = Some(meta),
                    // Best-effort fallback: metadata that does not fit `DocMetadata` is kept
                    // as raw JSON text, without overriding a caller-provided entry.
                    Err(_) => {
                        options
                            .extra_metadata
                            .entry("memvid.metadata.json".to_string())
                            .or_insert_with(|| req.metadata.to_string());
                    }
                }
            }

            // Record the embedding dimension (taken from the document embedding, or the
            // first chunk embedding) unless the caller already set the key explicitly.
            if let Some(embedding) = req
                .embedding
                .as_ref()
                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
            {
                options
                    .extra_metadata
                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                    .or_insert_with(|| embedding.len().to_string());
            }

            ParallelInput {
                payload: ParallelPayload::Bytes(req.text.into_bytes()),
                options,
                embedding: req.embedding,
                chunk_embeddings: req.chunk_embeddings,
            }
        })
        .collect();

    // Build options; out-of-range compression levels are clamped rather than rejected.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    // Ingest batch
    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else if let (Some(first), Some(last)) = (frame_ids.first(), frame_ids.last()) {
            // Large batches: show only the id range instead of every frame. Matching on
            // the actual ends avoids printing a made-up `0` id.
            println!("  first: {}", first);
            println!("  last:  {}", last);
        }
    }

    Ok(())
}