memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33    ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34    MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum characters for embedding text to avoid exceeding OpenAI's 8192 token limit.
/// Using ~4 chars/token estimate, 30K chars ≈ 7.5K tokens (safe margin).
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
const MAGIC_SNIFF_BYTES: usize = 16;
/// Maximum number of images to extract from a PDF for CLIP visual search.
/// Set high to capture all images; processing stops when no more images found.
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;

/// Truncate text for embedding to fit within OpenAI token limits.
/// Returns a truncated string that fits within MAX_EMBEDDING_TEXT_LEN bytes.
///
/// The cut point is moved back to the nearest UTF-8 character boundary so we
/// never split a multi-byte character. (A direct byte slice like
/// `&text[..MAX_EMBEDDING_TEXT_LEN]` panics when the limit falls inside a
/// multi-byte character, which is exactly the case this guard exists for.)
fn truncate_for_embedding(text: &str) -> String {
    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
        return text.to_string();
    }
    // Walk backwards from the byte limit until we land on a char boundary.
    // At most 3 steps, since UTF-8 characters are 1-4 bytes long.
    let mut end = MAX_EMBEDDING_TEXT_LEN;
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    text[..end].to_string()
}
78
79fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
80    options.extra_metadata.insert(
81        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
82        runtime.provider_kind().to_string(),
83    );
84    options.extra_metadata.insert(
85        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
86        runtime.provider_model_id(),
87    );
88    options.extra_metadata.insert(
89        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
90        runtime.dimension().to_string(),
91    );
92}
93
94fn apply_embedding_identity_metadata_from_choice(
95    options: &mut PutOptions,
96    model: EmbeddingModelChoice,
97    dimension: usize,
98    model_id_override: Option<&str>,
99) {
100    let provider = match model {
101        EmbeddingModelChoice::OpenAILarge
102        | EmbeddingModelChoice::OpenAISmall
103        | EmbeddingModelChoice::OpenAIAda => "openai",
104        EmbeddingModelChoice::Nvidia => "nvidia",
105        _ => "fastembed",
106    };
107
108    let model_id = match model {
109        EmbeddingModelChoice::Nvidia => model_id_override
110            .map(str::trim)
111            .filter(|value| !value.is_empty())
112            .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
113            .filter(|value| !value.is_empty())
114            .map(|value| {
115                if value.contains('/') {
116                    value.to_string()
117                } else {
118                    format!("nvidia/{value}")
119                }
120            })
121            .unwrap_or_else(|| model.canonical_model_id().to_string()),
122        _ => model.canonical_model_id().to_string(),
123    };
124
125    options.extra_metadata.insert(
126        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
127        provider.to_string(),
128    );
129    options.extra_metadata.insert(
130        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
131        model_id,
132    );
133    options.extra_metadata.insert(
134        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
135        dimension.to_string(),
136    );
137}
138
/// Get the default local model path for contextual retrieval (phi-3.5-mini).
/// Uses MEMVID_MODELS_DIR or defaults to ~/.memvid/models/llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => {
            // Expand a leading "~/" manually; fall back to the literal path
            // when HOME is unavailable.
            if let Some(rest) = custom.strip_prefix("~/") {
                match env::var("HOME") {
                    Ok(home) => PathBuf::from(home).join(rest),
                    Err(_) => PathBuf::from(&custom),
                }
            } else {
                PathBuf::from(&custom)
            }
        }
        Err(_) => {
            // Default to ~/.memvid/models
            let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let model_path = models_dir
        .join("llm")
        .join("phi-3-mini-q4")
        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");

    if model_path.exists() {
        Ok(model_path)
    } else {
        Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ))
    }
}
175
176use pathdiff::diff_paths;
177
178use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
179use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
180use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
181use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
182use crate::error::{CapacityExceededMessage, DuplicateUriError};
183use crate::utils::{
184    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
185    select_frame,
186};
187#[cfg(feature = "temporal_enrich")]
188use memvid_core::enrich_chunks as temporal_enrich_chunks;
189
/// Helper function to parse boolean flags from environment variables
fn parse_bool_flag(value: &str) -> Option<bool> {
    let normalized = value.trim().to_ascii_lowercase();
    if matches!(normalized.as_str(), "1" | "true" | "yes" | "on") {
        Some(true)
    } else if matches!(normalized.as_str(), "0" | "false" | "no" | "off") {
        Some(false)
    } else {
        // Anything unrecognized is treated as "not set" rather than an error.
        None
    }
}
198
/// Helper function to check for parallel segments environment override
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    match std::env::var("MEMVID_PARALLEL_SEGMENTS") {
        Ok(value) => parse_bool_flag(&value),
        // Unset (or non-UTF-8) env var means "no override".
        Err(_) => None,
    }
}
206
/// Check if CLIP model files are installed
#[cfg(feature = "clip")]
fn is_clip_model_installed() -> bool {
    // Resolve the models directory: explicit override, then $HOME, then CWD-relative.
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| {
            env::var("HOME")
                .map(|home| PathBuf::from(home).join(".memvid/models"))
                .unwrap_or_else(|_| PathBuf::from(".memvid/models"))
        });

    let model_name =
        env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());

    // Both ONNX halves (vision + text encoders) must be present to use CLIP.
    ["vision", "text"]
        .iter()
        .all(|part| models_dir.join(format!("{}_{}.onnx", model_name, part)).exists())
}
226
/// Minimum confidence threshold for NER entities (0.0-1.0)
/// Note: Lower threshold captures more entities but also more noise
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

/// Minimum length for entity names (characters)
// NOTE(review): `is_valid_entity` compares this against `str::len`, i.e. byte
// length, which differs from character count for non-ASCII names — confirm intent.
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

