Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
///
/// NOTE(review): the collision-resolution code that consumes this constant is
/// not part of this excerpt; confirm whether the bound is inclusive there.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// CLI arguments for the `ingest` subcommand, parsed through clap's derive API.
//
// Reviewer notes in this struct deliberately use `//` rather than `///`:
// clap turns `///` doc comments into user-visible help text, so editing them
// would change the program's output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
    # Preview file-to-name mapping without ingesting\n  \
    sqlite-graphrag ingest ./docs --dry-run\n\n  \
    # LLM-curated extraction via Claude Code CLI\n  \
    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
    # Resume interrupted claude-code ingest\n  \
    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
    # Claude Code with budget cap and custom timeout\n  \
    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
AUTHENTICATION:\n  \
    --mode claude-code: Uses existing Claude Code authentication.\n  \
      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
    --mode codex: Uses existing Codex CLI authentication.\n  \
      Device auth: run `codex auth login` first\n  \
      API key: set OPENAI_API_KEY (optional)\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // NOTE(review): the `help` text below says "recursively", but recursion is
    // opt-in through `--recursive` (default false) further down.
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // `num_args = 0..=1` plus `default_missing_value = "true"` lets a bare
    // `--enable-ner` mean true while still accepting an explicit value, which
    // is parsed by `parse_bool_flexible`. The env var supplies the value when
    // the flag is absent.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
    )]
    pub enable_ner: bool,

    // NOTE(review): `auto_describe` defaults to true and `no_auto_describe` is
    // a separate field; how the two are combined is decided by the handler,
    // which is outside this excerpt.
    /// GAP-E2E-011: generates a heuristic description from the first meaningful
    /// line of the body, instead of "ingested from `<path>`". When
    /// `--no-auto-describe` is passed, keeps the legacy behaviour.
    #[arg(
        long,
        default_value_t = true,
        overrides_with = "no_auto_describe",
        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
    )]
    pub auto_describe: bool,
    #[arg(
        long = "no-auto-describe",
        default_value_t = false,
        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
    )]
    pub no_auto_describe: bool,
    // Kept as a free-form string here; the staging functions in this file take
    // a `crate::extraction::GlinerVariant`, so a conversion happens elsewhere.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
    )]
    pub gliner_variant: String,

    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Preview file-to-name mapping without loading model or persisting.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // `None` means "use the heuristic"; `resolve_parallelism` in this file
    // implements that default as `(cpus/2).clamp(1, 4)`.
    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // Presumably passed to `resolve_parallelism` together with
    // `ingest_parallelism`; that function returns 1 whenever low-memory mode is
    // requested by this flag or by the env var.
    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
    /// explicit value. Recommended for environments with <4 GB available
    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
    /// (CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // NOTE(review): the help text hard-codes "default: 8192" while the actual
    // default comes from `DEFAULT_MAX_RSS_MB`; keep the two in sync.
    /// Maximum process RSS in MiB; abort if exceeded during embedding.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // Parsed as `u64` restricted to 1..=32 by the value parser; the staging
    // functions in this file take `usize`, so a conversion happens elsewhere.
    /// G42/S3 (v1.0.79): maximum simultaneous LLM embedding subprocesses
    /// PER FILE. Multiplies with --ingest-parallelism (files staged
    /// concurrently), hence the conservative default of 2. The effective
    /// value is further bounded by CPU count and available RAM.
    #[arg(long, default_value_t = 2, value_name = "N",
          value_parser = clap::value_parser!(u64).range(1..=32),
          help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
    pub llm_parallelism: u64,

    // NOTE(review): the help text hard-codes "default: 60" while the actual
    // default is `DERIVED_NAME_MAX_LEN`; keep the two in sync.
    /// Maximum character length for derived memory names from file basenames.
    ///
    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
    /// Shorter values leave more headroom for collision suffix resolution.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
          help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // NOTE(review): the description below does not mention `opencode`, which is
    // also a variant of `IngestMode`.
    /// Extraction mode: `none` (body-only, default), `claude-code`/`codex` (LLM-curated), or `gliner` (DEPRECATED: URL-regex only since v1.0.79).
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    /// Explicit path to the Claude Code binary (only with --mode claude-code).
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
    #[arg(long)]
    pub claude_model: Option<String>,

    /// Resume a previously interrupted claude-code ingest from the queue DB.
    #[arg(long, default_value_t = false)]
    pub resume: bool,

    /// Retry only failed files from a previous claude-code ingest.
    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    /// Custom path for the ingest queue DB. Default: alongside the --db database.
    #[arg(long)]
    pub queue_db: Option<String>,

    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,

    /// Explicit path to the Codex CLI binary (only with --mode codex).
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
    )]
    pub codex_binary: Option<PathBuf>,

    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
    #[arg(
        long,
        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
    )]
    pub codex_model: Option<String>,

    /// Timeout in seconds for each codex exec invocation.
    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each codex exec invocation (default: 300)"
    )]
    pub codex_timeout: u64,

    /// Path to the `opencode` binary (override PATH lookup, only with --mode opencode).
    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
    pub opencode_binary: Option<PathBuf>,

    /// Model override for OpenCode extraction.
    #[arg(
        long,
        value_name = "MODEL",
        env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
        help = "Model override for OpenCode extraction"
    )]
    pub opencode_model: Option<String>,

    /// Timeout in seconds for each opencode run invocation.
    #[arg(
        long,
        value_name = "SECONDS",
        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
        default_value_t = 300,
        help = "Timeout in seconds for each opencode run invocation (default: 300)"
    )]
    pub opencode_timeout: u64,

    // NOTE(review): the doc states "Default: 0", but no clap default is set, so
    // an absent flag yields `None`; the mapping to 0 must live in the handler.
    /// G30: poll for the job singleton every second for up to N seconds
    /// when another invocation holds the lock. Default: 0 (fail fast).
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    /// G30: force acquisition of the singleton lock by removing a stale
    /// lock file from a previously crashed invocation.
    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,

    /// v1.0.93 (GAP-OR-INGEST): run `enrich --operation memory-bindings`
    /// after all files are embedded, using the active `--llm-backend`.
    #[arg(
        long,
        default_value_t = false,
        help = "Run enrich --operation memory-bindings after all files are ingested"
    )]
    pub enrich_after: bool,

    /// GAP-SG-54: update existing memories instead of skipping them. Without
    /// this flag a file whose derived name already exists is reported `skipped`;
    /// with it the existing memory's body, embedding and chunks are refreshed
    /// (the `remember --force-merge` update path applied per file).
    #[arg(
        long,
        default_value_t = false,
        help = "Update existing memories on name collision instead of skipping (idempotent re-ingest)"
    )]
    pub force_merge: bool,
}
341
/// Extraction mode for the ingest pipeline.
// Selected on the command line through `--mode` (see `IngestArgs::mode`, whose
// default is `IngestMode::None`). The CLI examples in `IngestArgs` spell the
// `ClaudeCode` variant as `claude-code`. Variant `///` comments are left
// untouched because clap may surface them as per-value help text.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    /// Body-only ingestion without entity/relationship extraction (default).
    None,
    /// DEPRECATED: URL-regex extraction only since v1.0.79 (the GLiNER pipeline was removed; requires --enable-ner).
    Gliner,
    /// LLM-curated extraction via locally installed Claude Code CLI.
    ClaudeCode,
    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
    Codex,
    /// LLM-curated extraction via locally installed OpenCode CLI.
    // The explicit name pins the CLI spelling to `opencode`.
    #[value(name = "opencode")]
    Opencode,
}
357
358/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
359/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
360/// values evaluate to false. Unrecognized non-empty values emit a
361/// `tracing::warn!` and evaluate to false.
362fn env_low_memory_enabled() -> bool {
363    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
364        Ok(v) if v.is_empty() => false,
365        Ok(v) => match v.to_lowercase().as_str() {
366            "1" | "true" | "yes" | "on" => true,
367            "0" | "false" | "no" | "off" => false,
368            other => {
369                tracing::warn!(
370                    target: "ingest",
371                    value = %other,
372                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
373                );
374                false
375            }
376        },
377        Err(_) => false,
378    }
379}
380
381/// Resolves the effective ingest parallelism honoring `--low-memory` and the
382/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
383///
384/// Precedence:
385/// 1. `--low-memory` CLI flag forces parallelism = 1.
386/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
387/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
388/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
389///
390/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
391/// emits a `tracing::warn!` advertising the override.
392fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
393    let env_flag = env_low_memory_enabled();
394    let low_memory = low_memory_flag || env_flag;
395
396    if low_memory {
397        if let Some(n) = ingest_parallelism {
398            if n > 1 {
399                tracing::warn!(
400                    target: "ingest",
401                    requested = n,
402                    "--ingest-parallelism overridden by --low-memory; using 1"
403                );
404            }
405        }
406        if low_memory_flag {
407            tracing::info!(
408                target: "ingest",
409                source = "flag",
410                "low-memory mode enabled: forcing --ingest-parallelism 1"
411            );
412        } else {
413            tracing::info!(
414                target: "ingest",
415                source = "env",
416                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
417            );
418        }
419        return 1;
420    }
421
422    ingest_parallelism
423        .unwrap_or_else(|| {
424            std::thread::available_parallelism()
425                .map(|v| v.get() / 2)
426                .unwrap_or(1)
427                .clamp(1, 4)
428        })
429        .max(1)
430}
431
/// Per-file event serialized to JSON. The module docs describe the output as
/// line-delimited JSON with one object per processed file (success or error).
///
/// Every `Option` field is omitted from the serialized object when `None`.
/// Field order here is the serialization order, so do not reorder fields.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the processed file as a string (presumably as displayed to the
    /// user; the construction site is outside this excerpt).
    file: &'a str,
    /// Memory name associated with the file.
    name: &'a str,
    /// Outcome label for the file. The set of values is chosen by the emitting
    /// code, which is outside this excerpt.
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    // NOTE(review): `--max-name-length` can override that constant; confirm
    // which limit the truncation actually uses.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file basename (without extension); only present when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error message for a failed file; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the persisted memory; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Label of the persistence action taken; omitted when `None`. The
    /// possible values are set by code outside this excerpt.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
    body_length: usize,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// ran the live embedding. `"claude" | "codex" | "none"`. Absent on
    /// the wire when `None` (kept for happy-path envelope cleanliness, or
    /// when the file never reached the embed phase due to duplication/error).
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
460
/// GAP-SG-06: per-file budget assessment emitted during `--dry-run` so the
/// operator sees chunk and token counts (and how many sub-memories an
/// auto-split would create) before running a real ingest.
///
/// Field order here is the serialization order, so do not reorder fields.
#[derive(Serialize)]
struct IngestDryRunBudget<'a> {
    /// Marker field; presumably always `true` so consumers can tell budget
    /// lines apart from other NDJSON objects — confirm at the emit site.
    budget: bool,
    /// Path of the assessed file as a string.
    file: &'a str,
    /// Memory name derived for the file.
    name: &'a str,
    /// Size of the file body in bytes.
    bytes: usize,
    /// Number of chunks the body would produce.
    chunk_count: usize,
    /// Number of tokens counted for the body.
    token_count: usize,
    /// Number of sub-memories an auto-split would create.
    partition_count: usize,
    /// Whether the body exceeds the single-memory limits (the exact limits are
    /// evaluated by code outside this excerpt).
    exceeds_limits: bool,
}
475
/// Final summary object of an ingest run. The module docs state that the
/// per-file NDJSON objects are followed by one summary object.
///
/// Field order here is the serialization order, so do not reorder fields.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker field; presumably always `true` so consumers can recognise the
    /// summary line — confirm at the emit site.
    summary: bool,
    /// Directory that was ingested, as a string.
    dir: String,
    /// Glob pattern used to select files.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    /// Total number of files considered.
    files_total: usize,
    /// Number of files ingested successfully.
    files_succeeded: usize,
    /// Number of files that failed.
    files_failed: usize,
    /// Number of files skipped.
    files_skipped: usize,
    /// Elapsed time in milliseconds (the measured interval is defined where
    /// the summary is built, outside this excerpt).
    elapsed_ms: u64,
}
488
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
///
/// The fields mirror the `memory_id`, `action`, `body_length` and
/// `backend_invoked` fields of `IngestFileEvent`.
#[derive(Debug)]
struct FileSuccess {
    /// Identifier of the persisted memory.
    memory_id: i64,
    /// Label of the persistence action taken (values are set by code outside
    /// this excerpt).
    action: String,
    /// Byte length of the ingested body.
    body_length: usize,
    /// Discriminator of the backend that ran the embedding, when one is known.
    backend_invoked: Option<&'static str>,
}
497
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
///
/// NOTE(review): the module docs describe progress events as emitted "as each
/// file is persisted" (Phase B); confirm which phase actually emits this event.
///
/// Field order here is the serialization order, so do not reorder fields.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's schema.
    schema_version: u8,
    /// Event name (values are chosen by the emitting code, outside this excerpt).
    event: &'a str,
    /// Path of the file the event refers to.
    path: &'a str,
    /// Duration in milliseconds — presumably the time spent on this file;
    /// confirm at the emit site.
    ms: u64,
    /// Number of entities associated with the file.
    entities: usize,
    /// Number of relationships associated with the file.
    relationships: usize,
}
509
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
///
/// One `StagedFile` corresponds to one memory: `stage_file` returns a
/// `Vec<StagedFile>` holding one entry per partition of the file body.
///
/// NOTE(review): the module docs say results are sent to Phase B as soon as
/// each file completes; confirm whether "submission order" still holds.
struct StagedFile {
    /// Body text of the memory.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body` (computed in `stage_one_body`).
    body_hash: String,
    /// First 200 characters of `body` (computed in `stage_one_body`).
    snippet: String,
    /// Memory name; carries a `-part-N` suffix when the file was split.
    name: String,
    /// Memory description; carries a ` (part i/n)` marker when the file was split.
    description: String,
    /// Embedding of the body; `None` when no embedding is available.
    embedding: Option<Vec<f32>>,
    /// Per-chunk embeddings, when computed.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk descriptors produced by the chunker for `body`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Entities extracted from the body (empty when extraction is disabled).
    entities: Vec<NewEntity>,
    /// Relationships extracted from the body.
    relationships: Vec<NewRelationship>,
    /// Embeddings for `entities`, when computed.
    entity_embeddings: Option<Vec<Vec<f32>>>,
    /// URLs extracted from the body.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
    /// ran the body embedding. `None` when the parallel batch
    /// embed_passages_parallel_local fell back to different backends
    /// across chunks (there is no single stable discriminator).
    ///
    /// NOTE(review): the multi-chunk call visible in `stage_one_body` is
    /// `embed_passages_parallel_with_embedding_choice`; confirm whether the
    /// function name above is stale.
    backend_invoked: Option<&'static str>,
}
531
532/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
533/// Never touches the database — safe to run on any rayon thread.
534// G42/S3 added `llm_parallelism` as the 8th parameter; grouping the
535// stage knobs into a struct is a wider refactor than the surgical
536// scope of v1.0.79 allows.
537#[allow(clippy::too_many_arguments)]
538fn stage_file(
539    _idx: usize,
540    path: &Path,
541    name: &str,
542    paths: &AppPaths,
543    enable_ner: bool,
544    gliner_variant: crate::extraction::GlinerVariant,
545    max_rss_mb: u64,
546    llm_parallelism: usize,
547    llm_backend: crate::cli::LlmBackendChoice,
548    embedding_backend: crate::cli::EmbeddingBackendChoice,
549    auto_describe: bool,
550) -> Result<Vec<StagedFile>, AppError> {
551    use crate::constants::*;
552
553    if name.len() > MAX_MEMORY_NAME_LEN {
554        return Err(AppError::LimitExceeded(
555            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
556        ));
557    }
558    if name.starts_with("__") {
559        return Err(AppError::Validation(
560            crate::i18n::validation::reserved_name(),
561        ));
562    }
563    {
564        let slug_re = crate::constants::name_slug_regex();
565        if !slug_re.is_match(name) {
566            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
567                name,
568            )));
569        }
570    }
571
572    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
573    if file_size > MAX_MEMORY_BODY_LEN as u64 {
574        return Err(AppError::LimitExceeded(
575            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
576        ));
577    }
578    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
579    if raw_body.len() > MAX_MEMORY_BODY_LEN {
580        return Err(AppError::LimitExceeded(
581            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
582        ));
583    }
584    if raw_body.trim().is_empty() {
585        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
586    }
587
588    let description = if auto_describe {
589        crate::commands::ingest_heuristics::extract_heuristic_description(
590            &raw_body,
591            Some(&path.display().to_string()),
592        )
593    } else {
594        format!("ingested from {}", path.display())
595    };
596    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
597        return Err(AppError::Validation(
598            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
599        ));
600    }
601
602    // GAP-SG-04/07: auto-split a body that exceeds the single-memory budgets
603    // (bytes, chunk count, token count) into section-aligned sub-memories so
604    // ingestion never fails on an oversized document. A body that fits returns
605    // a single partition under the original name.
606    let partitions = chunking::split_body_by_sections(&raw_body);
607    let total_parts = partitions.len();
608    let mut staged = Vec::with_capacity(total_parts);
609    for (part_idx, part_body) in partitions.into_iter().enumerate() {
610        let part_name = if total_parts == 1 {
611            name.to_string()
612        } else {
613            format!("{name}-part-{}", part_idx + 1)
614        };
615        if part_name.len() > MAX_MEMORY_NAME_LEN {
616            return Err(AppError::LimitExceeded(
617                crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
618            ));
619        }
620        let part_description = if total_parts == 1 {
621            description.clone()
622        } else {
623            partition_description(&description, part_idx + 1, total_parts)
624        };
625        staged.push(stage_one_body(
626            part_body,
627            part_name,
628            part_description,
629            paths,
630            enable_ner,
631            gliner_variant,
632            max_rss_mb,
633            llm_parallelism,
634            llm_backend,
635            embedding_backend,
636        )?);
637    }
638    Ok(staged)
639}
640
641/// Builds a partition description by appending a `(part i/n)` marker, trimming
642/// the base (on a char boundary) when the marker would push it past
643/// [`crate::constants::MAX_MEMORY_DESCRIPTION_LEN`].
644fn partition_description(base: &str, part: usize, total: usize) -> String {
645    let suffix = format!(" (part {part}/{total})");
646    let max = crate::constants::MAX_MEMORY_DESCRIPTION_LEN;
647    if base.len() + suffix.len() <= max {
648        return format!("{base}{suffix}");
649    }
650    let mut cut = max.saturating_sub(suffix.len()).min(base.len());
651    while cut > 0 && !base.is_char_boundary(cut) {
652        cut -= 1;
653    }
654    format!("{}{}", &base[..cut], suffix)
655}
656
657/// Stages a single body (one memory) into a [`StagedFile`]: NER extraction,
658/// chunking, embedding and entity embedding. Extracted from `stage_file` so the
659/// GAP-SG-04/07 auto-split path stages each partition independently.
660#[allow(clippy::too_many_arguments)]
661fn stage_one_body(
662    raw_body: String,
663    name: String,
664    description: String,
665    paths: &AppPaths,
666    enable_ner: bool,
667    gliner_variant: crate::extraction::GlinerVariant,
668    max_rss_mb: u64,
669    llm_parallelism: usize,
670    llm_backend: crate::cli::LlmBackendChoice,
671    embedding_backend: crate::cli::EmbeddingBackendChoice,
672) -> Result<StagedFile, AppError> {
673    use crate::constants::*;
674
675    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
676    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
677    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
678    if enable_ner {
679        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
680            Ok(extracted) => {
681                extracted_urls = extracted.urls;
682                // v1.0.76: ExtractionResult.entities is now
683                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
684                // via name + type only; start/end offsets are not
685                // carried forward into the storage layer.
686                extracted_entities = extracted
687                    .entities
688                    .into_iter()
689                    .map(|e| NewEntity {
690                        name: e.name,
691                        entity_type: crate::entity_type::EntityType::Concept,
692                        description: None,
693                    })
694                    .collect();
695                // v1.0.76: relationships are no longer in the
696                // ExtractionResult struct; the LLM backend returns
697                // them in its own payload. The default build is
698                // URL-only extraction.
699                extracted_relationships.clear();
700
701                if extracted_entities.len() > max_entities_per_memory() {
702                    extracted_entities.truncate(max_entities_per_memory());
703                }
704                if extracted_relationships.len() > max_relationships_per_memory() {
705                    extracted_relationships.truncate(max_relationships_per_memory());
706                }
707            }
708            Err(e) => {
709                tracing::warn!(
710                    target: "ingest",
711                    file = %name,
712                    "auto-extraction failed (graceful degradation): {e:#}"
713                );
714            }
715        }
716    }
717
718    for rel in &mut extracted_relationships {
719        rel.relation = crate::parsers::normalize_relation(&rel.relation);
720        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
721            return Err(AppError::Validation(format!(
722                "{e} for relationship '{}' -> '{}'",
723                rel.source, rel.target
724            )));
725        }
726        crate::parsers::warn_if_non_canonical(&rel.relation);
727        if !(0.0..=1.0).contains(&rel.strength) {
728            return Err(AppError::Validation(format!(
729                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
730                rel.strength, rel.source, rel.target
731            )));
732        }
733    }
734
735    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
736    let snippet: String = raw_body.chars().take(200).collect();
737
738    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
739    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
740        return Err(AppError::LimitExceeded(format!(
741            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
742            chunks_info.len(),
743            REMEMBER_MAX_SAFE_MULTI_CHUNKS
744        )));
745    }
746
747    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
748    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extracts the
    // backend that actually ran in order to populate `backend_invoked` in
    // the per-file NDJSON envelope.
