Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
// NOTE(review): the collision-resolution loop that consumes this cap is outside
// the visible part of the file; confirm its bound matches the "1000 candidates"
// wording above (i.e. whether the cap is inclusive or exclusive).
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Command-line arguments accepted by the `ingest` subcommand.
//
// Maintenance note: with clap's derive API a `///` doc comment on a field is
// rendered as that option's `--help` text unless the field also sets
// `help = ...`. The `///` comments below are therefore user-facing strings and
// are left untouched; reviewer notes are written as plain `//` comments so
// they never leak into the CLI help output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
    # Preview file-to-name mapping without ingesting\n  \
    sqlite-graphrag ingest ./docs --dry-run\n\n  \
    # LLM-curated extraction via Claude Code CLI\n  \
    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
    # Resume interrupted claude-code ingest\n  \
    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
    # Claude Code with budget cap and custom timeout\n  \
    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
AUTHENTICATION:\n  \
    --mode claude-code: Uses existing Claude Code authentication.\n  \
      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
    --mode codex: Uses existing Codex CLI authentication.\n  \
      Device auth: run `codex auth login` first\n  \
      API key: set OPENAI_API_KEY (optional)\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Positional argument (no `long`/`short`).
    // NOTE(review): the explicit help text says "recursively", but recursion is
    // opt-in through `--recursive` (default false) — consider rewording.
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Boolean option that accepts an optional value: `num_args = 0..=1` with
    // `default_missing_value = "true"` means a bare `--enable-ner` yields true,
    // while `--enable-ner=<value>` is parsed by `parse_bool_flexible`. The
    // `SQLITE_GRAPHRAG_ENABLE_NER` env var supplies the value when the flag is
    // absent; otherwise the default is "false".
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    // Kept as a free-form string here; `stage_file` takes a typed
    // `crate::extraction::GlinerVariant`, so the conversion/validation happens
    // somewhere outside this struct (not visible from here).
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
    )]
    pub gliner_variant: String,

    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Preview file-to-name mapping without loading model or persisting.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output format selector; defaults to `JsonOutputFormat::Json`.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Hidden compatibility flag; per its help text it has no effect.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // `None` means "use the heuristic"; the effective value is computed by
    // `resolve_parallelism`, which also clamps an explicit 0 up to 1.
    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
    /// explicit value. Recommended for environments with <4 GB available
    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
    /// (CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // NOTE(review): the help text hard-codes "8192" while the real default is
    // `DEFAULT_MAX_RSS_MB`; confirm the two agree.
    /// Maximum process RSS in MiB; abort if exceeded during embedding.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // NOTE(review): the help text hard-codes "60" while the real default is
    // `DERIVED_NAME_MAX_LEN`; confirm the two agree.
    /// Maximum character length for derived memory names from file basenames.
    ///
    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
    /// Shorter values leave more headroom for collision suffix resolution.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
          help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // NOTE(review): the doc comment below lists three modes, but `IngestMode`
    // also has a `Codex` variant.
    /// Extraction mode: `none` (body-only, default), `gliner` (NER), or `claude-code` (LLM-curated via Claude Code CLI).
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    /// Explicit path to the Claude Code binary (only with --mode claude-code).
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
    #[arg(long)]
    pub claude_model: Option<String>,

    /// Resume a previously interrupted claude-code ingest from the queue DB.
    #[arg(long, default_value_t = false)]
    pub resume: bool,

    /// Retry only failed files from a previous claude-code ingest.
    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    /// Custom path for the claude-code ingest queue database.
    #[arg(long, default_value = ".ingest-queue.sqlite")]
    pub queue_db: String,

    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,

    /// Explicit path to the Codex CLI binary (only with --mode codex).
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
    )]
    pub codex_binary: Option<PathBuf>,

    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
    #[arg(
        long,
        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
    )]
    pub codex_model: Option<String>,

    /// Timeout in seconds for each codex exec invocation.
    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each codex exec invocation (default: 300)"
    )]
    pub codex_timeout: u64,

    // NOTE(review): the doc comment says "Default: 0", but no `default_value`
    // is declared, so an absent flag parses to `None`; how `None` is mapped to
    // "fail fast" is decided by code outside this struct.
    /// G30: poll for the job singleton every second for up to N seconds
    /// when another invocation holds the lock. Default: 0 (fail fast).
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    /// G30: force acquisition of the singleton lock by removing a stale
    /// lock file from a previously crashed invocation.
    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,
}
272
/// Extraction mode for the ingest pipeline.
// Parsed from `--mode` through `clap::ValueEnum`. The `///` comments on the
// variants are shown by clap as help for the possible values, so they are
// treated as user-facing text and left as-is.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    /// Body-only ingestion without entity/relationship extraction (default).
    None,
    // NOTE(review): `--mode gliner` and `--enable-ner` are separate inputs; how
    // the two are reconciled is not visible in this part of the file.
    /// GLiNER zero-shot NER extraction (requires --enable-ner).
    Gliner,
    /// LLM-curated extraction via locally installed Claude Code CLI.
    ClaudeCode,
    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
    Codex,
}
285
286/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
287/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
288/// values evaluate to false. Unrecognized non-empty values emit a
289/// `tracing::warn!` and evaluate to false.
290fn env_low_memory_enabled() -> bool {
291    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
292        Ok(v) if v.is_empty() => false,
293        Ok(v) => match v.to_lowercase().as_str() {
294            "1" | "true" | "yes" | "on" => true,
295            "0" | "false" | "no" | "off" => false,
296            other => {
297                tracing::warn!(
298                    target: "ingest",
299                    value = %other,
300                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
301                );
302                false
303            }
304        },
305        Err(_) => false,
306    }
307}
308
309/// Resolves the effective ingest parallelism honoring `--low-memory` and the
310/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
311///
312/// Precedence:
313/// 1. `--low-memory` CLI flag forces parallelism = 1.
314/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
315/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
316/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
317///
318/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
319/// emits a `tracing::warn!` advertising the override.
320fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
321    let env_flag = env_low_memory_enabled();
322    let low_memory = low_memory_flag || env_flag;
323
324    if low_memory {
325        if let Some(n) = ingest_parallelism {
326            if n > 1 {
327                tracing::warn!(
328                    target: "ingest",
329                    requested = n,
330                    "--ingest-parallelism overridden by --low-memory; using 1"
331                );
332            }
333        }
334        if low_memory_flag {
335            tracing::info!(
336                target: "ingest",
337                source = "flag",
338                "low-memory mode enabled: forcing --ingest-parallelism 1"
339            );
340        } else {
341            tracing::info!(
342                target: "ingest",
343                source = "env",
344                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
345            );
346        }
347        return 1;
348    }
349
350    ingest_parallelism
351        .unwrap_or_else(|| {
352            std::thread::available_parallelism()
353                .map(|v| v.get() / 2)
354                .unwrap_or(1)
355                .clamp(1, 4)
356        })
357        .max(1)
358}
359
/// Per-file record serialized as one JSON object of the ingest output.
///
/// Fields marked `skip_serializing_if = "Option::is_none"` are omitted from the
/// serialized object when they are `None`; `file`, `name`, `status`,
/// `truncated` and `body_length` are always written. Field declaration order
/// is the serialization order.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file this event refers to (presumably its path as text — the
    /// construction site is outside the visible part of the file).
    file: &'a str,
    /// Memory name associated with the file.
    name: &'a str,
    /// Outcome label for the file; the set of values is decided by the caller
    /// (not visible here).
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    // NOTE(review): `--max-name-length` can override that constant; confirm
    // which limit the caller actually applies.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file basename (without extension); only present when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error message for a failed file; omitted when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the persisted memory; omitted when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action taken for the file (`persist_staged` reports `"created"`);
    /// omitted when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
    body_length: usize,
}
382
/// Summary record serialized once per run with aggregate counts.
///
/// The code that fills this struct is outside the visible part of the file,
/// so the field notes below describe intent inferred from names and types.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker distinguishing the summary object from per-file events
    /// (presumably always `true` — confirm at the construction site).
    summary: bool,
    /// Directory that was ingested, as text.
    dir: String,
    /// Glob pattern that was applied.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    /// Number of files considered.
    files_total: usize,
    /// Number of files persisted successfully.
    files_succeeded: usize,
    /// Number of files that ended in an error.
    files_failed: usize,
    /// Number of files skipped.
    files_skipped: usize,
    /// Elapsed time of the run in milliseconds.
    elapsed_ms: u64,
}
395
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
///
/// Returned by `persist_staged` after its transaction commits.
struct FileSuccess {
    /// Id returned by `memories::insert` for the new memory.
    memory_id: i64,
    /// Action label; `persist_staged` always sets `"created"`.
    action: String,
    /// Length of the persisted body in bytes (`String::len`).
    body_length: usize,
}
402
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
// NOTE(review): the module-level docs say progress events are emitted "as each
// file is persisted" (Phase B), which disagrees with "after Phase A" above.
// The emitting code is outside the visible part of the file — confirm which is
// accurate.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's schema.
    schema_version: u8,
    /// Event label chosen by the emitter.
    event: &'a str,
    /// File the event refers to.
    path: &'a str,
    /// Duration in milliseconds (what interval is measured is decided by the
    /// emitter, not visible here).
    ms: u64,
    /// Number of entities associated with the file.
    entities: usize,
    /// Number of relationships associated with the file.
    relationships: usize,
}
414
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
// NOTE(review): the module docs describe results being sent over a channel as
// soon as each file finishes, which suggests completion order rather than
// submission order — confirm against the dispatch loop (not visible here).
struct StagedFile {
    /// Full file content as read by `stage_file`.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name the file will be stored under.
    name: String,
    /// Generated description of the form `ingested from <path>`.
    description: String,
    /// Memory-level embedding: the embedding of the whole body when there is
    /// exactly one chunk, otherwise the aggregate of the per-chunk embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `None` when the body produced exactly one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk descriptors returned by `chunking::split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty when NER is disabled or extraction failed).
    entities: Vec<NewEntity>,
    /// Extracted relationships; `stage_file` currently always leaves this empty.
    relationships: Vec<NewRelationship>,
    /// One embedding per element of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found by the extractor (empty when NER is disabled or extraction failed).
    urls: Vec<crate::extraction::ExtractedUrl>,
}
431
432/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
433/// Never touches the database — safe to run on any rayon thread.
434fn stage_file(
435    _idx: usize,
436    path: &Path,
437    name: &str,
438    paths: &AppPaths,
439    enable_ner: bool,
440    gliner_variant: crate::extraction::GlinerVariant,
441    max_rss_mb: u64,
442) -> Result<StagedFile, AppError> {
443    use crate::constants::*;
444
445    if name.len() > MAX_MEMORY_NAME_LEN {
446        return Err(AppError::LimitExceeded(
447            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
448        ));
449    }
450    if name.starts_with("__") {
451        return Err(AppError::Validation(
452            crate::i18n::validation::reserved_name(),
453        ));
454    }
455    {
456        let slug_re = crate::constants::name_slug_regex();
457        if !slug_re.is_match(name) {
458            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
459                name,
460            )));
461        }
462    }
463
464    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
465    if file_size > MAX_MEMORY_BODY_LEN as u64 {
466        return Err(AppError::LimitExceeded(
467            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
468        ));
469    }
470    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
471    if raw_body.len() > MAX_MEMORY_BODY_LEN {
472        return Err(AppError::LimitExceeded(
473            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
474        ));
475    }
476    if raw_body.trim().is_empty() {
477        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
478    }
479
480    let description = format!("ingested from {}", path.display());
481    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
482        return Err(AppError::Validation(
483            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
484        ));
485    }
486
487    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
488    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
489    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
490    if enable_ner {
491        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
492            Ok(extracted) => {
493                extracted_urls = extracted.urls;
494                // v1.0.76: ExtractionResult.entities is now
495                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
496                // via name + type only; start/end offsets are not
497                // carried forward into the storage layer.
498                extracted_entities = extracted
499                    .entities
500                    .into_iter()
501                    .map(|e| NewEntity {
502                        name: e.name,
503                        entity_type: crate::entity_type::EntityType::Concept,
504                        description: None,
505                    })
506                    .collect();
507                // v1.0.76: relationships are no longer in the
508                // ExtractionResult struct; the LLM backend returns
509                // them in its own payload. The default build is
510                // URL-only extraction.
511                extracted_relationships.clear();
512
513                if extracted_entities.len() > max_entities_per_memory() {
514                    extracted_entities.truncate(max_entities_per_memory());
515                }
516                if extracted_relationships.len() > max_relationships_per_memory() {
517                    extracted_relationships.truncate(max_relationships_per_memory());
518                }
519            }
520            Err(e) => {
521                tracing::warn!(
522                    target: "ingest",
523                    file = %path.display(),
524                    "auto-extraction failed (graceful degradation): {e:#}"
525                );
526            }
527        }
528    }
529
530    for rel in &mut extracted_relationships {
531        rel.relation = crate::parsers::normalize_relation(&rel.relation);
532        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
533            return Err(AppError::Validation(format!(
534                "{e} for relationship '{}' -> '{}'",
535                rel.source, rel.target
536            )));
537        }
538        crate::parsers::warn_if_non_canonical(&rel.relation);
539        if !(0.0..=1.0).contains(&rel.strength) {
540            return Err(AppError::Validation(format!(
541                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
542                rel.strength, rel.source, rel.target
543            )));
544        }
545    }
546
547    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
548    let snippet: String = raw_body.chars().take(200).collect();
549
550    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
551    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
552        return Err(AppError::LimitExceeded(format!(
553            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
554            chunks_info.len(),
555            REMEMBER_MAX_SAFE_MULTI_CHUNKS
556        )));
557    }
558
559    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
560    let embedding = if chunks_info.len() == 1 {
561        crate::embedder::embed_passage_local(&paths.models, &raw_body)?
562    } else {
563        let chunk_texts: Vec<&str> = chunks_info
564            .iter()
565            .map(|c| chunking::chunk_text(&raw_body, c))
566            .collect();
567        let embed_cap = chunk_texts.len();
568        let mut chunk_embeddings = Vec::new();
569        chunk_embeddings.try_reserve(embed_cap).map_err(|_| {
570            AppError::LimitExceeded(format!(
571                "allocation of {embed_cap} chunk embeddings would exceed available memory"
572            ))
573        })?;
574        for chunk_text in &chunk_texts {
575            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
576                if rss > max_rss_mb {
577                    tracing::error!(
578                        target: "ingest",
579                        rss_mb = rss,
580                        max_rss_mb = max_rss_mb,
581                        file = %path.display(),
582                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
583                    );
584                    return Err(AppError::LowMemory {
585                        available_mb: crate::memory_guard::available_memory_mb(),
586                        required_mb: max_rss_mb,
587                    });
588                }
589            }
590            chunk_embeddings.push(crate::embedder::embed_passage_local(
591                &paths.models,
592                chunk_text,
593            )?);
594        }
595        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
596        chunk_embeddings_opt = Some(chunk_embeddings);
597        aggregated
598    };
599
600    let entity_embeddings = extracted_entities
601        .iter()
602        .map(|entity| {
603            let entity_text = match &entity.description {
604                Some(desc) => format!("{} {}", entity.name, desc),
605                None => entity.name.clone(),
606            };
607            crate::embedder::embed_passage_local(&paths.models, &entity_text)
608        })
609        .collect::<Result<Vec<_>, _>>()?;
610
611    Ok(StagedFile {
612        body: raw_body,
613        body_hash,
614        snippet,
615        name: name.to_string(),
616        description,
617        embedding,
618        chunk_embeddings: chunk_embeddings_opt,
619        chunks_info,
620        entities: extracted_entities,
621        relationships: extracted_relationships,
622        entity_embeddings,
623        urls: extracted_urls,
624    })
625}
626
627/// Phase B: persists one `StagedFile` to the database on the main thread.
628fn persist_staged(
629    conn: &mut Connection,
630    namespace: &str,
631    memory_type: &str,
632    staged: StagedFile,
633) -> Result<FileSuccess, AppError> {
634    {
635        let active_count: u32 = conn.query_row(
636            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
637            [],
638            |r| r.get::<_, i64>(0).map(|v| v as u32),
639        )?;
640        let ns_exists: bool = conn.query_row(
641            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
642            rusqlite::params![namespace],
643            |r| r.get::<_, i64>(0).map(|v| v > 0),
644        )?;
645        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
646            return Err(AppError::NamespaceError(format!(
647                "active namespace limit of {} exceeded while creating '{namespace}'",
648                crate::constants::MAX_NAMESPACES_ACTIVE
649            )));
650        }
651    }
652
653    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
654    if existing_memory.is_some() {
655        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
656            &staged.name,
657            namespace,
658        )));
659    }
660    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
661
662    let new_memory = NewMemory {
663        namespace: namespace.to_string(),
664        name: staged.name.clone(),
665        memory_type: memory_type.to_string(),
666        description: staged.description.clone(),
667        body: staged.body,
668        body_hash: staged.body_hash,
669        session_id: None,
670        source: "agent".to_string(),
671        metadata: serde_json::json!({}),
672    };
673
674    if let Some(hash_id) = duplicate_hash_id {
675        tracing::debug!(
676            target: "ingest",
677            duplicate_memory_id = hash_id,
678            "identical body already exists; persisting a new memory anyway"
679        );
680    }
681
682    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
683
684    let memory_id = memories::insert(&tx, &new_memory)?;
685    versions::insert_version(
686        &tx,
687        memory_id,
688        1,
689        &staged.name,
690        memory_type,
691        &staged.description,
692        &new_memory.body,
693        &serde_json::to_string(&new_memory.metadata)?,
694        None,
695        "create",
696    )?;
697    memories::upsert_vec(
698        &tx,
699        memory_id,
700        namespace,
701        memory_type,
702        &staged.embedding,
703        &staged.name,
704        &staged.snippet,
705    )?;
706
707    if staged.chunks_info.len() > 1 {
708        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
709        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
710            AppError::Internal(anyhow::anyhow!(
711                "missing chunk embeddings cache on multi-chunk ingest path"
712            ))
713        })?;
714        for (i, emb) in chunk_embeddings.iter().enumerate() {
715            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
716        }
717    }
718
719    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
720        for (idx, entity) in staged.entities.iter().enumerate() {
721            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
722            let entity_embedding = &staged.entity_embeddings[idx];
723            entities::upsert_entity_vec(
724                &tx,
725                entity_id,
726                namespace,
727                entity.entity_type,
728                entity_embedding,
729                &entity.name,
730            )?;
731            entities::link_memory_entity(&tx, memory_id, entity_id)?;
732            entities::increment_degree(&tx, entity_id)?;
733        }
734        let entity_types: std::collections::HashMap<&str, EntityType> = staged
735            .entities
736            .iter()
737            .map(|entity| (entity.name.as_str(), entity.entity_type))
738            .collect();
739        for rel in &staged.relationships {
740            let source_entity = NewEntity {
741                name: rel.source.clone(),
742                entity_type: entity_types
743                    .get(rel.source.as_str())
744                    .copied()
745                    .unwrap_or(EntityType::Concept),
746                description: None,
747            };
748            let target_entity = NewEntity {
749                name: rel.target.clone(),
750                entity_type: entity_types
751                    .get(rel.target.as_str())
752                    .copied()
753                    .unwrap_or(EntityType::Concept),
754                description: None,
755            };
756            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
757            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
758            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
759            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
760        }
761    }
762
763    tx.commit()?;
764
765    if !staged.urls.is_empty() {
766        let url_entries: Vec<storage_urls::MemoryUrl> = staged
767            .urls
768            .into_iter()
769            .map(|u| storage_urls::MemoryUrl {
770                url: u.url,
771                offset: Some(u.start as i64),
772            })
773            .collect();
774        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
775    }
776
777    Ok(FileSuccess {
778        memory_id,
779        action: "created".to_string(),
780        body_length: new_memory.body.len(),
781    })
782}
783
// ---------------------------------------------------------------------------
// G20: mode-conditional flag validation
// ---------------------------------------------------------------------------
787
/// Returns `true` when `value` is equal to `default`.
///
/// Used by the G20 flag validation to tell whether a scalar CLI flag was left
/// at its declared default. The original comment describes this as a local
/// re-declaration of a helper that also exists elsewhere in the crate (the
/// cross-reference was lost from the comment); it is duplicated here to keep
/// this module self-contained for the G20 fix.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    value == default
}
794
795/// G20: validate that flags for one LLM provider were not passed when
796/// the operator selected a different provider (or no provider). Flags
797/// silently discarded by the wrong mode are surfaced as
798///  BEFORE any DB work, so the operator gets
799/// an actionable error instead of a surprise at runtime.
800///
801/// Mode-specific matrices:
802/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
803///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
804///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
805///   --enable-ner is false)
806/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
807/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
808///   max_cost_usd, resume, retry_failed, keep_queue
809fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
810    const DEFAULT_TIMEOUT: u64 = 300;
811    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
812
813    let mut conflicts: Vec<String> = Vec::new();
814
815    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
816
817    if is_local_mode {
818        if args.claude_binary.is_some() {
819            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
820        }
821        if args.claude_model.is_some() {
822            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
823        }
824        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
825            conflicts.push(format!(
826                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
827                args.claude_timeout
828            ));
829        }
830        if args.codex_binary.is_some() {
831            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
832        }
833        if args.codex_model.is_some() {
834            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
835        }
836        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
837            conflicts.push(format!(
838                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
839                args.codex_timeout
840            ));
841        }
842        if args.max_cost_usd.is_some() {
843            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
844        }
845        if args.resume {
846            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
847        }
848        if args.retry_failed {
849            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
850        }
851        if args.keep_queue {
852            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
853        }
854        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
855            conflicts.push(format!(
856                "--rate-limit-wait={} is ignored when --mode is none or gliner",
857                args.rate_limit_wait
858            ));
859        }
860    }
861
862    match args.mode {
863        IngestMode::ClaudeCode => {
864            if args.codex_binary.is_some() {
865                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
866            }
867            if args.codex_model.is_some() {
868                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
869            }
870            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
871                conflicts.push(format!(
872                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
873                    args.codex_timeout
874                ));
875            }
876        }
877        IngestMode::Codex => {
878            if args.claude_binary.is_some() {
879                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
880            }
881            if args.claude_model.is_some() {
882                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
883            }
884            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
885                conflicts.push(format!(
886                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
887                    args.claude_timeout
888                ));
889            }
890            if args.max_cost_usd.is_some() {
891                conflicts.push(
892                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
893                        .to_string(),
894                );
895            }
896            if args.resume {
897                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
898            }
899            if args.retry_failed {
900                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
901            }
902            if args.keep_queue {
903                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
904            }
905        }
906        IngestMode::None | IngestMode::Gliner => {}
907    }
908
909    if !conflicts.is_empty() {
910        return Err(AppError::Validation(format!(
911            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
912            args.mode,
913            conflicts.join("\n  - ")
914        )));
915    }
916
917    Ok(())
918}
919
// ---------------------------------------------------------------------------
921
922#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
923pub fn run(args: IngestArgs) -> Result<(), AppError> {
924    // G20: mode-conditional flag validation BEFORE any DB access.
925    // Surfaces flags that the wrong mode would silently discard.
926    validate_mode_conditional_flags_ingest(&args)?;
927    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
928    if args.mode == IngestMode::ClaudeCode {
929        return super::ingest_claude::run_claude_ingest(&args);
930    }
931    if args.mode == IngestMode::Codex {
932        return super::ingest_codex::run_codex_ingest(&args);
933    }
934
935    let started = std::time::Instant::now();
936
937    if !args.dir.exists() {
938        return Err(AppError::Validation(format!(
939            "directory not found: {}",
940            args.dir.display()
941        )));
942    }
943    if !args.dir.is_dir() {
944        return Err(AppError::Validation(format!(
945            "path is not a directory: {}",
946            args.dir.display()
947        )));
948    }
949
950    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
951    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
952    files.sort_unstable();
953
954    if files.len() > args.max_files {
955        return Err(AppError::Validation(format!(
956            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
957            files.len(),
958            args.max_files
959        )));
960    }
961
962    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
963    let memory_type_str = args.r#type.as_str().to_string();
964
965    let paths = AppPaths::resolve(args.db.as_deref())?;
966    let mut conn_or_err = match init_storage(&paths) {
967        Ok(c) => Ok(c),
968        Err(e) => Err(format!("{e}")),
969    };
970
971    let mut succeeded: usize = 0;
972    let mut failed: usize = 0;
973    let mut skipped: usize = 0;
974    let total = files.len();
975
976    // Pre-resolve all names before parallelisation so Phase A workers see a
977    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
978    let mut taken_names: BTreeSet<String> = BTreeSet::new();
979
980    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
981    // ProcessItem: the data moved into the producer thread for Phase A computation.
982    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
983    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
984    // boundary into the rayon producer.
985    enum SlotMeta {
986        Skip {
987            file_str: String,
988            derived_base: String,
989            name_truncated: bool,
990            original_name: Option<String>,
991            original_filename: Option<String>,
992            reason: String,
993        },
994        Process {
995            file_str: String,
996            derived_name: String,
997            name_truncated: bool,
998            original_name: Option<String>,
999            original_filename: Option<String>,
1000        },
1001    }
1002
1003    struct ProcessItem {
1004        idx: usize,
1005        path: PathBuf,
1006        file_str: String,
1007        derived_name: String,
1008    }
1009
1010    let files_cap = files.len();
1011    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1012    slots_meta.try_reserve(files_cap).map_err(|_| {
1013        AppError::LimitExceeded(format!(
1014            "allocation of {files_cap} slot metadata entries would exceed available memory"
1015        ))
1016    })?;
1017    let mut process_items: Vec<ProcessItem> = Vec::new();
1018    process_items.try_reserve(files_cap).map_err(|_| {
1019        AppError::LimitExceeded(format!(
1020            "allocation of {files_cap} process items would exceed available memory"
1021        ))
1022    })?;
1023    let mut truncations: Vec<(String, String)> = Vec::new();
1024    truncations.try_reserve(files_cap).map_err(|_| {
1025        AppError::LimitExceeded(format!(
1026            "allocation of {files_cap} truncation entries would exceed available memory"
1027        ))
1028    })?;
1029
1030    let max_name_length = args.max_name_length;
1031    for path in &files {
1032        let file_str = path.to_string_lossy().into_owned();
1033        let (derived_base, name_truncated, original_name) =
1034            derive_kebab_name(path, max_name_length);
1035        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1036
1037        if name_truncated {
1038            if let Some(ref orig) = original_name {
1039                truncations.push((orig.clone(), derived_base.clone()));
1040            }
1041        }
1042
1043        if derived_base.is_empty() {
1044            // original_filename: always include when it differs from the empty derived name
1045            let orig_filename = if !original_basename.is_empty() {
1046                Some(original_basename.to_string())
1047            } else {
1048                None
1049            };
1050            slots_meta.push(SlotMeta::Skip {
1051                file_str,
1052                derived_base: String::new(),
1053                name_truncated: false,
1054                original_name: None,
1055                original_filename: orig_filename,
1056                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1057            });
1058            continue;
1059        }
1060
1061        match unique_name(&derived_base, &taken_names) {
1062            Ok(derived_name) => {
1063                taken_names.insert(derived_name.clone());
1064                let idx = slots_meta.len();
1065                // original_filename: present only when the raw basename differs from the derived name
1066                let orig_filename = if original_basename != derived_name {
1067                    Some(original_basename.to_string())
1068                } else {
1069                    None
1070                };
1071                process_items.push(ProcessItem {
1072                    idx,
1073                    path: path.clone(),
1074                    file_str: file_str.clone(),
1075                    derived_name: derived_name.clone(),
1076                });
1077                slots_meta.push(SlotMeta::Process {
1078                    file_str,
1079                    derived_name,
1080                    name_truncated,
1081                    original_name,
1082                    original_filename: orig_filename,
1083                });
1084            }
1085            Err(e) => {
1086                let orig_filename = if original_basename != derived_base {
1087                    Some(original_basename.to_string())
1088                } else {
1089                    None
1090                };
1091                slots_meta.push(SlotMeta::Skip {
1092                    file_str,
1093                    derived_base,
1094                    name_truncated,
1095                    original_name,
1096                    original_filename: orig_filename,
1097                    reason: e.to_string(),
1098                });
1099            }
1100        }
1101    }
1102
1103    if !truncations.is_empty() {
1104        tracing::info!(
1105            target: "ingest",
1106            count = truncations.len(),
1107            max_name_length = max_name_length,
1108            max_len = DERIVED_NAME_MAX_LEN,
1109            "derived names truncated; pass -vv (debug) for per-file detail"
1110        );
1111    }
1112
1113    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1114    if args.dry_run {
1115        for meta in &slots_meta {
1116            match meta {
1117                SlotMeta::Skip {
1118                    file_str,
1119                    derived_base,
1120                    name_truncated,
1121                    original_name,
1122                    original_filename,
1123                    reason,
1124                } => {
1125                    output::emit_json_compact(&IngestFileEvent {
1126                        file: file_str,
1127                        name: derived_base,
1128                        status: "skip",
1129                        truncated: *name_truncated,
1130                        original_name: original_name.clone(),
1131                        original_filename: original_filename.as_deref(),
1132                        error: Some(reason.clone()),
1133                        memory_id: None,
1134                        action: None,
1135                        body_length: 0,
1136                    })?;
1137                }
1138                SlotMeta::Process {
1139                    file_str,
1140                    derived_name,
1141                    name_truncated,
1142                    original_name,
1143                    original_filename,
1144                } => {
1145                    output::emit_json_compact(&IngestFileEvent {
1146                        file: file_str,
1147                        name: derived_name,
1148                        status: "preview",
1149                        truncated: *name_truncated,
1150                        original_name: original_name.clone(),
1151                        original_filename: original_filename.as_deref(),
1152                        error: None,
1153                        memory_id: None,
1154                        action: None,
1155                        body_length: 0,
1156                    })?;
1157                }
1158            }
1159        }
1160        output::emit_json_compact(&IngestSummary {
1161            summary: true,
1162            dir: args.dir.to_string_lossy().into_owned(),
1163            pattern: args.pattern.clone(),
1164            recursive: args.recursive,
1165            files_total: total,
1166            files_succeeded: 0,
1167            files_failed: 0,
1168            files_skipped: 0,
1169            elapsed_ms: started.elapsed().as_millis() as u64,
1170        })?;
1171        return Ok(());
1172    }
1173
1174    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1175    if args.low_memory {
1176        if let Some(n) = args.ingest_parallelism {
1177            if n > 1 {
1178                return Err(AppError::Validation(
1179                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1180                        .to_string(),
1181                ));
1182            }
1183        }
1184    }
1185
1186    // Determine rayon thread pool size, honoring --low-memory and the
1187    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1188    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1189
1190    let pool = rayon::ThreadPoolBuilder::new()
1191        .num_threads(parallelism)
1192        .build()
1193        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1194
1195    if args.enable_ner && args.skip_extraction {
1196        return Err(AppError::Validation(
1197            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1198        ));
1199    }
1200    if args.skip_extraction && !args.enable_ner {
1201        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1202        // commit (9ddb17b) promoted this to a hard validation error, which
1203        // broke the "kept as a hidden no-op for backwards compatibility"
1204        // promise documented in CHANGELOG v1.0.45 and started failing
1205        // 5+ CI jobs whose E2E tests use this flag to skip the
1206        // GLiNER-ONNX model download in CI environments.
1207        tracing::warn!(
1208            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1209        );
1210    }
1211    let enable_ner = args.enable_ner;
1212    let max_rss_mb = args.max_rss_mb;
1213    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1214        "int8" => crate::extraction::GlinerVariant::Int8,
1215        _ => crate::extraction::GlinerVariant::Fp32,
1216    };
1217
1218    let total_to_process = process_items.len();
1219    tracing::info!(
1220        target: "ingest",
1221        phase = "pipeline_start",
1222        files = total_to_process,
1223        ingest_parallelism = parallelism,
1224        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1225    );
1226
1227    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1228    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1229    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1230    let channel_bound = (parallelism * 2).max(1);
1231    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1232
1233    // Phase A: launched in a dedicated OS thread so the main thread can consume
1234    // the channel concurrently. pool.install() blocks the calling thread until
1235    // all rayon workers finish — if called on the main thread it would
1236    // reintroduce the 2-phase blocking behaviour we are eliminating.
1237    let paths_owned = paths.clone();
1238    let producer_handle = std::thread::spawn(move || {
1239        pool.install(|| {
1240            process_items.into_par_iter().for_each(|item| {
1241                if crate::shutdown_requested() {
1242                    return;
1243                }
1244                let t0 = std::time::Instant::now();
1245                let result = stage_file(
1246                    item.idx,
1247                    &item.path,
1248                    &item.derived_name,
1249                    &paths_owned,
1250                    enable_ner,
1251                    gliner_variant,
1252                    max_rss_mb,
1253                );
1254                let elapsed_ms = t0.elapsed().as_millis() as u64;
1255
1256                // Emit NDJSON progress event to stderr so the user sees work
1257                // happening during long NER runs (e.g. 50 files × 27s each).
1258                let (n_entities, n_relationships) = match &result {
1259                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1260                    Err(_) => (0, 0),
1261                };
1262                let progress = StageProgressEvent {
1263                    schema_version: 1,
1264                    event: "file_extracted",
1265                    path: &item.file_str,
1266                    ms: elapsed_ms,
1267                    entities: n_entities,
1268                    relationships: n_relationships,
1269                };
1270                if let Ok(line) = serde_json::to_string(&progress) {
1271                    tracing::info!(target: "ingest_progress", "{}", line);
1272                }
1273
1274                // Blocking send applies backpressure: if Phase B is slower,
1275                // Phase A workers wait here instead of accumulating staged files
1276                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1277                let _ = tx.send((item.idx, result));
1278            });
1279            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1280            drop(tx);
1281        });
1282    });
1283
1284    // Phase B: main thread persists files as results arrive from the channel.
1285    // Results arrive in completion order (par_iter is unordered). We persist
1286    // each file immediately on arrival — this is the key fix for B1: with the
1287    // old 2-phase design the first DB write happened only after ALL files had
1288    // finished Phase A. Now the first commit happens as soon as the first file
1289    // completes Phase A, regardless of how many files remain.
1290    //
1291    // NDJSON output order follows completion order (not file-system sort order).
1292    // Skip slots are emitted at the end, after all Process results are consumed.
1293    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1294    // requirement than ensuring data is persisted before the user's timeout fires.
1295    let fail_fast = args.fail_fast;
1296
1297    // Emit pending Skip events first so agents see them early.
1298    for meta in &slots_meta {
1299        if let SlotMeta::Skip {
1300            file_str,
1301            derived_base,
1302            name_truncated,
1303            original_name,
1304            original_filename,
1305            reason,
1306        } = meta
1307        {
1308            output::emit_json_compact(&IngestFileEvent {
1309                file: file_str,
1310                name: derived_base,
1311                status: "skipped",
1312                truncated: *name_truncated,
1313                original_name: original_name.clone(),
1314                original_filename: original_filename.as_deref(),
1315                error: Some(reason.clone()),
1316                memory_id: None,
1317                action: None,
1318                body_length: 0,
1319            })?;
1320            skipped += 1;
1321        }
1322    }
1323
1324    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1325    // as channel messages arrive in completion order.
1326    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1327        .iter()
1328        .enumerate()
1329        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1330        .collect();
1331
1332    tracing::info!(
1333        target: "ingest",
1334        phase = "persist_start",
1335        files = total_to_process,
1336        "phase B starting: persisting files incrementally as Phase A completes each one",
1337    );
1338
1339    // Drain channel and persist each file immediately — no accumulation into a
1340    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1341    // Phase B without applying backpressure.
1342    for (idx, stage_result) in rx {
1343        if crate::shutdown_requested() {
1344            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1345            break;
1346        }
1347        let meta = meta_index.get(&idx).ok_or_else(|| {
1348            AppError::Internal(anyhow::anyhow!(
1349                "channel idx {idx} has no corresponding Process slot"
1350            ))
1351        })?;
1352        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1353        {
1354            SlotMeta::Process {
1355                file_str,
1356                derived_name,
1357                name_truncated,
1358                original_name,
1359                original_filename,
1360            } => (
1361                file_str,
1362                derived_name,
1363                name_truncated,
1364                original_name,
1365                original_filename,
1366            ),
1367            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1368        };
1369
1370        // If storage init failed, every file fails with the same error.
1371        let conn = match conn_or_err.as_mut() {
1372            Ok(c) => c,
1373            Err(err_msg) => {
1374                let err_clone = err_msg.clone();
1375                output::emit_json_compact(&IngestFileEvent {
1376                    file: file_str,
1377                    name: derived_name,
1378                    status: "failed",
1379                    truncated: *name_truncated,
1380                    original_name: original_name.clone(),
1381                    original_filename: original_filename.as_deref(),
1382                    error: Some(err_clone.clone()),
1383                    memory_id: None,
1384                    action: None,
1385                    body_length: 0,
1386                })?;
1387                failed += 1;
1388                if fail_fast {
1389                    output::emit_json_compact(&IngestSummary {
1390                        summary: true,
1391                        dir: args.dir.display().to_string(),
1392                        pattern: args.pattern.clone(),
1393                        recursive: args.recursive,
1394                        files_total: total,
1395                        files_succeeded: succeeded,
1396                        files_failed: failed,
1397                        files_skipped: skipped,
1398                        elapsed_ms: started.elapsed().as_millis() as u64,
1399                    })?;
1400                    return Err(AppError::Validation(format!(
1401                        "ingest aborted on first failure: {err_clone}"
1402                    )));
1403                }
1404                continue;
1405            }
1406        };
1407
1408        let outcome =
1409            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1410
1411        match outcome {
1412            Ok(FileSuccess {
1413                memory_id,
1414                action,
1415                body_length,
1416            }) => {
1417                output::emit_json_compact(&IngestFileEvent {
1418                    file: file_str,
1419                    name: derived_name,
1420                    status: "indexed",
1421                    truncated: *name_truncated,
1422                    original_name: original_name.clone(),
1423                    original_filename: original_filename.as_deref(),
1424                    error: None,
1425                    memory_id: Some(memory_id),
1426                    action: Some(action),
1427                    body_length,
1428                })?;
1429                succeeded += 1;
1430            }
1431            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1432                output::emit_json_compact(&IngestFileEvent {
1433                    file: file_str,
1434                    name: derived_name,
1435                    status: "skipped",
1436                    truncated: *name_truncated,
1437                    original_name: original_name.clone(),
1438                    original_filename: original_filename.as_deref(),
1439                    error: Some(format!("{e}")),
1440                    memory_id: None,
1441                    action: Some("duplicate".to_string()),
1442                    body_length: 0,
1443                })?;
1444                skipped += 1;
1445            }
1446            Err(e) => {
1447                let err_msg = format!("{e}");
1448                output::emit_json_compact(&IngestFileEvent {
1449                    file: file_str,
1450                    name: derived_name,
1451                    status: "failed",
1452                    truncated: *name_truncated,
1453                    original_name: original_name.clone(),
1454                    original_filename: original_filename.as_deref(),
1455                    error: Some(err_msg.clone()),
1456                    memory_id: None,
1457                    action: None,
1458                    body_length: 0,
1459                })?;
1460                failed += 1;
1461                if fail_fast {
1462                    output::emit_json_compact(&IngestSummary {
1463                        summary: true,
1464                        dir: args.dir.display().to_string(),
1465                        pattern: args.pattern.clone(),
1466                        recursive: args.recursive,
1467                        files_total: total,
1468                        files_succeeded: succeeded,
1469                        files_failed: failed,
1470                        files_skipped: skipped,
1471                        elapsed_ms: started.elapsed().as_millis() as u64,
1472                    })?;
1473                    return Err(AppError::Validation(format!(
1474                        "ingest aborted on first failure: {err_msg}"
1475                    )));
1476                }
1477            }
1478        }
1479    }
1480
1481    // Wait for the producer thread to finish cleanly.
1482    producer_handle
1483        .join()
1484        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1485
1486    if let Ok(ref conn) = conn_or_err {
1487        if succeeded > 0 {
1488            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1489        }
1490    }
1491
1492    output::emit_json_compact(&IngestSummary {
1493        summary: true,
1494        dir: args.dir.display().to_string(),
1495        pattern: args.pattern.clone(),
1496        recursive: args.recursive,
1497        files_total: total,
1498        files_succeeded: succeeded,
1499        files_failed: failed,
1500        files_skipped: skipped,
1501        elapsed_ms: started.elapsed().as_millis() as u64,
1502    })?;
1503
1504    Ok(())
1505}
1506
1507/// Auto-initialises the database (matches the contract of every other CRUD
1508/// handler) and returns a fresh read/write connection ready for the ingest
1509/// loop. Errors here are recoverable per-file: the caller surfaces them as
1510/// failure events so `--fail-fast` and the continue-on-error path keep
1511/// working when, for example, the user points `--db` at an unwritable path.
1512fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1513    ensure_db_ready(paths)?;
1514    let conn = open_rw(&paths.db)?;
1515    Ok(conn)
1516}
1517
1518pub(crate) fn collect_files(
1519    dir: &Path,
1520    pattern: &str,
1521    recursive: bool,
1522    out: &mut Vec<PathBuf>,
1523) -> Result<(), AppError> {
1524    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1525    for entry in entries {
1526        let entry = entry.map_err(AppError::Io)?;
1527        let path = entry.path();
1528        let file_type = entry.file_type().map_err(AppError::Io)?;
1529        if file_type.is_file() {
1530            let name = entry.file_name();
1531            let name_str = name.to_string_lossy();
1532            if matches_pattern(&name_str, pattern) {
1533                out.push(path);
1534            }
1535        } else if file_type.is_dir() && recursive {
1536            collect_files(&path, pattern, recursive, out)?;
1537        }
1538    }
1539    Ok(())
1540}
1541
/// Tests a file name against a minimal glob where `*` matches any run of
/// characters (including none) and every other character matches itself.
///
/// The comparison is case-sensitive and always anchored at both ends of
/// `name`. A pattern without `*` therefore requires exact equality, `*.md`
/// is a suffix test, `README*` is a prefix test, and `*` alone accepts any
/// name. Wildcards may also appear in the middle or several times
/// (`doc-*.md`, `*notes*`, `*.test.*`); such patterns used to be compared
/// literally and silently matched nothing.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // Everything after the last `*` is anchored at the end of the name.
    let (middle, tail) = rest.rsplit_once('*').unwrap_or(("", rest));

    // Strip the anchored head, then the anchored tail from what is left, so
    // the two can never overlap (e.g. `a*a` must not match `a`).
    let mut remaining = match name
        .strip_prefix(head)
        .and_then(|after_head| after_head.strip_suffix(tail))
    {
        Some(inner) => inner,
        None => return false,
    };

    // Interior literals must occur left-to-right; taking the leftmost hit
    // each time leaves the most room for the segments that follow.
    for segment in middle.split('*').filter(|segment| !segment.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }
    true
}
1551
1552/// Returns `(final_name, truncated, original_name)`.
1553/// `truncated` is true when the derived name exceeded `max_len`.
1554/// `original_name` holds the pre-truncation name only when `truncated=true`.
1555///
1556/// Non-ASCII characters are first decomposed via NFD and then stripped of
1557/// combining marks so accented letters fold to their base ASCII letter
1558/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1559/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1560/// preserves meaningful word content rather than collapsing the basename
1561/// to a few stray ASCII letters as the previous filter did.
1562pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1563    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1564    let lowered: String = stem
1565        .nfd()
1566        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1567        .map(|c| {
1568            if c == '_' || c.is_whitespace() {
1569                '-'
1570            } else {
1571                c
1572            }
1573        })
1574        .map(|c| c.to_ascii_lowercase())
1575        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1576        .collect();
1577    let collapsed = collapse_dashes(&lowered);
1578    let trimmed_raw = collapsed.trim_matches('-').to_string();
1579    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1580    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1581        format!("doc-{trimmed_raw}")
1582    } else {
1583        trimmed_raw
1584    };
1585    if trimmed.len() > max_len {
1586        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1587        tracing::debug!(
1588            target: "ingest",
1589            original = %trimmed,
1590            truncated_to = %truncated,
1591            max_len = max_len,
1592            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1593        );
1594        (truncated, true, Some(trimmed))
1595    } else {
1596        (trimmed, false, None)
1597    }
1598}
1599
1600/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1601/// numeric suffix (`-1`, `-2`, …) when needed.
1602///
1603/// `taken` is the set of names already consumed in the current ingest run.
1604/// The caller is expected to insert the returned name into `taken` so the
1605/// next call observes the consumption. Cross-run collisions are intentionally
1606/// surfaced by the per-file persistence path as duplicates so re-ingestion
1607/// of identical corpora stays idempotent.
1608///
1609/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1610/// candidates collide, signalling a pathological corpus that should be
1611/// renamed manually.
1612fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1613    if !taken.contains(base) {
1614        return Ok(base.to_string());
1615    }
1616    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1617        let candidate = format!("{base}-{suffix}");
1618        if !taken.contains(&candidate) {
1619            tracing::warn!(
1620                target: "ingest",
1621                base = %base,
1622                resolved = %candidate,
1623                suffix,
1624                "memory name collision resolved with numeric suffix"
1625            );
1626            return Ok(candidate);
1627        }
1628    }
1629    Err(AppError::Validation(format!(
1630        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1631    )))
1632}
1633
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are kept as they are, and
/// leading or trailing dashes are collapsed but not removed.
fn collapse_dashes(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut squeezed, ch| {
            // A dash is redundant exactly when the output already ends in one.
            if ch != '-' || !squeezed.ends_with('-') {
                squeezed.push(ch);
            }
            squeezed
        })
}
1650
/// Unit tests for the pure helpers of the `ingest` handler: pattern matching,
/// name derivation, collision resolution, directory walking, and the
/// low-memory / parallelism resolution logic.
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    /// Helper: runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value`
    /// (`None` removes it) and puts the previous value back afterwards.
    ///
    /// Restoration happens in a drop guard, so it also runs when `f` panics
    /// on a failed assertion. Without that, one failing test would leave the
    /// variable altered for every `#[serial]` test that runs after it.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the value observed before the override; restores it on drop.
        struct Restore(Option<std::ffi::OsString>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // `var_os` keeps a non-UTF-8 previous value intact instead of
        // treating it as unset and deleting it on restore.
        let _restore = Restore(std::env::var_os(KEY));
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}