/// Maximum gap (in characters) between adjacent entities to merge
/// Allows merging "S" + "&" + "P Global" when they're close together
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;
240
/// Merge adjacent NER entities that appear to be tokenization artifacts.
///
/// The BERT tokenizer splits names at special characters like `&`, producing
/// separate entities like "S", "&", "P Global" instead of "S&P Global".
/// This function merges adjacent entities of the same type when they're
/// contiguous or nearly contiguous in the original text.
///
/// Slicing uses `str::get`, which returns `None` both for out-of-range offsets
/// and for offsets that fall inside a multi-byte UTF-8 character, so malformed
/// NER byte offsets can never panic the way a direct `text[a..b]` slice would.
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Sort by byte position so adjacency checks only need the previous entity.
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::new();

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            // Check if this entity is adjacent to the previous one and same type
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            // Check what's in the gap (if any)
            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                // Gap content should be whitespace (but not newlines), punctuation, or connectors.
                match original_text.get(last.byte_end..entity.byte_start) {
                    // Don't merge across newlines - that indicates separate entities
                    Some(gap_text) if gap_text.contains('\n') || gap_text.contains('\r') => false,
                    Some(gap_text) => gap_text
                        .chars()
                        .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.')),
                    // Unsliceable span (bad offsets or mid-character cut): don't merge.
                    None => false,
                }
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Merge: extend the previous entity to include this one.
                // Prefer re-slicing the original text so gap punctuation is
                // preserved; fall back to plain concatenation when the span
                // cannot be sliced safely.
                let merged_text = original_text
                    .get(last.byte_start..entity.byte_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = entity.byte_end;
                // Average the confidence
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
312
/// Filter and clean extracted NER entities to remove noise.
/// Returns true if the entity should be kept.
#[cfg(feature = "logic_mesh")]
fn is_valid_entity(text: &str, confidence: f32) -> bool {
    // Low-confidence extractions are dropped outright.
    if confidence < NER_MIN_CONFIDENCE {
        return false;
    }

    let candidate = text.trim();

    // Too short (byte length), all whitespace/punctuation, or spanning lines.
    if candidate.len() < NER_MIN_ENTITY_LEN {
        return false;
    }
    if candidate
        .chars()
        .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
    {
        return false;
    }
    if candidate.contains('\n') || candidate.contains('\r') {
        return false;
    }

    // Word-piece fragments ("##...") and trailing-period sentence fragments.
    if candidate.starts_with("##") || candidate.ends_with("##") || candidate.ends_with('.') {
        return false;
    }

    let lowered = candidate.to_lowercase();

    match candidate.len() {
        // Single-character tokens are always noise.
        1 => return false,
        // Two-character tokens: keep only well-known abbreviations.
        2 => return matches!(lowered.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it"),
        _ => {}
    }

    // Common noise words / generic suffixes.
    if matches!(
        lowered.as_str(),
        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
    ) {
        return false;
    }

    // Drop leading fragments left by tokenization ("P Global" from "S&P Global",
    // "HS Markit" from "IHS Markit"), while allowing real short prefixes
    // like "US Bank" or "UK Office".
    let mut word_iter = candidate.split_whitespace();
    if let (Some(first_word), Some(_)) = (word_iter.next(), word_iter.next()) {
        if first_word.len() <= 2
            && first_word.chars().all(char::is_uppercase)
            && !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un")
        {
            return false;
        }
    }

    // Entities starting lowercase are usually partial words.
    candidate.chars().next().map_or(true, |c| !c.is_lowercase())
}
394
395/// Parse key=value pairs for tags
396pub fn parse_key_val(s: &str) -> Result<(String, String)> {
397    let (key, value) = s
398        .split_once('=')
399        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
400    Ok((key.to_string(), value.to_string()))
401}
402
/// Lock-related CLI arguments (shared across commands)
// NOTE: the `///` field docs below double as clap `--help` text; edit with care.
// Consumed by `apply_lock_cli` via the flattened `lock` field on each command.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    /// Maximum time to wait for an active writer before failing (ms)
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
413
/// Arguments for the `put` subcommand
// NOTE: the `///` field docs below are clap `--help` text; edit with care.
#[derive(Args)]
pub struct PutArgs {
    // --- target file, payload source, and frame identity ---
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional POSIX timestamp to store on the frame
    #[arg(long)]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    // --- ingestion mode flags ---
    /// Store the payload as a binary video without transcoding
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    // --- user-supplied annotations ---
    /// Tags to attach in key=value form
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    /// Additional metadata payloads expressed as JSON
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // --- automatic enrichment opt-outs ---
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    /// Disable automatic triplet extraction (SPO facts as MemoryCards)
    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
    pub no_extract_triplets: bool,
    // --- audio handling ---
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Transcribe audio using Whisper and use the transcript for text embedding
    /// Requires the 'whisper' feature to be enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // --- embeddings (computed, pre-computed, compression) ---
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    /// Embedding model identity for the provided `--embedding-vec` vector.
    ///
    /// This is written into per-frame `extra_metadata` (`memvid.embedding.*`) so that
    /// semantic queries can auto-select the correct runtime across CLI/SDKs.
    ///
    /// Options: bge-small, bge-base, nomic, gte-large, openai, openai-small, openai-ada, nvidia
    /// (also accepts canonical IDs like `text-embedding-3-small` and `BAAI/bge-small-en-v1.5`).
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    // --- contextual retrieval ---
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    // --- tables / CLIP ---
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    /// Enable CLIP visual embeddings for images and PDF pages
    /// Allows text-to-image search using natural language queries
    /// Auto-enabled when CLIP model is installed; use --no-clip to disable
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    /// Disable CLIP visual embeddings even when model is available
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    // --- duplicate handling ---
    /// Replace any existing frame with the same URI instead of inserting a duplicate
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    /// Bind to a dashboard memory ID (UUID or 24-char ObjectId) for capacity and tracking
    /// If the file is not already bound, this will sync the capacity ticket from the dashboard
    #[arg(long = "memory-id", value_name = "ID")]
    pub memory_id: Option<crate::commands::tickets::MemoryId>,

    /// Build Logic-Mesh entity graph during ingestion (opt-in)
    /// Extracts entities (people, organizations, locations, etc.) and their relationships
    /// Enables graph traversal with `memvid follow`
    /// Requires NER model: `memvid models install --ner distilbert-ner`
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // --- experimental parallel segment builder (feature-gated) ---
    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
580
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Resolve whether the parallel segment builder should run.
    /// Priority: explicit `--parallel-segments` / `--no-parallel-segments`
    /// flags, then the MEMVID_PARALLEL_SEGMENTS env override, else legacy path.
    pub fn wants_parallel(&self) -> bool {
        match (self.parallel_segments, self.no_parallel_segments) {
            (true, _) => true,
            (_, true) => false,
            _ => parallel_env_override().unwrap_or(false),
        }
    }

    /// Build `BuildOpts` from the CLI overrides, then sanitize them so
    /// out-of-range values are clamped by the core engine.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut build = memvid_core::BuildOpts::default();
        if let Some(tokens) = self.parallel_segment_tokens {
            build.segment_tokens = tokens;
        }
        if let Some(pages) = self.parallel_segment_pages {
            build.segment_pages = pages;
        }
        if let Some(threads) = self.parallel_threads {
            build.threads = threads;
        }
        if let Some(depth) = self.parallel_queue_depth {
            build.queue_depth = depth;
        }
        build.sanitize();
        build
    }
}
614
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    // Without the `parallel_segments` feature the parallel builder is never
    // available, so this always reports false.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
621
/// Arguments for the `api-fetch` subcommand
// NOTE: the `///` field docs below are clap `--help` text; edit with care.
// Mapped 1:1 onto `ApiFetchCommand` in `handle_api_fetch`.
#[derive(Args)]
pub struct ApiFetchArgs {
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
647
/// Arguments for the `update` subcommand
// Fields deliberately carry no `///` docs (no clap help text); `//` comments
// below document intent without changing `--help` output.
#[derive(Args)]
pub struct UpdateArgs {
    // Target memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selector: `--frame-id` and `--uri` are mutually exclusive (clap-enforced).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Optional replacement payload; omitted means metadata-only update.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Metadata overrides — each is applied only when provided.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable annotations, same shape as on `put`.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // Recompute embeddings for the updated frame.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
683
/// Arguments for the `delete` subcommand
#[derive(Args)]
pub struct DeleteArgs {
    // Target memory file.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // Frame selector: `--frame-id` and `--uri` are mutually exclusive (clap-enforced).
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Skip the interactive confirmation prompt.
    #[arg(long, action = ArgAction::SetTrue)]
    pub yes: bool,
    // Emit JSON instead of human-readable output.
    #[arg(long)]
    pub json: bool,

    #[command(flatten)]
    pub lock: LockCliArgs,
}
701
702/// Handler for `memvid api-fetch`
703pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
704    let command = ApiFetchCommand {
705        file: args.file,
706        config_path: args.config,
707        dry_run: args.dry_run,
708        mode_override: args.mode,
709        uri_override: args.uri,
710        output_json: args.json,
711        lock_timeout_ms: args.lock.lock_timeout,
712        force_lock: args.lock.force,
713    };
714    run_api_fetch(config, command)
715}
716
717/// Handler for `memvid delete`
718pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
719    let mut mem = Memvid::open(&args.file)?;
720    ensure_cli_mutation_allowed(&mem)?;
721    apply_lock_cli(&mut mem, &args.lock);
722    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
723
724    if !args.yes {
725        if let Some(uri) = &frame.uri {
726            println!("About to delete frame {} ({uri})", frame.id);
727        } else {
728            println!("About to delete frame {}", frame.id);
729        }
730        print!("Confirm? [y/N]: ");
731        io::stdout().flush()?;
732        let mut input = String::new();
733        io::stdin().read_line(&mut input)?;
734        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
735        if !confirmed {
736            println!("Aborted");
737            return Ok(());
738        }
739    }
740
741    let seq = mem.delete_frame(frame.id)?;
742    mem.commit()?;
743    let deleted = mem.frame_by_id(frame.id)?;
744
745    if args.json {
746        let json = json!({
747            "frame_id": frame.id,
748            "sequence": seq,
749            "status": frame_status_str(deleted.status),
750        });
751        println!("{}", serde_json::to_string_pretty(&json)?);
752    } else {
753        println!("Deleted frame {} (seq {})", frame.id, seq);
754    }
755
756    Ok(())
757}
758pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
759    // Check if plan allows write operations (blocks expired subscriptions)
760    crate::utils::require_active_plan(config, "put")?;
761
762    let mut mem = Memvid::open(&args.file)?;
763    ensure_cli_mutation_allowed(&mem)?;
764    apply_lock_cli(&mut mem, &args.lock);
765
766    // If memory-id is provided and file isn't already bound, bind it now
767    if let Some(memory_id) = &args.memory_id {
768        let needs_binding = match mem.get_memory_binding() {
769            Some(binding) => binding.memory_id != **memory_id,
770            None => true,
771        };
772        if needs_binding {
773            match crate::commands::creation::bind_to_dashboard_memory(config, &mut mem, &args.file, memory_id) {
774                Ok((bound_id, capacity)) => {
775                    eprintln!("✓ Bound to dashboard memory: {} (capacity: {})", bound_id, crate::utils::format_bytes(capacity));
776                }
777                Err(e) => {
778                    eprintln!("⚠️  Failed to bind to dashboard memory: {}", e);
779                    eprintln!("   Continuing with existing capacity. Bind manually with:");
780                    eprintln!("   memvid tickets sync {} --memory-id {}", args.file.display(), memory_id);
781                }
782            }
783        }
784    }
785
786    // Load active replay session if one exists
787    #[cfg(feature = "replay")]
788    let _ = mem.load_active_session();
789    let mut stats = mem.stats()?;
790    if stats.frame_count == 0 && !stats.has_lex_index {
791        mem.enable_lex()?;
792        stats = mem.stats()?;
793    }
794
795    // Check if current memory size exceeds free tier limit
796    // If so, require API key to continue
797    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
798
799    // Set vector compression mode if requested
800    if args.vector_compression {
801        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
802    }
803
804    let mut capacity_guard = CapacityGuard::from_stats(&stats);
805    let use_parallel = args.wants_parallel();
806    #[cfg(feature = "parallel_segments")]
807    let mut parallel_opts = if use_parallel {
808        Some(args.sanitized_parallel_opts())
809    } else {
810        None
811    };
812    #[cfg(feature = "parallel_segments")]
813    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
814    #[cfg(feature = "parallel_segments")]
815    let mut pending_parallel_indices: Vec<usize> = Vec::new();
816    let input_set = resolve_inputs(args.input.as_deref())?;
817    if args.embedding && args.embedding_vec.is_some() {
818        anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
819    }
820    if args.embedding_vec.is_some() {
821        match &input_set {
822            InputSet::Files(paths) if paths.len() != 1 => {
823                anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
824            }
825            _ => {}
826        }
827    }
828
829    if args.video {
830        if let Some(kind) = &args.kind {
831            if !kind.eq_ignore_ascii_case("video") {
832                anyhow::bail!("--video conflicts with explicit --kind={kind}");
833            }
834        }
835        if matches!(input_set, InputSet::Stdin) {
836            anyhow::bail!("--video requires --input <file>");
837        }
838    }
839
840    #[allow(dead_code)]
841    enum EmbeddingMode {
842        None,
843        Auto,
844        Explicit,
845    }
846
847    // PHASE 1: Make embeddings opt-in, not opt-out
848    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
849    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
850    let mv2_dimension = mem.effective_vec_index_dimension()?;
851    let inferred_embedding_model = if args.embedding {
852        match mem.embedding_identity_summary(10_000) {
853            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
854            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
855                let models: Vec<_> = identities
856                    .iter()
857                    .filter_map(|entry| entry.identity.model.as_deref())
858                    .collect();
859                anyhow::bail!(
860                    "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
861                    Detected models: {:?}\n\n\
862                    Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
863                    models
864                );
865            }
866            memvid_core::EmbeddingIdentitySummary::Unknown => None,
867        }
868    } else {
869        None
870    };
871    let (embedding_mode, embedding_runtime) = if args.embedding {
872        (
873            EmbeddingMode::Explicit,
874            Some(load_embedding_runtime_for_mv2(
875                config,
876                inferred_embedding_model.as_deref(),
877                mv2_dimension,
878            )?),
879        )
880    } else if args.embedding_vec.is_some() {
881        (EmbeddingMode::Explicit, None)
882    } else {
883        (EmbeddingMode::None, None)
884    };
885    let embedding_enabled = args.embedding || args.embedding_vec.is_some();
886    let runtime_ref = embedding_runtime.as_ref();
887
888    // Enable CLIP index if:
889    // 1. --clip flag is set explicitly, OR
890    // 2. Input contains image files and model is available, OR
891    // 3. Model is installed (auto-detect) and --no-clip is not set
892    #[cfg(feature = "clip")]
893    let clip_enabled = {
894        if args.no_clip {
895            false
896        } else if args.clip {
897            true // Explicitly requested
898        } else {
899            // Auto-detect: check if model is installed
900            let model_available = is_clip_model_installed();
901            if model_available {
902                // Model is installed, check if input has images or PDFs
903                match &input_set {
904                    InputSet::Files(paths) => paths.iter().any(|p| {
905                        is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
906                    }),
907                    InputSet::Stdin => false,
908                }
909            } else {
910                false
911            }
912        }
913    };
914    #[cfg(feature = "clip")]
915    let clip_model = if clip_enabled {
916        mem.enable_clip()?;
917        if !args.json {
918            let mode = if args.clip {
919                "explicit"
920            } else {
921                "auto-detected"
922            };
923            eprintln!("ℹ️  CLIP visual embeddings enabled ({mode})");
924            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
925            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
926            eprintln!("   Use --no-clip to disable auto-detection");
927            eprintln!();
928        }
929        // Initialize CLIP model for generating image embeddings
930        match memvid_core::ClipModel::default_model() {
931            Ok(model) => Some(model),
932            Err(e) => {
933                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
934                None
935            }
936        }
937    } else {
938        None
939    };
940
941    // Warn user about vector storage overhead (PHASE 2)
942    if embedding_enabled && !args.json {
943        let capacity = stats.capacity_bytes;
944        if args.vector_compression {
945            // PHASE 3: PQ compression reduces storage 16x
946            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
947            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
948            eprintln!("   Storage: ~20 KB per document (16x compressed)");
949            eprintln!("   Accuracy: ~95% recall@10 maintained");
950            eprintln!(
951                "   Capacity: ~{} documents max for {:.1} GB",
952                max_docs,
953                capacity as f64 / 1e9
954            );
955            eprintln!();
956        } else {
957            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
958            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
959            eprintln!("   Storage: ~270 KB per document");
960            eprintln!(
961                "   Capacity: ~{} documents max for {:.1} GB",
962                max_docs,
963                capacity as f64 / 1e9
964            );
965            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
966            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
967            eprintln!();
968        }
969    }
970
971    let embed_progress = if embedding_enabled {
972        let total = match &input_set {
973            InputSet::Files(paths) => Some(paths.len() as u64),
974            InputSet::Stdin => None,
975        };
976        Some(create_task_progress_bar(total, "embed", false))
977    } else {
978        None
979    };
980    let mut embedded_docs = 0usize;
981
982    if let InputSet::Files(paths) = &input_set {
983        if paths.len() > 1 {
984            if args.uri.is_some() || args.title.is_some() {
985                anyhow::bail!(
986                    "--uri/--title apply to a single file; omit them when using a directory or glob"
987                );
988            }
989        }
990    }
991
992    let mut reports = Vec::new();
993    let mut processed = 0usize;
994    let mut skipped = 0usize;
995    let mut bytes_added = 0u64;
996    let mut summary_warnings: Vec<String> = Vec::new();
997    #[cfg(feature = "clip")]
998    let mut clip_embeddings_added = false;
999
1000    let mut capacity_reached = false;
1001    match input_set {
1002        InputSet::Stdin => {
1003            processed += 1;
1004            match read_payload(None) {
1005                Ok(payload) => {
1006                    let mut analyze_spinner = if args.json {
1007                        None
1008                    } else {
1009                        Some(create_spinner("Analyzing stdin payload..."))
1010                    };
1011                    match ingest_payload(
1012                        &mut mem,
1013                        &args,
1014                        config,
1015                        payload,
1016                        None,
1017                        capacity_guard.as_mut(),
1018                        embedding_enabled,
1019                        runtime_ref,
1020                        use_parallel,
1021                    ) {
1022                        Ok(outcome) => {
1023                            if let Some(pb) = analyze_spinner.take() {
1024                                pb.finish_and_clear();
1025                            }
1026                            bytes_added += outcome.report.stored_bytes as u64;
1027                            if args.json {
1028                                summary_warnings
1029                                    .extend(outcome.report.extraction.warnings.iter().cloned());
1030                            }
1031                            reports.push(outcome.report);
1032                            let report_index = reports.len() - 1;
1033                            if !args.json && !use_parallel {
1034                                if let Some(report) = reports.get(report_index) {
1035                                    print_report(report);
1036                                }
1037                            }
1038                            if args.transcript {
1039                                let notice = transcript_notice_message(&mut mem)?;
1040                                if args.json {
1041                                    summary_warnings.push(notice);
1042                                } else {
1043                                    println!("{notice}");
1044                                }
1045                            }
1046                            if outcome.embedded {
1047                                if let Some(pb) = embed_progress.as_ref() {
1048                                    pb.inc(1);
1049                                }
1050                                embedded_docs += 1;
1051                            }
1052                            if use_parallel {
1053                                #[cfg(feature = "parallel_segments")]
1054                                if let Some(input) = outcome.parallel_input {
1055                                    pending_parallel_indices.push(report_index);
1056                                    pending_parallel_inputs.push(input);
1057                                }
1058                            } else if let Some(guard) = capacity_guard.as_mut() {
1059                                mem.commit()?;
1060                                let stats = mem.stats()?;
1061                                guard.update_after_commit(&stats);
1062                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1063                            }
1064                        }
1065                        Err(err) => {
1066                            if let Some(pb) = analyze_spinner.take() {
1067                                pb.finish_and_clear();
1068                            }
1069                            if err.downcast_ref::<DuplicateUriError>().is_some() {
1070                                return Err(err);
1071                            }
1072                            warn!("skipped stdin payload: {err}");
1073                            skipped += 1;
1074                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1075                                capacity_reached = true;
1076                            }
1077                        }
1078                    }
1079                }
1080                Err(err) => {
1081                    warn!("failed to read stdin: {err}");
1082                    skipped += 1;
1083                }
1084            }
1085        }
1086        InputSet::Files(ref paths) => {
1087            let mut transcript_notice_printed = false;
1088            for path in paths {
1089                processed += 1;
1090                match read_payload(Some(&path)) {
1091                    Ok(payload) => {
1092                        let label = path.display().to_string();
1093                        let mut analyze_spinner = if args.json {
1094                            None
1095                        } else {
1096                            Some(create_spinner(&format!("Analyzing {label}...")))
1097                        };
1098                        match ingest_payload(
1099                            &mut mem,
1100                            &args,
1101                            config,
1102                            payload,
1103                            Some(&path),
1104                            capacity_guard.as_mut(),
1105                            embedding_enabled,
1106                            runtime_ref,
1107                            use_parallel,
1108                        ) {
1109                            Ok(outcome) => {
1110                                if let Some(pb) = analyze_spinner.take() {
1111                                    pb.finish_and_clear();
1112                                }
1113                                bytes_added += outcome.report.stored_bytes as u64;
1114
1115                                // Generate CLIP embedding for image files
1116                                #[cfg(feature = "clip")]
1117                                if let Some(ref model) = clip_model {
1118                                    let mime = outcome.report.mime.as_deref();
1119                                    let clip_frame_id = outcome.report.frame_id;
1120
1121                                    if is_image_file(&path) {
1122                                        match model.encode_image_file(&path) {
1123                                            Ok(embedding) => {
1124                                                if let Err(e) = mem.add_clip_embedding_with_page(
1125                                                    clip_frame_id,
1126                                                    None,
1127                                                    embedding,
1128                                                ) {
1129                                                    warn!(
1130                                                        "Failed to add CLIP embedding for {}: {e}",
1131                                                        path.display()
1132                                                    );
1133                                                } else {
1134                                                    info!(
1135                                                        "Added CLIP embedding for frame {}",
1136                                                        clip_frame_id
1137                                                    );
1138                                                    clip_embeddings_added = true;
1139                                                }
1140                                            }
1141                                            Err(e) => {
1142                                                warn!(
1143                                                    "Failed to encode CLIP embedding for {}: {e}",
1144                                                    path.display()
1145                                                );
1146                                            }
1147                                        }
1148                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
1149                                        // Commit before adding child frames to ensure WAL has space
1150                                        if let Err(e) = mem.commit() {
1151                                            warn!("Failed to commit before CLIP extraction: {e}");
1152                                        }
1153
1154                                        match render_pdf_pages_for_clip(
1155                                            &path,
1156                                            CLIP_PDF_MAX_IMAGES,
1157                                            CLIP_PDF_TARGET_PX,
1158                                        ) {
1159                                            Ok(rendered_pages) => {
1160                                                use rayon::prelude::*;
1161
1162                                                // Pre-encode all images to PNG in parallel for better performance
1163                                                // Filter out junk images (solid colors, black, too small)
1164                                                let path_for_closure = path.clone();
1165
1166                                                // Temporarily suppress panic output during image encoding
1167                                                // (some PDFs have malformed images that cause panics in the image crate)
1168                                                let prev_hook = std::panic::take_hook();
1169                                                std::panic::set_hook(Box::new(|_| {
1170                                                    // Silently ignore panics - we handle them with catch_unwind
1171                                                }));
1172
1173                                                let encoded_images: Vec<_> = rendered_pages
1174                                                    .into_par_iter()
1175                                                    .filter_map(|(page_num, image)| {
1176                                                        // Check if image is worth embedding (not junk)
1177                                                        let image_info = get_image_info(&image);
1178                                                        if !image_info.should_embed() {
1179                                                            info!(
1180                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1181                                                                path_for_closure.display(),
1182                                                                page_num,
1183                                                                image_info.color_variance,
1184                                                                image_info.width,
1185                                                                image_info.height
1186                                                            );
1187                                                            return None;
1188                                                        }
1189
1190                                                        // Convert to RGB8 first to ensure consistent buffer size
1191                                                        // (some PDFs have images with alpha or other formats that cause panics)
1192                                                        // Use catch_unwind to handle any panics from corrupted image data
1193                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1194                                                            let rgb_image = image.to_rgb8();
1195                                                            let mut png_bytes = Vec::new();
1196                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1197                                                                &mut Cursor::new(&mut png_bytes),
1198                                                                image::ImageFormat::Png,
1199                                                            ).map(|()| png_bytes)
1200                                                        }));
1201
1202                                                        match encode_result {
1203                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1204                                                            Ok(Err(e)) => {
1205                                                                info!(
1206                                                                    "Skipping image for {} page {}: {e}",
1207                                                                    path_for_closure.display(),
1208                                                                    page_num
1209                                                                );
1210                                                                None
1211                                                            }
1212                                                            Err(_) => {
1213                                                                // Panic occurred - corrupted image data, silently skip
1214                                                                info!(
1215                                                                    "Skipping malformed image for {} page {}",
1216                                                                    path_for_closure.display(),
1217                                                                    page_num
1218                                                                );
1219                                                                None
1220                                                            }
1221                                                        }
1222                                                    })
1223                                                    .collect();
1224
1225                                                // Restore the original panic hook
1226                                                std::panic::set_hook(prev_hook);
1227
1228                                                // Process encoded images sequentially (storage + CLIP encoding)
1229                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1230                                                let mut child_frame_offset = 1u64;
1231                                                let parent_uri = outcome.report.uri.clone();
1232                                                let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1233                                                let total_images = encoded_images.len();
1234
1235                                                // Show progress bar for CLIP extraction
1236                                                let clip_pb = if !args.json && total_images > 0 {
1237                                                    let pb = ProgressBar::new(total_images as u64);
1238                                                    pb.set_style(
1239                                                        ProgressStyle::with_template(
1240                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1241                                                        )
1242                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1243                                                        .progress_chars("█▓░"),
1244                                                    );
1245                                                    Some(pb)
1246                                                } else {
1247                                                    None
1248                                                };
1249
1250                                                for (page_num, image, png_bytes) in encoded_images {
1251                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1252                                                    let child_title = format!(
1253                                                        "{} - Image {} (page {})",
1254                                                        parent_title,
1255                                                        child_frame_offset,
1256                                                        page_num
1257                                                    );
1258
1259                                                    let child_options = PutOptions::builder()
1260                                                        .uri(&child_uri)
1261                                                        .title(&child_title)
1262                                                        .parent_id(clip_frame_id)
1263                                                        .role(FrameRole::ExtractedImage)
1264                                                        .auto_tag(false)
1265                                                        .extract_dates(false)
1266                                                        .build();
1267
1268                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1269                                                        Ok(_child_seq) => {
1270                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1271                                                            child_frame_offset += 1;
1272
1273                                                            match model.encode_image(&image) {
1274                                                                Ok(embedding) => {
1275                                                                    if let Err(e) = mem
1276                                                                        .add_clip_embedding_with_page(
1277                                                                            child_frame_id,
1278                                                                            Some(page_num),
1279                                                                            embedding,
1280                                                                        )
1281                                                                    {
1282                                                                        warn!(
1283                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1284                                                                            path.display(),
1285                                                                            page_num
1286                                                                        );
1287                                                                    } else {
1288                                                                        info!(
1289                                                                            "Added CLIP image frame {} for {} page {}",
1290                                                                            child_frame_id,
1291                                                                            clip_frame_id,
1292                                                                            page_num
1293                                                                        );
1294                                                                        clip_embeddings_added = true;
1295                                                                    }
1296                                                                }
1297                                                                Err(e) => warn!(
1298                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1299                                                                    path.display(),
1300                                                                    page_num
1301                                                                ),
1302                                                            }
1303
1304                                                            // Commit after each image to checkpoint WAL
1305                                                            if let Err(e) = mem.commit() {
1306                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1307                                                            }
1308                                                        }
1309                                                        Err(e) => warn!(
1310                                                            "Failed to store image frame for {} page {}: {e}",
1311                                                            path.display(),
1312                                                            page_num
1313                                                        ),
1314                                                    }
1315
1316                                                    if let Some(ref pb) = clip_pb {
1317                                                        pb.inc(1);
1318                                                    }
1319                                                }
1320
1321                                                if let Some(pb) = clip_pb {
1322                                                    pb.finish_and_clear();
1323                                                }
1324                                            }
1325                                            Err(err) => warn!(
1326                                                "Failed to render PDF pages for CLIP {}: {err}",
1327                                                path.display()
1328                                            ),
1329                                        }
1330                                    }
1331                                }
1332
1333                                if args.json {
1334                                    summary_warnings
1335                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1336                                }
1337                                reports.push(outcome.report);
1338                                let report_index = reports.len() - 1;
1339                                if !args.json && !use_parallel {
1340                                    if let Some(report) = reports.get(report_index) {
1341                                        print_report(report);
1342                                    }
1343                                }
1344                                if args.transcript && !transcript_notice_printed {
1345                                    let notice = transcript_notice_message(&mut mem)?;
1346                                    if args.json {
1347                                        summary_warnings.push(notice);
1348                                    } else {
1349                                        println!("{notice}");
1350                                    }
1351                                    transcript_notice_printed = true;
1352                                }
1353                                if outcome.embedded {
1354                                    if let Some(pb) = embed_progress.as_ref() {
1355                                        pb.inc(1);
1356                                    }
1357                                    embedded_docs += 1;
1358                                }
1359                                if use_parallel {
1360                                    #[cfg(feature = "parallel_segments")]
1361                                    if let Some(input) = outcome.parallel_input {
1362                                        pending_parallel_indices.push(report_index);
1363                                        pending_parallel_inputs.push(input);
1364                                    }
1365                                } else if let Some(guard) = capacity_guard.as_mut() {
1366                                    mem.commit()?;
1367                                    let stats = mem.stats()?;
1368                                    guard.update_after_commit(&stats);
1369                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1370                                }
1371                            }
1372                            Err(err) => {
1373                                if let Some(pb) = analyze_spinner.take() {
1374                                    pb.finish_and_clear();
1375                                }
1376                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1377                                    return Err(err);
1378                                }
1379                                warn!("skipped {}: {err}", path.display());
1380                                skipped += 1;
1381                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1382                                    capacity_reached = true;
1383                                }
1384                            }
1385                        }
1386                    }
1387                    Err(err) => {
1388                        warn!("failed to read {}: {err}", path.display());
1389                        skipped += 1;
1390                    }
1391                }
1392                if capacity_reached {
1393                    break;
1394                }
1395            }
1396        }
1397    }
1398
1399    if capacity_reached {
1400        warn!("capacity limit reached; remaining inputs were not processed");
1401    }
1402
1403    if let Some(pb) = embed_progress.as_ref() {
1404        pb.finish_with_message("embed");
1405    }
1406
1407    if let Some(runtime) = embedding_runtime.as_ref() {
1408        if embedded_docs > 0 {
1409            match embedding_mode {
1410                EmbeddingMode::Explicit => println!(
1411                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1412                    embedded_docs,
1413                    runtime.dimension()
1414                ),
1415                EmbeddingMode::Auto => println!(
1416                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1417                    embedded_docs,
1418                    runtime.dimension()
1419                ),
1420                EmbeddingMode::None => {}
1421            }
1422        } else {
1423            match embedding_mode {
1424                EmbeddingMode::Explicit => {
1425                    println!("No embeddings were generated (no textual content available)");
1426                }
1427                EmbeddingMode::Auto => {
1428                    println!(
1429                        "Semantic runtime available, but no textual content produced embeddings"
1430                    );
1431                }
1432                EmbeddingMode::None => {}
1433            }
1434        }
1435    }
1436
1437    if reports.is_empty() {
1438        if args.json {
1439            if capacity_reached {
1440                summary_warnings.push("capacity_limit_reached".to_string());
1441            }
1442            let summary_json = json!({
1443                "processed": processed,
1444                "ingested": 0,
1445                "skipped": skipped,
1446                "bytes_added": bytes_added,
1447                "bytes_added_human": format_bytes(bytes_added),
1448                "embedded_documents": embedded_docs,
1449                "capacity_reached": capacity_reached,
1450                "warnings": summary_warnings,
1451                "reports": Vec::<serde_json::Value>::new(),
1452            });
1453            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1454        } else {
1455            println!(
1456                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1457                processed, skipped
1458            );
1459        }
1460        return Ok(());
1461    }
1462
1463    #[cfg(feature = "parallel_segments")]
1464    let mut parallel_flush_spinner = if use_parallel && !args.json {
1465        Some(create_spinner("Flushing parallel segments..."))
1466    } else {
1467        None
1468    };
1469
1470    if use_parallel {
1471        #[cfg(feature = "parallel_segments")]
1472        {
1473            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1474                let mut opts = BuildOpts::default();
1475                opts.sanitize();
1476                opts
1477            });
1478            // Set vector compression mode from mem state
1479            opts.vec_compression = mem.vector_compression().clone();
1480            let seqs = if pending_parallel_inputs.is_empty() {
1481                mem.commit_parallel(opts)?;
1482                Vec::new()
1483            } else {
1484                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1485            };
1486            if let Some(pb) = parallel_flush_spinner.take() {
1487                pb.finish_with_message("Flushed parallel segments");
1488            }
1489            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1490                if let Some(report) = reports.get_mut(idx) {
1491                    report.seq = seq;
1492                }
1493            }
1494        }
1495        #[cfg(not(feature = "parallel_segments"))]
1496        {
1497            mem.commit()?;
1498        }
1499    } else {
1500        mem.commit()?;
1501    }
1502
1503    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1504    #[cfg(feature = "clip")]
1505    if clip_embeddings_added {
1506        mem.commit()?;
1507    }
1508
1509    if !args.json && use_parallel {
1510        for report in &reports {
1511            print_report(report);
1512        }
1513    }
1514
1515    // Table extraction for PDF files
1516    let mut tables_extracted = 0usize;
1517    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1518    if args.tables && !args.no_tables {
1519        if let InputSet::Files(ref paths) = input_set {
1520            let pdf_paths: Vec<_> = paths
1521                .iter()
1522                .filter(|p| {
1523                    p.extension()
1524                        .and_then(|e| e.to_str())
1525                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1526                        .unwrap_or(false)
1527                })
1528                .collect();
1529
1530            if !pdf_paths.is_empty() {
1531                let table_spinner = if args.json {
1532                    None
1533                } else {
1534                    Some(create_spinner(&format!(
1535                        "Extracting tables from {} PDF(s)...",
1536                        pdf_paths.len()
1537                    )))
1538                };
1539
1540                // Use aggressive mode with auto fallback for best results
1541                let table_options = TableExtractionOptions::builder()
1542                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1543                    .min_rows(2)
1544                    .min_cols(2)
1545                    .min_quality(memvid_core::table::TableQuality::Low)
1546                    .merge_multi_page(true)
1547                    .build();
1548
1549                for pdf_path in pdf_paths {
1550                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1551                        let filename = pdf_path
1552                            .file_name()
1553                            .and_then(|s| s.to_str())
1554                            .unwrap_or("unknown.pdf");
1555
1556                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1557                            for table in &result.tables {
1558                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1559                                {
1560                                    tables_extracted += 1;
1561                                    table_summaries.push(json!({
1562                                        "table_id": table.table_id,
1563                                        "source_file": table.source_file,
1564                                        "meta_frame_id": meta_id,
1565                                        "rows": table.n_rows,
1566                                        "cols": table.n_cols,
1567                                        "quality": format!("{:?}", table.quality),
1568                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1569                                    }));
1570                                }
1571                            }
1572                        }
1573                    }
1574                }
1575
1576                if let Some(pb) = table_spinner {
1577                    if tables_extracted > 0 {
1578                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1579                    } else {
1580                        pb.finish_with_message("No tables found");
1581                    }
1582                }
1583
1584                // Commit table frames
1585                if tables_extracted > 0 {
1586                    mem.commit()?;
1587                }
1588            }
1589        }
1590    }
1591
1592    // Logic-Mesh: Extract entities and build relationship graph
1593    // Opt-in only: requires --logic-mesh flag
1594    #[cfg(feature = "logic_mesh")]
1595    let mut logic_mesh_entities = 0usize;
1596    #[cfg(feature = "logic_mesh")]
1597    {
1598        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1599
1600        let models_dir = config.models_dir.clone();
1601        let model_path = ner_model_path(&models_dir);
1602        let tokenizer_path = ner_tokenizer_path(&models_dir);
1603
1604        // Opt-in: only enable Logic-Mesh when explicitly requested with --logic-mesh
1605        let ner_available = model_path.exists() && tokenizer_path.exists();
1606        let should_run_ner = args.logic_mesh;
1607
1608        if should_run_ner && !ner_available {
1609            // User explicitly requested --logic-mesh but model not installed
1610            if args.json {
1611                summary_warnings.push(
1612                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1613                );
1614            } else {
1615                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1616                eprintln!("   Run: memvid models install --ner distilbert-ner");
1617            }
1618        } else if should_run_ner {
1619            let ner_spinner = if args.json {
1620                None
1621            } else {
1622                Some(create_spinner("Building Logic-Mesh entity graph..."))
1623            };
1624
1625            match NerModel::load(&model_path, &tokenizer_path, None) {
1626                Ok(mut ner_model) => {
1627                    let mut mesh = LogicMesh::new();
1628                    let mut filtered_count = 0usize;
1629
1630                    // Process each ingested frame for entity extraction using cached search_text
1631                    for report in &reports {
1632                        let frame_id = report.frame_id;
1633                        // Use the cached search_text from ingestion
1634                        if let Some(ref content) = report.search_text {
1635                            if !content.is_empty() {
1636                                match ner_model.extract(content) {
1637                                    Ok(raw_entities) => {
1638                                        // Merge adjacent entities that were split by tokenization
1639                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1640                                        let merged_entities = merge_adjacent_entities(raw_entities, content);
1641
1642                                        for entity in &merged_entities {
1643                                            // Apply post-processing filter to remove noisy entities
1644                                            if !is_valid_entity(&entity.text, entity.confidence) {
1645                                                tracing::debug!(
1646                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1647                                                    entity.text, entity.confidence * 100.0
1648                                                );
1649                                                filtered_count += 1;
1650                                                continue;
1651                                            }
1652
1653                                            // Convert NER entity type to EntityKind
1654                                            let kind = entity.to_entity_kind();
1655                                            // Create mesh node with proper structure
1656                                            let node = MeshNode::new(
1657                                                entity.text.to_lowercase(),
1658                                                entity.text.trim().to_string(),
1659                                                kind,
1660                                                entity.confidence,
1661                                                frame_id,
1662                                                entity.byte_start as u32,
1663                                                (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1664                                            );
1665                                            mesh.merge_node(node);
1666                                            logic_mesh_entities += 1;
1667                                        }
1668                                    }
1669                                    Err(e) => {
1670                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1671                                    }
1672                                }
1673                            }
1674                        }
1675                    }
1676
1677                    if logic_mesh_entities > 0 {
1678                        mesh.finalize();
1679                        let node_count = mesh.stats().node_count;
1680                        // Persist mesh to MV2 file
1681                        mem.set_logic_mesh(mesh);
1682                        mem.commit()?;
1683                        info!(
1684                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1685                            logic_mesh_entities,
1686                            node_count,
1687                            filtered_count
1688                        );
1689                    }
1690
1691                    if let Some(pb) = ner_spinner {
1692                        pb.finish_with_message(format!(
1693                            "Logic-Mesh: {} entities ({} unique)",
1694                            logic_mesh_entities,
1695                            mem.mesh_node_count()
1696                        ));
1697                    }
1698                }
1699                Err(e) => {
1700                    if let Some(pb) = ner_spinner {
1701                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1702                    }
1703                    if args.json {
1704                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1705                    }
1706                }
1707            }
1708        }
1709    }
1710
1711    let stats = mem.stats()?;
1712    if args.json {
1713        if capacity_reached {
1714            summary_warnings.push("capacity_limit_reached".to_string());
1715        }
1716        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1717        let total_duration_ms: Option<u64> = {
1718            let sum: u64 = reports
1719                .iter()
1720                .filter_map(|r| r.extraction.duration_ms)
1721                .sum();
1722            if sum > 0 {
1723                Some(sum)
1724            } else {
1725                None
1726            }
1727        };
1728        let total_pages: Option<u32> = {
1729            let sum: u32 = reports
1730                .iter()
1731                .filter_map(|r| r.extraction.pages_processed)
1732                .sum();
1733            if sum > 0 {
1734                Some(sum)
1735            } else {
1736                None
1737            }
1738        };
1739        let embedding_mode_str = match embedding_mode {
1740            EmbeddingMode::None => "none",
1741            EmbeddingMode::Auto => "auto",
1742            EmbeddingMode::Explicit => "explicit",
1743        };
1744        let summary_json = json!({
1745            "processed": processed,
1746            "ingested": reports.len(),
1747            "skipped": skipped,
1748            "bytes_added": bytes_added,
1749            "bytes_added_human": format_bytes(bytes_added),
1750            "embedded_documents": embedded_docs,
1751            "embedding": {
1752                "enabled": embedding_enabled,
1753                "mode": embedding_mode_str,
1754                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1755            },
1756            "tables": {
1757                "enabled": args.tables && !args.no_tables,
1758                "extracted": tables_extracted,
1759                "tables": table_summaries,
1760            },
1761            "capacity_reached": capacity_reached,
1762            "warnings": summary_warnings,
1763            "extraction": {
1764                "total_duration_ms": total_duration_ms,
1765                "total_pages_processed": total_pages,
1766            },
1767            "memory": {
1768                "path": args.file.display().to_string(),
1769                "frame_count": stats.frame_count,
1770                "size_bytes": stats.size_bytes,
1771                "tier": format!("{:?}", stats.tier),
1772            },
1773            "reports": reports_json,
1774        });
1775        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1776    } else {
1777        println!(
1778            "Committed {} frame(s); total frames {}",
1779            reports.len(),
1780            stats.frame_count
1781        );
1782        println!(
1783            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1784            processed,
1785            reports.len(),
1786            skipped,
1787            format_bytes(bytes_added)
1788        );
1789        if tables_extracted > 0 {
1790            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1791        }
1792    }
1793
1794    // Save active replay session if one exists
1795    #[cfg(feature = "replay")]
1796    let _ = mem.save_active_session();
1797
1798    Ok(())
1799}
1800
/// Handle the `update` CLI command: replace a frame's payload and/or metadata in place.
///
/// Resolves the target frame (by id or URI), carries over all of its existing
/// metadata into fresh `PutOptions`, applies CLI overrides on top, optionally
/// re-analyzes a new payload, recomputes or reuses the frame's embedding, then
/// writes the update and commits. Prints either JSON or a human summary.
///
/// Returns an error on open/lock failures, mixed embedding models, or any
/// underlying engine failure during update/commit.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    // Locate the frame to update before building the replacement options.
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload from --input; None means "keep old payload".
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    // UTF-8 view of the new payload (when valid); used for title/embedding text.
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Seed options from the existing frame so unspecified fields carry over.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    // Auto-tagging / date extraction only run when a new payload was provided.
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // Explicit --set-uri wins over the carried-over URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // No URI yet but a new payload exists: derive one from the source path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // CLI overrides for individual metadata fields.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // --label replaces (does not merge with) the existing label set.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // --tag key=value pairs replace the existing tags entirely; keys and
    // distinct values both become searchable tags, and the pair is also
    // recorded in extra_metadata.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text later used for embedding; starts as the carried-over search text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // New payload: re-derive title and URI, then re-run content analysis.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // Payload unchanged: nothing new to tag or date-extract.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    let stats = mem.stats()?;
    // Only embed when the memory already carries a vector index.
    options.enable_embedding = stats.has_vec_index;

    // Auto-detect embedding model from existing MV2 dimension
    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Infer the model from existing vectors; refuse to recompute when the
        // memory mixes models, since a single runtime could not match them all.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                    Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer analyzed/carried-over search text; fall back to raw UTF-8 payload.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // Without a fresh embedding, reuse the frame's existing vector so the
    // updated frame stays present in the vector index.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Re-fetch the updated frame for display: by URI when one is set,
    // otherwise take the newest timeline entry (the update appends a frame),
    // falling back to the original id if the timeline is empty.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
