Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. Once
/// this many candidates have collided we surface an error rather than loop
/// forever.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70    # LLM-curated extraction via Claude Code CLI\n  \
71    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
72    # Resume interrupted claude-code ingest\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
74    # Claude Code with budget cap and custom timeout\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
76AUTHENTICATION:\n  \
77    --mode claude-code: Uses existing Claude Code authentication.\n  \
78      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
79      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
80    --mode codex: Uses existing Codex CLI authentication.\n  \
81      Device auth: run `codex auth login` first\n  \
82      API key: set OPENAI_API_KEY (optional)\n\n  \
83NOTES:\n  \
84    Each file becomes a separate memory. Names derive from file basenames\n  \
85    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
86    followed by a final summary line with counts. Per-file errors are reported\n  \
87    inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89    /// Directory containing files to ingest.
90    #[arg(
91        value_name = "DIR",
92        help = "Directory to ingest recursively (each matching file becomes a memory)"
93    )]
94    pub dir: PathBuf,
95
96    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
97    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98    pub r#type: MemoryType,
99
100    /// Glob pattern matched against file basenames (default: `*.md`). Supports
101    /// `*.<ext>`, `<prefix>*`, and exact filename match.
102    #[arg(long, default_value = "*.md")]
103    pub pattern: String,
104
105    /// Recurse into subdirectories.
106    #[arg(long, default_value_t = false)]
107    pub recursive: bool,
108
109    #[arg(
110        long,
111        env = "SQLITE_GRAPHRAG_ENABLE_NER",
112        value_parser = crate::parsers::parse_bool_flexible,
113        action = clap::ArgAction::Set,
114        num_args = 0..=1,
115        default_missing_value = "true",
116        default_value = "false",
117        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118    )]
119    pub enable_ner: bool,
120
121    /// GAP-E2E-011: gera description heurística a partir da primeira linha
122    /// significativa do body, em vez de "ingested from <path>". Quando
123    /// `--no-auto-describe` é passado, mantém o comportamento legado.
124    #[arg(
125        long,
126        default_value_t = true,
127        overrides_with = "no_auto_describe",
128        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
129    )]
130    pub auto_describe: bool,
131    #[arg(
132        long = "no-auto-describe",
133        default_value_t = false,
134        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
135    )]
136    pub no_auto_describe: bool,
137    #[arg(
138        long,
139        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
140        default_value = "fp32",
141        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
142    )]
143    pub gliner_variant: String,
144
145    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
146    #[arg(long, default_value_t = false, hide = true)]
147    pub skip_extraction: bool,
148
149    /// Stop on first per-file error instead of continuing with the next file.
150    #[arg(long, default_value_t = false)]
151    pub fail_fast: bool,
152
153    /// Preview file-to-name mapping without loading model or persisting.
154    #[arg(long, default_value_t = false)]
155    pub dry_run: bool,
156
157    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
158    #[arg(long, default_value_t = 10_000)]
159    pub max_files: usize,
160
161    /// Namespace for the ingested memories.
162    #[arg(long)]
163    pub namespace: Option<String>,
164
165    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
166    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
167    pub db: Option<String>,
168
169    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
170    pub format: JsonOutputFormat,
171
172    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
173    pub json: bool,
174
175    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
176    #[arg(
177        long,
178        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
179    )]
180    pub ingest_parallelism: Option<usize>,
181
182    /// Force single-threaded ingest to reduce RSS pressure.
183    ///
184    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
185    /// explicit value. Recommended for environments with <4 GB available
186    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
187    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
188    /// (CLI flag has higher precedence than the env var).
189    #[arg(
190        long,
191        default_value_t = false,
192        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
193                Recommended for environments with <4 GB available RAM or container/cgroup \
194                constraints. Trade-off: 3-4x longer wall time. Also honored via \
195                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
196    )]
197    pub low_memory: bool,
198
199    /// Maximum process RSS in MiB; abort if exceeded during embedding.
200    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
201          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
202    pub max_rss_mb: u64,
203
204    /// G42/S3 (v1.0.79): maximum simultaneous LLM embedding subprocesses
205    /// PER FILE. Multiplies with --ingest-parallelism (files staged
206    /// concurrently), hence the conservative default of 2. The effective
207    /// value is further bounded by CPU count and available RAM.
208    #[arg(long, default_value_t = 2, value_name = "N",
209          value_parser = clap::value_parser!(u64).range(1..=32),
210          help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
211    pub llm_parallelism: u64,
212
213    /// Maximum character length for derived memory names from file basenames.
214    ///
215    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
216    /// Shorter values leave more headroom for collision suffix resolution.
217    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
218          help = "Maximum length for derived memory names (default: 60)")]
219    pub max_name_length: usize,
220
221    /// Extraction mode: `none` (body-only, default), `claude-code`/`codex` (LLM-curated), or `gliner` (DEPRECATED: URL-regex only since v1.0.79).
222    #[arg(long, value_enum, default_value_t = IngestMode::None)]
223    pub mode: IngestMode,
224
225    /// Explicit path to the Claude Code binary (only with --mode claude-code).
226    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
227    pub claude_binary: Option<std::path::PathBuf>,
228
229    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
230    #[arg(long)]
231    pub claude_model: Option<String>,
232
233    /// Resume a previously interrupted claude-code ingest from the queue DB.
234    #[arg(long, default_value_t = false)]
235    pub resume: bool,
236
237    /// Retry only failed files from a previous claude-code ingest.
238    #[arg(long, default_value_t = false)]
239    pub retry_failed: bool,
240
241    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
242    #[arg(long, default_value_t = false)]
243    pub keep_queue: bool,
244
245    /// Custom path for the claude-code ingest queue database.
246    #[arg(long, default_value = ".ingest-queue.sqlite")]
247    pub queue_db: String,
248
249    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
250    #[arg(long, default_value_t = 60)]
251    pub rate_limit_wait: u64,
252
253    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
254    #[arg(long)]
255    pub max_cost_usd: Option<f64>,
256
257    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
258    #[arg(
259        long,
260        default_value_t = 300,
261        help = "Timeout in seconds for each claude -p invocation (default: 300)"
262    )]
263    pub claude_timeout: u64,
264
265    /// Explicit path to the Codex CLI binary (only with --mode codex).
266    #[arg(
267        long,
268        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
269        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
270    )]
271    pub codex_binary: Option<PathBuf>,
272
273    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
274    #[arg(
275        long,
276        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
277    )]
278    pub codex_model: Option<String>,
279
280    /// Timeout in seconds for each codex exec invocation.
281    #[arg(
282        long,
283        default_value_t = 300,
284        help = "Timeout in seconds for each codex exec invocation (default: 300)"
285    )]
286    pub codex_timeout: u64,
287
288    /// G30: poll for the job singleton every second for up to N seconds
289    /// when another invocation holds the lock. Default: 0 (fail fast).
290    #[arg(long, value_name = "SECONDS")]
291    pub wait_job_singleton: Option<u64>,
292
293    /// G30: force acquisition of the singleton lock by removing a stale
294    /// lock file from a previously crashed invocation.
295    #[arg(long, default_value_t = false)]
296    pub force_job_singleton: bool,
297}
298
/// Extraction mode for the ingest pipeline.
///
/// Selected through `--mode` (see `IngestArgs::mode`, which defaults to
/// [`IngestMode::None`]).
// NOTE(review): with `clap::ValueEnum`, each variant's doc comment is
// presumably surfaced as that value's help text, so the variant comments
// below are user-facing strings — confirm against `--help` before rewording.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    /// Body-only ingestion without entity/relationship extraction (default).
    None,
    /// DEPRECATED: URL-regex extraction only since v1.0.79 (the GLiNER pipeline was removed; requires --enable-ner).
    Gliner,
    /// LLM-curated extraction via locally installed Claude Code CLI.
    ClaudeCode,
    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
    Codex,
}
311
312/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
313/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
314/// values evaluate to false. Unrecognized non-empty values emit a
315/// `tracing::warn!` and evaluate to false.
316fn env_low_memory_enabled() -> bool {
317    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
318        Ok(v) if v.is_empty() => false,
319        Ok(v) => match v.to_lowercase().as_str() {
320            "1" | "true" | "yes" | "on" => true,
321            "0" | "false" | "no" | "off" => false,
322            other => {
323                tracing::warn!(
324                    target: "ingest",
325                    value = %other,
326                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
327                );
328                false
329            }
330        },
331        Err(_) => false,
332    }
333}
334
335/// Resolves the effective ingest parallelism honoring `--low-memory` and the
336/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
337///
338/// Precedence:
339/// 1. `--low-memory` CLI flag forces parallelism = 1.
340/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
341/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
342/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
343///
344/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
345/// emits a `tracing::warn!` advertising the override.
346fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
347    let env_flag = env_low_memory_enabled();
348    let low_memory = low_memory_flag || env_flag;
349
350    if low_memory {
351        if let Some(n) = ingest_parallelism {
352            if n > 1 {
353                tracing::warn!(
354                    target: "ingest",
355                    requested = n,
356                    "--ingest-parallelism overridden by --low-memory; using 1"
357                );
358            }
359        }
360        if low_memory_flag {
361            tracing::info!(
362                target: "ingest",
363                source = "flag",
364                "low-memory mode enabled: forcing --ingest-parallelism 1"
365            );
366        } else {
367            tracing::info!(
368                target: "ingest",
369                source = "env",
370                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
371            );
372        }
373        return 1;
374    }
375
376    ingest_parallelism
377        .unwrap_or_else(|| {
378            std::thread::available_parallelism()
379                .map(|v| v.get() / 2)
380                .unwrap_or(1)
381                .clamp(1, 4)
382        })
383        .max(1)
384}
385
/// Serializable per-file event.
///
/// NOTE(review): presumably this is the "one object per processed file"
/// line of the NDJSON output described in the module docs — confirm at the
/// emit site. Fields tagged `skip_serializing_if = "Option::is_none"` are
/// left out of the JSON when `None`; every other field is always written.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file basename (without extension); only present when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
    body_length: usize,
    /// v1.0.84 (ADR-0042): discriminator for the LLM backend that actually
    /// performed the live embedding. `"claude" | "codex" | "none"`. Absent on
    /// the wire when `None` (kept for happy-path envelope cleanliness, or
    /// when the file never reached the embed phase because of a duplicate
    /// or an error).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
