memvid_cli/commands/
data.rs

1//! Data operation command handlers (put, update, delete, api-fetch).
2//!
3//! Focused on ingesting files/bytes into `.mv2` memories, updating/deleting frames,
4//! and fetching remote content. Keeps ingestion flags and capacity checks consistent
5//! with the core engine while presenting concise CLI output.
6
7use std::collections::{BTreeMap, HashMap};
8use std::env;
9use std::fs;
10use std::io::{self, Cursor, Write};
11use std::num::NonZeroU64;
12use std::path::{Path, PathBuf};
13use std::sync::OnceLock;
14use std::time::Duration;
15
16use anyhow::{anyhow, Context, Result};
17use blake3::hash;
18use clap::{ArgAction, Args};
19use color_thief::{get_palette, Color, ColorFormat};
20use exif::{Reader as ExifReader, Tag, Value as ExifValue};
21use glob::glob;
22use image::ImageReader;
23use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
24use infer;
25#[cfg(feature = "clip")]
26use memvid_core::clip::render_pdf_pages_for_clip;
27#[cfg(feature = "clip")]
28use memvid_core::get_image_info;
29use memvid_core::table::{extract_tables, store_table, TableExtractionOptions};
30#[cfg(feature = "clip")]
31use memvid_core::FrameRole;
32use memvid_core::{
33    normalize_text, AudioSegmentMetadata, DocAudioMetadata, DocExifMetadata, DocGpsMetadata,
34    DocMetadata, DocumentFormat, EnrichmentEngine, MediaManifest, Memvid, MemvidError, PutOptions,
35    ReaderHint, ReaderRegistry, RulesEngine, Stats, Tier, TimelineQueryBuilder,
36    MEMVID_EMBEDDING_DIMENSION_KEY, MEMVID_EMBEDDING_MODEL_KEY, MEMVID_EMBEDDING_PROVIDER_KEY,
37};
38#[cfg(feature = "parallel_segments")]
39use memvid_core::{BuildOpts, ParallelInput, ParallelPayload};
40use pdf_extract::OutputError as PdfExtractError;
41use serde_json::json;
42use tracing::{info, warn};
43use tree_magic_mini::from_u8 as magic_from_u8;
44use uuid::Uuid;
45
46const MAX_SEARCH_TEXT_LEN: usize = 32_768;
47/// Maximum characters for embedding text to avoid exceeding OpenAI's 8192 token limit.
48/// Using ~4 chars/token estimate, 30K chars ≈ 7.5K tokens (safe margin).
49const MAX_EMBEDDING_TEXT_LEN: usize = 20_000;
50
51/// Parse a timestamp from either epoch seconds or human-readable date formats.
52/// Accepts:
53/// - Epoch seconds: "1673740800"
54/// - ISO format: "2023-01-15"
55/// - US format: "Jan 15, 2023" or "January 15, 2023"
56/// - Slash format: "01/15/2023" or "1/15/2023"
57fn parse_timestamp(s: &str) -> Result<i64, String> {
58    let s = s.trim();
59
60    // Try parsing as epoch timestamp first (pure digits, optionally negative)
61    if s.chars().all(|c| c.is_ascii_digit())
62        || (s.starts_with('-') && s[1..].chars().all(|c| c.is_ascii_digit()))
63    {
64        return s
65            .parse::<i64>()
66            .map_err(|e| format!("Invalid epoch timestamp: {e}"));
67    }
68
69    // Try various date formats
70    use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
71
72    // Parse date and return midnight UTC
73    fn date_to_epoch(date: NaiveDate) -> i64 {
74        let datetime = NaiveDateTime::new(date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
75        Utc.from_utc_datetime(&datetime).timestamp()
76    }
77
78    // Try ISO format: "2023-01-15"
79    if let Ok(date) = NaiveDate::parse_from_str(s, "%Y-%m-%d") {
80        return Ok(date_to_epoch(date));
81    }
82
83    // Try US formats: "Jan 15, 2023" or "January 15, 2023"
84    if let Ok(date) = NaiveDate::parse_from_str(s, "%b %d, %Y") {
85        return Ok(date_to_epoch(date));
86    }
87    if let Ok(date) = NaiveDate::parse_from_str(s, "%B %d, %Y") {
88        return Ok(date_to_epoch(date));
89    }
90
91    // Try slash format: "01/15/2023" or "1/15/2023"
92    if let Ok(date) = NaiveDate::parse_from_str(s, "%m/%d/%Y") {
93        return Ok(date_to_epoch(date));
94    }
95
96    // Try European format: "15-01-2023" or "15/01/2023"
97    if let Ok(date) = NaiveDate::parse_from_str(s, "%d-%m-%Y") {
98        return Ok(date_to_epoch(date));
99    }
100    if let Ok(date) = NaiveDate::parse_from_str(s, "%d/%m/%Y") {
101        return Ok(date_to_epoch(date));
102    }
103
104    Err(format!(
105        "Invalid timestamp format: '{}'. Use epoch seconds (1673740800) or date format (Jan 15, 2023 / 2023-01-15 / 01/15/2023)",
106        s
107    ))
108}
109const COLOR_PALETTE_SIZE: u8 = 5;
110const COLOR_PALETTE_QUALITY: u8 = 8;
111const MAGIC_SNIFF_BYTES: usize = 16;
112/// Maximum number of images to extract from a PDF for CLIP visual search.
113/// Set high to capture all images; processing stops when no more images found.
114#[cfg(feature = "clip")]
115const CLIP_PDF_MAX_IMAGES: usize = 100;
116#[cfg(feature = "clip")]
117const CLIP_PDF_TARGET_PX: u32 = 768;
118
119/// Truncate text for embedding to fit within OpenAI token limits.
120/// Returns a truncated string that fits within MAX_EMBEDDING_TEXT_LEN.
121fn truncate_for_embedding(text: &str) -> String {
122    if text.len() <= MAX_EMBEDDING_TEXT_LEN {
123        text.to_string()
124    } else {
125        // Find a char boundary to avoid splitting UTF-8
126        let truncated = &text[..MAX_EMBEDDING_TEXT_LEN];
127        // Try to find the last complete character
128        let end = truncated
129            .char_indices()
130            .rev()
131            .next()
132            .map(|(i, c)| i + c.len_utf8())
133            .unwrap_or(MAX_EMBEDDING_TEXT_LEN);
134        text[..end].to_string()
135    }
136}
137
138fn apply_embedding_identity_metadata(options: &mut PutOptions, runtime: &EmbeddingRuntime) {
139    options.extra_metadata.insert(
140        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
141        runtime.provider_kind().to_string(),
142    );
143    options.extra_metadata.insert(
144        MEMVID_EMBEDDING_MODEL_KEY.to_string(),
145        runtime.provider_model_id(),
146    );
147    options.extra_metadata.insert(
148        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
149        runtime.dimension().to_string(),
150    );
151}
152
153fn apply_embedding_identity_metadata_from_choice(
154    options: &mut PutOptions,
155    model: EmbeddingModelChoice,
156    dimension: usize,
157    model_id_override: Option<&str>,
158) {
159    let provider = match model {
160        EmbeddingModelChoice::OpenAILarge
161        | EmbeddingModelChoice::OpenAISmall
162        | EmbeddingModelChoice::OpenAIAda => "openai",
163        EmbeddingModelChoice::Nvidia => "nvidia",
164        _ => "fastembed",
165    };
166
167    let model_id = match model {
168        EmbeddingModelChoice::Nvidia => model_id_override
169            .map(str::trim)
170            .filter(|value| !value.is_empty())
171            .map(|value| {
172                value
173                    .trim_start_matches("nvidia:")
174                    .trim_start_matches("nv:")
175            })
176            .filter(|value| !value.is_empty())
177            .map(|value| {
178                if value.contains('/') {
179                    value.to_string()
180                } else {
181                    format!("nvidia/{value}")
182                }
183            })
184            .unwrap_or_else(|| model.canonical_model_id().to_string()),
185        _ => model.canonical_model_id().to_string(),
186    };
187
188    options.extra_metadata.insert(
189        MEMVID_EMBEDDING_PROVIDER_KEY.to_string(),
190        provider.to_string(),
191    );
192    options
193        .extra_metadata
194        .insert(MEMVID_EMBEDDING_MODEL_KEY.to_string(), model_id);
195    options.extra_metadata.insert(
196        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
197        dimension.to_string(),
198    );
199}
200
201/// Get the default local model path for contextual retrieval (phi-3.5-mini).
202/// Uses MEMVID_MODELS_DIR or defaults to ~/.memvid/models/llm/phi-3-mini-q4/Phi-3.5-mini-instruct-Q4_K_M.gguf
203#[cfg(feature = "llama-cpp")]
204fn get_local_contextual_model_path() -> Result<PathBuf> {
205    let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
206        // Handle ~ expansion manually
207        let expanded = if custom.starts_with("~/") {
208            if let Ok(home) = env::var("HOME") {
209                PathBuf::from(home).join(&custom[2..])
210            } else {
211                PathBuf::from(&custom)
212            }
213        } else {
214            PathBuf::from(&custom)
215        };
216        expanded
217    } else {
218        // Default to ~/.memvid/models
219        let home = env::var("HOME").map_err(|_| anyhow!("HOME environment variable not set"))?;
220        PathBuf::from(home).join(".memvid").join("models")
221    };
222
223    let model_path = models_dir
224        .join("llm")
225        .join("phi-3-mini-q4")
226        .join("Phi-3.5-mini-instruct-Q4_K_M.gguf");
227
228    if model_path.exists() {
229        Ok(model_path)
230    } else {
231        Err(anyhow!(
232            "Local model not found at {}. Run 'memvid models install phi-3.5-mini' first.",
233            model_path.display()
234        ))
235    }
236}
237
238use pathdiff::diff_paths;
239
240use crate::api_fetch::{run_api_fetch, ApiFetchCommand, ApiFetchMode};
241use crate::commands::{extension_from_mime, frame_to_json, print_frame_summary};
242use crate::config::{
243    load_embedding_runtime_for_mv2, CliConfig, EmbeddingModelChoice, EmbeddingRuntime,
244};
245use crate::contextual::{apply_contextual_prefixes, ContextualEngine};
246use crate::error::{CapacityExceededMessage, DuplicateUriError};
247use crate::utils::{
248    apply_lock_cli, ensure_cli_mutation_allowed, format_bytes, frame_status_str, read_payload,
249    select_frame,
250};
251#[cfg(feature = "temporal_enrich")]
252use memvid_core::enrich_chunks as temporal_enrich_chunks;
253
254/// Helper function to parse boolean flags from environment variables
255fn parse_bool_flag(value: &str) -> Option<bool> {
256    match value.trim().to_ascii_lowercase().as_str() {
257        "1" | "true" | "yes" | "on" => Some(true),
258        "0" | "false" | "no" | "off" => Some(false),
259        _ => None,
260    }
261}
262
263/// Helper function to check for parallel segments environment override
264#[cfg(feature = "parallel_segments")]
265fn parallel_env_override() -> Option<bool> {
266    std::env::var("MEMVID_PARALLEL_SEGMENTS")
267        .ok()
268        .and_then(|value| parse_bool_flag(&value))
269}
270
271/// Check if CLIP model files are installed
272#[cfg(feature = "clip")]
273#[allow(dead_code)]
274fn is_clip_model_installed() -> bool {
275    let models_dir = if let Ok(custom) = env::var("MEMVID_MODELS_DIR") {
276        PathBuf::from(custom)
277    } else if let Ok(home) = env::var("HOME") {
278        PathBuf::from(home).join(".memvid/models")
279    } else {
280        PathBuf::from(".memvid/models")
281    };
282
283    let model_name = env::var("MEMVID_CLIP_MODEL").unwrap_or_else(|_| "mobileclip-s2".to_string());
284
285    let vision_model = models_dir.join(format!("{}_vision.onnx", model_name));
286    let text_model = models_dir.join(format!("{}_text.onnx", model_name));
287
288    vision_model.exists() && text_model.exists()
289}
290
291/// Minimum confidence threshold for NER entities (0.0-1.0)
292/// Note: Lower threshold captures more entities but also more noise
293#[cfg(feature = "logic_mesh")]
294const NER_MIN_CONFIDENCE: f32 = 0.50;
295
296/// Minimum length for entity names (characters)
297#[cfg(feature = "logic_mesh")]
298const NER_MIN_ENTITY_LEN: usize = 2;
299
300/// Maximum gap (in characters) between adjacent entities to merge
301/// Allows merging "S" + "&" + "P Global" when they're close together
302#[cfg(feature = "logic_mesh")]
303const MAX_MERGE_GAP: usize = 3;
304
305/// Merge adjacent NER entities that appear to be tokenization artifacts.
306///
307/// The BERT tokenizer splits names at special characters like `&`, producing
308/// separate entities like "S", "&", "P Global" instead of "S&P Global".
309/// This function merges adjacent entities of the same type when they're
310/// contiguous or nearly contiguous in the original text.
311#[cfg(feature = "logic_mesh")]
312fn merge_adjacent_entities(
313    entities: Vec<memvid_core::ExtractedEntity>,
314    original_text: &str,
315) -> Vec<memvid_core::ExtractedEntity> {
316    use memvid_core::ExtractedEntity;
317
318    if entities.is_empty() {
319        return entities;
320    }
321
322    // Sort by byte position
323    let mut sorted: Vec<ExtractedEntity> = entities;
324    sorted.sort_by_key(|e| e.byte_start);
325
326    let mut merged: Vec<ExtractedEntity> = Vec::new();
327
328    for entity in sorted {
329        if let Some(last) = merged.last_mut() {
330            // Check if this entity is adjacent to the previous one and same type
331            let gap = entity.byte_start.saturating_sub(last.byte_end);
332            let same_type = last.entity_type == entity.entity_type;
333
334            // Check what's in the gap (if any)
335            let gap_is_mergeable = if gap == 0 {
336                true
337            } else if gap <= MAX_MERGE_GAP
338                && entity.byte_start <= original_text.len()
339                && last.byte_end <= original_text.len()
340            {
341                // Gap content should be whitespace (but not newlines), punctuation, or connectors
342                let gap_text = &original_text[last.byte_end..entity.byte_start];
343                // Don't merge across newlines - that indicates separate entities
344                if gap_text.contains('\n') || gap_text.contains('\r') {
345                    false
346                } else {
347                    gap_text.chars().all(|c| {
348                        c == ' ' || c == '\t' || c == '&' || c == '-' || c == '\'' || c == '.'
349                    })
350                }
351            } else {
352                false
353            };
354
355            if same_type && gap_is_mergeable {
356                // Merge: extend the previous entity to include this one
357                let merged_text = if entity.byte_end <= original_text.len() {
358                    original_text[last.byte_start..entity.byte_end].to_string()
359                } else {
360                    format!("{}{}", last.text, entity.text)
361                };
362
363                tracing::debug!(
364                    "Merged entities: '{}' + '{}' -> '{}' (gap: {} chars)",
365                    last.text,
366                    entity.text,
367                    merged_text,
368                    gap
369                );
370
371                last.text = merged_text;
372                last.byte_end = entity.byte_end;
373                // Average the confidence
374                last.confidence = (last.confidence + entity.confidence) / 2.0;
375                continue;
376            }
377        }
378
379        merged.push(entity);
380    }
381
382    merged
383}
384
385/// Filter and clean extracted NER entities to remove noise.
386/// Returns true if the entity should be kept.
387#[cfg(feature = "logic_mesh")]
388fn is_valid_entity(text: &str, confidence: f32) -> bool {
389    // Filter by confidence
390    if confidence < NER_MIN_CONFIDENCE {
391        return false;
392    }
393
394    let trimmed = text.trim();
395
396    // Filter by minimum length
397    if trimmed.len() < NER_MIN_ENTITY_LEN {
398        return false;
399    }
400
401    // Filter out entities that are just whitespace or punctuation
402    if trimmed
403        .chars()
404        .all(|c| c.is_whitespace() || c.is_ascii_punctuation())
405    {
406        return false;
407    }
408
409    // Filter out entities containing newlines (malformed extractions)
410    if trimmed.contains('\n') || trimmed.contains('\r') {
411        return false;
412    }
413
414    // Filter out entities that look like partial words (start/end with ##)
415    if trimmed.starts_with("##") || trimmed.ends_with("##") {
416        return false;
417    }
418
419    // Filter out entities ending with period (likely sentence fragments)
420    if trimmed.ends_with('.') {
421        return false;
422    }
423
424    let lower = trimmed.to_lowercase();
425
426    // Filter out common single-letter false positives
427    if trimmed.len() == 1 {
428        return false;
429    }
430
431    // Filter out 2-letter tokens that are common noise
432    if trimmed.len() == 2 {
433        // Allow legitimate 2-letter entities like "EU", "UN", "UK", "US"
434        if matches!(lower.as_str(), "eu" | "un" | "uk" | "us" | "ai" | "it") {
435            return true;
436        }
437        // Filter other 2-letter tokens
438        return false;
439    }
440
441    // Filter out common noise words
442    if matches!(
443        lower.as_str(),
444        "the" | "api" | "url" | "pdf" | "inc" | "ltd" | "llc"
445    ) {
446        return false;
447    }
448
449    // Filter out partial tokenization artifacts
450    // e.g., "S&P Global" becomes "P Global", "IHS Markit" becomes "HS Markit"
451    let words: Vec<&str> = trimmed.split_whitespace().collect();
452    if words.len() >= 2 {
453        let first_word = words[0];
454        // If first word is 1-2 chars and looks like a fragment, filter it
455        if first_word.len() <= 2 && first_word.chars().all(|c| c.is_uppercase()) {
456            // But allow "US Bank", "UK Office", etc.
457            if !matches!(
458                first_word.to_lowercase().as_str(),
459                "us" | "uk" | "eu" | "un"
460            ) {
461                return false;
462            }
463        }
464    }
465
466    // Filter entities that start with lowercase (likely partial words)
467    if let Some(first_char) = trimmed.chars().next() {
468        if first_char.is_lowercase() {
469            return false;
470        }
471    }
472
473    true
474}
475
476/// Parse key=value pairs for tags
477pub fn parse_key_val(s: &str) -> Result<(String, String)> {
478    let (key, value) = s
479        .split_once('=')
480        .ok_or_else(|| anyhow!("expected KEY=VALUE, got `{s}`"))?;
481    Ok((key.to_string(), value.to_string()))
482}
483
484/// Lock-related CLI arguments (shared across commands)
485#[derive(Args, Clone, Copy, Debug)]
486pub struct LockCliArgs {
487    /// Maximum time to wait for an active writer before failing (ms)
488    #[arg(long = "lock-timeout", value_name = "MS", default_value_t = 250)]
489    pub lock_timeout: u64,
490    /// Attempt a stale takeover if the recorded writer heartbeat has expired
491    #[arg(long = "force", action = ArgAction::SetTrue)]
492    pub force: bool,
493}
494
495/// Arguments for the `put` subcommand
496#[derive(Args)]
497pub struct PutArgs {
498    /// Path to the memory file to append to
499    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
500    pub file: PathBuf,
501    /// Emit JSON instead of human-readable output
502    #[arg(long)]
503    pub json: bool,
504    /// Read payload bytes from a file instead of stdin
505    #[arg(long, value_name = "PATH")]
506    pub input: Option<String>,
507    /// Override the derived URI for the document
508    #[arg(long, value_name = "URI")]
509    pub uri: Option<String>,
510    /// Override the derived title for the document
511    #[arg(long, value_name = "TITLE")]
512    pub title: Option<String>,
513    /// Optional timestamp to store on the frame.
514    /// Accepts epoch seconds (1673740800) or human-readable dates:
515    /// "Jan 15, 2023", "2023-01-15", "01/15/2023"
516    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
517    pub timestamp: Option<i64>,
518    /// Track name for the frame
519    #[arg(long)]
520    pub track: Option<String>,
521    /// Kind metadata for the frame
522    #[arg(long)]
523    pub kind: Option<String>,
524    /// Store the payload as a binary video without transcoding
525    #[arg(long, action = ArgAction::SetTrue)]
526    pub video: bool,
527    /// Attempt to attach a transcript (Dev/Enterprise tiers only)
528    #[arg(long, action = ArgAction::SetTrue)]
529    pub transcript: bool,
530    /// Tags to attach in key=value form
531    #[arg(long = "tag", value_parser = parse_key_val, value_name = "KEY=VALUE")]
532    pub tags: Vec<(String, String)>,
533    /// Free-form labels to attach to the frame
534    #[arg(long = "label", value_name = "LABEL")]
535    pub labels: Vec<String>,
536    /// Additional metadata payloads expressed as JSON
537    #[arg(long = "metadata", value_name = "JSON")]
538    pub metadata: Vec<String>,
539    /// Disable automatic tag generation from extracted text
540    #[arg(long = "no-auto-tag", action = ArgAction::SetTrue)]
541    pub no_auto_tag: bool,
542    /// Disable automatic date extraction from content
543    #[arg(long = "no-extract-dates", action = ArgAction::SetTrue)]
544    pub no_extract_dates: bool,
545    /// Disable automatic triplet extraction (SPO facts as MemoryCards)
546    #[arg(long = "no-extract-triplets", action = ArgAction::SetTrue)]
547    pub no_extract_triplets: bool,
548    /// Force audio analysis and tagging when ingesting binary data
549    #[arg(long, action = ArgAction::SetTrue)]
550    pub audio: bool,
551    /// Transcribe audio using Whisper and use the transcript for text embedding
552    /// Requires the 'whisper' feature to be enabled
553    #[arg(long, action = ArgAction::SetTrue)]
554    pub transcribe: bool,
555    /// Override the default segment duration (in seconds) when generating audio snippets
556    #[arg(long = "audio-segment-seconds", value_name = "SECS")]
557    pub audio_segment_seconds: Option<u32>,
558    /// Compute semantic embeddings using the installed model
559    /// Increases storage overhead from ~5KB to ~270KB per document
560    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue, conflicts_with = "no_embedding")]
561    pub embedding: bool,
562    /// Explicitly disable embeddings (default, but kept for clarity)
563    #[arg(long = "no-embedding", action = ArgAction::SetTrue)]
564    pub no_embedding: bool,
565    /// Path to a JSON file containing a pre-computed embedding vector (e.g., from OpenAI)
566    /// The JSON file should contain a flat array of floats like [0.1, 0.2, ...]
567    #[arg(long = "embedding-vec", value_name = "JSON_PATH")]
568    pub embedding_vec: Option<PathBuf>,
569    /// Embedding model identity for the provided `--embedding-vec` vector.
570    ///
571    /// This is written into per-frame `extra_metadata` (`memvid.embedding.*`) so that
572    /// semantic queries can auto-select the correct runtime across CLI/SDKs.
573    ///
574    /// Options: bge-small, bge-base, nomic, gte-large, openai, openai-small, openai-ada, nvidia
575    /// (also accepts canonical IDs like `text-embedding-3-small` and `BAAI/bge-small-en-v1.5`).
576    #[arg(
577        long = "embedding-vec-model",
578        value_name = "EMB_MODEL",
579        requires = "embedding_vec"
580    )]
581    pub embedding_vec_model: Option<String>,
582    /// Enable Product Quantization compression for vectors (16x compression)
583    /// Reduces vector storage from ~270KB to ~20KB per document with ~95% accuracy
584    /// Only applies when --embedding is enabled
585    #[arg(long, action = ArgAction::SetTrue)]
586    pub vector_compression: bool,
587    /// Enable contextual retrieval: prepend LLM-generated context to each chunk before embedding
588    /// This improves retrieval accuracy for preference/personalization queries
589    /// Requires OPENAI_API_KEY or a local model (phi-3.5-mini)
590    #[arg(long, action = ArgAction::SetTrue)]
591    pub contextual: bool,
592    /// Model to use for contextual retrieval (default: gpt-4o-mini, or "local" for phi-3.5-mini)
593    #[arg(long = "contextual-model", value_name = "MODEL")]
594    pub contextual_model: Option<String>,
595    /// Automatically extract tables from PDF documents
596    /// Uses aggressive mode with fallbacks for best results
597    #[arg(long, action = ArgAction::SetTrue)]
598    pub tables: bool,
599    /// Disable automatic table extraction (when --tables is the default)
600    #[arg(long = "no-tables", action = ArgAction::SetTrue)]
601    pub no_tables: bool,
602    /// Time budget for text extraction per document (milliseconds)
603    /// When set, extraction stops early and queues the frame for background enrichment
604    /// Use `memvid process-queue` to complete extraction later
605    #[arg(long = "extraction-budget-ms", value_name = "MS")]
606    pub extraction_budget_ms: Option<u64>,
607    /// Enable CLIP visual embeddings for images and PDF pages
608    /// Allows text-to-image search using natural language queries
609    /// Disabled by default; use --clip to enable
610    #[cfg(feature = "clip")]
611    #[arg(long, action = ArgAction::SetTrue)]
612    pub clip: bool,
613
614    /// Disable CLIP visual embeddings (no-op, CLIP is disabled by default)
615    #[cfg(feature = "clip")]
616    #[arg(long, action = ArgAction::SetTrue, hide = true)]
617    pub no_clip: bool,
618
619    /// Replace any existing frame with the same URI instead of inserting a duplicate
620    #[arg(long, conflicts_with = "allow_duplicate")]
621    pub update_existing: bool,
622    /// Allow inserting a new frame even if a frame with the same URI already exists
623    #[arg(long)]
624    pub allow_duplicate: bool,
625
626    /// Enable temporal enrichment: resolve relative time phrases ("last year") to absolute dates
627    /// Prepends resolved temporal context to chunks for improved temporal question accuracy
628    /// Requires the temporal_enrich feature in memvid-core
629    #[arg(long, action = ArgAction::SetTrue)]
630    pub temporal_enrich: bool,
631
632    /// Bind to a dashboard memory ID (UUID or 24-char ObjectId) for capacity and tracking
633    /// If the file is not already bound, this will sync the capacity ticket from the dashboard
634    #[arg(long = "memory-id", value_name = "ID")]
635    pub memory_id: Option<crate::commands::tickets::MemoryId>,
636
637    /// Build Logic-Mesh entity graph during ingestion (opt-in)
638    /// Extracts entities (people, organizations, locations, etc.) and their relationships
639    /// Enables graph traversal with `memvid follow`
640    /// Requires NER model: `memvid models install --ner distilbert-ner`
641    #[arg(long, action = ArgAction::SetTrue)]
642    pub logic_mesh: bool,
643
644    /// Use the experimental parallel segment builder (requires --features parallel_segments)
645    #[cfg(feature = "parallel_segments")]
646    #[arg(long, action = ArgAction::SetTrue)]
647    pub parallel_segments: bool,
648    /// Force the legacy ingestion path even when the parallel feature is available
649    #[cfg(feature = "parallel_segments")]
650    #[arg(long, action = ArgAction::SetTrue)]
651    pub no_parallel_segments: bool,
652    /// Target tokens per segment when using the parallel builder
653    #[cfg(feature = "parallel_segments")]
654    #[arg(long = "parallel-seg-tokens", value_name = "TOKENS")]
655    pub parallel_segment_tokens: Option<usize>,
656    /// Target pages per segment when using the parallel builder
657    #[cfg(feature = "parallel_segments")]
658    #[arg(long = "parallel-seg-pages", value_name = "PAGES")]
659    pub parallel_segment_pages: Option<usize>,
660    /// Worker threads used by the parallel builder
661    #[cfg(feature = "parallel_segments")]
662    #[arg(long = "parallel-threads", value_name = "N")]
663    pub parallel_threads: Option<usize>,
664    /// Worker queue depth used by the parallel builder
665    #[cfg(feature = "parallel_segments")]
666    #[arg(long = "parallel-queue-depth", value_name = "N")]
667    pub parallel_queue_depth: Option<usize>,
668
669    /// Store raw binary content along with extracted text.
670    /// By default, only extracted text + BLAKE3 hash is stored (--no-raw behavior).
671    /// Use --raw when you need to retrieve the original file later.
672    #[arg(long = "raw", action = ArgAction::SetTrue)]
673    pub raw: bool,
674
675    /// Don't store raw binary content (DEFAULT behavior).
676    /// Only extracted text + BLAKE3 hash is stored.
677    /// This flag is kept for backwards compatibility but is now the default.
678    #[arg(long = "no-raw", action = ArgAction::SetTrue, hide = true)]
679    pub no_raw: bool,
680
681    /// Skip ingestion if identical content (by BLAKE3 hash) already exists in the memory.
682    /// Returns the existing frame's ID instead of creating a duplicate.
683    #[arg(long, action = ArgAction::SetTrue)]
684    pub dedup: bool,
685
686    /// Run rules-based memory extraction after ingestion (default: enabled)
687    /// Extracts facts, preferences, events, relationships from ingested content
688    #[arg(long, action = ArgAction::SetTrue, default_value_t = true)]
689    pub enrich: bool,
690
691    /// Disable automatic rules-based enrichment
692    #[arg(long = "no-enrich", action = ArgAction::SetTrue)]
693    pub no_enrich: bool,
694
695    #[command(flatten)]
696    pub lock: LockCliArgs,
697}
698
699#[cfg(feature = "parallel_segments")]
700impl PutArgs {
701    pub fn wants_parallel(&self) -> bool {
702        if self.parallel_segments {
703            return true;
704        }
705        if self.no_parallel_segments {
706            return false;
707        }
708        if let Some(env_flag) = parallel_env_override() {
709            return env_flag;
710        }
711        false
712    }
713
714    pub fn sanitized_parallel_opts(&self) -> memvid_core::BuildOpts {
715        let mut opts = memvid_core::BuildOpts::default();
716        if let Some(tokens) = self.parallel_segment_tokens {
717            opts.segment_tokens = tokens;
718        }
719        if let Some(pages) = self.parallel_segment_pages {
720            opts.segment_pages = pages;
721        }
722        if let Some(threads) = self.parallel_threads {
723            opts.threads = threads;
724        }
725        if let Some(depth) = self.parallel_queue_depth {
726            opts.queue_depth = depth;
727        }
728        opts.sanitize();
729        opts
730    }
731}
732
733#[cfg(not(feature = "parallel_segments"))]
734impl PutArgs {
735    pub fn wants_parallel(&self) -> bool {
736        false
737    }
738}
739
740/// Arguments for the `api-fetch` subcommand
741#[derive(Args)]
742pub struct ApiFetchArgs {
743    /// Path to the memory file to ingest into
744    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
745    pub file: PathBuf,
746    /// Path to the fetch configuration JSON file
747    #[arg(value_name = "CONFIG", value_parser = clap::value_parser!(PathBuf))]
748    pub config: PathBuf,
749    /// Perform a dry run without writing to the memory
750    #[arg(long, action = ArgAction::SetTrue)]
751    pub dry_run: bool,
752    /// Override the configured ingest mode
753    #[arg(long, value_name = "MODE")]
754    pub mode: Option<ApiFetchMode>,
755    /// Override the base URI used when constructing frame URIs
756    #[arg(long, value_name = "URI")]
757    pub uri: Option<String>,
758    /// Emit JSON instead of human-readable output
759    #[arg(long, action = ArgAction::SetTrue)]
760    pub json: bool,
761
762    #[command(flatten)]
763    pub lock: LockCliArgs,
764}
765
766/// Arguments for the `update` subcommand
767#[derive(Args)]
768pub struct UpdateArgs {
769    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
770    pub file: PathBuf,
771    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
772    pub frame_id: Option<u64>,
773    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
774    pub uri: Option<String>,
775    #[arg(long = "input", value_name = "PATH", value_parser = clap::value_parser!(PathBuf))]
776    pub input: Option<PathBuf>,
777    #[arg(long = "set-uri", value_name = "URI")]
778    pub set_uri: Option<String>,
779    #[arg(long, value_name = "TITLE")]
780    pub title: Option<String>,
781    /// Timestamp for the frame. Accepts epoch seconds or human-readable dates.
782    #[arg(long, value_parser = parse_timestamp, value_name = "DATE")]
783    pub timestamp: Option<i64>,
784    #[arg(long, value_name = "TRACK")]
785    pub track: Option<String>,
786    #[arg(long, value_name = "KIND")]
787    pub kind: Option<String>,
788    #[arg(long = "tag", value_name = "KEY=VALUE", value_parser = parse_key_val)]
789    pub tags: Vec<(String, String)>,
790    #[arg(long = "label", value_name = "LABEL")]
791    pub labels: Vec<String>,
792    #[arg(long = "metadata", value_name = "JSON")]
793    pub metadata: Vec<String>,
794    #[arg(long, aliases = ["embeddings"], action = ArgAction::SetTrue)]
795    pub embeddings: bool,
796    #[arg(long)]
797    pub json: bool,
798
799    #[command(flatten)]
800    pub lock: LockCliArgs,
801}
802
803/// Arguments for the `delete` subcommand
804#[derive(Args)]
805pub struct DeleteArgs {
806    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
807    pub file: PathBuf,
808    #[arg(long = "frame-id", value_name = "ID", conflicts_with = "uri")]
809    pub frame_id: Option<u64>,
810    #[arg(long, value_name = "URI", conflicts_with = "frame_id")]
811    pub uri: Option<String>,
812    #[arg(long, action = ArgAction::SetTrue)]
813    pub yes: bool,
814    #[arg(long)]
815    pub json: bool,
816
817    #[command(flatten)]
818    pub lock: LockCliArgs,
819}
820
821/// Handler for `memvid api-fetch`
822pub fn handle_api_fetch(config: &CliConfig, args: ApiFetchArgs) -> Result<()> {
823    let command = ApiFetchCommand {
824        file: args.file,
825        config_path: args.config,
826        dry_run: args.dry_run,
827        mode_override: args.mode,
828        uri_override: args.uri,
829        output_json: args.json,
830        lock_timeout_ms: args.lock.lock_timeout,
831        force_lock: args.lock.force,
832    };
833    run_api_fetch(config, command)
834}
835
836/// Handler for `memvid delete`
837pub fn handle_delete(_config: &CliConfig, args: DeleteArgs) -> Result<()> {
838    let mut mem = Memvid::open(&args.file)?;
839    ensure_cli_mutation_allowed(&mem)?;
840    apply_lock_cli(&mut mem, &args.lock);
841    let frame = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
842
843    if !args.yes {
844        if let Some(uri) = &frame.uri {
845            println!("About to delete frame {} ({uri})", frame.id);
846        } else {
847            println!("About to delete frame {}", frame.id);
848        }
849        print!("Confirm? [y/N]: ");
850        io::stdout().flush()?;
851        let mut input = String::new();
852        io::stdin().read_line(&mut input)?;
853        let confirmed = matches!(input.trim().to_ascii_lowercase().as_str(), "y" | "yes");
854        if !confirmed {
855            println!("Aborted");
856            return Ok(());
857        }
858    }
859
860    let seq = mem.delete_frame(frame.id)?;
861    mem.commit()?;
862    let deleted = mem.frame_by_id(frame.id)?;
863
864    if args.json {
865        let json = json!({
866            "frame_id": frame.id,
867            "sequence": seq,
868            "status": frame_status_str(deleted.status),
869        });
870        println!("{}", serde_json::to_string_pretty(&json)?);
871    } else {
872        println!("Deleted frame {} (seq {})", frame.id, seq);
873    }
874
875    Ok(())
876}
877pub fn handle_put(config: &CliConfig, args: PutArgs) -> Result<()> {
878    // Check if plan allows write operations (blocks expired subscriptions)
879    crate::utils::require_active_plan(config, "put")?;
880
881    let mut mem = Memvid::open(&args.file)?;
882    ensure_cli_mutation_allowed(&mem)?;
883    apply_lock_cli(&mut mem, &args.lock);
884
885    // If memory-id is provided and file isn't already bound, bind it now
886    if let Some(memory_id) = &args.memory_id {
887        let needs_binding = match mem.get_memory_binding() {
888            Some(binding) => binding.memory_id != **memory_id,
889            None => true,
890        };
891        if needs_binding {
892            match crate::commands::creation::bind_to_dashboard_memory(
893                config, &mut mem, &args.file, memory_id,
894            ) {
895                Ok((bound_id, capacity)) => {
896                    eprintln!(
897                        "✓ Bound to dashboard memory: {} (capacity: {})",
898                        bound_id,
899                        crate::utils::format_bytes(capacity)
900                    );
901                }
902                Err(e) => {
903                    eprintln!("⚠️  Failed to bind to dashboard memory: {}", e);
904                    eprintln!("   Continuing with existing capacity. Bind manually with:");
905                    eprintln!(
906                        "   memvid tickets sync {} --memory-id {}",
907                        args.file.display(),
908                        memory_id
909                    );
910                }
911            }
912        }
913    }
914
915    // Load active replay session if one exists
916    #[cfg(feature = "replay")]
917    let _ = mem.load_active_session();
918    let mut stats = mem.stats()?;
919    if stats.frame_count == 0 && !stats.has_lex_index {
920        mem.enable_lex()?;
921        stats = mem.stats()?;
922    }
923
924    // Check if current memory size exceeds free tier limit
925    // If so, require API key to continue
926    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
927
928    // Set vector compression mode if requested
929    if args.vector_compression {
930        mem.set_vector_compression(memvid_core::VectorCompression::Pq96);
931    }
932
933    let mut capacity_guard = CapacityGuard::from_stats(&stats);
934    let use_parallel = args.wants_parallel();
935    #[cfg(feature = "parallel_segments")]
936    let mut parallel_opts = if use_parallel {
937        Some(args.sanitized_parallel_opts())
938    } else {
939        None
940    };
941    #[cfg(feature = "parallel_segments")]
942    let mut pending_parallel_inputs: Vec<ParallelInput> = Vec::new();
943    #[cfg(feature = "parallel_segments")]
944    let mut pending_parallel_indices: Vec<usize> = Vec::new();
945    let input_set = resolve_inputs(args.input.as_deref())?;
946    if args.embedding && args.embedding_vec.is_some() {
947        anyhow::bail!("--embedding-vec conflicts with --embedding; choose one");
948    }
949    if args.embedding_vec.is_some() {
950        match &input_set {
951            InputSet::Files(paths) if paths.len() != 1 => {
952                anyhow::bail!("--embedding-vec requires exactly one input file (or stdin)")
953            }
954            _ => {}
955        }
956    }
957
958    if args.video {
959        if let Some(kind) = &args.kind {
960            if !kind.eq_ignore_ascii_case("video") {
961                anyhow::bail!("--video conflicts with explicit --kind={kind}");
962            }
963        }
964        if matches!(input_set, InputSet::Stdin) {
965            anyhow::bail!("--video requires --input <file>");
966        }
967    }
968
969    #[allow(dead_code)]
970    enum EmbeddingMode {
971        None,
972        Auto,
973        Explicit,
974    }
975
976    // PHASE 1: Make embeddings opt-in, not opt-out
977    // Removed auto-embedding mode to reduce storage bloat (270 KB/doc → 5 KB/doc without vectors)
978    // Auto-detect embedding model from existing MV2 dimension to ensure compatibility
979    let mv2_dimension = mem.effective_vec_index_dimension()?;
980    let inferred_embedding_model = if args.embedding {
981        match mem.embedding_identity_summary(10_000) {
982            memvid_core::EmbeddingIdentitySummary::Single(identity) => {
983                identity.model.map(String::from)
984            }
985            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
986                let models: Vec<_> = identities
987                    .iter()
988                    .filter_map(|entry| entry.identity.model.as_deref())
989                    .collect();
990                anyhow::bail!(
991                    "memory contains mixed embedding models; refusing to add more embeddings.\n\n\
992                    Detected models: {:?}\n\n\
993                    Suggested fix: separate memories per embedding model, or re-ingest into a fresh .mv2.",
994                    models
995                );
996            }
997            memvid_core::EmbeddingIdentitySummary::Unknown => None,
998        }
999    } else {
1000        None
1001    };
1002    let (embedding_mode, embedding_runtime) = if args.embedding {
1003        (
1004            EmbeddingMode::Explicit,
1005            Some(load_embedding_runtime_for_mv2(
1006                config,
1007                inferred_embedding_model.as_deref(),
1008                mv2_dimension,
1009            )?),
1010        )
1011    } else if args.embedding_vec.is_some() {
1012        (EmbeddingMode::Explicit, None)
1013    } else {
1014        (EmbeddingMode::None, None)
1015    };
1016    let embedding_enabled = args.embedding || args.embedding_vec.is_some();
1017    let runtime_ref = embedding_runtime.as_ref();
1018
1019    // Enable CLIP index only if --clip flag is set explicitly
1020    // CLIP is disabled by default to avoid unexpected overhead
1021    #[cfg(feature = "clip")]
1022    let clip_enabled = args.clip;
1023    #[cfg(feature = "clip")]
1024    let clip_model = if clip_enabled {
1025        mem.enable_clip()?;
1026        if !args.json {
1027            eprintln!("ℹ️  CLIP visual embeddings enabled");
1028            eprintln!("   Model: MobileCLIP-S2 (512 dimensions)");
1029            eprintln!("   Use 'memvid find --mode clip' to search with natural language");
1030            eprintln!();
1031        }
1032        // Initialize CLIP model for generating image embeddings
1033        match memvid_core::ClipModel::default_model() {
1034            Ok(model) => Some(model),
1035            Err(e) => {
1036                warn!("Failed to load CLIP model: {e}. CLIP embeddings will be skipped.");
1037                None
1038            }
1039        }
1040    } else {
1041        None
1042    };
1043
1044    // Warn user about vector storage overhead (PHASE 2)
1045    if embedding_enabled && !args.json {
1046        let capacity = stats.capacity_bytes;
1047        if args.vector_compression {
1048            // PHASE 3: PQ compression reduces storage 16x
1049            let max_docs = capacity / 20_000; // ~20 KB/doc with PQ compression
1050            eprintln!("ℹ️  Vector embeddings enabled with Product Quantization compression");
1051            eprintln!("   Storage: ~20 KB per document (16x compressed)");
1052            eprintln!("   Accuracy: ~95% recall@10 maintained");
1053            eprintln!(
1054                "   Capacity: ~{} documents max for {:.1} GB",
1055                max_docs,
1056                capacity as f64 / 1e9
1057            );
1058            eprintln!();
1059        } else {
1060            let max_docs = capacity / 270_000; // Conservative estimate: 270 KB/doc
1061            eprintln!("⚠️  WARNING: Vector embeddings enabled (uncompressed)");
1062            eprintln!("   Storage: ~270 KB per document");
1063            eprintln!(
1064                "   Capacity: ~{} documents max for {:.1} GB",
1065                max_docs,
1066                capacity as f64 / 1e9
1067            );
1068            eprintln!("   Use --vector-compression for 16x storage savings (~20 KB/doc)");
1069            eprintln!("   Use --no-embedding for lexical search only (~5 KB/doc)");
1070            eprintln!();
1071        }
1072    }
1073
1074    let embed_progress = if embedding_enabled {
1075        let total = match &input_set {
1076            InputSet::Files(paths) => Some(paths.len() as u64),
1077            InputSet::Stdin => None,
1078        };
1079        Some(create_task_progress_bar(total, "embed", false))
1080    } else {
1081        None
1082    };
1083    let mut embedded_docs = 0usize;
1084
1085    if let InputSet::Files(paths) = &input_set {
1086        if paths.len() > 1 {
1087            if args.uri.is_some() || args.title.is_some() {
1088                anyhow::bail!(
1089                    "--uri/--title apply to a single file; omit them when using a directory or glob"
1090                );
1091            }
1092        }
1093    }
1094
1095    let mut reports = Vec::new();
1096    let mut processed = 0usize;
1097    let mut skipped = 0usize;
1098    let mut bytes_added = 0u64;
1099    let mut bytes_input = 0u64;
1100    let mut no_raw_count = 0usize;
1101    let mut summary_warnings: Vec<String> = Vec::new();
1102    #[cfg(feature = "clip")]
1103    let mut clip_embeddings_added = false;
1104
1105    let mut capacity_reached = false;
1106    match input_set {
1107        InputSet::Stdin => {
1108            processed += 1;
1109            match read_payload(None) {
1110                Ok(payload) => {
1111                    let mut analyze_spinner = if args.json {
1112                        None
1113                    } else {
1114                        Some(create_spinner("Analyzing stdin payload..."))
1115                    };
1116                    match ingest_payload(
1117                        &mut mem,
1118                        &args,
1119                        config,
1120                        payload,
1121                        None,
1122                        capacity_guard.as_mut(),
1123                        embedding_enabled,
1124                        runtime_ref,
1125                        use_parallel,
1126                    ) {
1127                        Ok(outcome) => {
1128                            if let Some(pb) = analyze_spinner.take() {
1129                                pb.finish_and_clear();
1130                            }
1131                            bytes_added += outcome.report.stored_bytes as u64;
1132                            bytes_input += outcome.report.original_bytes as u64;
1133                            if outcome.report.no_raw {
1134                                no_raw_count += 1;
1135                            }
1136                            if args.json {
1137                                summary_warnings
1138                                    .extend(outcome.report.extraction.warnings.iter().cloned());
1139                            }
1140                            reports.push(outcome.report);
1141                            let report_index = reports.len() - 1;
1142                            if !args.json && !use_parallel {
1143                                if let Some(report) = reports.get(report_index) {
1144                                    print_report(report);
1145                                }
1146                            }
1147                            if args.transcript {
1148                                let notice = transcript_notice_message(&mut mem)?;
1149                                if args.json {
1150                                    summary_warnings.push(notice);
1151                                } else {
1152                                    println!("{notice}");
1153                                }
1154                            }
1155                            if outcome.embedded {
1156                                if let Some(pb) = embed_progress.as_ref() {
1157                                    pb.inc(1);
1158                                }
1159                                embedded_docs += 1;
1160                            }
1161                            if use_parallel {
1162                                #[cfg(feature = "parallel_segments")]
1163                                if let Some(input) = outcome.parallel_input {
1164                                    pending_parallel_indices.push(report_index);
1165                                    pending_parallel_inputs.push(input);
1166                                }
1167                            } else if let Some(guard) = capacity_guard.as_mut() {
1168                                mem.commit()?;
1169                                let stats = mem.stats()?;
1170                                guard.update_after_commit(&stats);
1171                                guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1172                            }
1173                        }
1174                        Err(err) => {
1175                            if let Some(pb) = analyze_spinner.take() {
1176                                pb.finish_and_clear();
1177                            }
1178                            if err.downcast_ref::<DuplicateUriError>().is_some() {
1179                                return Err(err);
1180                            }
1181                            warn!("skipped stdin payload: {err}");
1182                            skipped += 1;
1183                            if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1184                                capacity_reached = true;
1185                            }
1186                        }
1187                    }
1188                }
1189                Err(err) => {
1190                    warn!("failed to read stdin: {err}");
1191                    skipped += 1;
1192                }
1193            }
1194        }
1195        InputSet::Files(ref paths) => {
1196            let mut transcript_notice_printed = false;
1197            for path in paths {
1198                processed += 1;
1199                match read_payload(Some(&path)) {
1200                    Ok(payload) => {
1201                        let label = path.display().to_string();
1202                        let mut analyze_spinner = if args.json {
1203                            None
1204                        } else {
1205                            Some(create_spinner(&format!("Analyzing {label}...")))
1206                        };
1207                        match ingest_payload(
1208                            &mut mem,
1209                            &args,
1210                            config,
1211                            payload,
1212                            Some(&path),
1213                            capacity_guard.as_mut(),
1214                            embedding_enabled,
1215                            runtime_ref,
1216                            use_parallel,
1217                        ) {
1218                            Ok(outcome) => {
1219                                if let Some(pb) = analyze_spinner.take() {
1220                                    pb.finish_and_clear();
1221                                }
1222                                bytes_added += outcome.report.stored_bytes as u64;
1223                                bytes_input += outcome.report.original_bytes as u64;
1224                                if outcome.report.no_raw {
1225                                    no_raw_count += 1;
1226                                }
1227
1228                                // Generate CLIP embedding for image files
1229                                #[cfg(feature = "clip")]
1230                                if let Some(ref model) = clip_model {
1231                                    let mime = outcome.report.mime.as_deref();
1232                                    let clip_frame_id = outcome.report.frame_id;
1233
1234                                    if is_image_file(&path) {
1235                                        match model.encode_image_file(&path) {
1236                                            Ok(embedding) => {
1237                                                if let Err(e) = mem.add_clip_embedding_with_page(
1238                                                    clip_frame_id,
1239                                                    None,
1240                                                    embedding,
1241                                                ) {
1242                                                    warn!(
1243                                                        "Failed to add CLIP embedding for {}: {e}",
1244                                                        path.display()
1245                                                    );
1246                                                } else {
1247                                                    info!(
1248                                                        "Added CLIP embedding for frame {}",
1249                                                        clip_frame_id
1250                                                    );
1251                                                    clip_embeddings_added = true;
1252                                                }
1253                                            }
1254                                            Err(e) => {
1255                                                warn!(
1256                                                    "Failed to encode CLIP embedding for {}: {e}",
1257                                                    path.display()
1258                                                );
1259                                            }
1260                                        }
1261                                    } else if matches!(mime, Some(m) if m == "application/pdf") {
1262                                        // Commit before adding child frames to ensure WAL has space
1263                                        if let Err(e) = mem.commit() {
1264                                            warn!("Failed to commit before CLIP extraction: {e}");
1265                                        }
1266
1267                                        match render_pdf_pages_for_clip(
1268                                            &path,
1269                                            CLIP_PDF_MAX_IMAGES,
1270                                            CLIP_PDF_TARGET_PX,
1271                                        ) {
1272                                            Ok(rendered_pages) => {
1273                                                use rayon::prelude::*;
1274
1275                                                // Pre-encode all images to PNG in parallel for better performance
1276                                                // Filter out junk images (solid colors, black, too small)
1277                                                let path_for_closure = path.clone();
1278
1279                                                // Temporarily suppress panic output during image encoding
1280                                                // (some PDFs have malformed images that cause panics in the image crate)
1281                                                let prev_hook = std::panic::take_hook();
1282                                                std::panic::set_hook(Box::new(|_| {
1283                                                    // Silently ignore panics - we handle them with catch_unwind
1284                                                }));
1285
1286                                                let encoded_images: Vec<_> = rendered_pages
1287                                                    .into_par_iter()
1288                                                    .filter_map(|(page_num, image)| {
1289                                                        // Check if image is worth embedding (not junk)
1290                                                        let image_info = get_image_info(&image);
1291                                                        if !image_info.should_embed() {
1292                                                            info!(
1293                                                                "Skipping junk image for {} page {} (variance={:.4}, {}x{})",
1294                                                                path_for_closure.display(),
1295                                                                page_num,
1296                                                                image_info.color_variance,
1297                                                                image_info.width,
1298                                                                image_info.height
1299                                                            );
1300                                                            return None;
1301                                                        }
1302
1303                                                        // Convert to RGB8 first to ensure consistent buffer size
1304                                                        // (some PDFs have images with alpha or other formats that cause panics)
1305                                                        // Use catch_unwind to handle any panics from corrupted image data
1306                                                        let encode_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1307                                                            let rgb_image = image.to_rgb8();
1308                                                            let mut png_bytes = Vec::new();
1309                                                            image::DynamicImage::ImageRgb8(rgb_image).write_to(
1310                                                                &mut Cursor::new(&mut png_bytes),
1311                                                                image::ImageFormat::Png,
1312                                                            ).map(|()| png_bytes)
1313                                                        }));
1314
1315                                                        match encode_result {
1316                                                            Ok(Ok(png_bytes)) => Some((page_num, image, png_bytes)),
1317                                                            Ok(Err(e)) => {
1318                                                                info!(
1319                                                                    "Skipping image for {} page {}: {e}",
1320                                                                    path_for_closure.display(),
1321                                                                    page_num
1322                                                                );
1323                                                                None
1324                                                            }
1325                                                            Err(_) => {
1326                                                                // Panic occurred - corrupted image data, silently skip
1327                                                                info!(
1328                                                                    "Skipping malformed image for {} page {}",
1329                                                                    path_for_closure.display(),
1330                                                                    page_num
1331                                                                );
1332                                                                None
1333                                                            }
1334                                                        }
1335                                                    })
1336                                                    .collect();
1337
1338                                                // Restore the original panic hook
1339                                                std::panic::set_hook(prev_hook);
1340
1341                                                // Process encoded images sequentially (storage + CLIP encoding)
1342                                                // Commit after each image to prevent WAL overflow (PNG images can be large)
1343                                                let mut child_frame_offset = 1u64;
1344                                                let parent_uri = outcome.report.uri.clone();
1345                                                let parent_title = outcome
1346                                                    .report
1347                                                    .title
1348                                                    .clone()
1349                                                    .unwrap_or_else(|| "Image".to_string());
1350                                                let total_images = encoded_images.len();
1351
1352                                                // Show progress bar for CLIP extraction
1353                                                let clip_pb = if !args.json && total_images > 0 {
1354                                                    let pb = ProgressBar::new(total_images as u64);
1355                                                    pb.set_style(
1356                                                        ProgressStyle::with_template(
1357                                                            "  CLIP {bar:40.cyan/blue} {pos}/{len} images ({eta})"
1358                                                        )
1359                                                        .unwrap_or_else(|_| ProgressStyle::default_bar())
1360                                                        .progress_chars("█▓░"),
1361                                                    );
1362                                                    Some(pb)
1363                                                } else {
1364                                                    None
1365                                                };
1366
1367                                                for (page_num, image, png_bytes) in encoded_images {
1368                                                    let child_uri = format!(
1369                                                        "{}/image/{}",
1370                                                        parent_uri, child_frame_offset
1371                                                    );
1372                                                    let child_title = format!(
1373                                                        "{} - Image {} (page {})",
1374                                                        parent_title, child_frame_offset, page_num
1375                                                    );
1376
1377                                                    let child_options = PutOptions::builder()
1378                                                        .uri(&child_uri)
1379                                                        .title(&child_title)
1380                                                        .parent_id(clip_frame_id)
1381                                                        .role(FrameRole::ExtractedImage)
1382                                                        .auto_tag(false)
1383                                                        .extract_dates(false)
1384                                                        .build();
1385
1386                                                    match mem.put_bytes_with_options(&png_bytes, child_options) {
1387                                                        Ok(_child_seq) => {
1388                                                            let child_frame_id = clip_frame_id + child_frame_offset;
1389                                                            child_frame_offset += 1;
1390
1391                                                            match model.encode_image(&image) {
1392                                                                Ok(embedding) => {
1393                                                                    if let Err(e) = mem
1394                                                                        .add_clip_embedding_with_page(
1395                                                                            child_frame_id,
1396                                                                            Some(page_num),
1397                                                                            embedding,
1398                                                                        )
1399                                                                    {
1400                                                                        warn!(
1401                                                                            "Failed to add CLIP embedding for {} page {}: {e}",
1402                                                                            path.display(),
1403                                                                            page_num
1404                                                                        );
1405                                                                    } else {
1406                                                                        info!(
1407                                                                            "Added CLIP image frame {} for {} page {}",
1408                                                                            child_frame_id,
1409                                                                            clip_frame_id,
1410                                                                            page_num
1411                                                                        );
1412                                                                        clip_embeddings_added = true;
1413                                                                    }
1414                                                                }
1415                                                                Err(e) => warn!(
1416                                                                    "Failed to encode CLIP embedding for {} page {}: {e}",
1417                                                                    path.display(),
1418                                                                    page_num
1419                                                                ),
1420                                                            }
1421
1422                                                            // Commit after each image to checkpoint WAL
1423                                                            if let Err(e) = mem.commit() {
1424                                                                warn!("Failed to commit after image {}: {e}", child_frame_offset);
1425                                                            }
1426                                                        }
1427                                                        Err(e) => warn!(
1428                                                            "Failed to store image frame for {} page {}: {e}",
1429                                                            path.display(),
1430                                                            page_num
1431                                                        ),
1432                                                    }
1433
1434                                                    if let Some(ref pb) = clip_pb {
1435                                                        pb.inc(1);
1436                                                    }
1437                                                }
1438
1439                                                if let Some(pb) = clip_pb {
1440                                                    pb.finish_and_clear();
1441                                                }
1442                                            }
1443                                            Err(err) => warn!(
1444                                                "Failed to render PDF pages for CLIP {}: {err}",
1445                                                path.display()
1446                                            ),
1447                                        }
1448                                    }
1449                                }
1450
1451                                if args.json {
1452                                    summary_warnings
1453                                        .extend(outcome.report.extraction.warnings.iter().cloned());
1454                                }
1455                                reports.push(outcome.report);
1456                                let report_index = reports.len() - 1;
1457                                if !args.json && !use_parallel {
1458                                    if let Some(report) = reports.get(report_index) {
1459                                        print_report(report);
1460                                    }
1461                                }
1462                                if args.transcript && !transcript_notice_printed {
1463                                    let notice = transcript_notice_message(&mut mem)?;
1464                                    if args.json {
1465                                        summary_warnings.push(notice);
1466                                    } else {
1467                                        println!("{notice}");
1468                                    }
1469                                    transcript_notice_printed = true;
1470                                }
1471                                if outcome.embedded {
1472                                    if let Some(pb) = embed_progress.as_ref() {
1473                                        pb.inc(1);
1474                                    }
1475                                    embedded_docs += 1;
1476                                }
1477                                if use_parallel {
1478                                    #[cfg(feature = "parallel_segments")]
1479                                    if let Some(input) = outcome.parallel_input {
1480                                        pending_parallel_indices.push(report_index);
1481                                        pending_parallel_inputs.push(input);
1482                                    }
1483                                } else if let Some(guard) = capacity_guard.as_mut() {
1484                                    mem.commit()?;
1485                                    let stats = mem.stats()?;
1486                                    guard.update_after_commit(&stats);
1487                                    guard.check_and_warn_capacity(); // PHASE 2: Capacity warnings
1488                                }
1489                            }
1490                            Err(err) => {
1491                                if let Some(pb) = analyze_spinner.take() {
1492                                    pb.finish_and_clear();
1493                                }
1494                                if err.downcast_ref::<DuplicateUriError>().is_some() {
1495                                    return Err(err);
1496                                }
1497                                warn!("skipped {}: {err}", path.display());
1498                                skipped += 1;
1499                                if err.downcast_ref::<CapacityExceededMessage>().is_some() {
1500                                    capacity_reached = true;
1501                                }
1502                            }
1503                        }
1504                    }
1505                    Err(err) => {
1506                        warn!("failed to read {}: {err}", path.display());
1507                        skipped += 1;
1508                    }
1509                }
1510                if capacity_reached {
1511                    break;
1512                }
1513            }
1514        }
1515    }
1516
1517    if capacity_reached {
1518        warn!("capacity limit reached; remaining inputs were not processed");
1519    }
1520
1521    if let Some(pb) = embed_progress.as_ref() {
1522        pb.finish_with_message("embed");
1523    }
1524
1525    if let Some(runtime) = embedding_runtime.as_ref() {
1526        if embedded_docs > 0 {
1527            match embedding_mode {
1528                EmbeddingMode::Explicit => println!(
1529                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)",
1530                    embedded_docs,
1531                    runtime.dimension()
1532                ),
1533                EmbeddingMode::Auto => println!(
1534                    "\u{2713} vectors={}  dim={}  hnsw(M=16, efC=200)  (auto)",
1535                    embedded_docs,
1536                    runtime.dimension()
1537                ),
1538                EmbeddingMode::None => {}
1539            }
1540        } else {
1541            match embedding_mode {
1542                EmbeddingMode::Explicit => {
1543                    println!("No embeddings were generated (no textual content available)");
1544                }
1545                EmbeddingMode::Auto => {
1546                    println!(
1547                        "Semantic runtime available, but no textual content produced embeddings"
1548                    );
1549                }
1550                EmbeddingMode::None => {}
1551            }
1552        }
1553    }
1554
1555    if reports.is_empty() {
1556        if args.json {
1557            if capacity_reached {
1558                summary_warnings.push("capacity_limit_reached".to_string());
1559            }
1560            let summary_json = json!({
1561                "processed": processed,
1562                "ingested": 0,
1563                "skipped": skipped,
1564                "bytes_added": bytes_added,
1565                "bytes_added_human": format_bytes(bytes_added),
1566                "embedded_documents": embedded_docs,
1567                "capacity_reached": capacity_reached,
1568                "warnings": summary_warnings,
1569                "reports": Vec::<serde_json::Value>::new(),
1570            });
1571            println!("{}", serde_json::to_string_pretty(&summary_json)?);
1572        } else {
1573            println!(
1574                "Summary: processed {}, ingested 0, skipped {}, bytes added 0 B",
1575                processed, skipped
1576            );
1577        }
1578        return Ok(());
1579    }
1580
1581    #[cfg(feature = "parallel_segments")]
1582    let mut parallel_flush_spinner = if use_parallel && !args.json {
1583        Some(create_spinner("Flushing parallel segments..."))
1584    } else {
1585        None
1586    };
1587
1588    if use_parallel {
1589        #[cfg(feature = "parallel_segments")]
1590        {
1591            let mut opts = parallel_opts.take().unwrap_or_else(|| {
1592                let mut opts = BuildOpts::default();
1593                opts.sanitize();
1594                opts
1595            });
1596            // Set vector compression mode from mem state
1597            opts.vec_compression = mem.vector_compression().clone();
1598            let seqs = if pending_parallel_inputs.is_empty() {
1599                mem.commit_parallel(opts)?;
1600                Vec::new()
1601            } else {
1602                mem.put_parallel_inputs(&pending_parallel_inputs, opts)?
1603            };
1604            if let Some(pb) = parallel_flush_spinner.take() {
1605                pb.finish_with_message("Flushed parallel segments");
1606            }
1607            for (idx, seq) in pending_parallel_indices.into_iter().zip(seqs.into_iter()) {
1608                if let Some(report) = reports.get_mut(idx) {
1609                    report.seq = seq;
1610                }
1611            }
1612        }
1613        #[cfg(not(feature = "parallel_segments"))]
1614        {
1615            mem.commit()?;
1616        }
1617    } else {
1618        mem.commit()?;
1619    }
1620
1621    // Persist CLIP embeddings added after the primary commit (avoids rebuild ordering issues).
1622    #[cfg(feature = "clip")]
1623    if clip_embeddings_added {
1624        mem.commit()?;
1625    }
1626
1627    if !args.json && use_parallel {
1628        for report in &reports {
1629            print_report(report);
1630        }
1631    }
1632
1633    // Table extraction for PDF files
1634    let mut tables_extracted = 0usize;
1635    let mut table_summaries: Vec<serde_json::Value> = Vec::new();
1636    if args.tables && !args.no_tables {
1637        if let InputSet::Files(ref paths) = input_set {
1638            let pdf_paths: Vec<_> = paths
1639                .iter()
1640                .filter(|p| {
1641                    p.extension()
1642                        .and_then(|e| e.to_str())
1643                        .map(|e| e.eq_ignore_ascii_case("pdf"))
1644                        .unwrap_or(false)
1645                })
1646                .collect();
1647
1648            if !pdf_paths.is_empty() {
1649                let table_spinner = if args.json {
1650                    None
1651                } else {
1652                    Some(create_spinner(&format!(
1653                        "Extracting tables from {} PDF(s)...",
1654                        pdf_paths.len()
1655                    )))
1656                };
1657
1658                // Use aggressive mode with auto fallback for best results
1659                let table_options = TableExtractionOptions::builder()
1660                    .mode(memvid_core::table::ExtractionMode::Aggressive)
1661                    .min_rows(2)
1662                    .min_cols(2)
1663                    .min_quality(memvid_core::table::TableQuality::Low)
1664                    .merge_multi_page(true)
1665                    .build();
1666
1667                for pdf_path in pdf_paths {
1668                    if let Ok(pdf_bytes) = std::fs::read(pdf_path) {
1669                        let filename = pdf_path
1670                            .file_name()
1671                            .and_then(|s| s.to_str())
1672                            .unwrap_or("unknown.pdf");
1673
1674                        if let Ok(result) = extract_tables(&pdf_bytes, filename, &table_options) {
1675                            for table in &result.tables {
1676                                if let Ok((meta_id, _row_ids)) = store_table(&mut mem, table, true)
1677                                {
1678                                    tables_extracted += 1;
1679                                    table_summaries.push(json!({
1680                                        "table_id": table.table_id,
1681                                        "source_file": table.source_file,
1682                                        "meta_frame_id": meta_id,
1683                                        "rows": table.n_rows,
1684                                        "cols": table.n_cols,
1685                                        "quality": format!("{:?}", table.quality),
1686                                        "pages": format!("{}-{}", table.page_start, table.page_end),
1687                                    }));
1688                                }
1689                            }
1690                        }
1691                    }
1692                }
1693
1694                if let Some(pb) = table_spinner {
1695                    if tables_extracted > 0 {
1696                        pb.finish_with_message(format!("Extracted {} table(s)", tables_extracted));
1697                    } else {
1698                        pb.finish_with_message("No tables found");
1699                    }
1700                }
1701
1702                // Commit table frames
1703                if tables_extracted > 0 {
1704                    mem.commit()?;
1705                }
1706            }
1707        }
1708    }
1709
1710    // Logic-Mesh: Extract entities and build relationship graph
1711    // Opt-in only: requires --logic-mesh flag
1712    #[cfg(feature = "logic_mesh")]
1713    let mut logic_mesh_entities = 0usize;
1714    #[cfg(feature = "logic_mesh")]
1715    {
1716        use memvid_core::{ner_model_path, ner_tokenizer_path, LogicMesh, MeshNode, NerModel};
1717
1718        let models_dir = config.models_dir.clone();
1719        let model_path = ner_model_path(&models_dir);
1720        let tokenizer_path = ner_tokenizer_path(&models_dir);
1721
1722        // Opt-in: only enable Logic-Mesh when explicitly requested with --logic-mesh
1723        let ner_available = model_path.exists() && tokenizer_path.exists();
1724        let should_run_ner = args.logic_mesh;
1725
1726        if should_run_ner && !ner_available {
1727            // User explicitly requested --logic-mesh but model not installed
1728            if args.json {
1729                summary_warnings.push(
1730                    "logic_mesh_skipped: NER model not installed. Run 'memvid models install --ner distilbert-ner'".to_string()
1731                );
1732            } else {
1733                eprintln!("⚠️  Logic-Mesh skipped: NER model not installed.");
1734                eprintln!("   Run: memvid models install --ner distilbert-ner");
1735            }
1736        } else if should_run_ner {
1737            let ner_spinner = if args.json {
1738                None
1739            } else {
1740                Some(create_spinner("Building Logic-Mesh entity graph..."))
1741            };
1742
1743            match NerModel::load(&model_path, &tokenizer_path, None) {
1744                Ok(mut ner_model) => {
1745                    let mut mesh = LogicMesh::new();
1746                    let mut filtered_count = 0usize;
1747
1748                    // Process each ingested frame for entity extraction using cached search_text
1749                    for report in &reports {
1750                        let frame_id = report.frame_id;
1751                        // Use the cached search_text from ingestion
1752                        if let Some(ref content) = report.search_text {
1753                            if !content.is_empty() {
1754                                match ner_model.extract(content) {
1755                                    Ok(raw_entities) => {
1756                                        // Merge adjacent entities that were split by tokenization
1757                                        // e.g., "S" + "&" + "P Global" -> "S&P Global"
1758                                        let merged_entities =
1759                                            merge_adjacent_entities(raw_entities, content);
1760
1761                                        for entity in &merged_entities {
1762                                            // Apply post-processing filter to remove noisy entities
1763                                            if !is_valid_entity(&entity.text, entity.confidence) {
1764                                                tracing::debug!(
1765                                                    "Filtered entity: '{}' (confidence: {:.0}%)",
1766                                                    entity.text,
1767                                                    entity.confidence * 100.0
1768                                                );
1769                                                filtered_count += 1;
1770                                                continue;
1771                                            }
1772
1773                                            // Convert NER entity type to EntityKind
1774                                            let kind = entity.to_entity_kind();
1775                                            // Create mesh node with proper structure
1776                                            let node = MeshNode::new(
1777                                                entity.text.to_lowercase(),
1778                                                entity.text.trim().to_string(),
1779                                                kind,
1780                                                entity.confidence,
1781                                                frame_id,
1782                                                entity.byte_start as u32,
1783                                                (entity.byte_end - entity.byte_start)
1784                                                    .min(u16::MAX as usize)
1785                                                    as u16,
1786                                            );
1787                                            mesh.merge_node(node);
1788                                            logic_mesh_entities += 1;
1789                                        }
1790                                    }
1791                                    Err(e) => {
1792                                        warn!("NER extraction failed for frame {frame_id}: {e}");
1793                                    }
1794                                }
1795                            }
1796                        }
1797                    }
1798
1799                    if logic_mesh_entities > 0 {
1800                        mesh.finalize();
1801                        let node_count = mesh.stats().node_count;
1802                        // Persist mesh to MV2 file
1803                        mem.set_logic_mesh(mesh);
1804                        mem.commit()?;
1805                        info!(
1806                            "Logic-Mesh persisted with {} entities ({} unique nodes, {} filtered)",
1807                            logic_mesh_entities, node_count, filtered_count
1808                        );
1809                    }
1810
1811                    if let Some(pb) = ner_spinner {
1812                        pb.finish_with_message(format!(
1813                            "Logic-Mesh: {} entities ({} unique)",
1814                            logic_mesh_entities,
1815                            mem.mesh_node_count()
1816                        ));
1817                    }
1818                }
1819                Err(e) => {
1820                    if let Some(pb) = ner_spinner {
1821                        pb.finish_with_message(format!("Logic-Mesh failed: {e}"));
1822                    }
1823                    if args.json {
1824                        summary_warnings.push(format!("logic_mesh_failed: {e}"));
1825                    }
1826                }
1827            }
1828        }
1829    }
1830
1831    // Rules-based enrichment: extract memory cards from ingested frames
1832    let mut enrichment_cards = 0usize;
1833    if args.enrich && !args.no_enrich && !reports.is_empty() {
1834        let enrich_spinner = if args.json {
1835            None
1836        } else {
1837            Some(create_spinner(&format!(
1838                "Extracting memories from {} frame(s)...",
1839                reports.len()
1840            )))
1841        };
1842
1843        let mut rules_engine = RulesEngine::new();
1844        rules_engine.init().ok(); // Rules engine doesn't need external init
1845
1846        let frame_ids: Vec<u64> = reports.iter().map(|r| r.frame_id).collect();
1847        for frame_id in &frame_ids {
1848            if let Ok(frame) = mem.frame_by_id(*frame_id) {
1849                let text = frame.search_text.as_deref().unwrap_or("");
1850                if !text.is_empty() {
1851                    let ctx = memvid_core::enrich::EnrichmentContext::new(
1852                        *frame_id,
1853                        frame.uri.clone().unwrap_or_default(),
1854                        text.to_string(),
1855                        frame.title.clone(),
1856                        frame.timestamp,
1857                        None,
1858                    );
1859                    let result = rules_engine.enrich(&ctx);
1860                    if !result.cards.is_empty() {
1861                        for card in result.cards {
1862                            if mem.put_memory_card(card).is_ok() {
1863                                enrichment_cards += 1;
1864                            }
1865                        }
1866                    }
1867                }
1868            }
1869        }
1870
1871        if enrichment_cards > 0 {
1872            mem.commit()?;
1873        }
1874
1875        if let Some(pb) = enrich_spinner {
1876            if enrichment_cards > 0 {
1877                pb.finish_with_message(format!("Extracted {} memory card(s)", enrichment_cards));
1878            } else {
1879                pb.finish_with_message("No memories extracted");
1880            }
1881        }
1882    }
1883
1884    let stats = mem.stats()?;
1885    if args.json {
1886        if capacity_reached {
1887            summary_warnings.push("capacity_limit_reached".to_string());
1888        }
1889        let reports_json: Vec<serde_json::Value> = reports.iter().map(put_report_to_json).collect();
1890        let total_duration_ms: Option<u64> = {
1891            let sum: u64 = reports
1892                .iter()
1893                .filter_map(|r| r.extraction.duration_ms)
1894                .sum();
1895            if sum > 0 {
1896                Some(sum)
1897            } else {
1898                None
1899            }
1900        };
1901        let total_pages: Option<u32> = {
1902            let sum: u32 = reports
1903                .iter()
1904                .filter_map(|r| r.extraction.pages_processed)
1905                .sum();
1906            if sum > 0 {
1907                Some(sum)
1908            } else {
1909                None
1910            }
1911        };
1912        let embedding_mode_str = match embedding_mode {
1913            EmbeddingMode::None => "none",
1914            EmbeddingMode::Auto => "auto",
1915            EmbeddingMode::Explicit => "explicit",
1916        };
1917        let reduction_percent = if bytes_input > 0 && bytes_input > bytes_added {
1918            Some(((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0)
1919        } else {
1920            None
1921        };
1922        let summary_json = json!({
1923            "processed": processed,
1924            "ingested": reports.len(),
1925            "skipped": skipped,
1926            "bytes_input": bytes_input,
1927            "bytes_input_human": format_bytes(bytes_input),
1928            "bytes_stored": bytes_added,
1929            "bytes_stored_human": format_bytes(bytes_added),
1930            "reduction_percent": reduction_percent,
1931            "embedded_documents": embedded_docs,
1932            "embedding": {
1933                "enabled": embedding_enabled,
1934                "mode": embedding_mode_str,
1935                "runtime_dimension": runtime_ref.map(|rt| rt.dimension()),
1936            },
1937            "tables": {
1938                "enabled": args.tables && !args.no_tables,
1939                "extracted": tables_extracted,
1940                "tables": table_summaries,
1941            },
1942            "enrichment": {
1943                "enabled": args.enrich && !args.no_enrich,
1944                "cards_extracted": enrichment_cards,
1945            },
1946            "capacity_reached": capacity_reached,
1947            "warnings": summary_warnings,
1948            "extraction": {
1949                "total_duration_ms": total_duration_ms,
1950                "total_pages_processed": total_pages,
1951            },
1952            "memory": {
1953                "path": args.file.display().to_string(),
1954                "frame_count": stats.frame_count,
1955                "size_bytes": stats.size_bytes,
1956                "tier": format!("{:?}", stats.tier),
1957            },
1958            "reports": reports_json,
1959        });
1960        println!("{}", serde_json::to_string_pretty(&summary_json)?);
1961    } else {
1962        println!(
1963            "Committed {} frame(s); total frames {}",
1964            reports.len(),
1965            stats.frame_count
1966        );
1967        println!(
1968            "Summary: processed {}, ingested {}, skipped {}",
1969            processed,
1970            reports.len(),
1971            skipped
1972        );
1973        // Show storage efficiency - use actual payload bytes for accuracy
1974        // When --no-raw is used, stored_bytes in reports doesn't reflect actual storage
1975        if bytes_input > 0 {
1976            let actual_stored = stats.payload_bytes;
1977            if actual_stored < bytes_input && no_raw_count > 0 {
1978                // --no-raw mode: show the dramatic reduction (raw files → extracted text)
1979                let reduction = ((bytes_input - actual_stored) as f64 / bytes_input as f64) * 100.0;
1980                println!(
1981                    "Storage: {} input → {} stored ({:.1}% smaller, text extracted)",
1982                    format_bytes(bytes_input),
1983                    format_bytes(actual_stored),
1984                    reduction
1985                );
1986            } else if bytes_added < bytes_input {
1987                // Normal compression
1988                let reduction = ((bytes_input - bytes_added) as f64 / bytes_input as f64) * 100.0;
1989                println!(
1990                    "Storage: {} input → {} stored ({:.1}% smaller)",
1991                    format_bytes(bytes_input),
1992                    format_bytes(bytes_added),
1993                    reduction
1994                );
1995            } else {
1996                println!("Storage: {} added", format_bytes(bytes_added));
1997            }
1998        }
1999        if tables_extracted > 0 {
2000            println!("Tables: {} extracted from PDF(s)", tables_extracted);
2001        }
2002        if enrichment_cards > 0 {
2003            println!("Enrichment: {} memory card(s) extracted", enrichment_cards);
2004        }
2005    }
2006
2007    // Save active replay session if one exists
2008    #[cfg(feature = "replay")]
2009    let _ = mem.save_active_session();
2010
2011    Ok(())
2012}
2013
2014pub fn handle_update(config: &CliConfig, args: UpdateArgs) -> Result<()> {
2015    let mut mem = Memvid::open(&args.file)?;
2016    ensure_cli_mutation_allowed(&mem)?;
2017    apply_lock_cli(&mut mem, &args.lock);
2018    let existing = select_frame(&mut mem, args.frame_id, args.uri.as_deref())?;
2019    let frame_id = existing.id;
2020
2021    let payload_bytes = match args.input.as_ref() {
2022        Some(path) => Some(read_payload(Some(path))?),
2023        None => None,
2024    };
2025    let payload_slice = payload_bytes.as_deref();
2026    let payload_utf8 = payload_slice.and_then(|bytes| std::str::from_utf8(bytes).ok());
2027    let source_path = args.input.as_deref();
2028
2029    let mut options = PutOptions::default();
2030    options.enable_embedding = false;
2031    options.auto_tag = payload_slice.is_some();
2032    options.extract_dates = payload_slice.is_some();
2033    options.timestamp = Some(existing.timestamp);
2034    options.track = existing.track.clone();
2035    options.kind = existing.kind.clone();
2036    options.uri = existing.uri.clone();
2037    options.title = existing.title.clone();
2038    options.metadata = existing.metadata.clone();
2039    options.search_text = existing.search_text.clone();
2040    options.tags = existing.tags.clone();
2041    options.labels = existing.labels.clone();
2042    options.extra_metadata = existing.extra_metadata.clone();
2043
2044    if let Some(new_uri) = &args.set_uri {
2045        options.uri = Some(derive_uri(Some(new_uri), None));
2046    }
2047
2048    if options.uri.is_none() && payload_slice.is_some() {
2049        options.uri = Some(derive_uri(None, source_path));
2050    }
2051
2052    if let Some(title) = &args.title {
2053        options.title = Some(title.clone());
2054    }
2055    if let Some(ts) = args.timestamp {
2056        options.timestamp = Some(ts);
2057    }
2058    if let Some(track) = &args.track {
2059        options.track = Some(track.clone());
2060    }
2061    if let Some(kind) = &args.kind {
2062        options.kind = Some(kind.clone());
2063    }
2064
2065    if !args.labels.is_empty() {
2066        options.labels = args.labels.clone();
2067    }
2068
2069    if !args.tags.is_empty() {
2070        options.tags.clear();
2071        for (key, value) in &args.tags {
2072            if !key.is_empty() {
2073                options.tags.push(key.clone());
2074            }
2075            if !value.is_empty() && value != key {
2076                options.tags.push(value.clone());
2077            }
2078            options.extra_metadata.insert(key.clone(), value.clone());
2079        }
2080    }
2081
2082    apply_metadata_overrides(&mut options, &args.metadata);
2083
2084    let mut search_source = options.search_text.clone();
2085
2086    if let Some(payload) = payload_slice {
2087        let inferred_title = derive_title(args.title.clone(), source_path, payload_utf8);
2088        if let Some(title) = inferred_title {
2089            options.title = Some(title);
2090        }
2091
2092        if options.uri.is_none() {
2093            options.uri = Some(derive_uri(None, source_path));
2094        }
2095
2096        let analysis = analyze_file(
2097            source_path,
2098            payload,
2099            payload_utf8,
2100            options.title.as_deref(),
2101            AudioAnalyzeOptions::default(),
2102            false,
2103        );
2104        if let Some(meta) = analysis.metadata {
2105            options.metadata = Some(meta);
2106        }
2107        if let Some(text) = analysis.search_text {
2108            search_source = Some(text.clone());
2109            options.search_text = Some(text);
2110        }
2111    } else {
2112        options.auto_tag = false;
2113        options.extract_dates = false;
2114    }
2115
2116    let stats = mem.stats()?;
2117    options.enable_embedding = stats.has_vec_index;
2118
2119    // Auto-detect embedding model from existing MV2 dimension
2120    let mv2_dimension = mem.effective_vec_index_dimension()?;
2121    let mut embedding: Option<Vec<f32>> = None;
2122    if args.embeddings {
2123        let inferred_embedding_model = match mem.embedding_identity_summary(10_000) {
2124            memvid_core::EmbeddingIdentitySummary::Single(identity) => {
2125                identity.model.map(String::from)
2126            }
2127            memvid_core::EmbeddingIdentitySummary::Mixed(identities) => {
2128                let models: Vec<_> = identities
2129                    .iter()
2130                    .filter_map(|entry| entry.identity.model.as_deref())
2131                    .collect();
2132                anyhow::bail!(
2133                    "memory contains mixed embedding models; refusing to recompute embeddings.\n\n\
2134                    Detected models: {:?}",
2135                    models
2136                );
2137            }
2138            memvid_core::EmbeddingIdentitySummary::Unknown => None,
2139        };
2140
2141        let runtime = load_embedding_runtime_for_mv2(
2142            config,
2143            inferred_embedding_model.as_deref(),
2144            mv2_dimension,
2145        )?;
2146        if let Some(text) = search_source.as_ref() {
2147            if !text.trim().is_empty() {
2148                embedding = Some(runtime.embed_passage(text)?);
2149            }
2150        }
2151        if embedding.is_none() {
2152            if let Some(text) = payload_utf8 {
2153                if !text.trim().is_empty() {
2154                    embedding = Some(runtime.embed_passage(text)?);
2155                }
2156            }
2157        }
2158        if embedding.is_none() {
2159            warn!("no textual content available; embeddings not recomputed");
2160        }
2161        if embedding.is_some() {
2162            apply_embedding_identity_metadata(&mut options, &runtime);
2163        }
2164    }
2165
2166    let mut effective_embedding = embedding;
2167    if effective_embedding.is_none() && stats.has_vec_index {
2168        effective_embedding = mem.frame_embedding(frame_id)?;
2169    }
2170
2171    let final_uri = options.uri.clone();
2172    let replaced_payload = payload_slice.is_some();
2173    let seq = mem.update_frame(frame_id, payload_bytes, options, effective_embedding)?;
2174    mem.commit()?;
2175
2176    let updated_frame =
2177        if let Some(uri) = final_uri.and_then(|u| if u.is_empty() { None } else { Some(u) }) {
2178            mem.frame_by_uri(&uri)?
2179        } else {
2180            let mut query = TimelineQueryBuilder::default();
2181            if let Some(limit) = NonZeroU64::new(1) {
2182                query = query.limit(limit).reverse(true);
2183            }
2184            let latest = mem.timeline(query.build())?;
2185            if let Some(entry) = latest.first() {
2186                mem.frame_by_id(entry.frame_id)?
2187            } else {
2188                mem.frame_by_id(frame_id)?
2189            }
2190        };
2191
2192    if args.json {
2193        let json = json!({
2194            "sequence": seq,
2195            "previous_frame_id": frame_id,
2196            "frame": frame_to_json(&updated_frame),
2197            "replaced_payload": replaced_payload,
2198        });
2199        println!("{}", serde_json::to_string_pretty(&json)?);
2200    } else {
2201        println!(
2202            "Updated frame {} → new frame {} (seq {})",
2203            frame_id, updated_frame.id, seq
2204        );
2205        println!(
2206            "Payload: {}",
2207            if replaced_payload {
2208                "replaced"
2209            } else {
2210                "reused"
2211            }
2212        );
2213        print_frame_summary(&mut mem, &updated_frame)?;
2214    }
2215
2216    Ok(())
2217}
2218
2219fn derive_uri(provided: Option<&str>, source: Option<&Path>) -> String {
2220    if let Some(uri) = provided {
2221        let sanitized = sanitize_uri(uri, true);
2222        return format!("mv2://{}", sanitized);
2223    }
2224
2225    if let Some(path) = source {
2226        let raw = path.to_string_lossy();
2227        let sanitized = sanitize_uri(&raw, false);
2228        return format!("mv2://{}", sanitized);
2229    }
2230
2231    format!("mv2://frames/{}", Uuid::new_v4())
2232}
2233
2234pub fn derive_video_uri(payload: &[u8], source: Option<&Path>, mime: &str) -> String {
2235    let digest = hash(payload).to_hex();
2236    let short = &digest[..32];
2237    let ext_from_path = source
2238        .and_then(|path| path.extension())
2239        .and_then(|ext| ext.to_str())
2240        .map(|ext| ext.trim_start_matches('.').to_ascii_lowercase())
2241        .filter(|ext| !ext.is_empty());
2242    let ext = ext_from_path
2243        .or_else(|| extension_from_mime(mime).map(|ext| ext.to_string()))
2244        .unwrap_or_else(|| "bin".to_string());
2245    format!("mv2://video/{short}.{ext}")
2246}
2247
2248fn derive_title(
2249    provided: Option<String>,
2250    source: Option<&Path>,
2251    payload_utf8: Option<&str>,
2252) -> Option<String> {
2253    if let Some(title) = provided {
2254        let trimmed = title.trim();
2255        if trimmed.is_empty() {
2256            None
2257        } else {
2258            Some(trimmed.to_string())
2259        }
2260    } else {
2261        if let Some(text) = payload_utf8 {
2262            if let Some(markdown_title) = extract_markdown_title(text) {
2263                return Some(markdown_title);
2264            }
2265        }
2266        source
2267            .and_then(|path| path.file_stem())
2268            .and_then(|stem| stem.to_str())
2269            .map(to_title_case)
2270    }
2271}
2272
2273fn extract_markdown_title(text: &str) -> Option<String> {
2274    for line in text.lines() {
2275        let trimmed = line.trim();
2276        if trimmed.starts_with('#') {
2277            let title = trimmed.trim_start_matches('#').trim();
2278            if !title.is_empty() {
2279                return Some(title.to_string());
2280            }
2281        }
2282    }
2283    None
2284}
2285
2286fn to_title_case(input: &str) -> String {
2287    if input.is_empty() {
2288        return String::new();
2289    }
2290    let mut chars = input.chars();
2291    let Some(first) = chars.next() else {
2292        return String::new();
2293    };
2294    let prefix = first.to_uppercase().collect::<String>();
2295    prefix + chars.as_str()
2296}
2297
2298fn should_skip_input(path: &Path) -> bool {
2299    matches!(
2300        path.file_name().and_then(|name| name.to_str()),
2301        Some(".DS_Store")
2302    )
2303}
2304
2305fn should_skip_dir(path: &Path) -> bool {
2306    matches!(
2307        path.file_name().and_then(|name| name.to_str()),
2308        Some(name) if name.starts_with('.')
2309    )
2310}
2311
2312enum InputSet {
2313    Stdin,
2314    Files(Vec<PathBuf>),
2315}
2316
2317/// Check if a file is an image based on extension
2318#[cfg(feature = "clip")]
2319fn is_image_file(path: &std::path::Path) -> bool {
2320    let Some(ext) = path.extension().and_then(|e| e.to_str()) else {
2321        return false;
2322    };
2323    matches!(
2324        ext.to_ascii_lowercase().as_str(),
2325        "jpg" | "jpeg" | "png" | "gif" | "bmp" | "webp" | "tiff" | "tif" | "ico"
2326    )
2327}
2328
2329fn resolve_inputs(input: Option<&str>) -> Result<InputSet> {
2330    let Some(raw) = input else {
2331        return Ok(InputSet::Stdin);
2332    };
2333
2334    if raw.contains('*') || raw.contains('?') || raw.contains('[') {
2335        let mut files = Vec::new();
2336        let mut matched_any = false;
2337        for entry in glob(raw)? {
2338            let path = entry?;
2339            if path.is_file() {
2340                matched_any = true;
2341                if should_skip_input(&path) {
2342                    continue;
2343                }
2344                files.push(path);
2345            }
2346        }
2347        files.sort();
2348        if files.is_empty() {
2349            if matched_any {
2350                anyhow::bail!(
2351                    "pattern '{raw}' only matched files that are ignored by default (e.g. .DS_Store)"
2352                );
2353            }
2354            anyhow::bail!("pattern '{raw}' did not match any files");
2355        }
2356        return Ok(InputSet::Files(files));
2357    }
2358
2359    let path = PathBuf::from(raw);
2360    if path.is_dir() {
2361        let (mut files, skipped_any) = collect_directory_files(&path)?;
2362        files.sort();
2363        if files.is_empty() {
2364            if skipped_any {
2365                anyhow::bail!(
2366                    "directory '{}' contains only ignored files (e.g. .DS_Store/.git)",
2367                    path.display()
2368                );
2369            }
2370            anyhow::bail!(
2371                "directory '{}' contains no ingestible files",
2372                path.display()
2373            );
2374        }
2375        Ok(InputSet::Files(files))
2376    } else {
2377        Ok(InputSet::Files(vec![path]))
2378    }
2379}
2380
2381fn collect_directory_files(root: &Path) -> Result<(Vec<PathBuf>, bool)> {
2382    let mut files = Vec::new();
2383    let mut skipped_any = false;
2384    let mut stack = vec![root.to_path_buf()];
2385    while let Some(dir) = stack.pop() {
2386        for entry in fs::read_dir(&dir)? {
2387            let entry = entry?;
2388            let path = entry.path();
2389            let file_type = entry.file_type()?;
2390            if file_type.is_dir() {
2391                if should_skip_dir(&path) {
2392                    skipped_any = true;
2393                    continue;
2394                }
2395                stack.push(path);
2396            } else if file_type.is_file() {
2397                if should_skip_input(&path) {
2398                    skipped_any = true;
2399                    continue;
2400                }
2401                files.push(path);
2402            }
2403        }
2404    }
2405    Ok((files, skipped_any))
2406}
2407
2408struct IngestOutcome {
2409    report: PutReport,
2410    embedded: bool,
2411    #[cfg(feature = "parallel_segments")]
2412    parallel_input: Option<ParallelInput>,
2413}
2414
2415struct PutReport {
2416    seq: u64,
2417    #[allow(dead_code)] // Used in feature-gated code (clip, logic_mesh)
2418    frame_id: u64,
2419    uri: String,
2420    title: Option<String>,
2421    original_bytes: usize,
2422    stored_bytes: usize,
2423    compressed: bool,
2424    /// --no-raw mode: raw binary not stored, only extracted text + hash
2425    no_raw: bool,
2426    source: Option<PathBuf>,
2427    mime: Option<String>,
2428    metadata: Option<DocMetadata>,
2429    extraction: ExtractionSummary,
2430    /// Text content for entity extraction (only populated when --logic-mesh is enabled)
2431    #[cfg(feature = "logic_mesh")]
2432    search_text: Option<String>,
2433}
2434
2435struct CapacityGuard {
2436    #[allow(dead_code)]
2437    tier: Tier,
2438    capacity: u64,
2439    current_size: u64,
2440}
2441
2442impl CapacityGuard {
2443    const MIN_OVERHEAD: u64 = 128 * 1024;
2444    const RELATIVE_OVERHEAD: f64 = 0.05;
2445
2446    fn from_stats(stats: &Stats) -> Option<Self> {
2447        if stats.capacity_bytes == 0 {
2448            return None;
2449        }
2450        Some(Self {
2451            tier: stats.tier,
2452            capacity: stats.capacity_bytes,
2453            current_size: stats.size_bytes,
2454        })
2455    }
2456
2457    fn ensure_capacity(&self, additional: u64) -> Result<()> {
2458        let projected = self
2459            .current_size
2460            .saturating_add(additional)
2461            .saturating_add(Self::estimate_overhead(additional));
2462        if projected > self.capacity {
2463            return Err(map_put_error(
2464                MemvidError::CapacityExceeded {
2465                    current: self.current_size,
2466                    limit: self.capacity,
2467                    required: projected,
2468                },
2469                Some(self.capacity),
2470            ));
2471        }
2472        Ok(())
2473    }
2474
2475    fn update_after_commit(&mut self, stats: &Stats) {
2476        self.current_size = stats.size_bytes;
2477    }
2478
2479    fn capacity_hint(&self) -> Option<u64> {
2480        Some(self.capacity)
2481    }
2482
2483    // PHASE 2: Check capacity and warn at thresholds (50%, 75%, 90%)
2484    fn check_and_warn_capacity(&self) {
2485        if self.capacity == 0 {
2486            return;
2487        }
2488
2489        let usage_pct = (self.current_size as f64 / self.capacity as f64) * 100.0;
2490
2491        // Warn at key thresholds
2492        if usage_pct >= 90.0 && usage_pct < 91.0 {
2493            eprintln!(
2494                "⚠️  CRITICAL: 90% capacity used ({} / {})",
2495                format_bytes(self.current_size),
2496                format_bytes(self.capacity)
2497            );
2498            eprintln!("   Actions:");
2499            eprintln!("   1. Recreate with larger --size");
2500            eprintln!("   2. Delete old frames with: memvid delete <file> --uri <pattern>");
2501            eprintln!("   3. If using vectors, consider --no-embedding for new docs");
2502        } else if usage_pct >= 75.0 && usage_pct < 76.0 {
2503            eprintln!(
2504                "⚠️  WARNING: 75% capacity used ({} / {})",
2505                format_bytes(self.current_size),
2506                format_bytes(self.capacity)
2507            );
2508            eprintln!("   Consider planning for capacity expansion soon");
2509        } else if usage_pct >= 50.0 && usage_pct < 51.0 {
2510            eprintln!(
2511                "ℹ️  INFO: 50% capacity used ({} / {})",
2512                format_bytes(self.current_size),
2513                format_bytes(self.capacity)
2514            );
2515        }
2516    }
2517
2518    fn estimate_overhead(additional: u64) -> u64 {
2519        let fractional = ((additional as f64) * Self::RELATIVE_OVERHEAD).ceil() as u64;
2520        fractional.max(Self::MIN_OVERHEAD)
2521    }
2522}
2523
2524#[derive(Debug, Clone)]
2525pub struct FileAnalysis {
2526    pub mime: String,
2527    pub metadata: Option<DocMetadata>,
2528    pub search_text: Option<String>,
2529    pub extraction: ExtractionSummary,
2530}
2531
2532#[derive(Clone, Copy)]
2533pub struct AudioAnalyzeOptions {
2534    force: bool,
2535    segment_secs: u32,
2536    transcribe: bool,
2537}
2538
2539impl AudioAnalyzeOptions {
2540    const DEFAULT_SEGMENT_SECS: u32 = 30;
2541
2542    fn normalised_segment_secs(self) -> u32 {
2543        self.segment_secs.max(1)
2544    }
2545}
2546
2547impl Default for AudioAnalyzeOptions {
2548    fn default() -> Self {
2549        Self {
2550            force: false,
2551            segment_secs: Self::DEFAULT_SEGMENT_SECS,
2552            transcribe: false,
2553        }
2554    }
2555}
2556
2557struct AudioAnalysis {
2558    metadata: DocAudioMetadata,
2559    caption: Option<String>,
2560    search_terms: Vec<String>,
2561    transcript: Option<String>,
2562}
2563
2564struct ImageAnalysis {
2565    width: u32,
2566    height: u32,
2567    palette: Vec<String>,
2568    caption: Option<String>,
2569    exif: Option<DocExifMetadata>,
2570}
2571
2572#[derive(Debug, Clone)]
2573pub struct ExtractionSummary {
2574    reader: Option<String>,
2575    status: ExtractionStatus,
2576    warnings: Vec<String>,
2577    duration_ms: Option<u64>,
2578    pages_processed: Option<u32>,
2579}
2580
2581impl ExtractionSummary {
2582    fn record_warning<S: Into<String>>(&mut self, warning: S) {
2583        self.warnings.push(warning.into());
2584    }
2585}
2586
2587#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2588enum ExtractionStatus {
2589    Skipped,
2590    Ok,
2591    FallbackUsed,
2592    Empty,
2593    Failed,
2594}
2595
2596impl ExtractionStatus {
2597    fn label(self) -> &'static str {
2598        match self {
2599            Self::Skipped => "skipped",
2600            Self::Ok => "ok",
2601            Self::FallbackUsed => "fallback_used",
2602            Self::Empty => "empty",
2603            Self::Failed => "failed",
2604        }
2605    }
2606}
2607
2608fn analyze_video(source_path: Option<&Path>, mime: &str, bytes: u64) -> MediaManifest {
2609    let filename = source_path
2610        .and_then(|path| path.file_name())
2611        .and_then(|name| name.to_str())
2612        .map(|value| value.to_string());
2613    MediaManifest {
2614        kind: "video".to_string(),
2615        mime: mime.to_string(),
2616        bytes,
2617        filename,
2618        duration_ms: None,
2619        width: None,
2620        height: None,
2621        codec: None,
2622    }
2623}
2624
2625pub fn analyze_file(
2626    source_path: Option<&Path>,
2627    payload: &[u8],
2628    payload_utf8: Option<&str>,
2629    inferred_title: Option<&str>,
2630    audio_opts: AudioAnalyzeOptions,
2631    force_video: bool,
2632) -> FileAnalysis {
2633    let (mime, treat_as_text_base) = detect_mime(source_path, payload, payload_utf8);
2634    let is_video = force_video || mime.starts_with("video/");
2635    let treat_as_text = if is_video { false } else { treat_as_text_base };
2636
2637    let mut metadata = DocMetadata::default();
2638    metadata.mime = Some(mime.clone());
2639    metadata.bytes = Some(payload.len() as u64);
2640    metadata.hash = Some(hash(payload).to_hex().to_string());
2641
2642    let mut search_text = None;
2643
2644    let mut extraction = ExtractionSummary {
2645        reader: None,
2646        status: ExtractionStatus::Skipped,
2647        warnings: Vec::new(),
2648        duration_ms: None,
2649        pages_processed: None,
2650    };
2651
2652    let reader_registry = default_reader_registry();
2653    let magic = payload.get(..MAGIC_SNIFF_BYTES).and_then(|slice| {
2654        if slice.is_empty() {
2655            None
2656        } else {
2657            Some(slice)
2658        }
2659    });
2660
2661    let apply_extracted_text = |text: &str,
2662                                reader_label: &str,
2663                                status_if_applied: ExtractionStatus,
2664                                extraction: &mut ExtractionSummary,
2665                                metadata: &mut DocMetadata,
2666                                search_text: &mut Option<String>|
2667     -> bool {
2668        if let Some(normalized) = normalize_text(text, MAX_SEARCH_TEXT_LEN) {
2669            let mut value = normalized.text;
2670            if normalized.truncated {
2671                value.push('…');
2672            }
2673            if metadata.caption.is_none() {
2674                if let Some(caption) = caption_from_text(&value) {
2675                    metadata.caption = Some(caption);
2676                }
2677            }
2678            *search_text = Some(value);
2679            extraction.reader = Some(reader_label.to_string());
2680            extraction.status = status_if_applied;
2681            true
2682        } else {
2683            false
2684        }
2685    };
2686
2687    if is_video {
2688        metadata.media = Some(analyze_video(source_path, &mime, payload.len() as u64));
2689    }
2690
2691    if treat_as_text {
2692        if let Some(text) = payload_utf8 {
2693            if !apply_extracted_text(
2694                text,
2695                "bytes",
2696                ExtractionStatus::Ok,
2697                &mut extraction,
2698                &mut metadata,
2699                &mut search_text,
2700            ) {
2701                extraction.status = ExtractionStatus::Empty;
2702                extraction.reader = Some("bytes".to_string());
2703                let msg = "text payload contained no searchable content after normalization";
2704                extraction.record_warning(msg);
2705                warn!("{msg}");
2706            }
2707        } else {
2708            extraction.reader = Some("bytes".to_string());
2709            extraction.status = ExtractionStatus::Failed;
2710            let msg = "payload reported as text but was not valid UTF-8";
2711            extraction.record_warning(msg);
2712            warn!("{msg}");
2713        }
2714    } else if mime == "application/pdf"
2715        || mime == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
2716        || mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
2717        || mime == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
2718        || mime == "application/vnd.ms-excel"
2719        || mime == "application/msword"
2720    {
2721        // Use reader registry for PDFs and Office documents (XLSX, DOCX, PPTX, etc.)
2722        let mut applied = false;
2723
2724        let format_hint = infer_document_format(Some(mime.as_str()), magic);
2725        let hint = ReaderHint::new(Some(mime.as_str()), format_hint)
2726            .with_uri(source_path.and_then(|p| p.to_str()))
2727            .with_magic(magic);
2728
2729        if let Some(reader) = reader_registry.find_reader(&hint) {
2730            match reader.extract(payload, &hint) {
2731                Ok(output) => {
2732                    extraction.reader = Some(output.reader_name.clone());
2733                    if let Some(ms) = output.diagnostics.duration_ms {
2734                        extraction.duration_ms = Some(ms);
2735                    }
2736                    if let Some(pages) = output.diagnostics.pages_processed {
2737                        extraction.pages_processed = Some(pages);
2738                    }
2739                    if let Some(mime_type) = output.document.mime_type.clone() {
2740                        metadata.mime = Some(mime_type);
2741                    }
2742
2743                    for warning in output.diagnostics.warnings.iter() {
2744                        extraction.record_warning(warning);
2745                        warn!("{warning}");
2746                    }
2747
2748                    let status = if output.diagnostics.fallback {
2749                        ExtractionStatus::FallbackUsed
2750                    } else {
2751                        ExtractionStatus::Ok
2752                    };
2753
2754                    if let Some(doc_text) = output.document.text.as_ref() {
2755                        applied = apply_extracted_text(
2756                            doc_text,
2757                            output.reader_name.as_str(),
2758                            status,
2759                            &mut extraction,
2760                            &mut metadata,
2761                            &mut search_text,
2762                        );
2763                        if !applied {
2764                            extraction.reader = Some(output.reader_name);
2765                            extraction.status = ExtractionStatus::Empty;
2766                            let msg = "primary reader returned no usable text";
2767                            extraction.record_warning(msg);
2768                            warn!("{msg}");
2769                        }
2770                    } else {
2771                        extraction.reader = Some(output.reader_name);
2772                        extraction.status = ExtractionStatus::Empty;
2773                        let msg = "primary reader returned empty text";
2774                        extraction.record_warning(msg);
2775                        warn!("{msg}");
2776                    }
2777                }
2778                Err(err) => {
2779                    let name = reader.name();
2780                    extraction.reader = Some(name.to_string());
2781                    extraction.status = ExtractionStatus::Failed;
2782                    let msg = format!("primary reader {name} failed: {err}");
2783                    extraction.record_warning(&msg);
2784                    warn!("{msg}");
2785                }
2786            }
2787        } else {
2788            extraction.record_warning("no registered reader matched this PDF payload");
2789            warn!("no registered reader matched this PDF payload");
2790        }
2791
2792        if !applied {
2793            match extract_pdf_text(payload) {
2794                Ok(pdf_text) => {
2795                    if !apply_extracted_text(
2796                        &pdf_text,
2797                        "pdf_extract",
2798                        ExtractionStatus::FallbackUsed,
2799                        &mut extraction,
2800                        &mut metadata,
2801                        &mut search_text,
2802                    ) {
2803                        extraction.reader = Some("pdf_extract".to_string());
2804                        extraction.status = ExtractionStatus::Empty;
2805                        let msg = "PDF text extraction yielded no searchable content";
2806                        extraction.record_warning(msg);
2807                        warn!("{msg}");
2808                    }
2809                }
2810                Err(err) => {
2811                    extraction.reader = Some("pdf_extract".to_string());
2812                    extraction.status = ExtractionStatus::Failed;
2813                    let msg = format!("failed to extract PDF text via fallback: {err}");
2814                    extraction.record_warning(&msg);
2815                    warn!("{msg}");
2816                }
2817            }
2818        }
2819    }
2820
2821    if !is_video && mime.starts_with("image/") {
2822        if let Some(image_meta) = analyze_image(payload) {
2823            let ImageAnalysis {
2824                width,
2825                height,
2826                palette,
2827                caption,
2828                exif,
2829            } = image_meta;
2830            metadata.width = Some(width);
2831            metadata.height = Some(height);
2832            if !palette.is_empty() {
2833                metadata.colors = Some(palette);
2834            }
2835            if metadata.caption.is_none() {
2836                metadata.caption = caption;
2837            }
2838            if let Some(exif) = exif {
2839                metadata.exif = Some(exif);
2840            }
2841        }
2842    }
2843
2844    if !is_video && (audio_opts.force || mime.starts_with("audio/")) {
2845        if let Some(AudioAnalysis {
2846            metadata: audio_metadata,
2847            caption: audio_caption,
2848            mut search_terms,
2849            transcript,
2850        }) = analyze_audio(payload, source_path, &mime, audio_opts)
2851        {
2852            if metadata.audio.is_none()
2853                || metadata
2854                    .audio
2855                    .as_ref()
2856                    .map_or(true, DocAudioMetadata::is_empty)
2857            {
2858                metadata.audio = Some(audio_metadata);
2859            }
2860            if metadata.caption.is_none() {
2861                metadata.caption = audio_caption;
2862            }
2863
2864            // Use transcript as primary search text if available
2865            if let Some(ref text) = transcript {
2866                extraction.reader = Some("whisper".to_string());
2867                extraction.status = ExtractionStatus::Ok;
2868                search_text = Some(text.clone());
2869            } else if !search_terms.is_empty() {
2870                match &mut search_text {
2871                    Some(existing) => {
2872                        for term in search_terms.drain(..) {
2873                            if !existing.contains(&term) {
2874                                if !existing.ends_with(' ') {
2875                                    existing.push(' ');
2876                                }
2877                                existing.push_str(&term);
2878                            }
2879                        }
2880                    }
2881                    None => {
2882                        search_text = Some(search_terms.join(" "));
2883                    }
2884                }
2885            }
2886        }
2887    }
2888
2889    if metadata.caption.is_none() {
2890        if let Some(title) = inferred_title {
2891            let trimmed = title.trim();
2892            if !trimmed.is_empty() {
2893                metadata.caption = Some(truncate_to_boundary(trimmed, 240));
2894            }
2895        }
2896    }
2897
2898    FileAnalysis {
2899        mime,
2900        metadata: if metadata.is_empty() {
2901            None
2902        } else {
2903            Some(metadata)
2904        },
2905        search_text,
2906        extraction,
2907    }
2908}
2909
2910fn default_reader_registry() -> &'static ReaderRegistry {
2911    static REGISTRY: OnceLock<ReaderRegistry> = OnceLock::new();
2912    REGISTRY.get_or_init(ReaderRegistry::default)
2913}
2914
2915fn infer_document_format(mime: Option<&str>, magic: Option<&[u8]>) -> Option<DocumentFormat> {
2916    if detect_pdf_magic(magic) {
2917        return Some(DocumentFormat::Pdf);
2918    }
2919
2920    let mime = mime?.trim().to_ascii_lowercase();
2921    match mime.as_str() {
2922        "application/pdf" => Some(DocumentFormat::Pdf),
2923        "text/plain" => Some(DocumentFormat::PlainText),
2924        "text/markdown" => Some(DocumentFormat::Markdown),
2925        "text/html" | "application/xhtml+xml" => Some(DocumentFormat::Html),
2926        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
2927            Some(DocumentFormat::Docx)
2928        }
2929        "application/vnd.openxmlformats-officedocument.presentationml.presentation" => {
2930            Some(DocumentFormat::Pptx)
2931        }
2932        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
2933            Some(DocumentFormat::Xlsx)
2934        }
2935        other if other.starts_with("text/") => Some(DocumentFormat::PlainText),
2936        _ => None,
2937    }
2938}
2939
2940fn detect_pdf_magic(magic: Option<&[u8]>) -> bool {
2941    let mut slice = match magic {
2942        Some(slice) if !slice.is_empty() => slice,
2943        _ => return false,
2944    };
2945    if slice.starts_with(&[0xEF, 0xBB, 0xBF]) {
2946        slice = &slice[3..];
2947    }
2948    while let Some((first, rest)) = slice.split_first() {
2949        if first.is_ascii_whitespace() {
2950            slice = rest;
2951        } else {
2952            break;
2953        }
2954    }
2955    slice.starts_with(b"%PDF")
2956}
2957
2958/// Robust PDF text extraction with multiple fallbacks for cross-platform support.
2959/// Tries all extractors and returns the one with the most text content.
2960fn extract_pdf_text(payload: &[u8]) -> Result<String, PdfExtractError> {
2961    let mut best_text = String::new();
2962    let mut best_source = "none";
2963
2964    // Minimum chars to consider extraction "good" - scales with PDF size
2965    // For a 12MB PDF, expect at least ~1000 chars; for 1MB, ~100 chars
2966    let min_good_chars = (payload.len() / 10000).max(100).min(5000);
2967
2968    // Try pdf_extract first
2969    match pdf_extract::extract_text_from_mem(payload) {
2970        Ok(text) => {
2971            let trimmed = text.trim();
2972            if trimmed.len() > best_text.len() {
2973                best_text = trimmed.to_string();
2974                best_source = "pdf_extract";
2975            }
2976            if trimmed.len() >= min_good_chars {
2977                info!("pdf_extract extracted {} chars (good)", trimmed.len());
2978                return Ok(best_text);
2979            } else if !trimmed.is_empty() {
2980                warn!(
2981                    "pdf_extract returned only {} chars, trying other extractors",
2982                    trimmed.len()
2983                );
2984            }
2985        }
2986        Err(err) => {
2987            warn!("pdf_extract failed: {err}");
2988        }
2989    }
2990
2991    // Try lopdf - pure Rust, works on all platforms
2992    match extract_pdf_with_lopdf(payload) {
2993        Ok(text) => {
2994            let trimmed = text.trim();
2995            if trimmed.len() > best_text.len() {
2996                best_text = trimmed.to_string();
2997                best_source = "lopdf";
2998            }
2999            if trimmed.len() >= min_good_chars {
3000                info!("lopdf extracted {} chars (good)", trimmed.len());
3001                return Ok(best_text);
3002            } else if !trimmed.is_empty() {
3003                warn!(
3004                    "lopdf returned only {} chars, trying pdftotext",
3005                    trimmed.len()
3006                );
3007            }
3008        }
3009        Err(err) => {
3010            warn!("lopdf failed: {err}");
3011        }
3012    }
3013
3014    // Try pdftotext binary (requires poppler installed)
3015    match fallback_pdftotext(payload) {
3016        Ok(text) => {
3017            let trimmed = text.trim();
3018            if trimmed.len() > best_text.len() {
3019                best_text = trimmed.to_string();
3020                best_source = "pdftotext";
3021            }
3022            if !trimmed.is_empty() {
3023                info!("pdftotext extracted {} chars", trimmed.len());
3024            }
3025        }
3026        Err(err) => {
3027            warn!("pdftotext failed: {err}");
3028        }
3029    }
3030
3031    // Return the best result we found
3032    if !best_text.is_empty() {
3033        info!(
3034            "using {} extraction ({} chars)",
3035            best_source,
3036            best_text.len()
3037        );
3038        Ok(best_text)
3039    } else {
3040        warn!("all PDF extraction methods failed, storing without searchable text");
3041        Ok(String::new())
3042    }
3043}
3044
3045/// Extract text from PDF using lopdf (pure Rust, no native dependencies)
3046fn extract_pdf_with_lopdf(payload: &[u8]) -> Result<String> {
3047    use lopdf::Document;
3048
3049    let mut document =
3050        Document::load_mem(payload).map_err(|e| anyhow!("lopdf failed to load PDF: {e}"))?;
3051
3052    // Try to decrypt if encrypted (empty password)
3053    if document.is_encrypted() {
3054        let _ = document.decrypt("");
3055    }
3056
3057    // Decompress streams for text extraction
3058    let _ = document.decompress();
3059
3060    // Get sorted page numbers
3061    let mut page_numbers: Vec<u32> = document.get_pages().keys().copied().collect();
3062    if page_numbers.is_empty() {
3063        return Err(anyhow!("PDF has no pages"));
3064    }
3065    page_numbers.sort_unstable();
3066
3067    // Limit to 4096 pages max
3068    if page_numbers.len() > 4096 {
3069        page_numbers.truncate(4096);
3070        warn!("PDF has more than 4096 pages, truncating extraction");
3071    }
3072
3073    // Extract text from all pages
3074    let text = document
3075        .extract_text(&page_numbers)
3076        .map_err(|e| anyhow!("lopdf text extraction failed: {e}"))?;
3077
3078    Ok(text.trim().to_string())
3079}
3080
3081fn fallback_pdftotext(payload: &[u8]) -> Result<String> {
3082    use std::io::Write;
3083    use std::process::{Command, Stdio};
3084
3085    let pdftotext = which::which("pdftotext").context("pdftotext binary not found in PATH")?;
3086
3087    let mut temp_pdf = tempfile::NamedTempFile::new().context("failed to create temp pdf file")?;
3088    temp_pdf
3089        .write_all(payload)
3090        .context("failed to write pdf payload to temp file")?;
3091    temp_pdf.flush().context("failed to flush temp pdf file")?;
3092
3093    let child = Command::new(pdftotext)
3094        .arg("-layout")
3095        .arg(temp_pdf.path())
3096        .arg("-")
3097        .stdout(Stdio::piped())
3098        .spawn()
3099        .context("failed to spawn pdftotext")?;
3100
3101    let output = child
3102        .wait_with_output()
3103        .context("failed to read pdftotext output")?;
3104
3105    if !output.status.success() {
3106        return Err(anyhow!("pdftotext exited with status {}", output.status));
3107    }
3108
3109    let text = String::from_utf8(output.stdout).context("pdftotext produced non-UTF8 output")?;
3110    Ok(text)
3111}
3112
3113fn detect_mime(
3114    source_path: Option<&Path>,
3115    payload: &[u8],
3116    payload_utf8: Option<&str>,
3117) -> (String, bool) {
3118    // For ZIP-based formats (OOXML: docx, xlsx, pptx), check extension FIRST
3119    // because magic byte detection returns generic "application/zip"
3120    if let Some(path) = source_path {
3121        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3122            let ext_lower = ext.to_ascii_lowercase();
3123            // Office Open XML and legacy Office formats need extension-based detection
3124            if matches!(
3125                ext_lower.as_str(),
3126                "docx" | "xlsx" | "pptx" | "doc" | "xls" | "ppt"
3127            ) {
3128                if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
3129                    return (mime.to_string(), treat_as_text);
3130                }
3131            }
3132        }
3133    }
3134
3135    if let Some(kind) = infer::get(payload) {
3136        let mime = kind.mime_type().to_string();
3137        let treat_as_text = mime.starts_with("text/")
3138            || matches!(
3139                mime.as_str(),
3140                "application/json" | "application/xml" | "application/javascript" | "image/svg+xml"
3141            );
3142        return (mime, treat_as_text);
3143    }
3144
3145    let magic = magic_from_u8(payload);
3146    if !magic.is_empty() && magic != "application/octet-stream" {
3147        let treat_as_text = magic.starts_with("text/")
3148            || matches!(
3149                magic,
3150                "application/json"
3151                    | "application/xml"
3152                    | "application/javascript"
3153                    | "image/svg+xml"
3154                    | "text/plain"
3155            );
3156        return (magic.to_string(), treat_as_text);
3157    }
3158
3159    if let Some(path) = source_path {
3160        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3161            if let Some((mime, treat_as_text)) = mime_from_extension(ext) {
3162                return (mime.to_string(), treat_as_text);
3163            }
3164        }
3165    }
3166
3167    if payload_utf8.is_some() {
3168        return ("text/plain".to_string(), true);
3169    }
3170
3171    ("application/octet-stream".to_string(), false)
3172}
3173
3174fn mime_from_extension(ext: &str) -> Option<(&'static str, bool)> {
3175    let ext_lower = ext.to_ascii_lowercase();
3176    match ext_lower.as_str() {
3177        "txt" | "text" | "log" | "cfg" | "conf" | "ini" | "properties" | "sql" | "rs" | "py"
3178        | "js" | "ts" | "tsx" | "jsx" | "c" | "h" | "cpp" | "hpp" | "go" | "rb" | "php" | "css"
3179        | "scss" | "sass" | "sh" | "bash" | "zsh" | "ps1" | "swift" | "kt" | "java" | "scala"
3180        | "lua" | "pl" | "pm" | "r" | "erl" | "ex" | "exs" | "dart" => Some(("text/plain", true)),
3181        "md" | "markdown" => Some(("text/markdown", true)),
3182        "rst" => Some(("text/x-rst", true)),
3183        "json" => Some(("application/json", true)),
3184        "csv" => Some(("text/csv", true)),
3185        "tsv" => Some(("text/tab-separated-values", true)),
3186        "yaml" | "yml" => Some(("application/yaml", true)),
3187        "toml" => Some(("application/toml", true)),
3188        "html" | "htm" => Some(("text/html", true)),
3189        "xml" => Some(("application/xml", true)),
3190        "svg" => Some(("image/svg+xml", true)),
3191        // Office Open XML formats (ZIP-based, need explicit extension mapping)
3192        "docx" => Some((
3193            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
3194            false,
3195        )),
3196        "xlsx" => Some((
3197            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
3198            false,
3199        )),
3200        "pptx" => Some((
3201            "application/vnd.openxmlformats-officedocument.presentationml.presentation",
3202            false,
3203        )),
3204        // Legacy Office formats
3205        "doc" => Some(("application/msword", false)),
3206        "xls" => Some(("application/vnd.ms-excel", false)),
3207        "ppt" => Some(("application/vnd.ms-powerpoint", false)),
3208        "gif" => Some(("image/gif", false)),
3209        "jpg" | "jpeg" => Some(("image/jpeg", false)),
3210        "png" => Some(("image/png", false)),
3211        "bmp" => Some(("image/bmp", false)),
3212        "ico" => Some(("image/x-icon", false)),
3213        "tif" | "tiff" => Some(("image/tiff", false)),
3214        "webp" => Some(("image/webp", false)),
3215        _ => None,
3216    }
3217}
3218
3219fn caption_from_text(text: &str) -> Option<String> {
3220    for line in text.lines() {
3221        let trimmed = line.trim();
3222        if !trimmed.is_empty() {
3223            return Some(truncate_to_boundary(trimmed, 240));
3224        }
3225    }
3226    None
3227}
3228
3229fn truncate_to_boundary(text: &str, max_len: usize) -> String {
3230    if text.len() <= max_len {
3231        return text.to_string();
3232    }
3233    let mut end = max_len;
3234    while end > 0 && !text.is_char_boundary(end) {
3235        end -= 1;
3236    }
3237    if end == 0 {
3238        return String::new();
3239    }
3240    let mut truncated = text[..end].trim_end().to_string();
3241    truncated.push('…');
3242    truncated
3243}
3244
3245fn analyze_image(payload: &[u8]) -> Option<ImageAnalysis> {
3246    let reader = ImageReader::new(Cursor::new(payload))
3247        .with_guessed_format()
3248        .map_err(|err| warn!("failed to guess image format: {err}"))
3249        .ok()?;
3250    let image = reader
3251        .decode()
3252        .map_err(|err| warn!("failed to decode image: {err}"))
3253        .ok()?;
3254    let width = image.width();
3255    let height = image.height();
3256    let rgb = image.to_rgb8();
3257    let palette = match get_palette(
3258        rgb.as_raw(),
3259        ColorFormat::Rgb,
3260        COLOR_PALETTE_SIZE,
3261        COLOR_PALETTE_QUALITY,
3262    ) {
3263        Ok(colors) => colors.into_iter().map(color_to_hex).collect(),
3264        Err(err) => {
3265            warn!("failed to compute colour palette: {err}");
3266            Vec::new()
3267        }
3268    };
3269
3270    let (exif, exif_caption) = extract_exif_metadata(payload);
3271
3272    Some(ImageAnalysis {
3273        width,
3274        height,
3275        palette,
3276        caption: exif_caption,
3277        exif,
3278    })
3279}
3280
3281fn color_to_hex(color: Color) -> String {
3282    format!("#{:02x}{:02x}{:02x}", color.r, color.g, color.b)
3283}
3284
3285fn analyze_audio(
3286    payload: &[u8],
3287    source_path: Option<&Path>,
3288    mime: &str,
3289    options: AudioAnalyzeOptions,
3290) -> Option<AudioAnalysis> {
3291    use symphonia::core::codecs::{CodecParameters, DecoderOptions, CODEC_TYPE_NULL};
3292    use symphonia::core::errors::Error as SymphoniaError;
3293    use symphonia::core::formats::FormatOptions;
3294    use symphonia::core::io::{MediaSourceStream, MediaSourceStreamOptions};
3295    use symphonia::core::meta::MetadataOptions;
3296    use symphonia::core::probe::Hint;
3297
3298    let cursor = Cursor::new(payload.to_vec());
3299    let mss = MediaSourceStream::new(Box::new(cursor), MediaSourceStreamOptions::default());
3300
3301    let mut hint = Hint::new();
3302    if let Some(path) = source_path {
3303        if let Some(ext) = path.extension().and_then(|ext| ext.to_str()) {
3304            hint.with_extension(ext);
3305        }
3306    }
3307    if let Some(suffix) = mime.split('/').nth(1) {
3308        hint.with_extension(suffix);
3309    }
3310
3311    let probed = symphonia::default::get_probe()
3312        .format(
3313            &hint,
3314            mss,
3315            &FormatOptions::default(),
3316            &MetadataOptions::default(),
3317        )
3318        .map_err(|err| warn!("failed to probe audio stream: {err}"))
3319        .ok()?;
3320
3321    let mut format = probed.format;
3322    let track = format.default_track()?;
3323    let track_id = track.id;
3324    let codec_params: CodecParameters = track.codec_params.clone();
3325    let sample_rate = codec_params.sample_rate;
3326    let channel_count = codec_params.channels.map(|channels| channels.count() as u8);
3327
3328    let mut decoder = symphonia::default::get_codecs()
3329        .make(&codec_params, &DecoderOptions::default())
3330        .map_err(|err| warn!("failed to create audio decoder: {err}"))
3331        .ok()?;
3332
3333    let mut decoded_frames: u64 = 0;
3334    loop {
3335        let packet = match format.next_packet() {
3336            Ok(packet) => packet,
3337            Err(SymphoniaError::IoError(_)) => break,
3338            Err(SymphoniaError::DecodeError(err)) => {
3339                warn!("skipping undecodable audio packet: {err}");
3340                continue;
3341            }
3342            Err(SymphoniaError::ResetRequired) => {
3343                decoder.reset();
3344                continue;
3345            }
3346            Err(other) => {
3347                warn!("stopping audio analysis due to error: {other}");
3348                break;
3349            }
3350        };
3351
3352        if packet.track_id() != track_id {
3353            continue;
3354        }
3355
3356        match decoder.decode(&packet) {
3357            Ok(audio_buf) => {
3358                decoded_frames = decoded_frames.saturating_add(audio_buf.frames() as u64);
3359            }
3360            Err(SymphoniaError::DecodeError(err)) => {
3361                warn!("failed to decode audio packet: {err}");
3362            }
3363            Err(SymphoniaError::IoError(_)) => break,
3364            Err(SymphoniaError::ResetRequired) => {
3365                decoder.reset();
3366            }
3367            Err(other) => {
3368                warn!("decoder error: {other}");
3369                break;
3370            }
3371        }
3372    }
3373
3374    let duration_secs = match sample_rate {
3375        Some(rate) if rate > 0 => decoded_frames as f64 / rate as f64,
3376        _ => 0.0,
3377    };
3378
3379    let bitrate_kbps = if duration_secs > 0.0 {
3380        Some(((payload.len() as f64 * 8.0) / duration_secs / 1_000.0).round() as u32)
3381    } else {
3382        None
3383    };
3384
3385    let tags = extract_lofty_tags(payload);
3386    let caption = tags
3387        .get("title")
3388        .cloned()
3389        .or_else(|| tags.get("tracktitle").cloned());
3390
3391    let mut search_terms = Vec::new();
3392    if let Some(title) = tags.get("title").or_else(|| tags.get("tracktitle")) {
3393        search_terms.push(title.clone());
3394    }
3395    if let Some(artist) = tags.get("artist") {
3396        search_terms.push(artist.clone());
3397    }
3398    if let Some(album) = tags.get("album") {
3399        search_terms.push(album.clone());
3400    }
3401    if let Some(genre) = tags.get("genre") {
3402        search_terms.push(genre.clone());
3403    }
3404
3405    let mut segments = Vec::new();
3406    if duration_secs > 0.0 {
3407        let segment_len = options.normalised_segment_secs() as f64;
3408        if segment_len > 0.0 {
3409            let mut start = 0.0;
3410            let mut idx = 1usize;
3411            while start < duration_secs {
3412                let end = (start + segment_len).min(duration_secs);
3413                segments.push(AudioSegmentMetadata {
3414                    start_seconds: start as f32,
3415                    end_seconds: end as f32,
3416                    label: Some(format!("Segment {}", idx)),
3417                });
3418                if end >= duration_secs {
3419                    break;
3420                }
3421                start = end;
3422                idx += 1;
3423            }
3424        }
3425    }
3426
3427    let mut metadata = DocAudioMetadata::default();
3428    if duration_secs > 0.0 {
3429        metadata.duration_secs = Some(duration_secs as f32);
3430    }
3431    if let Some(rate) = sample_rate {
3432        metadata.sample_rate_hz = Some(rate);
3433    }
3434    if let Some(channels) = channel_count {
3435        metadata.channels = Some(channels);
3436    }
3437    if let Some(bitrate) = bitrate_kbps {
3438        metadata.bitrate_kbps = Some(bitrate);
3439    }
3440    if codec_params.codec != CODEC_TYPE_NULL {
3441        metadata.codec = Some(format!("{:?}", codec_params.codec));
3442    }
3443    if !segments.is_empty() {
3444        metadata.segments = segments;
3445    }
3446    if !tags.is_empty() {
3447        metadata.tags = tags
3448            .iter()
3449            .map(|(k, v)| (k.clone(), v.clone()))
3450            .collect::<BTreeMap<_, _>>();
3451    }
3452
3453    // Transcribe audio if requested
3454    let transcript = if options.transcribe {
3455        transcribe_audio(source_path)
3456    } else {
3457        None
3458    };
3459
3460    Some(AudioAnalysis {
3461        metadata,
3462        caption,
3463        search_terms,
3464        transcript,
3465    })
3466}
3467
3468/// Transcribe audio file using Whisper
3469#[cfg(feature = "whisper")]
3470fn transcribe_audio(source_path: Option<&Path>) -> Option<String> {
3471    use memvid_core::{WhisperConfig, WhisperTranscriber};
3472
3473    let path = source_path?;
3474
3475    tracing::info!(path = %path.display(), "Transcribing audio with Whisper");
3476
3477    let config = WhisperConfig::default();
3478    let mut transcriber = match WhisperTranscriber::new(&config) {
3479        Ok(t) => t,
3480        Err(e) => {
3481            tracing::warn!(error = %e, "Failed to initialize Whisper transcriber");
3482            return None;
3483        }
3484    };
3485
3486    match transcriber.transcribe_file(path) {
3487        Ok(result) => {
3488            tracing::info!(
3489                duration = result.duration_secs,
3490                text_len = result.text.len(),
3491                "Audio transcription complete"
3492            );
3493            Some(result.text)
3494        }
3495        Err(e) => {
3496            tracing::warn!(error = %e, "Failed to transcribe audio");
3497            None
3498        }
3499    }
3500}
3501
3502#[cfg(not(feature = "whisper"))]
3503fn transcribe_audio(_source_path: Option<&Path>) -> Option<String> {
3504    tracing::warn!(
3505        "Whisper feature not enabled. Rebuild with --features whisper to enable transcription."
3506    );
3507    None
3508}
3509
3510fn extract_lofty_tags(payload: &[u8]) -> HashMap<String, String> {
3511    use lofty::{ItemKey, Probe as LoftyProbe, Tag, TaggedFileExt};
3512
3513    fn collect_tag(tag: &Tag, out: &mut HashMap<String, String>) {
3514        if let Some(value) = tag.get_string(&ItemKey::TrackTitle) {
3515            out.entry("title".into())
3516                .or_insert_with(|| value.to_string());
3517            out.entry("tracktitle".into())
3518                .or_insert_with(|| value.to_string());
3519        }
3520        if let Some(value) = tag.get_string(&ItemKey::TrackArtist) {
3521            out.entry("artist".into())
3522                .or_insert_with(|| value.to_string());
3523        } else if let Some(value) = tag.get_string(&ItemKey::AlbumArtist) {
3524            out.entry("artist".into())
3525                .or_insert_with(|| value.to_string());
3526        }
3527        if let Some(value) = tag.get_string(&ItemKey::AlbumTitle) {
3528            out.entry("album".into())
3529                .or_insert_with(|| value.to_string());
3530        }
3531        if let Some(value) = tag.get_string(&ItemKey::Genre) {
3532            out.entry("genre".into())
3533                .or_insert_with(|| value.to_string());
3534        }
3535        if let Some(value) = tag.get_string(&ItemKey::TrackNumber) {
3536            out.entry("track_number".into())
3537                .or_insert_with(|| value.to_string());
3538        }
3539        if let Some(value) = tag.get_string(&ItemKey::DiscNumber) {
3540            out.entry("disc_number".into())
3541                .or_insert_with(|| value.to_string());
3542        }
3543    }
3544
3545    let mut tags = HashMap::new();
3546    let probe = match LoftyProbe::new(Cursor::new(payload)).guess_file_type() {
3547        Ok(probe) => probe,
3548        Err(err) => {
3549            warn!("failed to guess audio tag file type: {err}");
3550            return tags;
3551        }
3552    };
3553
3554    let tagged_file = match probe.read() {
3555        Ok(file) => file,
3556        Err(err) => {
3557            warn!("failed to read audio tags: {err}");
3558            return tags;
3559        }
3560    };
3561
3562    if let Some(primary) = tagged_file.primary_tag() {
3563        collect_tag(primary, &mut tags);
3564    }
3565    for tag in tagged_file.tags() {
3566        collect_tag(tag, &mut tags);
3567    }
3568
3569    tags
3570}
3571
3572fn extract_exif_metadata(payload: &[u8]) -> (Option<DocExifMetadata>, Option<String>) {
3573    let mut cursor = Cursor::new(payload);
3574    let exif = match ExifReader::new().read_from_container(&mut cursor) {
3575        Ok(exif) => exif,
3576        Err(err) => {
3577            warn!("failed to read EXIF metadata: {err}");
3578            return (None, None);
3579        }
3580    };
3581
3582    let mut doc = DocExifMetadata::default();
3583    let mut has_data = false;
3584
3585    if let Some(make) = exif_string(&exif, Tag::Make) {
3586        doc.make = Some(make);
3587        has_data = true;
3588    }
3589    if let Some(model) = exif_string(&exif, Tag::Model) {
3590        doc.model = Some(model);
3591        has_data = true;
3592    }
3593    if let Some(lens) =
3594        exif_string(&exif, Tag::LensModel).or_else(|| exif_string(&exif, Tag::LensMake))
3595    {
3596        doc.lens = Some(lens);
3597        has_data = true;
3598    }
3599    if let Some(dt) =
3600        exif_string(&exif, Tag::DateTimeOriginal).or_else(|| exif_string(&exif, Tag::DateTime))
3601    {
3602        doc.datetime = Some(dt);
3603        has_data = true;
3604    }
3605
3606    if let Some(gps) = extract_gps_metadata(&exif) {
3607        doc.gps = Some(gps);
3608        has_data = true;
3609    }
3610
3611    let caption = exif_string(&exif, Tag::ImageDescription);
3612
3613    let metadata = if has_data { Some(doc) } else { None };
3614    (metadata, caption)
3615}
3616
3617fn exif_string(exif: &exif::Exif, tag: Tag) -> Option<String> {
3618    exif.fields()
3619        .find(|field| field.tag == tag)
3620        .and_then(|field| field_to_string(field, exif))
3621}
3622
3623fn field_to_string(field: &exif::Field, exif: &exif::Exif) -> Option<String> {
3624    let value = field.display_value().with_unit(exif).to_string();
3625    let trimmed = value.trim_matches('\0').trim();
3626    if trimmed.is_empty() {
3627        None
3628    } else {
3629        Some(trimmed.to_string())
3630    }
3631}
3632
3633fn extract_gps_metadata(exif: &exif::Exif) -> Option<DocGpsMetadata> {
3634    let latitude_field = exif.fields().find(|field| field.tag == Tag::GPSLatitude)?;
3635    let latitude_ref_field = exif
3636        .fields()
3637        .find(|field| field.tag == Tag::GPSLatitudeRef)?;
3638    let longitude_field = exif.fields().find(|field| field.tag == Tag::GPSLongitude)?;
3639    let longitude_ref_field = exif
3640        .fields()
3641        .find(|field| field.tag == Tag::GPSLongitudeRef)?;
3642
3643    let mut latitude = gps_value_to_degrees(&latitude_field.value)?;
3644    let mut longitude = gps_value_to_degrees(&longitude_field.value)?;
3645
3646    if let Some(reference) = value_to_ascii(&latitude_ref_field.value) {
3647        if reference.eq_ignore_ascii_case("S") {
3648            latitude = -latitude;
3649        }
3650    }
3651    if let Some(reference) = value_to_ascii(&longitude_ref_field.value) {
3652        if reference.eq_ignore_ascii_case("W") {
3653            longitude = -longitude;
3654        }
3655    }
3656
3657    Some(DocGpsMetadata {
3658        latitude,
3659        longitude,
3660    })
3661}
3662
3663fn gps_value_to_degrees(value: &ExifValue) -> Option<f64> {
3664    match value {
3665        ExifValue::Rational(values) if !values.is_empty() => {
3666            let deg = rational_to_f64_u(values.get(0)?)?;
3667            let min = values
3668                .get(1)
3669                .and_then(|v| rational_to_f64_u(v))
3670                .unwrap_or(0.0);
3671            let sec = values
3672                .get(2)
3673                .and_then(|v| rational_to_f64_u(v))
3674                .unwrap_or(0.0);
3675            Some(deg + (min / 60.0) + (sec / 3600.0))
3676        }
3677        ExifValue::SRational(values) if !values.is_empty() => {
3678            let deg = rational_to_f64_i(values.get(0)?)?;
3679            let min = values
3680                .get(1)
3681                .and_then(|v| rational_to_f64_i(v))
3682                .unwrap_or(0.0);
3683            let sec = values
3684                .get(2)
3685                .and_then(|v| rational_to_f64_i(v))
3686                .unwrap_or(0.0);
3687            Some(deg + (min / 60.0) + (sec / 3600.0))
3688        }
3689        _ => None,
3690    }
3691}
3692
3693fn rational_to_f64_u(value: &exif::Rational) -> Option<f64> {
3694    if value.denom == 0 {
3695        None
3696    } else {
3697        Some(value.num as f64 / value.denom as f64)
3698    }
3699}
3700
3701fn rational_to_f64_i(value: &exif::SRational) -> Option<f64> {
3702    if value.denom == 0 {
3703        None
3704    } else {
3705        Some(value.num as f64 / value.denom as f64)
3706    }
3707}
3708
3709fn value_to_ascii(value: &ExifValue) -> Option<String> {
3710    if let ExifValue::Ascii(values) = value {
3711        values
3712            .first()
3713            .and_then(|bytes| std::str::from_utf8(bytes).ok())
3714            .map(|s| s.trim_matches('\0').trim().to_string())
3715            .filter(|s| !s.is_empty())
3716    } else {
3717        None
3718    }
3719}
3720
3721fn ingest_payload(
3722    mem: &mut Memvid,
3723    args: &PutArgs,
3724    config: &CliConfig,
3725    payload: Vec<u8>,
3726    source_path: Option<&Path>,
3727    capacity_guard: Option<&mut CapacityGuard>,
3728    enable_embedding: bool,
3729    runtime: Option<&EmbeddingRuntime>,
3730    parallel_mode: bool,
3731) -> Result<IngestOutcome> {
3732    let original_bytes = payload.len();
3733    let (stored_bytes, compressed) = canonical_storage_len(&payload)?;
3734
3735    // Check if adding this payload would exceed free tier limit
3736    // Require API key for operations that would exceed 1GB total
3737    let current_size = mem.stats().map(|s| s.size_bytes).unwrap_or(0);
3738    crate::utils::ensure_capacity_with_api_key(current_size, stored_bytes as u64, config)?;
3739
3740    let payload_text = std::str::from_utf8(&payload).ok();
3741    let inferred_title = derive_title(args.title.clone(), source_path, payload_text);
3742
3743    let audio_opts = AudioAnalyzeOptions {
3744        force: args.audio || args.transcribe,
3745        segment_secs: args
3746            .audio_segment_seconds
3747            .unwrap_or(AudioAnalyzeOptions::DEFAULT_SEGMENT_SECS),
3748        transcribe: args.transcribe,
3749    };
3750
3751    let mut analysis = analyze_file(
3752        source_path,
3753        &payload,
3754        payload_text,
3755        inferred_title.as_deref(),
3756        audio_opts,
3757        args.video,
3758    );
3759
3760    if args.video && !analysis.mime.starts_with("video/") {
3761        anyhow::bail!(
3762            "--video requires a video input; detected MIME type {}",
3763            analysis.mime
3764        );
3765    }
3766
3767    let mut search_text = analysis.search_text.clone();
3768    if let Some(ref mut text) = search_text {
3769        if text.len() > MAX_SEARCH_TEXT_LEN {
3770            let truncated = truncate_to_boundary(text, MAX_SEARCH_TEXT_LEN);
3771            *text = truncated;
3772        }
3773    }
3774    analysis.search_text = search_text.clone();
3775
3776    let uri = if let Some(ref explicit) = args.uri {
3777        derive_uri(Some(explicit), None)
3778    } else if args.video {
3779        derive_video_uri(&payload, source_path, &analysis.mime)
3780    } else {
3781        derive_uri(None, source_path)
3782    };
3783
3784    let mut options_builder = PutOptions::builder()
3785        .enable_embedding(enable_embedding)
3786        .auto_tag(!args.no_auto_tag)
3787        .extract_dates(!args.no_extract_dates)
3788        .extract_triplets(!args.no_extract_triplets);
3789    if let Some(ts) = args.timestamp {
3790        options_builder = options_builder.timestamp(ts);
3791    }
3792    if let Some(track) = &args.track {
3793        options_builder = options_builder.track(track.clone());
3794    }
3795    if args.video {
3796        options_builder = options_builder.kind("video");
3797        options_builder = options_builder.tag("kind", "video");
3798        options_builder = options_builder.push_tag("video");
3799    } else if let Some(kind) = &args.kind {
3800        options_builder = options_builder.kind(kind.clone());
3801    }
3802    options_builder = options_builder.uri(uri.clone());
3803    if let Some(ref title) = inferred_title {
3804        options_builder = options_builder.title(title.clone());
3805    }
3806    for (key, value) in &args.tags {
3807        options_builder = options_builder.tag(key.clone(), value.clone());
3808        if !key.is_empty() {
3809            options_builder = options_builder.push_tag(key.clone());
3810        }
3811        if !value.is_empty() && value != key {
3812            options_builder = options_builder.push_tag(value.clone());
3813        }
3814    }
3815    for label in &args.labels {
3816        options_builder = options_builder.label(label.clone());
3817    }
3818    if let Some(metadata) = analysis.metadata.clone() {
3819        if !metadata.is_empty() {
3820            options_builder = options_builder.metadata(metadata);
3821        }
3822    }
3823    for (idx, entry) in args.metadata.iter().enumerate() {
3824        match serde_json::from_str::<DocMetadata>(entry) {
3825            Ok(meta) => {
3826                options_builder = options_builder.metadata(meta);
3827            }
3828            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
3829                Ok(value) => {
3830                    options_builder =
3831                        options_builder.metadata_entry(format!("custom_metadata_{}", idx), value);
3832                }
3833                Err(err) => {
3834                    warn!("failed to parse --metadata JSON: {err}");
3835                }
3836            },
3837        }
3838    }
3839    if let Some(text) = search_text.clone() {
3840        if !text.trim().is_empty() {
3841            options_builder = options_builder.search_text(text);
3842        }
3843    }
3844    // Default: no-raw mode (only store extracted text + BLAKE3 hash)
3845    // Use --raw to store raw binary content for later retrieval
3846    let effective_no_raw = !args.raw; // true by default unless --raw is passed
3847    if effective_no_raw {
3848        options_builder = options_builder.no_raw(true);
3849        if let Some(path) = source_path {
3850            options_builder = options_builder.source_path(path.display().to_string());
3851        }
3852    }
3853    // --dedup: skip if identical content already exists
3854    if args.dedup {
3855        options_builder = options_builder.dedup(true);
3856    }
3857    let mut options = options_builder.build();
3858
3859    // Set extraction budget for progressive ingestion
3860    if let Some(budget_ms) = args.extraction_budget_ms {
3861        options.extraction_budget_ms = budget_ms;
3862    }
3863
3864    let existing_frame = if args.update_existing || !args.allow_duplicate {
3865        match mem.frame_by_uri(&uri) {
3866            Ok(frame) => Some(frame),
3867            Err(MemvidError::FrameNotFoundByUri { .. }) => None,
3868            Err(err) => return Err(err.into()),
3869        }
3870    } else {
3871        None
3872    };
3873
3874    // Compute frame_id: for updates it's the existing frame's id,
3875    // for new frames it's the current frame count (which becomes the next id)
3876    let frame_id = if let Some(ref existing) = existing_frame {
3877        existing.id
3878    } else {
3879        mem.stats()?.frame_count
3880    };
3881
3882    let mut embedded = false;
3883    let allow_embedding = enable_embedding && !args.video;
3884    if enable_embedding && !allow_embedding && args.video {
3885        warn!("semantic embeddings are not generated for video payloads");
3886    }
3887    let seq = if let Some(existing) = existing_frame {
3888        if args.update_existing {
3889            let payload_for_update = payload.clone();
3890            if allow_embedding {
3891                if let Some(path) = args.embedding_vec.as_deref() {
3892                    let embedding = crate::utils::read_embedding(path)?;
3893                    if let Some(model_str) = args.embedding_vec_model.as_deref() {
3894                        let choice = model_str.parse::<EmbeddingModelChoice>()?;
3895                        let expected = choice.dimensions();
3896                        if expected != 0 && embedding.len() != expected {
3897                            anyhow::bail!(
3898                                "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
3899                                embedding.len(),
3900                                choice.name(),
3901                                expected
3902                            );
3903                        }
3904                        apply_embedding_identity_metadata_from_choice(
3905                            &mut options,
3906                            choice,
3907                            embedding.len(),
3908                            Some(model_str),
3909                        );
3910                    } else {
3911                        options.extra_metadata.insert(
3912                            MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
3913                            embedding.len().to_string(),
3914                        );
3915                    }
3916                    embedded = true;
3917                    mem.update_frame(
3918                        existing.id,
3919                        Some(payload_for_update),
3920                        options.clone(),
3921                        Some(embedding),
3922                    )
3923                    .map_err(|err| {
3924                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3925                    })?
3926                } else {
3927                    let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
3928                    let embed_text = search_text
3929                        .clone()
3930                        .or_else(|| payload_text.map(|text| text.to_string()))
3931                        .unwrap_or_default();
3932                    if embed_text.trim().is_empty() {
3933                        warn!("no textual content available; embedding skipped");
3934                        mem.update_frame(
3935                            existing.id,
3936                            Some(payload_for_update),
3937                            options.clone(),
3938                            None,
3939                        )
3940                        .map_err(|err| {
3941                            map_put_error(
3942                                err,
3943                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3944                            )
3945                        })?
3946                    } else {
3947                        // Truncate to avoid token limits
3948                        let truncated_text = truncate_for_embedding(&embed_text);
3949                        if truncated_text.len() < embed_text.len() {
3950                            info!(
3951                                "Truncated text from {} to {} chars for embedding",
3952                                embed_text.len(),
3953                                truncated_text.len()
3954                            );
3955                        }
3956                        let embedding = runtime.embed_passage(&truncated_text)?;
3957                        embedded = true;
3958                        apply_embedding_identity_metadata(&mut options, runtime);
3959                        mem.update_frame(
3960                            existing.id,
3961                            Some(payload_for_update),
3962                            options.clone(),
3963                            Some(embedding),
3964                        )
3965                        .map_err(|err| {
3966                            map_put_error(
3967                                err,
3968                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
3969                            )
3970                        })?
3971                    }
3972                }
3973            } else {
3974                mem.update_frame(existing.id, Some(payload_for_update), options.clone(), None)
3975                    .map_err(|err| {
3976                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
3977                    })?
3978            }
3979        } else {
3980            return Err(DuplicateUriError::new(uri.as_str()).into());
3981        }
3982    } else {
3983        if let Some(guard) = capacity_guard.as_ref() {
3984            guard.ensure_capacity(stored_bytes as u64)?;
3985        }
3986        if parallel_mode {
3987            #[cfg(feature = "parallel_segments")]
3988            {
3989                let mut parent_embedding = None;
3990                let mut chunk_embeddings_vec = None;
3991                if allow_embedding {
3992                    if let Some(path) = args.embedding_vec.as_deref() {
3993                        let embedding = crate::utils::read_embedding(path)?;
3994                        if let Some(model_str) = args.embedding_vec_model.as_deref() {
3995                            let choice = model_str.parse::<EmbeddingModelChoice>()?;
3996                            let expected = choice.dimensions();
3997                            if expected != 0 && embedding.len() != expected {
3998                                anyhow::bail!(
3999                                    "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
4000                                    embedding.len(),
4001                                    choice.name(),
4002                                    expected
4003                                );
4004                            }
4005                            apply_embedding_identity_metadata_from_choice(
4006                                &mut options,
4007                                choice,
4008                                embedding.len(),
4009                                Some(model_str),
4010                            );
4011                        } else {
4012                            options.extra_metadata.insert(
4013                                MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
4014                                embedding.len().to_string(),
4015                            );
4016                        }
4017                        parent_embedding = Some(embedding);
4018                        embedded = true;
4019                    } else {
4020                        let runtime =
4021                            runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
4022                        let embed_text = search_text
4023                            .clone()
4024                            .or_else(|| payload_text.map(|text| text.to_string()))
4025                            .unwrap_or_default();
4026                        if embed_text.trim().is_empty() {
4027                            warn!("no textual content available; embedding skipped");
4028                        } else {
4029                            // Check if document will be chunked - if so, embed each chunk
4030                            info!(
4031                                "parallel mode: checking for chunks on {} bytes payload",
4032                                payload.len()
4033                            );
4034                            if let Some(chunk_texts) = mem.preview_chunks(&payload) {
4035                                // Document will be chunked - embed each chunk for full semantic coverage
4036                                info!("parallel mode: Document will be chunked into {} chunks, generating embeddings for each", chunk_texts.len());
4037
4038                                // Apply temporal enrichment if enabled (before contextual)
4039                                #[cfg(feature = "temporal_enrich")]
4040                                let enriched_chunks = if args.temporal_enrich {
4041                                    info!(
4042                                        "Temporal enrichment enabled, processing {} chunks",
4043                                        chunk_texts.len()
4044                                    );
4045                                    // Use today's date as fallback document date
4046                                    let today = chrono::Local::now().date_naive();
4047                                    let results = temporal_enrich_chunks(&chunk_texts, Some(today));
4048                                    // Results are tuples of (enriched_text, TemporalEnrichment)
4049                                    let enriched: Vec<String> =
4050                                        results.iter().map(|(text, _)| text.clone()).collect();
4051                                    let resolved_count = results
4052                                        .iter()
4053                                        .filter(|(_, e)| {
4054                                            e.relative_phrases.iter().any(|p| p.resolved.is_some())
4055                                        })
4056                                        .count();
4057                                    info!(
4058                                    "Temporal enrichment: resolved {} chunks with temporal context",
4059                                    resolved_count
4060                                );
4061                                    enriched
4062                                } else {
4063                                    chunk_texts.clone()
4064                                };
4065                                #[cfg(not(feature = "temporal_enrich"))]
4066                                let enriched_chunks = {
4067                                    if args.temporal_enrich {
4068                                        warn!(
4069                                        "Temporal enrichment requested but feature not compiled in"
4070                                    );
4071                                    }
4072                                    chunk_texts.clone()
4073                                };
4074
4075                                // Apply contextual retrieval if enabled
4076                                let embed_chunks = if args.contextual {
4077                                    info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4078                                    let engine = if args.contextual_model.as_deref()
4079                                        == Some("local")
4080                                    {
4081                                        #[cfg(feature = "llama-cpp")]
4082                                        {
4083                                            let model_path = get_local_contextual_model_path()?;
4084                                            ContextualEngine::local(model_path)
4085                                        }
4086                                        #[cfg(not(feature = "llama-cpp"))]
4087                                        {
4088                                            anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4089                                        }
4090                                    } else if let Some(model) = &args.contextual_model {
4091                                        ContextualEngine::openai_with_model(model)?
4092                                    } else {
4093                                        ContextualEngine::openai()?
4094                                    };
4095
4096                                    match engine
4097                                        .generate_contexts_batch(&embed_text, &enriched_chunks)
4098                                    {
4099                                        Ok(contexts) => {
4100                                            info!(
4101                                                "Generated {} contextual prefixes",
4102                                                contexts.len()
4103                                            );
4104                                            apply_contextual_prefixes(
4105                                                &embed_text,
4106                                                &enriched_chunks,
4107                                                &contexts,
4108                                            )
4109                                        }
4110                                        Err(e) => {
4111                                            warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4112                                            enriched_chunks.clone()
4113                                        }
4114                                    }
4115                                } else {
4116                                    enriched_chunks
4117                                };
4118
4119                                let truncated_chunks: Vec<String> = embed_chunks
4120                                .iter()
4121                                .map(|chunk_text| {
4122                                    // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
4123                                    let truncated_chunk = truncate_for_embedding(chunk_text);
4124                                    if truncated_chunk.len() < chunk_text.len() {
4125                                        info!(
4126                                            "parallel mode: Truncated chunk from {} to {} chars for embedding",
4127                                            chunk_text.len(),
4128                                            truncated_chunk.len()
4129                                        );
4130                                    }
4131                                    truncated_chunk
4132                                })
4133                                .collect();
4134                                let truncated_refs: Vec<&str> = truncated_chunks
4135                                    .iter()
4136                                    .map(|chunk| chunk.as_str())
4137                                    .collect();
4138                                let chunk_embeddings =
4139                                    runtime.embed_batch_passages(&truncated_refs)?;
4140                                let num_chunks = chunk_embeddings.len();
4141                                chunk_embeddings_vec = Some(chunk_embeddings);
4142                                // Also embed the parent for the parent frame (truncate to avoid token limits)
4143                                let parent_text = truncate_for_embedding(&embed_text);
4144                                if parent_text.len() < embed_text.len() {
4145                                    info!("parallel mode: Truncated parent text from {} to {} chars for embedding", embed_text.len(), parent_text.len());
4146                                }
4147                                parent_embedding = Some(runtime.embed_passage(&parent_text)?);
4148                                embedded = true;
4149                                info!(
4150                                "parallel mode: Generated {} chunk embeddings + 1 parent embedding",
4151                                num_chunks
4152                            );
4153                            } else {
4154                                // No chunking - just embed the whole document (truncate to avoid token limits)
4155                                info!("parallel mode: No chunking (payload < 2400 chars or not UTF-8), using single embedding");
4156                                let truncated_text = truncate_for_embedding(&embed_text);
4157                                if truncated_text.len() < embed_text.len() {
4158                                    info!("parallel mode: Truncated text from {} to {} chars for embedding", embed_text.len(), truncated_text.len());
4159                                }
4160                                parent_embedding = Some(runtime.embed_passage(&truncated_text)?);
4161                                embedded = true;
4162                            }
4163                        }
4164                    }
4165                }
4166                if embedded {
4167                    if let Some(runtime) = runtime {
4168                        apply_embedding_identity_metadata(&mut options, runtime);
4169                    }
4170                }
4171                let payload_variant = if let Some(path) = source_path {
4172                    ParallelPayload::Path(path.to_path_buf())
4173                } else {
4174                    ParallelPayload::Bytes(payload)
4175                };
4176                let input = ParallelInput {
4177                    payload: payload_variant,
4178                    options: options.clone(),
4179                    embedding: parent_embedding,
4180                    chunk_embeddings: chunk_embeddings_vec,
4181                };
4182                return Ok(IngestOutcome {
4183                    report: PutReport {
4184                        seq: 0,
4185                        frame_id: 0, // Will be populated after parallel commit
4186                        uri,
4187                        title: inferred_title,
4188                        original_bytes,
4189                        stored_bytes,
4190                        compressed,
4191                        no_raw: effective_no_raw,
4192                        source: source_path.map(Path::to_path_buf),
4193                        mime: Some(analysis.mime),
4194                        metadata: analysis.metadata,
4195                        extraction: analysis.extraction,
4196                        #[cfg(feature = "logic_mesh")]
4197                        search_text: search_text.clone(),
4198                    },
4199                    embedded,
4200                    parallel_input: Some(input),
4201                });
4202            }
4203            #[cfg(not(feature = "parallel_segments"))]
4204            {
4205                mem.put_bytes_with_options(&payload, options)
4206                    .map_err(|err| {
4207                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4208                    })?
4209            }
4210        } else if allow_embedding {
4211            if let Some(path) = args.embedding_vec.as_deref() {
4212                let embedding = crate::utils::read_embedding(path)?;
4213                if let Some(model_str) = args.embedding_vec_model.as_deref() {
4214                    let choice = model_str.parse::<EmbeddingModelChoice>()?;
4215                    let expected = choice.dimensions();
4216                    if expected != 0 && embedding.len() != expected {
4217                        anyhow::bail!(
4218                            "pre-computed embedding has {} dimensions but --embedding-vec-model {} expects {}",
4219                            embedding.len(),
4220                            choice.name(),
4221                            expected
4222                        );
4223                    }
4224                    apply_embedding_identity_metadata_from_choice(
4225                        &mut options,
4226                        choice,
4227                        embedding.len(),
4228                        Some(model_str),
4229                    );
4230                } else {
4231                    options.extra_metadata.insert(
4232                        MEMVID_EMBEDDING_DIMENSION_KEY.to_string(),
4233                        embedding.len().to_string(),
4234                    );
4235                }
4236                embedded = true;
4237                mem.put_with_embedding_and_options(&payload, embedding, options)
4238                    .map_err(|err| {
4239                        map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4240                    })?
4241            } else {
4242                let runtime = runtime.ok_or_else(|| anyhow!("semantic runtime unavailable"))?;
4243                let embed_text = search_text
4244                    .clone()
4245                    .or_else(|| payload_text.map(|text| text.to_string()))
4246                    .unwrap_or_default();
4247                if embed_text.trim().is_empty() {
4248                    warn!("no textual content available; embedding skipped");
4249                    mem.put_bytes_with_options(&payload, options)
4250                        .map_err(|err| {
4251                            map_put_error(
4252                                err,
4253                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4254                            )
4255                        })?
4256                } else {
4257                    // Check if document will be chunked - if so, embed each chunk
4258                    if let Some(chunk_texts) = mem.preview_chunks(&payload) {
4259                        // Document will be chunked - embed each chunk for full semantic coverage
4260                        info!(
4261                        "Document will be chunked into {} chunks, generating embeddings for each",
4262                        chunk_texts.len()
4263                    );
4264
4265                        // Apply temporal enrichment if enabled (before contextual)
4266                        #[cfg(feature = "temporal_enrich")]
4267                        let enriched_chunks = if args.temporal_enrich {
4268                            info!(
4269                                "Temporal enrichment enabled, processing {} chunks",
4270                                chunk_texts.len()
4271                            );
4272                            // Use today's date as fallback document date
4273                            let today = chrono::Local::now().date_naive();
4274                            let results = temporal_enrich_chunks(&chunk_texts, Some(today));
4275                            // Results are tuples of (enriched_text, TemporalEnrichment)
4276                            let enriched: Vec<String> =
4277                                results.iter().map(|(text, _)| text.clone()).collect();
4278                            let resolved_count = results
4279                                .iter()
4280                                .filter(|(_, e)| {
4281                                    e.relative_phrases.iter().any(|p| p.resolved.is_some())
4282                                })
4283                                .count();
4284                            info!(
4285                                "Temporal enrichment: resolved {} chunks with temporal context",
4286                                resolved_count
4287                            );
4288                            enriched
4289                        } else {
4290                            chunk_texts.clone()
4291                        };
4292                        #[cfg(not(feature = "temporal_enrich"))]
4293                        let enriched_chunks = {
4294                            if args.temporal_enrich {
4295                                warn!("Temporal enrichment requested but feature not compiled in");
4296                            }
4297                            chunk_texts.clone()
4298                        };
4299
4300                        // Apply contextual retrieval if enabled
4301                        let embed_chunks = if args.contextual {
4302                            info!("Contextual retrieval enabled, generating context prefixes for {} chunks", enriched_chunks.len());
4303                            let engine = if args.contextual_model.as_deref() == Some("local") {
4304                                #[cfg(feature = "llama-cpp")]
4305                                {
4306                                    let model_path = get_local_contextual_model_path()?;
4307                                    ContextualEngine::local(model_path)
4308                                }
4309                                #[cfg(not(feature = "llama-cpp"))]
4310                                {
4311                                    anyhow::bail!("Local contextual model requires the 'llama-cpp' feature. Use --contextual-model openai or omit the flag for OpenAI.");
4312                                }
4313                            } else if let Some(model) = &args.contextual_model {
4314                                ContextualEngine::openai_with_model(model)?
4315                            } else {
4316                                ContextualEngine::openai()?
4317                            };
4318
4319                            match engine.generate_contexts_batch(&embed_text, &enriched_chunks) {
4320                                Ok(contexts) => {
4321                                    info!("Generated {} contextual prefixes", contexts.len());
4322                                    apply_contextual_prefixes(
4323                                        &embed_text,
4324                                        &enriched_chunks,
4325                                        &contexts,
4326                                    )
4327                                }
4328                                Err(e) => {
4329                                    warn!("Failed to generate contextual prefixes: {}. Using original chunks.", e);
4330                                    enriched_chunks.clone()
4331                                }
4332                            }
4333                        } else {
4334                            enriched_chunks
4335                        };
4336
4337                        let truncated_chunks: Vec<String> = embed_chunks
4338                            .iter()
4339                            .map(|chunk_text| {
4340                                // Truncate chunk to avoid provider token limit issues (contextual prefixes can make chunks large)
4341                                let truncated_chunk = truncate_for_embedding(chunk_text);
4342                                if truncated_chunk.len() < chunk_text.len() {
4343                                    info!(
4344                                        "Truncated chunk from {} to {} chars for embedding",
4345                                        chunk_text.len(),
4346                                        truncated_chunk.len()
4347                                    );
4348                                }
4349                                truncated_chunk
4350                            })
4351                            .collect();
4352                        let truncated_refs: Vec<&str> = truncated_chunks
4353                            .iter()
4354                            .map(|chunk| chunk.as_str())
4355                            .collect();
4356                        let chunk_embeddings = runtime.embed_batch_passages(&truncated_refs)?;
4357                        // Truncate parent text to avoid token limits
4358                        let parent_text = truncate_for_embedding(&embed_text);
4359                        if parent_text.len() < embed_text.len() {
4360                            info!(
4361                                "Truncated parent text from {} to {} chars for embedding",
4362                                embed_text.len(),
4363                                parent_text.len()
4364                            );
4365                        }
4366                        let parent_embedding = runtime.embed_passage(&parent_text)?;
4367                        embedded = true;
4368                        apply_embedding_identity_metadata(&mut options, runtime);
4369                        info!(
4370                            "Calling put_with_chunk_embeddings with {} chunk embeddings",
4371                            chunk_embeddings.len()
4372                        );
4373                        mem.put_with_chunk_embeddings(
4374                            &payload,
4375                            Some(parent_embedding),
4376                            chunk_embeddings,
4377                            options,
4378                        )
4379                        .map_err(|err| {
4380                            map_put_error(
4381                                err,
4382                                capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4383                            )
4384                        })?
4385                    } else {
4386                        // No chunking - just embed the whole document (truncate to avoid token limits)
4387                        info!("Document too small for chunking (< 2400 chars after normalization), using single embedding");
4388                        let truncated_text = truncate_for_embedding(&embed_text);
4389                        if truncated_text.len() < embed_text.len() {
4390                            info!(
4391                                "Truncated text from {} to {} chars for embedding",
4392                                embed_text.len(),
4393                                truncated_text.len()
4394                            );
4395                        }
4396                        let embedding = runtime.embed_passage(&truncated_text)?;
4397                        embedded = true;
4398                        apply_embedding_identity_metadata(&mut options, runtime);
4399                        mem.put_with_embedding_and_options(&payload, embedding, options)
4400                            .map_err(|err| {
4401                                map_put_error(
4402                                    err,
4403                                    capacity_guard.as_ref().and_then(|g| g.capacity_hint()),
4404                                )
4405                            })?
4406                    }
4407                }
4408            }
4409        } else {
4410            mem.put_bytes_with_options(&payload, options)
4411                .map_err(|err| {
4412                    map_put_error(err, capacity_guard.as_ref().and_then(|g| g.capacity_hint()))
4413                })?
4414        }
4415    };
4416
4417    Ok(IngestOutcome {
4418        report: PutReport {
4419            seq,
4420            frame_id,
4421            uri,
4422            title: inferred_title,
4423            original_bytes,
4424            stored_bytes,
4425            compressed,
4426            no_raw: effective_no_raw,
4427            source: source_path.map(Path::to_path_buf),
4428            mime: Some(analysis.mime),
4429            metadata: analysis.metadata,
4430            extraction: analysis.extraction,
4431            #[cfg(feature = "logic_mesh")]
4432            search_text,
4433        },
4434        embedded,
4435        #[cfg(feature = "parallel_segments")]
4436        parallel_input: None,
4437    })
4438}
4439
4440fn canonical_storage_len(payload: &[u8]) -> Result<(usize, bool)> {
4441    if std::str::from_utf8(payload).is_ok() {
4442        let compressed = zstd::encode_all(Cursor::new(payload), 3)?;
4443        Ok((compressed.len(), true))
4444    } else {
4445        Ok((payload.len(), false))
4446    }
4447}
4448
4449fn print_report(report: &PutReport) {
4450    let name = report
4451        .source
4452        .as_ref()
4453        .map(|path| {
4454            let pretty = env::current_dir()
4455                .ok()
4456                .as_ref()
4457                .and_then(|cwd| diff_paths(path, cwd))
4458                .unwrap_or_else(|| path.to_path_buf());
4459            pretty.to_string_lossy().into_owned()
4460        })
4461        .unwrap_or_else(|| "stdin".to_string());
4462
4463    let ratio = if report.original_bytes > 0 {
4464        (report.stored_bytes as f64 / report.original_bytes as f64) * 100.0
4465    } else {
4466        100.0
4467    };
4468
4469    println!("• {name} → {}", report.uri);
4470    println!("  seq: {}", report.seq);
4471    if report.no_raw {
4472        println!(
4473            "  size: {} B (--no-raw: text only, hash stored)",
4474            report.original_bytes
4475        );
4476    } else if report.compressed {
4477        println!(
4478            "  size: {} B → {} B ({:.1}% of original, compressed)",
4479            report.original_bytes, report.stored_bytes, ratio
4480        );
4481    } else {
4482        println!("  size: {} B (stored as-is)", report.original_bytes);
4483    }
4484    if let Some(mime) = &report.mime {
4485        println!("  mime: {mime}");
4486    }
4487    if let Some(title) = &report.title {
4488        println!("  title: {title}");
4489    }
4490    let extraction_reader = report.extraction.reader.as_deref().unwrap_or("n/a");
4491    println!(
4492        "  extraction: status={} reader={}",
4493        report.extraction.status.label(),
4494        extraction_reader
4495    );
4496    if let Some(ms) = report.extraction.duration_ms {
4497        println!("  extraction-duration: {} ms", ms);
4498    }
4499    if let Some(pages) = report.extraction.pages_processed {
4500        println!("  extraction-pages: {}", pages);
4501    }
4502    for warning in &report.extraction.warnings {
4503        println!("  warning: {warning}");
4504    }
4505    if let Some(metadata) = &report.metadata {
4506        if let Some(caption) = &metadata.caption {
4507            println!("  caption: {caption}");
4508        }
4509        if let Some(audio) = metadata.audio.as_ref() {
4510            if audio.duration_secs.is_some()
4511                || audio.sample_rate_hz.is_some()
4512                || audio.channels.is_some()
4513            {
4514                let duration = audio
4515                    .duration_secs
4516                    .map(|secs| format!("{secs:.1}s"))
4517                    .unwrap_or_else(|| "unknown".into());
4518                let rate = audio
4519                    .sample_rate_hz
4520                    .map(|hz| format!("{} Hz", hz))
4521                    .unwrap_or_else(|| "? Hz".into());
4522                let channels = audio
4523                    .channels
4524                    .map(|ch| ch.to_string())
4525                    .unwrap_or_else(|| "?".into());
4526                println!("  audio: duration {duration}, {channels} ch, {rate}");
4527            }
4528        }
4529    }
4530
4531    println!();
4532}
4533
4534fn put_report_to_json(report: &PutReport) -> serde_json::Value {
4535    let source = report
4536        .source
4537        .as_ref()
4538        .map(|path| path.to_string_lossy().into_owned());
4539    let extraction = json!({
4540        "status": report.extraction.status.label(),
4541        "reader": report.extraction.reader,
4542        "duration_ms": report.extraction.duration_ms,
4543        "pages_processed": report.extraction.pages_processed,
4544        "warnings": report.extraction.warnings,
4545    });
4546    json!({
4547        "seq": report.seq,
4548        "uri": report.uri,
4549        "title": report.title,
4550        "original_bytes": report.original_bytes,
4551        "original_bytes_human": format_bytes(report.original_bytes as u64),
4552        "stored_bytes": report.stored_bytes,
4553        "stored_bytes_human": format_bytes(report.stored_bytes as u64),
4554        "compressed": report.compressed,
4555        "mime": report.mime,
4556        "source": source,
4557        "metadata": report.metadata,
4558        "extraction": extraction,
4559    })
4560}
4561
4562fn map_put_error(err: MemvidError, _capacity_hint: Option<u64>) -> anyhow::Error {
4563    match err {
4564        MemvidError::CapacityExceeded {
4565            current,
4566            limit,
4567            required,
4568        } => anyhow!(CapacityExceededMessage {
4569            current,
4570            limit,
4571            required,
4572        }),
4573        other => other.into(),
4574    }
4575}
4576
4577fn apply_metadata_overrides(options: &mut PutOptions, entries: &[String]) {
4578    for (idx, entry) in entries.iter().enumerate() {
4579        match serde_json::from_str::<DocMetadata>(entry) {
4580            Ok(meta) => {
4581                options.metadata = Some(meta);
4582            }
4583            Err(_) => match serde_json::from_str::<serde_json::Value>(entry) {
4584                Ok(value) => {
4585                    options
4586                        .extra_metadata
4587                        .insert(format!("custom_metadata_{idx}"), value.to_string());
4588                }
4589                Err(err) => warn!("failed to parse --metadata JSON: {err}"),
4590            },
4591        }
4592    }
4593}
4594
4595fn transcript_notice_message(mem: &mut Memvid) -> Result<String> {
4596    let stats = mem.stats()?;
4597    let message = match stats.tier {
4598        Tier::Free => "Transcript requires Dev/Enterprise tier. Apply a ticket to enable.".to_string(),
4599        Tier::Dev | Tier::Enterprise => "Transcript capture will attach in a future update; no transcript generated in this build.".to_string(),
4600    };
4601    Ok(message)
4602}
4603
4604fn sanitize_uri(raw: &str, keep_path: bool) -> String {
4605    let trimmed = raw.trim().trim_start_matches("mv2://");
4606    let normalized = trimmed.replace('\\', "/");
4607    let mut segments: Vec<String> = Vec::new();
4608    for segment in normalized.split('/') {
4609        if segment.is_empty() || segment == "." {
4610            continue;
4611        }
4612        if segment == ".." {
4613            segments.pop();
4614            continue;
4615        }
4616        segments.push(segment.to_string());
4617    }
4618
4619    if segments.is_empty() {
4620        return normalized
4621            .split('/')
4622            .last()
4623            .filter(|s| !s.is_empty())
4624            .unwrap_or("document")
4625            .to_string();
4626    }
4627
4628    if keep_path {
4629        segments.join("/")
4630    } else {
4631        segments
4632            .last()
4633            .cloned()
4634            .unwrap_or_else(|| "document".to_string())
4635    }
4636}
4637
4638fn create_task_progress_bar(total: Option<u64>, message: &str, quiet: bool) -> ProgressBar {
4639    if quiet {
4640        let pb = ProgressBar::hidden();
4641        pb.set_message(message.to_string());
4642        return pb;
4643    }
4644
4645    match total {
4646        Some(len) => {
4647            let pb = ProgressBar::new(len);
4648            pb.set_draw_target(ProgressDrawTarget::stderr());
4649            let style = ProgressStyle::with_template("{msg:>9} {pos}/{len}")
4650                .unwrap_or_else(|_| ProgressStyle::default_bar());
4651            pb.set_style(style);
4652            pb.set_message(message.to_string());
4653            pb
4654        }
4655        None => {
4656            let pb = ProgressBar::new_spinner();
4657            pb.set_draw_target(ProgressDrawTarget::stderr());
4658            pb.set_message(message.to_string());
4659            pb.enable_steady_tick(Duration::from_millis(120));
4660            pb
4661        }
4662    }
4663}
4664
4665fn create_spinner(message: &str) -> ProgressBar {
4666    let pb = ProgressBar::new_spinner();
4667    pb.set_draw_target(ProgressDrawTarget::stderr());
4668    let style = ProgressStyle::with_template("{spinner} {msg}")
4669        .unwrap_or_else(|_| ProgressStyle::default_spinner())
4670        .tick_strings(&["-", "\\", "|", "/"]);
4671    pb.set_style(style);
4672    pb.set_message(message.to_string());
4673    pb.enable_steady_tick(Duration::from_millis(120));
4674    pb
4675}
4676
4677// ============================================================================
4678// put-many command - batch ingestion with pre-computed embeddings
4679// ============================================================================
4680
4681/// Arguments for the `put-many` subcommand
4682#[cfg(feature = "parallel_segments")]
4683#[derive(Args)]
4684pub struct PutManyArgs {
4685    /// Path to the memory file to append to
4686    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
4687    pub file: PathBuf,
4688    /// Emit JSON instead of human-readable output
4689    #[arg(long)]
4690    pub json: bool,
4691    /// Path to JSON file containing batch requests (array of objects with title, label, text, uri, tags, labels, metadata, embedding)
4692    /// If not specified, reads from stdin
4693    #[arg(long, value_name = "PATH")]
4694    pub input: Option<PathBuf>,
4695    /// Zstd compression level (1-9, default: 3)
4696    #[arg(long, default_value = "3")]
4697    pub compression_level: i32,
4698    /// Store raw binary content along with extracted text.
4699    /// By default, only extracted text + BLAKE3 hash is stored (--no-raw behavior).
4700    #[arg(long)]
4701    pub raw: bool,
4702    /// Don't store raw binary content (DEFAULT behavior).
4703    /// Kept for backwards compatibility.
4704    #[arg(long, hide = true)]
4705    pub no_raw: bool,
4706    /// Skip ingestion if identical content (by BLAKE3 hash) already exists.
4707    /// Returns the existing frame's ID instead of creating a duplicate.
4708    #[arg(long)]
4709    pub dedup: bool,
4710    #[command(flatten)]
4711    pub lock: LockCliArgs,
4712}
4713
4714/// JSON input format for put-many requests
4715#[cfg(feature = "parallel_segments")]
4716#[derive(Debug, serde::Deserialize)]
4717pub struct PutManyRequest {
4718    pub title: String,
4719    #[serde(default)]
4720    pub label: String,
4721    pub text: String,
4722    #[serde(default)]
4723    pub uri: Option<String>,
4724    #[serde(default)]
4725    pub tags: Vec<String>,
4726    #[serde(default)]
4727    pub labels: Vec<String>,
4728    #[serde(default)]
4729    pub metadata: serde_json::Value,
4730    /// Extra metadata (string key/value) stored on the frame.
4731    /// Use this to persist embedding identity keys like `memvid.embedding.model`.
4732    #[serde(default)]
4733    pub extra_metadata: BTreeMap<String, String>,
4734    /// Pre-computed embedding vector for the parent document (e.g., from OpenAI)
4735    #[serde(default)]
4736    pub embedding: Option<Vec<f32>>,
4737    /// Pre-computed embeddings for each chunk (matched by index)
4738    /// If the document gets chunked, these embeddings are assigned to each chunk frame.
4739    /// The number of chunk embeddings should match the number of chunks that will be created.
4740    #[serde(default)]
4741    pub chunk_embeddings: Option<Vec<Vec<f32>>>,
4742    /// Per-item override for no_raw mode. If not specified, uses the global --no-raw flag.
4743    /// When true, stores only extracted text + BLAKE3 hash instead of raw content.
4744    #[serde(default)]
4745    pub no_raw: Option<bool>,
4746    /// Per-item override for dedup mode. If not specified, uses the global --dedup flag.
4747    /// When true, skips ingestion if identical content already exists.
4748    #[serde(default)]
4749    pub dedup: Option<bool>,
4750}
4751
4752/// Batch input format (array of requests)
4753#[cfg(feature = "parallel_segments")]
4754#[derive(Debug, serde::Deserialize)]
4755#[serde(transparent)]
4756pub struct PutManyBatch {
4757    pub requests: Vec<PutManyRequest>,
4758}
4759
4760#[cfg(feature = "parallel_segments")]
4761pub fn handle_put_many(config: &crate::config::CliConfig, args: PutManyArgs) -> Result<()> {
4762    use memvid_core::{BuildOpts, Memvid, ParallelInput, ParallelPayload, PutOptions};
4763    use std::io::Read;
4764
4765    // Check if plan allows write operations (blocks expired subscriptions)
4766    crate::utils::require_active_plan(config, "put-many")?;
4767
4768    let mut mem = Memvid::open(&args.file)?;
4769    crate::utils::ensure_cli_mutation_allowed(&mem)?;
4770    crate::utils::apply_lock_cli(&mut mem, &args.lock);
4771
4772    // Ensure indexes are enabled
4773    let stats = mem.stats()?;
4774    if !stats.has_lex_index {
4775        mem.enable_lex()?;
4776    }
4777    if !stats.has_vec_index {
4778        mem.enable_vec()?;
4779    }
4780
4781    // Check if current memory size exceeds free tier limit
4782    crate::utils::ensure_capacity_with_api_key(stats.size_bytes, 0, config)?;
4783
4784    // Read batch input
4785    let batch_json: String = if let Some(input_path) = &args.input {
4786        fs::read_to_string(input_path)
4787            .with_context(|| format!("Failed to read input file: {}", input_path.display()))?
4788    } else {
4789        let mut buffer = String::new();
4790        io::stdin()
4791            .read_to_string(&mut buffer)
4792            .context("Failed to read from stdin")?;
4793        buffer
4794    };
4795
4796    let batch: PutManyBatch =
4797        serde_json::from_str(&batch_json).context("Failed to parse JSON input")?;
4798
4799    if batch.requests.is_empty() {
4800        if args.json {
4801            println!("{{\"frame_ids\": [], \"count\": 0}}");
4802        } else {
4803            println!("No requests to process");
4804        }
4805        return Ok(());
4806    }
4807
4808    let count = batch.requests.len();
4809    if !args.json {
4810        eprintln!("Processing {} documents...", count);
4811    }
4812
4813    // Convert to ParallelInput
4814    // Default: no-raw mode (only store extracted text + BLAKE3 hash)
4815    // Use --raw to store raw binary content for later retrieval
4816    let global_no_raw = !args.raw; // true by default unless --raw is passed
4817    let global_dedup = args.dedup;
4818    let inputs: Vec<ParallelInput> = batch
4819        .requests
4820        .into_iter()
4821        .enumerate()
4822        .map(|(i, req)| {
4823            let uri = req.uri.unwrap_or_else(|| format!("mv2://batch/{}", i));
4824            let mut options = PutOptions::default();
4825            options.title = Some(req.title.clone());
4826            options.uri = Some(uri);
4827            options.tags = req.tags;
4828            options.labels = req.labels;
4829            options.extra_metadata = req.extra_metadata;
4830            // Use per-item no_raw if specified, otherwise use global --no-raw flag
4831            options.no_raw = req.no_raw.unwrap_or(global_no_raw);
4832            // Use per-item dedup if specified, otherwise use global --dedup flag
4833            options.dedup = req.dedup.unwrap_or(global_dedup);
4834            if !req.metadata.is_null() {
4835                match serde_json::from_value::<DocMetadata>(req.metadata.clone()) {
4836                    Ok(meta) => options.metadata = Some(meta),
4837                    Err(_) => {
4838                        options
4839                            .extra_metadata
4840                            .entry("memvid.metadata.json".to_string())
4841                            .or_insert_with(|| req.metadata.to_string());
4842                    }
4843                }
4844            }
4845
4846            if let Some(embedding) = req
4847                .embedding
4848                .as_ref()
4849                .or_else(|| req.chunk_embeddings.as_ref().and_then(|emb| emb.first()))
4850            {
4851                options
4852                    .extra_metadata
4853                    .entry(MEMVID_EMBEDDING_DIMENSION_KEY.to_string())
4854                    .or_insert_with(|| embedding.len().to_string());
4855            }
4856
4857            ParallelInput {
4858                payload: ParallelPayload::Bytes(req.text.into_bytes()),
4859                options,
4860                embedding: req.embedding,
4861                chunk_embeddings: req.chunk_embeddings,
4862            }
4863        })
4864        .collect();
4865
4866    // Build options
4867    let build_opts = BuildOpts {
4868        zstd_level: args.compression_level.clamp(1, 9),
4869        ..BuildOpts::default()
4870    };
4871
4872    // Ingest batch
4873    let frame_ids = mem
4874        .put_parallel_inputs(&inputs, build_opts)
4875        .context("Failed to ingest batch")?;
4876
4877    if args.json {
4878        let output = serde_json::json!({
4879            "frame_ids": frame_ids,
4880            "count": frame_ids.len()
4881        });
4882        println!("{}", serde_json::to_string(&output)?);
4883    } else {
4884        println!("Ingested {} documents", frame_ids.len());
4885        if frame_ids.len() <= 10 {
4886            for fid in &frame_ids {
4887                println!("  frame_id: {}", fid);
4888            }
4889        } else {
4890            println!("  first: {}", frame_ids.first().unwrap_or(&0));
4891            println!("  last:  {}", frame_ids.last().unwrap_or(&0));
4892        }
4893    }
4894
4895    Ok(())
4896}
4897
4898/// Arguments for the `correct` subcommand
4899#[derive(Args)]
4900pub struct CorrectArgs {
4901    /// Path to the memory file
4902    #[arg(value_name = "FILE", value_parser = clap::value_parser!(PathBuf))]
4903    pub file: PathBuf,
4904    /// The correction statement (e.g., "Ben Koenig reported to Chloe Nguyen before 2025")
4905    #[arg(value_name = "STATEMENT")]
4906    pub statement: String,
4907    /// Topics for better retrieval matching (comma-separated)
4908    #[arg(long, value_name = "TOPICS", value_delimiter = ',')]
4909    pub topics: Option<Vec<String>>,
4910    /// Source attribution for the correction
4911    #[arg(long, value_name = "SOURCE")]
4912    pub source: Option<String>,
4913    /// Retrieval boost factor (default: 2.0)
4914    #[arg(long, default_value = "2.0")]
4915    pub boost: f64,
4916    /// Emit JSON instead of human-readable output
4917    #[arg(long)]
4918    pub json: bool,
4919    #[command(flatten)]
4920    pub lock: LockCliArgs,
4921}
4922
4923/// Store a correction with retrieval priority boost.
4924///
4925/// Corrections are stored as frames with special metadata that gives them
4926/// priority during retrieval. This ensures corrected information surfaces
4927/// first when relevant queries are made.
4928pub fn handle_correct(config: &CliConfig, args: CorrectArgs) -> Result<()> {
4929    crate::utils::require_active_plan(config, "correct")?;
4930
4931    let mut mem = Memvid::open(&args.file)?;
4932    ensure_cli_mutation_allowed(&mem)?;
4933    apply_lock_cli(&mut mem, &args.lock);
4934
4935    let stats = mem.stats()?;
4936    if stats.frame_count == 0 && !stats.has_lex_index {
4937        mem.enable_lex()?;
4938    }
4939
4940    // Build extra_metadata for correction
4941    let mut extra_metadata = BTreeMap::new();
4942    extra_metadata.insert("memvid.correction".to_string(), "true".to_string());
4943    extra_metadata.insert(
4944        "memvid.correction.boost".to_string(),
4945        args.boost.to_string(),
4946    );
4947    if let Some(source) = &args.source {
4948        extra_metadata.insert("memvid.correction.source".to_string(), source.clone());
4949    }
4950
4951    // Convert topics to tags for better retrieval matching
4952    let tags: Vec<String> = args.topics.clone().unwrap_or_default();
4953
4954    // Build put options
4955    let mut options = PutOptions::default();
4956    options.title = Some(format!(
4957        "Correction: {}",
4958        truncate_title(&args.statement, 50)
4959    ));
4960    options.uri = Some(format!("mv2://correction/{}", Uuid::new_v4()));
4961    options.labels = vec!["correction".to_string()];
4962    options.tags = tags;
4963    options.extra_metadata = extra_metadata;
4964    options.no_raw = true; // Corrections are text-only
4965
4966    // Store the correction
4967    let frame_id = mem.put_bytes_with_options(args.statement.as_bytes(), options)?;
4968
4969    if args.json {
4970        let output = json!({
4971            "frame_id": frame_id,
4972            "type": "correction",
4973            "boost": args.boost,
4974            "topics": args.topics.unwrap_or_default(),
4975            "source": args.source,
4976        });
4977        println!("{}", serde_json::to_string_pretty(&output)?);
4978    } else {
4979        println!("✓ Correction stored (frame_id: {})", frame_id);
4980        if let Some(topics) = &args.topics {
4981            println!("  Topics: {}", topics.join(", "));
4982        }
4983        if let Some(source) = &args.source {
4984            println!("  Source: {}", source);
4985        }
4986        println!("  Boost:  {}x", args.boost);
4987    }
4988
4989    Ok(())
4990}
4991
4992/// Truncate a string to a maximum length, adding "..." if truncated
4993fn truncate_title(s: &str, max_len: usize) -> String {
4994    if s.len() <= max_len {
4995        s.to_string()
4996    } else {
4997        format!("{}...", &s[..max_len.saturating_sub(3)])
4998    }
4999}