Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
// NOTE(review): the collision-resolution loop that consumes this cap is not
// visible in this part of the file; confirm it compares against this constant
// (and whether the bound is inclusive) before changing the value.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70    # LLM-curated extraction via Claude Code CLI\n  \
71    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
72    # Resume interrupted claude-code ingest\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
74    # Claude Code with budget cap and custom timeout\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
76AUTHENTICATION:\n  \
77    --mode claude-code: Uses existing Claude Code authentication.\n  \
78      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
79      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
80    --mode codex: Uses existing Codex CLI authentication.\n  \
81      Device auth: run `codex auth login` first\n  \
82      API key: set OPENAI_API_KEY (optional)\n\n  \
83NOTES:\n  \
84    Each file becomes a separate memory. Names derive from file basenames\n  \
85    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
86    followed by a final summary line with counts. Per-file errors are reported\n  \
87    inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89    /// Directory containing files to ingest.
90    #[arg(
91        value_name = "DIR",
92        help = "Directory to ingest recursively (each matching file becomes a memory)"
93    )]
94    pub dir: PathBuf,
95
96    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
97    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98    pub r#type: MemoryType,
99
100    /// Glob pattern matched against file basenames (default: `*.md`). Supports
101    /// `*.<ext>`, `<prefix>*`, and exact filename match.
102    #[arg(long, default_value = "*.md")]
103    pub pattern: String,
104
105    /// Recurse into subdirectories.
106    #[arg(long, default_value_t = false)]
107    pub recursive: bool,
108
109    #[arg(
110        long,
111        env = "SQLITE_GRAPHRAG_ENABLE_NER",
112        value_parser = crate::parsers::parse_bool_flexible,
113        action = clap::ArgAction::Set,
114        num_args = 0..=1,
115        default_missing_value = "true",
116        default_value = "false",
117        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118    )]
119    pub enable_ner: bool,
120    #[arg(
121        long,
122        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
123        default_value = "fp32",
124        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
125    )]
126    pub gliner_variant: String,
127
128    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
129    #[arg(long, default_value_t = false, hide = true)]
130    pub skip_extraction: bool,
131
132    /// Stop on first per-file error instead of continuing with the next file.
133    #[arg(long, default_value_t = false)]
134    pub fail_fast: bool,
135
136    /// Preview file-to-name mapping without loading model or persisting.
137    #[arg(long, default_value_t = false)]
138    pub dry_run: bool,
139
140    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
141    #[arg(long, default_value_t = 10_000)]
142    pub max_files: usize,
143
144    /// Namespace for the ingested memories.
145    #[arg(long)]
146    pub namespace: Option<String>,
147
148    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
149    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
150    pub db: Option<String>,
151
152    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
153    pub format: JsonOutputFormat,
154
155    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
156    pub json: bool,
157
158    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
159    #[arg(
160        long,
161        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
162    )]
163    pub ingest_parallelism: Option<usize>,
164
165    /// Force single-threaded ingest to reduce RSS pressure.
166    ///
167    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
168    /// explicit value. Recommended for environments with <4 GB available
169    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
170    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
171    /// (CLI flag has higher precedence than the env var).
172    #[arg(
173        long,
174        default_value_t = false,
175        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
176                Recommended for environments with <4 GB available RAM or container/cgroup \
177                constraints. Trade-off: 3-4x longer wall time. Also honored via \
178                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
179    )]
180    pub low_memory: bool,
181
182    /// Maximum process RSS in MiB; abort if exceeded during embedding.
183    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
184          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
185    pub max_rss_mb: u64,
186
187    /// G42/S3 (v1.0.79): maximum simultaneous LLM embedding subprocesses
188    /// PER FILE. Multiplies with --ingest-parallelism (files staged
189    /// concurrently), hence the conservative default of 2. The effective
190    /// value is further bounded by CPU count and available RAM.
191    #[arg(long, default_value_t = 2, value_name = "N",
192          value_parser = clap::value_parser!(u64).range(1..=32),
193          help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
194    pub llm_parallelism: u64,
195
196    /// Maximum character length for derived memory names from file basenames.
197    ///
198    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
199    /// Shorter values leave more headroom for collision suffix resolution.
200    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
201          help = "Maximum length for derived memory names (default: 60)")]
202    pub max_name_length: usize,
203
204    /// Extraction mode: `none` (body-only, default), `claude-code`/`codex` (LLM-curated), or `gliner` (DEPRECATED: URL-regex only since v1.0.79).
205    #[arg(long, value_enum, default_value_t = IngestMode::None)]
206    pub mode: IngestMode,
207
208    /// Explicit path to the Claude Code binary (only with --mode claude-code).
209    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
210    pub claude_binary: Option<std::path::PathBuf>,
211
212    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
213    #[arg(long)]
214    pub claude_model: Option<String>,
215
216    /// Resume a previously interrupted claude-code ingest from the queue DB.
217    #[arg(long, default_value_t = false)]
218    pub resume: bool,
219
220    /// Retry only failed files from a previous claude-code ingest.
221    #[arg(long, default_value_t = false)]
222    pub retry_failed: bool,
223
224    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
225    #[arg(long, default_value_t = false)]
226    pub keep_queue: bool,
227
228    /// Custom path for the claude-code ingest queue database.
229    #[arg(long, default_value = ".ingest-queue.sqlite")]
230    pub queue_db: String,
231
232    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
233    #[arg(long, default_value_t = 60)]
234    pub rate_limit_wait: u64,
235
236    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
237    #[arg(long)]
238    pub max_cost_usd: Option<f64>,
239
240    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
241    #[arg(
242        long,
243        default_value_t = 300,
244        help = "Timeout in seconds for each claude -p invocation (default: 300)"
245    )]
246    pub claude_timeout: u64,
247
248    /// Explicit path to the Codex CLI binary (only with --mode codex).
249    #[arg(
250        long,
251        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
252        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
253    )]
254    pub codex_binary: Option<PathBuf>,
255
256    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
257    #[arg(
258        long,
259        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
260    )]
261    pub codex_model: Option<String>,
262
263    /// Timeout in seconds for each codex exec invocation.
264    #[arg(
265        long,
266        default_value_t = 300,
267        help = "Timeout in seconds for each codex exec invocation (default: 300)"
268    )]
269    pub codex_timeout: u64,
270
271    /// G30: poll for the job singleton every second for up to N seconds
272    /// when another invocation holds the lock. Default: 0 (fail fast).
273    #[arg(long, value_name = "SECONDS")]
274    pub wait_job_singleton: Option<u64>,
275
276    /// G30: force acquisition of the singleton lock by removing a stale
277    /// lock file from a previously crashed invocation.
278    #[arg(long, default_value_t = false)]
279    pub force_job_singleton: bool,
280}
281
/// Extraction mode for the ingest pipeline.
// Selected through `--mode` on `IngestArgs`, where the default is
// `IngestMode::None`.
// The `///` comments on the variants are left untouched on purpose: clap's
// `ValueEnum` derive exposes them as the help text of each possible value, so
// editing them changes the CLI output.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    /// Body-only ingestion without entity/relationship extraction (default).
    None,
    /// DEPRECATED: URL-regex extraction only since v1.0.79 (the GLiNER pipeline was removed; requires --enable-ner).
    Gliner,
    /// LLM-curated extraction via locally installed Claude Code CLI.
    ClaudeCode,
    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
    Codex,
}
294
295/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
296/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
297/// values evaluate to false. Unrecognized non-empty values emit a
298/// `tracing::warn!` and evaluate to false.
299fn env_low_memory_enabled() -> bool {
300    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
301        Ok(v) if v.is_empty() => false,
302        Ok(v) => match v.to_lowercase().as_str() {
303            "1" | "true" | "yes" | "on" => true,
304            "0" | "false" | "no" | "off" => false,
305            other => {
306                tracing::warn!(
307                    target: "ingest",
308                    value = %other,
309                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
310                );
311                false
312            }
313        },
314        Err(_) => false,
315    }
316}
317
318/// Resolves the effective ingest parallelism honoring `--low-memory` and the
319/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
320///
321/// Precedence:
322/// 1. `--low-memory` CLI flag forces parallelism = 1.
323/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
324/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
325/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
326///
327/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
328/// emits a `tracing::warn!` advertising the override.
329fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
330    let env_flag = env_low_memory_enabled();
331    let low_memory = low_memory_flag || env_flag;
332
333    if low_memory {
334        if let Some(n) = ingest_parallelism {
335            if n > 1 {
336                tracing::warn!(
337                    target: "ingest",
338                    requested = n,
339                    "--ingest-parallelism overridden by --low-memory; using 1"
340                );
341            }
342        }
343        if low_memory_flag {
344            tracing::info!(
345                target: "ingest",
346                source = "flag",
347                "low-memory mode enabled: forcing --ingest-parallelism 1"
348            );
349        } else {
350            tracing::info!(
351                target: "ingest",
352                source = "env",
353                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
354            );
355        }
356        return 1;
357    }
358
359    ingest_parallelism
360        .unwrap_or_else(|| {
361            std::thread::available_parallelism()
362                .map(|v| v.get() / 2)
363                .unwrap_or(1)
364                .clamp(1, 4)
365        })
366        .max(1)
367}
368
/// Per-file NDJSON record. The module docs describe the output as one JSON
/// object per processed file (success or error); every `Option` field below is
/// omitted from the serialized object when it is `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    // NOTE(review): presumably the path of the processed file as shown to the
    // user; confirm at the construction site (not visible here).
    file: &'a str,
    // Memory name reported for the file (see `original_name` and
    // `original_filename` for the pre-derivation values).
    name: &'a str,
    // NOTE(review): the set of status strings is not visible in this part of
    // the file; confirm the allowed values before documenting them.
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file basename (without extension); only present when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error text for the file; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the persisted memory; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action label for the file; omitted when `None`.
    // NOTE(review): possible values are not visible here; confirm upstream.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
    body_length: usize,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// executed the live embedding. `"claude" | "codex" | "none"`. Absent on
    /// the wire when `None` (kept for happy-path envelope cleanliness, or
    /// when the file did not reach the embed phase because of duplication/error).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