2003
2004fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2005    if let Some(uri) = provided {
2006        let sanitized = sanitize_uri(uri, true);
2007        return format!("mv2://{}", sanitized);
2008    }
2009
2010    if let Some(path) = source {
2011        let raw = path.to_string_lossy();
2012        let sanitized = sanitize_uri(&raw, false);
2013        return format!("mv2://{}", sanitized);
2014    }
2015
2016    format!("mv2://frames/{}", Uuid::new_v4())
2017}
2018
2019pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2020    let digest = hash(payload).to_hex();
2021    let short = &digest[..32];
2022    let ext_from_path = source
2023        .and_then(|path| path.extension())
2024        .and_then(|ext| ext.to_str())
2025        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2026        .filter(|ext| !ext.is_empty());
2027    let ext = ext_from_path
2028        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2029        .unwrap_or_else(|| "bin".to_string());
2030    format!("mv2://video/{short}.{ext}")
2031}
2032
2033fn derive_title(
2034    provided: Option<String>,
2035    source: Option<&Path>,
2036    payload_utf8: Option<&str>,
2037) -> Option<String> {
2038    if let Some(title) = provided {
2039        let trimmed = title.trim();
2040        if trimmed.is_empty() {
2041            None
2042        } else {
2043            Some(trimmed.to_string())
2044        }
2045    } else {
2046        if let Some(text) = payload_utf8 {
2047            if let Some(markdown_title) = extract_markdown_title(text) {
2048                return Some(markdown_title);
2049            }
2050        }
2051        source
2052            .and_then(|path| path.file_stem())
2053            .and_then(|stem| stem.to_str())
2054            .map(to_title_case)
2055    }
2056}
2057
/// Return the text of the first `#`-prefixed (ATX-style) markdown heading,
/// with leading hashes and surrounding whitespace stripped.
///
/// Lines that are only hashes (e.g. `###`) are skipped; returns `None` when
/// no heading with content exists.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let trimmed = line.trim();
        if !trimmed.starts_with('#') {
            return None;
        }
        let title = trimmed.trim_start_matches('#').trim();
        (!title.is_empty()).then(|| title.to_string())
    })
}
2070
/// Upper-case the first character of `input`, leaving the rest untouched.
///
/// Note this only capitalizes the leading character (it is not per-word title
/// casing), and multi-character uppercase expansions are kept as produced by
/// `char::to_uppercase` (e.g. 'ß' becomes "SS").
fn to_title_case(input: &str) -> String {
    // A single match covers both the empty case and the general case; the
    // previous explicit `is_empty()` fast path was redundant with `None` here.
    let mut chars = input.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
    }
}
2082
/// Return `true` for files that should never be ingested (currently only
/// macOS `.DS_Store` metadata files).
fn should_skip_input(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .is_some_and(|name| name == ".DS_Store")
}
2089
/// Return `true` for hidden directories (name starting with `.`), such as
/// `.git`, which the directory walker should not descend into.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|name| name.to_str())
        .is_some_and(|name| name.starts_with('.'))
}
2096
/// Resolved ingestion input: either standard input or a concrete file list.
enum InputSet {
    /// Payload is read from standard input.
    Stdin,
    /// Explicit file paths, produced from a literal path, a glob pattern,
    /// or a recursive directory walk.
    Files(Vec<PathBuf>),
}
2101
/// Check if a file is an image based on extension
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.to_ascii_lowercase())
        .is_some_and(|ext| {
            matches!(
                ext.as_str(),
                "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
            )
        })
}
2113
2114fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2115    let Some(raw) = input else {
2116        return Ok(InputSet::Stdin);
2117    };
2118
2119    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2120        let mut files = Vec::new();
2121        let mut matched_any = false;
2122        for entry in glob(raw)? {
2123            let path = entry?;
2124            if path.is_file() {
2125                matched_any = true;
2126                if should_skip_input(&path) {
2127                    continue;
2128                }
2129                files.push(path);
2130            }
2131        }
2132        files.sort();
2133        if files.is_empty() {
2134            if matched_any {
2135                anyhow::bail!(
2136                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2137                );
2138            }
2139            anyhow::bail!("pattern '{raw}' did not match any files");
2140        }
2141        return Ok(InputSet::Files(files));
2142    }
2143
2144    let path = PathBuf::from(raw);
2145    if path.is_dir() {
2146        let (mut files, skipped_any) = collect_directory_files(&path)?;
2147        files.sort();
2148        if files.is_empty() {
2149            if skipped_any {
2150                anyhow::bail!(
2151                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2152                    path.display()
2153                );
2154            }
2155            anyhow::bail!(
2156                "directory '{}' contains no ingestible files",
2157                path.display()
2158            );
2159        }
2160        Ok(InputSet::Files(files))
2161    } else {
2162        Ok(InputSet::Files(vec![path]))
2163    }
2164}
2165
2166fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2167    let mut files = Vec::new();
2168    let mut skipped_any = false;
2169    let mut stack = vec![root.to_path_buf()];
2170    while let Some(dir) = stack.pop() {
2171        for entry in fs::read_dir(&dir)? {
2172            let entry = entry?;
2173            let path = entry.path();
2174            let file_type = entry.file_type()?;
2175            if file_type.is_dir() {
2176                if should_skip_dir(&path) {
2177                    skipped_any = true;
2178                    continue;
2179                }
2180                stack.push(path);
2181            } else if file_type.is_file() {
2182                if should_skip_input(&path) {
2183                    skipped_any = true;
2184                    continue;
2185                }
2186                files.push(path);
2187            }
2188        }
2189    }
2190    Ok((files, skipped_any))
2191}
2192
/// Result of ingesting one input payload.
struct IngestOutcome {
    /// Per-file summary for CLI reporting.
    report: PutReport,
    /// NOTE(review): presumably true when an embedding was attached to the
    /// frame — confirm at call sites.
    embedded: bool,
    /// Payload deferred for parallel segment building, when that path is used.
    #[cfg(feature = "parallel_segments")]
    parallel_input: Option<ParallelInput>,
}
2199
/// Per-file summary of a single `put`, consumed by the CLI output code.
/// NOTE(review): field meanings inferred from names — confirm against the
/// put handler that populates this.
struct PutReport {
    /// Sequence number of the committed frame.
    seq: u64,
    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
    frame_id: u64,
    /// URI the payload was stored under.
    uri: String,
    /// Human-readable title, when one could be inferred.
    title: Option<String>,
    /// Payload size before storage.
    original_bytes: usize,
    /// Bytes actually written to the memory.
    stored_bytes: usize,
    /// Whether the stored payload was compressed.
    compressed: bool,
    /// Originating file path (absent for stdin input).
    source: Option<PathBuf>,
    /// Detected MIME type, when known.
    mime: Option<String>,
    /// Extracted document metadata, when any was produced.
    metadata: Option<DocMetadata>,
    /// Outcome of the text-extraction step.
    extraction: ExtractionSummary,
    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
    #[cfg(feature = "logic_mesh")]
    search_text: Option<String>,
}
2217
/// Tracks the remaining capacity of a size-capped memory so `put` can fail
/// fast before attempting a write that cannot fit.
struct CapacityGuard {
    #[allow(dead_code)]
    tier: Tier,
    /// Total capacity in bytes; non-zero by construction (`from_stats`
    /// returns `None` for uncapped memories).
    capacity: u64,
    /// Last observed size of the memory in bytes.
    current_size: u64,
}
2224
impl CapacityGuard {
    /// Flat overhead assumed per write (index/metadata), in bytes.
    const MIN_OVERHEAD: u64 = 128 * 1024;
    /// Additional assumed overhead as a fraction of the payload size.
    const RELATIVE_OVERHEAD: f64 = 0.05;

