Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
56/// Hard cap on the numeric suffix appended for collision resolution. If 1000
57/// candidates collide we surface an error rather than loop forever.
58const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70    # LLM-curated extraction via Claude Code CLI\n  \
71    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
72    # Resume interrupted claude-code ingest\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
74    # Claude Code with budget cap and custom timeout\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
76AUTHENTICATION:\n  \
77    --mode claude-code: Uses existing Claude Code authentication.\n  \
78      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
79      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
80    --mode codex: Uses existing Codex CLI authentication.\n  \
81      Device auth: run `codex auth login` first\n  \
82      API key: set OPENAI_API_KEY (optional)\n\n  \
83NOTES:\n  \
84    Each file becomes a separate memory. Names derive from file basenames\n  \
85    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
86    followed by a final summary line with counts. Per-file errors are reported\n  \
87    inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89    /// Directory containing files to ingest.
90    #[arg(
91        value_name = "DIR",
92        help = "Directory to ingest recursively (each matching file becomes a memory)"
93    )]
94    pub dir: PathBuf,
95
96    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
97    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98    pub r#type: MemoryType,
99
100    /// Glob pattern matched against file basenames (default: `*.md`). Supports
101    /// `*.<ext>`, `<prefix>*`, and exact filename match.
102    #[arg(long, default_value = "*.md")]
103    pub pattern: String,
104
105    /// Recurse into subdirectories.
106    #[arg(long, default_value_t = false)]
107    pub recursive: bool,
108
109    #[arg(
110        long,
111        env = "SQLITE_GRAPHRAG_ENABLE_NER",
112        value_parser = crate::parsers::parse_bool_flexible,
113        action = clap::ArgAction::Set,
114        num_args = 0..=1,
115        default_missing_value = "true",
116        default_value = "false",
117        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118    )]
119    pub enable_ner: bool,
120
121    /// GAP-E2E-011: gera description heurística a partir da primeira linha
122    /// significativa do body, em vez de "ingested from `<path>`". Quando
123    /// `--no-auto-describe` é passado, mantém o comportamento legado.
124    #[arg(
125        long,
126        default_value_t = true,
127        overrides_with = "no_auto_describe",
128        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
129    )]
130    pub auto_describe: bool,
131    #[arg(
132        long = "no-auto-describe",
133        default_value_t = false,
134        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
135    )]
136    pub no_auto_describe: bool,
137    #[arg(
138        long,
139        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
140        default_value = "fp32",
141        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
142    )]
143    pub gliner_variant: String,
144
145    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
146    #[arg(long, default_value_t = false, hide = true)]
147    pub skip_extraction: bool,
148
149    /// Stop on first per-file error instead of continuing with the next file.
150    #[arg(long, default_value_t = false)]
151    pub fail_fast: bool,
152
153    /// Preview file-to-name mapping without loading model or persisting.
154    #[arg(long, default_value_t = false)]
155    pub dry_run: bool,
156
157    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
158    #[arg(long, default_value_t = 10_000)]
159    pub max_files: usize,
160
161    /// Namespace for the ingested memories.
162    #[arg(long)]
163    pub namespace: Option<String>,
164
165    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
166    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
167    pub db: Option<String>,
168
169    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
170    pub format: JsonOutputFormat,
171
172    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
173    pub json: bool,
174
175    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
176    #[arg(
177        long,
178        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
179    )]
180    pub ingest_parallelism: Option<usize>,
181
182    /// Force single-threaded ingest to reduce RSS pressure.
183    ///
184    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
185    /// explicit value. Recommended for environments with <4 GB available
186    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
187    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
188    /// (CLI flag has higher precedence than the env var).
189    #[arg(
190        long,
191        default_value_t = false,
192        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
193                Recommended for environments with <4 GB available RAM or container/cgroup \
194                constraints. Trade-off: 3-4x longer wall time. Also honored via \
195                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
196    )]
197    pub low_memory: bool,
198
199    /// Maximum process RSS in MiB; abort if exceeded during embedding.
200    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
201          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
202    pub max_rss_mb: u64,
203
204    /// G42/S3 (v1.0.79): maximum simultaneous LLM embedding subprocesses
205    /// PER FILE. Multiplies with --ingest-parallelism (files staged
206    /// concurrently), hence the conservative default of 2. The effective
207    /// value is further bounded by CPU count and available RAM.
208    #[arg(long, default_value_t = 2, value_name = "N",
209          value_parser = clap::value_parser!(u64).range(1..=32),
210          help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
211    pub llm_parallelism: u64,
212
213    /// Maximum character length for derived memory names from file basenames.
214    ///
215    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
216    /// Shorter values leave more headroom for collision suffix resolution.
217    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
218          help = "Maximum length for derived memory names (default: 60)")]
219    pub max_name_length: usize,
220
221    /// Extraction mode: `none` (body-only, default), `claude-code`/`codex` (LLM-curated), or `gliner` (DEPRECATED: URL-regex only since v1.0.79).
222    #[arg(long, value_enum, default_value_t = IngestMode::None)]
223    pub mode: IngestMode,
224
225    /// Explicit path to the Claude Code binary (only with --mode claude-code).
226    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
227    pub claude_binary: Option<std::path::PathBuf>,
228
229    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
230    #[arg(long)]
231    pub claude_model: Option<String>,
232
233    /// Resume a previously interrupted claude-code ingest from the queue DB.
234    #[arg(long, default_value_t = false)]
235    pub resume: bool,
236
237    /// Retry only failed files from a previous claude-code ingest.
238    #[arg(long, default_value_t = false)]
239    pub retry_failed: bool,
240
241    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
242    #[arg(long, default_value_t = false)]
243    pub keep_queue: bool,
244
245    /// Custom path for the claude-code ingest queue database.
246    #[arg(long, default_value = ".ingest-queue.sqlite")]
247    pub queue_db: String,
248
249    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
250    #[arg(long, default_value_t = 60)]
251    pub rate_limit_wait: u64,
252
253    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
254    #[arg(long)]
255    pub max_cost_usd: Option<f64>,
256
257    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
258    #[arg(
259        long,
260        default_value_t = 300,
261        help = "Timeout in seconds for each claude -p invocation (default: 300)"
262    )]
263    pub claude_timeout: u64,
264
265    /// Explicit path to the Codex CLI binary (only with --mode codex).
266    #[arg(
267        long,
268        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
269        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
270    )]
271    pub codex_binary: Option<PathBuf>,
272
273    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
274    #[arg(
275        long,
276        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
277    )]
278    pub codex_model: Option<String>,
279
280    /// Timeout in seconds for each codex exec invocation.
281    #[arg(
282        long,
283        default_value_t = 300,
284        help = "Timeout in seconds for each codex exec invocation (default: 300)"
285    )]
286    pub codex_timeout: u64,
287
288    /// Path to the `opencode` binary (override PATH lookup, only with --mode opencode).
289    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
290    pub opencode_binary: Option<PathBuf>,
291
292    /// Model override for OpenCode extraction.
293    #[arg(
294        long,
295        value_name = "MODEL",
296        env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
297        help = "Model override for OpenCode extraction"
298    )]
299    pub opencode_model: Option<String>,
300
301    /// Timeout in seconds for each opencode run invocation.
302    #[arg(
303        long,
304        value_name = "SECONDS",
305        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
306        default_value_t = 300,
307        help = "Timeout in seconds for each opencode run invocation (default: 300)"
308    )]
309    pub opencode_timeout: u64,
310
311    /// G30: poll for the job singleton every second for up to N seconds
312    /// when another invocation holds the lock. Default: 0 (fail fast).
313    #[arg(long, value_name = "SECONDS")]
314    pub wait_job_singleton: Option<u64>,
315
316    /// G30: force acquisition of the singleton lock by removing a stale
317    /// lock file from a previously crashed invocation.
318    #[arg(long, default_value_t = false)]
319    pub force_job_singleton: bool,
320
321    /// v1.0.93 (GAP-OR-INGEST): run `enrich --operation memory-bindings`
322    /// after all files are embedded, using the active `--llm-backend`.
323    #[arg(
324        long,
325        default_value_t = false,
326        help = "Run enrich --operation memory-bindings after all files are ingested"
327    )]
328    pub enrich_after: bool,
329}
330
331/// Extraction mode for the ingest pipeline.
332#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
333pub enum IngestMode {
334    /// Body-only ingestion without entity/relationship extraction (default).
335    None,
336    /// DEPRECATED: URL-regex extraction only since v1.0.79 (the GLiNER pipeline was removed; requires --enable-ner).
337    Gliner,
338    /// LLM-curated extraction via locally installed Claude Code CLI.
339    ClaudeCode,
340    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
341    Codex,
342    /// LLM-curated extraction via locally installed OpenCode CLI.
343    #[value(name = "opencode")]
344    Opencode,
345}
346
347/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
348/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
349/// values evaluate to false. Unrecognized non-empty values emit a
350/// `tracing::warn!` and evaluate to false.
351fn env_low_memory_enabled() -> bool {
352    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
353        Ok(v) if v.is_empty() => false,
354        Ok(v) => match v.to_lowercase().as_str() {
355            "1" | "true" | "yes" | "on" => true,
356            "0" | "false" | "no" | "off" => false,
357            other => {
358                tracing::warn!(
359                    target: "ingest",
360                    value = %other,
361                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
362                );
363                false
364            }
365        },
366        Err(_) => false,
367    }
368}
369
370/// Resolves the effective ingest parallelism honoring `--low-memory` and the
371/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
372///
373/// Precedence:
374/// 1. `--low-memory` CLI flag forces parallelism = 1.
375/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
376/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
377/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
378///
379/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
380/// emits a `tracing::warn!` advertising the override.
381fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
382    let env_flag = env_low_memory_enabled();
383    let low_memory = low_memory_flag || env_flag;
384
385    if low_memory {
386        if let Some(n) = ingest_parallelism {
387            if n > 1 {
388                tracing::warn!(
389                    target: "ingest",
390                    requested = n,
391                    "--ingest-parallelism overridden by --low-memory; using 1"
392                );
393            }
394        }
395        if low_memory_flag {
396            tracing::info!(
397                target: "ingest",
398                source = "flag",
399                "low-memory mode enabled: forcing --ingest-parallelism 1"
400            );
401        } else {
402            tracing::info!(
403                target: "ingest",
404                source = "env",
405                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
406            );
407        }
408        return 1;
409    }
410
411    ingest_parallelism
412        .unwrap_or_else(|| {
413            std::thread::available_parallelism()
414                .map(|v| v.get() / 2)
415                .unwrap_or(1)
416                .clamp(1, 4)
417        })
418        .max(1)
419}
420
421#[derive(Serialize)]
422struct IngestFileEvent<'a> {
423    file: &'a str,
424    name: &'a str,
425    status: &'a str,
426    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
427    truncated: bool,
428    /// Original derived name before truncation; only present when `truncated=true`.
429    #[serde(skip_serializing_if = "Option::is_none")]
430    original_name: Option<String>,
431    /// Original file basename (without extension); only present when it differs from `name`.
432    #[serde(skip_serializing_if = "Option::is_none")]
433    original_filename: Option<&'a str>,
434    #[serde(skip_serializing_if = "Option::is_none")]
435    error: Option<String>,
436    #[serde(skip_serializing_if = "Option::is_none")]
437    memory_id: Option<i64>,
438    #[serde(skip_serializing_if = "Option::is_none")]
439    action: Option<String>,
440    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
441    body_length: usize,
442    /// v1.0.84 (ADR-0042): discriminador do backend LLM que efetivamente
443    /// executou o embedding live. `"claude" | "codex" | "none"`. Absent on
444    /// the wire when `None` (kept for happy-path envelope cleanliness, ou
445    /// quando o arquivo não chegou à fase de embed por duplicação/erro).
446    #[serde(skip_serializing_if = "Option::is_none")]
447    backend_invoked: Option<&'a str>,
448}
449
450#[derive(Serialize)]
451struct IngestSummary {
452    summary: bool,
453    dir: String,
454    pattern: String,
455    recursive: bool,
456    files_total: usize,
457    files_succeeded: usize,
458    files_failed: usize,
459    files_skipped: usize,
460    elapsed_ms: u64,
461}
462
463/// Outcome of a successful per-file ingest, used to build the NDJSON event.
464struct FileSuccess {
465    memory_id: i64,
466    action: String,
467    body_length: usize,
468    backend_invoked: Option<&'static str>,
469}
470
471/// NDJSON progress event emitted to stderr after each file completes Phase A.
472/// Schema version 1; consumers should check `schema_version` before parsing.
473#[derive(Serialize)]
474struct StageProgressEvent<'a> {
475    schema_version: u8,
476    event: &'a str,
477    path: &'a str,
478    ms: u64,
479    entities: usize,
480    relationships: usize,
481}
482
483/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
484/// Phase B persists these to SQLite on the main thread in submission order.
485struct StagedFile {
486    body: String,
487    body_hash: String,
488    snippet: String,
489    name: String,
490    description: String,
491    embedding: Option<Vec<f32>>,
492    chunk_embeddings: Option<Vec<Vec<f32>>>,
493    chunks_info: Vec<crate::chunking::Chunk>,
494    entities: Vec<NewEntity>,
495    relationships: Vec<NewRelationship>,
496    entity_embeddings: Option<Vec<Vec<f32>>>,
497    urls: Vec<crate::extraction::ExtractedUrl>,
498    /// v1.0.84 (ADR-0042): discriminador do backend LLM que efetivamente
499    /// executou o embedding do body. `None` quando o batch paralelo
500    /// embed_passages_parallel_local fallback em backends diferentes
501    /// entre chunks (não há um único discriminador estável).
502    backend_invoked: Option<&'static str>,
503}
504
505/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
506/// Never touches the database — safe to run on any rayon thread.
507// G42/S3 added `llm_parallelism` as the 8th parameter; grouping the
508// stage knobs into a struct is a wider refactor than the surgical
509// scope of v1.0.79 allows.
510#[allow(clippy::too_many_arguments)]
511fn stage_file(
512    _idx: usize,
513    path: &Path,
514    name: &str,
515    paths: &AppPaths,
516    enable_ner: bool,
517    gliner_variant: crate::extraction::GlinerVariant,
518    max_rss_mb: u64,
519    llm_parallelism: usize,
520    llm_backend: crate::cli::LlmBackendChoice,
521    embedding_backend: crate::cli::EmbeddingBackendChoice,
522    auto_describe: bool,
523) -> Result<StagedFile, AppError> {
524    use crate::constants::*;
525
526    if name.len() > MAX_MEMORY_NAME_LEN {
527        return Err(AppError::LimitExceeded(
528            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
529        ));
530    }
531    if name.starts_with("__") {
532        return Err(AppError::Validation(
533            crate::i18n::validation::reserved_name(),
534        ));
535    }
536    {
537        let slug_re = crate::constants::name_slug_regex();
538        if !slug_re.is_match(name) {
539            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
540                name,
541            )));
542        }
543    }
544
545    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
546    if file_size > MAX_MEMORY_BODY_LEN as u64 {
547        return Err(AppError::LimitExceeded(
548            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
549        ));
550    }
551    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
552    if raw_body.len() > MAX_MEMORY_BODY_LEN {
553        return Err(AppError::LimitExceeded(
554            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
555        ));
556    }
557    if raw_body.trim().is_empty() {
558        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
559    }
560
561    let description = if auto_describe {
562        crate::commands::ingest_heuristics::extract_heuristic_description(
563            &raw_body,
564            Some(&path.display().to_string()),
565        )
566    } else {
567        format!("ingested from {}", path.display())
568    };
569    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
570        return Err(AppError::Validation(
571            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
572        ));
573    }
574
575    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
576    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
577    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
578    if enable_ner {
579        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
580            Ok(extracted) => {
581                extracted_urls = extracted.urls;
582                // v1.0.76: ExtractionResult.entities is now
583                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
584                // via name + type only; start/end offsets are not
585                // carried forward into the storage layer.
586                extracted_entities = extracted
587                    .entities
588                    .into_iter()
589                    .map(|e| NewEntity {
590                        name: e.name,
591                        entity_type: crate::entity_type::EntityType::Concept,
592                        description: None,
593                    })
594                    .collect();
595                // v1.0.76: relationships are no longer in the
596                // ExtractionResult struct; the LLM backend returns
597                // them in its own payload. The default build is
598                // URL-only extraction.
599                extracted_relationships.clear();
600
601                if extracted_entities.len() > max_entities_per_memory() {
602                    extracted_entities.truncate(max_entities_per_memory());
603                }
604                if extracted_relationships.len() > max_relationships_per_memory() {
605                    extracted_relationships.truncate(max_relationships_per_memory());
606                }
607            }
608            Err(e) => {
609                tracing::warn!(
610                    target: "ingest",
611                    file = %path.display(),
612                    "auto-extraction failed (graceful degradation): {e:#}"
613                );
614            }
615        }
616    }
617
618    for rel in &mut extracted_relationships {
619        rel.relation = crate::parsers::normalize_relation(&rel.relation);
620        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
621            return Err(AppError::Validation(format!(
622                "{e} for relationship '{}' -> '{}'",
623                rel.source, rel.target
624            )));
625        }
626        crate::parsers::warn_if_non_canonical(&rel.relation);
627        if !(0.0..=1.0).contains(&rel.strength) {
628            return Err(AppError::Validation(format!(
629                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
630                rel.strength, rel.source, rel.target
631            )));
632        }
633    }
634
635    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
636    let snippet: String = raw_body.chars().take(200).collect();
637
638    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
639    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
640        return Err(AppError::LimitExceeded(format!(
641            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
642            chunks_info.len(),
643            REMEMBER_MAX_SAFE_MULTI_CHUNKS
644        )));
645    }
646
647    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
648    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
649    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
650    // backend que efetivamente rodou para popular `backend_invoked` no
651    // envelope NDJSON por arquivo.
652    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
653        .len()
654        == 1
655    {
656        match crate::embedder::embed_passage_with_embedding_choice(
657            &paths.models,
658            &raw_body,
659            embedding_backend,
660            llm_backend,
661        ) {
662            Ok((v, k)) => (Some(v), Some(k.as_str())),
663            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
664            Err(e) if skip_embed => {
665                tracing::warn!(error = %e, file = %path.display(), "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
666                (None, None)
667            }
668            Err(e) => return Err(e),
669        }
670    } else {
671        // G42/S2+S3 (v1.0.79): batched bounded fan-out replaces the
672        // serial per-chunk subprocess loop.
673        let chunk_texts: Vec<String> = chunks_info
674            .iter()
675            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
676            .collect();
677        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
678            if rss > max_rss_mb {
679                tracing::error!(
680                    target: "ingest",
681                    rss_mb = rss,
682                    max_rss_mb = max_rss_mb,
683                    file = %path.display(),
684                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
685                );
686                return Err(AppError::LowMemory {
687                    available_mb: crate::memory_guard::available_memory_mb(),
688                    required_mb: max_rss_mb,
689                });
690            }
691        }
692        match crate::embedder::embed_passages_parallel_with_embedding_choice(
693            &paths.models,
694            &chunk_texts,
695            llm_parallelism,
696            crate::embedder::chunk_embed_batch_size(),
697            embedding_backend,
698            llm_backend,
699        ) {
700            Ok(chunk_embeddings) => {
701                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
702                chunk_embeddings_opt = Some(chunk_embeddings);
703                // v1.0.84 (ADR-0042): batch paralelo não retorna discriminador
704                // único por chamada. Conservadoramente, populamos None aqui.
705                (Some(aggregated), None)
706            }
707            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
708            Err(e) if skip_embed => {
709                tracing::warn!(error = %e, file = %path.display(), "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
710                (None, None)
711            }
712            Err(e) => return Err(e),
713        }
714    };
715
716    // G42/S2+A4 (v1.0.79): entity names use the short-text batch profile.
717    let entity_texts: Vec<String> = extracted_entities
718        .iter()
719        .map(|entity| match &entity.description {
720            Some(desc) => format!("{} {}", entity.name, desc),
721            None => entity.name.clone(),
722        })
723        .collect();
724    // G56 (v1.0.80): ingest reuses canonical entity names across many
725    // memories (e.g. `sqlite-graphrag`, `claude-code`); the in-process
726    // cache collapses the repeated LLM calls into one per unique text.
727    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
728        &paths.models,
729        &entity_texts,
730        llm_parallelism,
731        embedding_backend,
732        llm_backend,
733    ) {
734        Ok((entity_embeddings, embed_cache_stats)) => {
735            if embed_cache_stats.hits > 0 {
736                tracing::debug!(
737                    hits = embed_cache_stats.hits,
738                    misses = embed_cache_stats.misses,
739                    requested = embed_cache_stats.requested,
740                    "G56: entity embed cache hit (ingest)"
741                );
742            }
743            Some(entity_embeddings)
744        }
745        Err(e) if skip_embed => {
746            tracing::warn!(error = %e, file = %path.display(), "ingest: entity embedding failed; --skip-embedding-on-failure active");
747            None
748        }
749        Err(e) => return Err(e),
750    };
751
752    Ok(StagedFile {
753        body: raw_body,
754        body_hash,
755        snippet,
756        name: name.to_string(),
757        description,
758        embedding,
759        chunk_embeddings: chunk_embeddings_opt,
760        chunks_info,
761        entities: extracted_entities,
762        relationships: extracted_relationships,
763        entity_embeddings: entity_embeddings_opt,
764        urls: extracted_urls,
765        backend_invoked,
766    })
767}
768
769/// Phase B: persists one `StagedFile` to the database on the main thread.
770fn persist_staged(
771    conn: &mut Connection,
772    namespace: &str,
773    memory_type: &str,
774    staged: StagedFile,
775) -> Result<FileSuccess, AppError> {
776    {
777        let active_count: u32 = conn.query_row(
778            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
779            [],
780            |r| r.get::<_, i64>(0).map(|v| v as u32),
781        )?;
782        let ns_exists: bool = conn.query_row(
783            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
784            rusqlite::params![namespace],
785            |r| r.get::<_, i64>(0).map(|v| v > 0),
786        )?;
787        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
788            return Err(AppError::NamespaceError(format!(
789                "active namespace limit of {} exceeded while creating '{namespace}'",
790                crate::constants::MAX_NAMESPACES_ACTIVE
791            )));
792        }
793    }
794
795    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
796    if existing_memory.is_some() {
797        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
798            &staged.name,
799            namespace,
800        )));
801    }
802    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
803
804    let new_memory = NewMemory {
805        namespace: namespace.to_string(),
806        name: staged.name.clone(),
807        memory_type: memory_type.to_string(),
808        description: staged.description.clone(),
809        body: staged.body,
810        body_hash: staged.body_hash,
811        session_id: None,
812        source: "agent".to_string(),
813        metadata: serde_json::json!({}),
814    };
815
816    if let Some(hash_id) = duplicate_hash_id {
817        tracing::debug!(
818            target: "ingest",
819            duplicate_memory_id = hash_id,
820            "identical body already exists; persisting a new memory anyway"
821        );
822    }
823
824    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
825
826    let memory_id = memories::insert(&tx, &new_memory)?;
827    versions::insert_version(
828        &tx,
829        memory_id,
830        1,
831        &staged.name,
832        memory_type,
833        &staged.description,
834        &new_memory.body,
835        &serde_json::to_string(&new_memory.metadata)?,
836        None,
837        "create",
838    )?;
839    if let Some(ref emb) = staged.embedding {
840        memories::upsert_vec(
841            &tx,
842            memory_id,
843            namespace,
844            memory_type,
845            emb,
846            &staged.name,
847            &staged.snippet,
848        )?;
849    }
850
851    if staged.chunks_info.len() > 1 {
852        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
853        if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
854            for (i, emb) in chunk_embeddings.iter().enumerate() {
855                storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
856            }
857        }
858    }
859
860    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
861        for (idx, entity) in staged.entities.iter().enumerate() {
862            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
863            if let Some(ref entity_embeddings) = staged.entity_embeddings {
864                if let Some(entity_embedding) = entity_embeddings.get(idx) {
865                    entities::upsert_entity_vec(
866                        &tx,
867                        entity_id,
868                        namespace,
869                        entity.entity_type,
870                        entity_embedding,
871                        &entity.name,
872                    )?;
873                }
874            }
875            entities::link_memory_entity(&tx, memory_id, entity_id)?;
876        }
877        let entity_types: std::collections::HashMap<&str, EntityType> = staged
878            .entities
879            .iter()
880            .map(|entity| (entity.name.as_str(), entity.entity_type))
881            .collect();
882
883        let mut affected_entity_ids: std::collections::HashSet<i64> =
884            std::collections::HashSet::new();
885        for entity in &staged.entities {
886            if let Some(eid) = entities::find_entity_id(&tx, namespace, &entity.name)? {
887                affected_entity_ids.insert(eid);
888            }
889        }
890
891        for rel in &staged.relationships {
892            let source_entity = NewEntity {
893                name: rel.source.clone(),
894                entity_type: entity_types
895                    .get(rel.source.as_str())
896                    .copied()
897                    .unwrap_or(EntityType::Concept),
898                description: None,
899            };
900            let target_entity = NewEntity {
901                name: rel.target.clone(),
902                entity_type: entity_types
903                    .get(rel.target.as_str())
904                    .copied()
905                    .unwrap_or(EntityType::Concept),
906                description: None,
907            };
908            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
909            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
910            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
911            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
912            affected_entity_ids.insert(source_id);
913            affected_entity_ids.insert(target_id);
914        }
915
916        for &eid in &affected_entity_ids {
917            entities::recalculate_degree(&tx, eid)?;
918        }
919    }
920
921    tx.commit()?;
922
923    if !staged.urls.is_empty() {
924        let url_entries: Vec<storage_urls::MemoryUrl> = staged
925            .urls
926            .into_iter()
927            .map(|u| storage_urls::MemoryUrl {
928                url: u.url,
929                offset: Some(u.start as i64),
930            })
931            .collect();
932        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
933    }
934
935    Ok(FileSuccess {
936        memory_id,
937        action: "created".to_string(),
938        body_length: new_memory.body.len(),
939        backend_invoked: staged.backend_invoked,
940    })
941}
942
943// ---------------------------------------------------------------------------
944// G20: mode-conditional flag validation
945// ---------------------------------------------------------------------------
946
947/// True when a scalar value matches its declared default. Local
948/// re-declaration (also defined in ) to keep this module
949/// self-contained for the G20 fix.
950fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
951    value == default
952}
953
954/// G20: validate that flags for one LLM provider were not passed when
955/// the operator selected a different provider (or no provider). Flags
956/// silently discarded by the wrong mode are surfaced as
957///  BEFORE any DB work, so the operator gets
958/// an actionable error instead of a surprise at runtime.
959///
960/// Mode-specific matrices:
961/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
962///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
963///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
964///   --enable-ner is false)
965/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
966/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
967///   max_cost_usd, resume, retry_failed, keep_queue
968fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
969    const DEFAULT_TIMEOUT: u64 = 300;
970    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
971
972    let mut conflicts: Vec<String> = Vec::new();
973
974    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
975
976    if is_local_mode {
977        if args.claude_binary.is_some() {
978            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
979        }
980        if args.claude_model.is_some() {
981            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
982        }
983        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
984            conflicts.push(format!(
985                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
986                args.claude_timeout
987            ));
988        }
989        if args.codex_binary.is_some() {
990            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
991        }
992        if args.codex_model.is_some() {
993            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
994        }
995        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
996            conflicts.push(format!(
997                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
998                args.codex_timeout
999            ));
1000        }
1001        if args.opencode_binary.is_some() {
1002            conflicts
1003                .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
1004        }
1005        if args.opencode_model.is_some() {
1006            conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
1007        }
1008        if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1009            conflicts.push(format!(
1010                "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1011                args.opencode_timeout
1012            ));
1013        }
1014        if args.max_cost_usd.is_some() {
1015            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
1016        }
1017        if args.resume {
1018            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
1019        }
1020        if args.retry_failed {
1021            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
1022        }
1023        if args.keep_queue {
1024            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
1025        }
1026        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
1027            conflicts.push(format!(
1028                "--rate-limit-wait={} is ignored when --mode is none or gliner",
1029                args.rate_limit_wait
1030            ));
1031        }
1032    }
1033
1034    match args.mode {
1035        IngestMode::ClaudeCode => {
1036            if args.codex_binary.is_some() {
1037                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1038            }
1039            if args.codex_model.is_some() {
1040                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1041            }
1042            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1043                conflicts.push(format!(
1044                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1045                    args.codex_timeout
1046                ));
1047            }
1048            if args.opencode_binary.is_some() {
1049                conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1050            }
1051            if args.opencode_model.is_some() {
1052                conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1053            }
1054            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1055                conflicts.push(format!(
1056                    "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1057                    args.opencode_timeout
1058                ));
1059            }
1060        }
1061        IngestMode::Codex => {
1062            if args.claude_binary.is_some() {
1063                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1064            }
1065            if args.claude_model.is_some() {
1066                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1067            }
1068            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1069                conflicts.push(format!(
1070                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1071                    args.claude_timeout
1072                ));
1073            }
1074            if args.max_cost_usd.is_some() {
1075                conflicts.push(
1076                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1077                        .to_string(),
1078                );
1079            }
1080            if args.resume {
1081                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1082            }
1083            if args.retry_failed {
1084                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1085            }
1086            if args.keep_queue {
1087                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1088            }
1089            if args.opencode_binary.is_some() {
1090                conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1091            }
1092            if args.opencode_model.is_some() {
1093                conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1094            }
1095            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1096                conflicts.push(format!(
1097                    "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1098                    args.opencode_timeout
1099                ));
1100            }
1101        }
1102        IngestMode::Opencode => {
1103            if args.claude_binary.is_some() {
1104                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1105            }
1106            if args.claude_model.is_some() {
1107                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1108            }
1109            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1110                conflicts.push(format!(
1111                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1112                    args.claude_timeout
1113                ));
1114            }
1115            if args.codex_binary.is_some() {
1116                conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1117            }
1118            if args.codex_model.is_some() {
1119                conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1120            }
1121            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1122                conflicts.push(format!(
1123                    "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1124                    args.codex_timeout
1125                ));
1126            }
1127            if args.max_cost_usd.is_some() {
1128                conflicts.push(
1129                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1130                        .to_string(),
1131                );
1132            }
1133            if args.resume {
1134                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1135            }
1136            if args.retry_failed {
1137                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1138            }
1139            if args.keep_queue {
1140                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1141            }
1142        }
1143        IngestMode::None | IngestMode::Gliner => {}
1144    }
1145
1146    if !conflicts.is_empty() {
1147        return Err(AppError::Validation(format!(
1148            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
1149            args.mode,
1150            conflicts.join("\n  - ")
1151        )));
1152    }
1153
1154    Ok(())
1155}
1156
1157// ---------------------------------------------------------------------------
1158
1159#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
1160pub fn run(
1161    args: IngestArgs,
1162    llm_backend: crate::cli::LlmBackendChoice,
1163    embedding_backend: crate::cli::EmbeddingBackendChoice,
1164) -> Result<(), AppError> {
1165    // G20: mode-conditional flag validation BEFORE any DB access.
1166    // Surfaces flags that the wrong mode would silently discard.
1167    validate_mode_conditional_flags_ingest(&args)?;
1168    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
1169    if args.mode == IngestMode::ClaudeCode {
1170        return super::ingest_claude::run_claude_ingest(&args, embedding_backend, llm_backend);
1171    }
1172    if args.mode == IngestMode::Codex {
1173        return super::ingest_codex::run_codex_ingest(&args);
1174    }
1175    if args.mode == IngestMode::Opencode {
1176        return super::ingest_opencode::run_opencode_ingest(&args);
1177    }
1178
1179    let started = std::time::Instant::now();
1180
1181    if !args.dir.exists() {
1182        return Err(AppError::Validation(format!(
1183            "directory not found: {}",
1184            args.dir.display()
1185        )));
1186    }
1187    if !args.dir.is_dir() {
1188        return Err(AppError::Validation(format!(
1189            "path is not a directory: {}",
1190            args.dir.display()
1191        )));
1192    }
1193
1194    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1195    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1196    files.sort_unstable();
1197
1198    if files.len() > args.max_files {
1199        return Err(AppError::Validation(format!(
1200            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1201            files.len(),
1202            args.max_files
1203        )));
1204    }
1205
1206    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1207    let memory_type_str = args.r#type.as_str().to_string();
1208
1209    let paths = AppPaths::resolve(args.db.as_deref())?;
1210    let mut conn_or_err = match init_storage(&paths) {
1211        Ok(c) => Ok(c),
1212        Err(e) => Err(format!("{e}")),
1213    };
1214
1215    let mut succeeded: usize = 0;
1216    let mut failed: usize = 0;
1217    let mut skipped: usize = 0;
1218    let total = files.len();
1219
1220    // Pre-resolve all names before parallelisation so Phase A workers see a
1221    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
1222    let mut taken_names: BTreeSet<String> = BTreeSet::new();
1223
1224    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
1225    // ProcessItem: the data moved into the producer thread for Phase A computation.
1226    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
1227    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
1228    // boundary into the rayon producer.
1229    enum SlotMeta {
1230        Skip {
1231            file_str: String,
1232            derived_base: String,
1233            name_truncated: bool,
1234            original_name: Option<String>,
1235            original_filename: Option<String>,
1236            reason: String,
1237        },
1238        Process {
1239            file_str: String,
1240            derived_name: String,
1241            name_truncated: bool,
1242            original_name: Option<String>,
1243            original_filename: Option<String>,
1244        },
1245    }
1246
1247    struct ProcessItem {
1248        idx: usize,
1249        path: PathBuf,
1250        file_str: String,
1251        derived_name: String,
1252    }
1253
1254    let files_cap = files.len();
1255    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1256    slots_meta.try_reserve(files_cap).map_err(|_| {
1257        AppError::LimitExceeded(format!(
1258            "allocation of {files_cap} slot metadata entries would exceed available memory"
1259        ))
1260    })?;
1261    let mut process_items: Vec<ProcessItem> = Vec::new();
1262    process_items.try_reserve(files_cap).map_err(|_| {
1263        AppError::LimitExceeded(format!(
1264            "allocation of {files_cap} process items would exceed available memory"
1265        ))
1266    })?;
1267    let mut truncations: Vec<(String, String)> = Vec::new();
1268    truncations.try_reserve(files_cap).map_err(|_| {
1269        AppError::LimitExceeded(format!(
1270            "allocation of {files_cap} truncation entries would exceed available memory"
1271        ))
1272    })?;
1273
1274    let max_name_length = args.max_name_length;
1275    for path in &files {
1276        let file_str = path.to_string_lossy().into_owned();
1277        let (derived_base, name_truncated, original_name) =
1278            derive_kebab_name(path, max_name_length);
1279        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1280
1281        if name_truncated {
1282            if let Some(ref orig) = original_name {
1283                truncations.push((orig.clone(), derived_base.clone()));
1284            }
1285        }
1286
1287        if derived_base.is_empty() {
1288            // original_filename: always include when it differs from the empty derived name
1289            let orig_filename = if !original_basename.is_empty() {
1290                Some(original_basename.to_string())
1291            } else {
1292                None
1293            };
1294            slots_meta.push(SlotMeta::Skip {
1295                file_str,
1296                derived_base: String::new(),
1297                name_truncated: false,
1298                original_name: None,
1299                original_filename: orig_filename,
1300                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1301            });
1302            continue;
1303        }
1304
1305        match unique_name(&derived_base, &taken_names) {
1306            Ok(derived_name) => {
1307                taken_names.insert(derived_name.clone());
1308                let idx = slots_meta.len();
1309                // original_filename: present only when the raw basename differs from the derived name
1310                let orig_filename = if original_basename != derived_name {
1311                    Some(original_basename.to_string())
1312                } else {
1313                    None
1314                };
1315                process_items.push(ProcessItem {
1316                    idx,
1317                    path: path.clone(),
1318                    file_str: file_str.clone(),
1319                    derived_name: derived_name.clone(),
1320                });
1321                slots_meta.push(SlotMeta::Process {
1322                    file_str,
1323                    derived_name,
1324                    name_truncated,
1325                    original_name,
1326                    original_filename: orig_filename,
1327                });
1328            }
1329            Err(e) => {
1330                let orig_filename = if original_basename != derived_base {
1331                    Some(original_basename.to_string())
1332                } else {
1333                    None
1334                };
1335                slots_meta.push(SlotMeta::Skip {
1336                    file_str,
1337                    derived_base,
1338                    name_truncated,
1339                    original_name,
1340                    original_filename: orig_filename,
1341                    reason: e.to_string(),
1342                });
1343            }
1344        }
1345    }
1346
1347    if !truncations.is_empty() {
1348        tracing::info!(
1349            target: "ingest",
1350            count = truncations.len(),
1351            max_name_length = max_name_length,
1352            max_len = DERIVED_NAME_MAX_LEN,
1353            "derived names truncated; pass -vv (debug) for per-file detail"
1354        );
1355    }
1356
1357    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1358    if args.dry_run {
1359        for meta in &slots_meta {
1360            match meta {
1361                SlotMeta::Skip {
1362                    file_str,
1363                    derived_base,
1364                    name_truncated,
1365                    original_name,
1366                    original_filename,
1367                    reason,
1368                } => {
1369                    output::emit_json_compact(&IngestFileEvent {
1370                        file: file_str,
1371                        name: derived_base,
1372                        status: "skip",
1373                        truncated: *name_truncated,
1374                        original_name: original_name.clone(),
1375                        original_filename: original_filename.as_deref(),
1376                        error: Some(reason.clone()),
1377                        memory_id: None,
1378                        action: None,
1379                        body_length: 0,
1380                        backend_invoked: None,
1381                    })?;
1382                }
1383                SlotMeta::Process {
1384                    file_str,
1385                    derived_name,
1386                    name_truncated,
1387                    original_name,
1388                    original_filename,
1389                } => {
1390                    output::emit_json_compact(&IngestFileEvent {
1391                        file: file_str,
1392                        name: derived_name,
1393                        status: "preview",
1394                        truncated: *name_truncated,
1395                        original_name: original_name.clone(),
1396                        original_filename: original_filename.as_deref(),
1397                        error: None,
1398                        memory_id: None,
1399                        action: None,
1400                        body_length: 0,
1401                        backend_invoked: None,
1402                    })?;
1403                }
1404            }
1405        }
1406        output::emit_json_compact(&IngestSummary {
1407            summary: true,
1408            dir: args.dir.to_string_lossy().into_owned(),
1409            pattern: args.pattern.clone(),
1410            recursive: args.recursive,
1411            files_total: total,
1412            files_succeeded: 0,
1413            files_failed: 0,
1414            files_skipped: 0,
1415            elapsed_ms: started.elapsed().as_millis() as u64,
1416        })?;
1417        return Ok(());
1418    }
1419
1420    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1421    if args.low_memory {
1422        if let Some(n) = args.ingest_parallelism {
1423            if n > 1 {
1424                return Err(AppError::Validation(
1425                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1426                        .to_string(),
1427                ));
1428            }
1429        }
1430    }
1431
1432    // Determine rayon thread pool size, honoring --low-memory and the
1433    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1434    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1435
1436    let pool = rayon::ThreadPoolBuilder::new()
1437        .num_threads(parallelism)
1438        .build()
1439        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1440
1441    if args.enable_ner && args.skip_extraction {
1442        return Err(AppError::Validation(
1443            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1444        ));
1445    }
1446    if args.skip_extraction && !args.enable_ner {
1447        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1448        // commit (9ddb17b) promoted this to a hard validation error, which
1449        // broke the "kept as a hidden no-op for backwards compatibility"
1450        // promise documented in CHANGELOG v1.0.45 and started failing
1451        // 5+ CI jobs whose E2E tests use this flag to skip the
1452        // GLiNER-ONNX model download in CI environments.
1453        tracing::warn!(
1454            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1455        );
1456    }
1457    let enable_ner = args.enable_ner;
1458    let auto_describe = args.auto_describe && !args.no_auto_describe;
1459    let max_rss_mb = args.max_rss_mb;
1460    let llm_parallelism = args.llm_parallelism as usize;
1461    // v1.0.79: `--mode gliner` and `--gliner-variant` are no-ops kept for
1462    // compatibility (the GLiNER pipeline was removed); warn explicitly so
1463    // callers do not silently expect NER-quality extraction.
1464    if args.mode == IngestMode::Gliner {
1465        tracing::warn!(
1466            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1467        );
1468    }
1469    if args.gliner_variant != "fp32" {
1470        tracing::warn!(
1471            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1472        );
1473    }
1474    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1475        "int8" => crate::extraction::GlinerVariant::Int8,
1476        _ => crate::extraction::GlinerVariant::Fp32,
1477    };
1478
1479    let total_to_process = process_items.len();
1480    tracing::info!(
1481        target: "ingest",
1482        phase = "pipeline_start",
1483        files = total_to_process,
1484        ingest_parallelism = parallelism,
1485        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1486    );
1487
1488    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1489    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1490    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1491    let channel_bound = (parallelism * 2).max(1);
1492    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1493
1494    // Phase A: launched in a dedicated OS thread so the main thread can consume
1495    // the channel concurrently. pool.install() blocks the calling thread until
1496    // all rayon workers finish — if called on the main thread it would
1497    // reintroduce the 2-phase blocking behaviour we are eliminating.
1498    let paths_owned = paths.clone();
1499    let llm_backend_owned = llm_backend;
1500    let embedding_backend_owned = embedding_backend;
1501    let producer_handle = std::thread::spawn(move || {
1502        pool.install(|| {
1503            process_items.into_par_iter().for_each(|item| {
1504                if crate::shutdown_requested() {
1505                    return;
1506                }
1507                let t0 = std::time::Instant::now();
1508                let result = stage_file(
1509                    item.idx,
1510                    &item.path,
1511                    &item.derived_name,
1512                    &paths_owned,
1513                    enable_ner,
1514                    gliner_variant,
1515                    max_rss_mb,
1516                    llm_parallelism,
1517                    llm_backend_owned,
1518                    embedding_backend_owned,
1519                    auto_describe,
1520                );
1521                let elapsed_ms = t0.elapsed().as_millis() as u64;
1522
1523                // Emit NDJSON progress event to stderr so the user sees work
1524                // happening during long NER runs (e.g. 50 files × 27s each).
1525                let (n_entities, n_relationships) = match &result {
1526                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1527                    Err(_) => (0, 0),
1528                };
1529                let progress = StageProgressEvent {
1530                    schema_version: 1,
1531                    event: "file_extracted",
1532                    path: &item.file_str,
1533                    ms: elapsed_ms,
1534                    entities: n_entities,
1535                    relationships: n_relationships,
1536                };
1537                if let Ok(line) = serde_json::to_string(&progress) {
1538                    tracing::info!(target: "ingest_progress", "{}", line);
1539                }
1540
1541                // Blocking send applies backpressure: if Phase B is slower,
1542                // Phase A workers wait here instead of accumulating staged files
1543                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1544                let _ = tx.send((item.idx, result));
1545            });
1546            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1547            drop(tx);
1548        });
1549    });
1550
1551    // Phase B: main thread persists files as results arrive from the channel.
1552    // Results arrive in completion order (par_iter is unordered). We persist
1553    // each file immediately on arrival — this is the key fix for B1: with the
1554    // old 2-phase design the first DB write happened only after ALL files had
1555    // finished Phase A. Now the first commit happens as soon as the first file
1556    // completes Phase A, regardless of how many files remain.
1557    //
1558    // NDJSON output order follows completion order (not file-system sort order).
1559    // Skip slots are emitted at the end, after all Process results are consumed.
1560    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1561    // requirement than ensuring data is persisted before the user's timeout fires.
1562    let fail_fast = args.fail_fast;
1563
1564    // Emit pending Skip events first so agents see them early.
1565    for meta in &slots_meta {
1566        if let SlotMeta::Skip {
1567            file_str,
1568            derived_base,
1569            name_truncated,
1570            original_name,
1571            original_filename,
1572            reason,
1573        } = meta
1574        {
1575            output::emit_json_compact(&IngestFileEvent {
1576                file: file_str,
1577                name: derived_base,
1578                status: "skipped",
1579                truncated: *name_truncated,
1580                original_name: original_name.clone(),
1581                original_filename: original_filename.as_deref(),
1582                error: Some(reason.clone()),
1583                memory_id: None,
1584                action: None,
1585                body_length: 0,
1586                backend_invoked: None,
1587            })?;
1588            skipped += 1;
1589        }
1590    }
1591
1592    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1593    // as channel messages arrive in completion order.
1594    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1595        .iter()
1596        .enumerate()
1597        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1598        .collect();
1599
1600    tracing::info!(
1601        target: "ingest",
1602        phase = "persist_start",
1603        files = total_to_process,
1604        "phase B starting: persisting files incrementally as Phase A completes each one",
1605    );
1606
1607    // Drain channel and persist each file immediately — no accumulation into a
1608    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1609    // Phase B without applying backpressure.
1610    for (idx, stage_result) in rx {
1611        if crate::shutdown_requested() {
1612            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1613            break;
1614        }
1615        let meta = meta_index.get(&idx).ok_or_else(|| {
1616            AppError::Internal(anyhow::anyhow!(
1617                "channel idx {idx} has no corresponding Process slot"
1618            ))
1619        })?;
1620        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1621        {
1622            SlotMeta::Process {
1623                file_str,
1624                derived_name,
1625                name_truncated,
1626                original_name,
1627                original_filename,
1628            } => (
1629                file_str,
1630                derived_name,
1631                name_truncated,
1632                original_name,
1633                original_filename,
1634            ),
1635            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1636        };
1637
1638        // If storage init failed, every file fails with the same error.
1639        let conn = match conn_or_err.as_mut() {
1640            Ok(c) => c,
1641            Err(err_msg) => {
1642                let err_clone = err_msg.clone();
1643                output::emit_json_compact(&IngestFileEvent {
1644                    file: file_str,
1645                    name: derived_name,
1646                    status: "failed",
1647                    truncated: *name_truncated,
1648                    original_name: original_name.clone(),
1649                    original_filename: original_filename.as_deref(),
1650                    error: Some(err_clone.clone()),
1651                    memory_id: None,
1652                    action: None,
1653                    body_length: 0,
1654                    backend_invoked: None,
1655                })?;
1656                failed += 1;
1657                if fail_fast {
1658                    output::emit_json_compact(&IngestSummary {
1659                        summary: true,
1660                        dir: args.dir.display().to_string(),
1661                        pattern: args.pattern.clone(),
1662                        recursive: args.recursive,
1663                        files_total: total,
1664                        files_succeeded: succeeded,
1665                        files_failed: failed,
1666                        files_skipped: skipped,
1667                        elapsed_ms: started.elapsed().as_millis() as u64,
1668                    })?;
1669                    return Err(AppError::Validation(format!(
1670                        "ingest aborted on first failure: {err_clone}"
1671                    )));
1672                }
1673                continue;
1674            }
1675        };
1676
1677        let outcome =
1678            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1679
1680        match outcome {
1681            Ok(FileSuccess {
1682                memory_id,
1683                action,
1684                body_length,
1685                backend_invoked: file_backend_invoked,
1686            }) => {
1687                output::emit_json_compact(&IngestFileEvent {
1688                    file: file_str,
1689                    name: derived_name,
1690                    status: "indexed",
1691                    truncated: *name_truncated,
1692                    original_name: original_name.clone(),
1693                    original_filename: original_filename.as_deref(),
1694                    error: None,
1695                    memory_id: Some(memory_id),
1696                    action: Some(action),
1697                    body_length,
1698                    backend_invoked: file_backend_invoked,
1699                })?;
1700                succeeded += 1;
1701            }
1702            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1703                output::emit_json_compact(&IngestFileEvent {
1704                    file: file_str,
1705                    name: derived_name,
1706                    status: "skipped",
1707                    truncated: *name_truncated,
1708                    original_name: original_name.clone(),
1709                    original_filename: original_filename.as_deref(),
1710                    error: Some(format!("{e}")),
1711                    memory_id: None,
1712                    action: Some("duplicate".to_string()),
1713                    body_length: 0,
1714                    backend_invoked: None,
1715                })?;
1716                skipped += 1;
1717            }
1718            Err(e) => {
1719                let err_msg = format!("{e}");
1720                output::emit_json_compact(&IngestFileEvent {
1721                    file: file_str,
1722                    name: derived_name,
1723                    status: "failed",
1724                    truncated: *name_truncated,
1725                    original_name: original_name.clone(),
1726                    original_filename: original_filename.as_deref(),
1727                    error: Some(err_msg.clone()),
1728                    memory_id: None,
1729                    action: None,
1730                    body_length: 0,
1731                    backend_invoked: None,
1732                })?;
1733                failed += 1;
1734                if fail_fast {
1735                    output::emit_json_compact(&IngestSummary {
1736                        summary: true,
1737                        dir: args.dir.display().to_string(),
1738                        pattern: args.pattern.clone(),
1739                        recursive: args.recursive,
1740                        files_total: total,
1741                        files_succeeded: succeeded,
1742                        files_failed: failed,
1743                        files_skipped: skipped,
1744                        elapsed_ms: started.elapsed().as_millis() as u64,
1745                    })?;
1746                    return Err(AppError::Validation(format!(
1747                        "ingest aborted on first failure: {err_msg}"
1748                    )));
1749                }
1750            }
1751        }
1752    }
1753
1754    // Wait for the producer thread to finish cleanly.
1755    producer_handle
1756        .join()
1757        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1758
1759    if let Ok(ref conn) = conn_or_err {
1760        if succeeded > 0 {
1761            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1762        }
1763    }
1764
1765    output::emit_json_compact(&IngestSummary {
1766        summary: true,
1767        dir: args.dir.display().to_string(),
1768        pattern: args.pattern.clone(),
1769        recursive: args.recursive,
1770        files_total: total,
1771        files_succeeded: succeeded,
1772        files_failed: failed,
1773        files_skipped: skipped,
1774        elapsed_ms: started.elapsed().as_millis() as u64,
1775    })?;
1776
1777    if args.enrich_after && succeeded > 0 {
1778        output::emit_json_compact(&serde_json::json!({
1779            "event": "enrich_phase_started",
1780            "operation": "memory-bindings"
1781        }))?;
1782        let enrich_args = super::enrich::EnrichArgs {
1783            operation: super::enrich::EnrichOperation::MemoryBindings,
1784            mode: super::enrich::EnrichMode::ClaudeCode,
1785            limit: None,
1786            dry_run: false,
1787            namespace: args.namespace.clone(),
1788            claude_binary: args.claude_binary.clone(),
1789            claude_model: args.claude_model.clone(),
1790            claude_timeout: args.claude_timeout,
1791            codex_binary: args.codex_binary.clone(),
1792            codex_model: args.codex_model.clone(),
1793            codex_timeout: args.codex_timeout,
1794            opencode_binary: args.opencode_binary.clone(),
1795            opencode_model: args.opencode_model.clone(),
1796            opencode_timeout: args.opencode_timeout,
1797            db: args.db.clone(),
1798            json: false,
1799            resume: false,
1800            retry_failed: false,
1801            max_cost_usd: args.max_cost_usd,
1802            llm_parallelism: args.llm_parallelism as u32,
1803            wait_job_singleton: args.wait_job_singleton,
1804            force_job_singleton: args.force_job_singleton,
1805            names: Vec::new(),
1806            names_file: None,
1807            preflight_check: false,
1808            fallback_mode: None,
1809            rate_limit_buffer: 300,
1810            max_load_check: true,
1811            circuit_breaker_threshold: 5,
1812            preserve_threshold: 0.7,
1813            codex_model_validate: true,
1814            codex_model_fallback: None,
1815            min_output_chars: 500,
1816            max_output_chars: 2000,
1817            preserve_check: true,
1818            prompt_template: None,
1819        };
1820        match super::enrich::run(&enrich_args, llm_backend, embedding_backend) {
1821            Ok(()) => {
1822                output::emit_json_compact(&serde_json::json!({
1823                    "event": "enrich_phase_completed"
1824                }))?;
1825            }
1826            Err(e) => {
1827                tracing::warn!(error = %e, "enrich --operation memory-bindings failed after ingest");
1828                output::emit_json_compact(&serde_json::json!({
1829                    "event": "enrich_phase_failed",
1830                    "error": e.to_string()
1831                }))?;
1832            }
1833        }
1834    }
1835
1836    Ok(())
1837}
1838
1839/// Auto-initialises the database (matches the contract of every other CRUD
1840/// handler) and returns a fresh read/write connection ready for the ingest
1841/// loop. Errors here are recoverable per-file: the caller surfaces them as
1842/// failure events so `--fail-fast` and the continue-on-error path keep
1843/// working when, for example, the user points `--db` at an unwritable path.
1844fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1845    ensure_db_ready(paths)?;
1846    let conn = open_rw(&paths.db)?;
1847    Ok(conn)
1848}
1849
1850pub(crate) fn collect_files(
1851    dir: &Path,
1852    pattern: &str,
1853    recursive: bool,
1854    out: &mut Vec<PathBuf>,
1855) -> Result<(), AppError> {
1856    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1857    for entry in entries {
1858        let entry = entry.map_err(AppError::Io)?;
1859        let path = entry.path();
1860        let file_type = entry.file_type().map_err(AppError::Io)?;
1861        if file_type.is_file() {
1862            let name = entry.file_name();
1863            let name_str = name.to_string_lossy();
1864            if matches_pattern(&name_str, pattern) {
1865                out.push(path);
1866            }
1867        } else if file_type.is_dir() && recursive {
1868            collect_files(&path, pattern, recursive, out)?;
1869        }
1870    }
1871    Ok(())
1872}
1873
1874fn matches_pattern(name: &str, pattern: &str) -> bool {
1875    if let Some(suffix) = pattern.strip_prefix('*') {
1876        name.ends_with(suffix)
1877    } else if let Some(prefix) = pattern.strip_suffix('*') {
1878        name.starts_with(prefix)
1879    } else {
1880        name == pattern
1881    }
1882}
1883
1884/// Returns `(final_name, truncated, original_name)`.
1885/// `truncated` is true when the derived name exceeded `max_len`.
1886/// `original_name` holds the pre-truncation name only when `truncated=true`.
1887///
1888/// Non-ASCII characters are first decomposed via NFD and then stripped of
1889/// combining marks so accented letters fold to their base ASCII letter
1890/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1891/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1892/// preserves meaningful word content rather than collapsing the basename
1893/// to a few stray ASCII letters as the previous filter did.
1894pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1895    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1896    let lowered: String = stem
1897        .nfd()
1898        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1899        .map(|c| {
1900            if c == '_' || c.is_whitespace() {
1901                '-'
1902            } else {
1903                c
1904            }
1905        })
1906        .map(|c| c.to_ascii_lowercase())
1907        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1908        .collect();
1909    let collapsed = collapse_dashes(&lowered);
1910    let trimmed_raw = collapsed.trim_matches('-').to_string();
1911    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1912    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1913        format!("doc-{trimmed_raw}")
1914    } else {
1915        trimmed_raw
1916    };
1917    if trimmed.len() > max_len {
1918        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1919        tracing::debug!(
1920            target: "ingest",
1921            original = %trimmed,
1922            truncated_to = %truncated,
1923            max_len = max_len,
1924            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1925        );
1926        (truncated, true, Some(trimmed))
1927    } else {
1928        (trimmed, false, None)
1929    }
1930}
1931
1932/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1933/// numeric suffix (`-1`, `-2`, …) when needed.
1934///
1935/// `taken` is the set of names already consumed in the current ingest run.
1936/// The caller is expected to insert the returned name into `taken` so the
1937/// next call observes the consumption. Cross-run collisions are intentionally
1938/// surfaced by the per-file persistence path as duplicates so re-ingestion
1939/// of identical corpora stays idempotent.
1940///
1941/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1942/// candidates collide, signalling a pathological corpus that should be
1943/// renamed manually.
1944fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1945    if !taken.contains(base) {
1946        return Ok(base.to_string());
1947    }
1948    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1949        let candidate = format!("{base}-{suffix}");
1950        if !taken.contains(&candidate) {
1951            tracing::warn!(
1952                target: "ingest",
1953                base = %base,
1954                resolved = %candidate,
1955                suffix,
1956                "memory name collision resolved with numeric suffix"
1957            );
1958            return Ok(candidate);
1959        }
1960    }
1961    Err(AppError::Validation(format!(
1962        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1963    )))
1964}
1965
1966fn collapse_dashes(s: &str) -> String {
1967    let mut out = String::with_capacity(s.len());
1968    let mut prev_dash = false;
1969    for c in s.chars() {
1970        if c == '-' {
1971            if !prev_dash {
1972                out.push('-');
1973            }
1974            prev_dash = true;
1975        } else {
1976            out.push(c);
1977            prev_dash = false;
1978        }
1979    }
1980    out
1981}
1982
1983#[cfg(test)]
1984mod tests {
1985    use super::*;
1986    use std::path::PathBuf;
1987
1988    #[test]
1989    fn matches_pattern_suffix() {
1990        assert!(matches_pattern("foo.md", "*.md"));
1991        assert!(!matches_pattern("foo.txt", "*.md"));
1992        assert!(matches_pattern("foo.md", "*"));
1993    }
1994
1995    #[test]
1996    fn matches_pattern_prefix() {
1997        assert!(matches_pattern("README.md", "README*"));
1998        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1999    }
2000
2001    #[test]
2002    fn matches_pattern_exact() {
2003        assert!(matches_pattern("README.md", "README.md"));
2004        assert!(!matches_pattern("readme.md", "README.md"));
2005    }
2006
2007    #[test]
2008    fn derive_kebab_underscore_to_dash() {
2009        let p = PathBuf::from("/tmp/claude_code_headless.md");
2010        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2011        assert_eq!(name, "claude-code-headless");
2012        assert!(!truncated);
2013        assert!(original.is_none());
2014    }
2015
2016    #[test]
2017    fn derive_kebab_uppercase_lowered() {
2018        let p = PathBuf::from("/tmp/README.md");
2019        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2020        assert_eq!(name, "readme");
2021        assert!(!truncated);
2022        assert!(original.is_none());
2023    }
2024
2025    #[test]
2026    fn derive_kebab_strips_non_kebab_chars() {
2027        let p = PathBuf::from("/tmp/some@weird#name!.md");
2028        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2029        assert_eq!(name, "someweirdname");
2030        assert!(!truncated);
2031        assert!(original.is_none());
2032    }
2033
2034    // Bug M-A3: NFD-based unicode normalization preserves base letters of
2035    // accented characters instead of dropping them entirely.
2036    #[test]
2037    fn derive_kebab_folds_accented_letters_to_ascii() {
2038        let p = PathBuf::from("/tmp/açaí.md");
2039        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2040        assert_eq!(name, "acai", "got '{name}'");
2041    }
2042
2043    #[test]
2044    fn derive_kebab_handles_naive_with_diaeresis() {
2045        let p = PathBuf::from("/tmp/naïve-test.md");
2046        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2047        assert_eq!(name, "naive-test", "got '{name}'");
2048    }
2049
2050    #[test]
2051    fn derive_kebab_drops_emoji_keeps_word() {
2052        let p = PathBuf::from("/tmp/🚀-rocket.md");
2053        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2054        assert_eq!(name, "rocket", "got '{name}'");
2055    }
2056
2057    #[test]
2058    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
2059        let p = PathBuf::from("/tmp/açaí🦜.md");
2060        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2061        assert_eq!(name, "acai", "got '{name}'");
2062    }
2063
2064    #[test]
2065    fn derive_kebab_pure_emoji_yields_empty() {
2066        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
2067        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2068        assert!(name.is_empty(), "got '{name}'");
2069    }
2070
2071    #[test]
2072    fn derive_kebab_collapses_consecutive_dashes() {
2073        let p = PathBuf::from("/tmp/a__b___c.md");
2074        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2075        assert_eq!(name, "a-b-c");
2076        assert!(!truncated);
2077        assert!(original.is_none());
2078    }
2079
2080    #[test]
2081    fn derive_kebab_truncates_to_60_chars() {
2082        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
2083        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2084        assert!(name.len() <= 60, "got len {}", name.len());
2085        assert!(truncated);
2086        assert!(original.is_some());
2087        assert!(original.unwrap().len() > 60);
2088    }
2089
2090    #[test]
2091    fn collect_files_finds_md_files() {
2092        let tmp = tempfile::tempdir().expect("tempdir");
2093        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2094        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
2095        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
2096        let mut out = Vec::new();
2097        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
2098        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
2099    }
2100
2101    #[test]
2102    fn collect_files_recursive_descends_subdirs() {
2103        let tmp = tempfile::tempdir().expect("tempdir");
2104        let sub = tmp.path().join("sub");
2105        std::fs::create_dir(&sub).unwrap();
2106        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2107        std::fs::write(sub.join("b.md"), "y").unwrap();
2108        let mut out = Vec::new();
2109        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
2110        assert_eq!(out.len(), 2);
2111    }
2112
2113    #[test]
2114    fn collect_files_non_recursive_skips_subdirs() {
2115        let tmp = tempfile::tempdir().expect("tempdir");
2116        let sub = tmp.path().join("sub");
2117        std::fs::create_dir(&sub).unwrap();
2118        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2119        std::fs::write(sub.join("b.md"), "y").unwrap();
2120        let mut out = Vec::new();
2121        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
2122        assert_eq!(out.len(), 1);
2123    }
2124
2125    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
2126
2127    #[test]
2128    fn derive_kebab_long_basename_truncated_within_cap() {
2129        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
2130        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2131        assert!(
2132            name.len() <= DERIVED_NAME_MAX_LEN,
2133            "truncated name must respect cap; got {} chars",
2134            name.len()
2135        );
2136        assert!(!name.is_empty());
2137        assert!(truncated);
2138        assert!(original.is_some());
2139    }
2140
2141    #[test]
2142    fn unique_name_returns_base_when_free() {
2143        let taken: BTreeSet<String> = BTreeSet::new();
2144        let resolved = unique_name("note", &taken).expect("must resolve");
2145        assert_eq!(resolved, "note");
2146    }
2147
2148    #[test]
2149    fn unique_name_appends_first_free_suffix_on_collision() {
2150        let mut taken: BTreeSet<String> = BTreeSet::new();
2151        taken.insert("note".to_string());
2152        taken.insert("note-1".to_string());
2153        let resolved = unique_name("note", &taken).expect("must resolve");
2154        assert_eq!(resolved, "note-2");
2155    }
2156
2157    #[test]
2158    fn unique_name_errors_after_collision_cap() {
2159        let mut taken: BTreeSet<String> = BTreeSet::new();
2160        taken.insert("note".to_string());
2161        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
2162            taken.insert(format!("note-{i}"));
2163        }
2164        let err = unique_name("note", &taken).expect_err("must surface error");
2165        assert!(matches!(err, AppError::Validation(_)));
2166    }
2167
2168    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
2169
2170    #[test]
2171    fn validate_relation_format_accepts_valid_relations() {
2172        use crate::parsers::{is_canonical_relation, validate_relation_format};
2173        assert!(validate_relation_format("applies_to").is_ok());
2174        assert!(validate_relation_format("depends_on").is_ok());
2175        assert!(validate_relation_format("implements").is_ok());
2176        assert!(validate_relation_format("").is_err());
2177        assert!(is_canonical_relation("applies_to"));
2178        assert!(!is_canonical_relation("implements"));
2179    }
2180
2181    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
2182
2183    use serial_test::serial;
2184
2185    /// Helper: scrubs the env var around a closure to keep tests deterministic.
2186    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
2187        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
2188        let prev = std::env::var(key).ok();
2189        match value {
2190            Some(v) => std::env::set_var(key, v),
2191            None => std::env::remove_var(key),
2192        }
2193        f();
2194        match prev {
2195            Some(p) => std::env::set_var(key, p),
2196            None => std::env::remove_var(key),
2197        }
2198    }
2199
2200    #[test]
2201    #[serial]
2202    fn env_low_memory_enabled_unset_returns_false() {
2203        with_env_var(None, || assert!(!env_low_memory_enabled()));
2204    }
2205
2206    #[test]
2207    #[serial]
2208    fn env_low_memory_enabled_empty_returns_false() {
2209        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
2210    }
2211
2212    #[test]
2213    #[serial]
2214    fn env_low_memory_enabled_truthy_values_return_true() {
2215        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
2216            with_env_var(Some(v), || {
2217                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
2218            });
2219        }
2220    }
2221
2222    #[test]
2223    #[serial]
2224    fn env_low_memory_enabled_falsy_values_return_false() {
2225        for v in ["0", "false", "FALSE", "no", "off"] {
2226            with_env_var(Some(v), || {
2227                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
2228            });
2229        }
2230    }
2231
2232    #[test]
2233    #[serial]
2234    fn env_low_memory_enabled_unrecognized_value_returns_false() {
2235        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
2236    }
2237
2238    #[test]
2239    #[serial]
2240    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
2241        with_env_var(None, || {
2242            assert_eq!(resolve_parallelism(true, Some(4)), 1);
2243            assert_eq!(resolve_parallelism(true, Some(8)), 1);
2244            assert_eq!(resolve_parallelism(true, None), 1);
2245        });
2246    }
2247
2248    #[test]
2249    #[serial]
2250    fn resolve_parallelism_env_forces_one_when_flag_off() {
2251        with_env_var(Some("1"), || {
2252            assert_eq!(resolve_parallelism(false, Some(4)), 1);
2253            assert_eq!(resolve_parallelism(false, None), 1);
2254        });
2255    }
2256
2257    #[test]
2258    #[serial]
2259    fn resolve_parallelism_falsy_env_does_not_override() {
2260        with_env_var(Some("0"), || {
2261            assert_eq!(resolve_parallelism(false, Some(4)), 4);
2262        });
2263    }
2264
2265    #[test]
2266    #[serial]
2267    fn resolve_parallelism_explicit_value_when_low_memory_off() {
2268        with_env_var(None, || {
2269            assert_eq!(resolve_parallelism(false, Some(3)), 3);
2270            assert_eq!(resolve_parallelism(false, Some(1)), 1);
2271        });
2272    }
2273
2274    #[test]
2275    #[serial]
2276    fn resolve_parallelism_default_when_unset() {
2277        with_env_var(None, || {
2278            let p = resolve_parallelism(false, None);
2279            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
2280        });
2281    }
2282
2283    #[test]
2284    fn ingest_args_parses_low_memory_flag_via_clap() {
2285        use clap::Parser;
2286        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
2287        // on the public `Cli` definition so the flag is wired end-to-end.
2288        let cli = crate::cli::Cli::try_parse_from([
2289            "sqlite-graphrag",
2290            "ingest",
2291            "/tmp/dummy",
2292            "--type",
2293            "document",
2294            "--low-memory",
2295        ])
2296        .expect("parse must succeed");
2297        match cli.command {
2298            Some(crate::cli::Commands::Ingest(args)) => {
2299                assert!(args.low_memory, "--low-memory must set field to true");
2300            }
2301            _ => panic!("expected Ingest subcommand"),
2302        }
2303    }
2304
2305    #[test]
2306    fn ingest_args_low_memory_defaults_false() {
2307        use clap::Parser;
2308        let cli = crate::cli::Cli::try_parse_from([
2309            "sqlite-graphrag",
2310            "ingest",
2311            "/tmp/dummy",
2312            "--type",
2313            "document",
2314        ])
2315        .expect("parse must succeed");
2316        match cli.command {
2317            Some(crate::cli::Commands::Ingest(args)) => {
2318                assert!(!args.low_memory, "default must be false");
2319            }
2320            _ => panic!("expected Ingest subcommand"),
2321        }
2322    }
2323}