752    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
753        .len()
754        == 1
755    {
756        match crate::embedder::embed_passage_with_embedding_choice(
757            &paths.models,
758            &raw_body,
759            embedding_backend,
760            llm_backend,
761        ) {
762            Ok((v, k)) => (Some(v), Some(k.as_str())),
763            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
764            Err(e) if skip_embed => {
765                tracing::warn!(error = %e, file = %name, "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
766                (None, None)
767            }
768            Err(e) => return Err(e),
769        }
770    } else {
771        // G42/S2+S3 (v1.0.79): batched bounded fan-out replaces the
772        // serial per-chunk subprocess loop.
773        let chunk_texts: Vec<String> = chunks_info
774            .iter()
775            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
776            .collect();
777        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
778            if rss > max_rss_mb {
779                tracing::error!(
780                    target: "ingest",
781                    rss_mb = rss,
782                    max_rss_mb = max_rss_mb,
783                    file = %name,
784                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
785                );
786                return Err(AppError::LowMemory {
787                    available_mb: crate::memory_guard::available_memory_mb(),
788                    required_mb: max_rss_mb,
789                });
790            }
791        }
792        match crate::embedder::embed_passages_parallel_with_embedding_choice(
793            &paths.models,
794            &chunk_texts,
795            llm_parallelism,
796            crate::embedder::chunk_embed_batch_size(),
797            embedding_backend,
798            llm_backend,
799        ) {
800            Ok(chunk_embeddings) => {
801                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
802                chunk_embeddings_opt = Some(chunk_embeddings);
803                // v1.0.84 (ADR-0042): batch paralelo não retorna discriminador
804                // único por chamada. Conservadoramente, populamos None aqui.
805                (Some(aggregated), None)
806            }
807            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
808            Err(e) if skip_embed => {
809                tracing::warn!(error = %e, file = %name, "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
810                (None, None)
811            }
812            Err(e) => return Err(e),
813        }
814    };
815
816    // G42/S2+A4 (v1.0.79): entity names use the short-text batch profile.
817    let entity_texts: Vec<String> = extracted_entities
818        .iter()
819        .map(|entity| match &entity.description {
820            Some(desc) => format!("{} {}", entity.name, desc),
821            None => entity.name.clone(),
822        })
823        .collect();
824    // G56 (v1.0.80): ingest reuses canonical entity names across many
825    // memories (e.g. `sqlite-graphrag`, `claude-code`); the in-process
826    // cache collapses the repeated LLM calls into one per unique text.
827    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
828        &paths.models,
829        &entity_texts,
830        llm_parallelism,
831        embedding_backend,
832        llm_backend,
833    ) {
834        Ok((entity_embeddings, embed_cache_stats)) => {
835            if embed_cache_stats.hits > 0 {
836                tracing::debug!(
837                    hits = embed_cache_stats.hits,
838                    misses = embed_cache_stats.misses,
839                    requested = embed_cache_stats.requested,
840                    "G56: entity embed cache hit (ingest)"
841                );
842            }
843            Some(entity_embeddings)
844        }
845        Err(e) if skip_embed => {
846            tracing::warn!(error = %e, file = %name, "ingest: entity embedding failed; --skip-embedding-on-failure active");
847            None
848        }
849        Err(e) => return Err(e),
850    };
851
852    Ok(StagedFile {
853        body: raw_body,
854        body_hash,
855        snippet,
856        name,
857        description,
858        embedding,
859        chunk_embeddings: chunk_embeddings_opt,
860        chunks_info,
861        entities: extracted_entities,
862        relationships: extracted_relationships,
863        entity_embeddings: entity_embeddings_opt,
864        urls: extracted_urls,
865        backend_invoked,
866    })
867}
868
869/// Links the staged entities and relationships to `memory_id` within `tx`.
870/// Shared by the create and `--force-merge` update paths so the graph-binding
871/// logic lives in one place.
872fn link_staged_graph(
873    tx: &Connection,
874    namespace: &str,
875    memory_id: i64,
876    staged: &StagedFile,
877) -> Result<(), AppError> {
878    if staged.entities.is_empty() && staged.relationships.is_empty() {
879        return Ok(());
880    }
881    for (idx, entity) in staged.entities.iter().enumerate() {
882        let entity_id = entities::upsert_entity(tx, namespace, entity)?;
883        if let Some(ref entity_embeddings) = staged.entity_embeddings {
884            if let Some(entity_embedding) = entity_embeddings.get(idx) {
885                entities::upsert_entity_vec(
886                    tx,
887                    entity_id,
888                    namespace,
889                    entity.entity_type,
890                    entity_embedding,
891                    &entity.name,
892                )?;
893            }
894        }
895        entities::link_memory_entity(tx, memory_id, entity_id)?;
896    }
897    let entity_types: std::collections::HashMap<&str, EntityType> = staged
898        .entities
899        .iter()
900        .map(|entity| (entity.name.as_str(), entity.entity_type))
901        .collect();
902
903    let mut affected_entity_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
904    for entity in &staged.entities {
905        if let Some(eid) = entities::find_entity_id(tx, namespace, &entity.name)? {
906            affected_entity_ids.insert(eid);
907        }
908    }
909
910    for rel in &staged.relationships {
911        let source_entity = NewEntity {
912            name: rel.source.clone(),
913            entity_type: entity_types
914                .get(rel.source.as_str())
915                .copied()
916                .unwrap_or(EntityType::Concept),
917            description: None,
918        };
919        let target_entity = NewEntity {
920            name: rel.target.clone(),
921            entity_type: entity_types
922                .get(rel.target.as_str())
923                .copied()
924                .unwrap_or(EntityType::Concept),
925            description: None,
926        };
927        let source_id = entities::upsert_entity(tx, namespace, &source_entity)?;
928        let target_id = entities::upsert_entity(tx, namespace, &target_entity)?;
929        let rel_id = entities::upsert_relationship(tx, namespace, source_id, target_id, rel)?;
930        entities::link_memory_relationship(tx, memory_id, rel_id)?;
931        affected_entity_ids.insert(source_id);
932        affected_entity_ids.insert(target_id);
933    }
934
935    for &eid in &affected_entity_ids {
936        entities::recalculate_degree(tx, eid)?;
937    }
938    Ok(())
939}
940
941/// Phase B: persists one `StagedFile` to the database on the main thread.
942///
943/// GAP-SG-54: when `force_merge` is true an existing memory with the same name
944/// is UPDATED (body/embedding/chunks/graph refreshed) instead of being rejected
945/// as a duplicate. GAP-SG-55: a memory whose `body_hash` already exists under a
946/// DIFFERENT name is skipped (content-level dedup) so divergent derived names do
947/// not duplicate identical content.
948fn persist_staged(
949    conn: &mut Connection,
950    namespace: &str,
951    memory_type: &str,
952    staged: StagedFile,
953    force_merge: bool,
954) -> Result<FileSuccess, AppError> {
955    {
956        let active_count: u32 = conn.query_row(
957            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
958            [],
959            |r| r.get::<_, i64>(0).map(|v| v as u32),
960        )?;
961        let ns_exists: bool = conn.query_row(
962            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
963            rusqlite::params![namespace],
964            |r| r.get::<_, i64>(0).map(|v| v > 0),
965        )?;
966        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
967            return Err(AppError::NamespaceError(format!(
968                "active namespace limit of {} exceeded while creating '{namespace}'",
969                crate::constants::MAX_NAMESPACES_ACTIVE
970            )));
971        }
972    }
973
974    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
975    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
976
977    let new_memory = NewMemory {
978        namespace: namespace.to_string(),
979        name: staged.name.clone(),
980        memory_type: memory_type.to_string(),
981        description: staged.description.clone(),
982        body: staged.body.clone(),
983        body_hash: staged.body_hash.clone(),
984        session_id: None,
985        source: "agent".to_string(),
986        metadata: serde_json::json!({}),
987    };
988    let body_length = new_memory.body.len();
989    let metadata_json = serde_json::to_string(&new_memory.metadata)?;
990
991    match existing_memory {
992        Some((existing_id, _updated_at, _version)) => {
993            if !force_merge {
994                return Err(AppError::Duplicate(errors_msg::duplicate_memory(
995                    &staged.name,
996                    namespace,
997                )));
998            }
999
1000            // GAP-SG-54: --force-merge update path. Refresh body, embedding,
1001            // chunks and graph bindings of the existing memory.
1002            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
1003
1004            let (old_name, old_desc, old_body): (String, String, String) = tx.query_row(
1005                "SELECT name, description, body FROM memories WHERE id = ?1",
1006                rusqlite::params![existing_id],
1007                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1008            )?;
1009
1010            let next_v = versions::next_version(&tx, existing_id)?;
1011            memories::update(&tx, existing_id, &new_memory, None)?;
1012            memories::sync_fts_after_update(
1013                &tx,
1014                existing_id,
1015                &old_name,
1016                &old_desc,
1017                &old_body,
1018                &staged.name,
1019                &staged.description,
1020                &new_memory.body,
1021            )?;
1022            versions::insert_version(
1023                &tx,
1024                existing_id,
1025                next_v,
1026                &staged.name,
1027                memory_type,
1028                &staged.description,
1029                &new_memory.body,
1030                &metadata_json,
1031                None,
1032                "edit",
1033            )?;
1034
1035            // Re-index chunks: drop the old slices then re-insert the staged set.
1036            storage_chunks::delete_chunks(&tx, existing_id)?;
1037            if let Some(ref emb) = staged.embedding {
1038                memories::upsert_vec(
1039                    &tx,
1040                    existing_id,
1041                    namespace,
1042                    memory_type,
1043                    emb,
1044                    &staged.name,
1045                    &staged.snippet,
1046                )?;
1047            }
1048            if staged.chunks_info.len() > 1 {
1049                storage_chunks::insert_chunk_slices(
1050                    &tx,
1051                    existing_id,
1052                    &new_memory.body,
1053                    &staged.chunks_info,
1054                )?;
1055                if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
1056                    for (i, emb) in chunk_embeddings.iter().enumerate() {
1057                        storage_chunks::upsert_chunk_vec(
1058                            &tx,
1059                            i as i64,
1060                            existing_id,
1061                            i as i32,
1062                            emb,
1063                        )?;
1064                    }
1065                }
1066            }
1067
1068            link_staged_graph(&tx, namespace, existing_id, &staged)?;
1069            tx.commit()?;
1070
1071            Ok(FileSuccess {
1072                memory_id: existing_id,
1073                action: "updated".to_string(),
1074                body_length,
1075                backend_invoked: staged.backend_invoked,
1076            })
1077        }
1078        None => {
1079            // GAP-SG-55: identical content already stored under a different name
1080            // → skip creating a duplicate (reported as `skipped` by the caller).
1081            if let Some(hash_id) = duplicate_hash_id {
1082                return Err(AppError::Duplicate(format!(
1083                    "identical body already stored as memory id {hash_id} (dedup by body_hash); skipping '{}'",
1084                    staged.name
1085                )));
1086            }
1087
1088            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
1089            let memory_id = memories::insert(&tx, &new_memory)?;
1090            versions::insert_version(
1091                &tx,
1092                memory_id,
1093                1,
1094                &staged.name,
1095                memory_type,
1096                &staged.description,
1097                &new_memory.body,
1098                &metadata_json,
1099                None,
1100                "create",
1101            )?;
1102            if let Some(ref emb) = staged.embedding {
1103                memories::upsert_vec(
1104                    &tx,
1105                    memory_id,
1106                    namespace,
1107                    memory_type,
1108                    emb,
1109                    &staged.name,
1110                    &staged.snippet,
1111                )?;
1112            }
1113            if staged.chunks_info.len() > 1 {
1114                storage_chunks::insert_chunk_slices(
1115                    &tx,
1116                    memory_id,
1117                    &new_memory.body,
1118                    &staged.chunks_info,
1119                )?;
1120                if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
1121                    for (i, emb) in chunk_embeddings.iter().enumerate() {
1122                        storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
1123                    }
1124                }
1125            }
1126            link_staged_graph(&tx, namespace, memory_id, &staged)?;
1127            tx.commit()?;
1128
1129            if !staged.urls.is_empty() {
1130                let url_entries: Vec<storage_urls::MemoryUrl> = staged
1131                    .urls
1132                    .into_iter()
1133                    .map(|u| storage_urls::MemoryUrl {
1134                        url: u.url,
1135                        offset: Some(u.start as i64),
1136                    })
1137                    .collect();
1138                let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
1139            }
1140
1141            Ok(FileSuccess {
1142                memory_id,
1143                action: "created".to_string(),
1144                body_length,
1145                backend_invoked: staged.backend_invoked,
1146            })
1147        }
1148    }
1149}
1150
1151// ---------------------------------------------------------------------------
1152// G20: mode-conditional flag validation
1153// ---------------------------------------------------------------------------
1154
/// Reports whether `value` equals `default`, i.e. whether a scalar flag was
/// left at its declared default.
///
/// This is a deliberate local re-declaration (an equivalent helper is defined
/// elsewhere in the crate) so the G20 validation in this module stays
/// self-contained.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1161
1162/// G20: validate that flags for one LLM provider were not passed when
1163/// the operator selected a different provider (or no provider). Flags
1164/// silently discarded by the wrong mode are surfaced as
1165///  BEFORE any DB work, so the operator gets
1166/// an actionable error instead of a surprise at runtime.
1167///
1168/// Mode-specific matrices:
1169/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
1170///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
1171///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
1172///   --enable-ner is false)
1173/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
1174/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
1175///   max_cost_usd, resume, retry_failed, keep_queue
1176fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
1177    const DEFAULT_TIMEOUT: u64 = 300;
1178    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
1179
1180    let mut conflicts: Vec<String> = Vec::new();
1181
1182    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
1183
1184    if is_local_mode {
1185        if args.claude_binary.is_some() {
1186            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
1187        }
1188        if args.claude_model.is_some() {
1189            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
1190        }
1191        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1192            conflicts.push(format!(
1193                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1194                args.claude_timeout
1195            ));
1196        }
1197        if args.codex_binary.is_some() {
1198            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
1199        }
1200        if args.codex_model.is_some() {
1201            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
1202        }
1203        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1204            conflicts.push(format!(
1205                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1206                args.codex_timeout
1207            ));
1208        }
1209        if args.opencode_binary.is_some() {
1210            conflicts
1211                .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
1212        }
1213        if args.opencode_model.is_some() {
1214            conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
1215        }
1216        if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1217            conflicts.push(format!(
1218                "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1219                args.opencode_timeout
1220            ));
1221        }
1222        if args.max_cost_usd.is_some() {
1223            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
1224        }
1225        if args.resume {
1226            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
1227        }
1228        if args.retry_failed {
1229            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
1230        }
1231        if args.keep_queue {
1232            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
1233        }
1234        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
1235            conflicts.push(format!(
1236                "--rate-limit-wait={} is ignored when --mode is none or gliner",
1237                args.rate_limit_wait
1238            ));
1239        }
1240    }
1241
1242    match args.mode {
1243        IngestMode::ClaudeCode => {
1244            if args.codex_binary.is_some() {
1245                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1246            }
1247            if args.codex_model.is_some() {
1248                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1249            }
1250            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1251                conflicts.push(format!(
1252                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1253                    args.codex_timeout
1254                ));
1255            }
1256            if args.opencode_binary.is_some() {
1257                conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1258            }
1259            if args.opencode_model.is_some() {
1260                conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1261            }
1262            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1263                conflicts.push(format!(
1264                    "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1265                    args.opencode_timeout
1266                ));
1267            }
1268        }
1269        IngestMode::Codex => {
1270            if args.claude_binary.is_some() {
1271                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1272            }
1273            if args.claude_model.is_some() {
1274                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1275            }
1276            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1277                conflicts.push(format!(
1278                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1279                    args.claude_timeout
1280                ));
1281            }
1282            if args.max_cost_usd.is_some() {
1283                conflicts.push(
1284                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1285                        .to_string(),
1286                );
1287            }
1288            if args.resume {
1289                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1290            }
1291            if args.retry_failed {
1292                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1293            }
1294            if args.keep_queue {
1295                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1296            }
1297            if args.opencode_binary.is_some() {
1298                conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1299            }
1300            if args.opencode_model.is_some() {
1301                conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1302            }
1303            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1304                conflicts.push(format!(
1305                    "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1306                    args.opencode_timeout
1307                ));
1308            }
1309        }
1310        IngestMode::Opencode => {
1311            if args.claude_binary.is_some() {
1312                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1313            }
1314            if args.claude_model.is_some() {
1315                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1316            }
1317            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1318                conflicts.push(format!(
1319                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1320                    args.claude_timeout
1321                ));
1322            }
1323            if args.codex_binary.is_some() {
1324                conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1325            }
1326            if args.codex_model.is_some() {
1327                conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1328            }
1329            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1330                conflicts.push(format!(
1331                    "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1332                    args.codex_timeout
1333                ));
1334            }
1335            if args.max_cost_usd.is_some() {
1336                conflicts.push(
1337                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1338                        .to_string(),
1339                );
1340            }
1341            if args.resume {
1342                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1343            }
1344            if args.retry_failed {
1345                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1346            }
1347            if args.keep_queue {
1348                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1349            }
1350        }
1351        IngestMode::None | IngestMode::Gliner => {}
1352    }
1353
1354    if !conflicts.is_empty() {
1355        return Err(AppError::Validation(format!(
1356            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
1357            args.mode,
1358            conflicts.join("\n  - ")
1359        )));
1360    }
1361
1362    Ok(())
1363}
1364
1365// ---------------------------------------------------------------------------
1366
1367#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
1368pub fn run(
1369    args: IngestArgs,
1370    llm_backend: crate::cli::LlmBackendChoice,
1371    embedding_backend: crate::cli::EmbeddingBackendChoice,
1372) -> Result<(), AppError> {
1373    // G20: mode-conditional flag validation BEFORE any DB access.
1374    // Surfaces flags that the wrong mode would silently discard.
1375    validate_mode_conditional_flags_ingest(&args)?;
1376    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
1377    if args.mode == IngestMode::ClaudeCode {
1378        return super::ingest_claude::run_claude_ingest(&args, embedding_backend, llm_backend);
1379    }
1380    if args.mode == IngestMode::Codex {
1381        return super::ingest_codex::run_codex_ingest(&args);
1382    }
1383    if args.mode == IngestMode::Opencode {
1384        return super::ingest_opencode::run_opencode_ingest(&args);
1385    }
1386
1387    let started = std::time::Instant::now();
1388
1389    if !args.dir.exists() {
1390        return Err(AppError::Validation(format!(
1391            "directory not found: {}",
1392            args.dir.display()
1393        )));
1394    }
1395    if !args.dir.is_dir() {
1396        return Err(AppError::Validation(format!(
1397            "path is not a directory: {}",
1398            args.dir.display()
1399        )));
1400    }
1401
1402    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1403    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1404    files.sort_unstable();
1405
1406    if files.len() > args.max_files {
1407        return Err(AppError::Validation(format!(
1408            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1409            files.len(),
1410            args.max_files
1411        )));
1412    }
1413
1414    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1415    let memory_type_str = args.r#type.as_str().to_string();
1416
1417    let paths = AppPaths::resolve(args.db.as_deref())?;
1418    let mut conn_or_err = match init_storage(&paths) {
1419        Ok(c) => Ok(c),
1420        Err(e) => Err(format!("{e}")),
1421    };
1422
1423    let mut succeeded: usize = 0;
1424    let mut failed: usize = 0;
1425    let mut skipped: usize = 0;
1426    let total = files.len();
1427
1428    // Pre-resolve all names before parallelisation so Phase A workers see a
1429    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
1430    let mut taken_names: BTreeSet<String> = BTreeSet::new();
1431
1432    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
1433    // ProcessItem: the data moved into the producer thread for Phase A computation.
1434    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
1435    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
1436    // boundary into the rayon producer.
1437    enum SlotMeta {
1438        Skip {
1439            file_str: String,
1440            derived_base: String,
1441            name_truncated: bool,
1442            original_name: Option<String>,
1443            original_filename: Option<String>,
1444            reason: String,
1445        },
1446        Process {
1447            file_str: String,
1448            derived_name: String,
1449            name_truncated: bool,
1450            original_name: Option<String>,
1451            original_filename: Option<String>,
1452        },
1453    }
1454
1455    struct ProcessItem {
1456        idx: usize,
1457        path: PathBuf,
1458        file_str: String,
1459        derived_name: String,
1460    }
1461
1462    let files_cap = files.len();
1463    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1464    slots_meta.try_reserve(files_cap).map_err(|_| {
1465        AppError::LimitExceeded(format!(
1466            "allocation of {files_cap} slot metadata entries would exceed available memory"
1467        ))
1468    })?;
1469    let mut process_items: Vec<ProcessItem> = Vec::new();
1470    process_items.try_reserve(files_cap).map_err(|_| {
1471        AppError::LimitExceeded(format!(
1472            "allocation of {files_cap} process items would exceed available memory"
1473        ))
1474    })?;
1475    let mut truncations: Vec<(String, String)> = Vec::new();
1476    truncations.try_reserve(files_cap).map_err(|_| {
1477        AppError::LimitExceeded(format!(
1478            "allocation of {files_cap} truncation entries would exceed available memory"
1479        ))
1480    })?;
1481
1482    let max_name_length = args.max_name_length;
1483    for path in &files {
1484        let file_str = path.to_string_lossy().into_owned();
1485        let (derived_base, name_truncated, original_name) =
1486            derive_kebab_name(path, max_name_length);
1487        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1488
1489        if name_truncated {
1490            if let Some(ref orig) = original_name {
1491                truncations.push((orig.clone(), derived_base.clone()));
1492            }
1493        }
1494
1495        if derived_base.is_empty() {
1496            // original_filename: always include when it differs from the empty derived name
1497            let orig_filename = if !original_basename.is_empty() {
1498                Some(original_basename.to_string())
1499            } else {
1500                None
1501            };
1502            slots_meta.push(SlotMeta::Skip {
1503                file_str,
1504                derived_base: String::new(),
1505                name_truncated: false,
1506                original_name: None,
1507                original_filename: orig_filename,
1508                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1509            });
1510            continue;
1511        }
1512
1513        match unique_name(&derived_base, &taken_names) {
1514            Ok(derived_name) => {
1515                taken_names.insert(derived_name.clone());
1516                let idx = slots_meta.len();
1517                // original_filename: present only when the raw basename differs from the derived name
1518                let orig_filename = if original_basename != derived_name {
1519                    Some(original_basename.to_string())
1520                } else {
1521                    None
1522                };
1523                process_items.push(ProcessItem {
1524                    idx,
1525                    path: path.clone(),
1526                    file_str: file_str.clone(),
1527                    derived_name: derived_name.clone(),
1528                });
1529                slots_meta.push(SlotMeta::Process {
1530                    file_str,
1531                    derived_name,
1532                    name_truncated,
1533                    original_name,
1534                    original_filename: orig_filename,
1535                });
1536            }
1537            Err(e) => {
1538                let orig_filename = if original_basename != derived_base {
1539                    Some(original_basename.to_string())
1540                } else {
1541                    None
1542                };
1543                slots_meta.push(SlotMeta::Skip {
1544                    file_str,
1545                    derived_base,
1546                    name_truncated,
1547                    original_name,
1548                    original_filename: orig_filename,
1549                    reason: e.to_string(),
1550                });
1551            }
1552        }
1553    }
1554
1555    if !truncations.is_empty() {
1556        tracing::info!(
1557            target: "ingest",
1558            count = truncations.len(),
1559            max_name_length = max_name_length,
1560            max_len = DERIVED_NAME_MAX_LEN,
1561            "derived names truncated; pass -vv (debug) for per-file detail"
1562        );
1563    }
1564
1565    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1566    if args.dry_run {
1567        for meta in &slots_meta {
1568            match meta {
1569                SlotMeta::Skip {
1570                    file_str,
1571                    derived_base,
1572                    name_truncated,
1573                    original_name,
1574                    original_filename,
1575                    reason,
1576                } => {
1577                    output::emit_json_compact(&IngestFileEvent {
1578                        file: file_str,
1579                        name: derived_base,
1580                        status: "skip",
1581                        truncated: *name_truncated,
1582                        original_name: original_name.clone(),
1583                        original_filename: original_filename.as_deref(),
1584                        error: Some(reason.clone()),
1585                        memory_id: None,
1586                        action: None,
1587                        body_length: 0,
1588                        backend_invoked: None,
1589                    })?;
1590                }
1591                SlotMeta::Process {
1592                    file_str,
1593                    derived_name,
1594                    name_truncated,
1595                    original_name,
1596                    original_filename,
1597                } => {
1598                    output::emit_json_compact(&IngestFileEvent {
1599                        file: file_str,
1600                        name: derived_name,
1601                        status: "preview",
1602                        truncated: *name_truncated,
1603                        original_name: original_name.clone(),
1604                        original_filename: original_filename.as_deref(),
1605                        error: None,
1606                        memory_id: None,
1607                        action: None,
1608                        body_length: 0,
1609                        backend_invoked: None,
1610                    })?;
1611
1612                    // GAP-SG-06: report chunk + token counts and how many
1613                    // sub-memories an auto-split would create, so the operator
1614                    // detects chunk/token overflow before a real ingest.
1615                    match std::fs::read_to_string(file_str) {
1616                        Ok(body) => {
1617                            let budget = chunking::assess_body_budget(&body);
1618                            output::emit_json_compact(&IngestDryRunBudget {
1619                                budget: true,
1620                                file: file_str,
1621                                name: derived_name,
1622                                bytes: budget.bytes,
1623                                chunk_count: budget.chunk_count,
1624                                token_count: budget.approx_tokens,
1625                                partition_count: budget.partition_count,
1626                                exceeds_limits: budget.exceeds_limits,
1627                            })?;
1628                        }
1629                        Err(e) => {
1630                            tracing::warn!(
1631                                target: "ingest",
1632                                file = %file_str,
1633                                "dry-run: could not read file for budget assessment: {e}"
1634                            );
1635                        }
1636                    }
1637                }
1638            }
1639        }
1640        output::emit_json_compact(&IngestSummary {
1641            summary: true,
1642            dir: args.dir.to_string_lossy().into_owned(),
1643            pattern: args.pattern.clone(),
1644            recursive: args.recursive,
1645            files_total: total,
1646            files_succeeded: 0,
1647            files_failed: 0,
1648            files_skipped: 0,
1649            elapsed_ms: started.elapsed().as_millis() as u64,
1650        })?;
1651        return Ok(());
1652    }
1653
1654    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1655    if args.low_memory {
1656        if let Some(n) = args.ingest_parallelism {
1657            if n > 1 {
1658                return Err(AppError::Validation(
1659                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1660                        .to_string(),
1661                ));
1662            }
1663        }
1664    }
1665
1666    // Determine rayon thread pool size, honoring --low-memory and the
1667    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1668    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1669
1670    let pool = rayon::ThreadPoolBuilder::new()
1671        .num_threads(parallelism)
1672        .build()
1673        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1674
1675    if args.enable_ner && args.skip_extraction {
1676        return Err(AppError::Validation(
1677            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1678        ));
1679    }
1680    if args.skip_extraction && !args.enable_ner {
1681        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1682        // commit (9ddb17b) promoted this to a hard validation error, which
1683        // broke the "kept as a hidden no-op for backwards compatibility"
1684        // promise documented in CHANGELOG v1.0.45 and started failing
1685        // 5+ CI jobs whose E2E tests use this flag to skip the
1686        // GLiNER-ONNX model download in CI environments.
1687        tracing::warn!(
1688            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1689        );
1690    }
1691    let enable_ner = args.enable_ner;
1692    let auto_describe = args.auto_describe && !args.no_auto_describe;
1693    let max_rss_mb = args.max_rss_mb;
1694    let llm_parallelism = args.llm_parallelism as usize;
1695    // v1.0.79: `--mode gliner` and `--gliner-variant` are no-ops kept for
1696    // compatibility (the GLiNER pipeline was removed); warn explicitly so
1697    // callers do not silently expect NER-quality extraction.
1698    if args.mode == IngestMode::Gliner {
1699        tracing::warn!(
1700            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1701        );
1702    }
1703    if args.gliner_variant != "fp32" {
1704        tracing::warn!(
1705            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1706        );
1707    }
1708    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1709        "int8" => crate::extraction::GlinerVariant::Int8,
1710        _ => crate::extraction::GlinerVariant::Fp32,
1711    };
1712
1713    let total_to_process = process_items.len();
1714    tracing::info!(
1715        target: "ingest",
1716        phase = "pipeline_start",
1717        files = total_to_process,
1718        ingest_parallelism = parallelism,
1719        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1720    );
1721
1722    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1723    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1724    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1725    let channel_bound = (parallelism * 2).max(1);
1726    let (tx, rx) = mpsc::sync_channel::<(usize, Result<Vec<StagedFile>, AppError>)>(channel_bound);
1727
1728    // Phase A: launched in a dedicated OS thread so the main thread can consume
1729    // the channel concurrently. pool.install() blocks the calling thread until
1730    // all rayon workers finish — if called on the main thread it would
1731    // reintroduce the 2-phase blocking behaviour we are eliminating.
1732    let paths_owned = paths.clone();
1733    let llm_backend_owned = llm_backend;
1734    let embedding_backend_owned = embedding_backend;
1735    let producer_handle = std::thread::spawn(move || {
1736        pool.install(|| {
1737            process_items.into_par_iter().for_each(|item| {
1738                if crate::shutdown_requested() {
1739                    return;
1740                }
1741                let t0 = std::time::Instant::now();
1742                let result = stage_file(
1743                    item.idx,
1744                    &item.path,
1745                    &item.derived_name,
1746                    &paths_owned,
1747                    enable_ner,
1748                    gliner_variant,
1749                    max_rss_mb,
1750                    llm_parallelism,
1751                    llm_backend_owned,
1752                    embedding_backend_owned,
1753                    auto_describe,
1754                );
1755                let elapsed_ms = t0.elapsed().as_millis() as u64;
1756
1757                // Emit NDJSON progress event to stderr so the user sees work
1758                // happening during long NER runs (e.g. 50 files × 27s each).
1759                let (n_entities, n_relationships) = match &result {
1760                    Ok(parts) => (
1761                        parts.iter().map(|sf| sf.entities.len()).sum::<usize>(),
1762                        parts.iter().map(|sf| sf.relationships.len()).sum::<usize>(),
1763                    ),
1764                    Err(_) => (0, 0),
1765                };
1766                let progress = StageProgressEvent {
1767                    schema_version: 1,
1768                    event: "file_extracted",
1769                    path: &item.file_str,
1770                    ms: elapsed_ms,
1771                    entities: n_entities,
1772                    relationships: n_relationships,
1773                };
1774                if let Ok(line) = serde_json::to_string(&progress) {
1775                    tracing::info!(target: "ingest_progress", "{}", line);
1776                }
1777
1778                // Blocking send applies backpressure: if Phase B is slower,
1779                // Phase A workers wait here instead of accumulating staged files
1780                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1781                let _ = tx.send((item.idx, result));
1782            });
1783            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1784            drop(tx);
1785        });
1786    });
1787
1788    // Phase B: main thread persists files as results arrive from the channel.
1789    // Results arrive in completion order (par_iter is unordered). We persist
1790    // each file immediately on arrival — this is the key fix for B1: with the
1791    // old 2-phase design the first DB write happened only after ALL files had
1792    // finished Phase A. Now the first commit happens as soon as the first file
1793    // completes Phase A, regardless of how many files remain.
1794    //
1795    // NDJSON output order follows completion order (not file-system sort order).
1796    // Skip slots are emitted up front, before any Process result is consumed.
1797    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1798    // requirement than ensuring data is persisted before the user's timeout fires.
1799    let fail_fast = args.fail_fast;
1800
1801    // Emit pending Skip events first so agents see them early.
1802    for meta in &slots_meta {
1803        if let SlotMeta::Skip {
1804            file_str,
1805            derived_base,
1806            name_truncated,
1807            original_name,
1808            original_filename,
1809            reason,
1810        } = meta
1811        {
1812            output::emit_json_compact(&IngestFileEvent {
1813                file: file_str,
1814                name: derived_base,
1815                status: "skipped",
1816                truncated: *name_truncated,
1817                original_name: original_name.clone(),
1818                original_filename: original_filename.as_deref(),
1819                error: Some(reason.clone()),
1820                memory_id: None,
1821                action: None,
1822                body_length: 0,
1823                backend_invoked: None,
1824            })?;
1825            skipped += 1;
1826        }
1827    }
1828
1829    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1830    // as channel messages arrive in completion order.
1831    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1832        .iter()
1833        .enumerate()
1834        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1835        .collect();
1836
1837    tracing::info!(
1838        target: "ingest",
1839        phase = "persist_start",
1840        files = total_to_process,
1841        "phase B starting: persisting files incrementally as Phase A completes each one",
1842    );
1843
1844    // Drain channel and persist each file immediately — no accumulation into a
1845    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1846    // Phase B without applying backpressure.
1847    for (idx, stage_result) in rx {
1848        if crate::shutdown_requested() {
1849            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1850            break;
1851        }
1852        let meta = meta_index.get(&idx).ok_or_else(|| {
1853            AppError::Internal(anyhow::anyhow!(
1854                "channel idx {idx} has no corresponding Process slot"
1855            ))
1856        })?;
1857        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1858        {
1859            SlotMeta::Process {
1860                file_str,
1861                derived_name,
1862                name_truncated,
1863                original_name,
1864                original_filename,
1865            } => (
1866                file_str,
1867                derived_name,
1868                name_truncated,
1869                original_name,
1870                original_filename,
1871            ),
1872            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1873        };
1874
1875        // If storage init failed, every file fails with the same error.
1876        let conn = match conn_or_err.as_mut() {
1877            Ok(c) => c,
1878            Err(err_msg) => {
1879                let err_clone = err_msg.clone();
1880                output::emit_json_compact(&IngestFileEvent {
1881                    file: file_str,
1882                    name: derived_name,
1883                    status: "failed",
1884                    truncated: *name_truncated,
1885                    original_name: original_name.clone(),
1886                    original_filename: original_filename.as_deref(),
1887                    error: Some(err_clone.clone()),
1888                    memory_id: None,
1889                    action: None,
1890                    body_length: 0,
1891                    backend_invoked: None,
1892                })?;
1893                failed += 1;
1894                if fail_fast {
1895                    output::emit_json_compact(&IngestSummary {
1896                        summary: true,
1897                        dir: args.dir.display().to_string(),
1898                        pattern: args.pattern.clone(),
1899                        recursive: args.recursive,
1900                        files_total: total,
1901                        files_succeeded: succeeded,
1902                        files_failed: failed,
1903                        files_skipped: skipped,
1904                        elapsed_ms: started.elapsed().as_millis() as u64,
1905                    })?;
1906                    return Err(AppError::Validation(format!(
1907                        "ingest aborted on first failure: {err_clone}"
1908                    )));
1909                }
1910                continue;
1911            }
1912        };
1913
1914        match stage_result {
1915            Ok(parts) => {
1916                // GAP-SG-04/07: one source file can stage as multiple
1917                // sub-memories (auto-split partitions); persist and report each.
1918                for staged in parts {
1919                    let part_name = staged.name.clone();
1920                    match persist_staged(
1921                        conn,
1922                        &namespace,
1923                        &memory_type_str,
1924                        staged,
1925                        args.force_merge,
1926                    ) {
1927                        Ok(FileSuccess {
1928                            memory_id,
1929                            action,
1930                            body_length,
1931                            backend_invoked: file_backend_invoked,
1932                        }) => {
1933                            output::emit_json_compact(&IngestFileEvent {
1934                                file: file_str,
1935                                name: &part_name,
1936                                status: "indexed",
1937                                truncated: *name_truncated,
1938                                original_name: original_name.clone(),
1939                                original_filename: original_filename.as_deref(),
1940                                error: None,
1941                                memory_id: Some(memory_id),
1942                                action: Some(action),
1943                                body_length,
1944                                backend_invoked: file_backend_invoked,
1945                            })?;
1946                            succeeded += 1;
1947                        }
1948                        Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1949                            output::emit_json_compact(&IngestFileEvent {
1950                                file: file_str,
1951                                name: &part_name,
1952                                status: "skipped",
1953                                truncated: *name_truncated,
1954                                original_name: original_name.clone(),
1955                                original_filename: original_filename.as_deref(),
1956                                error: Some(format!("{e}")),
1957                                memory_id: None,
1958                                action: Some("duplicate".to_string()),
1959                                body_length: 0,
1960                                backend_invoked: None,
1961                            })?;
1962                            skipped += 1;
1963                        }
1964                        Err(e) => {
1965                            let err_msg = format!("{e}");
1966                            output::emit_json_compact(&IngestFileEvent {
1967                                file: file_str,
1968                                name: &part_name,
1969                                status: "failed",
1970                                truncated: *name_truncated,
1971                                original_name: original_name.clone(),
1972                                original_filename: original_filename.as_deref(),
1973                                error: Some(err_msg.clone()),
1974                                memory_id: None,
1975                                action: None,
1976                                body_length: 0,
1977                                backend_invoked: None,
1978                            })?;
1979                            failed += 1;
1980                            if fail_fast {
1981                                output::emit_json_compact(&IngestSummary {
1982                                    summary: true,
1983                                    dir: args.dir.display().to_string(),
1984                                    pattern: args.pattern.clone(),
1985                                    recursive: args.recursive,
1986                                    files_total: total,
1987                                    files_succeeded: succeeded,
1988                                    files_failed: failed,
1989                                    files_skipped: skipped,
1990                                    elapsed_ms: started.elapsed().as_millis() as u64,
1991                                })?;
1992                                return Err(AppError::Validation(format!(
1993                                    "ingest aborted on first failure: {err_msg}"
1994                                )));
1995                            }
1996                        }
1997                    }
1998                }
1999            }
2000            Err(e) => {
2001                let err_msg = format!("{e}");
2002                output::emit_json_compact(&IngestFileEvent {
2003                    file: file_str,
2004                    name: derived_name,
2005                    status: "failed",
2006                    truncated: *name_truncated,
2007                    original_name: original_name.clone(),
2008                    original_filename: original_filename.as_deref(),
2009                    error: Some(err_msg.clone()),
2010                    memory_id: None,
2011                    action: None,
2012                    body_length: 0,
2013                    backend_invoked: None,
2014                })?;
2015                failed += 1;
2016                if fail_fast {
2017                    output::emit_json_compact(&IngestSummary {
2018                        summary: true,
2019                        dir: args.dir.display().to_string(),
2020                        pattern: args.pattern.clone(),
2021                        recursive: args.recursive,
2022                        files_total: total,
2023                        files_succeeded: succeeded,
2024                        files_failed: failed,
2025                        files_skipped: skipped,
2026                        elapsed_ms: started.elapsed().as_millis() as u64,
2027                    })?;
2028                    return Err(AppError::Validation(format!(
2029                        "ingest aborted on first failure: {err_msg}"
2030                    )));
2031                }
2032            }
2033        }
2034    }
2035
2036    // Wait for the producer thread to finish cleanly.
2037    producer_handle
2038        .join()
2039        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
2040
2041    if let Ok(ref conn) = conn_or_err {
2042        if succeeded > 0 {
2043            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2044        }
2045    }
2046
2047    output::emit_json_compact(&IngestSummary {
2048        summary: true,
2049        dir: args.dir.display().to_string(),
2050        pattern: args.pattern.clone(),
2051        recursive: args.recursive,
2052        files_total: total,
2053        files_succeeded: succeeded,
2054        files_failed: failed,
2055        files_skipped: skipped,
2056        elapsed_ms: started.elapsed().as_millis() as u64,
2057    })?;
2058
2059    if args.enrich_after && succeeded > 0 {
2060        output::emit_json_compact(&serde_json::json!({
2061            "event": "enrich_phase_started",
2062            "operation": "memory-bindings"
2063        }))?;
2064        let enrich_args = super::enrich::EnrichArgs {
2065            operation: Some(super::enrich::EnrichOperation::MemoryBindings),
2066            mode: Some(super::enrich::EnrichMode::ClaudeCode),
2067            limit: None,
2068            dry_run: false,
2069            namespace: args.namespace.clone(),
2070            claude_binary: args.claude_binary.clone(),
2071            claude_model: args.claude_model.clone(),
2072            claude_timeout: args.claude_timeout,
2073            codex_binary: args.codex_binary.clone(),
2074            codex_model: args.codex_model.clone(),
2075            codex_timeout: args.codex_timeout,
2076            opencode_binary: args.opencode_binary.clone(),
2077            opencode_model: args.opencode_model.clone(),
2078            opencode_timeout: args.opencode_timeout,
2079            openrouter_model: None,
2080            openrouter_api_key: None,
2081            openrouter_timeout: 300,
2082            openrouter_base_url: None,
2083            db: args.db.clone(),
2084            json: false,
2085            resume: false,
2086            retry_failed: false,
2087            max_cost_usd: args.max_cost_usd,
2088            llm_parallelism: args.llm_parallelism as u32,
2089            wait_job_singleton: args.wait_job_singleton,
2090            force_job_singleton: args.force_job_singleton,
2091            names: Vec::new(),
2092            names_file: None,
2093            preflight_check: false,
2094            fallback_mode: None,
2095            rate_limit_buffer: 300,
2096            max_load_check: true,
2097            circuit_breaker_threshold: 5,
2098            preserve_threshold: 0.7,
2099            codex_model_validate: true,
2100            codex_model_fallback: None,
2101            min_output_chars: 500,
2102            max_output_chars: 2000,
2103            preserve_check: true,
2104            prompt_template: None,
2105            until_empty: false,
2106            max_runtime: None,
2107            max_attempts: 5,
2108            status: false,
2109            rest_concurrency: None,
2110            // enrich-after runs a plain memory-bindings pass; dead-letter,
2111            // backoff-ignore and graph-only flags stay at their defaults.
2112            list_dead: false,
2113            requeue_dead: false,
2114            prune_dead_orphans: false,
2115            ignore_backoff: false,
2116            body_extract_graph_only: false,
2117        };
2118        match super::enrich::run(&enrich_args, llm_backend, embedding_backend) {
2119            Ok(()) => {
2120                output::emit_json_compact(&serde_json::json!({
2121                    "event": "enrich_phase_completed"
2122                }))?;
2123            }
2124            Err(e) => {
2125                tracing::warn!(error = %e, "enrich --operation memory-bindings failed after ingest");
2126                output::emit_json_compact(&serde_json::json!({
2127                    "event": "enrich_phase_failed",
2128                    "error": e.to_string()
2129                }))?;
2130            }
2131        }
2132    }
2133
2134    Ok(())
2135}
2136
2137/// Auto-initialises the database (matches the contract of every other CRUD
2138/// handler) and returns a fresh read/write connection ready for the ingest
2139/// loop. Errors here are recoverable per-file: the caller surfaces them as
2140/// failure events so `--fail-fast` and the continue-on-error path keep
2141/// working when, for example, the user points `--db` at an unwritable path.
2142fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
2143    ensure_db_ready(paths)?;
2144    let conn = open_rw(&paths.db)?;
2145    Ok(conn)
2146}
2147
2148pub(crate) fn collect_files(
2149    dir: &Path,
2150    pattern: &str,
2151    recursive: bool,
2152    out: &mut Vec<PathBuf>,
2153) -> Result<(), AppError> {
2154    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
2155    for entry in entries {
2156        let entry = entry.map_err(AppError::Io)?;
2157        let path = entry.path();
2158        let file_type = entry.file_type().map_err(AppError::Io)?;
2159        if file_type.is_file() {
2160            let name = entry.file_name();
2161            let name_str = name.to_string_lossy();
2162            if matches_pattern(&name_str, pattern) {
2163                out.push(path);
2164            }
2165        } else if file_type.is_dir() && recursive {
2166            collect_files(&path, pattern, recursive, out)?;
2167        }
2168    }
2169    Ok(())
2170}
2171
/// Reports whether the file name `name` matches the glob `pattern`.
///
/// `*` is the only wildcard and stands for any run of characters, including
/// none; every other character must match literally and case-sensitively.
/// The wildcard may appear anywhere and any number of times, so `*.md`,
/// `README*`, `report-*.md` and `*draft*` all behave as globs. A pattern
/// without `*` must equal `name` exactly.
///
/// A literal `*` inside a file name is itself matched by a `*` in the pattern,
/// so every name accepted by the plain `*suffix` / `prefix*` / exact forms is
/// still accepted.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('*');

    // `split` always yields at least one piece; the first is anchored at the
    // start of the name.
    let head = segments.next().unwrap_or("");
    let mut rest = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };

    // The last piece is anchored at the end. When there is none the pattern
    // holds no wildcard, so the whole name must already have been consumed.
    let tail = match segments.next_back() {
        Some(tail) => tail,
        None => return rest.is_empty(),
    };

    // Pieces between wildcards: taking the leftmost occurrence each time
    // leaves the longest possible remainder for the pieces that follow.
    for piece in segments {
        match rest.find(piece) {
            Some(at) => rest = &rest[at + piece.len()..],
            None => return false,
        }
    }

    rest.ends_with(tail)
}
2181
2182/// Returns `(final_name, truncated, original_name)`.
2183/// `truncated` is true when the derived name exceeded `max_len`.
2184/// `original_name` holds the pre-truncation name only when `truncated=true`.
2185///
2186/// Non-ASCII characters are first decomposed via NFD and then stripped of
2187/// combining marks so accented letters fold to their base ASCII letter
2188/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
2189/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
2190/// preserves meaningful word content rather than collapsing the basename
2191/// to a few stray ASCII letters as the previous filter did.
2192pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
2193    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
2194    let lowered: String = stem
2195        .nfd()
2196        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
2197        .map(|c| {
2198            if c == '_' || c.is_whitespace() {
2199                '-'
2200            } else {
2201                c
2202            }
2203        })
2204        .map(|c| c.to_ascii_lowercase())
2205        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
2206        .collect();
2207    let collapsed = collapse_dashes(&lowered);
2208    let trimmed_raw = collapsed.trim_matches('-').to_string();
2209    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
2210    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
2211        format!("doc-{trimmed_raw}")
2212    } else {
2213        trimmed_raw
2214    };
2215    if trimmed.len() > max_len {
2216        let truncated = trimmed[..max_len].trim_matches('-').to_string();
2217        // GAP-SG-38: warn (not debug) so the operator sees that a derived name
2218        // was cut at the cap and that any collision will be resolved with a
2219        // numeric disambiguation suffix. The pre-truncation form is also
2220        // surfaced per-file via `IngestFileEvent.original_name`.
2221        tracing::warn!(
2222            target: "ingest",
2223            original = %trimmed,
2224            truncated_to = %truncated,
2225            max_len = max_len,
2226            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
2227        );
2228        (truncated, true, Some(trimmed))
2229    } else {
2230        (trimmed, false, None)
2231    }
2232}
2233
2234/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
2235/// numeric suffix (`-1`, `-2`, …) when needed.
2236///
2237/// `taken` is the set of names already consumed in the current ingest run.
2238/// The caller is expected to insert the returned name into `taken` so the
2239/// next call observes the consumption. Cross-run collisions are intentionally
2240/// surfaced by the per-file persistence path as duplicates so re-ingestion
2241/// of identical corpora stays idempotent.
2242///
2243/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
2244/// candidates collide, signalling a pathological corpus that should be
2245/// renamed manually.
2246fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
2247    if !taken.contains(base) {
2248        return Ok(base.to_string());
2249    }
2250    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
2251        let candidate = format!("{base}-{suffix}");
2252        if !taken.contains(&candidate) {
2253            tracing::warn!(
2254                target: "ingest",
2255                base = %base,
2256                resolved = %candidate,
2257                suffix,
2258                "memory name collision resolved with numeric suffix"
2259            );
2260            return Ok(candidate);
2261        }
2262    }
2263    Err(AppError::Validation(format!(
2264        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
2265    )))
2266}
2267
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied unchanged;
/// leading and trailing dashes are kept (as one dash each).
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // The output ends with '-' exactly when the previous input char was
        // a dash, so this skips only the repeats inside a run.
        let repeats_dash = ch == '-' && collapsed.ends_with('-');
        if !repeats_dash {
            collapsed.push(ch);
        }
    }
    collapsed
}
2284
/// Unit tests for the ingest handler: CLI flag parsing, staged-file
/// persistence, file discovery, name derivation and collision handling, and
/// the low-memory / parallelism resolution helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// GAP-SG-29: `ingest --mode none --resume` is rejected fail-fast by the
    /// mode-conditional validator, which `run()` invokes as its very first
    /// statement (before any DB/IO). clap 4.6 derive cannot express a
    /// value-conditional conflict (`--mode=none` vs `--resume`) without also
    /// breaking the valid `--mode claude-code --resume` combo, so the contract
    /// is enforced here instead of at the parser layer.
    #[test]
    fn ingest_mode_none_with_resume_is_rejected() {
        use crate::cli::{Cli, Commands};
        use clap::Parser;

        let none_resume = Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "./docs",
            "--mode",
            "none",
            "--resume",
        ])
        .expect("parse succeeds; the conflict is value-conditional");
        let args = match none_resume.command {
            Some(Commands::Ingest(a)) => a,
            other => panic!("expected ingest, got {other:?}"),
        };
        assert!(
            validate_mode_conditional_flags_ingest(&args).is_err(),
            "--mode none + --resume must be rejected fail-fast"
        );

        // The valid LLM-mode combo is NOT rejected.
        let claude_resume = Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "./docs",
            "--mode",
            "claude-code",
            "--resume",
        ])
        .expect("parse");
        let args = match claude_resume.command {
            Some(Commands::Ingest(a)) => a,
            other => panic!("expected ingest, got {other:?}"),
        };
        assert!(
            validate_mode_conditional_flags_ingest(&args).is_ok(),
            "--mode claude-code + --resume is valid and must pass"
        );
    }

    /// Opens an in-memory SQLite connection with the vec extension
    /// registered and all migrations applied.
    fn setup_ingest_conn() -> Connection {
        crate::storage::connection::register_vec_extension();
        let mut conn = Connection::open_in_memory().unwrap();
        crate::migrations::runner().run(&mut conn).unwrap();
        conn
    }

    /// Builds a minimal `StagedFile` for `name`/`body`: the hash is the
    /// blake3 hex digest of the body, the snippet is its first 200 chars,
    /// and every embedding/entity/relationship/url field is left empty.
    fn make_staged(name: &str, body: &str) -> StagedFile {
        StagedFile {
            body: body.to_string(),
            body_hash: blake3::hash(body.as_bytes()).to_hex().to_string(),
            snippet: body.chars().take(200).collect(),
            name: name.to_string(),
            description: "desc".to_string(),
            embedding: None,
            chunk_embeddings: None,
            chunks_info: Vec::new(),
            entities: Vec::new(),
            relationships: Vec::new(),
            entity_embeddings: None,
            urls: Vec::new(),
            backend_invoked: None,
        }
    }

    // GAP-SG-54: re-ingesting the same name without --force-merge is a duplicate
    // (skipped); with --force-merge it updates in place.
    #[test]
    fn persist_staged_force_merge_updates_existing() {
        let mut conn = setup_ingest_conn();

        let first = persist_staged(
            &mut conn,
            "global",
            "document",
            make_staged("doc-a", "v1"),
            false,
        )
        .expect("create");
        assert_eq!(first.action, "created");

        // Same name, no force_merge → Duplicate (skip).
        let dup = persist_staged(
            &mut conn,
            "global",
            "document",
            make_staged("doc-a", "v2-changed"),
            false,
        );
        assert!(matches!(dup, Err(AppError::Duplicate(_))));

        // Same name, force_merge → updated, body refreshed.
        let upd = persist_staged(
            &mut conn,
            "global",
            "document",
            make_staged("doc-a", "v2-changed"),
            true,
        )
        .expect("update");
        assert_eq!(upd.action, "updated");
        assert_eq!(upd.memory_id, first.memory_id);
        let body: String = conn
            .query_row(
                "SELECT body FROM memories WHERE id = ?1",
                rusqlite::params![first.memory_id],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(body, "v2-changed");
    }

    // GAP-SG-55: identical body under a divergent name is deduped (skipped).
    #[test]
    fn persist_staged_dedupes_by_body_hash() {
        let mut conn = setup_ingest_conn();
        persist_staged(
            &mut conn,
            "global",
            "document",
            make_staged("parte-1", "identical content"),
            false,
        )
        .expect("create");

        // Divergent derived name, same content → skipped as duplicate.
        let res = persist_staged(
            &mut conn,
            "global",
            "document",
            make_staged("part-01", "identical content"),
            false,
        );
        match res {
            Err(AppError::Duplicate(msg)) => assert!(msg.contains("body_hash")),
            other => panic!("expected body_hash dedup duplicate, got {other:?}"),
        }
        // Only one memory persisted.
        let n: i64 = conn
            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n, 1);
    }

    // GAP-SG-54: `ingest --force-merge` parses and sets the update flag.
    #[test]
    fn ingest_force_merge_flag_parses() {
        use crate::cli::{Cli, Commands};
        use clap::Parser;
        let cli = Cli::try_parse_from(["sqlite-graphrag", "ingest", "./docs", "--force-merge"])
            .expect("parse");
        match cli.command {
            Some(Commands::Ingest(a)) => assert!(a.force_merge),
            other => panic!("expected ingest, got {other:?}"),
        }
        // Default is off.
        let cli2 = Cli::try_parse_from(["sqlite-graphrag", "ingest", "./docs"]).expect("parse");
        match cli2.command {
            Some(Commands::Ingest(a)) => assert!(!a.force_merge),
            other => panic!("expected ingest, got {other:?}"),
        }
    }

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        // Assert against the cap that was actually passed in rather than a
        // literal, so the test keeps checking the real contract if the
        // constant is ever retuned.
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "got len {}",
            name.len()
        );
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > DERIVED_NAME_MAX_LEN);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value`
    /// (or removed when `None`) and puts the previous state back afterwards.
    ///
    /// Restoration happens in a drop guard, so it also runs when `f` panics
    /// (i.e. when an assertion inside the closure fails). Without that, one
    /// failing test would leave the variable mutated and could cascade into
    /// unrelated failures in later tests.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the value observed before the override; `None` means the
        /// variable was unset (or not valid Unicode) and must be removed.
        struct Restore(Option<String>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(prev) => std::env::set_var(KEY, prev),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        let _restore = Restore(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    // ── GAP-SG-06: --dry-run reports chunk and token counts ──

    #[test]
    fn dry_run_budget_event_serializes_chunk_and_token_counts() {
        let ev = IngestDryRunBudget {
            budget: true,
            file: "/tmp/doc.md",
            name: "doc",
            bytes: 1234,
            chunk_count: 3,
            token_count: 567,
            partition_count: 1,
            exceeds_limits: false,
        };
        let json = serde_json::to_string(&ev).expect("serialize budget event");
        assert!(json.contains("\"chunk_count\":3"), "got: {json}");
        assert!(json.contains("\"token_count\":567"), "got: {json}");
        assert!(json.contains("\"partition_count\":1"), "got: {json}");
        assert!(json.contains("\"exceeds_limits\":false"), "got: {json}");
    }

    #[test]
    fn assess_body_budget_feeds_dry_run_with_positive_counts() {
        // The dry-run path feeds chunking::assess_body_budget; a representative
        // body must report a positive chunk and token count.
        let body = "# Title\n\nsome representative body text for the budget.";
        let budget = chunking::assess_body_budget(body);
        assert!(budget.chunk_count >= 1);
        assert!(budget.approx_tokens >= 1);
        assert_eq!(budget.partition_count, 1);
    }
}