    /// Build a guard from engine stats. Returns `None` when the memory is
    /// uncapped (`capacity_bytes == 0`), in which case no guard is needed.
    fn from_stats(stats: &Stats) -> Option<Self> {
        if stats.capacity_bytes == 0 {
            return None;
        }
        Some(Self {
            tier: stats.tier,
            capacity: stats.capacity_bytes,
            current_size: stats.size_bytes,
        })
    }

    /// Error out (via `map_put_error`) when writing `additional` payload
    /// bytes plus estimated overhead would exceed the configured capacity.
    fn ensure_capacity(&self, additional: u64) -> Result<()> {
        // Saturating arithmetic keeps the projection well-defined near u64::MAX.
        let projected = self
            .current_size
            .saturating_add(additional)
            .saturating_add(Self::estimate_overhead(additional));
        if projected > self.capacity {
            return Err(map_put_error(
                MemvidError::CapacityExceeded {
                    current: self.current_size,
                    limit: self.capacity,
                    required: projected,
                },
                Some(self.capacity),
            ));
        }
        Ok(())
    }

    /// Refresh the cached size after a successful commit.
    fn update_after_commit(&mut self, stats: &Stats) {
        self.current_size = stats.size_bytes;
    }

    /// The configured capacity, used as a hint in error messages.
    fn capacity_hint(&self) -> Option<u64> {
        Some(self.capacity)
    }

    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
    //
    // Each warning fires only while usage sits inside a one-point window just
    // past its threshold (e.g. [90%, 91%)) — presumably so repeated puts do
    // not re-print the same warning every time. NOTE(review): a single large
    // put that jumps over a window (e.g. 85% -> 95%) will skip that warning
    // entirely; confirm this is intended.
    fn check_and_warn_capacity(&self) {
        if self.capacity == 0 {
            return;
        }

        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;

        // Warn at key thresholds
        if usage_pct >= 90.0 && usage_pct < 91.0 {
            eprintln!(
                "⚠️  CRITICAL: 90% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Actions:");
            eprintln!("   1. Recreate with larger --size");
            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
            eprintln!(
                "⚠️  WARNING: 75% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
            eprintln!("   Consider planning for capacity expansion soon");
        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
            eprintln!(
                "ℹ️  INFO: 50% capacity used ({} / {})",
                format_bytes(self.current_size),
                format_bytes(self.capacity)
            );
        }
    }

    /// Estimated index/metadata overhead for a write of `additional` bytes:
    /// 5% of the payload, but never less than 128 KiB.
    fn estimate_overhead(additional: u64) -> u64 {
        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
        fractional.max(Self::MIN_OVERHEAD)
    }
}
2306
/// Everything `analyze_file` learns about a payload, handed back to the
/// ingestion path.
#[derive(Debug, Clone)]
pub struct FileAnalysis {
    /// Detected MIME type.
    pub mime: String,
    /// Document metadata, present only when non-default fields were populated.
    pub metadata: Option<DocMetadata>,
    /// Normalized text to index for search, when extraction produced any.
    pub search_text: Option<String>,
    /// Outcome of the text-extraction step.
    pub extraction: ExtractionSummary,
}
2314
/// Tuning knobs for audio analysis during ingestion.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Analyze the payload as audio even when the MIME type is not `audio/*`.
    force: bool,
    /// Requested segment length in seconds (normalised to at least 1).
    segment_secs: u32,
    /// Run speech-to-text on the audio.
    transcribe: bool,
}

impl AudioAnalyzeOptions {
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Segment length in seconds, guaranteed non-zero.
    fn normalised_segment_secs(self) -> u32 {
        if self.segment_secs == 0 {
            1
        } else {
            self.segment_secs
        }
    }
}

impl Default for AudioAnalyzeOptions {
    fn default() -> Self {
        Self {
            force: false,
            segment_secs: Self::DEFAULT_SEGMENT_SECS,
            transcribe: false,
        }
    }
}
2339
/// Results of analyzing an audio payload.
struct AudioAnalysis {
    /// Structured audio metadata destined for `DocMetadata::audio`.
    metadata: DocAudioMetadata,
    /// Short caption derived from the audio, when available.
    caption: Option<String>,
    /// Extra terms merged into the search text when no transcript exists.
    search_terms: Vec<String>,
    /// Speech-to-text transcript; when present it becomes the primary
    /// search text (see `analyze_file`).
    transcript: Option<String>,
}
2346
/// Results of decoding and analyzing an image payload.
struct ImageAnalysis {
    /// Pixel width of the decoded image.
    width: u32,
    /// Pixel height of the decoded image.
    height: u32,
    /// Dominant colours as `#rrggbb` hex strings (empty when palette
    /// extraction failed).
    palette: Vec<String>,
    /// Caption derived from EXIF data, when present.
    caption: Option<String>,
    /// Structured EXIF metadata, when present.
    exif: Option<DocExifMetadata>,
}
2354
/// Outcome of the text-extraction step for a single payload.
#[derive(Debug, Clone)]
pub struct ExtractionSummary {
    /// Name of the reader that produced (or attempted) the extraction.
    reader: Option<String>,
    /// Final disposition of the extraction attempt.
    status: ExtractionStatus,
    /// Human-readable warnings accumulated along the way.
    warnings: Vec<String>,
    /// Wall-clock extraction time reported by the reader, when available.
    duration_ms: Option<u64>,
    /// Number of pages the reader processed, when available.
    pages_processed: Option<u32>,
}

impl ExtractionSummary {
    /// Append one warning to the summary.
    fn record_warning<S: Into<String>>(&mut self, warning: S) {
        let warning = warning.into();
        self.warnings.push(warning);
    }
}

/// Coarse classification of an extraction attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    Skipped,
    Ok,
    FallbackUsed,
    Empty,
    Failed,
}

