memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30use memvid_core::{
31    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
32    DocMetadata, DocumentFormat, MediaManifest, Memvid, MemvidError, PutOptions,
33    ReaderHint, ReaderRegistry, Stats, Tier, TimelineQueryBuilder,
34    MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
35};
36#[cfg(feature = "clip")]
37use memvid_core::FrameRole;
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
// Upper bound applied to search text.
// NOTE(review): the unit (bytes vs. characters) is not visible in this part of the file — confirm at the use site.
const MAX_SEARCH_TEXT_LEN: usize = 32_768;
/// Maximum length of embedding text, chosen to avoid exceeding OpenAI's 8192 token limit.
/// `truncate_for_embedding` compares this against `str::len()`, so it is measured in bytes.
/// Using ~4 chars/token estimate, 20K chars ≈ 5K tokens (safe margin).
const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
// Palette size/quality knobs; presumably passed to `color_thief::get_palette` — TODO confirm at the use site.
const COLOR_PALETTE_SIZE: u8 = 5;
const COLOR_PALETTE_QUALITY: u8 = 8;
// Presumably the number of leading payload bytes inspected for MIME sniffing — TODO confirm at the use site.
const MAGIC_SNIFF_BYTES: usize = 16;
/// Maximum number of images to extract from a PDF for CLIP visual search.
/// Set high to capture all images; processing stops when no more images found.
#[cfg(feature = "clip")]
const CLIP_PDF_MAX_IMAGES: usize = 100;
// NOTE(review): looks like the target pixel size for PDF pages rendered for CLIP — confirm at the use site.
#[cfg(feature = "clip")]
const CLIP_PDF_TARGET_PX: u32 = 768;
59
60/// Truncate text for embedding to fit within OpenAI token limits.
61/// Returns a truncated string that fits within MAX_EMBEDDING_TEXT_LEN.
62fn truncate_for_embedding(text: &str) -> String {
63    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
64        text.to_string()
65    } else {
66        // Find a char boundary to avoid splitting UTF-8
67        let truncated = &text[..MAX_EMBEDDING_TEXT_LEN];
68        // Try to find the last complete character
69        let end = truncated
70            .char_indices()
71            .rev()
72            .next()
73            .map(|(i, c)| i + c.len_utf8())
74            .unwrap_or(MAX_EMBEDDING_TEXT_LEN);
75        text[..end].to_string()
76    }
77}
78
79fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
80    options.extra_metadata.insert(
81        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
82        runtime.provider_kind().to_string(),
83    );
84    options.extra_metadata.insert(
85        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
86        runtime.provider_model_id(),
87    );
88    options.extra_metadata.insert(
89        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
90        runtime.dimension().to_string(),
91    );
92}
93
94fn apply_embedding_identity_metadata_from_choice(
95    options: &mut PutOptions,
96    model: EmbeddingModelChoice,
97    dimension: usize,
98    model_id_override: Option<&str>,
99) {
100    let provider = match model {
101        EmbeddingModelChoice::OpenAILarge
102        | EmbeddingModelChoice::OpenAISmall
103        | EmbeddingModelChoice::OpenAIAda => "openai",
104        EmbeddingModelChoice::Nvidia => "nvidia",
105        _ => "fastembed",
106    };
107
108    let model_id = match model {
109        EmbeddingModelChoice::Nvidia => model_id_override
110            .map(str::trim)
111            .filter(|value| !value.is_empty())
112            .map(|value| value.trim_start_matches("nvidia:").trim_start_matches("nv:"))
113            .filter(|value| !value.is_empty())
114            .map(|value| {
115                if value.contains('/') {
116                    value.to_string()
117                } else {
118                    format!("nvidia/{value}")
119                }
120            })
121            .unwrap_or_else(|| model.canonical_model_id().to_string()),
122        _ => model.canonical_model_id().to_string(),
123    };
124
125    options.extra_metadata.insert(
126        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
127        provider.to_string(),
128    );
129    options.extra_metadata.insert(
130        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
131        model_id,
132    );
133    options.extra_metadata.insert(
134        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
135        dimension.to_string(),
136    );
137}
138
/// Locate the local GGUF model used for contextual retrieval (phi-3.5-mini).
///
/// The models directory is `MEMVID_MODELS_DIR` when set (a leading `~/` is expanded
/// using `HOME` if that variable is available), otherwise `$HOME/.memvid/models`.
/// The model file is expected at
/// `<models dir>/llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf`.
///
/// # Errors
/// Fails when `HOME` is needed for the default location but is not set, or when the
/// model file does not exist at the resolved path.
#[cfg(feature = "llama-cpp")]
fn get_local_contextual_model_path() -> Result<PathBuf> {
    let models_dir = match env::var("MEMVID_MODELS_DIR") {
        Ok(custom) => {
            // Expand a leading `~/` by hand; without HOME the value is used verbatim.
            let tilde_expanded = custom.strip_prefix("~/").and_then(|rest| {
                env::var("HOME")
                    .ok()
                    .map(|home| PathBuf::from(home).join(rest))
            });
            tilde_expanded.unwrap_or_else(|| PathBuf::from(&custom))
        }
        Err(_) => {
            // Default to ~/.memvid/models
            let home =
                env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
            PathBuf::from(home).join(".memvid").join("models")
        }
    };

    let mut model_path = models_dir;
    model_path.extend(["llm", "phi-3-mini-q4", "Phi-3.5-mini-instruct-Q4_K_M.gguf"]);

    if !model_path.exists() {
        return Err(anyhow!(
            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
            model_path.display()
        ));
    }
    Ok(model_path)
}
175
176use pathdiff::diff_paths;
177
178use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
179use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
180use crate::config::{load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime};
181use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
182use crate::error::{CapacityExceededMessage, DuplicateUriError};
183use crate::utils::{
184    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
185    select_frame,
186};
187#[cfg(feature = "temporal_enrich")]
188use memvid_core::enrich_chunks as temporal_enrich_chunks;
189
/// Interpret a textual boolean flag, typically read from an environment variable.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive.
/// `1`/`true`/`yes`/`on` yield `Some(true)`, `0`/`false`/`no`/`off` yield
/// `Some(false)`, and anything else yields `None`.
fn parse_bool_flag(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let token = value.trim();
    let is_one_of = |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
198
/// Read the `MEMVID_PARALLEL_SEGMENTS` environment override for the parallel builder.
///
/// Returns `None` when the variable is unset, unreadable, or not a value
/// `parse_bool_flag` recognises; otherwise the parsed boolean.
#[cfg(feature = "parallel_segments")]
fn parallel_env_override() -> Option<bool> {
    match std::env::var("MEMVID_PARALLEL_SEGMENTS") {
        Ok(raw) => parse_bool_flag(&raw),
        Err(_) => None,
    }
}
206
/// Report whether both CLIP model files are present on disk.
///
/// The models directory is `MEMVID_MODELS_DIR` when set, otherwise
/// `$HOME/.memvid/models`, otherwise the relative path `.memvid/models`.
/// The model name comes from `MEMVID_CLIP_MODEL` (default `mobileclip-s2`), and the
/// files checked are `<name>_vision.onnx` and `<name>_text.onnx` in that directory.
#[cfg(feature = "clip")]
fn is_clip_model_installed() -> bool {
    let models_dir = env::var("MEMVID_MODELS_DIR")
        .map(PathBuf::from)
        .unwrap_or_else(|_| match env::var("HOME") {
            Ok(home) => PathBuf::from(home).join(".memvid/models"),
            Err(_) => PathBuf::from(".memvid/models"),
        });

    let model_name = match env::var("MEMVID_CLIP_MODEL") {
        Ok(name) => name,
        Err(_) => "mobileclip-s2".to_string(),
    };

    // Vision model first, then text model; both must exist.
    ["vision", "text"].iter().all(|part| {
        models_dir
            .join(format!("{model_name}_{part}.onnx"))
            .exists()
    })
}
226
/// Minimum confidence threshold for NER entities (0.0-1.0)
/// Note: Lower threshold captures more entities but also more noise
#[cfg(feature = "logic_mesh")]
const NER_MIN_CONFIDENCE: f32 = 0.50;

/// Minimum length for entity names, in bytes of the trimmed text
/// (`is_valid_entity` compares it against `str::len()`, which equals the
/// character count only for ASCII names)
#[cfg(feature = "logic_mesh")]
const NER_MIN_ENTITY_LEN: usize = 2;

/// Maximum gap (in bytes, since it is compared against a difference of byte offsets)
/// between adjacent entities to merge
/// Allows merging "S" + "&" + "P Global" when they're close together
#[cfg(feature = "logic_mesh")]
const MAX_MERGE_GAP: usize = 3;
240
/// Merge adjacent NER entities that appear to be tokenization artifacts.
///
/// The BERT tokenizer splits names at special characters like `&`, producing
/// separate entities like "S", "&", "P Global" instead of "S&P Global".
/// This function merges adjacent entities of the same type when they're
/// contiguous or nearly contiguous in the original text.
///
/// Entities are processed in ascending `byte_start` order. An entity is folded
/// into the previous one when both have the same `entity_type` and either
/// * they touch or overlap (gap of zero bytes), or
/// * the gap is at most `MAX_MERGE_GAP` bytes and consists only of spaces, tabs,
///   `&`, `-`, `'` or `.` (so never a newline).
///
/// The merged entity spans from the earlier start to the *larger* of the two end
/// offsets, so an entity nested inside the previous one never shrinks it. Its
/// text is re-read from `original_text` when the offsets are valid; otherwise the
/// two texts are concatenated. Its confidence is the mean of the two confidences.
///
/// Byte offsets that fall outside `original_text` or inside a multi-byte
/// character are tolerated: such a gap is treated as not mergeable and such a
/// span falls back to concatenation, instead of panicking on the slice.
#[cfg(feature = "logic_mesh")]
fn merge_adjacent_entities(
    entities: Vec<memvid_core::ExtractedEntity>,
    original_text: &str,
) -> Vec<memvid_core::ExtractedEntity> {
    use memvid_core::ExtractedEntity;

    if entities.is_empty() {
        return entities;
    }

    // Sort by byte position (stable, so ties keep their input order)
    let mut sorted: Vec<ExtractedEntity> = entities;
    sorted.sort_by_key(|e| e.byte_start);

    let mut merged: Vec<ExtractedEntity> = Vec::with_capacity(sorted.len());

    for entity in sorted {
        if let Some(last) = merged.last_mut() {
            // Zero when the entities touch or overlap.
            let gap = entity.byte_start.saturating_sub(last.byte_end);
            let same_type = last.entity_type == entity.entity_type;

            let gap_is_mergeable = if gap == 0 {
                true
            } else if gap <= MAX_MERGE_GAP {
                // `get` yields `None` for out-of-range or non-boundary offsets, which we
                // treat as "not mergeable". The allowed set excludes '\n' and '\r', so
                // entities are never merged across line breaks.
                original_text
                    .get(last.byte_end..entity.byte_start)
                    .is_some_and(|gap_text| {
                        gap_text
                            .chars()
                            .all(|c| matches!(c, ' ' | '\t' | '&' | '-' | '\'' | '.'))
                    })
            } else {
                false
            };

            if same_type && gap_is_mergeable {
                // Never let a nested/overlapping entity pull the end offset backwards.
                let merged_end = last.byte_end.max(entity.byte_end);
                let merged_text = original_text
                    .get(last.byte_start..merged_end)
                    .map(str::to_string)
                    .unwrap_or_else(|| format!("{}{}", last.text, entity.text));

                tracing::debug!(
                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
                    last.text, entity.text, merged_text, gap
                );

                last.text = merged_text;
                last.byte_end = merged_end;
                // Average the confidence
                last.confidence = (last.confidence + entity.confidence) / 2.0;
                continue;
            }
        }

        merged.push(entity);
    }

    merged
}
312
313/// Filter and clean extracted NER entities to remove noise.
314/// Returns true if the entity should be kept.
315#[cfg(feature = "logic_mesh")]
316fn is_valid_entity(text: &str, confidence: f32) -> bool {
317    // Filter by confidence
318    if confidence < NER_MIN_CONFIDENCE {
319        return false;
320    }
321
322    let trimmed = text.trim();
323
324    // Filter by minimum length
325    if trimmed.len() < NER_MIN_ENTITY_LEN {
326        return false;
327    }
328
329    // Filter out entities that are just whitespace or punctuation
330    if trimmed.chars().all(|c| c.is_whitespace() || c.is_ascii_punctuation()) {
331        return false;
332    }
333
334    // Filter out entities containing newlines (malformed extractions)
335    if trimmed.contains('\n') || trimmed.contains('\r') {
336        return false;
337    }
338
339    // Filter out entities that look like partial words (start/end with ##)
340    if trimmed.starts_with("##") || trimmed.ends_with("##") {
341        return false;
342    }
343
344    // Filter out entities ending with period (likely sentence fragments)
345    if trimmed.ends_with('.') {
346        return false;
347    }
348
349    let lower = trimmed.to_lowercase();
350
351    // Filter out common single-letter false positives
352    if trimmed.len() == 1 {
353        return false;
354    }
355
356    // Filter out 2-letter tokens that are common noise
357    if trimmed.len() == 2 {
358        // Allow legitimate 2-letter entities like "EU", "UN", "UK", "US"
359        if matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it") {
360            return true;
361        }
362        // Filter other 2-letter tokens
363        return false;
364    }
365
366    // Filter out common noise words
367    if matches!(lower.as_str(), "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc") {
368        return false;
369    }
370
371    // Filter out partial tokenization artifacts
372    // e.g., "S&P Global" becomes "P Global", "IHS Markit" becomes "HS Markit"
373    let words: Vec<&str> = trimmed.split_whitespace().collect();
374    if words.len() >= 2 {
375        let first_word = words[0];
376        // If first word is 1-2 chars and looks like a fragment, filter it
377        if first_word.len() <= 2 && first_word.chars().all(|c| c.is_uppercase()) {
378            // But allow "US Bank", "UK Office", etc.
379            if !matches!(first_word.to_lowercase().as_str(), "us" | "uk" | "eu" | "un") {
380                return false;
381            }
382        }
383    }
384
385    // Filter entities that start with lowercase (likely partial words)
386    if let Some(first_char) = trimmed.chars().next() {
387        if first_char.is_lowercase() {
388            return false;
389        }
390    }
391
392    true
393}
394
395/// Parse key=value pairs for tags
396pub fn parse_key_val(s: &str) -> Result<(String, String)> {
397    let (key, value) = s
398        .split_once('=')
399        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
400    Ok((key.to_string(), value.to_string()))
401}
402
/// Lock-related CLI arguments (shared across commands)
// Flattened into the per-command argument structs in this file via `#[command(flatten)]`.
// NOTE: the `///` comments on the fields are what clap's derive shows as `--help` text,
// so rewording them changes user-visible output.
#[derive(Args, Clone, Copy, Debug)]
pub struct LockCliArgs {
    /// Maximum time to wait for an active writer before failing (ms)
    // Defaults to 250 when the flag is omitted.
    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
    pub lock_timeout: u64,
    /// Attempt a stale takeover if the recorded writer heartbeat has expired
    #[arg(long = "force", action = ArgAction::SetTrue)]
    pub force: bool,
}
413
/// Arguments for the `put` subcommand
// Maintainer notes (plain `//` comments, so they never leak into `--help`):
// * The `///` comments on the fields below are surfaced by clap's derive as help text;
//   treat them as user-facing strings.
// * Fields gated by `#[cfg(feature = ...)]` only exist when that cargo feature is enabled,
//   so code that reads them must carry the same gate.
// * Some flag combinations are rejected at runtime in `handle_put` rather than by clap;
//   these are called out next to the relevant fields.
#[derive(Args)]
pub struct PutArgs {
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    /// Read payload bytes from a file instead of stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<String>,
    /// Override the derived URI for the document
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Override the derived title for the document
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    /// Optional POSIX timestamp to store on the frame
    #[arg(long)]
    pub timestamp: Option<i64>,
    /// Track name for the frame
    #[arg(long)]
    pub track: Option<String>,
    /// Kind metadata for the frame
    #[arg(long)]
    pub kind: Option<String>,
    // `handle_put` rejects `--video` when `--kind` is set to anything other than "video"
    // (ASCII case-insensitive) and when the payload comes from stdin.
    /// Store the payload as a binary video without transcoding
    #[arg(long, action = ArgAction::SetTrue)]
    pub video: bool,
    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcript: bool,
    /// Tags to attach in key=value form
    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
    pub tags: Vec<(String, String)>,
    /// Free-form labels to attach to the frame
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    /// Additional metadata payloads expressed as JSON
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    /// Disable automatic tag generation from extracted text
    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
    pub no_auto_tag: bool,
    /// Disable automatic date extraction from content
    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
    pub no_extract_dates: bool,
    /// Force audio analysis and tagging when ingesting binary data
    #[arg(long, action = ArgAction::SetTrue)]
    pub audio: bool,
    /// Transcribe audio using Whisper and use the transcript for text embedding
    /// Requires the 'whisper' feature to be enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub transcribe: bool,
    /// Override the default segment duration (in seconds) when generating audio snippets
    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
    pub audio_segment_seconds: Option<u32>,
    // Also accepted as `--embeddings` (alias); clap rejects it together with `--no-embedding`,
    // and `handle_put` rejects it together with `--embedding-vec`.
    /// Compute semantic embeddings using the installed model
    /// Increases storage overhead from ~5KB to ~270KB per document
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
    pub embedding: bool,
    /// Explicitly disable embeddings (default, but kept for clarity)
    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
    pub no_embedding: bool,
    // `handle_put` requires exactly one input file (or stdin) when this is given.
    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
    pub embedding_vec: Option<PathBuf>,
    /// Embedding model identity for the provided `--embedding-vec` vector.
    ///
    /// This is written into per-frame `extra_metadata` (`memvid.embedding.*`) so that
    /// semantic queries can auto-select the correct runtime across CLI/SDKs.
    ///
    /// Options: bge-small, bge-base, nomic, gte-large, openai, openai-small, openai-ada, nvidia
    /// (also accepts canonical IDs like `text-embedding-3-small` and `BAAI/bge-small-en-v1.5`).
    #[arg(long = "embedding-vec-model", value_name = "EMB_MODEL", requires = "embedding_vec")]
    pub embedding_vec_model: Option<String>,
    // When set, `handle_put` switches the memory to `VectorCompression::Pq96`.
    /// Enable Product Quantization compression for vectors (16x compression)
    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
    /// Only applies when --embedding is enabled
    #[arg(long, action = ArgAction::SetTrue)]
    pub vector_compression: bool,
    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
    /// This improves retrieval accuracy for preference/personalization queries
    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
    #[arg(long, action = ArgAction::SetTrue)]
    pub contextual: bool,
    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
    #[arg(long = "contextual-model", value_name = "MODEL")]
    pub contextual_model: Option<String>,
    /// Automatically extract tables from PDF documents
    /// Uses aggressive mode with fallbacks for best results
    #[arg(long, action = ArgAction::SetTrue)]
    pub tables: bool,
    /// Disable automatic table extraction (when --tables is the default)
    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
    pub no_tables: bool,
    /// Enable CLIP visual embeddings for images and PDF pages
    /// Allows text-to-image search using natural language queries
    /// Auto-enabled when CLIP model is installed; use --no-clip to disable
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub clip: bool,