414
/// Serializable summary of one ingest run: the scan parameters (`dir`,
/// `pattern`, `recursive`), per-outcome file counters and the elapsed time
/// in milliseconds.
///
/// NOTE(review): presumably this is the "final summary object" that closes
/// the NDJSON stream described in the module docs, with `summary` acting as
/// the marker that tells it apart from per-file events — confirm at the
/// emit site.
#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}
427
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
///
/// Returned by `persist_staged`; the field names mirror the matching
/// `IngestFileEvent` fields (`memory_id`, `action`, `body_length`,
/// `backend_invoked`).
struct FileSuccess {
    memory_id: i64,
    action: String,
    body_length: usize,
    // `None` when no single backend could be attributed to the embedding.
    backend_invoked: Option<&'static str>,
}
435
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
///
/// All fields are always serialized (none is optional).
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    // NOTE(review): presumably the elapsed time for this file in
    // milliseconds — confirm at the construction site.
    ms: u64,
    entities: usize,
    relationships: usize,
}
447
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
///
/// Built by `stage_file` and consumed by value by `persist_staged`.
struct StagedFile {
    body: String,
    body_hash: String,
    snippet: String,
    name: String,
    description: String,
    embedding: Vec<f32>,
    // `Some` only on the multi-chunk path; `stage_file` leaves it `None`
    // when the body fits in a single chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    // Indexed in parallel with `entities` (`persist_staged` reads
    // `entity_embeddings[idx]` for `entities[idx]`).
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// v1.0.84 (ADR-0042): discriminator for the LLM backend that actually
    /// performed the body embedding. `None` when the parallel batch
    /// (`embed_passages_parallel_local`) may fall back to different backends
    /// across chunks, so there is no single stable discriminator.
    backend_invoked: Option<&'static str>,
}
469
470/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
471/// Never touches the database — safe to run on any rayon thread.
472// G42/S3 added `llm_parallelism` as the 8th parameter; grouping the
473// stage knobs into a struct is a wider refactor than the surgical
474// scope of v1.0.79 allows.
475#[allow(clippy::too_many_arguments)]
476fn stage_file(
477    _idx: usize,
478    path: &Path,
479    name: &str,
480    paths: &AppPaths,
481    enable_ner: bool,
482    gliner_variant: crate::extraction::GlinerVariant,
483    max_rss_mb: u64,
484    llm_parallelism: usize,
485    llm_backend: crate::cli::LlmBackendChoice,
486    auto_describe: bool,
487) -> Result<StagedFile, AppError> {
488    use crate::constants::*;
489
490    if name.len() > MAX_MEMORY_NAME_LEN {
491        return Err(AppError::LimitExceeded(
492            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
493        ));
494    }
495    if name.starts_with("__") {
496        return Err(AppError::Validation(
497            crate::i18n::validation::reserved_name(),
498        ));
499    }
500    {
501        let slug_re = crate::constants::name_slug_regex();
502        if !slug_re.is_match(name) {
503            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
504                name,
505            )));
506        }
507    }
508
509    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
510    if file_size > MAX_MEMORY_BODY_LEN as u64 {
511        return Err(AppError::LimitExceeded(
512            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
513        ));
514    }
515    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
516    if raw_body.len() > MAX_MEMORY_BODY_LEN {
517        return Err(AppError::LimitExceeded(
518            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
519        ));
520    }
521    if raw_body.trim().is_empty() {
522        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
523    }
524
525    let description = if auto_describe {
526        crate::commands::ingest_heuristics::extract_heuristic_description(
527            &raw_body,
528            Some(&path.display().to_string()),
529        )
530    } else {
531        format!("ingested from {}", path.display())
532    };
533    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
534        return Err(AppError::Validation(
535            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
536        ));
537    }
538
539    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
540    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
541    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
542    if enable_ner {
543        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
544            Ok(extracted) => {
545                extracted_urls = extracted.urls;
546                // v1.0.76: ExtractionResult.entities is now
547                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
548                // via name + type only; start/end offsets are not
549                // carried forward into the storage layer.
550                extracted_entities = extracted
551                    .entities
552                    .into_iter()
553                    .map(|e| NewEntity {
554                        name: e.name,
555                        entity_type: crate::entity_type::EntityType::Concept,
556                        description: None,
557                    })
558                    .collect();
559                // v1.0.76: relationships are no longer in the
560                // ExtractionResult struct; the LLM backend returns
561                // them in its own payload. The default build is
562                // URL-only extraction.
563                extracted_relationships.clear();
564
565                if extracted_entities.len() > max_entities_per_memory() {
566                    extracted_entities.truncate(max_entities_per_memory());
567                }
568                if extracted_relationships.len() > max_relationships_per_memory() {
569                    extracted_relationships.truncate(max_relationships_per_memory());
570                }
571            }
572            Err(e) => {
573                tracing::warn!(
574                    target: "ingest",
575                    file = %path.display(),
576                    "auto-extraction failed (graceful degradation): {e:#}"
577                );
578            }
579        }
580    }
581
582    for rel in &mut extracted_relationships {
583        rel.relation = crate::parsers::normalize_relation(&rel.relation);
584        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
585            return Err(AppError::Validation(format!(
586                "{e} for relationship '{}' -> '{}'",
587                rel.source, rel.target
588            )));
589        }
590        crate::parsers::warn_if_non_canonical(&rel.relation);
591        if !(0.0..=1.0).contains(&rel.strength) {
592            return Err(AppError::Validation(format!(
593                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
594                rel.strength, rel.source, rel.target
595            )));
596        }
597    }
598
599    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
600    let snippet: String = raw_body.chars().take(200).collect();
601
602    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
603    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
604        return Err(AppError::LimitExceeded(format!(
605            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
606            chunks_info.len(),
607            REMEMBER_MAX_SAFE_MULTI_CHUNKS
608        )));
609    }
610
611    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
612    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
613    // backend que efetivamente rodou para popular `backend_invoked` no
614    // envelope NDJSON por arquivo.
615    let (embedding, backend_invoked) = if chunks_info.len() == 1 {
616        // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
617        crate::embedder::embed_passage_with_choice(&paths.models, &raw_body, Some(llm_backend))
618            .map(|(v, k)| (v, Some(k.as_str())))?
619    } else {
620        // G42/S2+S3 (v1.0.79): batched bounded fan-out replaces the
621        // serial per-chunk subprocess loop.
622        let chunk_texts: Vec<String> = chunks_info
623            .iter()
624            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
625            .collect();
626        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
627            if rss > max_rss_mb {
628                tracing::error!(
629                    target: "ingest",
630                    rss_mb = rss,
631                    max_rss_mb = max_rss_mb,
632                    file = %path.display(),
633                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
634                );
635                return Err(AppError::LowMemory {
636                    available_mb: crate::memory_guard::available_memory_mb(),
637                    required_mb: max_rss_mb,
638                });
639            }
640        }
641        let chunk_embeddings = crate::embedder::embed_passages_parallel_local(
642            &paths.models,
643            &chunk_texts,
644            llm_parallelism,
645            crate::embedder::chunk_embed_batch_size(),
646        )?;
647        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
648        chunk_embeddings_opt = Some(chunk_embeddings);
649        // v1.0.84 (ADR-0042): batch paralelo não retorna discriminador
650        // único por chamada. Conservadoramente, populamos None aqui.
651        (aggregated, None)
652    };
653
654    // G42/S2+A4 (v1.0.79): entity names use the short-text batch profile.
655    let entity_texts: Vec<String> = extracted_entities
656        .iter()
657        .map(|entity| match &entity.description {
658            Some(desc) => format!("{} {}", entity.name, desc),
659            None => entity.name.clone(),
660        })
661        .collect();
662    // G56 (v1.0.80): ingest reuses canonical entity names across many
663    // memories (e.g. `sqlite-graphrag`, `claude-code`); the in-process
664    // cache collapses the repeated LLM calls into one per unique text.
665    let (entity_embeddings, embed_cache_stats) =
666        crate::embedder::embed_entity_texts_cached(&paths.models, &entity_texts, llm_parallelism)?;
667    if embed_cache_stats.hits > 0 {
668        tracing::debug!(
669            hits = embed_cache_stats.hits,
670            misses = embed_cache_stats.misses,
671            requested = embed_cache_stats.requested,
672            "G56: entity embed cache hit (ingest)"
673        );
674    }
675
676    Ok(StagedFile {
677        body: raw_body,
678        body_hash,
679        snippet,
680        name: name.to_string(),
681        description,
682        embedding,
683        chunk_embeddings: chunk_embeddings_opt,
684        chunks_info,
685        entities: extracted_entities,
686        relationships: extracted_relationships,
687        entity_embeddings,
688        urls: extracted_urls,
689        backend_invoked,
690    })
691}
692
693/// Phase B: persists one `StagedFile` to the database on the main thread.
694fn persist_staged(
695    conn: &mut Connection,
696    namespace: &str,
697    memory_type: &str,
698    staged: StagedFile,
699) -> Result<FileSuccess, AppError> {
700    {
701        let active_count: u32 = conn.query_row(
702            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
703            [],
704            |r| r.get::<_, i64>(0).map(|v| v as u32),
705        )?;
706        let ns_exists: bool = conn.query_row(
707            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
708            rusqlite::params![namespace],
709            |r| r.get::<_, i64>(0).map(|v| v > 0),
710        )?;
711        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
712            return Err(AppError::NamespaceError(format!(
713                "active namespace limit of {} exceeded while creating '{namespace}'",
714                crate::constants::MAX_NAMESPACES_ACTIVE
715            )));
716        }
717    }
718
719    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
720    if existing_memory.is_some() {
721        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
722            &staged.name,
723            namespace,
724        )));
725    }
726    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
727
728    let new_memory = NewMemory {
729        namespace: namespace.to_string(),
730        name: staged.name.clone(),
731        memory_type: memory_type.to_string(),
732        description: staged.description.clone(),
733        body: staged.body,
734        body_hash: staged.body_hash,
735        session_id: None,
736        source: "agent".to_string(),
737        metadata: serde_json::json!({}),
738    };
739
740    if let Some(hash_id) = duplicate_hash_id {
741        tracing::debug!(
742            target: "ingest",
743            duplicate_memory_id = hash_id,
744            "identical body already exists; persisting a new memory anyway"
745        );
746    }
747
748    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
749
750    let memory_id = memories::insert(&tx, &new_memory)?;
751    versions::insert_version(
752        &tx,
753        memory_id,
754        1,
755        &staged.name,
756        memory_type,
757        &staged.description,
758        &new_memory.body,
759        &serde_json::to_string(&new_memory.metadata)?,
760        None,
761        "create",
762    )?;
763    memories::upsert_vec(
764        &tx,
765        memory_id,
766        namespace,
767        memory_type,
768        &staged.embedding,
769        &staged.name,
770        &staged.snippet,
771    )?;
772
773    if staged.chunks_info.len() > 1 {
774        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
775        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
776            AppError::Internal(anyhow::anyhow!(
777                "missing chunk embeddings cache on multi-chunk ingest path"
778            ))
779        })?;
780        for (i, emb) in chunk_embeddings.iter().enumerate() {
781            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
782        }
783    }
784
785    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
786        for (idx, entity) in staged.entities.iter().enumerate() {
787            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
788            let entity_embedding = &staged.entity_embeddings[idx];
789            entities::upsert_entity_vec(
790                &tx,
791                entity_id,
792                namespace,
793                entity.entity_type,
794                entity_embedding,
795                &entity.name,
796            )?;
797            entities::link_memory_entity(&tx, memory_id, entity_id)?;
798            entities::increment_degree(&tx, entity_id)?;
799        }
800        let entity_types: std::collections::HashMap<&str, EntityType> = staged
801            .entities
802            .iter()
803            .map(|entity| (entity.name.as_str(), entity.entity_type))
804            .collect();
805        for rel in &staged.relationships {
806            let source_entity = NewEntity {
807                name: rel.source.clone(),
808                entity_type: entity_types
809                    .get(rel.source.as_str())
810                    .copied()
811                    .unwrap_or(EntityType::Concept),
812                description: None,
813            };
814            let target_entity = NewEntity {
815                name: rel.target.clone(),
816                entity_type: entity_types
817                    .get(rel.target.as_str())
818                    .copied()
819                    .unwrap_or(EntityType::Concept),
820                description: None,
821            };
822            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
823            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
824            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
825            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
826        }
827    }
828
829    tx.commit()?;
830
831    if !staged.urls.is_empty() {
832        let url_entries: Vec<storage_urls::MemoryUrl> = staged
833            .urls
834            .into_iter()
835            .map(|u| storage_urls::MemoryUrl {
836                url: u.url,
837                offset: Some(u.start as i64),
838            })
839            .collect();
840        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
841    }
842
843    Ok(FileSuccess {
844        memory_id,
845        action: "created".to_string(),
846        body_length: new_memory.body.len(),
847        backend_invoked: staged.backend_invoked,
848    })
849}
850
851// ---------------------------------------------------------------------------
852// G20: mode-conditional flag validation
853// ---------------------------------------------------------------------------
854
/// Reports whether `value` is equal to `default`.
///
/// Small local helper so the G20 flag validation in this module stays
/// self-contained.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
861
862/// G20: validate that flags for one LLM provider were not passed when
863/// the operator selected a different provider (or no provider). Flags
864/// silently discarded by the wrong mode are surfaced as
865///  BEFORE any DB work, so the operator gets
866/// an actionable error instead of a surprise at runtime.
867///
868/// Mode-specific matrices:
869/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
870///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
871///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
872///   --enable-ner is false)
873/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
874/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
875///   max_cost_usd, resume, retry_failed, keep_queue
876fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
877    const DEFAULT_TIMEOUT: u64 = 300;
878    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
879
880    let mut conflicts: Vec<String> = Vec::new();
881
882    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
883
884    if is_local_mode {
885        if args.claude_binary.is_some() {
886            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
887        }
888        if args.claude_model.is_some() {
889            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
890        }
891        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
892            conflicts.push(format!(
893                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
894                args.claude_timeout
895            ));
896        }
897        if args.codex_binary.is_some() {
898            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
899        }
900        if args.codex_model.is_some() {
901            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
902        }
903        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
904            conflicts.push(format!(
905                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
906                args.codex_timeout
907            ));
908        }
909        if args.max_cost_usd.is_some() {
910            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
911        }
912        if args.resume {
913            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
914        }
915        if args.retry_failed {
916            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
917        }
918        if args.keep_queue {
919            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
920        }
921        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
922            conflicts.push(format!(
923                "--rate-limit-wait={} is ignored when --mode is none or gliner",
924                args.rate_limit_wait
925            ));
926        }
927    }
928
929    match args.mode {
930        IngestMode::ClaudeCode => {
931            if args.codex_binary.is_some() {
932                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
933            }
934            if args.codex_model.is_some() {
935                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
936            }
937            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
938                conflicts.push(format!(
939                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
940                    args.codex_timeout
941                ));
942            }
943        }
944        IngestMode::Codex => {
945            if args.claude_binary.is_some() {
946                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
947            }
948            if args.claude_model.is_some() {
949                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
950            }
951            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
952                conflicts.push(format!(
953                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
954                    args.claude_timeout
955                ));
956            }
957            if args.max_cost_usd.is_some() {
958                conflicts.push(
959                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
960                        .to_string(),
961                );
962            }
963            if args.resume {
964                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
965            }
966            if args.retry_failed {
967                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
968            }
969            if args.keep_queue {
970                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
971            }
972        }
973        IngestMode::None | IngestMode::Gliner => {}
974    }
975
976    if !conflicts.is_empty() {
977        return Err(AppError::Validation(format!(
978            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
979            args.mode,
980            conflicts.join("\n  - ")
981        )));
982    }
983
984    Ok(())
985}
986
987// ---------------------------------------------------------------------------
988
989#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
990pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
991    // G20: mode-conditional flag validation BEFORE any DB access.
992    // Surfaces flags that the wrong mode would silently discard.
993    validate_mode_conditional_flags_ingest(&args)?;
994    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
995    if args.mode == IngestMode::ClaudeCode {
996        return super::ingest_claude::run_claude_ingest(&args);
997    }
998    if args.mode == IngestMode::Codex {
999        return super::ingest_codex::run_codex_ingest(&args);
1000    }
1001
1002    let started = std::time::Instant::now();
1003
1004    if !args.dir.exists() {
1005        return Err(AppError::Validation(format!(
1006            "directory not found: {}",
1007            args.dir.display()
1008        )));
1009    }
1010    if !args.dir.is_dir() {
1011        return Err(AppError::Validation(format!(
1012            "path is not a directory: {}",
1013            args.dir.display()
1014        )));
1015    }
1016
1017    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1018    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1019    files.sort_unstable();
1020
1021    if files.len() > args.max_files {
1022        return Err(AppError::Validation(format!(
1023            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1024            files.len(),
1025            args.max_files
1026        )));
1027    }
1028
1029    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1030    let memory_type_str = args.r#type.as_str().to_string();
1031
1032    let paths = AppPaths::resolve(args.db.as_deref())?;
1033    let mut conn_or_err = match init_storage(&paths) {
1034        Ok(c) => Ok(c),
1035        Err(e) => Err(format!("{e}")),
1036    };
1037
1038    let mut succeeded: usize = 0;
1039    let mut failed: usize = 0;
1040    let mut skipped: usize = 0;
1041    let total = files.len();
1042
1043    // Pre-resolve all names before parallelisation so Phase A workers see a
1044    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
1045    let mut taken_names: BTreeSet<String> = BTreeSet::new();
1046
1047    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
1048    // ProcessItem: the data moved into the producer thread for Phase A computation.
1049    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
1050    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
1051    // boundary into the rayon producer.
1052    enum SlotMeta {
1053        Skip {
1054            file_str: String,
1055            derived_base: String,
1056            name_truncated: bool,
1057            original_name: Option<String>,
1058            original_filename: Option<String>,
1059            reason: String,
1060        },
1061        Process {
1062            file_str: String,
1063            derived_name: String,
1064            name_truncated: bool,
1065            original_name: Option<String>,
1066            original_filename: Option<String>,
1067        },
1068    }
1069
1070    struct ProcessItem {
1071        idx: usize,
1072        path: PathBuf,
1073        file_str: String,
1074        derived_name: String,
1075    }
1076
1077    let files_cap = files.len();
1078    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1079    slots_meta.try_reserve(files_cap).map_err(|_| {
1080        AppError::LimitExceeded(format!(
1081            "allocation of {files_cap} slot metadata entries would exceed available memory"
1082        ))
1083    })?;
1084    let mut process_items: Vec<ProcessItem> = Vec::new();
1085    process_items.try_reserve(files_cap).map_err(|_| {
1086        AppError::LimitExceeded(format!(
1087            "allocation of {files_cap} process items would exceed available memory"
1088        ))
1089    })?;
1090    let mut truncations: Vec<(String, String)> = Vec::new();
1091    truncations.try_reserve(files_cap).map_err(|_| {
1092        AppError::LimitExceeded(format!(
1093            "allocation of {files_cap} truncation entries would exceed available memory"
1094        ))
1095    })?;
1096
1097    let max_name_length = args.max_name_length;
1098    for path in &files {
1099        let file_str = path.to_string_lossy().into_owned();
1100        let (derived_base, name_truncated, original_name) =
1101            derive_kebab_name(path, max_name_length);
1102        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1103
1104        if name_truncated {
1105            if let Some(ref orig) = original_name {
1106                truncations.push((orig.clone(), derived_base.clone()));
1107            }
1108        }
1109
1110        if derived_base.is_empty() {
1111            // original_filename: always include when it differs from the empty derived name
1112            let orig_filename = if !original_basename.is_empty() {
1113                Some(original_basename.to_string())
1114            } else {
1115                None
1116            };
1117            slots_meta.push(SlotMeta::Skip {
1118                file_str,
1119                derived_base: String::new(),
1120                name_truncated: false,
1121                original_name: None,
1122                original_filename: orig_filename,
1123                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1124            });
1125            continue;
1126        }
1127
1128        match unique_name(&derived_base, &taken_names) {
1129            Ok(derived_name) => {
1130                taken_names.insert(derived_name.clone());
1131                let idx = slots_meta.len();
1132                // original_filename: present only when the raw basename differs from the derived name
1133                let orig_filename = if original_basename != derived_name {
1134                    Some(original_basename.to_string())
1135                } else {
1136                    None
1137                };
1138                process_items.push(ProcessItem {
1139                    idx,
1140                    path: path.clone(),
1141                    file_str: file_str.clone(),
1142                    derived_name: derived_name.clone(),
1143                });
1144                slots_meta.push(SlotMeta::Process {
1145                    file_str,
1146                    derived_name,
1147                    name_truncated,
1148                    original_name,
1149                    original_filename: orig_filename,
1150                });
1151            }
1152            Err(e) => {
1153                let orig_filename = if original_basename != derived_base {
1154                    Some(original_basename.to_string())
1155                } else {
1156                    None
1157                };
1158                slots_meta.push(SlotMeta::Skip {
1159                    file_str,
1160                    derived_base,
1161                    name_truncated,
1162                    original_name,
1163                    original_filename: orig_filename,
1164                    reason: e.to_string(),
1165                });
1166            }
1167        }
1168    }
1169
1170    if !truncations.is_empty() {
1171        tracing::info!(
1172            target: "ingest",
1173            count = truncations.len(),
1174            max_name_length = max_name_length,
1175            max_len = DERIVED_NAME_MAX_LEN,
1176            "derived names truncated; pass -vv (debug) for per-file detail"
1177        );
1178    }
1179
1180    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1181    if args.dry_run {
1182        for meta in &slots_meta {
1183            match meta {
1184                SlotMeta::Skip {
1185                    file_str,
1186                    derived_base,
1187                    name_truncated,
1188                    original_name,
1189                    original_filename,
1190                    reason,
1191                } => {
1192                    output::emit_json_compact(&IngestFileEvent {
1193                        file: file_str,
1194                        name: derived_base,
1195                        status: "skip",
1196                        truncated: *name_truncated,
1197                        original_name: original_name.clone(),
1198                        original_filename: original_filename.as_deref(),
1199                        error: Some(reason.clone()),
1200                        memory_id: None,
1201                        action: None,
1202                        body_length: 0,
1203                        backend_invoked: None,
1204                    })?;
1205                }
1206                SlotMeta::Process {
1207                    file_str,
1208                    derived_name,
1209                    name_truncated,
1210                    original_name,
1211                    original_filename,
1212                } => {
1213                    output::emit_json_compact(&IngestFileEvent {
1214                        file: file_str,
1215                        name: derived_name,
1216                        status: "preview",
1217                        truncated: *name_truncated,
1218                        original_name: original_name.clone(),
1219                        original_filename: original_filename.as_deref(),
1220                        error: None,
1221                        memory_id: None,
1222                        action: None,
1223                        body_length: 0,
1224                        backend_invoked: None,
1225                    })?;
1226                }
1227            }
1228        }
1229        output::emit_json_compact(&IngestSummary {
1230            summary: true,
1231            dir: args.dir.to_string_lossy().into_owned(),
1232            pattern: args.pattern.clone(),
1233            recursive: args.recursive,
1234            files_total: total,
1235            files_succeeded: 0,
1236            files_failed: 0,
1237            files_skipped: 0,
1238            elapsed_ms: started.elapsed().as_millis() as u64,
1239        })?;
1240        return Ok(());
1241    }
1242
1243    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1244    if args.low_memory {
1245        if let Some(n) = args.ingest_parallelism {
1246            if n > 1 {
1247                return Err(AppError::Validation(
1248                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1249                        .to_string(),
1250                ));
1251            }
1252        }
1253    }
1254
1255    // Determine rayon thread pool size, honoring --low-memory and the
1256    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1257    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1258
1259    let pool = rayon::ThreadPoolBuilder::new()
1260        .num_threads(parallelism)
1261        .build()
1262        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1263
1264    if args.enable_ner && args.skip_extraction {
1265        return Err(AppError::Validation(
1266            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1267        ));
1268    }
1269    if args.skip_extraction && !args.enable_ner {
1270        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1271        // commit (9ddb17b) promoted this to a hard validation error, which
1272        // broke the "kept as a hidden no-op for backwards compatibility"
1273        // promise documented in CHANGELOG v1.0.45 and started failing
1274        // 5+ CI jobs whose E2E tests use this flag to skip the
1275        // GLiNER-ONNX model download in CI environments.
1276        tracing::warn!(
1277            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1278        );
1279    }
1280    let enable_ner = args.enable_ner;
1281    let auto_describe = args.auto_describe && !args.no_auto_describe;
1282    let max_rss_mb = args.max_rss_mb;
1283    let llm_parallelism = args.llm_parallelism as usize;
1284    // v1.0.79: `--mode gliner` and `--gliner-variant` are no-ops kept for
1285    // compatibility (the GLiNER pipeline was removed); warn explicitly so
1286    // callers do not silently expect NER-quality extraction.
1287    if args.mode == IngestMode::Gliner {
1288        tracing::warn!(
1289            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1290        );
1291    }
1292    if args.gliner_variant != "fp32" {
1293        tracing::warn!(
1294            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1295        );
1296    }
1297    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1298        "int8" => crate::extraction::GlinerVariant::Int8,
1299        _ => crate::extraction::GlinerVariant::Fp32,
1300    };
1301
1302    let total_to_process = process_items.len();
1303    tracing::info!(
1304        target: "ingest",
1305        phase = "pipeline_start",
1306        files = total_to_process,
1307        ingest_parallelism = parallelism,
1308        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1309    );
1310
1311    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1312    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1313    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1314    let channel_bound = (parallelism * 2).max(1);
1315    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1316
1317    // Phase A: launched in a dedicated OS thread so the main thread can consume
1318    // the channel concurrently. pool.install() blocks the calling thread until
1319    // all rayon workers finish — if called on the main thread it would
1320    // reintroduce the 2-phase blocking behaviour we are eliminating.
1321    let paths_owned = paths.clone();
1322    let llm_backend_owned = llm_backend;
1323    let producer_handle = std::thread::spawn(move || {
1324        pool.install(|| {
1325            process_items.into_par_iter().for_each(|item| {
1326                if crate::shutdown_requested() {
1327                    return;
1328                }
1329                let t0 = std::time::Instant::now();
1330                let result = stage_file(
1331                    item.idx,
1332                    &item.path,
1333                    &item.derived_name,
1334                    &paths_owned,
1335                    enable_ner,
1336                    gliner_variant,
1337                    max_rss_mb,
1338                    llm_parallelism,
1339                    llm_backend_owned,
1340                    auto_describe,
1341                );
1342                let elapsed_ms = t0.elapsed().as_millis() as u64;
1343
1344                // Emit NDJSON progress event to stderr so the user sees work
1345                // happening during long NER runs (e.g. 50 files × 27s each).
1346                let (n_entities, n_relationships) = match &result {
1347                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1348                    Err(_) => (0, 0),
1349                };
1350                let progress = StageProgressEvent {
1351                    schema_version: 1,
1352                    event: "file_extracted",
1353                    path: &item.file_str,
1354                    ms: elapsed_ms,
1355                    entities: n_entities,
1356                    relationships: n_relationships,
1357                };
1358                if let Ok(line) = serde_json::to_string(&progress) {
1359                    tracing::info!(target: "ingest_progress", "{}", line);
1360                }
1361
1362                // Blocking send applies backpressure: if Phase B is slower,
1363                // Phase A workers wait here instead of accumulating staged files
1364                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1365                let _ = tx.send((item.idx, result));
1366            });
1367            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1368            drop(tx);
1369        });
1370    });
1371
1372    // Phase B: main thread persists files as results arrive from the channel.
1373    // Results arrive in completion order (par_iter is unordered). We persist
1374    // each file immediately on arrival — this is the key fix for B1: with the
1375    // old 2-phase design the first DB write happened only after ALL files had
1376    // finished Phase A. Now the first commit happens as soon as the first file
1377    // completes Phase A, regardless of how many files remain.
1378    //
1379    // NDJSON output order follows completion order (not file-system sort order).
    // Skip slots are emitted first, before any Process results are consumed.