397
/// Final summary record. The module docs state that the per-file NDJSON
/// objects are followed by one summary object; this struct is its shape.
#[derive(Serialize)]
struct IngestSummary {
    // NOTE(review): presumably always `true`, acting as a marker that lets
    // consumers tell the summary line from per-file lines; confirm at the
    // construction site.
    summary: bool,
    /// Directory that was ingested, as a string.
    dir: String,
    /// File-name pattern used for matching.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    /// Counters for the run.
    // NOTE(review): whether `files_total` equals succeeded + failed + skipped
    // is not established by the code visible here.
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Elapsed time of the run in milliseconds.
    elapsed_ms: u64,
}
410
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
///
/// Returned by `persist_staged` once a staged file has been written.
struct FileSuccess {
    /// Identifier of the memory row that was written.
    memory_id: i64,
    /// Action label reported for the file.
    // NOTE(review): the concrete values are set where this struct is built,
    // which is outside the visible part of the file.
    action: String,
    /// Length of the ingested body.
    body_length: usize,
    /// Backend discriminator carried over from staging; `None` when no single
    /// backend could be attributed.
    backend_invoked: Option<&'static str>,
}
418
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's schema (see the note above).
    schema_version: u8,
    /// Event name.
    // NOTE(review): the emitted event names are not visible in this part of
    // the file.
    event: &'a str,
    /// Path of the file the event refers to.
    path: &'a str,
    /// Duration in milliseconds.
    // NOTE(review): presumably the time spent staging the file; confirm where
    // the event is built.
    ms: u64,
    /// Number of entities associated with the file.
    entities: usize,
    /// Number of relationships associated with the file.
    relationships: usize,
}
430
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
// NOTE(review): the module docs describe results being streamed to Phase B
// over a channel as each file finishes, so "submission order" may be stale;
// confirm against the Phase B loop.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name the file will be stored under.
    name: String,
    /// Generated description of the form `ingested from <path>`.
    description: String,
    /// Embedding for the whole body: the single passage embedding, or the
    /// aggregate of the chunk embeddings on the multi-chunk path.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only when the multi-chunk path was taken.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by the hierarchical splitter.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Entities extracted from the body (empty when extraction is disabled or
    /// failed).
    entities: Vec<NewEntity>,
    /// Relationships between entities.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found by auto-extraction.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// executed the body embedding. `None` when the parallel batch
    /// `embed_passages_parallel_local` may fall back to different backends
    /// across chunks (there is no single stable discriminator).
    backend_invoked: Option<&'static str>,
}
452
453/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
454/// Never touches the database — safe to run on any rayon thread.
455// G42/S3 added `llm_parallelism` as the 8th parameter; grouping the
456// stage knobs into a struct is a wider refactor than the surgical
457// scope of v1.0.79 allows.
458#[allow(clippy::too_many_arguments)]
459fn stage_file(
460    _idx: usize,
461    path: &Path,
462    name: &str,
463    paths: &AppPaths,
464    enable_ner: bool,
465    gliner_variant: crate::extraction::GlinerVariant,
466    max_rss_mb: u64,
467    llm_parallelism: usize,
468    llm_backend: crate::cli::LlmBackendChoice,
469) -> Result<StagedFile, AppError> {
470    use crate::constants::*;
471
472    if name.len() > MAX_MEMORY_NAME_LEN {
473        return Err(AppError::LimitExceeded(
474            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
475        ));
476    }
477    if name.starts_with("__") {
478        return Err(AppError::Validation(
479            crate::i18n::validation::reserved_name(),
480        ));
481    }
482    {
483        let slug_re = crate::constants::name_slug_regex();
484        if !slug_re.is_match(name) {
485            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
486                name,
487            )));
488        }
489    }
490
491    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
492    if file_size > MAX_MEMORY_BODY_LEN as u64 {
493        return Err(AppError::LimitExceeded(
494            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
495        ));
496    }
497    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
498    if raw_body.len() > MAX_MEMORY_BODY_LEN {
499        return Err(AppError::LimitExceeded(
500            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
501        ));
502    }
503    if raw_body.trim().is_empty() {
504        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
505    }
506
507    let description = format!("ingested from {}", path.display());
508    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
509        return Err(AppError::Validation(
510            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
511        ));
512    }
513
514    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
515    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
516    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
517    if enable_ner {
518        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
519            Ok(extracted) => {
520                extracted_urls = extracted.urls;
521                // v1.0.76: ExtractionResult.entities is now
522                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
523                // via name + type only; start/end offsets are not
524                // carried forward into the storage layer.
525                extracted_entities = extracted
526                    .entities
527                    .into_iter()
528                    .map(|e| NewEntity {
529                        name: e.name,
530                        entity_type: crate::entity_type::EntityType::Concept,
531                        description: None,
532                    })
533                    .collect();
534                // v1.0.76: relationships are no longer in the
535                // ExtractionResult struct; the LLM backend returns
536                // them in its own payload. The default build is
537                // URL-only extraction.
538                extracted_relationships.clear();
539
540                if extracted_entities.len() > max_entities_per_memory() {
541                    extracted_entities.truncate(max_entities_per_memory());
542                }
543                if extracted_relationships.len() > max_relationships_per_memory() {
544                    extracted_relationships.truncate(max_relationships_per_memory());
545                }
546            }
547            Err(e) => {
548                tracing::warn!(
549                    target: "ingest",
550                    file = %path.display(),
551                    "auto-extraction failed (graceful degradation): {e:#}"
552                );
553            }
554        }
555    }
556
557    for rel in &mut extracted_relationships {
558        rel.relation = crate::parsers::normalize_relation(&rel.relation);
559        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
560            return Err(AppError::Validation(format!(
561                "{e} for relationship '{}' -> '{}'",
562                rel.source, rel.target
563            )));
564        }
565        crate::parsers::warn_if_non_canonical(&rel.relation);
566        if !(0.0..=1.0).contains(&rel.strength) {
567            return Err(AppError::Validation(format!(
568                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
569                rel.strength, rel.source, rel.target
570            )));
571        }
572    }
573
574    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
575    let snippet: String = raw_body.chars().take(200).collect();
576
577    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
578    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
579        return Err(AppError::LimitExceeded(format!(
580            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
581            chunks_info.len(),
582            REMEMBER_MAX_SAFE_MULTI_CHUNKS
583        )));
584    }
585
586    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
587    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
588    // backend que efetivamente rodou para popular `backend_invoked` no
589    // envelope NDJSON por arquivo.
590    let (embedding, backend_invoked) = if chunks_info.len() == 1 {
591        // v1.0.82 (GAP-003): forward --llm-backend to embed_with_fallback.
592        crate::embedder::embed_passage_with_choice(&paths.models, &raw_body, Some(llm_backend))
593            .map(|(v, k)| (v, Some(k.as_str())))?
594    } else {
595        // G42/S2+S3 (v1.0.79): batched bounded fan-out replaces the
596        // serial per-chunk subprocess loop.
597        let chunk_texts: Vec<String> = chunks_info
598            .iter()
599            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
600            .collect();
601        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
602            if rss > max_rss_mb {
603                tracing::error!(
604                    target: "ingest",
605                    rss_mb = rss,
606                    max_rss_mb = max_rss_mb,
607                    file = %path.display(),
608                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
609                );
610                return Err(AppError::LowMemory {
611                    available_mb: crate::memory_guard::available_memory_mb(),
612                    required_mb: max_rss_mb,
613                });
614            }
615        }
616        let chunk_embeddings = crate::embedder::embed_passages_parallel_local(
617            &paths.models,
618            &chunk_texts,
619            llm_parallelism,
620            crate::embedder::chunk_embed_batch_size(),
621        )?;
622        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
623        chunk_embeddings_opt = Some(chunk_embeddings);
624        // v1.0.84 (ADR-0042): batch paralelo não retorna discriminador
625        // único por chamada. Conservadoramente, populamos None aqui.
626        (aggregated, None)
627    };
628
629    // G42/S2+A4 (v1.0.79): entity names use the short-text batch profile.
630    let entity_texts: Vec<String> = extracted_entities
631        .iter()
632        .map(|entity| match &entity.description {
633            Some(desc) => format!("{} {}", entity.name, desc),
634            None => entity.name.clone(),
635        })
636        .collect();
637    // G56 (v1.0.80): ingest reuses canonical entity names across many
638    // memories (e.g. `sqlite-graphrag`, `claude-code`); the in-process
639    // cache collapses the repeated LLM calls into one per unique text.
640    let (entity_embeddings, embed_cache_stats) =
641        crate::embedder::embed_entity_texts_cached(&paths.models, &entity_texts, llm_parallelism)?;
642    if embed_cache_stats.hits > 0 {
643        tracing::debug!(
644            hits = embed_cache_stats.hits,
645            misses = embed_cache_stats.misses,
646            requested = embed_cache_stats.requested,
647            "G56: entity embed cache hit (ingest)"
648        );
649    }
650
651    Ok(StagedFile {
652        body: raw_body,
653        body_hash,
654        snippet,
655        name: name.to_string(),
656        description,
657        embedding,
658        chunk_embeddings: chunk_embeddings_opt,
659        chunks_info,
660        entities: extracted_entities,
661        relationships: extracted_relationships,
662        entity_embeddings,
663        urls: extracted_urls,
664        backend_invoked,
665    })
666}
667
668/// Phase B: persists one `StagedFile` to the database on the main thread.
669fn persist_staged(
670    conn: &mut Connection,
671    namespace: &str,
672    memory_type: &str,
673    staged: StagedFile,
674) -> Result<FileSuccess, AppError> {
675    {
676        let active_count: u32 = conn.query_row(
677            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
678            [],
679            |r| r.get::<_, i64>(0).map(|v| v as u32),
680        )?;
681        let ns_exists: bool = conn.query_row(
682            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
683            rusqlite::params![namespace],
684            |r| r.get::<_, i64>(0).map(|v| v > 0),
685        )?;
686        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
687            return Err(AppError::NamespaceError(format!(
688                "active namespace limit of {} exceeded while creating '{namespace}'",
689                crate::constants::MAX_NAMESPACES_ACTIVE
690            )));
691        }
692    }
693
694    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
695    if existing_memory.is_some() {
696        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
697            &staged.name,
698            namespace,
699        )));
700    }
701    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
702
703    let new_memory = NewMemory {
704        namespace: namespace.to_string(),
705        name: staged.name.clone(),
706        memory_type: memory_type.to_string(),
707        description: staged.description.clone(),
708        body: staged.body,
709        body_hash: staged.body_hash,
710        session_id: None,
711        source: "agent".to_string(),
712        metadata: serde_json::json!({}),
713    };
714
715    if let Some(hash_id) = duplicate_hash_id {
716        tracing::debug!(
717            target: "ingest",
718            duplicate_memory_id = hash_id,
719            "identical body already exists; persisting a new memory anyway"
720        );
721    }
722
723    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
724
725    let memory_id = memories::insert(&tx, &new_memory)?;
726    versions::insert_version(
727        &tx,
728        memory_id,
729        1,
730        &staged.name,
731        memory_type,
732        &staged.description,
733        &new_memory.body,
734        &serde_json::to_string(&new_memory.metadata)?,
735        None,
736        "create",
737    )?;
738    memories::upsert_vec(
739        &tx,
740        memory_id,
741        namespace,
742        memory_type,
743        &staged.embedding,
744        &staged.name,
745        &staged.snippet,
746    )?;
747
748    if staged.chunks_info.len() > 1 {
749        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
750        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
751            AppError::Internal(anyhow::anyhow!(
752                "missing chunk embeddings cache on multi-chunk ingest path"
753            ))
754        })?;
755        for (i, emb) in chunk_embeddings.iter().enumerate() {
756            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
757        }
758    }
759
760    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
761        for (idx, entity) in staged.entities.iter().enumerate() {
762            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
763            let entity_embedding = &staged.entity_embeddings[idx];
764            entities::upsert_entity_vec(
765                &tx,
766                entity_id,
767                namespace,
768                entity.entity_type,
769                entity_embedding,
770                &entity.name,
771            )?;
772            entities::link_memory_entity(&tx, memory_id, entity_id)?;
773            entities::increment_degree(&tx, entity_id)?;
774        }
775        let entity_types: std::collections::HashMap<&str, EntityType> = staged
776            .entities
777            .iter()
778            .map(|entity| (entity.name.as_str(), entity.entity_type))
779            .collect();
780        for rel in &staged.relationships {
781            let source_entity = NewEntity {
782                name: rel.source.clone(),
783                entity_type: entity_types
784                    .get(rel.source.as_str())
785                    .copied()
786                    .unwrap_or(EntityType::Concept),
787                description: None,
788            };
789            let target_entity = NewEntity {
790                name: rel.target.clone(),
791                entity_type: entity_types
792                    .get(rel.target.as_str())
793                    .copied()
794                    .unwrap_or(EntityType::Concept),
795                description: None,
796            };
797            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
798            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
799            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
800            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
801        }
802    }
803
804    tx.commit()?;
805
806    if !staged.urls.is_empty() {
807        let url_entries: Vec<storage_urls::MemoryUrl> = staged
808            .urls
809            .into_iter()
810            .map(|u| storage_urls::MemoryUrl {
811                url: u.url,
812                offset: Some(u.start as i64),
813            })
814            .collect();
815        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
816    }
817
818    Ok(FileSuccess {
819        memory_id,
820        action: "created".to_string(),
821        body_length: new_memory.body.len(),
822        backend_invoked: staged.backend_invoked,
823    })
824}
825
826// ---------------------------------------------------------------------------
827// G20: mode-conditional flag validation
828// ---------------------------------------------------------------------------
829
/// Reports whether `value` is equal to `default`.
///
/// Used by the G20 flag validation to decide whether a scalar flag was left
/// at its declared default. The helper is deliberately declared in this
/// module (rather than shared with other command modules) so the G20 check
/// stays self-contained.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    value == default
}
836
837/// G20: validate that flags for one LLM provider were not passed when
838/// the operator selected a different provider (or no provider). Flags
839/// silently discarded by the wrong mode are surfaced as
840///  BEFORE any DB work, so the operator gets
841/// an actionable error instead of a surprise at runtime.
842///
843/// Mode-specific matrices:
844/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
845///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
846///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
847///   --enable-ner is false)
848/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
849/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
850///   max_cost_usd, resume, retry_failed, keep_queue
851fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
852    const DEFAULT_TIMEOUT: u64 = 300;
853    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
854
855    let mut conflicts: Vec<String> = Vec::new();
856
857    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
858
859    if is_local_mode {
860        if args.claude_binary.is_some() {
861            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
862        }
863        if args.claude_model.is_some() {
864            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
865        }
866        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
867            conflicts.push(format!(
868                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
869                args.claude_timeout
870            ));
871        }
872        if args.codex_binary.is_some() {
873            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
874        }
875        if args.codex_model.is_some() {
876            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
877        }
878        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
879            conflicts.push(format!(
880                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
881                args.codex_timeout
882            ));
883        }
884        if args.max_cost_usd.is_some() {
885            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
886        }
887        if args.resume {
888            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
889        }
890        if args.retry_failed {
891            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
892        }
893        if args.keep_queue {
894            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
895        }
896        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
897            conflicts.push(format!(
898                "--rate-limit-wait={} is ignored when --mode is none or gliner",
899                args.rate_limit_wait
900            ));
901        }
902    }
903
904    match args.mode {
905        IngestMode::ClaudeCode => {
906            if args.codex_binary.is_some() {
907                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
908            }
909            if args.codex_model.is_some() {
910                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
911            }
912            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
913                conflicts.push(format!(
914                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
915                    args.codex_timeout
916                ));
917            }
918        }
919        IngestMode::Codex => {
920            if args.claude_binary.is_some() {
921                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
922            }
923            if args.claude_model.is_some() {
924                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
925            }
926            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
927                conflicts.push(format!(
928                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
929                    args.claude_timeout
930                ));
931            }
932            if args.max_cost_usd.is_some() {
933                conflicts.push(
934                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
935                        .to_string(),
936                );
937            }
938            if args.resume {
939                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
940            }
941            if args.retry_failed {
942                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
943            }
944            if args.keep_queue {
945                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
946            }
947        }
948        IngestMode::None | IngestMode::Gliner => {}
949    }
950
951    if !conflicts.is_empty() {
952        return Err(AppError::Validation(format!(
953            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
954            args.mode,
955            conflicts.join("\n  - ")
956        )));
957    }
958
959    Ok(())
960}
961
962// ---------------------------------------------------------------------------
963
964#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
965pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
966    // G20: mode-conditional flag validation BEFORE any DB access.
967    // Surfaces flags that the wrong mode would silently discard.
968    validate_mode_conditional_flags_ingest(&args)?;
969    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
970    if args.mode == IngestMode::ClaudeCode {
971        return super::ingest_claude::run_claude_ingest(&args);
972    }
973    if args.mode == IngestMode::Codex {
974        return super::ingest_codex::run_codex_ingest(&args);
975    }
976
977    let started = std::time::Instant::now();
978
979    if !args.dir.exists() {
980        return Err(AppError::Validation(format!(
981            "directory not found: {}",
982            args.dir.display()
983        )));
984    }
985    if !args.dir.is_dir() {
986        return Err(AppError::Validation(format!(
987            "path is not a directory: {}",
988            args.dir.display()
989        )));
990    }
991
992    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
993    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
994    files.sort_unstable();
995
996    if files.len() > args.max_files {
997        return Err(AppError::Validation(format!(
998            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
999            files.len(),
1000            args.max_files
1001        )));
1002    }
1003
1004    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1005    let memory_type_str = args.r#type.as_str().to_string();
1006
1007    let paths = AppPaths::resolve(args.db.as_deref())?;
1008    let mut conn_or_err = match init_storage(&paths) {
1009        Ok(c) => Ok(c),
1010        Err(e) => Err(format!("{e}")),
1011    };
1012
1013    let mut succeeded: usize = 0;
1014    let mut failed: usize = 0;
1015    let mut skipped: usize = 0;
1016    let total = files.len();
1017
1018    // Pre-resolve all names before parallelisation so Phase A workers see a
1019    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
1020    let mut taken_names: BTreeSet<String> = BTreeSet::new();
1021
1022    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
1023    // ProcessItem: the data moved into the producer thread for Phase A computation.
1024    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
1025    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
1026    // boundary into the rayon producer.
1027    enum SlotMeta {
1028        Skip {
1029            file_str: String,
1030            derived_base: String,
1031            name_truncated: bool,
1032            original_name: Option<String>,
1033            original_filename: Option<String>,
1034            reason: String,
1035        },
1036        Process {
1037            file_str: String,
1038            derived_name: String,
1039            name_truncated: bool,
1040            original_name: Option<String>,
1041            original_filename: Option<String>,
1042        },
1043    }
1044
1045    struct ProcessItem {
1046        idx: usize,
1047        path: PathBuf,
1048        file_str: String,
1049        derived_name: String,
1050    }
1051
1052    let files_cap = files.len();
1053    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1054    slots_meta.try_reserve(files_cap).map_err(|_| {
1055        AppError::LimitExceeded(format!(
1056            "allocation of {files_cap} slot metadata entries would exceed available memory"
1057        ))
1058    })?;
1059    let mut process_items: Vec<ProcessItem> = Vec::new();
1060    process_items.try_reserve(files_cap).map_err(|_| {
1061        AppError::LimitExceeded(format!(
1062            "allocation of {files_cap} process items would exceed available memory"
1063        ))
1064    })?;
1065    let mut truncations: Vec<(String, String)> = Vec::new();
1066    truncations.try_reserve(files_cap).map_err(|_| {
1067        AppError::LimitExceeded(format!(
1068            "allocation of {files_cap} truncation entries would exceed available memory"
1069        ))
1070    })?;
1071
1072    let max_name_length = args.max_name_length;
1073    for path in &files {
1074        let file_str = path.to_string_lossy().into_owned();
1075        let (derived_base, name_truncated, original_name) =
1076            derive_kebab_name(path, max_name_length);
1077        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1078
1079        if name_truncated {
1080            if let Some(ref orig) = original_name {
1081                truncations.push((orig.clone(), derived_base.clone()));
1082            }
1083        }
1084
1085        if derived_base.is_empty() {
1086            // original_filename: always include when it differs from the empty derived name
1087            let orig_filename = if !original_basename.is_empty() {
1088                Some(original_basename.to_string())
1089            } else {
1090                None
1091            };
1092            slots_meta.push(SlotMeta::Skip {
1093                file_str,
1094                derived_base: String::new(),
1095                name_truncated: false,
1096                original_name: None,
1097                original_filename: orig_filename,
1098                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1099            });
1100            continue;
1101        }
1102
1103        match unique_name(&derived_base, &taken_names) {
1104            Ok(derived_name) => {
1105                taken_names.insert(derived_name.clone());
1106                let idx = slots_meta.len();
1107                // original_filename: present only when the raw basename differs from the derived name
1108                let orig_filename = if original_basename != derived_name {
1109                    Some(original_basename.to_string())
1110                } else {
1111                    None
1112                };
1113                process_items.push(ProcessItem {
1114                    idx,
1115                    path: path.clone(),
1116                    file_str: file_str.clone(),
1117                    derived_name: derived_name.clone(),
1118                });
1119                slots_meta.push(SlotMeta::Process {
1120                    file_str,
1121                    derived_name,
1122                    name_truncated,
1123                    original_name,
1124                    original_filename: orig_filename,
1125                });
1126            }
1127            Err(e) => {
1128                let orig_filename = if original_basename != derived_base {
1129                    Some(original_basename.to_string())
1130                } else {
1131                    None
1132                };
1133                slots_meta.push(SlotMeta::Skip {
1134                    file_str,
1135                    derived_base,
1136                    name_truncated,
1137                    original_name,
1138                    original_filename: orig_filename,
1139                    reason: e.to_string(),
1140                });
1141            }
1142        }
1143    }
1144
1145    if !truncations.is_empty() {
1146        tracing::info!(
1147            target: "ingest",
1148            count = truncations.len(),
1149            max_name_length = max_name_length,
1150            max_len = DERIVED_NAME_MAX_LEN,
1151            "derived names truncated; pass -vv (debug) for per-file detail"
1152        );
1153    }
1154
1155    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1156    if args.dry_run {
1157        for meta in &slots_meta {
1158            match meta {
1159                SlotMeta::Skip {
1160                    file_str,
1161                    derived_base,
1162                    name_truncated,
1163                    original_name,
1164                    original_filename,
1165                    reason,
1166                } => {
1167                    output::emit_json_compact(&IngestFileEvent {
1168                        file: file_str,
1169                        name: derived_base,
1170                        status: "skip",
1171                        truncated: *name_truncated,
1172                        original_name: original_name.clone(),
1173                        original_filename: original_filename.as_deref(),
1174                        error: Some(reason.clone()),
1175                        memory_id: None,
1176                        action: None,
1177                        body_length: 0,
1178                        backend_invoked: None,
1179                    })?;
1180                }
1181                SlotMeta::Process {
1182                    file_str,
1183                    derived_name,
1184                    name_truncated,
1185                    original_name,
1186                    original_filename,
1187                } => {
1188                    output::emit_json_compact(&IngestFileEvent {
1189                        file: file_str,
1190                        name: derived_name,
1191                        status: "preview",
1192                        truncated: *name_truncated,
1193                        original_name: original_name.clone(),
1194                        original_filename: original_filename.as_deref(),
1195                        error: None,
1196                        memory_id: None,
1197                        action: None,
1198                        body_length: 0,
1199                        backend_invoked: None,
1200                    })?;
1201                }
1202            }
1203        }
1204        output::emit_json_compact(&IngestSummary {
1205            summary: true,
1206            dir: args.dir.to_string_lossy().into_owned(),
1207            pattern: args.pattern.clone(),
1208            recursive: args.recursive,
1209            files_total: total,
1210            files_succeeded: 0,
1211            files_failed: 0,
1212            files_skipped: 0,
1213            elapsed_ms: started.elapsed().as_millis() as u64,
1214        })?;
1215        return Ok(());
1216    }
1217
1218    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1219    if args.low_memory {
1220        if let Some(n) = args.ingest_parallelism {
1221            if n > 1 {
1222                return Err(AppError::Validation(
1223                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1224                        .to_string(),
1225                ));
1226            }
1227        }
1228    }
1229
1230    // Determine rayon thread pool size, honoring --low-memory and the
1231    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1232    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1233
1234    let pool = rayon::ThreadPoolBuilder::new()
1235        .num_threads(parallelism)
1236        .build()
1237        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1238
1239    if args.enable_ner && args.skip_extraction {
1240        return Err(AppError::Validation(
1241            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1242        ));
1243    }
1244    if args.skip_extraction && !args.enable_ner {
1245        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1246        // commit (9ddb17b) promoted this to a hard validation error, which
1247        // broke the "kept as a hidden no-op for backwards compatibility"
1248        // promise documented in CHANGELOG v1.0.45 and started failing
1249        // 5+ CI jobs whose E2E tests use this flag to skip the
1250        // GLiNER-ONNX model download in CI environments.
1251        tracing::warn!(
1252            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1253        );
1254    }
1255    let enable_ner = args.enable_ner;
1256    let max_rss_mb = args.max_rss_mb;
1257    let llm_parallelism = args.llm_parallelism as usize;
1258    // v1.0.79: `--mode gliner` and `--gliner-variant` are no-ops kept for
1259    // compatibility (the GLiNER pipeline was removed); warn explicitly so
1260    // callers do not silently expect NER-quality extraction.
1261    if args.mode == IngestMode::Gliner {
1262        tracing::warn!(
1263            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1264        );
1265    }
1266    if args.gliner_variant != "fp32" {
1267        tracing::warn!(
1268            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1269        );
1270    }
1271    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1272        "int8" => crate::extraction::GlinerVariant::Int8,
1273        _ => crate::extraction::GlinerVariant::Fp32,
1274    };
1275
1276    let total_to_process = process_items.len();
1277    tracing::info!(
1278        target: "ingest",
1279        phase = "pipeline_start",
1280        files = total_to_process,
1281        ingest_parallelism = parallelism,
1282        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1283    );
1284
1285    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1286    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1287    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1288    let channel_bound = (parallelism * 2).max(1);
1289    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1290
1291    // Phase A: launched in a dedicated OS thread so the main thread can consume
1292    // the channel concurrently. pool.install() blocks the calling thread until
1293    // all rayon workers finish — if called on the main thread it would
1294    // reintroduce the 2-phase blocking behaviour we are eliminating.
1295    let paths_owned = paths.clone();
1296    let llm_backend_owned = llm_backend;
1297    let producer_handle = std::thread::spawn(move || {
1298        pool.install(|| {
1299            process_items.into_par_iter().for_each(|item| {
1300                if crate::shutdown_requested() {
1301                    return;
1302                }
1303                let t0 = std::time::Instant::now();
1304                let result = stage_file(
1305                    item.idx,
1306                    &item.path,
1307                    &item.derived_name,
1308                    &paths_owned,
1309                    enable_ner,
1310                    gliner_variant,
1311                    max_rss_mb,
1312                    llm_parallelism,
1313                    llm_backend_owned,
1314                );
1315                let elapsed_ms = t0.elapsed().as_millis() as u64;
1316
1317                // Emit NDJSON progress event to stderr so the user sees work
1318                // happening during long NER runs (e.g. 50 files × 27s each).
1319                let (n_entities, n_relationships) = match &result {
1320                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1321                    Err(_) => (0, 0),
1322                };
1323                let progress = StageProgressEvent {
1324                    schema_version: 1,
1325                    event: "file_extracted",
1326                    path: &item.file_str,
1327                    ms: elapsed_ms,
1328                    entities: n_entities,
1329                    relationships: n_relationships,
1330                };
1331                if let Ok(line) = serde_json::to_string(&progress) {
1332                    tracing::info!(target: "ingest_progress", "{}", line);
1333                }
1334
1335                // Blocking send applies backpressure: if Phase B is slower,
1336                // Phase A workers wait here instead of accumulating staged files
1337                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1338                let _ = tx.send((item.idx, result));
1339            });
1340            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1341            drop(tx);
1342        });
1343    });
1344
1345    // Phase B: main thread persists files as results arrive from the channel.
1346    // Results arrive in completion order (par_iter is unordered). We persist
1347    // each file immediately on arrival — this is the key fix for B1: with the
1348    // old 2-phase design the first DB write happened only after ALL files had
1349    // finished Phase A. Now the first commit happens as soon as the first file
1350    // completes Phase A, regardless of how many files remain.
1351    //
1352    // NDJSON output order follows completion order (not file-system sort order).
    // Skip slots are emitted up front, before any Process result is consumed.