    /// Disable CLIP visual embeddings even when model is available
    #[cfg(feature = "clip")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_clip: bool,

    /// Replace any existing frame with the same URI instead of inserting a duplicate
    #[arg(long, conflicts_with = "allow_duplicate")]
    pub update_existing: bool,
    /// Allow inserting a new frame even if a frame with the same URI already exists
    #[arg(long)]
    pub allow_duplicate: bool,

    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
    /// Requires the temporal_enrich feature in memvid-core
    #[arg(long, action = ArgAction::SetTrue)]
    pub temporal_enrich: bool,

    /// Build Logic-Mesh entity graph during ingestion (opt-in)
    /// Extracts entities (people, organizations, locations, etc.) and their relationships
    /// Enables graph traversal with `memvid follow`
    /// Requires NER model: `memvid models install --ner distilbert-ner`
    #[arg(long, action = ArgAction::SetTrue)]
    pub logic_mesh: bool,

    // Parallel-builder knobs. `wants_parallel` resolves the two on/off flags (the explicit
    // enable flag wins over the disable flag, then the MEMVID_PARALLEL_SEGMENTS environment
    // override is consulted); `sanitized_parallel_opts` copies the tuning values below into
    // `memvid_core::BuildOpts`.
    /// Use the experimental parallel segment builder (requires --features parallel_segments)
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub parallel_segments: bool,
    /// Force the legacy ingestion path even when the parallel feature is available
    #[cfg(feature = "parallel_segments")]
    #[arg(long, action = ArgAction::SetTrue)]
    pub no_parallel_segments: bool,
    /// Target tokens per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
    pub parallel_segment_tokens: Option<usize>,
    /// Target pages per segment when using the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
    pub parallel_segment_pages: Option<usize>,
    /// Worker threads used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-threads", value_name = "N")]
    pub parallel_threads: Option<usize>,
    /// Worker queue depth used by the parallel builder
    #[cfg(feature = "parallel_segments")]
    #[arg(long = "parallel-queue-depth", value_name = "N")]
    pub parallel_queue_depth: Option<usize>,

    // Writer-lock options shared with the other mutating subcommands.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
572
#[cfg(feature = "parallel_segments")]
impl PutArgs {
    /// Decide whether ingestion should use the parallel segment builder.
    ///
    /// Precedence: `--parallel-segments` forces it on, then `--no-parallel-segments`
    /// forces it off, then the `MEMVID_PARALLEL_SEGMENTS` environment override is
    /// consulted; with none of these the answer is `false`.
    pub fn wants_parallel(&self) -> bool {
        match (self.parallel_segments, self.no_parallel_segments) {
            (true, _) => true,
            (false, true) => false,
            // The environment is only read when neither flag was given.
            (false, false) => parallel_env_override().unwrap_or(false),
        }
    }

    /// Build the parallel builder options from the CLI flags.
    ///
    /// Starts from `BuildOpts::default()`, overrides each setting for which a flag
    /// was supplied, then calls `sanitize()` on the result before returning it.
    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
        let mut opts = memvid_core::BuildOpts::default();

        opts.segment_tokens = self.parallel_segment_tokens.unwrap_or(opts.segment_tokens);
        opts.segment_pages = self.parallel_segment_pages.unwrap_or(opts.segment_pages);
        opts.threads = self.parallel_threads.unwrap_or(opts.threads);
        opts.queue_depth = self.parallel_queue_depth.unwrap_or(opts.queue_depth);

        opts.sanitize();
        opts
    }
}
606
// Fallback used when the crate is built without the `parallel_segments` feature.
#[cfg(not(feature = "parallel_segments"))]
impl PutArgs {
    /// Always `false`: this build has no parallel flags to consult, so callers can
    /// invoke `wants_parallel` unconditionally regardless of enabled features.
    pub fn wants_parallel(&self) -> bool {
        false
    }
}
613
/// Arguments for the `api-fetch` subcommand
// `handle_api_fetch` forwards every field into an `ApiFetchCommand`; no validation happens here.
// The `///` field comments are surfaced by clap's derive as `--help` text.
#[derive(Args)]
pub struct ApiFetchArgs {
    /// Path to the memory file to ingest into
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    /// Path to the fetch configuration JSON file
    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
    pub config: PathBuf,
    /// Perform a dry run without writing to the memory
    #[arg(long, action = ArgAction::SetTrue)]
    pub dry_run: bool,
    /// Override the configured ingest mode
    #[arg(long, value_name = "MODE")]
    pub mode: Option<ApiFetchMode>,
    /// Override the base URI used when constructing frame URIs
    #[arg(long, value_name = "URI")]
    pub uri: Option<String>,
    /// Emit JSON instead of human-readable output
    #[arg(long, action = ArgAction::SetTrue)]
    pub json: bool,

    // Writer-lock options; passed on as `lock_timeout_ms` / `force_lock`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
639
/// Arguments for the `update` subcommand
// The handler for this command is not in this part of the file, so the notes below describe
// only what the attributes establish; semantic notes are marked as assumptions.
// Plain `//` comments are used on purpose: `///` comments on fields would become `--help` text.
#[derive(Args)]
pub struct UpdateArgs {
    // Positional path argument; presumably the memory file to modify — TODO confirm in the handler.
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // `--frame-id` and `--uri` are mutually exclusive (each declares `conflicts_with` the other);
    // presumably they select the frame to update — TODO confirm.
    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
    pub frame_id: Option<u64>,
    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
    pub uri: Option<String>,
    // Presumably a file providing the replacement payload — TODO confirm.
    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
    pub input: Option<PathBuf>,
    // Distinct from `--uri` above; presumably the new URI to store on the frame — TODO confirm.
    #[arg(long = "set-uri", value_name = "URI")]
    pub set_uri: Option<String>,
    #[arg(long, value_name = "TITLE")]
    pub title: Option<String>,
    #[arg(long, value_name = "TIMESTAMP")]
    pub timestamp: Option<i64>,
    #[arg(long, value_name = "TRACK")]
    pub track: Option<String>,
    #[arg(long, value_name = "KIND")]
    pub kind: Option<String>,
    // Repeatable; each value is parsed by `parse_key_val` and must contain `=`.
    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
    pub tags: Vec<(String, String)>,
    #[arg(long = "label", value_name = "LABEL")]
    pub labels: Vec<String>,
    #[arg(long = "metadata", value_name = "JSON")]
    pub metadata: Vec<String>,
    // NOTE(review): the long name derived from this field is already `--embeddings`, so the
    // `embeddings` alias looks redundant (compare `PutArgs::embedding`, where the alias adds
    // the plural spelling) — confirm whether `--embedding` was the intended alias.
    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
    pub embeddings: bool,
    #[arg(long)]
    pub json: bool,

