Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution.
///
/// If 1000 candidates collide we surface an error rather than loop forever.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70    # LLM-curated extraction via Claude Code CLI\n  \
71    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
72    # Resume interrupted claude-code ingest\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
74    # Claude Code with budget cap and custom timeout\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
76AUTHENTICATION:\n  \
77    --mode claude-code: Uses existing Claude Code authentication.\n  \
78      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
79      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
80    --mode codex: Uses existing Codex CLI authentication.\n  \
81      Device auth: run `codex auth login` first\n  \
82      API key: set OPENAI_API_KEY (optional)\n\n  \
83NOTES:\n  \
84    Each file becomes a separate memory. Names derive from file basenames\n  \
85    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
86    followed by a final summary line with counts. Per-file errors are reported\n  \
87    inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89    /// Directory containing files to ingest.
90    #[arg(
91        value_name = "DIR",
92        help = "Directory to ingest recursively (each matching file becomes a memory)"
93    )]
94    pub dir: PathBuf,
95
96    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
97    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98    pub r#type: MemoryType,
99
100    /// Glob pattern matched against file basenames (default: `*.md`). Supports
101    /// `*.<ext>`, `<prefix>*`, and exact filename match.
102    #[arg(long, default_value = "*.md")]
103    pub pattern: String,
104
105    /// Recurse into subdirectories.
106    #[arg(long, default_value_t = false)]
107    pub recursive: bool,
108
109    #[arg(
110        long,
111        env = "SQLITE_GRAPHRAG_ENABLE_NER",
112        value_parser = crate::parsers::parse_bool_flexible,
113        action = clap::ArgAction::Set,
114        num_args = 0..=1,
115        default_missing_value = "true",
116        default_value = "false",
117        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118    )]
119    pub enable_ner: bool,
120
121    /// GAP-E2E-011: gera description heurística a partir da primeira linha
122    /// significativa do body, em vez de "ingested from `<path>`". Quando
123    /// `--no-auto-describe` é passado, mantém o comportamento legado.
124    #[arg(
125        long,
126        default_value_t = true,
127        overrides_with = "no_auto_describe",
128        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
129    )]
130    pub auto_describe: bool,
131    #[arg(
132        long = "no-auto-describe",
133        default_value_t = false,
134        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
135    )]
136    pub no_auto_describe: bool,
137    #[arg(
138        long,
139        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
140        default_value = "fp32",
141        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
142    )]
143    pub gliner_variant: String,
144
145    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
146    #[arg(long, default_value_t = false, hide = true)]
147    pub skip_extraction: bool,
148
149    /// Stop on first per-file error instead of continuing with the next file.
150    #[arg(long, default_value_t = false)]
151    pub fail_fast: bool,
152
153    /// Preview file-to-name mapping without loading model or persisting.
154    #[arg(long, default_value_t = false)]
155    pub dry_run: bool,
156
157    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
158    #[arg(long, default_value_t = 10_000)]
159    pub max_files: usize,
160
161    /// Namespace for the ingested memories.
162    #[arg(long)]
163    pub namespace: Option<String>,
164
165    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
166    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
167    pub db: Option<String>,
168
169    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
170    pub format: JsonOutputFormat,
171
172    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
173    pub json: bool,
174
175    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
176    #[arg(
177        long,
178        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
179    )]
180    pub ingest_parallelism: Option<usize>,
181
182    /// Force single-threaded ingest to reduce RSS pressure.
183    ///
184    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
185    /// explicit value. Recommended for environments with <4 GB available
186    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
187    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
188    /// (CLI flag has higher precedence than the env var).
189    #[arg(
190        long,
191        default_value_t = false,
192        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
193                Recommended for environments with <4 GB available RAM or container/cgroup \
194                constraints. Trade-off: 3-4x longer wall time. Also honored via \
195                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
196    )]
197    pub low_memory: bool,
198
199    /// Maximum process RSS in MiB; abort if exceeded during embedding.
200    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
201          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
202    pub max_rss_mb: u64,
203
204    /// G42/S3 (v1.0.79): maximum simultaneous LLM embedding subprocesses
205    /// PER FILE. Multiplies with --ingest-parallelism (files staged
206    /// concurrently), hence the conservative default of 2. The effective
207    /// value is further bounded by CPU count and available RAM.
208    #[arg(long, default_value_t = 2, value_name = "N",
209          value_parser = clap::value_parser!(u64).range(1..=32),
210          help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
211    pub llm_parallelism: u64,
212
213    /// Maximum character length for derived memory names from file basenames.
214    ///
215    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
216    /// Shorter values leave more headroom for collision suffix resolution.
217    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
218          help = "Maximum length for derived memory names (default: 60)")]
219    pub max_name_length: usize,
220
221    /// Extraction mode: `none` (body-only, default), `claude-code`/`codex` (LLM-curated), or `gliner` (DEPRECATED: URL-regex only since v1.0.79).
222    #[arg(long, value_enum, default_value_t = IngestMode::None)]
223    pub mode: IngestMode,
224
225    /// Explicit path to the Claude Code binary (only with --mode claude-code).
226    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
227    pub claude_binary: Option<std::path::PathBuf>,
228
229    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
230    #[arg(long)]
231    pub claude_model: Option<String>,
232
233    /// Resume a previously interrupted claude-code ingest from the queue DB.
234    #[arg(long, default_value_t = false)]
235    pub resume: bool,
236
237    /// Retry only failed files from a previous claude-code ingest.
238    #[arg(long, default_value_t = false)]
239    pub retry_failed: bool,
240
241    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
242    #[arg(long, default_value_t = false)]
243    pub keep_queue: bool,
244
245    /// Custom path for the claude-code ingest queue database.
246    #[arg(long, default_value = ".ingest-queue.sqlite")]
247    pub queue_db: String,
248
249    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
250    #[arg(long, default_value_t = 60)]
251    pub rate_limit_wait: u64,
252
253    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
254    #[arg(long)]
255    pub max_cost_usd: Option<f64>,
256
257    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
258    #[arg(
259        long,
260        default_value_t = 300,
261        help = "Timeout in seconds for each claude -p invocation (default: 300)"
262    )]
263    pub claude_timeout: u64,
264
265    /// Explicit path to the Codex CLI binary (only with --mode codex).
266    #[arg(
267        long,
268        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
269        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
270    )]
271    pub codex_binary: Option<PathBuf>,
272
273    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
274    #[arg(
275        long,
276        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
277    )]
278    pub codex_model: Option<String>,
279
280    /// Timeout in seconds for each codex exec invocation.
281    #[arg(
282        long,
283        default_value_t = 300,
284        help = "Timeout in seconds for each codex exec invocation (default: 300)"
285    )]
286    pub codex_timeout: u64,
287
288    /// Path to the `opencode` binary (override PATH lookup, only with --mode opencode).
289    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
290    pub opencode_binary: Option<PathBuf>,
291
292    /// Model override for OpenCode extraction.
293    #[arg(
294        long,
295        value_name = "MODEL",
296        env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
297        help = "Model override for OpenCode extraction"
298    )]
299    pub opencode_model: Option<String>,
300
301    /// Timeout in seconds for each opencode run invocation.
302    #[arg(
303        long,
304        value_name = "SECONDS",
305        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
306        default_value_t = 300,
307        help = "Timeout in seconds for each opencode run invocation (default: 300)"
308    )]
309    pub opencode_timeout: u64,
310
311    /// G30: poll for the job singleton every second for up to N seconds
312    /// when another invocation holds the lock. Default: 0 (fail fast).
313    #[arg(long, value_name = "SECONDS")]
314    pub wait_job_singleton: Option<u64>,
315
316    /// G30: force acquisition of the singleton lock by removing a stale
317    /// lock file from a previously crashed invocation.
318    #[arg(long, default_value_t = false)]
319    pub force_job_singleton: bool,
320}
321
/// Extraction mode for the ingest pipeline.
///
/// Selected through `--mode`; `IngestArgs::mode` declares `IngestMode::None`
/// as the default.
// NOTE(review): the `///` comments on the variants are left verbatim because
// clap's `ValueEnum` derive is expected to surface them as per-value help
// text — confirm before rewording any of them.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    /// Body-only ingestion without entity/relationship extraction (default).
    None,
    /// DEPRECATED: URL-regex extraction only since v1.0.79 (the GLiNER pipeline was removed; requires --enable-ner).
    Gliner,
    /// LLM-curated extraction via locally installed Claude Code CLI.
    ClaudeCode,
    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
    Codex,
    /// LLM-curated extraction via locally installed OpenCode CLI.
    // The CLI spelling is pinned explicitly to `opencode` rather than left to
    // the derive's default variant renaming.
    #[value(name = "opencode")]
    Opencode,
}
337
338/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
339/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
340/// values evaluate to false. Unrecognized non-empty values emit a
341/// `tracing::warn!` and evaluate to false.
342fn env_low_memory_enabled() -> bool {
343    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
344        Ok(v) if v.is_empty() => false,
345        Ok(v) => match v.to_lowercase().as_str() {
346            "1" | "true" | "yes" | "on" => true,
347            "0" | "false" | "no" | "off" => false,
348            other => {
349                tracing::warn!(
350                    target: "ingest",
351                    value = %other,
352                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
353                );
354                false
355            }
356        },
357        Err(_) => false,
358    }
359}
360
361/// Resolves the effective ingest parallelism honoring `--low-memory` and the
362/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
363///
364/// Precedence:
365/// 1. `--low-memory` CLI flag forces parallelism = 1.
366/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
367/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
368/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
369///
370/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
371/// emits a `tracing::warn!` advertising the override.
372fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
373    let env_flag = env_low_memory_enabled();
374    let low_memory = low_memory_flag || env_flag;
375
376    if low_memory {
377        if let Some(n) = ingest_parallelism {
378            if n > 1 {
379                tracing::warn!(
380                    target: "ingest",
381                    requested = n,
382                    "--ingest-parallelism overridden by --low-memory; using 1"
383                );
384            }
385        }
386        if low_memory_flag {
387            tracing::info!(
388                target: "ingest",
389                source = "flag",
390                "low-memory mode enabled: forcing --ingest-parallelism 1"
391            );
392        } else {
393            tracing::info!(
394                target: "ingest",
395                source = "env",
396                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
397            );
398        }
399        return 1;
400    }
401
402    ingest_parallelism
403        .unwrap_or_else(|| {
404            std::thread::available_parallelism()
405                .map(|v| v.get() / 2)
406                .unwrap_or(1)
407                .clamp(1, 4)
408        })
409        .max(1)
410}
411
/// Serializable per-file record for the NDJSON output.
///
/// Every `Option` field is dropped from the serialized object when it is
/// `None` (`skip_serializing_if`), so consumers must treat those keys as
/// optional.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file this event refers to.
    file: &'a str,
    /// Memory name associated with the file.
    name: &'a str,
    /// Outcome label for the file; the concrete values are chosen by the code
    /// that builds the event.
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file basename (without extension); only present when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error message for a failed file; omitted otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the persisted memory; omitted when nothing was persisted.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action reported by the persistence step; omitted when not applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
    body_length: usize,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// ran the live embedding. `"claude" | "codex" | "none"`. Absent on
    /// the wire when `None` (kept for happy-path envelope cleanliness, or
    /// when the file never reached the embed phase because of duplication/error).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