impl ExtractionStatus {
    /// Stable string form of the status.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Skipped => "skipped",
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
        }
    }
}
2390
2391fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2392    let filename = source_path
2393        .and_then(|path| path.file_name())
2394        .and_then(|name| name.to_str())
2395        .map(|value| value.to_string());
2396    MediaManifest {
2397        kind: "video".to_string(),
2398        mime: mime.to_string(),
2399        bytes,
2400        filename,
2401        duration_ms: None,
2402        width: None,
2403        height: None,
2404        codec: None,
2405    }
2406}
2407
/// Inspect a payload and derive its MIME type, document metadata, searchable
/// text, and an extraction summary — without writing anything to the memory.
///
/// * `source_path` — originating file path, used for MIME hints and video
///   filenames (absent for stdin input).
/// * `payload` / `payload_utf8` — raw bytes and, when they are valid UTF-8,
///   the same bytes viewed as text.
/// * `inferred_title` — fallback caption when nothing better is found.
/// * `audio_opts` — audio analysis tuning; `force` analyses payloads whose
///   MIME is not `audio/*` as audio too.
/// * `force_video` — treat the payload as video regardless of detected MIME.
pub fn analyze_file(
    source_path: Option<&Path>,
    payload: &[u8],
    payload_utf8: Option<&str>,
    inferred_title: Option<&str>,
    audio_opts: AudioAnalyzeOptions,
    force_video: bool,
) -> FileAnalysis {
    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
    let is_video = force_video || mime.starts_with("video/");
    // Video payloads are never indexed as text, even if sniffing said otherwise.
    let treat_as_text = if is_video { false } else { treat_as_text_base };

    // Baseline metadata every payload gets: MIME, byte count, content hash.
    let mut metadata = DocMetadata::default();
    metadata.mime = Some(mime.clone());
    metadata.bytes = Some(payload.len() as u64);
    metadata.hash = Some(hash(payload).to_hex().to_string());

    let mut search_text = None;

    let mut extraction = ExtractionSummary {
        reader: None,
        status: ExtractionStatus::Skipped,
        warnings: Vec::new(),
        duration_ms: None,
        pages_processed: None,
    };

    let reader_registry = default_reader_registry();
    // Leading bytes used for magic-number detection; None when the payload is
    // shorter than the sniff window or the slice is empty.
    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
        if slice.is_empty() {
            None
        } else {
            Some(slice)
        }
    });

    // Shared helper: normalize extracted text (appending an ellipsis when it
    // was truncated), derive a caption if none exists yet, and record the
    // reader/status on success. Returns false when normalization leaves
    // nothing searchable, so callers can record an Empty/Failed status.
    let apply_extracted_text = |text: &str,
                                reader_label: &str,
                                status_if_applied: ExtractionStatus,
                                extraction: &mut ExtractionSummary,
                                metadata: &mut DocMetadata,
                                search_text: &mut Option<String>|
     -> bool {
        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
            let mut value = normalized.text;
            if normalized.truncated {
                value.push('…');
            }
            if metadata.caption.is_none() {
                if let Some(caption) = caption_from_text(&value) {
                    metadata.caption = Some(caption);
                }
            }
            *search_text = Some(value);
            extraction.reader = Some(reader_label.to_string());
            extraction.status = status_if_applied;
            true
        } else {
            false
        }
    };

    if is_video {
        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
    }

    // Plain-text payloads: index the UTF-8 content directly.
    if treat_as_text {
        if let Some(text) = payload_utf8 {
            if !apply_extracted_text(
                text,
                "bytes",
                ExtractionStatus::Ok,
                &mut extraction,
                &mut metadata,
                &mut search_text,
            ) {
                extraction.status = ExtractionStatus::Empty;
                extraction.reader = Some("bytes".to_string());
                let msg = "text payload contained no searchable content after normalization";
                extraction.record_warning(msg);
                warn!("{msg}");
            }
        } else {
            // detect_mime said "text" but the bytes are not valid UTF-8.
            extraction.reader = Some("bytes".to_string());
            extraction.status = ExtractionStatus::Failed;
            let msg = "payload reported as text but was not valid UTF-8";
            extraction.record_warning(msg);
            warn!("{msg}");
        }
    } else if mime == "application/pdf"
        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
        || mime == "application/vnd.ms-excel"
        || mime == "application/msword"
    {
        // Use reader registry for PDFs and Office documents (XLSX, DOCX, PPTX, etc.)
        let mut applied = false;

        let format_hint = infer_document_format(Some(mime.as_str()), magic);
        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
            .with_uri(source_path.and_then(|p| p.to_str()))
            .with_magic(magic);

        if let Some(reader) = reader_registry.find_reader(&hint) {
            match reader.extract(payload, &hint) {
                Ok(output) => {
                    // Copy the reader's diagnostics into the summary before
                    // attempting to apply its text.
                    extraction.reader = Some(output.reader_name.clone());
                    if let Some(ms) = output.diagnostics.duration_ms {
                        extraction.duration_ms = Some(ms);
                    }
                    if let Some(pages) = output.diagnostics.pages_processed {
                        extraction.pages_processed = Some(pages);
                    }
                    if let Some(mime_type) = output.document.mime_type.clone() {
                        metadata.mime = Some(mime_type);
                    }

                    for warning in output.diagnostics.warnings.iter() {
                        extraction.record_warning(warning);
                        warn!("{warning}");
                    }

                    let status = if output.diagnostics.fallback {
                        ExtractionStatus::FallbackUsed
                    } else {
                        ExtractionStatus::Ok
                    };

                    if let Some(doc_text) = output.document.text.as_ref() {
                        applied = apply_extracted_text(
                            doc_text,
                            output.reader_name.as_str(),
                            status,
                            &mut extraction,
                            &mut metadata,
                            &mut search_text,
                        );
                        if !applied {
                            extraction.reader = Some(output.reader_name);
                            extraction.status = ExtractionStatus::Empty;
                            let msg = "primary reader returned no usable text";
                            extraction.record_warning(msg);
                            warn!("{msg}");
                        }
                    } else {
                        extraction.reader = Some(output.reader_name);
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "primary reader returned empty text";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    let name = reader.name();
                    extraction.reader = Some(name.to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("primary reader {name} failed: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        } else {
            extraction.record_warning("no registered reader matched this PDF payload");
            warn!("no registered reader matched this PDF payload");
        }

        // Whenever the registry reader produced nothing usable, fall back to
        // the multi-extractor PDF path. NOTE(review): this fallback also runs
        // for non-PDF Office MIME types that reached this branch — confirm
        // that is intended.
        if !applied {
            match extract_pdf_text(payload) {
                Ok(pdf_text) => {
                    if !apply_extracted_text(
                        &pdf_text,
                        "pdf_extract",
                        ExtractionStatus::FallbackUsed,
                        &mut extraction,
                        &mut metadata,
                        &mut search_text,
                    ) {
                        extraction.reader = Some("pdf_extract".to_string());
                        extraction.status = ExtractionStatus::Empty;
                        let msg = "PDF text extraction yielded no searchable content";
                        extraction.record_warning(msg);
                        warn!("{msg}");
                    }
                }
                Err(err) => {
                    extraction.reader = Some("pdf_extract".to_string());
                    extraction.status = ExtractionStatus::Failed;
                    let msg = format!("failed to extract PDF text via fallback: {err}");
                    extraction.record_warning(&msg);
                    warn!("{msg}");
                }
            }
        }
    }

    // Image payloads: dimensions, colour palette, and EXIF-derived metadata.
    if !is_video && mime.starts_with("image/") {
        if let Some(image_meta) = analyze_image(payload) {
            let ImageAnalysis {
                width,
                height,
                palette,
                caption,
                exif,
            } = image_meta;
            metadata.width = Some(width);
            metadata.height = Some(height);
            if !palette.is_empty() {
                metadata.colors = Some(palette);
            }
            if metadata.caption.is_none() {
                metadata.caption = caption;
            }
            if let Some(exif) = exif {
                metadata.exif = Some(exif);
            }
        }
    }

    // Audio payloads (or forced audio analysis): metadata, caption, and either
    // a transcript or supplementary search terms.
    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
        if let Some(AudioAnalysis {
            metadata: audio_metadata,
            caption: audio_caption,
            mut search_terms,
            transcript,
        }) = analyze_audio(payload, source_path, &mime, audio_opts)
        {
            // Only overwrite existing audio metadata when it is absent or empty.
            if metadata.audio.is_none()
                || metadata
                    .audio
                    .as_ref()
                    .map_or(true, DocAudioMetadata::is_empty)
            {
                metadata.audio = Some(audio_metadata);
            }
            if metadata.caption.is_none() {
                metadata.caption = audio_caption;
            }

            // Use transcript as primary search text if available
            if let Some(ref text) = transcript {
                extraction.reader = Some("whisper".to_string());
                extraction.status = ExtractionStatus::Ok;
                search_text = Some(text.clone());
            } else if !search_terms.is_empty() {
                // No transcript: merge tag-style terms into any existing search
                // text, skipping terms that are already present.
                match &mut search_text {
                    Some(existing) => {
                        for term in search_terms.drain(..) {
                            if !existing.contains(&term) {
                                if !existing.ends_with(' ') {
                                    existing.push(' ');
                                }
                                existing.push_str(&term);
                            }
                        }
                    }
                    None => {
                        search_text = Some(search_terms.join(" "));
                    }
                }
            }
        }
    }

    // Last-resort caption: the caller-supplied inferred title, truncated.
    if metadata.caption.is_none() {
        if let Some(title) = inferred_title {
            let trimmed = title.trim();
            if !trimmed.is_empty() {
                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
            }
        }
    }

    FileAnalysis {
        mime,
        metadata: if metadata.is_empty() {
            None
        } else {
            Some(metadata)
        },
        search_text,
        extraction,
    }
}
2692
2693fn default_reader_registry() -> &'static ReaderRegistry {
2694    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2695    REGISTRY.get_or_init(ReaderRegistry::default)
2696}
2697
2698fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2699    if detect_pdf_magic(magic) {
2700        return Some(DocumentFormat::Pdf);
2701    }
2702
2703    let mime = mime?.trim().to_ascii_lowercase();
2704    match mime.as_str() {
2705        "application/pdf" => Some(DocumentFormat::Pdf),
2706        "text/plain" => Some(DocumentFormat::PlainText),
2707        "text/markdown" => Some(DocumentFormat::Markdown),
2708        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2709        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2710            Some(DocumentFormat::Docx)
2711        }
2712        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2713            Some(DocumentFormat::Pptx)
2714        }
2715        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2716            Some(DocumentFormat::Xlsx)
2717        }
2718        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2719        _ => None,
2720    }
2721}
2722
/// True when the sniffed leading bytes look like a PDF header.
///
/// Tolerates an optional UTF-8 BOM followed by ASCII whitespace before the
/// `%PDF` marker (in that order, matching the original behavior).
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    let bytes = bytes.strip_prefix(b"\xEF\xBB\xBF").unwrap_or(bytes);
    let start = bytes
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(bytes.len());
    bytes[start..].starts_with(b"%PDF")
}
2740
2741/// Robust PDF text extraction with multiple fallbacks for cross-platform support.
2742/// Tries all extractors and returns the one with the most text content.
2743fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2744    let mut best_text = String::new();
2745    let mut best_source = "none";
2746
2747    // Minimum chars to consider extraction "good" - scales with PDF size
2748    // For a 12MB PDF, expect at least ~1000 chars; for 1MB, ~100 chars
2749    let min_good_chars = (payload.len() / 10000).max(100).min(5000);
2750
2751    // Try pdf_extract first
2752    match pdf_extract::extract_text_from_mem(payload) {
2753        Ok(text) => {
2754            let trimmed = text.trim();
2755            if trimmed.len() > best_text.len() {
2756                best_text = trimmed.to_string();
2757                best_source = "pdf_extract";
2758            }
2759            if trimmed.len() >= min_good_chars {
2760                info!("pdf_extract extracted {} chars (good)", trimmed.len());
2761                return Ok(best_text);
2762            } else if !trimmed.is_empty() {
2763                warn!("pdf_extract returned only {} chars, trying other extractors", trimmed.len());
2764            }
2765        }
2766        Err(err) => {
2767            warn!("pdf_extract failed: {err}");
2768        }
2769    }
2770
2771    // Try lopdf - pure Rust, works on all platforms
2772    match extract_pdf_with_lopdf(payload) {
2773        Ok(text) => {
2774            let trimmed = text.trim();
2775            if trimmed.len() > best_text.len() {
2776                best_text = trimmed.to_string();
2777                best_source = "lopdf";
2778            }
2779            if trimmed.len() >= min_good_chars {
2780                info!("lopdf extracted {} chars (good)", trimmed.len());
2781                return Ok(best_text);
2782            } else if !trimmed.is_empty() {
2783                warn!("lopdf returned only {} chars, trying pdftotext", trimmed.len());
2784            }
2785        }
2786        Err(err) => {
2787            warn!("lopdf failed: {err}");
2788        }
2789    }
2790
2791    // Try pdftotext binary (requires poppler installed)
2792    match fallback_pdftotext(payload) {
2793        Ok(text) => {
2794            let trimmed = text.trim();
2795            if trimmed.len() > best_text.len() {
2796                best_text = trimmed.to_string();
2797                best_source = "pdftotext";
2798            }
2799            if !trimmed.is_empty() {
2800                info!("pdftotext extracted {} chars", trimmed.len());
2801            }
2802        }
2803        Err(err) => {
2804            warn!("pdftotext failed: {err}");
2805        }
2806    }
2807
2808    // Return the best result we found
2809    if !best_text.is_empty() {
2810        info!("using {} extraction ({} chars)", best_source, best_text.len());
2811        Ok(best_text)
2812    } else {
2813        warn!("all PDF extraction methods failed, storing without searchable text");
2814        Ok(String::new())
2815    }
2816}
2817
2818/// Extract text from PDF using lopdf (pure Rust, no native dependencies)
2819fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
2820    use lopdf::Document;
2821
2822    let mut document = Document::load_mem(payload)
2823        .map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
2824
2825    // Try to decrypt if encrypted (empty password)
2826    if document.is_encrypted() {
2827        let _ = document.decrypt("");
2828    }
2829
2830    // Decompress streams for text extraction
2831    let _ = document.decompress();
2832
2833    // Get sorted page numbers
2834    let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
2835    if page_numbers.is_empty() {
2836        return Err(anyhow!("PDF has no pages"));
2837    }
2838    page_numbers.sort_unstable();
2839
2840    // Limit to 4096 pages max
2841    if page_numbers.len() > 4096 {
2842        page_numbers.truncate(4096);
2843        warn!("PDF has more than 4096 pages, truncating extraction");
2844    }
2845
2846    // Extract text from all pages
2847    let text = document
2848        .extract_text(&page_numbers)
2849        .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
2850
2851    Ok(text.trim().to_string())
2852}
2853
2854fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2855    use std::io::Write;
2856    use std::process::{Command, Stdio};
2857
2858    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2859
2860    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2861    temp_pdf
2862        .write_all(payload)
2863        .context("failed to write pdf payload to temp file")?;
2864    temp_pdf.flush().context("failed to flush temp pdf file")?;
2865
2866    let child = Command::new(pdftotext)
2867        .arg("-layout")
2868        .arg(temp_pdf.path())
2869        .arg("-")
2870        .stdout(Stdio::piped())
2871        .spawn()
2872        .context("failed to spawn pdftotext")?;
2873
2874    let output = child
2875        .wait_with_output()
2876        .context("failed to read pdftotext output")?;
2877
2878    if !output.status.success() {
2879        return Err(anyhow!("pdftotext exited with status {}", output.status));
2880    }
2881
2882    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2883    Ok(text)
2884}
2885
2886fn detect_mime(
2887    source_path: Option<&Path>,
2888    payload: &[u8],
2889    payload_utf8: Option<&str>,
2890) -> (String, bool) {
2891    if let Some(kind) = infer::get(payload) {
2892        let mime = kind.mime_type().to_string();
2893        let treat_as_text = mime.starts_with("text/")
2894            || matches!(
2895                mime.as_str(),
2896                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2897            );
2898        return (mime, treat_as_text);
2899    }
2900
2901    let magic = magic_from_u8(payload);
2902    if !magic.is_empty() && magic != "application/octet-stream" {
2903        let treat_as_text = magic.starts_with("text/")
2904            || matches!(
2905                magic,
2906                "application/json"
2907                    | "application/xml"
2908                    | "application/javascript"
2909                    | "image/svg+xml"
2910                    | "text/plain"
2911            );
2912        return (magic.to_string(), treat_as_text);
2913    }
2914
2915    if let Some(path) = source_path {
2916        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2917            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2918                return (mime.to_string(), treat_as_text);
2919            }
2920        }
2921    }
2922
2923    if payload_utf8.is_some() {
2924        return ("text/plain".to_string(), true);
2925    }
2926
2927    ("application/octet-stream".to_string(), false)
2928}
2929
/// Map a file extension (case-insensitive) to a MIME type and a flag saying
/// whether the payload should be indexed as text. Returns `None` for
/// extensions this CLI does not special-case.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    let normalized = ext.to_ascii_lowercase();
    let (mime, is_text) = match normalized.as_str() {
        // Source code and plain-text config formats.
        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => ("text/plain", true),
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        // SVG is textual XML despite being an image format.
        "svg" => ("image/svg+xml", true),
        // Raster images: binary, never indexed as text.
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some((mime, is_text))
}
2957
2958fn caption_from_text(text: &str) -> Option<String> {
2959    for line in text.lines() {
2960        let trimmed = line.trim();
2961        if !trimmed.is_empty() {
2962            return Some(truncate_to_boundary(trimmed, 240));
2963        }
2964    }
2965    None
2966}
2967
/// Truncate `text` to at most `max_len` bytes, backing up to the nearest
/// UTF-8 char boundary and trimming trailing whitespace, then appending an
/// ellipsis to mark the cut (the '…' may push the result slightly past
/// `max_len`). Input that already fits is returned unchanged.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Walk down from max_len until we land on a char boundary (index 0
    // is always a boundary, so this always yields a value).
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }
    format!("{}…", text[..cut].trim_end())
}
2983
2984fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2985    let reader = ImageReader::new(Cursor::new(payload))
2986        .with_guessed_format()
2987        .map_err(|err| warn!("failed to guess image format: {err}"))
2988        .ok()?;
2989    let image = reader
2990        .decode()
2991        .map_err(|err| warn!("failed to decode image: {err}"))
2992        .ok()?;
2993    let width = image.width();
2994    let height = image.height();
2995    let rgb = image.to_rgb8();
2996    let palette = match get_palette(
2997        rgb.as_raw(),
2998        ColorFormat::Rgb,
2999        COLOR_PALETTE_SIZE,
3000        COLOR_PALETTE_QUALITY,
3001    ) {
3002        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3003        Err(err) => {
3004            warn!("failed to compute colour palette: {err}");
3005            Vec::new()
3006        }
3007    };
3008
3009    let (exif, exif_caption) = extract_exif_metadata(payload);
3010
3011    Some(ImageAnalysis {
3012        width,
3013        height,
3014        palette,
3015        caption: exif_caption,
3016        exif,
3017    })
3018}
3019
3020fn color_to_hex(color: Color) -> String {
3021    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3022}
3023
/// Deep-analyze an audio payload with Symphonia: probe the container, decode
/// every packet of the default track to count PCM frames (and thus measure
/// duration), then derive sample rate, channel count, approximate bitrate,
/// codec identifier, fixed-length segment markers, and container tags (via
/// lofty). Optionally runs transcription when `options.transcribe` is set.
///
/// Returns `None` when the stream cannot be probed, has no default track, or
/// no decoder can be built; per-packet errors are logged and tolerated.
fn analyze_audio(
    payload: &[u8],
    source_path: Option<&Path>,
    mime: &str,
    options: AudioAnalyzeOptions,
) -> Option<AudioAnalysis> {
    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
    use symphonia::core::errors::Error as SymphoniaError;
    use symphonia::core::formats::FormatOptions;
    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
    use symphonia::core::meta::MetadataOptions;
    use symphonia::core::probe::Hint;

    // Symphonia takes ownership of its source, so the payload is copied into
    // an in-memory cursor.
    let cursor = Cursor::new(payload.to_vec());
    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());

    // Seed the probe with format hints from both the file extension and the
    // MIME subtype (e.g. "mpeg" from "audio/mpeg").
    let mut hint = Hint::new();
    if let Some(path) = source_path {
        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
            hint.with_extension(ext);
        }
    }
    if let Some(suffix) = mime.split('/').nth(1) {
        hint.with_extension(suffix);
    }

    let probed = symphonia::default::get_probe()
        .format(
            &hint,
            mss,
            &FormatOptions::default(),
            &MetadataOptions::default(),
        )
        .map_err(|err| warn!("failed to probe audio stream: {err}"))
        .ok()?;

    let mut format = probed.format;
    let track = format.default_track()?;
    let track_id = track.id;
    // Clone the codec params so we can keep borrowing `format` mutably below.
    let codec_params: CodecParameters = track.codec_params.clone();
    let sample_rate = codec_params.sample_rate;
    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);

    let mut decoder = symphonia::default::get_codecs()
        .make(&codec_params, &DecoderOptions::default())
        .map_err(|err| warn!("failed to create audio decoder: {err}"))
        .ok()?;

    // Decode the whole stream, counting frames to compute the duration.
    let mut decoded_frames: u64 = 0;
    loop {
        let packet = match format.next_packet() {
            Ok(packet) => packet,
            // An I/O error here conventionally signals end-of-stream.
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("skipping undecodable audio packet: {err}");
                continue;
            }
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
                continue;
            }
            Err(other) => {
                warn!("stopping audio analysis due to error: {other}");
                break;
            }
        };

        // Only packets of the default track count toward the duration.
        if packet.track_id() != track_id {
            continue;
        }

        match decoder.decode(&packet) {
            Ok(audio_buf) => {
                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
            }
            // A malformed packet is logged and skipped; decoding continues.
            Err(SymphoniaError::DecodeError(err)) => {
                warn!("failed to decode audio packet: {err}");
            }
            Err(SymphoniaError::IoError(_)) => break,
            Err(SymphoniaError::ResetRequired) => {
                decoder.reset();
            }
            Err(other) => {
                warn!("decoder error: {other}");
                break;
            }
        }
    }

    // duration = frames / sample rate; zero when the rate is unknown.
    let duration_secs = match sample_rate {
        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
        _ => 0.0,
    };

    // Approximate bitrate from the full payload size. This includes tag and
    // container overhead, so it slightly overestimates the audio bitrate.
    let bitrate_kbps = if duration_secs > 0.0 {
        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
    } else {
        None
    };

    let tags = extract_lofty_tags(payload);
    let caption = tags
        .get("title")
        .cloned()
        .or_else(|| tags.get("tracktitle").cloned());

    // Title/artist/album/genre become additional search terms for retrieval.
    let mut search_terms = Vec::new();
    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
        search_terms.push(title.clone());
    }
    if let Some(artist) = tags.get("artist") {
        search_terms.push(artist.clone());
    }
    if let Some(album) = tags.get("album") {
        search_terms.push(album.clone());
    }
    if let Some(genre) = tags.get("genre") {
        search_terms.push(genre.clone());
    }

    // Slice the timeline into fixed-length segments; the last one is clamped
    // to the actual duration.
    let mut segments = Vec::new();
    if duration_secs > 0.0 {
        let segment_len = options.normalised_segment_secs() as f64;
        if segment_len > 0.0 {
            let mut start = 0.0;
            let mut idx = 1usize;
            while start < duration_secs {
                let end = (start + segment_len).min(duration_secs);
                segments.push(AudioSegmentMetadata {
                    start_seconds: start as f32,
                    end_seconds: end as f32,
                    label: Some(format!("Segment {}", idx)),
                });
                if end >= duration_secs {
                    break;
                }
                start = end;
                idx += 1;
            }
        }
    }

    // Assemble the metadata record, leaving fields unset when unknown.
    let mut metadata = DocAudioMetadata::default();
    if duration_secs > 0.0 {
        metadata.duration_secs = Some(duration_secs as f32);
    }
    if let Some(rate) = sample_rate {
        metadata.sample_rate_hz = Some(rate);
    }
    if let Some(channels) = channel_count {
        metadata.channels = Some(channels);
    }
    if let Some(bitrate) = bitrate_kbps {
        metadata.bitrate_kbps = Some(bitrate);
    }
    if codec_params.codec != CODEC_TYPE_NULL {
        metadata.codec = Some(format!("{:?}", codec_params.codec));
    }
    if !segments.is_empty() {
        metadata.segments = segments;
    }
    if !tags.is_empty() {
        metadata.tags = tags
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect::<BTreeMap<_, _>>();
    }

    // Transcribe audio if requested (no-op None when the feature is absent).
    let transcript = if options.transcribe {
        transcribe_audio(source_path)
    } else {
        None
    };

    Some(AudioAnalysis {
        metadata,
        caption,
        search_terms,
        transcript,
    })
}
3206
/// Transcribe an audio file using Whisper with default configuration.
///
/// Returns `None` (after logging a warning) when no source path is available,
/// the transcriber fails to initialise, or transcription itself fails.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    let path = source_path?;

    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| tracing::warn!(error = %e, "Failed to initialize Whisper transcriber"))
        .ok()?;

    let result = transcriber
        .transcribe_file(path)
        .map_err(|e| tracing::warn!(error = %e, "Failed to transcribe audio"))
        .ok()?;

    tracing::info!(
        duration = result.duration_secs,
        text_len = result.text.len(),
        "Audio transcription complete"
    );
    Some(result.text)
}
3240
/// Stub used when the binary is compiled without the `whisper` feature: logs
/// a warning explaining how to enable transcription and returns `None` so
/// callers simply proceed without a transcript.
#[cfg(not(feature = "whisper"))]
fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
    None
}
3246
3247fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3248    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3249
3250    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3251        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3252            out.entry("title".into())
3253                .or_insert_with(|| value.to_string());
3254            out.entry("tracktitle".into())
3255                .or_insert_with(|| value.to_string());
3256        }
3257        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3258            out.entry("artist".into())
3259                .or_insert_with(|| value.to_string());
3260        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3261            out.entry("artist".into())
3262                .or_insert_with(|| value.to_string());
3263        }
3264        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3265            out.entry("album".into())
3266                .or_insert_with(|| value.to_string());
3267        }
3268        if let Some(value) = tag.get_string(&ItemKey::Genre) {
3269            out.entry("genre".into())
3270                .or_insert_with(|| value.to_string());
3271        }
3272        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3273            out.entry("track_number".into())
3274                .or_insert_with(|| value.to_string());
3275        }
3276        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3277            out.entry("disc_number".into())
3278                .or_insert_with(|| value.to_string());
3279        }
3280    }
3281
3282    let mut tags = HashMap::new();
3283    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3284        Ok(probe) => probe,
3285        Err(err) => {
3286            warn!("failed to guess audio tag file type: {err}");
3287            return tags;
3288        }
3289    };
3290
3291    let tagged_file = match probe.read() {
3292        Ok(file) => file,
3293        Err(err) => {
3294            warn!("failed to read audio tags: {err}");
3295            return tags;
3296        }
3297    };
3298
3299    if let Some(primary) = tagged_file.primary_tag() {
3300        collect_tag(primary, &mut tags);
3301    }
3302    for tag in tagged_file.tags() {
3303        collect_tag(tag, &mut tags);
3304    }
3305
3306    tags
3307}
3308
3309fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3310    let mut cursor = Cursor::new(payload);
3311    let exif = match ExifReader::new().read_from_container(&mut cursor) {
3312        Ok(exif) => exif,
3313        Err(err) => {
3314            warn!("failed to read EXIF metadata: {err}");
3315            return (None, None);
3316        }
3317    };
3318
3319    let mut doc = DocExifMetadata::default();
3320    let mut has_data = false;
3321
3322    if let Some(make) = exif_string(&exif, Tag::Make) {
3323        doc.make = Some(make);
3324        has_data = true;
3325    }
3326    if let Some(model) = exif_string(&exif, Tag::Model) {
3327        doc.model = Some(model);
3328        has_data = true;
3329    }
3330    if let Some(lens) =
3331        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3332    {
3333        doc.lens = Some(lens);
3334        has_data = true;
3335    }
3336    if let Some(dt) =
3337        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3338    {
3339        doc.datetime = Some(dt);
3340        has_data = true;
3341    }
3342
3343    if let Some(gps) = extract_gps_metadata(&exif) {
3344        doc.gps = Some(gps);
3345        has_data = true;
3346    }
3347
3348    let caption = exif_string(&exif, Tag::ImageDescription);
3349
3350    let metadata = if has_data { Some(doc) } else { None };
3351    (metadata, caption)
3352}
3353
3354fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3355    exif.fields()
3356        .find(|field| field.tag == tag)
3357        .and_then(|field| field_to_string(field, exif))
3358}
3359
3360fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3361    let value = field.display_value().with_unit(exif).to_string();
3362    let trimmed = value.trim_matches('\0').trim();
3363    if trimmed.is_empty() {
3364        None
3365    } else {
3366        Some(trimmed.to_string())
3367    }
3368}
3369
3370fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3371    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3372    let latitude_ref_field = exif
3373        .fields()
3374        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3375    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3376    let longitude_ref_field = exif
3377        .fields()
3378        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3379
3380    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3381    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3382
3383    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3384        if reference.eq_ignore_ascii_case("S") {
3385            latitude = -latitude;
3386        }
3387    }
3388    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3389        if reference.eq_ignore_ascii_case("W") {
3390            longitude = -longitude;
3391        }
3392    }
3393
3394    Some(DocGpsMetadata {
3395        latitude,
3396        longitude,
3397    })
3398}
3399
3400fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3401    match value {
3402        ExifValue::Rational(values) if !values.is_empty() => {
3403            let deg = rational_to_f64_u(values.get(0)?)?;
3404            let min = values
3405                .get(1)
3406                .and_then(|v| rational_to_f64_u(v))
3407                .unwrap_or(0.0);
3408            let sec = values
3409                .get(2)
3410                .and_then(|v| rational_to_f64_u(v))
3411                .unwrap_or(0.0);
3412            Some(deg + (min / 60.0) + (sec / 3600.0))
3413        }
3414        ExifValue::SRational(values) if !values.is_empty() => {
3415            let deg = rational_to_f64_i(values.get(0)?)?;
3416            let min = values
3417                .get(1)
3418                .and_then(|v| rational_to_f64_i(v))
3419                .unwrap_or(0.0);
3420            let sec = values
3421                .get(2)
3422                .and_then(|v| rational_to_f64_i(v))
3423                .unwrap_or(0.0);
3424            Some(deg + (min / 60.0) + (sec / 3600.0))
3425        }
3426        _ => None,
3427    }
3428}
3429
3430fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3431    if value.denom == 0 {
3432        None
3433    } else {
3434        Some(value.num as f64 / value.denom as f64)
3435    }
3436}
3437
3438fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3439    if value.denom == 0 {
3440        None
3441    } else {
3442        Some(value.num as f64 / value.denom as f64)
3443    }
3444}
3445
3446fn value_to_ascii(value: &ExifValue) -> Option<String> {
3447    if let ExifValue::Ascii(values) = value {
3448        values
3449            .first()
3450            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3451            .map(|s| s.trim_matches('\0').trim().to_string())
3452            .filter(|s| !s.is_empty())
3453    } else {
3454        None
3455    }
3456}
3457
3458fn ingest_payload(
3459    mem: &mut Memvid,
3460    args: &PutArgs,
3461    config: &CliConfig,
3462    payload: Vec<u8>,
3463    source_path: Option<&Path>,
3464    capacity_guard: Option<&mut CapacityGuard>,
3465    enable_embedding: bool,
3466    runtime: Option<&EmbeddingRuntime>,
3467    parallel_mode: bool,
3468) -> Result<IngestOutcome> {
3469    let original_bytes = payload.len();
3470    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3471
3472    // Check if adding this payload would exceed free tier limit
3473    // Require API key for operations that would exceed 1GB total
3474    let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
3475    crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;
3476
3477    let payload_text = std::str::from_utf8(&payload).ok();
3478    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3479
3480    let audio_opts = AudioAnalyzeOptions {
3481        force: args.audio || args.transcribe,
3482        segment_secs: args
3483            .audio_segment_seconds
3484            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3485        transcribe: args.transcribe,
3486    };
3487
3488    let mut analysis = analyze_file(
3489        source_path,
3490        &payload,
3491        payload_text,
3492        inferred_title.as_deref(),
3493        audio_opts,
3494        args.video,
3495    );
3496
3497    if args.video && !analysis.mime.starts_with("video/") {
3498        anyhow::bail!(
3499            "--video requires a video input; detected MIME type {}",
3500            analysis.mime
3501        );
3502    }
3503
3504    let mut search_text = analysis.search_text.clone();
3505    if let Some(ref mut text) = search_text {
3506        if text.len() > MAX_SEARCH_TEXT_LEN {
3507            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3508            *text = truncated;
3509        }
3510    }
3511    analysis.search_text = search_text.clone();
3512
3513    let uri = if let Some(ref explicit) = args.uri {
3514        derive_uri(Some(explicit), None)
3515    } else if args.video {
3516        derive_video_uri(&payload, source_path, &analysis.mime)
3517    } else {
3518        derive_uri(None, source_path)
3519    };
3520
3521    let mut options_builder = PutOptions::builder()
3522        .enable_embedding(enable_embedding)
3523        .auto_tag(!args.no_auto_tag)
3524        .extract_dates(!args.no_extract_dates)
3525        .extract_triplets(!args.no_extract_triplets);
3526    if let Some(ts) = args.timestamp {
3527        options_builder = options_builder.timestamp(ts);
3528    }
3529    if let Some(track) = &args.track {
3530        options_builder = options_builder.track(track.clone());
3531    }
3532    if args.video {
3533        options_builder = options_builder.kind("video");
3534        options_builder = options_builder.tag("kind", "video");
3535        options_builder = options_builder.push_tag("video");
3536    } else if let Some(kind) = &args.kind {
3537        options_builder = options_builder.kind(kind.clone());
3538    }
3539    options_builder = options_builder.uri(uri.clone());
3540    if let Some(ref title) = inferred_title {
3541        options_builder = options_builder.title(title.clone());
3542    }
3543    for (key, value) in &args.tags {
3544        options_builder = options_builder.tag(key.clone(), value.clone());
3545        if !key.is_empty() {
3546            options_builder = options_builder.push_tag(key.clone());
3547        }
3548        if !value.is_empty() && value != key {
3549            options_builder = options_builder.push_tag(value.clone());
3550        }
3551    }
3552    for label in &args.labels {
3553        options_builder = options_builder.label(label.clone());
3554    }
3555    if let Some(metadata) = analysis.metadata.clone() {
3556        if !metadata.is_empty() {
3557            options_builder = options_builder.metadata(metadata);
3558        }
3559    }
3560    for (idx, entry) in args.metadata.iter().enumerate() {
3561        match serde_json::from_str::<DocMetadata>(entry) {
3562            Ok(meta) => {
3563                options_builder = options_builder.metadata(meta);
3564            }
3565            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3566                Ok(value) => {
3567                    options_builder =
3568                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3569                }
3570                Err(err) => {
3571                    warn!("failed to parse --metadata JSON: {err}");
3572                }
3573            },
3574        }
3575    }
3576    if let Some(text) = search_text.clone() {
3577        if !text.trim().is_empty() {
3578            options_builder = options_builder.search_text(text);
3579        }
3580    }
3581    let mut options = options_builder.build();
3582
3583    let existing_frame = if args.update_existing || !args.allow_duplicate {
3584        match mem.frame_by_uri(&uri) {
3585            Ok(frame) => Some(frame),
3586            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3587            Err(err) => return Err(err.into()),
3588        }
3589    } else {
3590        None
3591    };
3592
3593    // Compute frame_id: for updates it's the existing frame's id,
3594    // for new frames it's the current frame count (which becomes the next id)
3595    let frame_id = if let Some(ref existing) = existing_frame {
3596        existing.id
3597    } else {
3598        mem.stats()?.frame_count
3599    };
3600
3601    let mut embedded = false;
3602    let allow_embedding = enable_embedding && !args.video;
3603    if enable_embedding && !allow_embedding && args.video {
3604        warn!("semantic embeddings are not generated for video payloads");
3605    }
3606    let seq = if let Some(existing) = existing_frame {
3607        if args.update_existing {
3608            let payload_for_update = payload.clone();
3609            if allow_embedding {
3610                if let Some(path) = args.embedding_vec.as_deref() {
3611                    let embedding = crate::utils::read_embedding(path)?;
3612                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
3613                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
3614                        let expected = choice.dimensions();
3615                        if expected != 0 && embedding.len() != expected {
3616                            anyhow::bail!(
3617                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3618                                embedding.len(),
3619                                choice.name(),
3620                                expected
3621                            );
3622                        }
3623                        apply_embedding_identity_metadata_from_choice(
3624                            &mut options,
3625                            choice,
3626                            embedding.len(),
3627                            Some(model_str),
3628                        );
3629                    } else {
3630                        options.extra_metadata.insert(
3631                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3632                            embedding.len().to_string(),
3633                        );
3634                    }
3635                    embedded = true;
3636                    mem.update_frame(
3637                        existing.id,
3638                        Some(payload_for_update),
3639                        options.clone(),
3640                        Some(embedding),
3641                    )
3642                    .map_err(|err| {
3643                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3644                    })?
3645                } else {
3646                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3647                    let embed_text = search_text
3648                        .clone()
3649                        .or_else(|| payload_text.map(|text| text.to_string()))
3650                        .unwrap_or_default();
3651                    if embed_text.trim().is_empty() {
3652                        warn!("no textual content available; embedding skipped");
3653                        mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3654                            .map_err(|err| {
3655                                map_put_error(
3656                                    err,
3657                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3658                                )
3659                            })?
3660                    } else {
3661                        // Truncate to avoid token limits
3662                        let truncated_text = truncate_for_embedding(&embed_text);
3663                        if truncated_text.len() < embed_text.len() {
3664                            info!(
3665                                "Truncated text from {} to {} chars for embedding",
3666                                embed_text.len(),
3667                                truncated_text.len()
3668                            );
3669                        }
3670                        let embedding = runtime.embed_passage(&truncated_text)?;
3671                        embedded = true;
3672                        apply_embedding_identity_metadata(&mut options, runtime);
3673                        mem.update_frame(
3674                            existing.id,
3675                            Some(payload_for_update),
3676                            options.clone(),
3677                            Some(embedding),
3678                        )
3679                        .map_err(|err| {
3680                            map_put_error(
3681                                err,
3682                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3683                            )
3684                        })?
3685                    }
3686                }
3687            } else {
3688                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3689                    .map_err(|err| {
3690                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3691                    })?
3692            }
3693        } else {
3694            return Err(DuplicateUriError::new(uri.as_str()).into());
3695        }
3696    } else {
3697        if let Some(guard) = capacity_guard.as_ref() {
3698            guard.ensure_capacity(stored_bytes as u64)?;
3699        }
3700        if parallel_mode {
3701            #[cfg(feature = "parallel_segments")]
3702            {
3703                let mut parent_embedding = None;
3704                let mut chunk_embeddings_vec = None;
3705                if allow_embedding {
3706                    if let Some(path) = args.embedding_vec.as_deref() {
3707                        let embedding = crate::utils::read_embedding(path)?;
3708                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
3709                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
3710                            let expected = choice.dimensions();
3711                            if expected != 0 && embedding.len() != expected {
3712                                anyhow::bail!(
3713                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3714                                    embedding.len(),
3715                                    choice.name(),
3716                                    expected
3717                                );
3718                            }
3719                            apply_embedding_identity_metadata_from_choice(
3720                                &mut options,
3721                                choice,
3722                                embedding.len(),
3723                                Some(model_str),
3724                            );
3725                        } else {
3726                            options.extra_metadata.insert(
3727                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3728                                embedding.len().to_string(),
3729                            );
3730                        }
3731                        parent_embedding = Some(embedding);
3732                        embedded = true;
3733                    } else {
3734                        let runtime =
3735                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3736                        let embed_text = search_text
3737                            .clone()
3738                            .or_else(|| payload_text.map(|text| text.to_string()))
3739                            .unwrap_or_default();
3740                        if embed_text.trim().is_empty() {
3741                            warn!("no textual content available; embedding skipped");
3742                        } else {
3743                            // Check if document will be chunked - if so, embed each chunk
3744                            info!(
3745                                "parallel mode: checking for chunks on {} bytes payload",
3746                                payload.len()
3747                            );
3748                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3749                                // Document will be chunked - embed each chunk for full semantic coverage
3750                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3751
3752                            // Apply temporal enrichment if enabled (before contextual)
3753                            #[cfg(feature = "temporal_enrich")]
3754                            let enriched_chunks = if args.temporal_enrich {
3755                                info!(
3756                                    "Temporal enrichment enabled, processing {} chunks",
3757                                    chunk_texts.len()
3758                                );
3759                                // Use today's date as fallback document date
3760                                let today = chrono::Local::now().date_naive();
3761                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3762                                // Results are tuples of (enriched_text, TemporalEnrichment)
3763                                let enriched: Vec<String> =
3764                                    results.iter().map(|(text, _)| text.clone()).collect();
3765                                let resolved_count = results
3766                                    .iter()
3767                                    .filter(|(_, e)| {
3768                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
3769                                    })
3770                                    .count();
3771                                info!(
3772                                    "Temporal enrichment: resolved {} chunks with temporal context",
3773                                    resolved_count
3774                                );
3775                                enriched
3776                            } else {
3777                                chunk_texts.clone()
3778                            };
3779                            #[cfg(not(feature = "temporal_enrich"))]
3780                            let enriched_chunks = {
3781                                if args.temporal_enrich {
3782                                    warn!(
3783                                        "Temporal enrichment requested but feature not compiled in"
3784                                    );
3785                                }
3786                                chunk_texts.clone()
3787                            };
3788
3789                            // Apply contextual retrieval if enabled
3790                            let embed_chunks = if args.contextual {
3791                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3792                                let engine = if args.contextual_model.as_deref() == Some("local") {
3793                                    #[cfg(feature = "llama-cpp")]
3794                                    {
3795                                        let model_path = get_local_contextual_model_path()?;
3796                                        ContextualEngine::local(model_path)
3797                                    }
3798                                    #[cfg(not(feature = "llama-cpp"))]
3799                                    {
3800                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3801                                    }
3802                                } else if let Some(model) = &args.contextual_model {
3803                                    ContextualEngine::openai_with_model(model)?
3804                                } else {
3805                                    ContextualEngine::openai()?
3806                                };
3807
3808                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3809                                {
3810                                    Ok(contexts) => {
3811                                        info!("Generated {} contextual prefixes", contexts.len());
3812                                        apply_contextual_prefixes(
3813                                            &embed_text,
3814                                            &enriched_chunks,
3815                                            &contexts,
3816                                        )
3817                                    }
3818                                    Err(e) => {
3819                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3820                                        enriched_chunks.clone()
3821                                    }
3822                                }
3823                            } else {
3824                                enriched_chunks
3825                            };
3826
3827                            let truncated_chunks: Vec<String> = embed_chunks
3828                                .iter()
3829                                .map(|chunk_text| {
3830                                    // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
3831                                    let truncated_chunk = truncate_for_embedding(chunk_text);
3832                                    if truncated_chunk.len() < chunk_text.len() {
3833                                        info!(
3834                                            "parallel mode: Truncated chunk from {} to {} chars for embedding",
3835                                            chunk_text.len(),
3836                                            truncated_chunk.len()
3837                                        );
3838                                    }
3839                                    truncated_chunk
3840                                })
3841                                .collect();
3842                            let truncated_refs: Vec<&str> =
3843                                truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3844                            let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3845                            let num_chunks = chunk_embeddings.len();
3846                            chunk_embeddings_vec = Some(chunk_embeddings);
3847                            // Also embed the parent for the parent frame (truncate to avoid token limits)
3848                            let parent_text = truncate_for_embedding(&embed_text);
3849                            if parent_text.len() < embed_text.len() {
3850                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3851                            }
3852                            parent_embedding = Some(runtime.embed_passage(&parent_text)?);
3853                            embedded = true;
3854                            info!(
3855                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3856                                num_chunks
3857                            );
3858                            } else {
3859                                // No chunking - just embed the whole document (truncate to avoid token limits)
3860                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3861                                let truncated_text = truncate_for_embedding(&embed_text);
3862                                if truncated_text.len() < embed_text.len() {
3863                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3864                                }
3865                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
3866                                embedded = true;
3867                            }
3868                        }
3869                    }
3870                }
3871                if embedded {
3872                    if let Some(runtime) = runtime {
3873                        apply_embedding_identity_metadata(&mut options, runtime);
3874                    }
3875                }
3876                let payload_variant = if let Some(path) = source_path {
3877                    ParallelPayload::Path(path.to_path_buf())
3878                } else {
3879                    ParallelPayload::Bytes(payload)
3880                };
3881                let input = ParallelInput {
3882                    payload: payload_variant,
3883                    options: options.clone(),
3884                    embedding: parent_embedding,
3885                    chunk_embeddings: chunk_embeddings_vec,
3886                };
3887                return Ok(IngestOutcome {
3888                    report: PutReport {
3889                        seq: 0,
3890                        frame_id: 0, // Will be populated after parallel commit
3891                        uri,
3892                        title: inferred_title,
3893                        original_bytes,
3894                        stored_bytes,
3895                        compressed,
3896                        source: source_path.map(Path::to_path_buf),
3897                        mime: Some(analysis.mime),
3898                        metadata: analysis.metadata,
3899                        extraction: analysis.extraction,
3900                        #[cfg(feature = "logic_mesh")]
3901                        search_text: search_text.clone(),
3902                    },
3903                    embedded,
3904                    parallel_input: Some(input),
3905                });
3906            }
3907            #[cfg(not(feature = "parallel_segments"))]
3908            {
3909                mem.put_bytes_with_options(&payload, options)
3910                    .map_err(|err| {
3911                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3912                    })?
3913            }
3914        } else if allow_embedding {
3915            if let Some(path) = args.embedding_vec.as_deref() {
3916                let embedding = crate::utils::read_embedding(path)?;
3917                if let Some(model_str) = args.embedding_vec_model.as_deref() {
3918                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
3919                    let expected = choice.dimensions();
3920                    if expected != 0 && embedding.len() != expected {
3921                        anyhow::bail!(
3922                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3923                            embedding.len(),
3924                            choice.name(),
3925                            expected
3926                        );
3927                    }
3928                    apply_embedding_identity_metadata_from_choice(
3929                        &mut options,
3930                        choice,
3931                        embedding.len(),
3932                        Some(model_str),
3933                    );
3934                } else {
3935                    options.extra_metadata.insert(
3936                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3937                        embedding.len().to_string(),
3938                    );
3939                }
3940                embedded = true;
3941                mem.put_with_embedding_and_options(&payload, embedding, options)
3942                    .map_err(|err| {
3943                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3944                    })?
3945            } else {
3946                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3947                let embed_text = search_text
3948                    .clone()
3949                    .or_else(|| payload_text.map(|text| text.to_string()))
3950                    .unwrap_or_default();
3951                if embed_text.trim().is_empty() {
3952                    warn!("no textual content available; embedding skipped");
3953                    mem.put_bytes_with_options(&payload, options)
3954                        .map_err(|err| {
3955                            map_put_error(
3956                                err,
3957                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3958                            )
3959                        })?
3960                } else {
3961                    // Check if document will be chunked - if so, embed each chunk
3962                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3963                    // Document will be chunked - embed each chunk for full semantic coverage
3964                    info!(
3965                        "Document will be chunked into {} chunks, generating embeddings for each",
3966                        chunk_texts.len()
3967                    );
3968
3969                    // Apply temporal enrichment if enabled (before contextual)
3970                    #[cfg(feature = "temporal_enrich")]
3971                    let enriched_chunks = if args.temporal_enrich {
3972                        info!(
3973                            "Temporal enrichment enabled, processing {} chunks",
3974                            chunk_texts.len()
3975                        );
3976                        // Use today's date as fallback document date
3977                        let today = chrono::Local::now().date_naive();
3978                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3979                        // Results are tuples of (enriched_text, TemporalEnrichment)
3980                        let enriched: Vec<String> =
3981                            results.iter().map(|(text, _)| text.clone()).collect();
3982                        let resolved_count = results
3983                            .iter()
3984                            .filter(|(_, e)| {
3985                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
3986                            })
3987                            .count();
3988                        info!(
3989                            "Temporal enrichment: resolved {} chunks with temporal context",
3990                            resolved_count
3991                        );
3992                        enriched
3993                    } else {
3994                        chunk_texts.clone()
3995                    };
3996                    #[cfg(not(feature = "temporal_enrich"))]
3997                    let enriched_chunks = {
3998                        if args.temporal_enrich {
3999                            warn!("Temporal enrichment requested but feature not compiled in");
4000                        }
4001                        chunk_texts.clone()
4002                    };
4003
4004                    // Apply contextual retrieval if enabled
4005                    let embed_chunks = if args.contextual {
4006                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4007                        let engine = if args.contextual_model.as_deref() == Some("local") {
4008                            #[cfg(feature = "llama-cpp")]
4009                            {
4010                                let model_path = get_local_contextual_model_path()?;
4011                                ContextualEngine::local(model_path)
4012                            }
4013                            #[cfg(not(feature = "llama-cpp"))]
4014                            {
4015                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4016                            }
4017                        } else if let Some(model) = &args.contextual_model {
4018                            ContextualEngine::openai_with_model(model)?
4019                        } else {
4020                            ContextualEngine::openai()?
4021                        };
4022
4023                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
4024                            Ok(contexts) => {
4025                                info!("Generated {} contextual prefixes", contexts.len());
4026                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
4027                            }
4028                            Err(e) => {
4029                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4030                                enriched_chunks.clone()
4031                            }
4032                        }
4033                    } else {
4034                        enriched_chunks
4035                    };
4036
4037                    let truncated_chunks: Vec<String> = embed_chunks
4038                        .iter()
4039                        .map(|chunk_text| {
4040                            // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
4041                            let truncated_chunk = truncate_for_embedding(chunk_text);
4042                            if truncated_chunk.len() < chunk_text.len() {
4043                                info!(
4044                                    "Truncated chunk from {} to {} chars for embedding",
4045                                    chunk_text.len(),
4046                                    truncated_chunk.len()
4047                                );
4048                            }
4049                            truncated_chunk
4050                        })
4051                        .collect();
4052                    let truncated_refs: Vec<&str> =
4053                        truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
4054                    let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4055                    // Truncate parent text to avoid token limits
4056                    let parent_text = truncate_for_embedding(&embed_text);
4057                    if parent_text.len() < embed_text.len() {
4058                        info!(
4059                            "Truncated parent text from {} to {} chars for embedding",
4060                            embed_text.len(),
4061                            parent_text.len()
4062                        );
4063                    }
4064                    let parent_embedding = runtime.embed_passage(&parent_text)?;
4065                    embedded = true;
4066                    apply_embedding_identity_metadata(&mut options, runtime);
4067                    info!(
4068                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
4069                        chunk_embeddings.len()
4070                    );
4071                    mem.put_with_chunk_embeddings(
4072                        &payload,
4073                        Some(parent_embedding),
4074                        chunk_embeddings,
4075                        options,
4076                    )
4077                    .map_err(|err| {
4078                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4079                    })?
4080                    } else {
4081                        // No chunking - just embed the whole document (truncate to avoid token limits)
4082                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
4083                        let truncated_text = truncate_for_embedding(&embed_text);
4084                        if truncated_text.len() < embed_text.len() {
4085                            info!(
4086                                "Truncated text from {} to {} chars for embedding",
4087                                embed_text.len(),
4088                                truncated_text.len()
4089                            );
4090                        }
4091                        let embedding = runtime.embed_passage(&truncated_text)?;
4092                        embedded = true;
4093                        apply_embedding_identity_metadata(&mut options, runtime);
4094                        mem.put_with_embedding_and_options(&payload, embedding, options)
4095                            .map_err(|err| {
4096                                map_put_error(
4097                                    err,
4098                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4099                                )
4100                            })?
4101                    }
4102                }
4103            }
4104        } else {
4105            mem.put_bytes_with_options(&payload, options)
4106                .map_err(|err| {
4107                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4108                })?
4109        }
4110    };
4111
4112    Ok(IngestOutcome {
4113        report: PutReport {
4114            seq,
4115            frame_id,
4116            uri,
4117            title: inferred_title,
4118            original_bytes,
4119            stored_bytes,
4120            compressed,
4121            source: source_path.map(Path::to_path_buf),
4122            mime: Some(analysis.mime),
4123            metadata: analysis.metadata,
4124            extraction: analysis.extraction,
4125            #[cfg(feature = "logic_mesh")]
4126            search_text,
4127        },
4128        embedded,
4129        #[cfg(feature = "parallel_segments")]
4130        parallel_input: None,
4131    })
4132}
4133
4134fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4135    if std::str::from_utf8(payload).is_ok() {
4136        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4137        Ok((compressed.len(), true))
4138    } else {
4139        Ok((payload.len(), false))
4140    }
4141}
4142
4143fn print_report(report: &PutReport) {
4144    let name = report
4145        .source
4146        .as_ref()
4147        .map(|path| {
4148            let pretty = env::current_dir()
4149                .ok()
4150                .as_ref()
4151                .and_then(|cwd| diff_paths(path, cwd))
4152                .unwrap_or_else(|| path.to_path_buf());
4153            pretty.to_string_lossy().into_owned()
4154        })
4155        .unwrap_or_else(|| "stdin".to_string());
4156
4157    let ratio = if report.original_bytes > 0 {
4158        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4159    } else {
4160        100.0
4161    };
4162
4163    println!("• {name} → {}", report.uri);
4164    println!("  seq: {}", report.seq);
4165    if report.compressed {
4166        println!(
4167            "  size: {} B → {} B ({:.1}% of original, compressed)",
4168            report.original_bytes, report.stored_bytes, ratio
4169        );
4170    } else {
4171        println!("  size: {} B (stored as-is)", report.original_bytes);
4172    }
4173    if let Some(mime) = &report.mime {
4174        println!("  mime: {mime}");
4175    }
4176    if let Some(title) = &report.title {
4177        println!("  title: {title}");
4178    }
4179    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4180    println!(
4181        "  extraction: status={} reader={}",
4182        report.extraction.status.label(),
4183        extraction_reader
4184    );
4185    if let Some(ms) = report.extraction.duration_ms {
4186        println!("  extraction-duration: {} ms", ms);
4187    }
4188    if let Some(pages) = report.extraction.pages_processed {
4189        println!("  extraction-pages: {}", pages);
4190    }
4191    for warning in &report.extraction.warnings {
4192        println!("  warning: {warning}");
4193    }
4194    if let Some(metadata) = &report.metadata {
4195        if let Some(caption) = &metadata.caption {
4196            println!("  caption: {caption}");
4197        }
4198        if let Some(audio) = metadata.audio.as_ref() {
4199            if audio.duration_secs.is_some()
4200                || audio.sample_rate_hz.is_some()
4201                || audio.channels.is_some()
4202            {
4203                let duration = audio
4204                    .duration_secs
4205                    .map(|secs| format!("{secs:.1}s"))
4206                    .unwrap_or_else(|| "unknown".into());
4207                let rate = audio
4208                    .sample_rate_hz
4209                    .map(|hz| format!("{} Hz", hz))
4210                    .unwrap_or_else(|| "? Hz".into());
4211                let channels = audio
4212                    .channels
4213                    .map(|ch| ch.to_string())
4214                    .unwrap_or_else(|| "?".into());
4215                println!("  audio: duration {duration}, {channels} ch, {rate}");
4216            }
4217        }
4218    }
4219
4220    println!();
4221}
4222
4223fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4224    let source = report
4225        .source
4226        .as_ref()
4227        .map(|path| path.to_string_lossy().into_owned());
4228    let extraction = json!({
4229        "status": report.extraction.status.label(),
4230        "reader": report.extraction.reader,
4231        "duration_ms": report.extraction.duration_ms,
4232        "pages_processed": report.extraction.pages_processed,
4233        "warnings": report.extraction.warnings,
4234    });
4235    json!({
4236        "seq": report.seq,
4237        "uri": report.uri,
4238        "title": report.title,
4239        "original_bytes": report.original_bytes,
4240        "original_bytes_human": format_bytes(report.original_bytes as u64),
4241        "stored_bytes": report.stored_bytes,
4242        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4243        "compressed": report.compressed,
4244        "mime": report.mime,
4245        "source": source,
4246        "metadata": report.metadata,
4247        "extraction": extraction,
4248    })
4249}
4250
4251fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4252    match err {
4253        MemvidError::CapacityExceeded {
4254            current,
4255            limit,
4256            required,
4257        } => anyhow!(CapacityExceededMessage {
4258            current,
4259            limit,
4260            required,
4261        }),
4262        other => other.into(),
4263    }
4264}
4265
4266fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4267    for (idx, entry) in entries.iter().enumerate() {
4268        match serde_json::from_str::<DocMetadata>(entry) {
4269            Ok(meta) => {
4270                options.metadata = Some(meta);
4271            }
4272            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4273                Ok(value) => {
4274                    options
4275                        .extra_metadata
4276                        .insert(format!("custom_metadata_{idx}"), value.to_string());
4277                }
4278                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4279            },
4280        }
4281    }
4282}
4283
4284fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4285    let stats = mem.stats()?;
4286    let message = match stats.tier {
4287        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4288        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4289    };
4290    Ok(message)
4291}
4292
/// Sanitize a raw user-supplied URI into a safe relative path.
///
/// Strips an `mv2://` scheme prefix, normalizes backslashes to `/`, drops
/// empty and `.` segments, and resolves `..` segments by popping the previous
/// segment. Returns the joined path when `keep_path` is true, otherwise just
/// the final segment. When nothing usable survives normalization (empty input
/// or input made up entirely of `.`/`..` segments) the placeholder
/// `"document"` is returned.
///
/// Fix: the previous fallback returned the raw last segment when no segments
/// survived, which could leak `"."` or `".."` (e.g. `"a/.."`) through the
/// sanitizer; it now always falls back to `"document"`.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");
    let mut segments: Vec<String> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            // Empty (from `//` or leading/trailing `/`) and current-dir
            // segments carry no path information.
            "" | "." => continue,
            // Parent-dir segments cancel the previous segment instead of
            // being emitted, preventing traversal outside the root.
            ".." => {
                segments.pop();
            }
            other => segments.push(other.to_string()),
        }
    }

    if segments.is_empty() {
        // No usable segment survived; never echo back `.`/`..` residue.
        return "document".to_string();
    }

    if keep_path {
        segments.join("/")
    } else {
        segments
            .last()
            .cloned()
            .unwrap_or_else(|| "document".to_string())
    }
}
4326
4327fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4328    if quiet {
4329        let pb = ProgressBar::hidden();
4330        pb.set_message(message.to_string());
4331        return pb;
4332    }
4333
4334    match total {
4335        Some(len) => {
4336            let pb = ProgressBar::new(len);
4337            pb.set_draw_target(ProgressDrawTarget::stderr());
4338            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4339                .unwrap_or_else(|_| ProgressStyle::default_bar());
4340            pb.set_style(style);
4341            pb.set_message(message.to_string());
4342            pb
4343        }
4344        None => {
4345            let pb = ProgressBar::new_spinner();
4346            pb.set_draw_target(ProgressDrawTarget::stderr());
4347            pb.set_message(message.to_string());
4348            pb.enable_steady_tick(Duration::from_millis(120));
4349            pb
4350        }
4351    }
4352}
4353
4354fn create_spinner(message: &str) -> ProgressBar {
4355    let pb = ProgressBar::new_spinner();
4356    pb.set_draw_target(ProgressDrawTarget::stderr());
4357    let style = ProgressStyle::with_template("{spinner} {msg}")
4358        .unwrap_or_else(|_| ProgressStyle::default_spinner())
4359        .tick_strings(&["-", "\\", "|", "/"]);
4360    pb.set_style(style);
4361    pb.set_message(message.to_string());
4362    pb.enable_steady_tick(Duration::from_millis(120));
4363    pb
4364}
4365
4366// ============================================================================
4367// put-many command - batch ingestion with pre-computed embeddings
4368// ============================================================================
4369
/// Arguments for the `put-many` subcommand
// NOTE: doc comments on `#[arg]` fields below double as clap help text, so
// they are left untouched; explanatory notes use plain `//` comments.
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    /// Zstd compression level (1-9, default: 3)
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared lock/unlock flags (see `LockCliArgs`), flattened into this command.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4390
/// JSON input format for put-many requests
///
/// One request describes a single document to ingest. Only `title` and `text`
/// are required; every other field falls back to its serde default when
/// omitted from the JSON.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Human-readable document title.
    pub title: String,
    /// Label string; empty when omitted.
    #[serde(default)]
    pub label: String,
    /// Full document text payload to ingest.
    pub text: String,
    /// Optional explicit URI for the stored frame.
    #[serde(default)]
    pub uri: Option<String>,
    /// Free-form tags attached to the frame.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Additional labels attached to the frame.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Arbitrary JSON metadata blob; deserializes to JSON `null` when omitted.
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Extra metadata (string key/value) stored on the frame.
    /// Use this to persist embedding identity keys like `memvid.embedding.model`.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI)
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Pre-computed embeddings for each chunk (matched by index)
    /// If the document gets chunked, these embeddings are assigned to each chunk frame.
    /// The number of chunk embeddings should match the number of chunks that will be created.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