    // Writer-lock options shared with the other mutating subcommands.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
675
676/// Arguments for the `delete` subcommand
677#[derive(Args)]
678pub struct DeleteArgs {
679    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
680    pub file: PathBuf,
681    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
682    pub frame_id: Option<u64>,
683    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
684    pub uri: Option<String>,
685    #[arg(long, action = ArgAction::SetTrue)]
686    pub yes: bool,
687    #[arg(long)]
688    pub json: bool,
689
690    #[command(flatten)]
691    pub lock: LockCliArgs,
692}
693
694/// Handler for `memvid api-fetch`
695pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
696    let command = ApiFetchCommand {
697        file: args.file,
698        config_path: args.config,
699        dry_run: args.dry_run,
700        mode_override: args.mode,
701        uri_override: args.uri,
702        output_json: args.json,
703        lock_timeout_ms: args.lock.lock_timeout,
704        force_lock: args.lock.force,
705    };
706    run_api_fetch(config, command)
707}
708
709/// Handler for `memvid delete`
710pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
711    let mut mem = Memvid::open(&args.file)?;
712    ensure_cli_mutation_allowed(&mem)?;
713    apply_lock_cli(&mut mem, &args.lock);
714    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
715
716    if !args.yes {
717        if let Some(uri) = &frame.uri {
718            println!("About to delete frame {} ({uri})", frame.id);
719        } else {
720            println!("About to delete frame {}", frame.id);
721        }
722        print!("Confirm? [y/N]: ");
723        io::stdout().flush()?;
724        let mut input = String::new();
725        io::stdin().read_line(&mut input)?;
726        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
727        if !confirmed {
728            println!("Aborted");
729            return Ok(());
730        }
731    }
732
733    let seq = mem.delete_frame(frame.id)?;
734    mem.commit()?;
735    let deleted = mem.frame_by_id(frame.id)?;
736
737    if args.json {
738        let json = json!({
739            "frame_id": frame.id,
740            "sequence": seq,
741            "status": frame_status_str(deleted.status),
742        });
743        println!("{}", serde_json::to_string_pretty(&json)?);
744    } else {
745        println!("Deleted frame {} (seq {})", frame.id, seq);
746    }
747
748    Ok(())
749}
750pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
751    let mut mem = Memvid::open(&args.file)?;
752    ensure_cli_mutation_allowed(&mem)?;
753    apply_lock_cli(&mut mem, &args.lock);
754
755    // Load active replay session if one exists
756    #[cfg(feature = "replay")]
757    let _ = mem.load_active_session();
758    let mut stats = mem.stats()?;
759    if stats.frame_count == 0 && !stats.has_lex_index {
760        mem.enable_lex()?;
761        stats = mem.stats()?;
762    }
763
764    // Set vector compression mode if requested
765    if args.vector_compression {
766        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
767    }
768
769    let mut capacity_guard = CapacityGuard::from_stats(&stats);
770    let use_parallel = args.wants_parallel();
771    #[cfg(feature = "parallel_segments")]
772    let mut parallel_opts = if use_parallel {
773        Some(args.sanitized_parallel_opts())
774    } else {
775        None
776    };
777    #[cfg(feature = "parallel_segments")]
778    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
779    #[cfg(feature = "parallel_segments")]
780    let mut pending_parallel_indices: Vec<usize> = Vec::new();
781    let input_set = resolve_inputs(args.input.as_deref())?;
782    if args.embedding && args.embedding_vec.is_some() {
783        anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
784    }
785    if args.embedding_vec.is_some() {
786        match &input_set {
787            InputSet::Files(paths) if paths.len() != 1 => {
788                anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
789            }
790            _ => {}
791        }
792    }
793
794    if args.video {
795        if let Some(kind) = &args.kind {
796            if !kind.eq_ignore_ascii_case("video") {
797                anyhow::bail!("--video conflicts with explicit --kind={kind}");
798            }
799        }
800        if matches!(input_set, InputSet::Stdin) {
801            anyhow::bail!("--video requires --input <file>");
802        }
803    }
804
805    #[allow(dead_code)] // silences the unused-variant lint: in the visible part of handle_put only `None` and `Explicit` are ever assigned
806    enum EmbeddingMode { // function-local tag recording how embeddings were requested; consulted when the vector summary line is printed
807        None, // neither --embedding nor --embedding-vec was passed; no embedding runtime is loaded
808        Auto, // labelled "(auto)" in the summary output; NOTE(review): not constructed in the visible code (auto-embedding was removed) - confirm before deleting
809        Explicit, // chosen when --embedding is set (runtime loaded) or --embedding-vec is supplied (no runtime)
810    }
811
812    // PHASE 1: Make embeddings opt-in, not opt-out
813    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
814    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
815    let mv2_dimension = mem.effective_vec_index_dimension()?;
816    let inferred_embedding_model = if args.embedding {
817        match mem.embedding_identity_summary(10_000) {
818            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
819            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
820                let models: Vec<_> = identities
821                    .iter()
822                    .filter_map(|entry| entry.identity.model.as_deref())
823                    .collect();
824                anyhow::bail!(
825                    "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
826                    Detected models: {:?}\n\n\
827                    Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
828                    models
829                );
830            }
831            memvid_core::EmbeddingIdentitySummary::Unknown => None,
832        }
833    } else {
834        None
835    };
836    let (embedding_mode, embedding_runtime) = if args.embedding {
837        (
838            EmbeddingMode::Explicit,
839            Some(load_embedding_runtime_for_mv2(
840                config,
841                inferred_embedding_model.as_deref(),
842                mv2_dimension,
843            )?),
844        )
845    } else if args.embedding_vec.is_some() {
846        (EmbeddingMode::Explicit, None)
847    } else {
848        (EmbeddingMode::None, None)
849    };
850    let embedding_enabled = args.embedding || args.embedding_vec.is_some();
851    let runtime_ref = embedding_runtime.as_ref();
852
853    // Enable CLIP index if:
854    // 1. --clip flag is set explicitly, OR
855    // 2. Input contains image files and model is available, OR
856    // 3. Model is installed (auto-detect) and --no-clip is not set
857    #[cfg(feature = "clip")]
858    let clip_enabled = {
859        if args.no_clip {
860            false
861        } else if args.clip {
862            true // Explicitly requested
863        } else {
864            // Auto-detect: check if model is installed
865            let model_available = is_clip_model_installed();
866            if model_available {
867                // Model is installed, check if input has images or PDFs
868                match &input_set {
869                    InputSet::Files(paths) => paths.iter().any(|p| {
870                        is_image_file(p) || p.extension().map_or(false, |ext| ext.eq_ignore_ascii_case("pdf"))
871                    }),
872                    InputSet::Stdin => false,
873                }
874            } else {
875                false
876            }
877        }
878    };
879    #[cfg(feature = "clip")]
880    let clip_model = if clip_enabled {
881        mem.enable_clip()?;
882        if !args.json {
883            let mode = if args.clip {
884                "explicit"
885            } else {
886                "auto-detected"
887            };
888            eprintln!("ℹ️  CLIP visual embeddings enabled ({mode})");
889            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
890            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
891            eprintln!("   Use --no-clip to disable auto-detection");
892            eprintln!();
893        }
894        // Initialize CLIP model for generating image embeddings
895        match memvid_core::ClipModel::default_model() {
896            Ok(model) => Some(model),
897            Err(e) => {
898                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
899                None
900            }
901        }
902    } else {
903        None
904    };
905
906    // Warn user about vector storage overhead (PHASE 2)
907    if embedding_enabled && !args.json {
908        let capacity = stats.capacity_bytes;
909        if args.vector_compression {
910            // PHASE 3: PQ compression reduces storage 16x
911            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
912            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
913            eprintln!("   Storage: ~20 KB per document (16x compressed)");
914            eprintln!("   Accuracy: ~95% recall@10 maintained");
915            eprintln!(
916                "   Capacity: ~{} documents max for {:.1} GB",
917                max_docs,
918                capacity as f64 / 1e9
919            );
920            eprintln!();
921        } else {
922            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
923            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
924            eprintln!("   Storage: ~270 KB per document");
925            eprintln!(
926                "   Capacity: ~{} documents max for {:.1} GB",
927                max_docs,
928                capacity as f64 / 1e9
929            );
930            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
931            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
932            eprintln!();
933        }
934    }
935
936    let embed_progress = if embedding_enabled {
937        let total = match &input_set {
938            InputSet::Files(paths) => Some(paths.len() as u64),
939            InputSet::Stdin => None,
940        };
941        Some(create_task_progress_bar(total, "embed", false))
942    } else {
943        None
944    };
945    let mut embedded_docs = 0usize;
946
947    if let InputSet::Files(paths) = &input_set {
948        if paths.len() > 1 {
949            if args.uri.is_some() || args.title.is_some() {
950                anyhow::bail!(
951                    "--uri/--title apply to a single file; omit them when using a directory or glob"
952                );
953            }
954        }
955    }
956
957    let mut reports = Vec::new();
958    let mut processed = 0usize;
959    let mut skipped = 0usize;
960    let mut bytes_added = 0u64;
961    let mut summary_warnings: Vec<String> = Vec::new();
962    #[cfg(feature = "clip")]
963    let mut clip_embeddings_added = false;
964
965    let mut capacity_reached = false;
966    match input_set {
967        InputSet::Stdin => {
968            processed += 1;
969            match read_payload(None) {
970                Ok(payload) => {
971                    let mut analyze_spinner = if args.json {
972                        None
973                    } else {
974                        Some(create_spinner("Analyzing stdin payload..."))
975                    };
976                    match ingest_payload(
977                        &mut mem,
978                        &args,
979                        payload,
980                        None,
981                        capacity_guard.as_mut(),
982                        embedding_enabled,
983                        runtime_ref,
984                        use_parallel,
985                    ) {
986                        Ok(outcome) => {
987                            if let Some(pb) = analyze_spinner.take() {
988                                pb.finish_and_clear();
989                            }
990                            bytes_added += outcome.report.stored_bytes as u64;
991                            if args.json {
992                                summary_warnings
993                                    .extend(outcome.report.extraction.warnings.iter().cloned());
994                            }
995                            reports.push(outcome.report);
996                            let report_index = reports.len() - 1;
997                            if !args.json && !use_parallel {
998                                if let Some(report) = reports.get(report_index) {
999                                    print_report(report);
1000                                }
1001                            }
1002                            if args.transcript {
1003                                let notice = transcript_notice_message(&mut mem)?;
1004                                if args.json {
1005                                    summary_warnings.push(notice);
1006                                } else {
1007                                    println!("{notice}");
1008                                }
1009                            }
1010                            if outcome.embedded {
1011                                if let Some(pb) = embed_progress.as_ref() {
1012                                    pb.inc(1);
1013                                }
1014                                embedded_docs += 1;
1015                            }
1016                            if use_parallel {
1017                                #[cfg(feature = "parallel_segments")]
1018                                if let Some(input) = outcome.parallel_input {
1019                                    pending_parallel_indices.push(report_index);
1020                                    pending_parallel_inputs.push(input);
1021                                }
1022                            } else if let Some(guard) = capacity_guard.as_mut() {
1023                                mem.commit()?;
1024                                let stats = mem.stats()?;
1025                                guard.update_after_commit(&stats);
1026                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1027                            }
1028                        }
1029                        Err(err) => {
1030                            if let Some(pb) = analyze_spinner.take() {
1031                                pb.finish_and_clear();
1032                            }
1033                            if err.downcast_ref::<DuplicateUriError>().is_some() {
1034                                return Err(err);
1035                            }
1036                            warn!("skipped stdin payload: {err}");
1037                            skipped += 1;
1038                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1039                                capacity_reached = true;
1040                            }
1041                        }
1042                    }
1043                }
1044                Err(err) => {
1045                    warn!("failed to read stdin: {err}");
1046                    skipped += 1;
1047                }
1048            }
1049        }
1050        InputSet::Files(ref paths) => {
1051            let mut transcript_notice_printed = false;
1052            for path in paths {
1053                processed += 1;
1054                match read_payload(Some(&path)) {
1055                    Ok(payload) => {
1056                        let label = path.display().to_string();
1057                        let mut analyze_spinner = if args.json {
1058                            None
1059                        } else {
1060                            Some(create_spinner(&format!("Analyzing {label}...")))
1061                        };
1062                        match ingest_payload(
1063                            &mut mem,
1064                            &args,
1065                            payload,
1066                            Some(&path),
1067                            capacity_guard.as_mut(),
1068                            embedding_enabled,
1069                            runtime_ref,
1070                            use_parallel,
1071                        ) {
1072                            Ok(outcome) => {
1073                                if let Some(pb) = analyze_spinner.take() {
1074                                    pb.finish_and_clear();
1075                                }
1076                                bytes_added += outcome.report.stored_bytes as u64;
1077
1078                                // Generate CLIP embedding for image files
1079                                #[cfg(feature = "clip")]
1080                                if let Some(ref model) = clip_model {
1081                                    let mime = outcome.report.mime.as_deref();
1082                                    let clip_frame_id = outcome.report.frame_id;
1083
1084                                    if is_image_file(&path) {
1085                                        match model.encode_image_file(&path) {
1086                                            Ok(embedding) => {
1087                                                if let Err(e) = mem.add_clip_embedding_with_page(
1088                                                    clip_frame_id,
1089                                                    None,
1090                                                    embedding,
1091                                                ) {
1092                                                    warn!(
1093                                                        "Failed to add CLIP embedding for {}: {e}",
1094                                                        path.display()
1095                                                    );
1096                                                } else {
1097                                                    info!(
1098                                                        "Added CLIP embedding for frame {}",
1099                                                        clip_frame_id
1100                                                    );
1101                                                    clip_embeddings_added = true;
1102                                                }
1103                                            }
1104                                            Err(e) => {
1105                                                warn!(
1106                                                    "Failed to encode CLIP embedding for {}: {e}",
1107                                                    path.display()
1108                                                );
1109                                            }
1110                                        }
1111                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
1112                                        // Commit before adding child frames to ensure WAL has space
1113                                        if let Err(e) = mem.commit() {
1114                                            warn!("Failed to commit before CLIP extraction: {e}");
1115                                        }
1116
1117                                        match render_pdf_pages_for_clip(
1118                                            &path,
1119                                            CLIP_PDF_MAX_IMAGES,
1120                                            CLIP_PDF_TARGET_PX,
1121                                        ) {
1122                                            Ok(rendered_pages) => {
1123                                                use rayon::prelude::*;
1124
1125                                                // Pre-encode all images to PNG in parallel for better performance
1126                                                // Filter out junk images (solid colors, black, too small)
1127                                                let path_for_closure = path.clone();
1128
1129                                                // Temporarily suppress panic output during image encoding
1130                                                // (some PDFs have malformed images that cause panics in the image crate)
1131                                                let prev_hook = std::panic::take_hook();
1132                                                std::panic::set_hook(Box::new(|_| {
1133                                                    // Silently ignore panics - we handle them with catch_unwind
1134                                                }));
1135
1136                                                let encoded_images: Vec<_> = rendered_pages
1137                                                    .into_par_iter()
1138                                                    .filter_map(|(page_num, image)| {
1139                                                        // Check if image is worth embedding (not junk)
1140                                                        let image_info = get_image_info(&image);
1141                                                        if !image_info.should_embed() {
1142                                                            info!(
1143                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1144                                                                path_for_closure.display(),
1145                                                                page_num,
1146                                                                image_info.color_variance,
1147                                                                image_info.width,
1148                                                                image_info.height
1149                                                            );
1150                                                            return None;
1151                                                        }
1152
1153                                                        // Convert to RGB8 first to ensure consistent buffer size
1154                                                        // (some PDFs have images with alpha or other formats that cause panics)
1155                                                        // Use catch_unwind to handle any panics from corrupted image data
1156                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1157                                                            let rgb_image = image.to_rgb8();
1158                                                            let mut png_bytes = Vec::new();
1159                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1160                                                                &mut Cursor::new(&mut png_bytes),
1161                                                                image::ImageFormat::Png,
1162                                                            ).map(|()| png_bytes)
1163                                                        }));
1164
1165                                                        match encode_result {
1166                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1167                                                            Ok(Err(e)) => {
1168                                                                info!(
1169                                                                    "Skipping image for {} page {}: {e}",
1170                                                                    path_for_closure.display(),
1171                                                                    page_num
1172                                                                );
1173                                                                None
1174                                                            }
1175                                                            Err(_) => {
1176                                                                // Panic occurred - corrupted image data, silently skip
1177                                                                info!(
1178                                                                    "Skipping malformed image for {} page {}",
1179                                                                    path_for_closure.display(),
1180                                                                    page_num
1181                                                                );
1182                                                                None
1183                                                            }
1184                                                        }
1185                                                    })
1186                                                    .collect();
1187
1188                                                // Restore the original panic hook
1189                                                std::panic::set_hook(prev_hook);
1190
1191                                                // Process encoded images sequentially (storage + CLIP encoding)
1192                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1193                                                let mut child_frame_offset = 1u64;
1194                                                let parent_uri = outcome.report.uri.clone();
1195                                                let parent_title = outcome.report.title.clone().unwrap_or_else(|| "Image".to_string());
1196                                                let total_images = encoded_images.len();
1197
1198                                                // Show progress bar for CLIP extraction
1199                                                let clip_pb = if !args.json && total_images > 0 {
1200                                                    let pb = ProgressBar::new(total_images as u64);
1201                                                    pb.set_style(
1202                                                        ProgressStyle::with_template(
1203                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1204                                                        )
1205                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1206                                                        .progress_chars("█▓░"),
1207                                                    );
1208                                                    Some(pb)
1209                                                } else {
1210                                                    None
1211                                                };
1212
1213                                                for (page_num, image, png_bytes) in encoded_images {
1214                                                    let child_uri = format!("{}/image/{}", parent_uri, child_frame_offset);
1215                                                    let child_title = format!(
1216                                                        "{} - Image {} (page {})",
1217                                                        parent_title,
1218                                                        child_frame_offset,
1219                                                        page_num
1220                                                    );
1221
1222                                                    let child_options = PutOptions::builder()
1223                                                        .uri(&child_uri)
1224                                                        .title(&child_title)
1225                                                        .parent_id(clip_frame_id)
1226                                                        .role(FrameRole::ExtractedImage)
1227                                                        .auto_tag(false)
1228                                                        .extract_dates(false)
1229                                                        .build();
1230
1231                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1232                                                        Ok(_child_seq) => {
1233                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1234                                                            child_frame_offset += 1;
1235
1236                                                            match model.encode_image(&image) {
1237                                                                Ok(embedding) => {
1238                                                                    if let Err(e) = mem
1239                                                                        .add_clip_embedding_with_page(
1240                                                                            child_frame_id,
1241                                                                            Some(page_num),
1242                                                                            embedding,
1243                                                                        )
1244                                                                    {
1245                                                                        warn!(
1246                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1247                                                                            path.display(),
1248                                                                            page_num
1249                                                                        );
1250                                                                    } else {
1251                                                                        info!(
1252                                                                            "Added CLIP image frame {} for {} page {}",
1253                                                                            child_frame_id,
1254                                                                            clip_frame_id,
1255                                                                            page_num
1256                                                                        );
1257                                                                        clip_embeddings_added = true;
1258                                                                    }
1259                                                                }
1260                                                                Err(e) => warn!(
1261                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1262                                                                    path.display(),
1263                                                                    page_num
1264                                                                ),
1265                                                            }
1266
1267                                                            // Commit after each image to checkpoint WAL
1268                                                            if let Err(e) = mem.commit() {
1269                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1270                                                            }
1271                                                        }
1272                                                        Err(e) => warn!(
1273                                                            "Failed to store image frame for {} page {}: {e}",
1274                                                            path.display(),
1275                                                            page_num
1276                                                        ),
1277                                                    }
1278
1279                                                    if let Some(ref pb) = clip_pb {
1280                                                        pb.inc(1);
1281                                                    }
1282                                                }
1283
1284                                                if let Some(pb) = clip_pb {
1285                                                    pb.finish_and_clear();
1286                                                }
1287                                            }
1288                                            Err(err) => warn!(
1289                                                "Failed to render PDF pages for CLIP {}: {err}",
1290                                                path.display()
1291                                            ),
1292                                        }
1293                                    }
1294                                }
1295
1296                                if args.json {
1297                                    summary_warnings
1298                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1299                                }
1300                                reports.push(outcome.report);
1301                                let report_index = reports.len() - 1;
1302                                if !args.json && !use_parallel {
1303                                    if let Some(report) = reports.get(report_index) {
1304                                        print_report(report);
1305                                    }
1306                                }
1307                                if args.transcript && !transcript_notice_printed {
1308                                    let notice = transcript_notice_message(&mut mem)?;
1309                                    if args.json {
1310                                        summary_warnings.push(notice);
1311                                    } else {
1312                                        println!("{notice}");
1313                                    }
1314                                    transcript_notice_printed = true;
1315                                }
1316                                if outcome.embedded {
1317                                    if let Some(pb) = embed_progress.as_ref() {
1318                                        pb.inc(1);
1319                                    }
1320                                    embedded_docs += 1;
1321                                }
1322                                if use_parallel {
1323                                    #[cfg(feature = "parallel_segments")]
1324                                    if let Some(input) = outcome.parallel_input {
1325                                        pending_parallel_indices.push(report_index);
1326                                        pending_parallel_inputs.push(input);
1327                                    }
1328                                } else if let Some(guard) = capacity_guard.as_mut() {
1329                                    mem.commit()?;
1330                                    let stats = mem.stats()?;
1331                                    guard.update_after_commit(&stats);
1332                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1333                                }
1334                            }
1335                            Err(err) => {
1336                                if let Some(pb) = analyze_spinner.take() {
1337                                    pb.finish_and_clear();
1338                                }
1339                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1340                                    return Err(err);
1341                                }
1342                                warn!("skipped {}: {err}", path.display());
1343                                skipped += 1;
1344                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1345                                    capacity_reached = true;
1346                                }
1347                            }
1348                        }
1349                    }
1350                    Err(err) => {
1351                        warn!("failed to read {}: {err}", path.display());
1352                        skipped += 1;
1353                    }
1354                }
1355                if capacity_reached {
1356                    break;
1357                }
1358            }
1359        }
1360    }
1361
1362    if capacity_reached {
1363        warn!("capacity limit reached; remaining inputs were not processed");
1364    }
1365
1366    if let Some(pb) = embed_progress.as_ref() {
1367        pb.finish_with_message("embed");
1368    }
1369
1370    if let Some(runtime) = embedding_runtime.as_ref() {
1371        if embedded_docs > 0 {
1372            match embedding_mode {
1373                EmbeddingMode::Explicit => println!(
1374                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1375                    embedded_docs,
1376                    runtime.dimension()
1377                ),
1378                EmbeddingMode::Auto => println!(
1379                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1380                    embedded_docs,
1381                    runtime.dimension()
1382                ),
1383                EmbeddingMode::None => {}
1384            }
1385        } else {
1386            match embedding_mode {
1387                EmbeddingMode::Explicit => {
1388                    println!("No embeddings were generated (no textual content available)");
1389                }
1390                EmbeddingMode::Auto => {
1391                    println!(
1392                        "Semantic runtime available, but no textual content produced embeddings"
1393                    );
1394                }
1395                EmbeddingMode::None => {}
1396            }
1397        }
1398    }
1399
1400    if reports.is_empty() {
1401        if args.json {
1402            if capacity_reached {
1403                summary_warnings.push("capacity_limit_reached".to_string());
1404            }
1405            let summary_json = json!({
1406                "processed": processed,
1407                "ingested": 0,
1408                "skipped": skipped,
1409                "bytes_added": bytes_added,
1410                "bytes_added_human": format_bytes(bytes_added),
1411                "embedded_documents": embedded_docs,
1412                "capacity_reached": capacity_reached,
1413                "warnings": summary_warnings,
1414                "reports": Vec::<serde_json::Value>::new(),
1415            });
1416            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1417        } else {
1418            println!(
1419                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1420                processed, skipped
1421            );
1422        }
1423        return Ok(());
1424    }
1425
1426    #[cfg(feature = "parallel_segments")]
1427    let mut parallel_flush_spinner = if use_parallel && !args.json {
1428        Some(create_spinner("Flushing parallel segments..."))
1429    } else {
1430        None
1431    };
1432
1433    if use_parallel {
1434        #[cfg(feature = "parallel_segments")]
1435        {
1436            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1437                let mut opts = BuildOpts::default();
1438                opts.sanitize();
1439                opts
1440            });
1441            // Set vector compression mode from mem state
1442            opts.vec_compression = mem.vector_compression().clone();
1443            let seqs = if pending_parallel_inputs.is_empty() {
1444                mem.commit_parallel(opts)?;
1445                Vec::new()
1446            } else {
1447                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1448            };
1449            if let Some(pb) = parallel_flush_spinner.take() {
1450                pb.finish_with_message("Flushed parallel segments");
1451            }
1452            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1453                if let Some(report) = reports.get_mut(idx) {
1454                    report.seq = seq;
1455                }
1456            }
1457        }
1458        #[cfg(not(feature = "parallel_segments"))]
1459        {
1460            mem.commit()?;
1461        }
1462    } else {
1463        mem.commit()?;
1464    }
1465
1466    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1467    #[cfg(feature = "clip")]
1468    if clip_embeddings_added {
1469        mem.commit()?;
1470    }
1471
1472    if !args.json && use_parallel {
1473        for report in &reports {
1474            print_report(report);
1475        }
1476    }
1477
1478    // Table extraction for PDF files
1479    let mut tables_extracted = 0usize;
1480    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1481    if args.tables && !args.no_tables {
1482        if let InputSet::Files(ref paths) = input_set {
1483            let pdf_paths: Vec<_> = paths
1484                .iter()
1485                .filter(|p| {
1486                    p.extension()
1487                        .and_then(|e| e.to_str())
1488                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1489                        .unwrap_or(false)
1490                })
1491                .collect();
1492
1493            if !pdf_paths.is_empty() {
1494                let table_spinner = if args.json {
1495                    None
1496                } else {
1497                    Some(create_spinner(&format!(
1498                        "Extracting tables from {} PDF(s)...",
1499                        pdf_paths.len()
1500                    )))
1501                };
1502
1503                // Use aggressive mode with auto fallback for best results
1504                let table_options = TableExtractionOptions::builder()
1505                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1506                    .min_rows(2)
1507                    .min_cols(2)
1508                    .min_quality(memvid_core::table::TableQuality::Low)
1509                    .merge_multi_page(true)
1510                    .build();
1511
1512                for pdf_path in pdf_paths {
1513                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1514                        let filename = pdf_path
1515                            .file_name()
1516                            .and_then(|s| s.to_str())
1517                            .unwrap_or("unknown.pdf");
1518
1519                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1520                            for table in &result.tables {
1521                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1522                                {
1523                                    tables_extracted += 1;
1524                                    table_summaries.push(json!({
1525                                        "table_id": table.table_id,
1526                                        "source_file": table.source_file,
1527                                        "meta_frame_id": meta_id,
1528                                        "rows": table.n_rows,
1529                                        "cols": table.n_cols,
1530                                        "quality": format!("{:?}", table.quality),
1531                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1532                                    }));
1533                                }
1534                            }
1535                        }
1536                    }
1537                }
1538
1539                if let Some(pb) = table_spinner {
1540                    if tables_extracted > 0 {
1541                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1542                    } else {
1543                        pb.finish_with_message("No tables found");
1544                    }
1545                }
1546
1547                // Commit table frames
1548                if tables_extracted > 0 {
1549                    mem.commit()?;
1550                }
1551            }
1552        }
1553    }
1554
1555    // Logic-Mesh: Extract entities and build relationship graph
1556    // Opt-in only: requires --logic-mesh flag
1557    #[cfg(feature = "logic_mesh")]
1558    let mut logic_mesh_entities = 0usize;
1559    #[cfg(feature = "logic_mesh")]
1560    {
1561        use memvid_core::{ner_model_path, ner_tokenizer_path, NerModel, LogicMesh, MeshNode};
1562
1563        let models_dir = config.models_dir.clone();
1564        let model_path = ner_model_path(&models_dir);
1565        let tokenizer_path = ner_tokenizer_path(&models_dir);
1566
1567        // Opt-in: only enable Logic-Mesh when explicitly requested with --logic-mesh
1568        let ner_available = model_path.exists() && tokenizer_path.exists();
1569        let should_run_ner = args.logic_mesh;
1570
1571        if should_run_ner && !ner_available {
1572            // User explicitly requested --logic-mesh but model not installed
1573            if args.json {
1574                summary_warnings.push(
1575                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1576                );
1577            } else {
1578                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1579                eprintln!("   Run: memvid models install --ner distilbert-ner");
1580            }
1581        } else if should_run_ner {
1582            let ner_spinner = if args.json {
1583                None
1584            } else {
1585                Some(create_spinner("Building Logic-Mesh entity graph..."))
1586            };
1587
1588            match NerModel::load(&model_path, &tokenizer_path, None) {
1589                Ok(mut ner_model) => {
1590                    let mut mesh = LogicMesh::new();
1591                    let mut filtered_count = 0usize;
1592
1593                    // Process each ingested frame for entity extraction using cached search_text
1594                    for report in &reports {
1595                        let frame_id = report.frame_id;
1596                        // Use the cached search_text from ingestion
1597                        if let Some(ref content) = report.search_text {
1598                            if !content.is_empty() {
1599                                match ner_model.extract(content) {
1600                                    Ok(raw_entities) => {
1601                                        // Merge adjacent entities that were split by tokenization
1602                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1603                                        let merged_entities = merge_adjacent_entities(raw_entities, content);
1604
1605                                        for entity in &merged_entities {
1606                                            // Apply post-processing filter to remove noisy entities
1607                                            if !is_valid_entity(&entity.text, entity.confidence) {
1608                                                tracing::debug!(
1609                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1610                                                    entity.text, entity.confidence * 100.0
1611                                                );
1612                                                filtered_count += 1;
1613                                                continue;
1614                                            }
1615
1616                                            // Convert NER entity type to EntityKind
1617                                            let kind = entity.to_entity_kind();
1618                                            // Create mesh node with proper structure
1619                                            let node = MeshNode::new(
1620                                                entity.text.to_lowercase(),
1621                                                entity.text.trim().to_string(),
1622                                                kind,
1623                                                entity.confidence,
1624                                                frame_id,
1625                                                entity.byte_start as u32,
1626                                                (entity.byte_end - entity.byte_start).min(u16::MAX as usize) as u16,
1627                                            );
1628                                            mesh.merge_node(node);
1629                                            logic_mesh_entities += 1;
1630                                        }
1631                                    }
1632                                    Err(e) => {
1633                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1634                                    }
1635                                }
1636                            }
1637                        }
1638                    }
1639
1640                    if logic_mesh_entities > 0 {
1641                        mesh.finalize();
1642                        let node_count = mesh.stats().node_count;
1643                        // Persist mesh to MV2 file
1644                        mem.set_logic_mesh(mesh);
1645                        mem.commit()?;
1646                        info!(
1647                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1648                            logic_mesh_entities,
1649                            node_count,
1650                            filtered_count
1651                        );
1652                    }
1653
1654                    if let Some(pb) = ner_spinner {
1655                        pb.finish_with_message(format!(
1656                            "Logic-Mesh: {} entities ({} unique)",
1657                            logic_mesh_entities,
1658                            mem.mesh_node_count()
1659                        ));
1660                    }
1661                }
1662                Err(e) => {
1663                    if let Some(pb) = ner_spinner {
1664                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1665                    }
1666                    if args.json {
1667                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1668                    }
1669                }
1670            }
1671        }
1672    }
1673
1674    let stats = mem.stats()?;
1675    if args.json {
1676        if capacity_reached {
1677            summary_warnings.push("capacity_limit_reached".to_string());
1678        }
1679        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1680        let total_duration_ms: Option<u64> = {
1681            let sum: u64 = reports
1682                .iter()
1683                .filter_map(|r| r.extraction.duration_ms)
1684                .sum();
1685            if sum > 0 {
1686                Some(sum)
1687            } else {
1688                None
1689            }
1690        };
1691        let total_pages: Option<u32> = {
1692            let sum: u32 = reports
1693                .iter()
1694                .filter_map(|r| r.extraction.pages_processed)
1695                .sum();
1696            if sum > 0 {
1697                Some(sum)
1698            } else {
1699                None
1700            }
1701        };
1702        let embedding_mode_str = match embedding_mode {
1703            EmbeddingMode::None => "none",
1704            EmbeddingMode::Auto => "auto",
1705            EmbeddingMode::Explicit => "explicit",
1706        };
1707        let summary_json = json!({
1708            "processed": processed,
1709            "ingested": reports.len(),
1710            "skipped": skipped,
1711            "bytes_added": bytes_added,
1712            "bytes_added_human": format_bytes(bytes_added),
1713            "embedded_documents": embedded_docs,
1714            "embedding": {
1715                "enabled": embedding_enabled,
1716                "mode": embedding_mode_str,
1717                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1718            },
1719            "tables": {
1720                "enabled": args.tables && !args.no_tables,
1721                "extracted": tables_extracted,
1722                "tables": table_summaries,
1723            },
1724            "capacity_reached": capacity_reached,
1725            "warnings": summary_warnings,
1726            "extraction": {
1727                "total_duration_ms": total_duration_ms,
1728                "total_pages_processed": total_pages,
1729            },
1730            "memory": {
1731                "path": args.file.display().to_string(),
1732                "frame_count": stats.frame_count,
1733                "size_bytes": stats.size_bytes,
1734                "tier": format!("{:?}", stats.tier),
1735            },
1736            "reports": reports_json,
1737        });
1738        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1739    } else {
1740        println!(
1741            "Committed {} frame(s); total frames {}",
1742            reports.len(),
1743            stats.frame_count
1744        );
1745        println!(
1746            "Summary: processed {}, ingested {}, skipped {}, bytes added {}",
1747            processed,
1748            reports.len(),
1749            skipped,
1750            format_bytes(bytes_added)
1751        );
1752        if tables_extracted > 0 {
1753            println!("Tables: {} extracted from PDF(s)", tables_extracted);
1754        }
1755    }
1756
1757    // Save active replay session if one exists
1758    #[cfg(feature = "replay")]
1759    let _ = mem.save_active_session();
1760
1761    Ok(())
1762}
1763
/// Handles the `update` command: rewrites an existing frame of the memory at `args.file`.
///
/// The target frame is selected via `args.frame_id` / `args.uri`. Its current
/// attributes (timestamp, track, kind, URI, title, metadata, search text, tags,
/// labels, extra metadata) seed a fresh `PutOptions`; CLI arguments then
/// override individual fields. When `args.input` is given, the new payload is
/// read, re-analyzed, and passed to `update_frame`; otherwise no payload is
/// passed and the output reports the payload as "reused".
///
/// With `args.embeddings`, a new embedding is computed from the search text
/// (falling back to the UTF-8 payload). If no new embedding was produced and
/// the memory has a vector index, the frame's stored embedding is carried over.
///
/// Prints pretty JSON when `args.json` is set, otherwise a short textual summary.
///
/// # Errors
///
/// Propagates failures from opening the memory, the mutation guard, frame
/// selection, reading the input, loading the embedding runtime, embedding,
/// `update_frame`, `commit`, and the follow-up frame lookup. Also fails when
/// `args.embeddings` is set and the memory reports mixed embedding models.
pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
    let mut mem = Memvid::open(&args.file)?;
    ensure_cli_mutation_allowed(&mem)?;
    apply_lock_cli(&mut mem, &args.lock);
    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
    let frame_id = existing.id;

    // Optional replacement payload: owned bytes, a borrowed view of them, and a
    // UTF-8 view that is `None` when the bytes are not valid UTF-8.
    let payload_bytes = match args.input.as_ref() {
        Some(path) => Some(read_payload(Some(path))?),
        None => None,
    };
    let payload_slice = payload_bytes.as_deref();
    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
    let source_path = args.input.as_deref();

    // Start from the existing frame's attributes so that anything the caller
    // does not override is carried over unchanged.
    let mut options = PutOptions::default();
    options.enable_embedding = false;
    // Auto-tagging and date extraction are only enabled when a new payload is supplied.
    options.auto_tag = payload_slice.is_some();
    options.extract_dates = payload_slice.is_some();
    options.timestamp = Some(existing.timestamp);
    options.track = existing.track.clone();
    options.kind = existing.kind.clone();
    options.uri = existing.uri.clone();
    options.title = existing.title.clone();
    options.metadata = existing.metadata.clone();
    options.search_text = existing.search_text.clone();
    options.tags = existing.tags.clone();
    options.labels = existing.labels.clone();
    options.extra_metadata = existing.extra_metadata.clone();

    // An explicit --set-uri replaces the inherited URI.
    if let Some(new_uri) = &args.set_uri {
        options.uri = Some(derive_uri(Some(new_uri), None));
    }

    // Frame had no URI and a payload was supplied: derive one from the input path.
    if options.uri.is_none() && payload_slice.is_some() {
        options.uri = Some(derive_uri(None, source_path));
    }

    // Scalar overrides from the command line.
    if let Some(title) = &args.title {
        options.title = Some(title.clone());
    }
    if let Some(ts) = args.timestamp {
        options.timestamp = Some(ts);
    }
    if let Some(track) = &args.track {
        options.track = Some(track.clone());
    }
    if let Some(kind) = &args.kind {
        options.kind = Some(kind.clone());
    }

    // Labels from the command line replace the inherited labels wholesale.
    if !args.labels.is_empty() {
        options.labels = args.labels.clone();
    }

    // Tags from the command line replace the inherited tag list. Each pair
    // contributes its non-empty key and (if non-empty and different from the
    // key) its value as tags, and is also recorded in `extra_metadata`.
    // Inherited `extra_metadata` entries are kept; only matching keys are overwritten.
    if !args.tags.is_empty() {
        options.tags.clear();
        for (key, value) in &args.tags {
            if !key.is_empty() {
                options.tags.push(key.clone());
            }
            if !value.is_empty() && value != key {
                options.tags.push(value.clone());
            }
            options.extra_metadata.insert(key.clone(), value.clone());
        }
    }

    apply_metadata_overrides(&mut options, &args.metadata);

    // Text used for embedding below; starts as the (possibly inherited) search
    // text and is replaced if payload analysis yields new text.
    let mut search_source = options.search_text.clone();

    if let Some(payload) = payload_slice {
        // With a new payload, the title is re-derived: --title if given, else
        // whatever `derive_title` infers from the payload/path. An inferred
        // title therefore replaces the inherited one.
        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
        if let Some(title) = inferred_title {
            options.title = Some(title);
        }

        // NOTE(review): this looks unreachable — the same condition was already
        // handled right after the --set-uri override above; confirm before removing.
        if options.uri.is_none() {
            options.uri = Some(derive_uri(None, source_path));
        }

        // Re-analyze the new payload; analysis results replace inherited
        // metadata / search text only when the analysis produced them.
        let analysis = analyze_file(
            source_path,
            payload,
            payload_utf8,
            options.title.as_deref(),
            AudioAnalyzeOptions::default(),
            false,
        );
        if let Some(meta) = analysis.metadata {
            options.metadata = Some(meta);
        }
        if let Some(text) = analysis.search_text {
            search_source = Some(text.clone());
            options.search_text = Some(text);
        }
    } else {
        // No payload: both flags were already set to false above; this restates it.
        options.auto_tag = false;
        options.extract_dates = false;
    }

    // Embedding is enabled exactly when the memory already has a vector index.
    let stats = mem.stats()?;
    options.enable_embedding = stats.has_vec_index;

    // Auto-detect embedding model from existing MV2 dimension
    let mv2_dimension = mem.effective_vec_index_dimension()?;
    let mut embedding: Option<Vec<f32>> = None;
    if args.embeddings {
        // Pick the model recorded in the memory; refuse to proceed if several
        // different models are reported.
        // NOTE(review): the 10_000 argument is presumably a scan/sample limit — confirm.
        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
            memvid_core::EmbeddingIdentitySummary::Single(identity) => identity.model.map(String::from),
            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
                let models: Vec<_> = identities
                    .iter()
                    .filter_map(|entry| entry.identity.model.as_deref())
                    .collect();
                anyhow::bail!(
                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
                    Detected models: {:?}",
                    models
                );
            }
            memvid_core::EmbeddingIdentitySummary::Unknown => None,
        };

        let runtime = load_embedding_runtime_for_mv2(
            config,
            inferred_embedding_model.as_deref(),
            mv2_dimension,
        )?;
        // Prefer the search text; fall back to the raw UTF-8 payload. Text that
        // is empty after trimming is never embedded.
        if let Some(text) = search_source.as_ref() {
            if !text.trim().is_empty() {
                embedding = Some(runtime.embed_passage(text)?);
            }
        }
        if embedding.is_none() {
            if let Some(text) = payload_utf8 {
                if !text.trim().is_empty() {
                    embedding = Some(runtime.embed_passage(text)?);
                }
            }
        }
        if embedding.is_none() {
            warn!("no textual content available; embeddings not recomputed");
        }
        // Record the runtime's identity in the options only when a new
        // embedding was actually computed.
        if embedding.is_some() {
            apply_embedding_identity_metadata(&mut options, &runtime);
        }
    }

    // No freshly computed embedding: reuse the frame's stored one when the
    // memory has a vector index.
    let mut effective_embedding = embedding;
    if effective_embedding.is_none() && stats.has_vec_index {
        effective_embedding = mem.frame_embedding(frame_id)?;
    }

    // Captured before `options` and `payload_bytes` are moved into `update_frame`.
    let final_uri = options.uri.clone();
    let replaced_payload = payload_slice.is_some();
    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
    mem.commit()?;

    // Locate the frame produced by the update: by URI when a non-empty one is
    // known, otherwise via a one-entry reversed timeline query, falling back to
    // the original frame id when the timeline is empty.
    // NOTE(review): the timeline path assumes `reverse(true)` yields the newest
    // entry first and that it is the frame just written — confirm.
    let updated_frame =
        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
            mem.frame_by_uri(&uri)?
        } else {
            let mut query = TimelineQueryBuilder::default();
            if let Some(limit) = NonZeroU64::new(1) {
                query = query.limit(limit).reverse(true);
            }
            let latest = mem.timeline(query.build())?;
            if let Some(entry) = latest.first() {
                mem.frame_by_id(entry.frame_id)?
            } else {
                mem.frame_by_id(frame_id)?
            }
        };

    if args.json {
        let json = json!({
            "sequence": seq,
            "previous_frame_id": frame_id,
            "frame": frame_to_json(&updated_frame),
            "replaced_payload": replaced_payload,
        });
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        println!(
            "Updated frame {} → new frame {} (seq {})",
            frame_id, updated_frame.id, seq
        );
        println!(
            "Payload: {}",
            if replaced_payload {
                "replaced"
            } else {
                "reused"
            }
        );
        print_frame_summary(&mut mem, &updated_frame)?;
    }

    Ok(())
}
1966
1967fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
1968    if let Some(uri) = provided {
1969        let sanitized = sanitize_uri(uri, true);
1970        return format!("mv2://{}", sanitized);
1971    }
1972
1973    if let Some(path) = source {
1974        let raw = path.to_string_lossy();
1975        let sanitized = sanitize_uri(&raw, false);
1976        return format!("mv2://{}", sanitized);
1977    }
1978
1979    format!("mv2://frames/{}", Uuid::new_v4())
1980}
1981
1982pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
1983    let digest = hash(payload).to_hex();
1984    let short = &digest[..32];
1985    let ext_from_path = source
1986        .and_then(|path| path.extension())
1987        .and_then(|ext| ext.to_str())
1988        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
1989        .filter(|ext| !ext.is_empty());
1990    let ext = ext_from_path
1991        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
1992        .unwrap_or_else(|| "bin".to_string());
1993    format!("mv2://video/{short}.{ext}")
1994}
1995
1996fn derive_title(
1997    provided: Option<String>,
1998    source: Option<&Path>,
1999    payload_utf8: Option<&str>,
2000) -> Option<String> {
2001    if let Some(title) = provided {
2002        let trimmed = title.trim();
2003        if trimmed.is_empty() {
2004            None
2005        } else {
2006            Some(trimmed.to_string())
2007        }
2008    } else {
2009        if let Some(text) = payload_utf8 {
2010            if let Some(markdown_title) = extract_markdown_title(text) {
2011                return Some(markdown_title);
2012            }
2013        }
2014        source
2015            .and_then(|path| path.file_stem())
2016            .and_then(|stem| stem.to_str())
2017            .map(to_title_case)
2018    }
2019}
2020
/// Returns the text of the first non-empty Markdown ATX heading in `text`.
///
/// A line counts as a heading only when, after trimming, it starts with one to
/// six `#` characters followed by whitespace. This keeps shebangs
/// (`#!/bin/sh`), preprocessor lines (`#include`), and hashtags (`#topic`)
/// from being mistaken for titles. An optional closing run of `#` separated
/// from the text by whitespace (`## Title ##`) is removed; a trailing `#` that
/// is part of the text (`# C#`) is kept.
///
/// Returns `None` when no line yields a non-empty heading.
fn extract_markdown_title(text: &str) -> Option<String> {
    text.lines().find_map(|line| {
        let trimmed = line.trim();
        let after_hashes = trimmed.trim_start_matches('#');
        let level = trimmed.len() - after_hashes.len();
        if !(1..=6).contains(&level) {
            return None;
        }
        // The opening hashes must be separated from the text by whitespace.
        // This also rejects hash-only lines, which carry no title anyway.
        if !after_hashes.starts_with(char::is_whitespace) {
            return None;
        }

        let mut title = after_hashes.trim();
        // Drop a closing hash sequence, but only when it stands on its own.
        let without_closing = title.trim_end_matches('#');
        let has_closing_run = without_closing.len() != title.len();
        if has_closing_run
            && (without_closing.is_empty() || without_closing.ends_with(char::is_whitespace))
        {
            title = without_closing.trim_end();
        }

        (!title.is_empty()).then(|| title.to_string())
    })
}
2033
/// Upper-cases the first character of `input` and leaves the rest untouched.
///
/// Despite the name, only the leading character is changed. Its upper-case
/// form may span several characters (for example `ß` becomes `SS`). An empty
/// input yields an empty string.
fn to_title_case(input: &str) -> String {
    match input.chars().next() {
        None => String::new(),
        Some(leading) => {
            let remainder = &input[leading.len_utf8()..];
            let mut titled = String::with_capacity(input.len());
            titled.extend(leading.to_uppercase());
            titled.push_str(remainder);
            titled
        }
    }
}
2045
/// Returns `true` when the final component of `path` is exactly `.DS_Store`.
///
/// The comparison is case-sensitive; paths without a UTF-8 file name are never skipped.
fn should_skip_input(path: &Path) -> bool {
    let file_name = path.file_name().and_then(|component| component.to_str());
    file_name == Some(".DS_Store")
}
2052
/// Returns `true` when the final component of `path` starts with a dot (e.g. `.git`).
///
/// Paths with no file name (such as `.` or `..`) or a non-UTF-8 name are not skipped.
fn should_skip_dir(path: &Path) -> bool {
    path.file_name()
        .and_then(|component| component.to_str())
        .is_some_and(|component| component.starts_with('.'))
}
2059
/// Where the payload(s) to ingest come from, as resolved by `resolve_inputs`.
enum InputSet {
    /// No input argument was given.
    // NOTE(review): presumably the payload is then read from standard input — confirm at the use site.
    Stdin,
    /// One or more filesystem paths: glob matches, the files found under a
    /// directory, or the single path that was passed in.
    Files(Vec<PathBuf>),
}
2064
/// Reports whether `path` carries a known image extension.
///
/// The extension is compared case-insensitively; paths without an extension
/// (or with a non-UTF-8 one) are not images.
#[cfg(feature = "clip")]
fn is_image_file(path: &std::path::Path) -> bool {
    const IMAGE_EXTENSIONS: [&str; 9] = [
        "jpg", "jpeg", "png", "gif", "bmp", "webp", "tiff", "tif", "ico",
    ];

    path.extension()
        .and_then(|raw| raw.to_str())
        .map(|raw| raw.to_ascii_lowercase())
        .is_some_and(|lowered| IMAGE_EXTENSIONS.contains(&lowered.as_str()))
}
2076
2077fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2078    let Some(raw) = input else {
2079        return Ok(InputSet::Stdin);
2080    };
2081
2082    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2083        let mut files = Vec::new();
2084        let mut matched_any = false;
2085        for entry in glob(raw)? {
2086            let path = entry?;
2087            if path.is_file() {
2088                matched_any = true;
2089                if should_skip_input(&path) {
2090                    continue;
2091                }
2092                files.push(path);
2093            }
2094        }
2095        files.sort();
2096        if files.is_empty() {
2097            if matched_any {
2098                anyhow::bail!(
2099                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2100                );
2101            }
2102            anyhow::bail!("pattern '{raw}' did not match any files");
2103        }
2104        return Ok(InputSet::Files(files));
2105    }
2106
2107    let path = PathBuf::from(raw);
2108    if path.is_dir() {
2109        let (mut files, skipped_any) = collect_directory_files(&path)?;
2110        files.sort();
2111        if files.is_empty() {
2112            if skipped_any {
2113                anyhow::bail!(
2114                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2115                    path.display()
2116                );
2117            }
2118            anyhow::bail!(
2119                "directory '{}' contains no ingestible files",
2120                path.display()
2121            );
2122        }
2123        Ok(InputSet::Files(files))
2124    } else {
2125        Ok(InputSet::Files(vec![path]))
2126    }
2127}
2128
2129fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2130    let mut files = Vec::new();
2131    let mut skipped_any = false;
2132    let mut stack = vec![root.to_path_buf()];
2133    while let Some(dir) = stack.pop() {
2134        for entry in fs::read_dir(&dir)? {
2135            let entry = entry?;
2136            let path = entry.path();
2137            let file_type = entry.file_type()?;
2138            if file_type.is_dir() {
2139                if should_skip_dir(&path) {
2140                    skipped_any = true;
2141                    continue;
2142                }
2143                stack.push(path);
2144            } else if file_type.is_file() {
2145                if should_skip_input(&path) {
2146                    skipped_any = true;
2147                    continue;
2148                }
2149                files.push(path);
2150            }
2151        }
2152    }
2153    Ok((files, skipped_any))
2154}
2155
2156struct IngestOutcome {
2157    report: PutReport,
2158    embedded: bool,
2159    #[cfg(feature = "parallel_segments")]
2160    parallel_input: Option<ParallelInput>,
2161}
2162
2163struct PutReport {
2164    seq: u64,
2165    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
2166    frame_id: u64,
2167    uri: String,
2168    title: Option<String>,
2169    original_bytes: usize,
2170    stored_bytes: usize,
2171    compressed: bool,
2172    source: Option<PathBuf>,
2173    mime: Option<String>,
2174    metadata: Option<DocMetadata>,
2175    extraction: ExtractionSummary,
2176    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
2177    #[cfg(feature = "logic_mesh")]
2178    search_text: Option<String>,
2179}
2180
2181struct CapacityGuard {
2182    #[allow(dead_code)]
2183    tier: Tier,
2184    capacity: u64,
2185    current_size: u64,
2186}
2187
2188impl CapacityGuard {
2189    const MIN_OVERHEAD: u64 = 128 * 1024;
2190    const RELATIVE_OVERHEAD: f64 = 0.05;
2191
2192    fn from_stats(stats: &Stats) -> Option<Self> {
2193        if stats.capacity_bytes == 0 {
2194            return None;
2195        }
2196        Some(Self {
2197            tier: stats.tier,
2198            capacity: stats.capacity_bytes,
2199            current_size: stats.size_bytes,
2200        })
2201    }
2202
2203    fn ensure_capacity(&self, additional: u64) -> Result<()> {
2204        let projected = self
2205            .current_size
2206            .saturating_add(additional)
2207            .saturating_add(Self::estimate_overhead(additional));
2208        if projected > self.capacity {
2209            return Err(map_put_error(
2210                MemvidError::CapacityExceeded {
2211                    current: self.current_size,
2212                    limit: self.capacity,
2213                    required: projected,
2214                },
2215                Some(self.capacity),
2216            ));
2217        }
2218        Ok(())
2219    }
2220
2221    fn update_after_commit(&mut self, stats: &Stats) {
2222        self.current_size = stats.size_bytes;
2223    }
2224
2225    fn capacity_hint(&self) -> Option<u64> {
2226        Some(self.capacity)
2227    }
2228
2229    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
2230    fn check_and_warn_capacity(&self) {
2231        if self.capacity == 0 {
2232            return;
2233        }
2234
2235        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
2236
2237        // Warn at key thresholds
2238        if usage_pct >= 90.0 && usage_pct < 91.0 {
2239            eprintln!(
2240                "⚠️  CRITICAL: 90% capacity used ({} / {})",
2241                format_bytes(self.current_size),
2242                format_bytes(self.capacity)
2243            );
2244            eprintln!("   Actions:");
2245            eprintln!("   1. Recreate with larger --size");
2246            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
2247            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
2248        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
2249            eprintln!(
2250                "⚠️  WARNING: 75% capacity used ({} / {})",
2251                format_bytes(self.current_size),
2252                format_bytes(self.capacity)
2253            );
2254            eprintln!("   Consider planning for capacity expansion soon");
2255        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
2256            eprintln!(
2257                "ℹ️  INFO: 50% capacity used ({} / {})",
2258                format_bytes(self.current_size),
2259                format_bytes(self.capacity)
2260            );
2261        }
2262    }
2263
2264    fn estimate_overhead(additional: u64) -> u64 {
2265        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
2266        fractional.max(Self::MIN_OVERHEAD)
2267    }
2268}
2269
2270#[derive(Debug, Clone)]
2271pub struct FileAnalysis {
2272    pub mime: String,
2273    pub metadata: Option<DocMetadata>,
2274    pub search_text: Option<String>,
2275    pub extraction: ExtractionSummary,
2276}
2277
/// Options that control audio analysis.
#[derive(Clone, Copy)]
pub struct AudioAnalyzeOptions {
    /// Run audio analysis even when the MIME type is not `audio/*`.
    force: bool,
    /// Requested segment length in seconds; see `normalised_segment_secs`.
    segment_secs: u32,
    /// Transcription flag.
    transcribe: bool,
}