440
/// Serializable summary record with aggregate counters for one ingest run.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker field; presumably lets consumers tell the summary line apart
    /// from per-file events — confirm against the emitting code.
    summary: bool,
    /// Directory that was ingested, as a string.
    dir: String,
    /// Glob pattern used to select files.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    /// Number of files considered.
    files_total: usize,
    /// Number of files ingested successfully.
    files_succeeded: usize,
    /// Number of files that ended in an error.
    files_failed: usize,
    /// Number of files skipped.
    files_skipped: usize,
    /// Elapsed time for the run, in milliseconds.
    elapsed_ms: u64,
}
453
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    /// Identifier of the persisted memory.
    memory_id: i64,
    /// Action label reported for the persisted memory.
    action: String,
    /// Byte length of the ingested body.
    body_length: usize,
    /// Backend that produced the body embedding, when a single one is known.
    backend_invoked: Option<&'static str>,
}
461
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's schema.
    schema_version: u8,
    /// Event name.
    event: &'a str,
    /// Path of the file the event refers to.
    path: &'a str,
    /// Duration in milliseconds; presumably the staging time for this file —
    /// confirm against the emitting code.
    ms: u64,
    /// Number of entities staged for the file.
    entities: usize,
    /// Number of relationships staged for the file.
    relationships: usize,
}
473
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    /// Full file contents as read from disk.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name the file will be stored under.
    name: String,
    /// Heuristic description, or the legacy `ingested from <path>` text.
    description: String,
    /// Embedding of the whole body: a direct embedding for single-chunk
    /// bodies, an aggregate of the chunk embeddings otherwise. `None` when
    /// embedding failed and skip-on-failure is active.
    embedding: Option<Vec<f32>>,
    /// Per-chunk embeddings; `Some` only when the body produced more than one
    /// chunk and the batch embedding succeeded.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk descriptors produced by the hierarchical splitter.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Entities extracted from the body (empty when extraction is disabled or fails).
    entities: Vec<NewEntity>,
    /// Relationships extracted from the body.
    relationships: Vec<NewRelationship>,
    /// Embeddings for the entity texts; `None` when that embedding step failed
    /// and skip-on-failure is active.
    entity_embeddings: Option<Vec<Vec<f32>>>,
    /// URLs extracted from the body.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// ran the body embedding. `None` when the parallel batch
    /// `embed_passages_parallel_local` fell back to different backends
    /// across chunks (there is no single stable discriminator).
    backend_invoked: Option<&'static str>,
}
495
/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
/// Never touches the database — safe to run on any rayon thread.
///
/// Steps, in order: validate `name`; size-check and read the file; build the
/// description; optionally run auto-extraction; hash, snippet and chunk the
/// body; embed the body (one call for a single chunk, a bounded parallel
/// batch otherwise); embed the entity texts.
///
/// # Errors
///
/// - `AppError::LimitExceeded` when `name` or the body exceeds its maximum
///   length, or the body produces more chunks than
///   `REMEMBER_MAX_SAFE_MULTI_CHUNKS`.
/// - `AppError::Validation` for a reserved (`__`-prefixed) or non-slug name,
///   an empty body, an over-long description, or an invalid relationship.
/// - `AppError::Io` when the file's metadata or contents cannot be read.
/// - `AppError::LowMemory` when process RSS exceeds `max_rss_mb` before the
///   multi-chunk embedding batch.
/// - Any embedding error, unless skip-on-failure is active (body-embedding
///   `Validation` errors are always propagated).
// G42/S3 added `llm_parallelism` as the 8th parameter; grouping the
// stage knobs into a struct is a wider refactor than the surgical
// scope of v1.0.79 allows.
#[allow(clippy::too_many_arguments)]
fn stage_file(
    // Not read inside this function.
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
    llm_parallelism: usize,
    llm_backend: crate::cli::LlmBackendChoice,
    auto_describe: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    // --- Name validation -------------------------------------------------
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    // Names starting with a double underscore are rejected as reserved.
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = crate::constants::name_slug_regex();
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // --- Body loading ----------------------------------------------------
    // The size is checked from metadata first so an oversized file is rejected
    // without being read; the in-memory length is re-checked after the read.
    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
    if file_size > MAX_MEMORY_BODY_LEN as u64 {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    // --- Description -----------------------------------------------------
    let description = if auto_describe {
        crate::commands::ingest_heuristics::extract_heuristic_description(
            &raw_body,
            Some(&path.display().to_string()),
        )
    } else {
        format!("ingested from {}", path.display())
    };
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    // --- Optional auto-extraction ----------------------------------------
    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                // v1.0.76: ExtractionResult.entities is now
                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
                // via name + type only; start/end offsets are not
                // carried forward into the storage layer.
                // NOTE(review): every entity is stored as `EntityType::Concept`
                // here; the extracted type is not consulted.
                extracted_entities = extracted
                    .entities
                    .into_iter()
                    .map(|e| NewEntity {
                        name: e.name,
                        entity_type: crate::entity_type::EntityType::Concept,
                        description: None,
                    })
                    .collect();
                // v1.0.76: relationships are no longer in the
                // ExtractionResult struct; the LLM backend returns
                // them in its own payload. The default build is
                // URL-only extraction.
                extracted_relationships.clear();

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > max_relationships_per_memory() {
                    extracted_relationships.truncate(max_relationships_per_memory());
                }
            }
            Err(e) => {
                // Extraction failure is not fatal: the file is staged without
                // entities or URLs.
                tracing::warn!(
                    target: "ingest",
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // --- Relationship validation -----------------------------------------
    // NOTE(review): nothing above ever pushes into `extracted_relationships`,
    // so this loop currently iterates over an empty vector.
    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    // --- Hash, snippet and chunking --------------------------------------
    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    // Snippet is cut on `char` boundaries, so it is at most 200 characters
    // (not bytes).
    let snippet: String = raw_body.chars().take(200).collect();

    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
    // NOTE(review): the message below mentions `remember` although this is the
    // ingest path — confirm whether that wording is intentional.
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    // --- Body embedding --------------------------------------------------
    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    // When true, non-validation embedding failures are logged and the file is
    // staged without the affected embedding instead of failing.
    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
    // v1.0.84 (ADR-0042): the embed call returns a `(Vec<f32>, LlmBackendKind)`
    // tuple; the backend that actually ran is extracted to populate
    // `backend_invoked` in the per-file NDJSON envelope.
    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
        .len()
        == 1
    {
        // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
        match crate::embedder::embed_passage_with_choice(
            &paths.models,
            &raw_body,
            Some(llm_backend),
        ) {
            Ok((v, k)) => (Some(v), Some(k.as_str())),
            // Validation errors are never downgraded by skip-on-failure.
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %path.display(), "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    } else {
        // G42/S2+S3 (v1.0.79): batched bounded fan-out replaces the
        // serial per-chunk subprocess loop.
        // NOTE(review): `llm_backend` is not forwarded on this path — confirm
        // whether the batch helper is expected to honour --llm-backend.
        let chunk_texts: Vec<String> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
            .collect();
        // RSS guard: checked once, before the batch is started, and only when
        // the current RSS can be determined.
        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
            if rss > max_rss_mb {
                tracing::error!(
                    target: "ingest",
                    rss_mb = rss,
                    max_rss_mb = max_rss_mb,
                    file = %path.display(),
                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                );
                return Err(AppError::LowMemory {
                    available_mb: crate::memory_guard::available_memory_mb(),
                    required_mb: max_rss_mb,
                });
            }
        }
        match crate::embedder::embed_passages_parallel_local(
            &paths.models,
            &chunk_texts,
            llm_parallelism,
            crate::embedder::chunk_embed_batch_size(),
        ) {
            Ok(chunk_embeddings) => {
                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
                chunk_embeddings_opt = Some(chunk_embeddings);
                // v1.0.84 (ADR-0042): the parallel batch does not return a
                // single discriminator per call. Conservatively, we populate
                // None here.
                (Some(aggregated), None)
            }
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %path.display(), "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    };

    // --- Entity embedding ------------------------------------------------
    // G42/S2+A4 (v1.0.79): entity names use the short-text batch profile.
    let entity_texts: Vec<String> = extracted_entities
        .iter()
        .map(|entity| match &entity.description {
            Some(desc) => format!("{} {}", entity.name, desc),
            None => entity.name.clone(),
        })
        .collect();
    // G56 (v1.0.80): ingest reuses canonical entity names across many
    // memories (e.g. `sqlite-graphrag`, `claude-code`); the in-process
    // cache collapses the repeated LLM calls into one per unique text.
    // NOTE(review): unlike the body-embedding branches, a `Validation` error
    // is not special-cased here, so skip-on-failure also swallows it —
    // confirm this asymmetry is intended.
    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
        &paths.models,
        &entity_texts,
        llm_parallelism,
    ) {
        Ok((entity_embeddings, embed_cache_stats)) => {
            if embed_cache_stats.hits > 0 {
                tracing::debug!(
                    hits = embed_cache_stats.hits,
                    misses = embed_cache_stats.misses,
                    requested = embed_cache_stats.requested,
                    "G56: entity embed cache hit (ingest)"
                );
            }
            Some(entity_embeddings)
        }
        Err(e) if skip_embed => {
            tracing::warn!(error = %e, file = %path.display(), "ingest: entity embedding failed; --skip-embedding-on-failure active");
            None
        }
        Err(e) => return Err(e),
    };

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings: entity_embeddings_opt,
        urls: extracted_urls,
        backend_invoked,
    })
}
754
755/// Phase B: persists one `StagedFile` to the database on the main thread.
756fn persist_staged(
757    conn: &mut Connection,
758    namespace: &str,
759    memory_type: &str,
760    staged: StagedFile,
761) -> Result<FileSuccess, AppError> {
762    {
763        let active_count: u32 = conn.query_row(
764            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
765            [],
766            |r| r.get::<_, i64>(0).map(|v| v as u32),
767        )?;
768        let ns_exists: bool = conn.query_row(
769            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
770            rusqlite::params![namespace],
771            |r| r.get::<_, i64>(0).map(|v| v > 0),
772        )?;
773        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
774            return Err(AppError::NamespaceError(format!(
775                "active namespace limit of {} exceeded while creating '{namespace}'",
776                crate::constants::MAX_NAMESPACES_ACTIVE
777            )));
778        }
779    }
780
781    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
782    if existing_memory.is_some() {
783        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
784            &staged.name,
785            namespace,
786        )));
787    }
788    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
789
790    let new_memory = NewMemory {
791        namespace: namespace.to_string(),
792        name: staged.name.clone(),
793        memory_type: memory_type.to_string(),
794        description: staged.description.clone(),
795        body: staged.body,
796        body_hash: staged.body_hash,
797        session_id: None,
798        source: "agent".to_string(),
799        metadata: serde_json::json!({}),
800    };
801
802    if let Some(hash_id) = duplicate_hash_id {
803        tracing::debug!(
804            target: "ingest",
805            duplicate_memory_id = hash_id,
806            "identical body already exists; persisting a new memory anyway"
807        );
808    }
809
810    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
811
812    let memory_id = memories::insert(&tx, &new_memory)?;
813    versions::insert_version(
814        &tx,
815        memory_id,
816        1,
817        &staged.name,
818        memory_type,
819        &staged.description,
820        &new_memory.body,
821        &serde_json::to_string(&new_memory.metadata)?,
822        None,
823        "create",
824    )?;
825    if let Some(ref emb) = staged.embedding {
826        memories::upsert_vec(
827            &tx,
828            memory_id,
829            namespace,
830            memory_type,
831            emb,
832            &staged.name,
833            &staged.snippet,
834        )?;
835    }
836
837    if staged.chunks_info.len() > 1 {
838        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
839        if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
840            for (i, emb) in chunk_embeddings.iter().enumerate() {
841                storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
842            }
843        }
844    }
845
846    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
847        for (idx, entity) in staged.entities.iter().enumerate() {
848            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
849            if let Some(ref entity_embeddings) = staged.entity_embeddings {
850                if let Some(entity_embedding) = entity_embeddings.get(idx) {
851                    entities::upsert_entity_vec(
852                        &tx,
853                        entity_id,
854                        namespace,
855                        entity.entity_type,
856                        entity_embedding,
857                        &entity.name,
858                    )?;
859                }
860            }
861            entities::link_memory_entity(&tx, memory_id, entity_id)?;
862            entities::increment_degree(&tx, entity_id)?;
863        }
864        let entity_types: std::collections::HashMap<&str, EntityType> = staged
865            .entities
866            .iter()
867            .map(|entity| (entity.name.as_str(), entity.entity_type))
868            .collect();
869        for rel in &staged.relationships {
870            let source_entity = NewEntity {
871                name: rel.source.clone(),
872                entity_type: entity_types
873                    .get(rel.source.as_str())
874                    .copied()
875                    .unwrap_or(EntityType::Concept),
876                description: None,
877            };
878            let target_entity = NewEntity {
879                name: rel.target.clone(),
880                entity_type: entity_types
881                    .get(rel.target.as_str())
882                    .copied()
883                    .unwrap_or(EntityType::Concept),
884                description: None,
885            };
886            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
887            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
888            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
889            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
890        }
891    }
892
893    tx.commit()?;
894
895    if !staged.urls.is_empty() {
896        let url_entries: Vec<storage_urls::MemoryUrl> = staged
897            .urls
898            .into_iter()
899            .map(|u| storage_urls::MemoryUrl {
900                url: u.url,
901                offset: Some(u.start as i64),
902            })
903            .collect();
904        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
905    }
906
907    Ok(FileSuccess {
908        memory_id,
909        action: "created".to_string(),
910        body_length: new_memory.body.len(),
911        backend_invoked: staged.backend_invoked,
912    })
913}
914
915// ---------------------------------------------------------------------------
916// G20: mode-conditional flag validation
917// ---------------------------------------------------------------------------
918
/// Reports whether `value` equals the `default` declared for a scalar flag.
///
/// This is a local re-declaration of a helper that also exists elsewhere,
/// kept here so the module stays self-contained for the G20 fix.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    value.eq(&default)
}
925
926/// G20: validate that flags for one LLM provider were not passed when
927/// the operator selected a different provider (or no provider). Flags
928/// silently discarded by the wrong mode are surfaced as
929///  BEFORE any DB work, so the operator gets
930/// an actionable error instead of a surprise at runtime.
931///
932/// Mode-specific matrices:
933/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
934///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
935///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
936///   --enable-ner is false)
937/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
938/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
939///   max_cost_usd, resume, retry_failed, keep_queue
940fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
941    const DEFAULT_TIMEOUT: u64 = 300;
942    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
943
944    let mut conflicts: Vec<String> = Vec::new();
945
946    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
947
948    if is_local_mode {
949        if args.claude_binary.is_some() {
950            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
951        }
952        if args.claude_model.is_some() {
953            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
954        }
955        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
956            conflicts.push(format!(
957                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
958                args.claude_timeout
959            ));
960        }
961        if args.codex_binary.is_some() {
962            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
963        }
964        if args.codex_model.is_some() {
965            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
966        }
967        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
968            conflicts.push(format!(
969                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
970                args.codex_timeout
971            ));
972        }
973        if args.opencode_binary.is_some() {
974            conflicts
975                .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
976        }
977        if args.opencode_model.is_some() {
978            conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
979        }
980        if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
981            conflicts.push(format!(
982                "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
983                args.opencode_timeout
984            ));
985        }
986        if args.max_cost_usd.is_some() {
987            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
988        }
989        if args.resume {
990            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
991        }
992        if args.retry_failed {
993            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
994        }
995        if args.keep_queue {
996            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
997        }
998        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
999            conflicts.push(format!(
1000                "--rate-limit-wait={} is ignored when --mode is none or gliner",
1001                args.rate_limit_wait
1002            ));
1003        }
1004    }
1005
1006    match args.mode {
1007        IngestMode::ClaudeCode => {
1008            if args.codex_binary.is_some() {
1009                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1010            }
1011            if args.codex_model.is_some() {
1012                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1013            }
1014            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1015                conflicts.push(format!(
1016                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1017                    args.codex_timeout
1018                ));
1019            }
1020            if args.opencode_binary.is_some() {
1021                conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1022            }
1023            if args.opencode_model.is_some() {
1024                conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1025            }
1026            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1027                conflicts.push(format!(
1028                    "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1029                    args.opencode_timeout
1030                ));
1031            }
1032        }
1033        IngestMode::Codex => {
1034            if args.claude_binary.is_some() {
1035                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1036            }
1037            if args.claude_model.is_some() {
1038                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1039            }
1040            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1041                conflicts.push(format!(
1042                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1043                    args.claude_timeout
1044                ));
1045            }
1046            if args.max_cost_usd.is_some() {
1047                conflicts.push(
1048                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1049                        .to_string(),
1050                );
1051            }
1052            if args.resume {
1053                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1054            }
1055            if args.retry_failed {
1056                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1057            }
1058            if args.keep_queue {
1059                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1060            }
1061            if args.opencode_binary.is_some() {
1062                conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1063            }
1064            if args.opencode_model.is_some() {
1065                conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1066            }
1067            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1068                conflicts.push(format!(
1069                    "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1070                    args.opencode_timeout
1071                ));
1072            }
1073        }
1074        IngestMode::Opencode => {
1075            if args.claude_binary.is_some() {
1076                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1077            }
1078            if args.claude_model.is_some() {
1079                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1080            }
1081            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1082                conflicts.push(format!(
1083                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1084                    args.claude_timeout
1085                ));
1086            }
1087            if args.codex_binary.is_some() {
1088                conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1089            }
1090            if args.codex_model.is_some() {
1091                conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1092            }
1093            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1094                conflicts.push(format!(
1095                    "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1096                    args.codex_timeout
1097                ));
1098            }
1099            if args.max_cost_usd.is_some() {
1100                conflicts.push(
1101                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1102                        .to_string(),
1103                );
1104            }
1105            if args.resume {
1106                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1107            }
1108            if args.retry_failed {
1109                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1110            }
1111            if args.keep_queue {
1112                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1113            }
1114        }
1115        IngestMode::None | IngestMode::Gliner => {}
1116    }
1117
1118    if !conflicts.is_empty() {
1119        return Err(AppError::Validation(format!(
1120            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
1121            args.mode,
1122            conflicts.join("\n  - ")
1123        )));
1124    }
1125
1126    Ok(())
1127}
1128
1129// ---------------------------------------------------------------------------
1130
1131#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
1132pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1133    // G20: mode-conditional flag validation BEFORE any DB access.
1134    // Surfaces flags that the wrong mode would silently discard.
1135    validate_mode_conditional_flags_ingest(&args)?;
1136    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
1137    if args.mode == IngestMode::ClaudeCode {
1138        return super::ingest_claude::run_claude_ingest(&args);
1139    }
1140    if args.mode == IngestMode::Codex {
1141        return super::ingest_codex::run_codex_ingest(&args);
1142    }
1143    if args.mode == IngestMode::Opencode {
1144        return super::ingest_opencode::run_opencode_ingest(&args);
1145    }
1146
1147    let started = std::time::Instant::now();
1148
1149    if !args.dir.exists() {
1150        return Err(AppError::Validation(format!(
1151            "directory not found: {}",
1152            args.dir.display()
1153        )));
1154    }
1155    if !args.dir.is_dir() {
1156        return Err(AppError::Validation(format!(
1157            "path is not a directory: {}",
1158            args.dir.display()
1159        )));
1160    }
1161
1162    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1163    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1164    files.sort_unstable();
1165
1166    if files.len() > args.max_files {
1167        return Err(AppError::Validation(format!(
1168            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1169            files.len(),
1170            args.max_files
1171        )));
1172    }
1173
1174    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1175    let memory_type_str = args.r#type.as_str().to_string();
1176
1177    let paths = AppPaths::resolve(args.db.as_deref())?;
1178    let mut conn_or_err = match init_storage(&paths) {
1179        Ok(c) => Ok(c),
1180        Err(e) => Err(format!("{e}")),
1181    };
1182
1183    let mut succeeded: usize = 0;
1184    let mut failed: usize = 0;
1185    let mut skipped: usize = 0;
1186    let total = files.len();
1187
1188    // Pre-resolve all names before parallelisation so Phase A workers see a
1189    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
1190    let mut taken_names: BTreeSet<String> = BTreeSet::new();
1191
1192    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
1193    // ProcessItem: the data moved into the producer thread for Phase A computation.
1194    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
1195    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
1196    // boundary into the rayon producer.
1197    enum SlotMeta {
1198        Skip {
1199            file_str: String,
1200            derived_base: String,
1201            name_truncated: bool,
1202            original_name: Option<String>,
1203            original_filename: Option<String>,
1204            reason: String,
1205        },
1206        Process {
1207            file_str: String,
1208            derived_name: String,
1209            name_truncated: bool,
1210            original_name: Option<String>,
1211            original_filename: Option<String>,
1212        },
1213    }
1214
1215    struct ProcessItem {
1216        idx: usize,
1217        path: PathBuf,
1218        file_str: String,
1219        derived_name: String,
1220    }
1221
1222    let files_cap = files.len();
1223    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1224    slots_meta.try_reserve(files_cap).map_err(|_| {
1225        AppError::LimitExceeded(format!(
1226            "allocation of {files_cap} slot metadata entries would exceed available memory"
1227        ))
1228    })?;
1229    let mut process_items: Vec<ProcessItem> = Vec::new();
1230    process_items.try_reserve(files_cap).map_err(|_| {
1231        AppError::LimitExceeded(format!(
1232            "allocation of {files_cap} process items would exceed available memory"
1233        ))
1234    })?;
1235    let mut truncations: Vec<(String, String)> = Vec::new();
1236    truncations.try_reserve(files_cap).map_err(|_| {
1237        AppError::LimitExceeded(format!(
1238            "allocation of {files_cap} truncation entries would exceed available memory"
1239        ))
1240    })?;
1241
1242    let max_name_length = args.max_name_length;
1243    for path in &files {
1244        let file_str = path.to_string_lossy().into_owned();
1245        let (derived_base, name_truncated, original_name) =
1246            derive_kebab_name(path, max_name_length);
1247        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1248
1249        if name_truncated {
1250            if let Some(ref orig) = original_name {
1251                truncations.push((orig.clone(), derived_base.clone()));
1252            }
1253        }
1254
1255        if derived_base.is_empty() {
1256            // original_filename: always include when it differs from the empty derived name
1257            let orig_filename = if !original_basename.is_empty() {
1258                Some(original_basename.to_string())
1259            } else {
1260                None
1261            };
1262            slots_meta.push(SlotMeta::Skip {
1263                file_str,
1264                derived_base: String::new(),
1265                name_truncated: false,
1266                original_name: None,
1267                original_filename: orig_filename,
1268                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1269            });
1270            continue;
1271        }
1272
1273        match unique_name(&derived_base, &taken_names) {
1274            Ok(derived_name) => {
1275                taken_names.insert(derived_name.clone());
1276                let idx = slots_meta.len();
1277                // original_filename: present only when the raw basename differs from the derived name
1278                let orig_filename = if original_basename != derived_name {
1279                    Some(original_basename.to_string())
1280                } else {
1281                    None
1282                };
1283                process_items.push(ProcessItem {
1284                    idx,
1285                    path: path.clone(),
1286                    file_str: file_str.clone(),
1287                    derived_name: derived_name.clone(),
1288                });
1289                slots_meta.push(SlotMeta::Process {
1290                    file_str,
1291                    derived_name,
1292                    name_truncated,
1293                    original_name,
1294                    original_filename: orig_filename,
1295                });
1296            }
1297            Err(e) => {
1298                let orig_filename = if original_basename != derived_base {
1299                    Some(original_basename.to_string())
1300                } else {
1301                    None
1302                };
1303                slots_meta.push(SlotMeta::Skip {
1304                    file_str,
1305                    derived_base,
1306                    name_truncated,
1307                    original_name,
1308                    original_filename: orig_filename,
1309                    reason: e.to_string(),
1310                });
1311            }
1312        }
1313    }
1314
1315    if !truncations.is_empty() {
1316        tracing::info!(
1317            target: "ingest",
1318            count = truncations.len(),
1319            max_name_length = max_name_length,
1320            max_len = DERIVED_NAME_MAX_LEN,
1321            "derived names truncated; pass -vv (debug) for per-file detail"
1322        );
1323    }
1324
1325    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1326    if args.dry_run {
1327        for meta in &slots_meta {
1328            match meta {
1329                SlotMeta::Skip {
1330                    file_str,
1331                    derived_base,
1332                    name_truncated,
1333                    original_name,
1334                    original_filename,
1335                    reason,
1336                } => {
1337                    output::emit_json_compact(&IngestFileEvent {
1338                        file: file_str,
1339                        name: derived_base,
1340                        status: "skip",
1341                        truncated: *name_truncated,
1342                        original_name: original_name.clone(),
1343                        original_filename: original_filename.as_deref(),
1344                        error: Some(reason.clone()),
1345                        memory_id: None,
1346                        action: None,
1347                        body_length: 0,
1348                        backend_invoked: None,
1349                    })?;
1350                }
1351                SlotMeta::Process {
1352                    file_str,
1353                    derived_name,
1354                    name_truncated,
1355                    original_name,
1356                    original_filename,
1357                } => {
1358                    output::emit_json_compact(&IngestFileEvent {
1359                        file: file_str,
1360                        name: derived_name,
1361                        status: "preview",
1362                        truncated: *name_truncated,
1363                        original_name: original_name.clone(),
1364                        original_filename: original_filename.as_deref(),
1365                        error: None,
1366                        memory_id: None,
1367                        action: None,
1368                        body_length: 0,
1369                        backend_invoked: None,
1370                    })?;
1371                }
1372            }
1373        }
1374        output::emit_json_compact(&IngestSummary {
1375            summary: true,
1376            dir: args.dir.to_string_lossy().into_owned(),
1377            pattern: args.pattern.clone(),
1378            recursive: args.recursive,
1379            files_total: total,
1380            files_succeeded: 0,
1381            files_failed: 0,
1382            files_skipped: 0,
1383            elapsed_ms: started.elapsed().as_millis() as u64,
1384        })?;
1385        return Ok(());
1386    }
1387
1388    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1389    if args.low_memory {
1390        if let Some(n) = args.ingest_parallelism {
1391            if n > 1 {
1392                return Err(AppError::Validation(
1393                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1394                        .to_string(),
1395                ));
1396            }
1397        }
1398    }
1399
1400    // Determine rayon thread pool size, honoring --low-memory and the
1401    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1402    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1403
1404    let pool = rayon::ThreadPoolBuilder::new()
1405        .num_threads(parallelism)
1406        .build()
1407        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1408
1409    if args.enable_ner && args.skip_extraction {
1410        return Err(AppError::Validation(
1411            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1412        ));
1413    }
1414    if args.skip_extraction && !args.enable_ner {
1415        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1416        // commit (9ddb17b) promoted this to a hard validation error, which
1417        // broke the "kept as a hidden no-op for backwards compatibility"
1418        // promise documented in CHANGELOG v1.0.45 and started failing
1419        // 5+ CI jobs whose E2E tests use this flag to skip the
1420        // GLiNER-ONNX model download in CI environments.
1421        tracing::warn!(
1422            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1423        );
1424    }
1425    let enable_ner = args.enable_ner;
1426    let auto_describe = args.auto_describe && !args.no_auto_describe;
1427    let max_rss_mb = args.max_rss_mb;
1428    let llm_parallelism = args.llm_parallelism as usize;
1429    // v1.0.79: `--mode gliner` and `--gliner-variant` are no-ops kept for
1430    // compatibility (the GLiNER pipeline was removed); warn explicitly so
1431    // callers do not silently expect NER-quality extraction.
1432    if args.mode == IngestMode::Gliner {
1433        tracing::warn!(
1434            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1435        );
1436    }
1437    if args.gliner_variant != "fp32" {
1438        tracing::warn!(
1439            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1440        );
1441    }
1442    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1443        "int8" => crate::extraction::GlinerVariant::Int8,
1444        _ => crate::extraction::GlinerVariant::Fp32,
1445    };
1446
1447    let total_to_process = process_items.len();
1448    tracing::info!(
1449        target: "ingest",
1450        phase = "pipeline_start",
1451        files = total_to_process,
1452        ingest_parallelism = parallelism,
1453        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1454    );
1455
1456    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1457    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1458    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1459    let channel_bound = (parallelism * 2).max(1);
1460    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1461
1462    // Phase A: launched in a dedicated OS thread so the main thread can consume
1463    // the channel concurrently. pool.install() blocks the calling thread until
1464    // all rayon workers finish — if called on the main thread it would
1465    // reintroduce the 2-phase blocking behaviour we are eliminating.
1466    let paths_owned = paths.clone();
1467    let llm_backend_owned = llm_backend;
1468    let producer_handle = std::thread::spawn(move || {
1469        pool.install(|| {
1470            process_items.into_par_iter().for_each(|item| {
1471                if crate::shutdown_requested() {
1472                    return;
1473                }
1474                let t0 = std::time::Instant::now();
1475                let result = stage_file(
1476                    item.idx,
1477                    &item.path,
1478                    &item.derived_name,
1479                    &paths_owned,
1480                    enable_ner,
1481                    gliner_variant,
1482                    max_rss_mb,
1483                    llm_parallelism,
1484                    llm_backend_owned,
1485                    auto_describe,
1486                );
1487                let elapsed_ms = t0.elapsed().as_millis() as u64;
1488
1489                // Emit NDJSON progress event to stderr so the user sees work
1490                // happening during long NER runs (e.g. 50 files × 27s each).
1491                let (n_entities, n_relationships) = match &result {
1492                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1493                    Err(_) => (0, 0),
1494                };
1495                let progress = StageProgressEvent {
1496                    schema_version: 1,
1497                    event: "file_extracted",
1498                    path: &item.file_str,
1499                    ms: elapsed_ms,
1500                    entities: n_entities,
1501                    relationships: n_relationships,
1502                };
1503                if let Ok(line) = serde_json::to_string(&progress) {
1504                    tracing::info!(target: "ingest_progress", "{}", line);
1505                }
1506
1507                // Blocking send applies backpressure: if Phase B is slower,
1508                // Phase A workers wait here instead of accumulating staged files
1509                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1510                let _ = tx.send((item.idx, result));
1511            });
1512            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1513            drop(tx);
1514        });
1515    });
1516
1517    // Phase B: main thread persists files as results arrive from the channel.
1518    // Results arrive in completion order (par_iter is unordered). We persist
1519    // each file immediately on arrival — this is the key fix for B1: with the
1520    // old 2-phase design the first DB write happened only after ALL files had
1521    // finished Phase A. Now the first commit happens as soon as the first file
1522    // completes Phase A, regardless of how many files remain.
1523    //
1524    // NDJSON output order follows completion order (not file-system sort order).
1525    // Skip slots are emitted at the end, after all Process results are consumed.
1526    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1527    // requirement than ensuring data is persisted before the user's timeout fires.
1528    let fail_fast = args.fail_fast;
1529
1530    // Emit pending Skip events first so agents see them early.
1531    for meta in &slots_meta {
1532        if let SlotMeta::Skip {
1533            file_str,
1534            derived_base,
1535            name_truncated,
1536            original_name,
1537            original_filename,
1538            reason,
1539        } = meta
1540        {
1541            output::emit_json_compact(&IngestFileEvent {
1542                file: file_str,
1543                name: derived_base,
1544                status: "skipped",
1545                truncated: *name_truncated,
1546                original_name: original_name.clone(),
1547                original_filename: original_filename.as_deref(),
1548                error: Some(reason.clone()),
1549                memory_id: None,
1550                action: None,
1551                body_length: 0,
1552                backend_invoked: None,
1553            })?;
1554            skipped += 1;
1555        }
1556    }
1557
1558    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1559    // as channel messages arrive in completion order.
1560    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1561        .iter()
1562        .enumerate()
1563        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1564        .collect();
1565
1566    tracing::info!(
1567        target: "ingest",
1568        phase = "persist_start",
1569        files = total_to_process,
1570        "phase B starting: persisting files incrementally as Phase A completes each one",
1571    );
1572
1573    // Drain channel and persist each file immediately — no accumulation into a
1574    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1575    // Phase B without applying backpressure.
1576    for (idx, stage_result) in rx {
1577        if crate::shutdown_requested() {
1578            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1579            break;
1580        }
1581        let meta = meta_index.get(&idx).ok_or_else(|| {
1582            AppError::Internal(anyhow::anyhow!(
1583                "channel idx {idx} has no corresponding Process slot"
1584            ))
1585        })?;
1586        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1587        {
1588            SlotMeta::Process {
1589                file_str,
1590                derived_name,
1591                name_truncated,
1592                original_name,
1593                original_filename,
1594            } => (
1595                file_str,
1596                derived_name,
1597                name_truncated,
1598                original_name,
1599                original_filename,
1600            ),
1601            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1602        };
1603
1604        // If storage init failed, every file fails with the same error.
1605        let conn = match conn_or_err.as_mut() {
1606            Ok(c) => c,
1607            Err(err_msg) => {
1608                let err_clone = err_msg.clone();
1609                output::emit_json_compact(&IngestFileEvent {
1610                    file: file_str,
1611                    name: derived_name,
1612                    status: "failed",
1613                    truncated: *name_truncated,
1614                    original_name: original_name.clone(),
1615                    original_filename: original_filename.as_deref(),
1616                    error: Some(err_clone.clone()),
1617                    memory_id: None,
1618                    action: None,
1619                    body_length: 0,
1620                    backend_invoked: None,
1621                })?;
1622                failed += 1;
1623                if fail_fast {
1624                    output::emit_json_compact(&IngestSummary {
1625                        summary: true,
1626                        dir: args.dir.display().to_string(),
1627                        pattern: args.pattern.clone(),
1628                        recursive: args.recursive,
1629                        files_total: total,
1630                        files_succeeded: succeeded,
1631                        files_failed: failed,
1632                        files_skipped: skipped,
1633                        elapsed_ms: started.elapsed().as_millis() as u64,
1634                    })?;
1635                    return Err(AppError::Validation(format!(
1636                        "ingest aborted on first failure: {err_clone}"
1637                    )));
1638                }
1639                continue;
1640            }
1641        };
1642
1643        let outcome =
1644            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1645
1646        match outcome {
1647            Ok(FileSuccess {
1648                memory_id,
1649                action,
1650                body_length,
1651                backend_invoked: file_backend_invoked,
1652            }) => {
1653                output::emit_json_compact(&IngestFileEvent {
1654                    file: file_str,
1655                    name: derived_name,
1656                    status: "indexed",
1657                    truncated: *name_truncated,
1658                    original_name: original_name.clone(),
1659                    original_filename: original_filename.as_deref(),
1660                    error: None,
1661                    memory_id: Some(memory_id),
1662                    action: Some(action),
1663                    body_length,
1664                    backend_invoked: file_backend_invoked,
1665                })?;
1666                succeeded += 1;
1667            }
1668            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1669                output::emit_json_compact(&IngestFileEvent {
1670                    file: file_str,
1671                    name: derived_name,
1672                    status: "skipped",
1673                    truncated: *name_truncated,
1674                    original_name: original_name.clone(),
1675                    original_filename: original_filename.as_deref(),
1676                    error: Some(format!("{e}")),
1677                    memory_id: None,
1678                    action: Some("duplicate".to_string()),
1679                    body_length: 0,
1680                    backend_invoked: None,
1681                })?;
1682                skipped += 1;
1683            }
1684            Err(e) => {
1685                let err_msg = format!("{e}");
1686                output::emit_json_compact(&IngestFileEvent {
1687                    file: file_str,
1688                    name: derived_name,
1689                    status: "failed",
1690                    truncated: *name_truncated,
1691                    original_name: original_name.clone(),
1692                    original_filename: original_filename.as_deref(),
1693                    error: Some(err_msg.clone()),
1694                    memory_id: None,
1695                    action: None,
1696                    body_length: 0,
1697                    backend_invoked: None,
1698                })?;
1699                failed += 1;
1700                if fail_fast {
1701                    output::emit_json_compact(&IngestSummary {
1702                        summary: true,
1703                        dir: args.dir.display().to_string(),
1704                        pattern: args.pattern.clone(),
1705                        recursive: args.recursive,
1706                        files_total: total,
1707                        files_succeeded: succeeded,
1708                        files_failed: failed,
1709                        files_skipped: skipped,
1710                        elapsed_ms: started.elapsed().as_millis() as u64,
1711                    })?;
1712                    return Err(AppError::Validation(format!(
1713                        "ingest aborted on first failure: {err_msg}"
1714                    )));
1715                }
1716            }
1717        }
1718    }
1719
1720    // Wait for the producer thread to finish cleanly.
1721    producer_handle
1722        .join()
1723        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1724
1725    if let Ok(ref conn) = conn_or_err {
1726        if succeeded > 0 {
1727            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1728        }
1729    }
1730
1731    output::emit_json_compact(&IngestSummary {
1732        summary: true,
1733        dir: args.dir.display().to_string(),
1734        pattern: args.pattern.clone(),
1735        recursive: args.recursive,
1736        files_total: total,
1737        files_succeeded: succeeded,
1738        files_failed: failed,
1739        files_skipped: skipped,
1740        elapsed_ms: started.elapsed().as_millis() as u64,
1741    })?;
1742
1743    Ok(())
1744}
1745
1746/// Auto-initialises the database (matches the contract of every other CRUD
1747/// handler) and returns a fresh read/write connection ready for the ingest
1748/// loop. Errors here are recoverable per-file: the caller surfaces them as
1749/// failure events so `--fail-fast` and the continue-on-error path keep
1750/// working when, for example, the user points `--db` at an unwritable path.
1751fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1752    ensure_db_ready(paths)?;
1753    let conn = open_rw(&paths.db)?;
1754    Ok(conn)
1755}
1756
1757pub(crate) fn collect_files(
1758    dir: &Path,
1759    pattern: &str,
1760    recursive: bool,
1761    out: &mut Vec<PathBuf>,
1762) -> Result<(), AppError> {
1763    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1764    for entry in entries {
1765        let entry = entry.map_err(AppError::Io)?;
1766        let path = entry.path();
1767        let file_type = entry.file_type().map_err(AppError::Io)?;
1768        if file_type.is_file() {
1769            let name = entry.file_name();
1770            let name_str = name.to_string_lossy();
1771            if matches_pattern(&name_str, pattern) {
1772                out.push(path);
1773            }
1774        } else if file_type.is_dir() && recursive {
1775            collect_files(&path, pattern, recursive, out)?;
1776        }
1777    }
1778    Ok(())
1779}
1780
/// Returns true when `name` matches the glob-style `pattern`.
///
/// The only wildcard is `*`, which matches any run of characters (including
/// none). It may appear any number of times and anywhere in the pattern:
/// `*.md`, `README*`, `*test*` and `chapter-*.md` all work. A pattern with
/// no `*` must equal `name` exactly. Matching is case-sensitive.
///
/// Earlier versions honoured only a single leading or trailing `*`, so
/// `*test*` and `chapter-*.md` silently matched nothing; every name those
/// versions accepted is still accepted here.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };

    // Text before the first `*` is anchored at the start of the name.
    let after_head = match name.strip_prefix(head) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Text after the last `*` is anchored at the end. Stripping it from what
    // is left after the head guarantees the two anchors never overlap.
    let (middle, tail) = match rest.rsplit_once('*') {
        Some((middle, tail)) => (middle, tail),
        None => ("", rest),
    };
    let mut remaining = match after_head.strip_suffix(tail) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Segments between stars must occur in order; taking the leftmost
    // occurrence each time is sufficient for star-only globs.
    for piece in middle.split('*').filter(|piece| !piece.is_empty()) {
        match remaining.find(piece) {
            Some(pos) => remaining = &remaining[pos + piece.len()..],
            None => return false,
        }
    }
    true
}
1790
1791/// Returns `(final_name, truncated, original_name)`.
1792/// `truncated` is true when the derived name exceeded `max_len`.
1793/// `original_name` holds the pre-truncation name only when `truncated=true`.
1794///
1795/// Non-ASCII characters are first decomposed via NFD and then stripped of
1796/// combining marks so accented letters fold to their base ASCII letter
1797/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1798/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1799/// preserves meaningful word content rather than collapsing the basename
1800/// to a few stray ASCII letters as the previous filter did.
1801pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1802    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1803    let lowered: String = stem
1804        .nfd()
1805        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1806        .map(|c| {
1807            if c == '_' || c.is_whitespace() {
1808                '-'
1809            } else {
1810                c
1811            }
1812        })
1813        .map(|c| c.to_ascii_lowercase())
1814        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1815        .collect();
1816    let collapsed = collapse_dashes(&lowered);
1817    let trimmed_raw = collapsed.trim_matches('-').to_string();
1818    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1819    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1820        format!("doc-{trimmed_raw}")
1821    } else {
1822        trimmed_raw
1823    };
1824    if trimmed.len() > max_len {
1825        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1826        tracing::debug!(
1827            target: "ingest",
1828            original = %trimmed,
1829            truncated_to = %truncated,
1830            max_len = max_len,
1831            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1832        );
1833        (truncated, true, Some(trimmed))
1834    } else {
1835        (trimmed, false, None)
1836    }
1837}
1838
1839/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1840/// numeric suffix (`-1`, `-2`, …) when needed.
1841///
1842/// `taken` is the set of names already consumed in the current ingest run.
1843/// The caller is expected to insert the returned name into `taken` so the
1844/// next call observes the consumption. Cross-run collisions are intentionally
1845/// surfaced by the per-file persistence path as duplicates so re-ingestion
1846/// of identical corpora stays idempotent.
1847///
1848/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1849/// candidates collide, signalling a pathological corpus that should be
1850/// renamed manually.
1851fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1852    if !taken.contains(base) {
1853        return Ok(base.to_string());
1854    }
1855    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1856        let candidate = format!("{base}-{suffix}");
1857        if !taken.contains(&candidate) {
1858            tracing::warn!(
1859                target: "ingest",
1860                base = %base,
1861                resolved = %candidate,
1862                suffix,
1863                "memory name collision resolved with numeric suffix"
1864            );
1865            return Ok(candidate);
1866        }
1867    }
1868    Err(AppError::Validation(format!(
1869        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1870    )))
1871}
1872
/// Returns a copy of `s` in which every run of consecutive `-` characters
/// is reduced to a single `-`. All other characters are copied unchanged;
/// leading and trailing hyphens are kept (as one hyphen each).
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A hyphen is redundant when the output already ends with one.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1889
#[cfg(test)]
mod tests {
    //! Unit tests for the ingest helpers: pattern matching, name derivation,
    //! directory walking, collision resolution and the low-memory switches.

    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    // ── matches_pattern ──

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ── derive_kebab_name ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ── collect_files ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    /// Helper: runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value`
    /// (or unset when `value` is `None`) and then restores the previous state.
    ///
    /// Restoration happens in a drop guard, so it also runs when `f` panics
    /// on a failed assertion. Without that, one failing test would leak its
    /// value into every later `#[serial]` test and cause misleading follow-up
    /// failures.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Puts the captured value back (or removes the variable) on drop.
        struct Restore(Option<String>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // Capture the prior state before touching the variable.
        let _restore = Restore(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}