1381    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1382    // requirement than ensuring data is persisted before the user's timeout fires.
1383    let fail_fast = args.fail_fast;
1384
1385    // Emit pending Skip events first so agents see them early.
1386    for meta in &slots_meta {
1387        if let SlotMeta::Skip {
1388            file_str,
1389            derived_base,
1390            name_truncated,
1391            original_name,
1392            original_filename,
1393            reason,
1394        } = meta
1395        {
1396            output::emit_json_compact(&IngestFileEvent {
1397                file: file_str,
1398                name: derived_base,
1399                status: "skipped",
1400                truncated: *name_truncated,
1401                original_name: original_name.clone(),
1402                original_filename: original_filename.as_deref(),
1403                error: Some(reason.clone()),
1404                memory_id: None,
1405                action: None,
1406                body_length: 0,
1407                backend_invoked: None,
1408            })?;
1409            skipped += 1;
1410        }
1411    }
1412
1413    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1414    // as channel messages arrive in completion order.
1415    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1416        .iter()
1417        .enumerate()
1418        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1419        .collect();
1420
1421    tracing::info!(
1422        target: "ingest",
1423        phase = "persist_start",
1424        files = total_to_process,
1425        "phase B starting: persisting files incrementally as Phase A completes each one",
1426    );
1427
1428    // Drain channel and persist each file immediately — no accumulation into a
1429    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1430    // Phase B without applying backpressure.
1431    for (idx, stage_result) in rx {
1432        if crate::shutdown_requested() {
1433            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1434            break;
1435        }
1436        let meta = meta_index.get(&idx).ok_or_else(|| {
1437            AppError::Internal(anyhow::anyhow!(
1438                "channel idx {idx} has no corresponding Process slot"
1439            ))
1440        })?;
1441        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1442        {
1443            SlotMeta::Process {
1444                file_str,
1445                derived_name,
1446                name_truncated,
1447                original_name,
1448                original_filename,
1449            } => (
1450                file_str,
1451                derived_name,
1452                name_truncated,
1453                original_name,
1454                original_filename,
1455            ),
1456            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1457        };
1458
1459        // If storage init failed, every file fails with the same error.
1460        let conn = match conn_or_err.as_mut() {
1461            Ok(c) => c,
1462            Err(err_msg) => {
1463                let err_clone = err_msg.clone();
1464                output::emit_json_compact(&IngestFileEvent {
1465                    file: file_str,
1466                    name: derived_name,
1467                    status: "failed",
1468                    truncated: *name_truncated,
1469                    original_name: original_name.clone(),
1470                    original_filename: original_filename.as_deref(),
1471                    error: Some(err_clone.clone()),
1472                    memory_id: None,
1473                    action: None,
1474                    body_length: 0,
1475                    backend_invoked: None,
1476                })?;
1477                failed += 1;
1478                if fail_fast {
1479                    output::emit_json_compact(&IngestSummary {
1480                        summary: true,
1481                        dir: args.dir.display().to_string(),
1482                        pattern: args.pattern.clone(),
1483                        recursive: args.recursive,
1484                        files_total: total,
1485                        files_succeeded: succeeded,
1486                        files_failed: failed,
1487                        files_skipped: skipped,
1488                        elapsed_ms: started.elapsed().as_millis() as u64,
1489                    })?;
1490                    return Err(AppError::Validation(format!(
1491                        "ingest aborted on first failure: {err_clone}"
1492                    )));
1493                }
1494                continue;
1495            }
1496        };
1497
1498        let outcome =
1499            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1500
1501        match outcome {
1502            Ok(FileSuccess {
1503                memory_id,
1504                action,
1505                body_length,
1506                backend_invoked: file_backend_invoked,
1507            }) => {
1508                output::emit_json_compact(&IngestFileEvent {
1509                    file: file_str,
1510                    name: derived_name,
1511                    status: "indexed",
1512                    truncated: *name_truncated,
1513                    original_name: original_name.clone(),
1514                    original_filename: original_filename.as_deref(),
1515                    error: None,
1516                    memory_id: Some(memory_id),
1517                    action: Some(action),
1518                    body_length,
1519                    backend_invoked: file_backend_invoked,
1520                })?;
1521                succeeded += 1;
1522            }
1523            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1524                output::emit_json_compact(&IngestFileEvent {
1525                    file: file_str,
1526                    name: derived_name,
1527                    status: "skipped",
1528                    truncated: *name_truncated,
1529                    original_name: original_name.clone(),
1530                    original_filename: original_filename.as_deref(),
1531                    error: Some(format!("{e}")),
1532                    memory_id: None,
1533                    action: Some("duplicate".to_string()),
1534                    body_length: 0,
1535                    backend_invoked: None,
1536                })?;
1537                skipped += 1;
1538            }
1539            Err(e) => {
1540                let err_msg = format!("{e}");
1541                output::emit_json_compact(&IngestFileEvent {
1542                    file: file_str,
1543                    name: derived_name,
1544                    status: "failed",
1545                    truncated: *name_truncated,
1546                    original_name: original_name.clone(),
1547                    original_filename: original_filename.as_deref(),
1548                    error: Some(err_msg.clone()),
1549                    memory_id: None,
1550                    action: None,
1551                    body_length: 0,
1552                    backend_invoked: None,
1553                })?;
1554                failed += 1;
1555                if fail_fast {
1556                    output::emit_json_compact(&IngestSummary {
1557                        summary: true,
1558                        dir: args.dir.display().to_string(),
1559                        pattern: args.pattern.clone(),
1560                        recursive: args.recursive,
1561                        files_total: total,
1562                        files_succeeded: succeeded,
1563                        files_failed: failed,
1564                        files_skipped: skipped,
1565                        elapsed_ms: started.elapsed().as_millis() as u64,
1566                    })?;
1567                    return Err(AppError::Validation(format!(
1568                        "ingest aborted on first failure: {err_msg}"
1569                    )));
1570                }
1571            }
1572        }
1573    }
1574
1575    // Wait for the producer thread to finish cleanly.
1576    producer_handle
1577        .join()
1578        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1579
1580    if let Ok(ref conn) = conn_or_err {
1581        if succeeded > 0 {
1582            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1583        }
1584    }
1585
1586    output::emit_json_compact(&IngestSummary {
1587        summary: true,
1588        dir: args.dir.display().to_string(),
1589        pattern: args.pattern.clone(),
1590        recursive: args.recursive,
1591        files_total: total,
1592        files_succeeded: succeeded,
1593        files_failed: failed,
1594        files_skipped: skipped,
1595        elapsed_ms: started.elapsed().as_millis() as u64,
1596    })?;
1597
1598    Ok(())
1599}
1600
1601/// Auto-initialises the database (matches the contract of every other CRUD
1602/// handler) and returns a fresh read/write connection ready for the ingest
1603/// loop. Errors here are recoverable per-file: the caller surfaces them as
1604/// failure events so `--fail-fast` and the continue-on-error path keep
1605/// working when, for example, the user points `--db` at an unwritable path.
1606fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1607    ensure_db_ready(paths)?;
1608    let conn = open_rw(&paths.db)?;
1609    Ok(conn)
1610}
1611
1612pub(crate) fn collect_files(
1613    dir: &Path,
1614    pattern: &str,
1615    recursive: bool,
1616    out: &mut Vec<PathBuf>,
1617) -> Result<(), AppError> {
1618    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1619    for entry in entries {
1620        let entry = entry.map_err(AppError::Io)?;
1621        let path = entry.path();
1622        let file_type = entry.file_type().map_err(AppError::Io)?;
1623        if file_type.is_file() {
1624            let name = entry.file_name();
1625            let name_str = name.to_string_lossy();
1626            if matches_pattern(&name_str, pattern) {
1627                out.push(path);
1628            }
1629        } else if file_type.is_dir() && recursive {
1630            collect_files(&path, pattern, recursive, out)?;
1631        }
1632    }
1633    Ok(())
1634}
1635
/// Returns `true` when the file name `name` matches the glob `pattern`.
///
/// The only metacharacter is `*`, which matches any run of characters
/// (including none). Every other character must match literally and
/// case-sensitively. A pattern without `*` is an exact comparison.
///
/// `*` may appear anywhere and any number of times, so `*.md`, `README*`,
/// `report-*.md` and `*notes*` all work. The forms `*suffix`, `prefix*` and
/// a literal name behave exactly as they did when only those were supported.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('*');
    // `split` always yields at least one item, so this is the literal text
    // before the first `*` (or the whole pattern when there is none).
    let head = segments.next().unwrap_or("");
    let tail = match segments.next_back() {
        Some(tail) => tail,
        // No `*` at all: the pattern is a plain file name.
        None => return name == pattern,
    };

    // Anchor the first and last literal segments. Stripping the prefix before
    // the suffix guarantees they cannot overlap (e.g. `ab*ab` vs `ab`).
    let after_head = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };
    let mut middle = match after_head.strip_suffix(tail) {
        Some(rest) => rest,
        None => return false,
    };

    // Remaining segments sit between two `*`s: each must occur, in order, in
    // what is left. Taking the leftmost occurrence is sufficient for `*`-only
    // globs because it leaves the most text for later segments.
    for segment in segments {
        match middle.find(segment) {
            Some(at) => middle = &middle[at + segment.len()..],
            None => return false,
        }
    }
    true
}
1645
1646/// Returns `(final_name, truncated, original_name)`.
1647/// `truncated` is true when the derived name exceeded `max_len`.
1648/// `original_name` holds the pre-truncation name only when `truncated=true`.
1649///
1650/// Non-ASCII characters are first decomposed via NFD and then stripped of
1651/// combining marks so accented letters fold to their base ASCII letter
1652/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1653/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1654/// preserves meaningful word content rather than collapsing the basename
1655/// to a few stray ASCII letters as the previous filter did.
1656pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1657    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1658    let lowered: String = stem
1659        .nfd()
1660        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1661        .map(|c| {
1662            if c == '_' || c.is_whitespace() {
1663                '-'
1664            } else {
1665                c
1666            }
1667        })
1668        .map(|c| c.to_ascii_lowercase())
1669        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1670        .collect();
1671    let collapsed = collapse_dashes(&lowered);
1672    let trimmed_raw = collapsed.trim_matches('-').to_string();
1673    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1674    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1675        format!("doc-{trimmed_raw}")
1676    } else {
1677        trimmed_raw
1678    };
1679    if trimmed.len() > max_len {
1680        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1681        tracing::debug!(
1682            target: "ingest",
1683            original = %trimmed,
1684            truncated_to = %truncated,
1685            max_len = max_len,
1686            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1687        );
1688        (truncated, true, Some(trimmed))
1689    } else {
1690        (trimmed, false, None)
1691    }
1692}
1693
1694/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1695/// numeric suffix (`-1`, `-2`, …) when needed.
1696///
1697/// `taken` is the set of names already consumed in the current ingest run.
1698/// The caller is expected to insert the returned name into `taken` so the
1699/// next call observes the consumption. Cross-run collisions are intentionally
1700/// surfaced by the per-file persistence path as duplicates so re-ingestion
1701/// of identical corpora stays idempotent.
1702///
1703/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1704/// candidates collide, signalling a pathological corpus that should be
1705/// renamed manually.
1706fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1707    if !taken.contains(base) {
1708        return Ok(base.to_string());
1709    }
1710    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1711        let candidate = format!("{base}-{suffix}");
1712        if !taken.contains(&candidate) {
1713            tracing::warn!(
1714                target: "ingest",
1715                base = %base,
1716                resolved = %candidate,
1717                suffix,
1718                "memory name collision resolved with numeric suffix"
1719            );
1720            return Ok(candidate);
1721        }
1722    }
1723    Err(AppError::Validation(format!(
1724        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1725    )))
1726}
1727
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are kept as they are.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A dash is redundant exactly when the output already ends in one.
        let repeats_dash = ch == '-' && collapsed.ends_with('-');
        if !repeats_dash {
            collapsed.push(ch);
        }
    }
    collapsed
}
1744
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    // ── matches_pattern ──

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ── derive_kebab_name ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ── collect_files ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    /// Helper: runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value`
    /// (or removed when `value` is `None`) and puts the previous state back
    /// afterwards to keep tests deterministic.
    ///
    /// The restore happens in a drop guard, so it also runs when `f` panics
    /// on a failed assertion. Without that, one failing test would leave the
    /// variable modified for every test that runs after it.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the value the variable had before the test touched it.
        struct Restore(Option<std::ffi::OsString>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // `var_os` keeps a non-UTF-8 previous value intact for the restore.
        let _restore = Restore(std::env::var_os(KEY));
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}