impl AudioAnalyzeOptions {
    /// Segment length used by `Default`.
    const DEFAULT_SEGMENT_SECS: u32 = 30;

    /// Returns the segment length, raised to at least one second.
    fn normalised_segment_secs(self) -> u32 {
        if self.segment_secs == 0 {
            1
        } else {
            self.segment_secs
        }
    }
}

impl Default for AudioAnalyzeOptions {
    /// Nothing forced, no transcription, and the default segment length.
    fn default() -> Self {
        let segment_secs = Self::DEFAULT_SEGMENT_SECS;
        Self {
            force: false,
            segment_secs,
            transcribe: false,
        }
    }
}
2302
2303struct AudioAnalysis {
2304    metadata: DocAudioMetadata,
2305    caption: Option<String>,
2306    search_terms: Vec<String>,
2307    transcript: Option<String>,
2308}
2309
2310struct ImageAnalysis {
2311    width: u32,
2312    height: u32,
2313    palette: Vec<String>,
2314    caption: Option<String>,
2315    exif: Option<DocExifMetadata>,
2316}
2317
2318#[derive(Debug, Clone)]
2319pub struct ExtractionSummary {
2320    reader: Option<String>,
2321    status: ExtractionStatus,
2322    warnings: Vec<String>,
2323    duration_ms: Option<u64>,
2324    pages_processed: Option<u32>,
2325}
2326
2327impl ExtractionSummary {
2328    fn record_warning<S: Into<String>>(&mut self, warning: S) {
2329        self.warnings.push(warning.into());
2330    }
2331}
2332
/// Status values recorded in an `ExtractionSummary`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExtractionStatus {
    Skipped,
    Ok,
    FallbackUsed,
    Empty,
    Failed,
}