4420
/// Batch input format (array of requests)
///
/// `#[serde(transparent)]` makes this type deserialize directly from a JSON
/// array, so input files look like `[ {...}, {...} ]` rather than being
/// wrapped in a `{"requests": [...]}` object.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The deserialized requests, in input order.
    pub requests: Vec<PutManyRequest>,
}
4428
#[cfg(feature = "parallel_segments")]
/// Handle the `put-many` command: read a JSON batch of documents (from
/// `--input` or stdin), convert each entry to a `ParallelInput`, and ingest
/// them in one parallel pass.
///
/// Prints the resulting frame ids either as JSON (`--json`) or as a short
/// human-readable summary. Returns an error on plan/capacity violations,
/// unreadable or unparsable input, or a failed ingest.
pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    // `Memvid`, `PutOptions`, `BuildOpts`, `ParallelInput`, and `ParallelPayload`
    // are already imported at the top of this file (the latter three under the
    // same `parallel_segments` feature gate), so only the `Read` trait needs a
    // local import here.
    use std::io::Read;

    // Check if plan allows write operations (blocks expired subscriptions)
    crate::utils::require_active_plan(config, "put-many")?;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Ensure both search indexes exist before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Check if current memory size exceeds free tier limit.
    // NOTE(review): the incoming-bytes argument is 0, so only the *existing*
    // memory size is checked, not the size of this batch — confirm intended.
    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;

    // Read the batch JSON from --input when given, otherwise from stdin.
    let batch_json: String = if let Some(input_path) = &args.input {
        fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
    } else {
        let mut buffer = String::new();
        io::stdin()
            .read_to_string(&mut buffer)
            .context("Failed to read from stdin")?;
        buffer
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    // An empty batch is not an error: report "nothing done" and exit cleanly.
    if batch.requests.is_empty() {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    let count = batch.requests.len();
    if !args.json {
        // Progress note goes to stderr so stdout stays machine-parseable.
        eprintln!("Processing {} documents...", count);
    }

    // Convert each request into a ParallelInput for the batch ingest path.
    let inputs: Vec<ParallelInput> = batch
        .requests
        .into_iter()
        .enumerate()
        .map(|(i, req)| request_to_parallel_input(i, req))
        .collect();

    // Build options: clamp the zstd level to the CLI-supported 1..=9 range.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    // Ingest batch
    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
    } else {
        println!("Ingested {} documents", frame_ids.len());
        // Keep output short for large batches: list ids only up to 10,
        // otherwise just show the first and last.
        if frame_ids.len() <= 10 {
            for fid in &frame_ids {
                println!("  frame_id: {}", fid);
            }
        } else {
            println!("  first: {}", frame_ids.first().unwrap_or(&0));
            println!("  last:  {}", frame_ids.last().unwrap_or(&0));
        }
    }

    Ok(())
}