1354    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1355    // requirement than ensuring data is persisted before the user's timeout fires.
1356    let fail_fast = args.fail_fast;
1357
1358    // Emit pending Skip events first so agents see them early.
1359    for meta in &slots_meta {
1360        if let SlotMeta::Skip {
1361            file_str,
1362            derived_base,
1363            name_truncated,
1364            original_name,
1365            original_filename,
1366            reason,
1367        } = meta
1368        {
1369            output::emit_json_compact(&IngestFileEvent {
1370                file: file_str,
1371                name: derived_base,
1372                status: "skipped",
1373                truncated: *name_truncated,
1374                original_name: original_name.clone(),
1375                original_filename: original_filename.as_deref(),
1376                error: Some(reason.clone()),
1377                memory_id: None,
1378                action: None,
1379                body_length: 0,
1380                backend_invoked: None,
1381            })?;
1382            skipped += 1;
1383        }
1384    }
1385
1386    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1387    // as channel messages arrive in completion order.
1388    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1389        .iter()
1390        .enumerate()
1391        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1392        .collect();
1393
1394    tracing::info!(
1395        target: "ingest",
1396        phase = "persist_start",
1397        files = total_to_process,
1398        "phase B starting: persisting files incrementally as Phase A completes each one",
1399    );
1400
1401    // Drain channel and persist each file immediately — no accumulation into a
1402    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1403    // Phase B without applying backpressure.
1404    for (idx, stage_result) in rx {
1405        if crate::shutdown_requested() {
1406            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1407            break;
1408        }
1409        let meta = meta_index.get(&idx).ok_or_else(|| {
1410            AppError::Internal(anyhow::anyhow!(
1411                "channel idx {idx} has no corresponding Process slot"
1412            ))
1413        })?;
1414        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1415        {
1416            SlotMeta::Process {
1417                file_str,
1418                derived_name,
1419                name_truncated,
1420                original_name,
1421                original_filename,
1422            } => (
1423                file_str,
1424                derived_name,
1425                name_truncated,
1426                original_name,
1427                original_filename,
1428            ),
1429            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1430        };
1431
1432        // If storage init failed, every file fails with the same error.
1433        let conn = match conn_or_err.as_mut() {
1434            Ok(c) => c,
1435            Err(err_msg) => {
1436                let err_clone = err_msg.clone();
1437                output::emit_json_compact(&IngestFileEvent {
1438                    file: file_str,
1439                    name: derived_name,
1440                    status: "failed",
1441                    truncated: *name_truncated,
1442                    original_name: original_name.clone(),
1443                    original_filename: original_filename.as_deref(),
1444                    error: Some(err_clone.clone()),
1445                    memory_id: None,
1446                    action: None,
1447                    body_length: 0,
1448                    backend_invoked: None,
1449                })?;
1450                failed += 1;
1451                if fail_fast {
1452                    output::emit_json_compact(&IngestSummary {
1453                        summary: true,
1454                        dir: args.dir.display().to_string(),
1455                        pattern: args.pattern.clone(),
1456                        recursive: args.recursive,
1457                        files_total: total,
1458                        files_succeeded: succeeded,
1459                        files_failed: failed,
1460                        files_skipped: skipped,
1461                        elapsed_ms: started.elapsed().as_millis() as u64,
1462                    })?;
1463                    return Err(AppError::Validation(format!(
1464                        "ingest aborted on first failure: {err_clone}"
1465                    )));
1466                }
1467                continue;
1468            }
1469        };
1470
1471        let outcome =
1472            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1473
1474        match outcome {
1475            Ok(FileSuccess {
1476                memory_id,
1477                action,
1478                body_length,
1479                backend_invoked: file_backend_invoked,
1480            }) => {
1481                output::emit_json_compact(&IngestFileEvent {
1482                    file: file_str,
1483                    name: derived_name,
1484                    status: "indexed",
1485                    truncated: *name_truncated,
1486                    original_name: original_name.clone(),
1487                    original_filename: original_filename.as_deref(),
1488                    error: None,
1489                    memory_id: Some(memory_id),
1490                    action: Some(action),
1491                    body_length,
1492                    backend_invoked: file_backend_invoked,
1493                })?;
1494                succeeded += 1;
1495            }
1496            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1497                output::emit_json_compact(&IngestFileEvent {
1498                    file: file_str,
1499                    name: derived_name,
1500                    status: "skipped",
1501                    truncated: *name_truncated,
1502                    original_name: original_name.clone(),
1503                    original_filename: original_filename.as_deref(),
1504                    error: Some(format!("{e}")),
1505                    memory_id: None,
1506                    action: Some("duplicate".to_string()),
1507                    body_length: 0,
1508                    backend_invoked: None,
1509                })?;
1510                skipped += 1;
1511            }
1512            Err(e) => {
1513                let err_msg = format!("{e}");
1514                output::emit_json_compact(&IngestFileEvent {
1515                    file: file_str,
1516                    name: derived_name,
1517                    status: "failed",
1518                    truncated: *name_truncated,
1519                    original_name: original_name.clone(),
1520                    original_filename: original_filename.as_deref(),
1521                    error: Some(err_msg.clone()),
1522                    memory_id: None,
1523                    action: None,
1524                    body_length: 0,
1525                    backend_invoked: None,
1526                })?;
1527                failed += 1;
1528                if fail_fast {
1529                    output::emit_json_compact(&IngestSummary {
1530                        summary: true,
1531                        dir: args.dir.display().to_string(),
1532                        pattern: args.pattern.clone(),
1533                        recursive: args.recursive,
1534                        files_total: total,
1535                        files_succeeded: succeeded,
1536                        files_failed: failed,
1537                        files_skipped: skipped,
1538                        elapsed_ms: started.elapsed().as_millis() as u64,
1539                    })?;
1540                    return Err(AppError::Validation(format!(
1541                        "ingest aborted on first failure: {err_msg}"
1542                    )));
1543                }
1544            }
1545        }
1546    }
1547
1548    // Wait for the producer thread to finish cleanly.
1549    producer_handle
1550        .join()
1551        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1552
1553    if let Ok(ref conn) = conn_or_err {
1554        if succeeded > 0 {
1555            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1556        }
1557    }
1558
1559    output::emit_json_compact(&IngestSummary {
1560        summary: true,
1561        dir: args.dir.display().to_string(),
1562        pattern: args.pattern.clone(),
1563        recursive: args.recursive,
1564        files_total: total,
1565        files_succeeded: succeeded,
1566        files_failed: failed,
1567        files_skipped: skipped,
1568        elapsed_ms: started.elapsed().as_millis() as u64,
1569    })?;
1570
1571    Ok(())
1572}
1573
1574/// Auto-initialises the database (matches the contract of every other CRUD
1575/// handler) and returns a fresh read/write connection ready for the ingest
1576/// loop. Errors here are recoverable per-file: the caller surfaces them as
1577/// failure events so `--fail-fast` and the continue-on-error path keep
1578/// working when, for example, the user points `--db` at an unwritable path.
1579fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1580    ensure_db_ready(paths)?;
1581    let conn = open_rw(&paths.db)?;
1582    Ok(conn)
1583}
1584
1585pub(crate) fn collect_files(
1586    dir: &Path,
1587    pattern: &str,
1588    recursive: bool,
1589    out: &mut Vec<PathBuf>,
1590) -> Result<(), AppError> {
1591    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1592    for entry in entries {
1593        let entry = entry.map_err(AppError::Io)?;
1594        let path = entry.path();
1595        let file_type = entry.file_type().map_err(AppError::Io)?;
1596        if file_type.is_file() {
1597            let name = entry.file_name();
1598            let name_str = name.to_string_lossy();
1599            if matches_pattern(&name_str, pattern) {
1600                out.push(path);
1601            }
1602        } else if file_type.is_dir() && recursive {
1603            collect_files(&path, pattern, recursive, out)?;
1604        }
1605    }
1606    Ok(())
1607}
1608
/// Returns `true` when `name` matches the glob-style `pattern`.
///
/// `*` matches any run of characters (including none); every other
/// character must match literally and case-sensitively. A pattern without
/// `*` is an exact comparison. Wildcards may appear anywhere and more than
/// once, so `*.md`, `README*`, `notes-*.md` and `*draft*` all work.
///
/// Earlier versions only honoured a single leading or a single trailing
/// `*`; any other placement silently degraded to a literal comparison and
/// matched nothing in practice.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // `head` is anchored at the start and `tail` at the end; whatever sits
    // between the first and last `*` is a list of floating segments.
    let (middle, tail) = rest.rsplit_once('*').unwrap_or(("", rest));

    // Stripping sequentially guarantees the anchors never overlap
    // (e.g. `a*a` must not match the single-character name `a`).
    let mut remaining = match name
        .strip_prefix(head)
        .and_then(|after_head| after_head.strip_suffix(tail))
    {
        Some(inner) => inner,
        None => return false,
    };

    // Each floating segment must occur, in order, in what is left. Taking
    // the leftmost occurrence leaves the most room for later segments.
    for segment in middle.split('*').filter(|s| !s.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }
    true
}
1618
1619/// Returns `(final_name, truncated, original_name)`.
1620/// `truncated` is true when the derived name exceeded `max_len`.
1621/// `original_name` holds the pre-truncation name only when `truncated=true`.
1622///
1623/// Non-ASCII characters are first decomposed via NFD and then stripped of
1624/// combining marks so accented letters fold to their base ASCII letter
1625/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1626/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1627/// preserves meaningful word content rather than collapsing the basename
1628/// to a few stray ASCII letters as the previous filter did.
1629pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1630    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1631    let lowered: String = stem
1632        .nfd()
1633        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1634        .map(|c| {
1635            if c == '_' || c.is_whitespace() {
1636                '-'
1637            } else {
1638                c
1639            }
1640        })
1641        .map(|c| c.to_ascii_lowercase())
1642        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1643        .collect();
1644    let collapsed = collapse_dashes(&lowered);
1645    let trimmed_raw = collapsed.trim_matches('-').to_string();
1646    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1647    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1648        format!("doc-{trimmed_raw}")
1649    } else {
1650        trimmed_raw
1651    };
1652    if trimmed.len() > max_len {
1653        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1654        tracing::debug!(
1655            target: "ingest",
1656            original = %trimmed,
1657            truncated_to = %truncated,
1658            max_len = max_len,
1659            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1660        );
1661        (truncated, true, Some(trimmed))
1662    } else {
1663        (trimmed, false, None)
1664    }
1665}
1666
1667/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1668/// numeric suffix (`-1`, `-2`, …) when needed.
1669///
1670/// `taken` is the set of names already consumed in the current ingest run.
1671/// The caller is expected to insert the returned name into `taken` so the
1672/// next call observes the consumption. Cross-run collisions are intentionally
1673/// surfaced by the per-file persistence path as duplicates so re-ingestion
1674/// of identical corpora stays idempotent.
1675///
1676/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1677/// candidates collide, signalling a pathological corpus that should be
1678/// renamed manually.
1679fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1680    if !taken.contains(base) {
1681        return Ok(base.to_string());
1682    }
1683    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1684        let candidate = format!("{base}-{suffix}");
1685        if !taken.contains(&candidate) {
1686            tracing::warn!(
1687                target: "ingest",
1688                base = %base,
1689                resolved = %candidate,
1690                suffix,
1691                "memory name collision resolved with numeric suffix"
1692            );
1693            return Ok(candidate);
1694        }
1695    }
1696    Err(AppError::Validation(format!(
1697        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1698    )))
1699}
1700
/// Returns a copy of `s` in which every run of consecutive `-` characters
/// is reduced to a single `-`. All other characters are kept unchanged and
/// in order; leading and trailing hyphens are not removed.
fn collapse_dashes(s: &str) -> String {
    let mut squeezed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A hyphen is redundant exactly when the previous emitted character
        // was already a hyphen.
        if ch == '-' && squeezed.ends_with('-') {
            continue;
        }
        squeezed.push(ch);
    }
    squeezed
}
1717
/// Unit tests for the ingest helpers: pattern matching, name derivation,
/// file collection, collision resolution and the low-memory switches.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // ── file-name pattern matching ──

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ── kebab-case name derivation ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ── directory traversal ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value`
    /// (or removed when `value` is `None`) and then puts the previous state
    /// back, keeping tests deterministic.
    ///
    /// Restoration happens in a drop guard, so it also runs when `f` panics
    /// (e.g. on a failed assertion). Without that, one failing test would
    /// leak its value into every later `#[serial]` test and turn a single
    /// failure into a cascade.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the value observed before the override and reinstates it
        /// on drop.
        struct Restore(Option<String>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // Capture the prior state before touching the variable.
        let _restore = Restore(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}