impl ExtractionStatus {
    /// Returns the lowercase, snake_case label of this status.
    fn label(self) -> &'static str {
        match self {
            ExtractionStatus::Skipped => "skipped",
            ExtractionStatus::Ok => "ok",
            ExtractionStatus::FallbackUsed => "fallback_used",
            ExtractionStatus::Empty => "empty",
            ExtractionStatus::Failed => "failed",
        }
    }
}
2353
2354fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2355    let filename = source_path
2356        .and_then(|path| path.file_name())
2357        .and_then(|name| name.to_str())
2358        .map(|value| value.to_string());
2359    MediaManifest {
2360        kind: "video".to_string(),
2361        mime: mime.to_string(),
2362        bytes,
2363        filename,
2364        duration_ms: None,
2365        width: None,
2366        height: None,
2367        codec: None,
2368    }
2369}
2370
2371pub fn analyze_file(
2372    source_path: Option<&Path>,
2373    payload: &[u8],
2374    payload_utf8: Option<&str>,
2375    inferred_title: Option<&str>,
2376    audio_opts: AudioAnalyzeOptions,
2377    force_video: bool,
2378) -> FileAnalysis {
2379    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2380    let is_video = force_video || mime.starts_with("video/");
2381    let treat_as_text = if is_video { false } else { treat_as_text_base };
2382
2383    let mut metadata = DocMetadata::default();
2384    metadata.mime = Some(mime.clone());
2385    metadata.bytes = Some(payload.len() as u64);
2386    metadata.hash = Some(hash(payload).to_hex().to_string());
2387
2388    let mut search_text = None;
2389
2390    let mut extraction = ExtractionSummary {
2391        reader: None,
2392        status: ExtractionStatus::Skipped,
2393        warnings: Vec::new(),
2394        duration_ms: None,
2395        pages_processed: None,
2396    };
2397
2398    let reader_registry = default_reader_registry();
2399    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2400        if slice.is_empty() {
2401            None
2402        } else {
2403            Some(slice)
2404        }
2405    });
2406
2407    let apply_extracted_text = |text: &str,
2408                                reader_label: &str,
2409                                status_if_applied: ExtractionStatus,
2410                                extraction: &mut ExtractionSummary,
2411                                metadata: &mut DocMetadata,
2412                                search_text: &mut Option<String>|
2413     -> bool {
2414        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
2415            let mut value = normalized.text;
2416            if normalized.truncated {
2417                value.push('…');
2418            }
2419            if metadata.caption.is_none() {
2420                if let Some(caption) = caption_from_text(&value) {
2421                    metadata.caption = Some(caption);
2422                }
2423            }
2424            *search_text = Some(value);
2425            extraction.reader = Some(reader_label.to_string());
2426            extraction.status = status_if_applied;
2427            true
2428        } else {
2429            false
2430        }
2431    };
2432
2433    if is_video {
2434        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
2435    }
2436
2437    if treat_as_text {
2438        if let Some(text) = payload_utf8 {
2439            if !apply_extracted_text(
2440                text,
2441                "bytes",
2442                ExtractionStatus::Ok,
2443                &mut extraction,
2444                &mut metadata,
2445                &mut search_text,
2446            ) {
2447                extraction.status = ExtractionStatus::Empty;
2448                extraction.reader = Some("bytes".to_string());
2449                let msg = "text payload contained no searchable content after normalization";
2450                extraction.record_warning(msg);
2451                warn!("{msg}");
2452            }
2453        } else {
2454            extraction.reader = Some("bytes".to_string());
2455            extraction.status = ExtractionStatus::Failed;
2456            let msg = "payload reported as text but was not valid UTF-8";
2457            extraction.record_warning(msg);
2458            warn!("{msg}");
2459        }
2460    } else if mime == "application/pdf"
2461        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
2462        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
2463        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
2464        || mime == "application/vnd.ms-excel"
2465        || mime == "application/msword"
2466    {
2467        // Use reader registry for PDFs and Office documents (XLSX, DOCX, PPTX, etc.)
2468        let mut applied = false;
2469
2470        let format_hint = infer_document_format(Some(mime.as_str()), magic);
2471        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
2472            .with_uri(source_path.and_then(|p| p.to_str()))
2473            .with_magic(magic);
2474
2475        if let Some(reader) = reader_registry.find_reader(&hint) {
2476            match reader.extract(payload, &hint) {
2477                Ok(output) => {
2478                    extraction.reader = Some(output.reader_name.clone());
2479                    if let Some(ms) = output.diagnostics.duration_ms {
2480                        extraction.duration_ms = Some(ms);
2481                    }
2482                    if let Some(pages) = output.diagnostics.pages_processed {
2483                        extraction.pages_processed = Some(pages);
2484                    }
2485                    if let Some(mime_type) = output.document.mime_type.clone() {
2486                        metadata.mime = Some(mime_type);
2487                    }
2488
2489                    for warning in output.diagnostics.warnings.iter() {
2490                        extraction.record_warning(warning);
2491                        warn!("{warning}");
2492                    }
2493
2494                    let status = if output.diagnostics.fallback {
2495                        ExtractionStatus::FallbackUsed
2496                    } else {
2497                        ExtractionStatus::Ok
2498                    };
2499
2500                    if let Some(doc_text) = output.document.text.as_ref() {
2501                        applied = apply_extracted_text(
2502                            doc_text,
2503                            output.reader_name.as_str(),
2504                            status,
2505                            &mut extraction,
2506                            &mut metadata,
2507                            &mut search_text,
2508                        );
2509                        if !applied {
2510                            extraction.reader = Some(output.reader_name);
2511                            extraction.status = ExtractionStatus::Empty;
2512                            let msg = "primary reader returned no usable text";
2513                            extraction.record_warning(msg);
2514                            warn!("{msg}");
2515                        }
2516                    } else {
2517                        extraction.reader = Some(output.reader_name);
2518                        extraction.status = ExtractionStatus::Empty;
2519                        let msg = "primary reader returned empty text";
2520                        extraction.record_warning(msg);
2521                        warn!("{msg}");
2522                    }
2523                }
2524                Err(err) => {
2525                    let name = reader.name();
2526                    extraction.reader = Some(name.to_string());
2527                    extraction.status = ExtractionStatus::Failed;
2528                    let msg = format!("primary reader {name} failed: {err}");
2529                    extraction.record_warning(&msg);
2530                    warn!("{msg}");
2531                }
2532            }
2533        } else {
2534            extraction.record_warning("no registered reader matched this PDF payload");
2535            warn!("no registered reader matched this PDF payload");
2536        }
2537
2538        if !applied {
2539            match extract_pdf_text(payload) {
2540                Ok(pdf_text) => {
2541                    if !apply_extracted_text(
2542                        &pdf_text,
2543                        "pdf_extract",
2544                        ExtractionStatus::FallbackUsed,
2545                        &mut extraction,
2546                        &mut metadata,
2547                        &mut search_text,
2548                    ) {
2549                        extraction.reader = Some("pdf_extract".to_string());
2550                        extraction.status = ExtractionStatus::Empty;
2551                        let msg = "PDF text extraction yielded no searchable content";
2552                        extraction.record_warning(msg);
2553                        warn!("{msg}");
2554                    }
2555                }
2556                Err(err) => {
2557                    extraction.reader = Some("pdf_extract".to_string());
2558                    extraction.status = ExtractionStatus::Failed;
2559                    let msg = format!("failed to extract PDF text via fallback: {err}");
2560                    extraction.record_warning(&msg);
2561                    warn!("{msg}");
2562                }
2563            }
2564        }
2565    }
2566
2567    if !is_video && mime.starts_with("image/") {
2568        if let Some(image_meta) = analyze_image(payload) {
2569            let ImageAnalysis {
2570                width,
2571                height,
2572                palette,
2573                caption,
2574                exif,
2575            } = image_meta;
2576            metadata.width = Some(width);
2577            metadata.height = Some(height);
2578            if !palette.is_empty() {
2579                metadata.colors = Some(palette);
2580            }
2581            if metadata.caption.is_none() {
2582                metadata.caption = caption;
2583            }
2584            if let Some(exif) = exif {
2585                metadata.exif = Some(exif);
2586            }
2587        }
2588    }
2589
2590    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
2591        if let Some(AudioAnalysis {
2592            metadata: audio_metadata,
2593            caption: audio_caption,
2594            mut search_terms,
2595            transcript,
2596        }) = analyze_audio(payload, source_path, &mime, audio_opts)
2597        {
2598            if metadata.audio.is_none()
2599                || metadata
2600                    .audio
2601                    .as_ref()
2602                    .map_or(true, DocAudioMetadata::is_empty)
2603            {
2604                metadata.audio = Some(audio_metadata);
2605            }
2606            if metadata.caption.is_none() {
2607                metadata.caption = audio_caption;
2608            }
2609
2610            // Use transcript as primary search text if available
2611            if let Some(ref text) = transcript {
2612                extraction.reader = Some("whisper".to_string());
2613                extraction.status = ExtractionStatus::Ok;
2614                search_text = Some(text.clone());
2615            } else if !search_terms.is_empty() {
2616                match &mut search_text {
2617                    Some(existing) => {
2618                        for term in search_terms.drain(..) {
2619                            if !existing.contains(&term) {
2620                                if !existing.ends_with(' ') {
2621                                    existing.push(' ');
2622                                }
2623                                existing.push_str(&term);
2624                            }
2625                        }
2626                    }
2627                    None => {
2628                        search_text = Some(search_terms.join(" "));
2629                    }
2630                }
2631            }
2632        }
2633    }
2634
2635    if metadata.caption.is_none() {
2636        if let Some(title) = inferred_title {
2637            let trimmed = title.trim();
2638            if !trimmed.is_empty() {
2639                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
2640            }
2641        }
2642    }
2643
2644    FileAnalysis {
2645        mime,
2646        metadata: if metadata.is_empty() {
2647            None
2648        } else {
2649            Some(metadata)
2650        },
2651        search_text,
2652        extraction,
2653    }
2654}
2655
2656fn default_reader_registry() -> &'static ReaderRegistry {
2657    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2658    REGISTRY.get_or_init(ReaderRegistry::default)
2659}
2660
2661fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2662    if detect_pdf_magic(magic) {
2663        return Some(DocumentFormat::Pdf);
2664    }
2665
2666    let mime = mime?.trim().to_ascii_lowercase();
2667    match mime.as_str() {
2668        "application/pdf" => Some(DocumentFormat::Pdf),
2669        "text/plain" => Some(DocumentFormat::PlainText),
2670        "text/markdown" => Some(DocumentFormat::Markdown),
2671        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2672        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2673            Some(DocumentFormat::Docx)
2674        }
2675        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2676            Some(DocumentFormat::Pptx)
2677        }
2678        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2679            Some(DocumentFormat::Xlsx)
2680        }
2681        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2682        _ => None,
2683    }
2684}
2685
/// Reports whether `magic` starts with the `%PDF` signature.
///
/// A leading UTF-8 byte-order mark is skipped first, then any run of ASCII whitespace.
/// `None` and empty input yield `false`.
fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
    let Some(bytes) = magic else {
        return false;
    };
    let without_bom = bytes
        .strip_prefix(&[0xEF_u8, 0xBB, 0xBF])
        .unwrap_or(bytes);
    let first_content = without_bom
        .iter()
        .position(|byte| !byte.is_ascii_whitespace())
        .unwrap_or(without_bom.len());
    without_bom[first_content..].starts_with(b"%PDF")
}
2703
2704fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2705    match pdf_extract::extract_text_from_mem(payload) {
2706        Ok(text) if text.trim().is_empty() => match fallback_pdftotext(payload) {
2707            Ok(fallback) => Ok(fallback),
2708            Err(err) => {
2709                warn!("pdf_extract returned empty text and pdftotext fallback failed: {err}");
2710                Ok(text)
2711            }
2712        },
2713        Err(err) => {
2714            warn!("pdf_extract failed: {err}");
2715            match fallback_pdftotext(payload) {
2716                Ok(fallback) => Ok(fallback),
2717                Err(fallback_err) => {
2718                    warn!("pdftotext fallback failed: {fallback_err}");
2719                    Err(err)
2720                }
2721            }
2722        }
2723        other => other,
2724    }
2725}
2726
2727fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
2728    use std::io::Write;
2729    use std::process::{Command, Stdio};
2730
2731    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
2732
2733    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
2734    temp_pdf
2735        .write_all(payload)
2736        .context("failed to write pdf payload to temp file")?;
2737    temp_pdf.flush().context("failed to flush temp pdf file")?;
2738
2739    let child = Command::new(pdftotext)
2740        .arg("-layout")
2741        .arg(temp_pdf.path())
2742        .arg("-")
2743        .stdout(Stdio::piped())
2744        .spawn()
2745        .context("failed to spawn pdftotext")?;
2746
2747    let output = child
2748        .wait_with_output()
2749        .context("failed to read pdftotext output")?;
2750
2751    if !output.status.success() {
2752        return Err(anyhow!("pdftotext exited with status {}", output.status));
2753    }
2754
2755    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
2756    Ok(text)
2757}
2758
2759fn detect_mime(
2760    source_path: Option<&Path>,
2761    payload: &[u8],
2762    payload_utf8: Option<&str>,
2763) -> (String, bool) {
2764    if let Some(kind) = infer::get(payload) {
2765        let mime = kind.mime_type().to_string();
2766        let treat_as_text = mime.starts_with("text/")
2767            || matches!(
2768                mime.as_str(),
2769                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
2770            );
2771        return (mime, treat_as_text);
2772    }
2773
2774    let magic = magic_from_u8(payload);
2775    if !magic.is_empty() && magic != "application/octet-stream" {
2776        let treat_as_text = magic.starts_with("text/")
2777            || matches!(
2778                magic,
2779                "application/json"
2780                    | "application/xml"
2781                    | "application/javascript"
2782                    | "image/svg+xml"
2783                    | "text/plain"
2784            );
2785        return (magic.to_string(), treat_as_text);
2786    }
2787
2788    if let Some(path) = source_path {
2789        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2790            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
2791                return (mime.to_string(), treat_as_text);
2792            }
2793        }
2794    }
2795
2796    if payload_utf8.is_some() {
2797        return ("text/plain".to_string(), true);
2798    }
2799
2800    ("application/octet-stream".to_string(), false)
2801}
2802
/// Maps a file extension (without the dot, any ASCII case) to `(mime, treat_as_text)`.
///
/// Returns `None` for extensions that are not in the table.
fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
    /// Extensions served as `text/plain`.
    const PLAIN_TEXT: &[&str] = &[
        "txt", "text", "log", "cfg", "conf", "ini", "properties", "sql", "rs", "py", "js", "ts",
        "tsx", "jsx", "c", "h", "cpp", "hpp", "go", "rb", "php", "css", "scss", "sass", "sh",
        "bash", "zsh", "ps1", "swift", "kt", "java", "scala", "lua", "pl", "pm", "r", "erl", "ex",
        "exs", "dart",
    ];

    let normalized = ext.to_ascii_lowercase();
    let key = normalized.as_str();
    if PLAIN_TEXT.contains(&key) {
        return Some(("text/plain", true));
    }

    let entry = match key {
        // Structured or marked-up text.
        "md" | "markdown" => ("text/markdown", true),
        "rst" => ("text/x-rst", true),
        "json" => ("application/json", true),
        "csv" => ("text/csv", true),
        "tsv" => ("text/tab-separated-values", true),
        "yaml" | "yml" => ("application/yaml", true),
        "toml" => ("application/toml", true),
        "html" | "htm" => ("text/html", true),
        "xml" => ("application/xml", true),
        "svg" => ("image/svg+xml", true),
        // Raster images.
        "gif" => ("image/gif", false),
        "jpg" | "jpeg" => ("image/jpeg", false),
        "png" => ("image/png", false),
        "bmp" => ("image/bmp", false),
        "ico" => ("image/x-icon", false),
        "tif" | "tiff" => ("image/tiff", false),
        "webp" => ("image/webp", false),
        _ => return None,
    };
    Some(entry)
}
2830
2831fn caption_from_text(text: &str) -> Option<String> {
2832    for line in text.lines() {
2833        let trimmed = line.trim();
2834        if !trimmed.is_empty() {
2835            return Some(truncate_to_boundary(trimmed, 240));
2836        }
2837    }
2838    None
2839}
2840
/// Shortens `text` to at most `max_len` bytes without splitting a character.
///
/// Text that already fits is returned unchanged. Otherwise the text is cut at the last
/// character boundary at or before `max_len`, trailing whitespace is removed and `…` is
/// appended (so the result can exceed `max_len` by the size of the ellipsis). When no
/// character fits at all, an empty string is returned.
fn truncate_to_boundary(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_owned();
    }
    // Index 0 is always a character boundary, so the search always finds something.
    let cut = (0..=max_len)
        .rev()
        .find(|&idx| text.is_char_boundary(idx))
        .unwrap_or(0);
    if cut == 0 {
        return String::new();
    }
    let mut shortened = String::from(text[..cut].trim_end());
    shortened.push('…');
    shortened
}
2856
2857fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
2858    let reader = ImageReader::new(Cursor::new(payload))
2859        .with_guessed_format()
2860        .map_err(|err| warn!("failed to guess image format: {err}"))
2861        .ok()?;
2862    let image = reader
2863        .decode()
2864        .map_err(|err| warn!("failed to decode image: {err}"))
2865        .ok()?;
2866    let width = image.width();
2867    let height = image.height();
2868    let rgb = image.to_rgb8();
2869    let palette = match get_palette(
2870        rgb.as_raw(),
2871        ColorFormat::Rgb,
2872        COLOR_PALETTE_SIZE,
2873        COLOR_PALETTE_QUALITY,
2874    ) {
2875        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
2876        Err(err) => {
2877            warn!("failed to compute colour palette: {err}");
2878            Vec::new()
2879        }
2880    };
2881
2882    let (exif, exif_caption) = extract_exif_metadata(payload);
2883
2884    Some(ImageAnalysis {
2885        width,
2886        height,
2887        palette,
2888        caption: exif_caption,
2889        exif,
2890    })
2891}
2892
2893fn color_to_hex(color: Color) -> String {
2894    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
2895}
2896
2897fn analyze_audio(
2898    payload: &[u8],
2899    source_path: Option<&Path>,
2900    mime: &str,
2901    options: AudioAnalyzeOptions,
2902) -> Option<AudioAnalysis> {
2903    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
2904    use symphonia::core::errors::Error as SymphoniaError;
2905    use symphonia::core::formats::FormatOptions;
2906    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
2907    use symphonia::core::meta::MetadataOptions;
2908    use symphonia::core::probe::Hint;
2909
2910    let cursor = Cursor::new(payload.to_vec());
2911    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());
2912
2913    let mut hint = Hint::new();
2914    if let Some(path) = source_path {
2915        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
2916            hint.with_extension(ext);
2917        }
2918    }
2919    if let Some(suffix) = mime.split('/').nth(1) {
2920        hint.with_extension(suffix);
2921    }
2922
2923    let probed = symphonia::default::get_probe()
2924        .format(
2925            &hint,
2926            mss,
2927            &FormatOptions::default(),
2928            &MetadataOptions::default(),
2929        )
2930        .map_err(|err| warn!("failed to probe audio stream: {err}"))
2931        .ok()?;
2932
2933    let mut format = probed.format;
2934    let track = format.default_track()?;
2935    let track_id = track.id;
2936    let codec_params: CodecParameters = track.codec_params.clone();
2937    let sample_rate = codec_params.sample_rate;
2938    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);
2939
2940    let mut decoder = symphonia::default::get_codecs()
2941        .make(&codec_params, &DecoderOptions::default())
2942        .map_err(|err| warn!("failed to create audio decoder: {err}"))
2943        .ok()?;
2944
2945    let mut decoded_frames: u64 = 0;
2946    loop {
2947        let packet = match format.next_packet() {
2948            Ok(packet) => packet,
2949            Err(SymphoniaError::IoError(_)) => break,
2950            Err(SymphoniaError::DecodeError(err)) => {
2951                warn!("skipping undecodable audio packet: {err}");
2952                continue;
2953            }
2954            Err(SymphoniaError::ResetRequired) => {
2955                decoder.reset();
2956                continue;
2957            }
2958            Err(other) => {
2959                warn!("stopping audio analysis due to error: {other}");
2960                break;
2961            }
2962        };
2963
2964        if packet.track_id() != track_id {
2965            continue;
2966        }
2967
2968        match decoder.decode(&packet) {
2969            Ok(audio_buf) => {
2970                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
2971            }
2972            Err(SymphoniaError::DecodeError(err)) => {
2973                warn!("failed to decode audio packet: {err}");
2974            }
2975            Err(SymphoniaError::IoError(_)) => break,
2976            Err(SymphoniaError::ResetRequired) => {
2977                decoder.reset();
2978            }
2979            Err(other) => {
2980                warn!("decoder error: {other}");
2981                break;
2982            }
2983        }
2984    }
2985
2986    let duration_secs = match sample_rate {
2987        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
2988        _ => 0.0,
2989    };
2990
2991    let bitrate_kbps = if duration_secs > 0.0 {
2992        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
2993    } else {
2994        None
2995    };
2996
2997    let tags = extract_lofty_tags(payload);
2998    let caption = tags
2999        .get("title")
3000        .cloned()
3001        .or_else(|| tags.get("tracktitle").cloned());
3002
3003    let mut search_terms = Vec::new();
3004    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
3005        search_terms.push(title.clone());
3006    }
3007    if let Some(artist) = tags.get("artist") {
3008        search_terms.push(artist.clone());
3009    }
3010    if let Some(album) = tags.get("album") {
3011        search_terms.push(album.clone());
3012    }
3013    if let Some(genre) = tags.get("genre") {
3014        search_terms.push(genre.clone());
3015    }
3016
3017    let mut segments = Vec::new();
3018    if duration_secs > 0.0 {
3019        let segment_len = options.normalised_segment_secs() as f64;
3020        if segment_len > 0.0 {
3021            let mut start = 0.0;
3022            let mut idx = 1usize;
3023            while start < duration_secs {
3024                let end = (start + segment_len).min(duration_secs);
3025                segments.push(AudioSegmentMetadata {
3026                    start_seconds: start as f32,
3027                    end_seconds: end as f32,
3028                    label: Some(format!("Segment {}", idx)),
3029                });
3030                if end >= duration_secs {
3031                    break;
3032                }
3033                start = end;
3034                idx += 1;
3035            }
3036        }
3037    }
3038
3039    let mut metadata = DocAudioMetadata::default();
3040    if duration_secs > 0.0 {
3041        metadata.duration_secs = Some(duration_secs as f32);
3042    }
3043    if let Some(rate) = sample_rate {
3044        metadata.sample_rate_hz = Some(rate);
3045    }
3046    if let Some(channels) = channel_count {
3047        metadata.channels = Some(channels);
3048    }
3049    if let Some(bitrate) = bitrate_kbps {
3050        metadata.bitrate_kbps = Some(bitrate);
3051    }
3052    if codec_params.codec != CODEC_TYPE_NULL {
3053        metadata.codec = Some(format!("{:?}", codec_params.codec));
3054    }
3055    if !segments.is_empty() {
3056        metadata.segments = segments;
3057    }
3058    if !tags.is_empty() {
3059        metadata.tags = tags
3060            .iter()
3061            .map(|(k, v)| (k.clone(), v.clone()))
3062            .collect::<BTreeMap<_, _>>();
3063    }
3064
3065    // Transcribe audio if requested
3066    let transcript = if options.transcribe {
3067        transcribe_audio(source_path)
3068    } else {
3069        None
3070    };
3071
3072    Some(AudioAnalysis {
3073        metadata,
3074        caption,
3075        search_terms,
3076        transcript,
3077    })
3078}
3079
/// Runs Whisper speech-to-text over the audio file at `source_path`.
///
/// Returns `None` when no path is supplied, when the transcriber cannot be
/// constructed, or when transcription fails. Both failure cases are logged as
/// warnings instead of being propagated to the caller.
#[cfg(feature = "whisper")]
fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
    use memvid_core::{WhisperConfig, WhisperTranscriber};

    // Without a file on disk there is nothing to hand to the transcriber.
    let audio_path = source_path?;

    tracing::info!(path = %audio_path.display(), "Transcribing audio with Whisper");

    let config = WhisperConfig::default();
    let mut transcriber = WhisperTranscriber::new(&config)
        .map_err(|e| {
            tracing::warn!(error = %e, "Failed to initialize Whisper transcriber");
        })
        .ok()?;

    let outcome = transcriber
        .transcribe_file(audio_path)
        .map_err(|e| {
            tracing::warn!(error = %e, "Failed to transcribe audio");
        })
        .ok()?;

    tracing::info!(
        duration = outcome.duration_secs,
        text_len = outcome.text.len(),
        "Audio transcription complete"
    );
    Some(outcome.text)
}
3113
3114#[cfg(not(feature = "whisper"))]
3115fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
3116    tracing::warn!("Whisper feature not enabled. Rebuild with --features whisper to enable transcription.");
3117    None
3118}
3119
3120fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3121    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3122
3123    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3124        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3125            out.entry("title".into())
3126                .or_insert_with(|| value.to_string());
3127            out.entry("tracktitle".into())
3128                .or_insert_with(|| value.to_string());
3129        }
3130        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3131            out.entry("artist".into())
3132                .or_insert_with(|| value.to_string());
3133        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3134            out.entry("artist".into())
3135                .or_insert_with(|| value.to_string());
3136        }
3137        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3138            out.entry("album".into())
3139                .or_insert_with(|| value.to_string());
3140        }
3141        if let Some(value) = tag.get_string(&ItemKey::Genre) {
3142            out.entry("genre".into())
3143                .or_insert_with(|| value.to_string());
3144        }
3145        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3146            out.entry("track_number".into())
3147                .or_insert_with(|| value.to_string());
3148        }
3149        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3150            out.entry("disc_number".into())
3151                .or_insert_with(|| value.to_string());
3152        }
3153    }
3154
3155    let mut tags = HashMap::new();
3156    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3157        Ok(probe) => probe,
3158        Err(err) => {
3159            warn!("failed to guess audio tag file type: {err}");
3160            return tags;
3161        }
3162    };
3163
3164    let tagged_file = match probe.read() {
3165        Ok(file) => file,
3166        Err(err) => {
3167            warn!("failed to read audio tags: {err}");
3168            return tags;
3169        }
3170    };
3171
3172    if let Some(primary) = tagged_file.primary_tag() {
3173        collect_tag(primary, &mut tags);
3174    }
3175    for tag in tagged_file.tags() {
3176        collect_tag(tag, &mut tags);
3177    }
3178
3179    tags
3180}
3181
3182fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3183    let mut cursor = Cursor::new(payload);
3184    let exif = match ExifReader::new().read_from_container(&mut cursor) {
3185        Ok(exif) => exif,
3186        Err(err) => {
3187            warn!("failed to read EXIF metadata: {err}");
3188            return (None, None);
3189        }
3190    };
3191
3192    let mut doc = DocExifMetadata::default();
3193    let mut has_data = false;
3194
3195    if let Some(make) = exif_string(&exif, Tag::Make) {
3196        doc.make = Some(make);
3197        has_data = true;
3198    }
3199    if let Some(model) = exif_string(&exif, Tag::Model) {
3200        doc.model = Some(model);
3201        has_data = true;
3202    }
3203    if let Some(lens) =
3204        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3205    {
3206        doc.lens = Some(lens);
3207        has_data = true;
3208    }
3209    if let Some(dt) =
3210        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3211    {
3212        doc.datetime = Some(dt);
3213        has_data = true;
3214    }
3215
3216    if let Some(gps) = extract_gps_metadata(&exif) {
3217        doc.gps = Some(gps);
3218        has_data = true;
3219    }
3220
3221    let caption = exif_string(&exif, Tag::ImageDescription);
3222
3223    let metadata = if has_data { Some(doc) } else { None };
3224    (metadata, caption)
3225}
3226
3227fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3228    exif.fields()
3229        .find(|field| field.tag == tag)
3230        .and_then(|field| field_to_string(field, exif))
3231}
3232
3233fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3234    let value = field.display_value().with_unit(exif).to_string();
3235    let trimmed = value.trim_matches('\0').trim();
3236    if trimmed.is_empty() {
3237        None
3238    } else {
3239        Some(trimmed.to_string())
3240    }
3241}
3242
3243fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3244    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3245    let latitude_ref_field = exif
3246        .fields()
3247        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3248    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3249    let longitude_ref_field = exif
3250        .fields()
3251        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3252
3253    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3254    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3255
3256    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3257        if reference.eq_ignore_ascii_case("S") {
3258            latitude = -latitude;
3259        }
3260    }
3261    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3262        if reference.eq_ignore_ascii_case("W") {
3263            longitude = -longitude;
3264        }
3265    }
3266
3267    Some(DocGpsMetadata {
3268        latitude,
3269        longitude,
3270    })
3271}
3272
3273fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3274    match value {
3275        ExifValue::Rational(values) if !values.is_empty() => {
3276            let deg = rational_to_f64_u(values.get(0)?)?;
3277            let min = values
3278                .get(1)
3279                .and_then(|v| rational_to_f64_u(v))
3280                .unwrap_or(0.0);
3281            let sec = values
3282                .get(2)
3283                .and_then(|v| rational_to_f64_u(v))
3284                .unwrap_or(0.0);
3285            Some(deg + (min / 60.0) + (sec / 3600.0))
3286        }
3287        ExifValue::SRational(values) if !values.is_empty() => {
3288            let deg = rational_to_f64_i(values.get(0)?)?;
3289            let min = values
3290                .get(1)
3291                .and_then(|v| rational_to_f64_i(v))
3292                .unwrap_or(0.0);
3293            let sec = values
3294                .get(2)
3295                .and_then(|v| rational_to_f64_i(v))
3296                .unwrap_or(0.0);
3297            Some(deg + (min / 60.0) + (sec / 3600.0))
3298        }
3299        _ => None,
3300    }
3301}
3302
3303fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3304    if value.denom == 0 {
3305        None
3306    } else {
3307        Some(value.num as f64 / value.denom as f64)
3308    }
3309}
3310
3311fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3312    if value.denom == 0 {
3313        None
3314    } else {
3315        Some(value.num as f64 / value.denom as f64)
3316    }
3317}
3318
3319fn value_to_ascii(value: &ExifValue) -> Option<String> {
3320    if let ExifValue::Ascii(values) = value {
3321        values
3322            .first()
3323            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3324            .map(|s| s.trim_matches('\0').trim().to_string())
3325            .filter(|s| !s.is_empty())
3326    } else {
3327        None
3328    }
3329}
3330
3331fn ingest_payload(
3332    mem: &mut Memvid,
3333    args: &PutArgs,
3334    payload: Vec<u8>,
3335    source_path: Option<&Path>,
3336    capacity_guard: Option<&mut CapacityGuard>,
3337    enable_embedding: bool,
3338    runtime: Option<&EmbeddingRuntime>,
3339    parallel_mode: bool,
3340) -> Result<IngestOutcome> {
3341    let original_bytes = payload.len();
3342    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3343
3344    let payload_text = std::str::from_utf8(&payload).ok();
3345    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3346
3347    let audio_opts = AudioAnalyzeOptions {
3348        force: args.audio || args.transcribe,
3349        segment_secs: args
3350            .audio_segment_seconds
3351            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3352        transcribe: args.transcribe,
3353    };
3354
3355    let mut analysis = analyze_file(
3356        source_path,
3357        &payload,
3358        payload_text,
3359        inferred_title.as_deref(),
3360        audio_opts,
3361        args.video,
3362    );
3363
3364    if args.video && !analysis.mime.starts_with("video/") {
3365        anyhow::bail!(
3366            "--video requires a video input; detected MIME type {}",
3367            analysis.mime
3368        );
3369    }
3370
3371    let mut search_text = analysis.search_text.clone();
3372    if let Some(ref mut text) = search_text {
3373        if text.len() > MAX_SEARCH_TEXT_LEN {
3374            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3375            *text = truncated;
3376        }
3377    }
3378    analysis.search_text = search_text.clone();
3379
3380    let uri = if let Some(ref explicit) = args.uri {
3381        derive_uri(Some(explicit), None)
3382    } else if args.video {
3383        derive_video_uri(&payload, source_path, &analysis.mime)
3384    } else {
3385        derive_uri(None, source_path)
3386    };
3387
3388    let mut options_builder = PutOptions::builder()
3389        .enable_embedding(enable_embedding)
3390        .auto_tag(!args.no_auto_tag)
3391        .extract_dates(!args.no_extract_dates);
3392    if let Some(ts) = args.timestamp {
3393        options_builder = options_builder.timestamp(ts);
3394    }
3395    if let Some(track) = &args.track {
3396        options_builder = options_builder.track(track.clone());
3397    }
3398    if args.video {
3399        options_builder = options_builder.kind("video");
3400        options_builder = options_builder.tag("kind", "video");
3401        options_builder = options_builder.push_tag("video");
3402    } else if let Some(kind) = &args.kind {
3403        options_builder = options_builder.kind(kind.clone());
3404    }
3405    options_builder = options_builder.uri(uri.clone());
3406    if let Some(ref title) = inferred_title {
3407        options_builder = options_builder.title(title.clone());
3408    }
3409    for (key, value) in &args.tags {
3410        options_builder = options_builder.tag(key.clone(), value.clone());
3411        if !key.is_empty() {
3412            options_builder = options_builder.push_tag(key.clone());
3413        }
3414        if !value.is_empty() && value != key {
3415            options_builder = options_builder.push_tag(value.clone());
3416        }
3417    }
3418    for label in &args.labels {
3419        options_builder = options_builder.label(label.clone());
3420    }
3421    if let Some(metadata) = analysis.metadata.clone() {
3422        if !metadata.is_empty() {
3423            options_builder = options_builder.metadata(metadata);
3424        }
3425    }
3426    for (idx, entry) in args.metadata.iter().enumerate() {
3427        match serde_json::from_str::<DocMetadata>(entry) {
3428            Ok(meta) => {
3429                options_builder = options_builder.metadata(meta);
3430            }
3431            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3432                Ok(value) => {
3433                    options_builder =
3434                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3435                }
3436                Err(err) => {
3437                    warn!("failed to parse --metadata JSON: {err}");
3438                }
3439            },
3440        }
3441    }
3442    if let Some(text) = search_text.clone() {
3443        if !text.trim().is_empty() {
3444            options_builder = options_builder.search_text(text);
3445        }
3446    }
3447    let mut options = options_builder.build();
3448
3449    let existing_frame = if args.update_existing || !args.allow_duplicate {
3450        match mem.frame_by_uri(&uri) {
3451            Ok(frame) => Some(frame),
3452            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3453            Err(err) => return Err(err.into()),
3454        }
3455    } else {
3456        None
3457    };
3458
3459    // Compute frame_id: for updates it's the existing frame's id,
3460    // for new frames it's the current frame count (which becomes the next id)
3461    let frame_id = if let Some(ref existing) = existing_frame {
3462        existing.id
3463    } else {
3464        mem.stats()?.frame_count
3465    };
3466
3467    let mut embedded = false;
3468    let allow_embedding = enable_embedding && !args.video;
3469    if enable_embedding && !allow_embedding && args.video {
3470        warn!("semantic embeddings are not generated for video payloads");
3471    }
3472    let seq = if let Some(existing) = existing_frame {
3473        if args.update_existing {
3474            let payload_for_update = payload.clone();
3475            if allow_embedding {
3476                if let Some(path) = args.embedding_vec.as_deref() {
3477                    let embedding = crate::utils::read_embedding(path)?;
3478                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
3479                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
3480                        let expected = choice.dimensions();
3481                        if expected != 0 && embedding.len() != expected {
3482                            anyhow::bail!(
3483                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3484                                embedding.len(),
3485                                choice.name(),
3486                                expected
3487                            );
3488                        }
3489                        apply_embedding_identity_metadata_from_choice(
3490                            &mut options,
3491                            choice,
3492                            embedding.len(),
3493                            Some(model_str),
3494                        );
3495                    } else {
3496                        options.extra_metadata.insert(
3497                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3498                            embedding.len().to_string(),
3499                        );
3500                    }
3501                    embedded = true;
3502                    mem.update_frame(
3503                        existing.id,
3504                        Some(payload_for_update),
3505                        options.clone(),
3506                        Some(embedding),
3507                    )
3508                    .map_err(|err| {
3509                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3510                    })?
3511                } else {
3512                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3513                    let embed_text = search_text
3514                        .clone()
3515                        .or_else(|| payload_text.map(|text| text.to_string()))
3516                        .unwrap_or_default();
3517                    if embed_text.trim().is_empty() {
3518                        warn!("no textual content available; embedding skipped");
3519                        mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3520                            .map_err(|err| {
3521                                map_put_error(
3522                                    err,
3523                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3524                                )
3525                            })?
3526                    } else {
3527                        // Truncate to avoid token limits
3528                        let truncated_text = truncate_for_embedding(&embed_text);
3529                        if truncated_text.len() < embed_text.len() {
3530                            info!(
3531                                "Truncated text from {} to {} chars for embedding",
3532                                embed_text.len(),
3533                                truncated_text.len()
3534                            );
3535                        }
3536                        let embedding = runtime.embed_passage(&truncated_text)?;
3537                        embedded = true;
3538                        apply_embedding_identity_metadata(&mut options, runtime);
3539                        mem.update_frame(
3540                            existing.id,
3541                            Some(payload_for_update),
3542                            options.clone(),
3543                            Some(embedding),
3544                        )
3545                        .map_err(|err| {
3546                            map_put_error(
3547                                err,
3548                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3549                            )
3550                        })?
3551                    }
3552                }
3553            } else {
3554                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3555                    .map_err(|err| {
3556                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3557                    })?
3558            }
3559        } else {
3560            return Err(DuplicateUriError::new(uri.as_str()).into());
3561        }
3562    } else {
3563        if let Some(guard) = capacity_guard.as_ref() {
3564            guard.ensure_capacity(stored_bytes as u64)?;
3565        }
3566        if parallel_mode {
3567            #[cfg(feature = "parallel_segments")]
3568            {
3569                let mut parent_embedding = None;
3570                let mut chunk_embeddings_vec = None;
3571                if allow_embedding {
3572                    if let Some(path) = args.embedding_vec.as_deref() {
3573                        let embedding = crate::utils::read_embedding(path)?;
3574                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
3575                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
3576                            let expected = choice.dimensions();
3577                            if expected != 0 && embedding.len() != expected {
3578                                anyhow::bail!(
3579                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3580                                    embedding.len(),
3581                                    choice.name(),
3582                                    expected
3583                                );
3584                            }
3585                            apply_embedding_identity_metadata_from_choice(
3586                                &mut options,
3587                                choice,
3588                                embedding.len(),
3589                                Some(model_str),
3590                            );
3591                        } else {
3592                            options.extra_metadata.insert(
3593                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3594                                embedding.len().to_string(),
3595                            );
3596                        }
3597                        parent_embedding = Some(embedding);
3598                        embedded = true;
3599                    } else {
3600                        let runtime =
3601                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3602                        let embed_text = search_text
3603                            .clone()
3604                            .or_else(|| payload_text.map(|text| text.to_string()))
3605                            .unwrap_or_default();
3606                        if embed_text.trim().is_empty() {
3607                            warn!("no textual content available; embedding skipped");
3608                        } else {
3609                            // Check if document will be chunked - if so, embed each chunk
3610                            info!(
3611                                "parallel mode: checking for chunks on {} bytes payload",
3612                                payload.len()
3613                            );
3614                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3615                                // Document will be chunked - embed each chunk for full semantic coverage
3616                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
3617
3618                            // Apply temporal enrichment if enabled (before contextual)
3619                            #[cfg(feature = "temporal_enrich")]
3620                            let enriched_chunks = if args.temporal_enrich {
3621                                info!(
3622                                    "Temporal enrichment enabled, processing {} chunks",
3623                                    chunk_texts.len()
3624                                );
3625                                // Use today's date as fallback document date
3626                                let today = chrono::Local::now().date_naive();
3627                                let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3628                                // Results are tuples of (enriched_text, TemporalEnrichment)
3629                                let enriched: Vec<String> =
3630                                    results.iter().map(|(text, _)| text.clone()).collect();
3631                                let resolved_count = results
3632                                    .iter()
3633                                    .filter(|(_, e)| {
3634                                        e.relative_phrases.iter().any(|p| p.resolved.is_some())
3635                                    })
3636                                    .count();
3637                                info!(
3638                                    "Temporal enrichment: resolved {} chunks with temporal context",
3639                                    resolved_count
3640                                );
3641                                enriched
3642                            } else {
3643                                chunk_texts.clone()
3644                            };
3645                            #[cfg(not(feature = "temporal_enrich"))]
3646                            let enriched_chunks = {
3647                                if args.temporal_enrich {
3648                                    warn!(
3649                                        "Temporal enrichment requested but feature not compiled in"
3650                                    );
3651                                }
3652                                chunk_texts.clone()
3653                            };
3654
3655                            // Apply contextual retrieval if enabled
3656                            let embed_chunks = if args.contextual {
3657                                info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3658                                let engine = if args.contextual_model.as_deref() == Some("local") {
3659                                    #[cfg(feature = "llama-cpp")]
3660                                    {
3661                                        let model_path = get_local_contextual_model_path()?;
3662                                        ContextualEngine::local(model_path)
3663                                    }
3664                                    #[cfg(not(feature = "llama-cpp"))]
3665                                    {
3666                                        anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3667                                    }
3668                                } else if let Some(model) = &args.contextual_model {
3669                                    ContextualEngine::openai_with_model(model)?
3670                                } else {
3671                                    ContextualEngine::openai()?
3672                                };
3673
3674                                match engine.generate_contexts_batch(&embed_text, &enriched_chunks)
3675                                {
3676                                    Ok(contexts) => {
3677                                        info!("Generated {} contextual prefixes", contexts.len());
3678                                        apply_contextual_prefixes(
3679                                            &embed_text,
3680                                            &enriched_chunks,
3681                                            &contexts,
3682                                        )
3683                                    }
3684                                    Err(e) => {
3685                                        warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3686                                        enriched_chunks.clone()
3687                                    }
3688                                }
3689                            } else {
3690                                enriched_chunks
3691                            };
3692
3693                            let truncated_chunks: Vec<String> = embed_chunks
3694                                .iter()
3695                                .map(|chunk_text| {
3696                                    // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
3697                                    let truncated_chunk = truncate_for_embedding(chunk_text);
3698                                    if truncated_chunk.len() < chunk_text.len() {
3699                                        info!(
3700                                            "parallel mode: Truncated chunk from {} to {} chars for embedding",
3701                                            chunk_text.len(),
3702                                            truncated_chunk.len()
3703                                        );
3704                                    }
3705                                    truncated_chunk
3706                                })
3707                                .collect();
3708                            let truncated_refs: Vec<&str> =
3709                                truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3710                            let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3711                            let num_chunks = chunk_embeddings.len();
3712                            chunk_embeddings_vec = Some(chunk_embeddings);
3713                            // Also embed the parent for the parent frame (truncate to avoid token limits)
3714                            let parent_text = truncate_for_embedding(&embed_text);
3715                            if parent_text.len() < embed_text.len() {
3716                                info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
3717                            }
3718                            parent_embedding = Some(runtime.embed_passage(&parent_text)?);
3719                            embedded = true;
3720                            info!(
3721                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
3722                                num_chunks
3723                            );
3724                            } else {
3725                                // No chunking - just embed the whole document (truncate to avoid token limits)
3726                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
3727                                let truncated_text = truncate_for_embedding(&embed_text);
3728                                if truncated_text.len() < embed_text.len() {
3729                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
3730                                }
3731                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
3732                                embedded = true;
3733                            }
3734                        }
3735                    }
3736                }
3737                if embedded {
3738                    if let Some(runtime) = runtime {
3739                        apply_embedding_identity_metadata(&mut options, runtime);
3740                    }
3741                }
3742                let payload_variant = if let Some(path) = source_path {
3743                    ParallelPayload::Path(path.to_path_buf())
3744                } else {
3745                    ParallelPayload::Bytes(payload)
3746                };
3747                let input = ParallelInput {
3748                    payload: payload_variant,
3749                    options: options.clone(),
3750                    embedding: parent_embedding,
3751                    chunk_embeddings: chunk_embeddings_vec,
3752                };
3753                return Ok(IngestOutcome {
3754                    report: PutReport {
3755                        seq: 0,
3756                        frame_id: 0, // Will be populated after parallel commit
3757                        uri,
3758                        title: inferred_title,
3759                        original_bytes,
3760                        stored_bytes,
3761                        compressed,
3762                        source: source_path.map(Path::to_path_buf),
3763                        mime: Some(analysis.mime),
3764                        metadata: analysis.metadata,
3765                        extraction: analysis.extraction,
3766                        #[cfg(feature = "logic_mesh")]
3767                        search_text: search_text.clone(),
3768                    },
3769                    embedded,
3770                    parallel_input: Some(input),
3771                });
3772            }
3773            #[cfg(not(feature = "parallel_segments"))]
3774            {
3775                mem.put_bytes_with_options(&payload, options)
3776                    .map_err(|err| {
3777                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3778                    })?
3779            }
3780        } else if allow_embedding {
3781            if let Some(path) = args.embedding_vec.as_deref() {
3782                let embedding = crate::utils::read_embedding(path)?;
3783                if let Some(model_str) = args.embedding_vec_model.as_deref() {
3784                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
3785                    let expected = choice.dimensions();
3786                    if expected != 0 && embedding.len() != expected {
3787                        anyhow::bail!(
3788                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3789                            embedding.len(),
3790                            choice.name(),
3791                            expected
3792                        );
3793                    }
3794                    apply_embedding_identity_metadata_from_choice(
3795                        &mut options,
3796                        choice,
3797                        embedding.len(),
3798                        Some(model_str),
3799                    );
3800                } else {
3801                    options.extra_metadata.insert(
3802                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3803                        embedding.len().to_string(),
3804                    );
3805                }
3806                embedded = true;
3807                mem.put_with_embedding_and_options(&payload, embedding, options)
3808                    .map_err(|err| {
3809                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3810                    })?
3811            } else {
3812                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3813                let embed_text = search_text
3814                    .clone()
3815                    .or_else(|| payload_text.map(|text| text.to_string()))
3816                    .unwrap_or_default();
3817                if embed_text.trim().is_empty() {
3818                    warn!("no textual content available; embedding skipped");
3819                    mem.put_bytes_with_options(&payload, options)
3820                        .map_err(|err| {
3821                            map_put_error(
3822                                err,
3823                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3824                            )
3825                        })?
3826                } else {
3827                    // Check if document will be chunked - if so, embed each chunk
3828                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
3829                    // Document will be chunked - embed each chunk for full semantic coverage
3830                    info!(
3831                        "Document will be chunked into {} chunks, generating embeddings for each",
3832                        chunk_texts.len()
3833                    );
3834
3835                    // Apply temporal enrichment if enabled (before contextual)
3836                    #[cfg(feature = "temporal_enrich")]
3837                    let enriched_chunks = if args.temporal_enrich {
3838                        info!(
3839                            "Temporal enrichment enabled, processing {} chunks",
3840                            chunk_texts.len()
3841                        );
3842                        // Use today's date as fallback document date
3843                        let today = chrono::Local::now().date_naive();
3844                        let results = temporal_enrich_chunks(&chunk_texts, Some(today));
3845                        // Results are tuples of (enriched_text, TemporalEnrichment)
3846                        let enriched: Vec<String> =
3847                            results.iter().map(|(text, _)| text.clone()).collect();
3848                        let resolved_count = results
3849                            .iter()
3850                            .filter(|(_, e)| {
3851                                e.relative_phrases.iter().any(|p| p.resolved.is_some())
3852                            })
3853                            .count();
3854                        info!(
3855                            "Temporal enrichment: resolved {} chunks with temporal context",
3856                            resolved_count
3857                        );
3858                        enriched
3859                    } else {
3860                        chunk_texts.clone()
3861                    };
3862                    #[cfg(not(feature = "temporal_enrich"))]
3863                    let enriched_chunks = {
3864                        if args.temporal_enrich {
3865                            warn!("Temporal enrichment requested but feature not compiled in");
3866                        }
3867                        chunk_texts.clone()
3868                    };
3869
3870                    // Apply contextual retrieval if enabled
3871                    let embed_chunks = if args.contextual {
3872                        info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
3873                        let engine = if args.contextual_model.as_deref() == Some("local") {
3874                            #[cfg(feature = "llama-cpp")]
3875                            {
3876                                let model_path = get_local_contextual_model_path()?;
3877                                ContextualEngine::local(model_path)
3878                            }
3879                            #[cfg(not(feature = "llama-cpp"))]
3880                            {
3881                                anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
3882                            }
3883                        } else if let Some(model) = &args.contextual_model {
3884                            ContextualEngine::openai_with_model(model)?
3885                        } else {
3886                            ContextualEngine::openai()?
3887                        };
3888
3889                        match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
3890                            Ok(contexts) => {
3891                                info!("Generated {} contextual prefixes", contexts.len());
3892                                apply_contextual_prefixes(&embed_text, &enriched_chunks, &contexts)
3893                            }
3894                            Err(e) => {
3895                                warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
3896                                enriched_chunks.clone()
3897                            }
3898                        }
3899                    } else {
3900                        enriched_chunks
3901                    };
3902
3903                    let truncated_chunks: Vec<String> = embed_chunks
3904                        .iter()
3905                        .map(|chunk_text| {
3906                            // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
3907                            let truncated_chunk = truncate_for_embedding(chunk_text);
3908                            if truncated_chunk.len() < chunk_text.len() {
3909                                info!(
3910                                    "Truncated chunk from {} to {} chars for embedding",
3911                                    chunk_text.len(),
3912                                    truncated_chunk.len()
3913                                );
3914                            }
3915                            truncated_chunk
3916                        })
3917                        .collect();
3918                    let truncated_refs: Vec<&str> =
3919                        truncated_chunks.iter().map(|chunk| chunk.as_str()).collect();
3920                    let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
3921                    // Truncate parent text to avoid token limits
3922                    let parent_text = truncate_for_embedding(&embed_text);
3923                    if parent_text.len() < embed_text.len() {
3924                        info!(
3925                            "Truncated parent text from {} to {} chars for embedding",
3926                            embed_text.len(),
3927                            parent_text.len()
3928                        );
3929                    }
3930                    let parent_embedding = runtime.embed_passage(&parent_text)?;
3931                    embedded = true;
3932                    apply_embedding_identity_metadata(&mut options, runtime);
3933                    info!(
3934                        "Calling put_with_chunk_embeddings with {} chunk embeddings",
3935                        chunk_embeddings.len()
3936                    );
3937                    mem.put_with_chunk_embeddings(
3938                        &payload,
3939                        Some(parent_embedding),
3940                        chunk_embeddings,
3941                        options,
3942                    )
3943                    .map_err(|err| {
3944                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3945                    })?
3946                    } else {
3947                        // No chunking - just embed the whole document (truncate to avoid token limits)
3948                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
3949                        let truncated_text = truncate_for_embedding(&embed_text);
3950                        if truncated_text.len() < embed_text.len() {
3951                            info!(
3952                                "Truncated text from {} to {} chars for embedding",
3953                                embed_text.len(),
3954                                truncated_text.len()
3955                            );
3956                        }
3957                        let embedding = runtime.embed_passage(&truncated_text)?;
3958                        embedded = true;
3959                        apply_embedding_identity_metadata(&mut options, runtime);
3960                        mem.put_with_embedding_and_options(&payload, embedding, options)
3961                            .map_err(|err| {
3962                                map_put_error(
3963                                    err,
3964                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3965                                )
3966                            })?
3967                    }
3968                }
3969            }
3970        } else {
3971            mem.put_bytes_with_options(&payload, options)
3972                .map_err(|err| {
3973                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3974                })?
3975        }
3976    };
3977
3978    Ok(IngestOutcome {
3979        report: PutReport {
3980            seq,
3981            frame_id,
3982            uri,
3983            title: inferred_title,
3984            original_bytes,
3985            stored_bytes,
3986            compressed,
3987            source: source_path.map(Path::to_path_buf),
3988            mime: Some(analysis.mime),
3989            metadata: analysis.metadata,
3990            extraction: analysis.extraction,
3991            #[cfg(feature = "logic_mesh")]
3992            search_text,
3993        },
3994        embedded,
3995        #[cfg(feature = "parallel_segments")]
3996        parallel_input: None,
3997    })
3998}
3999
4000fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4001    if std::str::from_utf8(payload).is_ok() {
4002        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4003        Ok((compressed.len(), true))
4004    } else {
4005        Ok((payload.len(), false))
4006    }
4007}
4008
4009fn print_report(report: &PutReport) {
4010    let name = report
4011        .source
4012        .as_ref()
4013        .map(|path| {
4014            let pretty = env::current_dir()
4015                .ok()
4016                .as_ref()
4017                .and_then(|cwd| diff_paths(path, cwd))
4018                .unwrap_or_else(|| path.to_path_buf());
4019            pretty.to_string_lossy().into_owned()
4020        })
4021        .unwrap_or_else(|| "stdin".to_string());
4022
4023    let ratio = if report.original_bytes > 0 {
4024        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4025    } else {
4026        100.0
4027    };
4028
4029    println!("• {name} → {}", report.uri);
4030    println!("  seq: {}", report.seq);
4031    if report.compressed {
4032        println!(
4033            "  size: {} B → {} B ({:.1}% of original, compressed)",
4034            report.original_bytes, report.stored_bytes, ratio
4035        );
4036    } else {
4037        println!("  size: {} B (stored as-is)", report.original_bytes);
4038    }
4039    if let Some(mime) = &report.mime {
4040        println!("  mime: {mime}");
4041    }
4042    if let Some(title) = &report.title {
4043        println!("  title: {title}");
4044    }
4045    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4046    println!(
4047        "  extraction: status={} reader={}",
4048        report.extraction.status.label(),
4049        extraction_reader
4050    );
4051    if let Some(ms) = report.extraction.duration_ms {
4052        println!("  extraction-duration: {} ms", ms);
4053    }
4054    if let Some(pages) = report.extraction.pages_processed {
4055        println!("  extraction-pages: {}", pages);
4056    }
4057    for warning in &report.extraction.warnings {
4058        println!("  warning: {warning}");
4059    }
4060    if let Some(metadata) = &report.metadata {
4061        if let Some(caption) = &metadata.caption {
4062            println!("  caption: {caption}");
4063        }
4064        if let Some(audio) = metadata.audio.as_ref() {
4065            if audio.duration_secs.is_some()
4066                || audio.sample_rate_hz.is_some()
4067                || audio.channels.is_some()
4068            {
4069                let duration = audio
4070                    .duration_secs
4071                    .map(|secs| format!("{secs:.1}s"))
4072                    .unwrap_or_else(|| "unknown".into());
4073                let rate = audio
4074                    .sample_rate_hz
4075                    .map(|hz| format!("{} Hz", hz))
4076                    .unwrap_or_else(|| "? Hz".into());
4077                let channels = audio
4078                    .channels
4079                    .map(|ch| ch.to_string())
4080                    .unwrap_or_else(|| "?".into());
4081                println!("  audio: duration {duration}, {channels} ch, {rate}");
4082            }
4083        }
4084    }
4085
4086    println!();
4087}
4088
4089fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4090    let source = report
4091        .source
4092        .as_ref()
4093        .map(|path| path.to_string_lossy().into_owned());
4094    let extraction = json!({
4095        "status": report.extraction.status.label(),
4096        "reader": report.extraction.reader,
4097        "duration_ms": report.extraction.duration_ms,
4098        "pages_processed": report.extraction.pages_processed,
4099        "warnings": report.extraction.warnings,
4100    });
4101    json!({
4102        "seq": report.seq,
4103        "uri": report.uri,
4104        "title": report.title,
4105        "original_bytes": report.original_bytes,
4106        "original_bytes_human": format_bytes(report.original_bytes as u64),
4107        "stored_bytes": report.stored_bytes,
4108        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4109        "compressed": report.compressed,
4110        "mime": report.mime,
4111        "source": source,
4112        "metadata": report.metadata,
4113        "extraction": extraction,
4114    })
4115}
4116
4117fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4118    match err {
4119        MemvidError::CapacityExceeded {
4120            current,
4121            limit,
4122            required,
4123        } => anyhow!(CapacityExceededMessage {
4124            current,
4125            limit,
4126            required,
4127        }),
4128        other => other.into(),
4129    }
4130}
4131
4132fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4133    for (idx, entry) in entries.iter().enumerate() {
4134        match serde_json::from_str::<DocMetadata>(entry) {
4135            Ok(meta) => {
4136                options.metadata = Some(meta);
4137            }
4138            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4139                Ok(value) => {
4140                    options
4141                        .extra_metadata
4142                        .insert(format!("custom_metadata_{idx}"), value.to_string());
4143                }
4144                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4145            },
4146        }
4147    }
4148}
4149
4150fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4151    let stats = mem.stats()?;
4152    let message = match stats.tier {
4153        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4154        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4155    };
4156    Ok(message)
4157}
4158
/// Normalizes a user-supplied URI or path into a safe relative identifier.
///
/// Processing steps:
/// 1. Surrounding whitespace and any leading `mv2://` prefixes are removed.
/// 2. Backslashes are treated as `/` separators.
/// 3. Empty and `.` segments are dropped; `..` removes the previous segment
///    (and is ignored when there is nothing left to remove), so the result can
///    never climb above its root.
///
/// When `keep_path` is `true` the surviving segments are joined with `/`;
/// otherwise only the final segment is returned. If no segment survives, the
/// placeholder `"document"` is returned.
fn sanitize_uri(raw: &str, keep_path: bool) -> String {
    let trimmed = raw.trim().trim_start_matches("mv2://");
    let normalized = trimmed.replace('\\', "/");

    let mut segments: Vec<&str> = Vec::new();
    for segment in normalized.split('/') {
        match segment {
            "" | "." => {}
            ".." => {
                segments.pop();
            }
            name => segments.push(name),
        }
    }

    match segments.last() {
        // Everything was empty, `.` or `..`. Never echo a raw segment back
        // here: the only candidates left are `.` and `..`, which would defeat
        // the sanitization.
        None => "document".to_string(),
        Some(last) if !keep_path => (*last).to_string(),
        Some(_) => segments.join("/"),
    }
}
4192
4193fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4194    if quiet {
4195        let pb = ProgressBar::hidden();
4196        pb.set_message(message.to_string());
4197        return pb;
4198    }
4199
4200    match total {
4201        Some(len) => {
4202            let pb = ProgressBar::new(len);
4203            pb.set_draw_target(ProgressDrawTarget::stderr());
4204            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4205                .unwrap_or_else(|_| ProgressStyle::default_bar());
4206            pb.set_style(style);
4207            pb.set_message(message.to_string());
4208            pb
4209        }
4210        None => {
4211            let pb = ProgressBar::new_spinner();
4212            pb.set_draw_target(ProgressDrawTarget::stderr());
4213            pb.set_message(message.to_string());
4214            pb.enable_steady_tick(Duration::from_millis(120));
4215            pb
4216        }
4217    }
4218}
4219
4220fn create_spinner(message: &str) -> ProgressBar {
4221    let pb = ProgressBar::new_spinner();
4222    pb.set_draw_target(ProgressDrawTarget::stderr());
4223    let style = ProgressStyle::with_template("{spinner} {msg}")
4224        .unwrap_or_else(|_| ProgressStyle::default_spinner())
4225        .tick_strings(&["-", "\\", "|", "/"]);
4226    pb.set_style(style);
4227    pb.set_message(message.to_string());
4228    pb.enable_steady_tick(Duration::from_millis(120));
4229    pb
4230}
4231
4232// ============================================================================
4233// put-many command - batch ingestion with pre-computed embeddings
4234// ============================================================================
4235
// NOTE: the `///` comments on the fields below are picked up by clap's derive
// as the user-facing `--help` text, so rewording them changes CLI output.
// Maintainer notes are therefore kept in plain `//` comments.
/// Arguments for the `put-many` subcommand
#[cfg(feature = "parallel_segments")]
#[derive(Args)]
pub struct PutManyArgs {
    // Opened with `Memvid::open` by `handle_put_many`.
    /// Path to the memory file to append to
    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
    pub file: PathBuf,
    // When set, the result is printed as a JSON object with `frame_ids` and
    // `count`, and the "Processing N documents..." progress line is suppressed.
    /// Emit JSON instead of human-readable output
    #[arg(long)]
    pub json: bool,
    // The input must be a bare JSON array of `PutManyRequest` objects.
    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
    /// If not specified, reads from stdin
    #[arg(long, value_name = "PATH")]
    pub input: Option<PathBuf>,
    // `handle_put_many` clamps this value to 1..=9 before building `BuildOpts`,
    // so out-of-range values are silently adjusted rather than rejected.
    /// Zstd compression level (1-9, default: 3)
    #[arg(long, default_value = "3")]
    pub compression_level: i32,
    // Shared lock options, applied to the opened memory via
    // `crate::utils::apply_lock_cli`.
    #[command(flatten)]
    pub lock: LockCliArgs,
}
4256
/// JSON input format for put-many requests
///
/// One element of the batch array. `title` and `text` carry no
/// `#[serde(default)]` and must be present in the JSON; every other field may
/// be omitted.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
pub struct PutManyRequest {
    /// Title stored on the frame (`PutOptions::title`).
    pub title: String,
    /// Parsed from the input but not read by `handle_put_many`.
    #[serde(default)]
    pub label: String,
    /// Document body; ingested as its raw UTF-8 bytes.
    pub text: String,
    /// Frame URI. When omitted, `handle_put_many` assigns `mv2://batch/<index>`,
    /// where `<index>` is the request's zero-based position in the batch.
    #[serde(default)]
    pub uri: Option<String>,
    /// Tags copied onto the frame's `PutOptions::tags`.
    #[serde(default)]
    pub tags: Vec<String>,
    /// Labels copied onto the frame's `PutOptions::labels`.
    #[serde(default)]
    pub labels: Vec<String>,
    /// Structured metadata. A non-null value is first parsed as `DocMetadata`;
    /// if that fails, the raw JSON is kept as a string under the
    /// `memvid.metadata.json` extra-metadata key (unless that key is already set).
    #[serde(default)]
    pub metadata: serde_json::Value,
    /// Extra metadata (string key/value) stored on the frame.
    /// Use this to persist embedding identity keys like `memvid.embedding.model`.
    #[serde(default)]
    pub extra_metadata: BTreeMap<String, String>,
    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI)
    ///
    /// When present, its length is recorded under `MEMVID_EMBEDDING_DIMENSION_KEY`
    /// unless `extra_metadata` already supplies that key.
    #[serde(default)]
    pub embedding: Option<Vec<f32>>,
    /// Pre-computed embeddings for each chunk (matched by index)
    /// If the document gets chunked, these embeddings are assigned to each chunk frame.
    /// The number of chunk embeddings should match the number of chunks that will be created.
    ///
    /// If `embedding` is absent, the first chunk embedding's length is used for
    /// the recorded embedding dimension instead.
    #[serde(default)]
    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
}
4286
/// Batch input format (array of requests)
///
/// `#[serde(transparent)]` makes this wrapper deserialize exactly like its
/// single field, so the expected input is a bare JSON array of
/// [`PutManyRequest`] objects, not an object with a `requests` key.
#[cfg(feature = "parallel_segments")]
#[derive(Debug, serde::Deserialize)]
#[serde(transparent)]
pub struct PutManyBatch {
    /// The requests, in input order.
    pub requests: Vec<PutManyRequest>,
}
4294
/// Handles the `put-many` subcommand: ingests a JSON batch of documents, with
/// optional pre-computed embeddings, into an existing memory file.
///
/// Steps, in order:
/// 1. Open the memory, check that CLI mutation is allowed, and apply lock options.
/// 2. Enable the lex and vec indexes if the stats report them missing.
/// 3. Read the batch JSON from `--input`, or from stdin when no path is given.
/// 4. Convert every request into a `ParallelInput` (see the loop below).
/// 5. Ingest all inputs with `put_parallel_inputs` and print the resulting
///    frame ids, as JSON when `--json` is set.
///
/// An empty batch prints an empty result and returns without ingesting.
///
/// # Errors
///
/// Fails if the memory cannot be opened or mutated, the input cannot be read
/// or parsed, or the batch ingestion itself fails.
#[cfg(feature = "parallel_segments")]
pub fn handle_put_many(_config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
    use std::io::Read;

    let mut mem = Memvid::open(&args.file)?;
    crate::utils::ensure_cli_mutation_allowed(&mem)?;
    crate::utils::apply_lock_cli(&mut mem, &args.lock);

    // Turn on whichever index is still missing before ingesting.
    let stats = mem.stats()?;
    if !stats.has_lex_index {
        mem.enable_lex()?;
    }
    if !stats.has_vec_index {
        mem.enable_vec()?;
    }

    // Load the raw batch text from the given file, or from stdin.
    let batch_json: String = match args.input.as_ref() {
        Some(input_path) => fs::read_to_string(input_path)
            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?,
        None => {
            let mut buffer = String::new();
            io::stdin()
                .read_to_string(&mut buffer)
                .context("Failed to read from stdin")?;
            buffer
        }
    };

    let batch: PutManyBatch =
        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;

    let count = batch.requests.len();
    if count == 0 {
        if args.json {
            println!("{{\"frame_ids\": [], \"count\": 0}}");
        } else {
            println!("No requests to process");
        }
        return Ok(());
    }

    if !args.json {
        eprintln!("Processing {} documents...", count);
    }

    // Translate each request into the engine's parallel-ingest input.
    let mut inputs: Vec<ParallelInput> = Vec::with_capacity(count);
    for (index, request) in batch.requests.into_iter().enumerate() {
        let PutManyRequest {
            title,
            label: _,
            text,
            uri,
            tags,
            labels,
            metadata,
            extra_metadata,
            embedding,
            chunk_embeddings,
        } = request;

        let mut options = PutOptions::default();
        options.title = Some(title);
        options.uri = Some(uri.unwrap_or_else(|| format!("mv2://batch/{}", index)));
        options.tags = tags;
        options.labels = labels;
        options.extra_metadata = extra_metadata;

        if !metadata.is_null() {
            if let Ok(meta) = serde_json::from_value::<DocMetadata>(metadata.clone()) {
                options.metadata = Some(meta);
            } else {
                // Not structured document metadata: keep the raw JSON, but never
                // overwrite a value the caller supplied explicitly.
                options
                    .extra_metadata
                    .entry("memvid.metadata.json".to_string())
                    .or_insert_with(|| metadata.to_string());
            }
        }

        // Record the embedding dimension from the parent vector, or failing
        // that from the first chunk vector, unless the caller already set it.
        let dimension_source = embedding
            .as_ref()
            .or_else(|| chunk_embeddings.as_ref().and_then(|chunks| chunks.first()));
        if let Some(vector) = dimension_source {
            options
                .extra_metadata
                .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
                .or_insert_with(|| vector.len().to_string());
        }

        inputs.push(ParallelInput {
            payload: ParallelPayload::Bytes(text.into_bytes()),
            options,
            embedding,
            chunk_embeddings,
        });
    }

    // The requested compression level is clamped into the 1..=9 range.
    let build_opts = BuildOpts {
        zstd_level: args.compression_level.clamp(1, 9),
        ..BuildOpts::default()
    };

    let frame_ids = mem
        .put_parallel_inputs(&inputs, build_opts)
        .context("Failed to ingest batch")?;

    if args.json {
        let output = serde_json::json!({
            "frame_ids": frame_ids,
            "count": frame_ids.len()
        });
        println!("{}", serde_json::to_string(&output)?);
        return Ok(());
    }

    println!("Ingested {} documents", frame_ids.len());
    if frame_ids.len() <= 10 {
        for fid in &frame_ids {
            println!("  frame_id: {}", fid);
        }
    } else {
        // For larger batches only the first and last ids are printed.
        println!("  first: {}", frame_ids.first().unwrap_or(&0));
        println!("  last:  {}", frame_ids.last().unwrap_or(&0));
    }

    Ok(())
}