/// Convert one batch request into a `ParallelInput`, mapping its fields onto
/// `PutOptions` and recording the embedding dimension when one is supplied.
///
/// `index` is the request's position in the batch, used to synthesize a
/// `mv2://batch/{index}` URI when the request does not provide one.
///
/// NOTE(review): `req.label` is currently ignored here — confirm whether it
/// should be mapped onto the put options.
#[cfg(feature = "parallel_segments")]
fn request_to_parallel_input(index: usize, req: PutManyRequest) -> ParallelInput {
    let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", index));
    let mut options = PutOptions::default();
    options.title = Some(req.title);
    options.uri = Some(uri);
    options.tags = req.tags;
    options.labels = req.labels;
    options.extra_metadata = req.extra_metadata;
    // Prefer typed `DocMetadata`; on shape mismatch, stash the raw JSON under
    // a well-known extra-metadata key so it is not silently dropped.
    if !req.metadata.is_null() {
        match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
            Ok(meta) => options.metadata = Some(meta),
            Err(_) => {
                options
                    .extra_metadata
                    .entry("memvid.metadata.json".to_string())
                    .or_insert_with(|| req.metadata.to_string());
            }
        }
    }

    // Record the embedding dimension (from the parent embedding, or falling
    // back to the first chunk embedding) unless the caller already set one.
    if let Some(embedding) = req
        .embedding
        .as_ref()
        .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
    {
        options
            .extra_metadata
            .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
            .or_insert_with(|| embedding.len().to_string());
    }

    ParallelInput {
        payload: ParallelPayload::Bytes(req.text.into_bytes()),
        options,
        embedding: req.embedding,
        chunk_embeddings: req.chunk_embeddings,
    }
}