Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
56/// Hard cap on the numeric suffix appended for collision resolution. If 1000
57/// candidates collide we surface an error rather than loop forever.
58const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Namespace derived names with a kebab-case prefix (projx-<derived>)\n  \
67    sqlite-graphrag ingest ./docs --name-prefix projx- --dry-run\n\n  \
68    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n  \
69    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
70    # Preview file-to-name mapping without ingesting\n  \
71    sqlite-graphrag ingest ./docs --dry-run\n\n  \
72    # LLM-curated extraction via Claude Code CLI\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
74    # Resume interrupted claude-code ingest\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
76    # Claude Code with budget cap and custom timeout\n  \
77    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
78AUTHENTICATION:\n  \
79    --mode claude-code: Uses existing Claude Code authentication.\n  \
80      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
81      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
82    --mode codex: Uses existing Codex CLI authentication.\n  \
83      Device auth: run `codex auth login` first\n  \
84      API key: set OPENAI_API_KEY (optional)\n\n  \
85NOTES:\n  \
86    Each file becomes a separate memory. Names derive from file basenames\n  \
87    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
88    followed by a final summary line with counts. Per-file errors are reported\n  \
89    inline and processing continues unless --fail-fast is set.")]
90pub struct IngestArgs {
91    /// Directory containing files to ingest.
92    #[arg(
93        value_name = "DIR",
94        help = "Directory to ingest recursively (each matching file becomes a memory)"
95    )]
96    pub dir: PathBuf,
97
98    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
99    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
100    pub r#type: MemoryType,
101
102    /// Glob pattern matched against file basenames (default: `*.md`). Supports
103    /// `*.<ext>`, `<prefix>*`, and exact filename match.
104    #[arg(long, default_value = "*.md")]
105    pub pattern: String,
106
107    /// Recurse into subdirectories.
108    #[arg(long, default_value_t = false)]
109    pub recursive: bool,
110
111    #[arg(
112        long,
113        env = "SQLITE_GRAPHRAG_ENABLE_NER",
114        value_parser = crate::parsers::parse_bool_flexible,
115        action = clap::ArgAction::Set,
116        num_args = 0..=1,
117        default_missing_value = "true",
118        default_value = "false",
119        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
120    )]
121    pub enable_ner: bool,
122
123    /// GAP-E2E-011: generates a heuristic description from the first meaningful
124    /// line of the body, instead of "ingested from `<path>`". When
125    /// `--no-auto-describe` is passed, keeps the legacy behaviour.
126    #[arg(
127        long,
128        default_value_t = true,
129        overrides_with = "no_auto_describe",
130        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
131    )]
132    pub auto_describe: bool,
133    #[arg(
134        long = "no-auto-describe",
135        default_value_t = false,
136        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
137    )]
138    pub no_auto_describe: bool,
139    #[arg(
140        long,
141        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
142        default_value = "fp32",
143        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
144    )]
145    pub gliner_variant: String,
146
147    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
148    #[arg(long, default_value_t = false, hide = true)]
149    pub skip_extraction: bool,
150
151    /// Stop on first per-file error instead of continuing with the next file.
152    #[arg(long, default_value_t = false)]
153    pub fail_fast: bool,
154
155    /// Preview file-to-name mapping without loading model or persisting.
156    #[arg(long, default_value_t = false)]
157    pub dry_run: bool,
158
159    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
160    #[arg(long, default_value_t = 10_000)]
161    pub max_files: usize,
162
163    /// Namespace for the ingested memories.
164    #[arg(long)]
165    pub namespace: Option<String>,
166
167    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
168    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
169    pub db: Option<String>,
170
171    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
172    pub format: JsonOutputFormat,
173
174    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
175    pub json: bool,
176
177    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
178    #[arg(
179        long,
180        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
181    )]
182    pub ingest_parallelism: Option<usize>,
183
184    /// Force single-threaded ingest to reduce RSS pressure.
185    ///
186    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
187    /// explicit value. Recommended for environments with <4 GB available
188    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
189    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
190    /// (CLI flag has higher precedence than the env var).
191    #[arg(
192        long,
193        default_value_t = false,
194        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
195                Recommended for environments with <4 GB available RAM or container/cgroup \
196                constraints. Trade-off: 3-4x longer wall time. Also honored via \
197                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
198    )]
199    pub low_memory: bool,
200
201    /// Maximum process RSS in MiB; abort if exceeded during embedding.
202    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
203          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
204    pub max_rss_mb: u64,
205
206    /// G42/S3 (v1.0.79): maximum simultaneous LLM embedding subprocesses
207    /// PER FILE. Multiplies with --ingest-parallelism (files staged
208    /// concurrently), hence the conservative default of 2. The effective
209    /// value is further bounded by CPU count and available RAM.
210    #[arg(long, default_value_t = 2, value_name = "N",
211          value_parser = clap::value_parser!(u64).range(1..=32),
212          help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
213    pub llm_parallelism: u64,
214
215    /// Maximum character length for derived memory names from file basenames.
216    ///
217    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
218    /// Shorter values leave more headroom for collision suffix resolution.
219    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
220          help = "Maximum length for derived memory names (default: 60)")]
221    pub max_name_length: usize,
222
223    /// v1.1.1 (P12): kebab-case prefix prepended to every derived memory name,
224    /// AFTER the basename is normalized. Namespaces a corpus inside a shared
225    /// database (e.g. `--name-prefix projx-` yields `projx-<derived>`). The
226    /// derived part's budget shrinks so the final name always respects the
227    /// 80-char name cap. Only supported with `--mode none` or `gliner`.
228    #[arg(
229        long,
230        value_name = "PREFIX",
231        help = "Kebab-case prefix applied to every derived memory name (e.g. 'projx-')"
232    )]
233    pub name_prefix: Option<String>,
234
235    /// Extraction mode: `none` (body-only, default), `claude-code`/`codex` (LLM-curated), or `gliner` (DEPRECATED: URL-regex only since v1.0.79).
236    #[arg(long, value_enum, default_value_t = IngestMode::None)]
237    pub mode: IngestMode,
238
239    /// Explicit path to the Claude Code binary (only with --mode claude-code).
240    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
241    pub claude_binary: Option<std::path::PathBuf>,
242
243    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
244    #[arg(long)]
245    pub claude_model: Option<String>,
246
247    /// Resume a previously interrupted claude-code ingest from the queue DB.
248    #[arg(long, default_value_t = false)]
249    pub resume: bool,
250
251    /// Retry only failed files from a previous claude-code ingest.
252    #[arg(long, default_value_t = false)]
253    pub retry_failed: bool,
254
255    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
256    #[arg(long, default_value_t = false)]
257    pub keep_queue: bool,
258
259    /// Custom path for the ingest queue DB. Default: alongside the --db database.
260    #[arg(long)]
261    pub queue_db: Option<String>,
262
263    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
264    #[arg(long, default_value_t = 60)]
265    pub rate_limit_wait: u64,
266
267    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
268    #[arg(long)]
269    pub max_cost_usd: Option<f64>,
270
271    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
272    #[arg(
273        long,
274        default_value_t = 300,
275        help = "Timeout in seconds for each claude -p invocation (default: 300)"
276    )]
277    pub claude_timeout: u64,
278
279    /// Explicit path to the Codex CLI binary (only with --mode codex).
280    #[arg(
281        long,
282        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
283        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
284    )]
285    pub codex_binary: Option<PathBuf>,
286
287    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
288    #[arg(
289        long,
290        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
291    )]
292    pub codex_model: Option<String>,
293
294    /// Timeout in seconds for each codex exec invocation.
295    #[arg(
296        long,
297        default_value_t = 300,
298        help = "Timeout in seconds for each codex exec invocation (default: 300)"
299    )]
300    pub codex_timeout: u64,
301
302    /// Path to the `opencode` binary (override PATH lookup, only with --mode opencode).
303    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
304    pub opencode_binary: Option<PathBuf>,
305
306    /// Model override for OpenCode extraction.
307    #[arg(
308        long,
309        value_name = "MODEL",
310        env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
311        help = "Model override for OpenCode extraction"
312    )]
313    pub opencode_model: Option<String>,
314
315    /// Timeout in seconds for each opencode run invocation.
316    #[arg(
317        long,
318        value_name = "SECONDS",
319        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
320        default_value_t = 300,
321        help = "Timeout in seconds for each opencode run invocation (default: 300)"
322    )]
323    pub opencode_timeout: u64,
324
325    /// G30: poll for the job singleton every second for up to N seconds
326    /// when another invocation holds the lock. Default: 0 (fail fast).
327    #[arg(long, value_name = "SECONDS")]
328    pub wait_job_singleton: Option<u64>,
329
330    /// G30: force acquisition of the singleton lock by removing a stale
331    /// lock file from a previously crashed invocation.
332    #[arg(long, default_value_t = false)]
333    pub force_job_singleton: bool,
334
335    /// v1.0.93 (GAP-OR-INGEST): run `enrich --operation memory-bindings`
336    /// after all files are embedded, using the active `--llm-backend`.
337    #[arg(
338        long,
339        default_value_t = false,
340        help = "Run enrich --operation memory-bindings after all files are ingested"
341    )]
342    pub enrich_after: bool,
343
344    /// GAP-SG-54: update existing memories instead of skipping them. Without
345    /// this flag a file whose derived name already exists is reported `skipped`;
346    /// with it the existing memory's body, embedding and chunks are refreshed
347    /// (the `remember --force-merge` update path applied per file).
348    #[arg(
349        long,
350        default_value_t = false,
351        help = "Update existing memories on name collision instead of skipping (idempotent re-ingest)"
352    )]
353    pub force_merge: bool,
354}
355
356/// Extraction mode for the ingest pipeline.
357#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
358pub enum IngestMode {
359    /// Body-only ingestion without entity/relationship extraction (default).
360    None,
361    /// DEPRECATED: URL-regex extraction only since v1.0.79 (the GLiNER pipeline was removed; requires --enable-ner).
362    Gliner,
363    /// LLM-curated extraction via locally installed Claude Code CLI.
364    ClaudeCode,
365    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
366    Codex,
367    /// LLM-curated extraction via locally installed OpenCode CLI.
368    #[value(name = "opencode")]
369    Opencode,
370}
371
372/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
373/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
374/// values evaluate to false. Unrecognized non-empty values emit a
375/// `tracing::warn!` and evaluate to false.
376fn env_low_memory_enabled() -> bool {
377    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
378        Ok(v) if v.is_empty() => false,
379        Ok(v) => match v.to_lowercase().as_str() {
380            "1" | "true" | "yes" | "on" => true,
381            "0" | "false" | "no" | "off" => false,
382            other => {
383                tracing::warn!(
384                    target: "ingest",
385                    value = %other,
386                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
387                );
388                false
389            }
390        },
391        Err(_) => false,
392    }
393}
394
395/// Resolves the effective ingest parallelism honoring `--low-memory` and the
396/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
397///
398/// Precedence:
399/// 1. `--low-memory` CLI flag forces parallelism = 1.
400/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
401/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
402/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
403///
404/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
405/// emits a `tracing::warn!` advertising the override.
406fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
407    let env_flag = env_low_memory_enabled();
408    let low_memory = low_memory_flag || env_flag;
409
410    if low_memory {
411        if let Some(n) = ingest_parallelism {
412            if n > 1 {
413                tracing::warn!(
414                    target: "ingest",
415                    requested = n,
416                    "--ingest-parallelism overridden by --low-memory; using 1"
417                );
418            }
419        }
420        if low_memory_flag {
421            tracing::info!(
422                target: "ingest",
423                source = "flag",
424                "low-memory mode enabled: forcing --ingest-parallelism 1"
425            );
426        } else {
427            tracing::info!(
428                target: "ingest",
429                source = "env",
430                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
431            );
432        }
433        return 1;
434    }
435
436    ingest_parallelism
437        .unwrap_or_else(|| {
438            std::thread::available_parallelism()
439                .map(|v| v.get() / 2)
440                .unwrap_or(1)
441                .clamp(1, 4)
442        })
443        .max(1)
444}
445
446#[derive(Serialize)]
447struct IngestFileEvent<'a> {
448    file: &'a str,
449    name: &'a str,
450    status: &'a str,
451    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
452    truncated: bool,
453    /// Original derived name before truncation; only present when `truncated=true`.
454    #[serde(skip_serializing_if = "Option::is_none")]
455    original_name: Option<String>,
456    /// Original file basename (without extension); only present when it differs from `name`.
457    #[serde(skip_serializing_if = "Option::is_none")]
458    original_filename: Option<&'a str>,
459    #[serde(skip_serializing_if = "Option::is_none")]
460    error: Option<String>,
461    #[serde(skip_serializing_if = "Option::is_none")]
462    memory_id: Option<i64>,
463    #[serde(skip_serializing_if = "Option::is_none")]
464    action: Option<String>,
465    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
466    body_length: usize,
467    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
468    /// ran the live embedding. `"claude" | "codex" | "none"`. Absent on
469    /// the wire when `None` (kept for happy-path envelope cleanliness, or
470    /// when the file never reached the embed phase due to duplication/error).
471    #[serde(skip_serializing_if = "Option::is_none")]
472    backend_invoked: Option<&'a str>,
473}
474
475/// GAP-SG-06: per-file budget assessment emitted during `--dry-run` so the
476/// operator sees chunk and token counts (and how many sub-memories an
477/// auto-split would create) before running a real ingest.
478#[derive(Serialize)]
479struct IngestDryRunBudget<'a> {
480    budget: bool,
481    file: &'a str,
482    name: &'a str,
483    bytes: usize,
484    chunk_count: usize,
485    token_count: usize,
486    partition_count: usize,
487    exceeds_limits: bool,
488}
489
490#[derive(Serialize)]
491struct IngestSummary {
492    summary: bool,
493    dir: String,
494    pattern: String,
495    recursive: bool,
496    files_total: usize,
497    files_succeeded: usize,
498    files_failed: usize,
499    files_skipped: usize,
500    elapsed_ms: u64,
501}
502
503/// Outcome of a successful per-file ingest, used to build the NDJSON event.
504#[derive(Debug)]
505struct FileSuccess {
506    memory_id: i64,
507    action: String,
508    body_length: usize,
509    backend_invoked: Option<&'static str>,
510}
511
512/// NDJSON progress event emitted to stderr after each file completes Phase A.
513/// Schema version 1; consumers should check `schema_version` before parsing.
514#[derive(Serialize)]
515struct StageProgressEvent<'a> {
516    schema_version: u8,
517    event: &'a str,
518    path: &'a str,
519    ms: u64,
520    entities: usize,
521    relationships: usize,
522}
523
524/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
525/// Phase B persists these to SQLite on the main thread in submission order.
526struct StagedFile {
527    body: String,
528    body_hash: String,
529    snippet: String,
530    name: String,
531    description: String,
532    embedding: Option<Vec<f32>>,
533    chunk_embeddings: Option<Vec<Vec<f32>>>,
534    chunks_info: Vec<crate::chunking::Chunk>,
535    entities: Vec<NewEntity>,
536    relationships: Vec<NewRelationship>,
537    entity_embeddings: Option<Vec<Vec<f32>>>,
538    urls: Vec<crate::extraction::ExtractedUrl>,
539    /// v1.0.84 (ADR-0042): discriminator of the LLM backend that actually
540    /// ran the body embedding. `None` when the parallel batch
541    /// embed_passages_parallel_local fell back to different backends
542    /// across chunks (there is no single stable discriminator).
543    backend_invoked: Option<&'static str>,
544}
545
546/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
547/// Never touches the database — safe to run on any rayon thread.
548// G42/S3 added `llm_parallelism` as the 8th parameter; grouping the
549// stage knobs into a struct is a wider refactor than the surgical
550// scope of v1.0.79 allows.
551#[allow(clippy::too_many_arguments)]
552fn stage_file(
553    _idx: usize,
554    path: &Path,
555    name: &str,
556    paths: &AppPaths,
557    enable_ner: bool,
558    gliner_variant: crate::extraction::GlinerVariant,
559    max_rss_mb: u64,
560    llm_parallelism: usize,
561    llm_backend: crate::cli::LlmBackendChoice,
562    embedding_backend: crate::cli::EmbeddingBackendChoice,
563    auto_describe: bool,
564) -> Result<Vec<StagedFile>, AppError> {
565    use crate::constants::*;
566
567    if name.len() > MAX_MEMORY_NAME_LEN {
568        return Err(AppError::LimitExceeded(
569            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
570        ));
571    }
572    if name.starts_with("__") {
573        return Err(AppError::Validation(
574            crate::i18n::validation::reserved_name(),
575        ));
576    }
577    {
578        let slug_re = crate::constants::name_slug_regex();
579        if !slug_re.is_match(name) {
580            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
581                name,
582            )));
583        }
584    }
585
586    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
587    if file_size > MAX_MEMORY_BODY_LEN as u64 {
588        return Err(AppError::BodyTooLarge {
589            bytes: file_size,
590            limit: MAX_MEMORY_BODY_LEN as u64,
591        });
592    }
593    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
594    if raw_body.len() > MAX_MEMORY_BODY_LEN {
595        return Err(AppError::BodyTooLarge {
596            bytes: raw_body.len() as u64,
597            limit: MAX_MEMORY_BODY_LEN as u64,
598        });
599    }
600    if raw_body.trim().is_empty() {
601        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
602    }
603
604    let description = if auto_describe {
605        crate::commands::ingest_heuristics::extract_heuristic_description(
606            &raw_body,
607            Some(&path.display().to_string()),
608        )
609    } else {
610        format!("ingested from {}", path.display())
611    };
612    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
613        return Err(AppError::Validation(
614            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
615        ));
616    }
617
618    // GAP-SG-04/07: auto-split a body that exceeds the single-memory budgets
619    // (bytes, chunk count, token count) into section-aligned sub-memories so
620    // ingestion never fails on an oversized document. A body that fits returns
621    // a single partition under the original name.
622    let partitions = chunking::split_body_by_sections(&raw_body);
623    let total_parts = partitions.len();
624    let mut staged = Vec::with_capacity(total_parts);
625    for (part_idx, part_body) in partitions.into_iter().enumerate() {
626        let part_name = if total_parts == 1 {
627            name.to_string()
628        } else {
629            format!("{name}-part-{}", part_idx + 1)
630        };
631        if part_name.len() > MAX_MEMORY_NAME_LEN {
632            return Err(AppError::LimitExceeded(
633                crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
634            ));
635        }
636        let part_description = if total_parts == 1 {
637            description.clone()
638        } else {
639            partition_description(&description, part_idx + 1, total_parts)
640        };
641        staged.push(stage_one_body(
642            part_body,
643            part_name,
644            part_description,
645            paths,
646            enable_ner,
647            gliner_variant,
648            max_rss_mb,
649            llm_parallelism,
650            llm_backend,
651            embedding_backend,
652        )?);
653    }
654    Ok(staged)
655}
656
657/// Builds a partition description by appending a `(part i/n)` marker, trimming
658/// the base (on a char boundary) when the marker would push it past
659/// [`crate::constants::MAX_MEMORY_DESCRIPTION_LEN`].
660fn partition_description(base: &str, part: usize, total: usize) -> String {
661    let suffix = format!(" (part {part}/{total})");
662    let max = crate::constants::MAX_MEMORY_DESCRIPTION_LEN;
663    if base.len() + suffix.len() <= max {
664        return format!("{base}{suffix}");
665    }
666    let mut cut = max.saturating_sub(suffix.len()).min(base.len());
667    while cut > 0 && !base.is_char_boundary(cut) {
668        cut -= 1;
669    }
670    format!("{}{}", &base[..cut], suffix)
671}
672
673/// Stages a single body (one memory) into a [`StagedFile`]: NER extraction,
674/// chunking, embedding and entity embedding. Extracted from `stage_file` so the
675/// GAP-SG-04/07 auto-split path stages each partition independently.
676#[allow(clippy::too_many_arguments)]
677fn stage_one_body(
678    raw_body: String,
679    name: String,
680    description: String,
681    paths: &AppPaths,
682    enable_ner: bool,
683    gliner_variant: crate::extraction::GlinerVariant,
684    max_rss_mb: u64,
685    llm_parallelism: usize,
686    llm_backend: crate::cli::LlmBackendChoice,
687    embedding_backend: crate::cli::EmbeddingBackendChoice,
688) -> Result<StagedFile, AppError> {
689    use crate::constants::*;
690
691    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
692    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
693    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
694    if enable_ner {
695        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
696            Ok(extracted) => {
697                extracted_urls = extracted.urls;
698                // v1.0.76: ExtractionResult.entities is now
699                // Vec<ExtractedEntity>, not Vec<NewEntity>. Convert
700                // via name + type only; start/end offsets are not
701                // carried forward into the storage layer.
702                extracted_entities = extracted
703                    .entities
704                    .into_iter()
705                    .map(|e| NewEntity {
706                        name: e.name,
707                        entity_type: crate::entity_type::EntityType::Concept,
708                        description: None,
709                    })
710                    .collect();
711                // v1.0.76: relationships are no longer in the
712                // ExtractionResult struct; the LLM backend returns
713                // them in its own payload. The default build is
714                // URL-only extraction.
715                extracted_relationships.clear();
716
717                if extracted_entities.len() > max_entities_per_memory() {
718                    extracted_entities.truncate(max_entities_per_memory());
719                }
720                if extracted_relationships.len() > max_relationships_per_memory() {
721                    extracted_relationships.truncate(max_relationships_per_memory());
722                }
723            }
724            Err(e) => {
725                tracing::warn!(
726                    target: "ingest",
727                    file = %name,
728                    "auto-extraction failed (graceful degradation): {e:#}"
729                );
730            }
731        }
732    }
733
734    for rel in &mut extracted_relationships {
735        rel.relation = crate::parsers::normalize_relation(&rel.relation);
736        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
737            return Err(AppError::Validation(format!(
738                "{e} for relationship '{}' -> '{}'",
739                rel.source, rel.target
740            )));
741        }
742        crate::parsers::warn_if_non_canonical(&rel.relation);
743        if !(0.0..=1.0).contains(&rel.strength) {
744            return Err(AppError::Validation(format!(
745                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
746                rel.strength, rel.source, rel.target
747            )));
748        }
749    }
750
751    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
752    let snippet: String = raw_body.chars().take(200).collect();
753
754    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
755    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
756        return Err(AppError::TooManyChunks {
757            chunks: chunks_info.len(),
758            limit: REMEMBER_MAX_SAFE_MULTI_CHUNKS,
759        });
760    }
761
762    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
763    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
764    // v1.0.84 (ADR-0042): tuple (Vec<f32>, LlmBackendKind) — extrai o
765    // backend que efetivamente rodou para popular `backend_invoked` no
766    // envelope NDJSON por arquivo.
767    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
768        .len()
769        == 1
770    {
771        match crate::embedder::embed_passage_with_embedding_choice(
772            &paths.models,
773            &raw_body,
774            embedding_backend,
775            llm_backend,
776        ) {
777            Ok((v, k)) => (Some(v), Some(k.as_str())),
778            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
779            Err(e) if skip_embed => {
780                tracing::warn!(error = %e, file = %name, "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
781                (None, None)
782            }
783            Err(e) => return Err(e),
784        }
785    } else {
786        // G42/S2+S3 (v1.0.79): batched bounded fan-out replaces the
787        // serial per-chunk subprocess loop.
788        let chunk_texts: Vec<String> = chunks_info
789            .iter()
790            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
791            .collect();
792        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
793            if rss > max_rss_mb {
794                tracing::error!(
795                    target: "ingest",
796                    rss_mb = rss,
797                    max_rss_mb = max_rss_mb,
798                    file = %name,
799                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
800                );
801                return Err(AppError::LowMemory {
802                    available_mb: crate::memory_guard::available_memory_mb(),
803                    required_mb: max_rss_mb,
804                });
805            }
806        }
807        match crate::embedder::embed_passages_parallel_with_embedding_choice(
808            &paths.models,
809            &chunk_texts,
810            llm_parallelism,
811            crate::embedder::chunk_embed_batch_size(),
812            embedding_backend,
813            llm_backend,
814        ) {
815            Ok(chunk_embeddings) => {
816                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
817                chunk_embeddings_opt = Some(chunk_embeddings);
818                // v1.0.84 (ADR-0042): batch paralelo não retorna discriminador
819                // único por chamada. Conservadoramente, populamos None aqui.
820                (Some(aggregated), None)
821            }
822            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
823            Err(e) if skip_embed => {
824                tracing::warn!(error = %e, file = %name, "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
825                (None, None)
826            }
827            Err(e) => return Err(e),
828        }
829    };
830
831    // G42/S2+A4 (v1.0.79): entity names use the short-text batch profile.
832    let entity_texts: Vec<String> = extracted_entities
833        .iter()
834        .map(|entity| match &entity.description {
835            Some(desc) => format!("{} {}", entity.name, desc),
836            None => entity.name.clone(),
837        })
838        .collect();
839    // G56 (v1.0.80): ingest reuses canonical entity names across many
840    // memories (e.g. `sqlite-graphrag`, `claude-code`); the in-process
841    // cache collapses the repeated LLM calls into one per unique text.
842    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
843        &paths.models,
844        &entity_texts,
845        llm_parallelism,
846        embedding_backend,
847        llm_backend,
848    ) {
849        Ok((entity_embeddings, embed_cache_stats)) => {
850            if embed_cache_stats.hits > 0 {
851                tracing::debug!(
852                    hits = embed_cache_stats.hits,
853                    misses = embed_cache_stats.misses,
854                    requested = embed_cache_stats.requested,
855                    "G56: entity embed cache hit (ingest)"
856                );
857            }
858            Some(entity_embeddings)
859        }
860        Err(e) if skip_embed => {
861            tracing::warn!(error = %e, file = %name, "ingest: entity embedding failed; --skip-embedding-on-failure active");
862            None
863        }
864        Err(e) => return Err(e),
865    };
866
867    Ok(StagedFile {
868        body: raw_body,
869        body_hash,
870        snippet,
871        name,
872        description,
873        embedding,
874        chunk_embeddings: chunk_embeddings_opt,
875        chunks_info,
876        entities: extracted_entities,
877        relationships: extracted_relationships,
878        entity_embeddings: entity_embeddings_opt,
879        urls: extracted_urls,
880        backend_invoked,
881    })
882}
883
884/// Links the staged entities and relationships to `memory_id` within `tx`.
885/// Shared by the create and `--force-merge` update paths so the graph-binding
886/// logic lives in one place.
887fn link_staged_graph(
888    tx: &Connection,
889    namespace: &str,
890    memory_id: i64,
891    staged: &StagedFile,
892) -> Result<(), AppError> {
893    if staged.entities.is_empty() && staged.relationships.is_empty() {
894        return Ok(());
895    }
896    for (idx, entity) in staged.entities.iter().enumerate() {
897        let entity_id = entities::upsert_entity(tx, namespace, entity)?;
898        if let Some(ref entity_embeddings) = staged.entity_embeddings {
899            if let Some(entity_embedding) = entity_embeddings.get(idx) {
900                entities::upsert_entity_vec(
901                    tx,
902                    entity_id,
903                    namespace,
904                    entity.entity_type,
905                    entity_embedding,
906                    &entity.name,
907                )?;
908            }
909        }
910        entities::link_memory_entity(tx, memory_id, entity_id)?;
911    }
912    let entity_types: std::collections::HashMap<&str, EntityType> = staged
913        .entities
914        .iter()
915        .map(|entity| (entity.name.as_str(), entity.entity_type))
916        .collect();
917
918    let mut affected_entity_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
919    for entity in &staged.entities {
920        if let Some(eid) = entities::find_entity_id(tx, namespace, &entity.name)? {
921            affected_entity_ids.insert(eid);
922        }
923    }
924
925    for rel in &staged.relationships {
926        let source_entity = NewEntity {
927            name: rel.source.clone(),
928            entity_type: entity_types
929                .get(rel.source.as_str())
930                .copied()
931                .unwrap_or(EntityType::Concept),
932            description: None,
933        };
934        let target_entity = NewEntity {
935            name: rel.target.clone(),
936            entity_type: entity_types
937                .get(rel.target.as_str())
938                .copied()
939                .unwrap_or(EntityType::Concept),
940            description: None,
941        };
942        let source_id = entities::upsert_entity(tx, namespace, &source_entity)?;
943        let target_id = entities::upsert_entity(tx, namespace, &target_entity)?;
944        let rel_id = entities::upsert_relationship(tx, namespace, source_id, target_id, rel)?;
945        entities::link_memory_relationship(tx, memory_id, rel_id)?;
946        affected_entity_ids.insert(source_id);
947        affected_entity_ids.insert(target_id);
948    }
949
950    for &eid in &affected_entity_ids {
951        entities::recalculate_degree(tx, eid)?;
952    }
953    Ok(())
954}
955
956/// Phase B: persists one `StagedFile` to the database on the main thread.
957///
958/// GAP-SG-54: when `force_merge` is true an existing memory with the same name
959/// is UPDATED (body/embedding/chunks/graph refreshed) instead of being rejected
960/// as a duplicate. GAP-SG-55: a memory whose `body_hash` already exists under a
961/// DIFFERENT name is skipped (content-level dedup) so divergent derived names do
962/// not duplicate identical content.
963fn persist_staged(
964    conn: &mut Connection,
965    namespace: &str,
966    memory_type: &str,
967    staged: StagedFile,
968    force_merge: bool,
969) -> Result<FileSuccess, AppError> {
970    {
971        let active_count: u32 = conn.query_row(
972            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
973            [],
974            |r| r.get::<_, i64>(0).map(|v| v as u32),
975        )?;
976        let ns_exists: bool = conn.query_row(
977            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
978            rusqlite::params![namespace],
979            |r| r.get::<_, i64>(0).map(|v| v > 0),
980        )?;
981        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
982            return Err(AppError::NamespaceError(format!(
983                "active namespace limit of {} exceeded while creating '{namespace}'",
984                crate::constants::MAX_NAMESPACES_ACTIVE
985            )));
986        }
987    }
988
989    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
990    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
991
992    let new_memory = NewMemory {
993        namespace: namespace.to_string(),
994        name: staged.name.clone(),
995        memory_type: memory_type.to_string(),
996        description: staged.description.clone(),
997        body: staged.body.clone(),
998        body_hash: staged.body_hash.clone(),
999        session_id: None,
1000        source: "agent".to_string(),
1001        metadata: serde_json::json!({}),
1002    };
1003    let body_length = new_memory.body.len();
1004    let metadata_json = serde_json::to_string(&new_memory.metadata)?;
1005
1006    match existing_memory {
1007        Some((existing_id, _updated_at, _version)) => {
1008            if !force_merge {
1009                return Err(AppError::Duplicate(errors_msg::duplicate_memory(
1010                    &staged.name,
1011                    namespace,
1012                )));
1013            }
1014
1015            // GAP-SG-54: --force-merge update path. Refresh body, embedding,
1016            // chunks and graph bindings of the existing memory.
1017            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
1018
1019            let (old_name, old_desc, old_body): (String, String, String) = tx.query_row(
1020                "SELECT name, description, body FROM memories WHERE id = ?1",
1021                rusqlite::params![existing_id],
1022                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1023            )?;
1024
1025            let next_v = versions::next_version(&tx, existing_id)?;
1026            memories::update(&tx, existing_id, &new_memory, None)?;
1027            memories::sync_fts_after_update(
1028                &tx,
1029                existing_id,
1030                &old_name,
1031                &old_desc,
1032                &old_body,
1033                &staged.name,
1034                &staged.description,
1035                &new_memory.body,
1036            )?;
1037            versions::insert_version(
1038                &tx,
1039                existing_id,
1040                next_v,
1041                &staged.name,
1042                memory_type,
1043                &staged.description,
1044                &new_memory.body,
1045                &metadata_json,
1046                None,
1047                "edit",
1048            )?;
1049
1050            // Re-index chunks: drop the old slices then re-insert the staged set.
1051            storage_chunks::delete_chunks(&tx, existing_id)?;
1052            if let Some(ref emb) = staged.embedding {
1053                memories::upsert_vec(
1054                    &tx,
1055                    existing_id,
1056                    namespace,
1057                    memory_type,
1058                    emb,
1059                    &staged.name,
1060                    &staged.snippet,
1061                )?;
1062            }
1063            if staged.chunks_info.len() > 1 {
1064                storage_chunks::insert_chunk_slices(
1065                    &tx,
1066                    existing_id,
1067                    &new_memory.body,
1068                    &staged.chunks_info,
1069                )?;
1070                if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
1071                    for (i, emb) in chunk_embeddings.iter().enumerate() {
1072                        storage_chunks::upsert_chunk_vec(
1073                            &tx,
1074                            i as i64,
1075                            existing_id,
1076                            i as i32,
1077                            emb,
1078                        )?;
1079                    }
1080                }
1081            }
1082
1083            link_staged_graph(&tx, namespace, existing_id, &staged)?;
1084            tx.commit()?;
1085
1086            Ok(FileSuccess {
1087                memory_id: existing_id,
1088                action: "updated".to_string(),
1089                body_length,
1090                backend_invoked: staged.backend_invoked,
1091            })
1092        }
1093        None => {
1094            // GAP-SG-55: identical content already stored under a different name
1095            // → skip creating a duplicate (reported as `skipped` by the caller).
1096            if let Some(hash_id) = duplicate_hash_id {
1097                return Err(AppError::Duplicate(format!(
1098                    "identical body already stored as memory id {hash_id} (dedup by body_hash); skipping '{}'",
1099                    staged.name
1100                )));
1101            }
1102
1103            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
1104            let memory_id = memories::insert(&tx, &new_memory)?;
1105            versions::insert_version(
1106                &tx,
1107                memory_id,
1108                1,
1109                &staged.name,
1110                memory_type,
1111                &staged.description,
1112                &new_memory.body,
1113                &metadata_json,
1114                None,
1115                "create",
1116            )?;
1117            if let Some(ref emb) = staged.embedding {
1118                memories::upsert_vec(
1119                    &tx,
1120                    memory_id,
1121                    namespace,
1122                    memory_type,
1123                    emb,
1124                    &staged.name,
1125                    &staged.snippet,
1126                )?;
1127            }
1128            if staged.chunks_info.len() > 1 {
1129                storage_chunks::insert_chunk_slices(
1130                    &tx,
1131                    memory_id,
1132                    &new_memory.body,
1133                    &staged.chunks_info,
1134                )?;
1135                if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
1136                    for (i, emb) in chunk_embeddings.iter().enumerate() {
1137                        storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
1138                    }
1139                }
1140            }
1141            link_staged_graph(&tx, namespace, memory_id, &staged)?;
1142            tx.commit()?;
1143
1144            if !staged.urls.is_empty() {
1145                let url_entries: Vec<storage_urls::MemoryUrl> = staged
1146                    .urls
1147                    .into_iter()
1148                    .map(|u| storage_urls::MemoryUrl {
1149                        url: u.url,
1150                        offset: Some(u.start as i64),
1151                    })
1152                    .collect();
1153                let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
1154            }
1155
1156            Ok(FileSuccess {
1157                memory_id,
1158                action: "created".to_string(),
1159                body_length,
1160                backend_invoked: staged.backend_invoked,
1161            })
1162        }
1163    }
1164}
1165
1166// ---------------------------------------------------------------------------
1167// G20: mode-conditional flag validation
1168// ---------------------------------------------------------------------------
1169
1170/// True when a scalar value matches its declared default. Local
1171/// re-declaration (also defined in ) to keep this module
1172/// self-contained for the G20 fix.
1173fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
1174    value == default
1175}
1176
1177/// G20: validate that flags for one LLM provider were not passed when
1178/// the operator selected a different provider (or no provider). Flags
1179/// silently discarded by the wrong mode are surfaced as
1180///  BEFORE any DB work, so the operator gets
1181/// an actionable error instead of a surprise at runtime.
1182///
1183/// Mode-specific matrices:
1184/// - `mode=none` and `mode=gliner` reject: claude_binary, claude_model,
1185///   claude_timeout!=300, max_cost_usd, resume, retry_failed, keep_queue,
1186///   codex_binary, codex_model, codex_timeout!=300, gliner_variant (if
1187///   --enable-ner is false)
1188/// - `mode=claude-code` rejects: codex_binary, codex_model, codex_timeout!=300
1189/// - `mode=codex` rejects: claude_binary, claude_model, claude_timeout!=300,
1190///   max_cost_usd, resume, retry_failed, keep_queue
1191fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
1192    const DEFAULT_TIMEOUT: u64 = 300;
1193    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
1194
1195    let mut conflicts: Vec<String> = Vec::new();
1196
1197    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
1198
1199    // v1.1.1 (P12): --name-prefix is only applied by the local staging path;
1200    // rejecting it under LLM modes avoids a silently unprefixed corpus.
1201    if args.name_prefix.is_some() && !is_local_mode {
1202        return Err(AppError::Validation(
1203            "--name-prefix is not supported with --mode claude-code/codex/opencode; \
1204             use --mode none (default) or gliner"
1205                .to_string(),
1206        ));
1207    }
1208
1209    if is_local_mode {
1210        if args.claude_binary.is_some() {
1211            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
1212        }
1213        if args.claude_model.is_some() {
1214            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
1215        }
1216        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1217            conflicts.push(format!(
1218                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1219                args.claude_timeout
1220            ));
1221        }
1222        if args.codex_binary.is_some() {
1223            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
1224        }
1225        if args.codex_model.is_some() {
1226            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
1227        }
1228        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1229            conflicts.push(format!(
1230                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1231                args.codex_timeout
1232            ));
1233        }
1234        if args.opencode_binary.is_some() {
1235            conflicts
1236                .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
1237        }
1238        if args.opencode_model.is_some() {
1239            conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
1240        }
1241        if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1242            conflicts.push(format!(
1243                "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1244                args.opencode_timeout
1245            ));
1246        }
1247        if args.max_cost_usd.is_some() {
1248            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
1249        }
1250        if args.resume {
1251            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
1252        }
1253        if args.retry_failed {
1254            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
1255        }
1256        if args.keep_queue {
1257            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
1258        }
1259        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
1260            conflicts.push(format!(
1261                "--rate-limit-wait={} is ignored when --mode is none or gliner",
1262                args.rate_limit_wait
1263            ));
1264        }
1265    }
1266
1267    match args.mode {
1268        IngestMode::ClaudeCode => {
1269            if args.codex_binary.is_some() {
1270                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1271            }
1272            if args.codex_model.is_some() {
1273                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1274            }
1275            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1276                conflicts.push(format!(
1277                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1278                    args.codex_timeout
1279                ));
1280            }
1281            if args.opencode_binary.is_some() {
1282                conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1283            }
1284            if args.opencode_model.is_some() {
1285                conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1286            }
1287            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1288                conflicts.push(format!(
1289                    "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1290                    args.opencode_timeout
1291                ));
1292            }
1293        }
1294        IngestMode::Codex => {
1295            if args.claude_binary.is_some() {
1296                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1297            }
1298            if args.claude_model.is_some() {
1299                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1300            }
1301            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1302                conflicts.push(format!(
1303                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1304                    args.claude_timeout
1305                ));
1306            }
1307            if args.max_cost_usd.is_some() {
1308                conflicts.push(
1309                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1310                        .to_string(),
1311                );
1312            }
1313            if args.resume {
1314                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1315            }
1316            if args.retry_failed {
1317                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1318            }
1319            if args.keep_queue {
1320                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1321            }
1322            if args.opencode_binary.is_some() {
1323                conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1324            }
1325            if args.opencode_model.is_some() {
1326                conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1327            }
1328            if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1329                conflicts.push(format!(
1330                    "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1331                    args.opencode_timeout
1332                ));
1333            }
1334        }
1335        IngestMode::Opencode => {
1336            if args.claude_binary.is_some() {
1337                conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1338            }
1339            if args.claude_model.is_some() {
1340                conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1341            }
1342            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1343                conflicts.push(format!(
1344                    "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1345                    args.claude_timeout
1346                ));
1347            }
1348            if args.codex_binary.is_some() {
1349                conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1350            }
1351            if args.codex_model.is_some() {
1352                conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1353            }
1354            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1355                conflicts.push(format!(
1356                    "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1357                    args.codex_timeout
1358                ));
1359            }
1360            if args.max_cost_usd.is_some() {
1361                conflicts.push(
1362                    "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1363                        .to_string(),
1364                );
1365            }
1366            if args.resume {
1367                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1368            }
1369            if args.retry_failed {
1370                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1371            }
1372            if args.keep_queue {
1373                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1374            }
1375        }
1376        IngestMode::None | IngestMode::Gliner => {}
1377    }
1378
1379    if !conflicts.is_empty() {
1380        return Err(AppError::Validation(format!(
1381            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n  - {}",
1382            args.mode,
1383            conflicts.join("\n  - ")
1384        )));
1385    }
1386
1387    Ok(())
1388}
1389
1390// ---------------------------------------------------------------------------
1391
1392#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
1393pub fn run(
1394    args: IngestArgs,
1395    llm_backend: crate::cli::LlmBackendChoice,
1396    embedding_backend: crate::cli::EmbeddingBackendChoice,
1397) -> Result<(), AppError> {
1398    // G20: mode-conditional flag validation BEFORE any DB access.
1399    // Surfaces flags that the wrong mode would silently discard.
1400    validate_mode_conditional_flags_ingest(&args)?;
1401    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
1402    if args.mode == IngestMode::ClaudeCode {
1403        return super::ingest_claude::run_claude_ingest(&args, embedding_backend, llm_backend);
1404    }
1405    if args.mode == IngestMode::Codex {
1406        return super::ingest_codex::run_codex_ingest(&args);
1407    }
1408    if args.mode == IngestMode::Opencode {
1409        return super::ingest_opencode::run_opencode_ingest(&args);
1410    }
1411
1412    let started = std::time::Instant::now();
1413
1414    if !args.dir.exists() {
1415        return Err(AppError::Validation(format!(
1416            "directory not found: {}",
1417            args.dir.display()
1418        )));
1419    }
1420    if !args.dir.is_dir() {
1421        return Err(AppError::Validation(format!(
1422            "path is not a directory: {}",
1423            args.dir.display()
1424        )));
1425    }
1426
1427    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1428    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1429    files.sort_unstable();
1430
1431    if files.len() > args.max_files {
1432        return Err(AppError::Validation(format!(
1433            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1434            files.len(),
1435            args.max_files
1436        )));
1437    }
1438
1439    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1440    let memory_type_str = args.r#type.as_str().to_string();
1441
1442    let paths = AppPaths::resolve(args.db.as_deref())?;
1443    let mut conn_or_err = match init_storage(&paths) {
1444        Ok(c) => Ok(c),
1445        Err(e) => Err(format!("{e}")),
1446    };
1447
1448    let mut succeeded: usize = 0;
1449    let mut failed: usize = 0;
1450    let mut skipped: usize = 0;
1451    let total = files.len();
1452
1453    // Pre-resolve all names before parallelisation so Phase A workers see a
1454    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
1455    let mut taken_names: BTreeSet<String> = BTreeSet::new();
1456
1457    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
1458    // ProcessItem: the data moved into the producer thread for Phase A computation.
1459    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
1460    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
1461    // boundary into the rayon producer.
1462    enum SlotMeta {
1463        Skip {
1464            file_str: String,
1465            derived_base: String,
1466            name_truncated: bool,
1467            original_name: Option<String>,
1468            original_filename: Option<String>,
1469            reason: String,
1470        },
1471        Process {
1472            file_str: String,
1473            derived_name: String,
1474            name_truncated: bool,
1475            original_name: Option<String>,
1476            original_filename: Option<String>,
1477        },
1478    }
1479
1480    struct ProcessItem {
1481        idx: usize,
1482        path: PathBuf,
1483        file_str: String,
1484        derived_name: String,
1485    }
1486
1487    let files_cap = files.len();
1488    let mut slots_meta: Vec<SlotMeta> = Vec::new();
1489    slots_meta.try_reserve(files_cap).map_err(|_| {
1490        AppError::LimitExceeded(format!(
1491            "allocation of {files_cap} slot metadata entries would exceed available memory"
1492        ))
1493    })?;
1494    let mut process_items: Vec<ProcessItem> = Vec::new();
1495    process_items.try_reserve(files_cap).map_err(|_| {
1496        AppError::LimitExceeded(format!(
1497            "allocation of {files_cap} process items would exceed available memory"
1498        ))
1499    })?;
1500    let mut truncations: Vec<(String, String)> = Vec::new();
1501    truncations.try_reserve(files_cap).map_err(|_| {
1502        AppError::LimitExceeded(format!(
1503            "allocation of {files_cap} truncation entries would exceed available memory"
1504        ))
1505    })?;
1506
1507    // v1.1.1 (P12): validate the prefix once and shrink the derived-name
1508    // budget so `prefix + derived` always fits MAX_MEMORY_NAME_LEN.
1509    let max_name_length = match args.name_prefix.as_deref() {
1510        Some(prefix) => validate_name_prefix(prefix, args.max_name_length)?,
1511        None => args.max_name_length,
1512    };
1513    for path in &files {
1514        let file_str = path.to_string_lossy().into_owned();
1515        let (derived_base, name_truncated, original_name) =
1516            derive_kebab_name(path, max_name_length);
1517        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1518
1519        if name_truncated {
1520            if let Some(ref orig) = original_name {
1521                truncations.push((orig.clone(), derived_base.clone()));
1522            }
1523        }
1524
1525        if derived_base.is_empty() {
1526            // original_filename: always include when it differs from the empty derived name
1527            let orig_filename = if !original_basename.is_empty() {
1528                Some(original_basename.to_string())
1529            } else {
1530                None
1531            };
1532            slots_meta.push(SlotMeta::Skip {
1533                file_str,
1534                derived_base: String::new(),
1535                name_truncated: false,
1536                original_name: None,
1537                original_filename: orig_filename,
1538                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1539            });
1540            continue;
1541        }
1542
1543        // v1.1.1 (P12): prefix applied AFTER kebab normalization of the
1544        // basename; the shrunken budget above guarantees the final length
1545        // fits MAX_MEMORY_NAME_LEN.
1546        let derived_base = match args.name_prefix.as_deref() {
1547            Some(prefix) => format!("{prefix}{derived_base}"),
1548            None => derived_base,
1549        };
1550
1551        match unique_name(&derived_base, &taken_names) {
1552            Ok(derived_name) => {
1553                taken_names.insert(derived_name.clone());
1554                let idx = slots_meta.len();
1555                // original_filename: present only when the raw basename differs from the derived name
1556                let orig_filename = if original_basename != derived_name {
1557                    Some(original_basename.to_string())
1558                } else {
1559                    None
1560                };
1561                process_items.push(ProcessItem {
1562                    idx,
1563                    path: path.clone(),
1564                    file_str: file_str.clone(),
1565                    derived_name: derived_name.clone(),
1566                });
1567                slots_meta.push(SlotMeta::Process {
1568                    file_str,
1569                    derived_name,
1570                    name_truncated,
1571                    original_name,
1572                    original_filename: orig_filename,
1573                });
1574            }
1575            Err(e) => {
1576                let orig_filename = if original_basename != derived_base {
1577                    Some(original_basename.to_string())
1578                } else {
1579                    None
1580                };
1581                slots_meta.push(SlotMeta::Skip {
1582                    file_str,
1583                    derived_base,
1584                    name_truncated,
1585                    original_name,
1586                    original_filename: orig_filename,
1587                    reason: e.to_string(),
1588                });
1589            }
1590        }
1591    }
1592
1593    if !truncations.is_empty() {
1594        tracing::info!(
1595            target: "ingest",
1596            count = truncations.len(),
1597            max_name_length = max_name_length,
1598            max_len = DERIVED_NAME_MAX_LEN,
1599            "derived names truncated; pass -vv (debug) for per-file detail"
1600        );
1601    }
1602
1603    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
1604    if args.dry_run {
1605        for meta in &slots_meta {
1606            match meta {
1607                SlotMeta::Skip {
1608                    file_str,
1609                    derived_base,
1610                    name_truncated,
1611                    original_name,
1612                    original_filename,
1613                    reason,
1614                } => {
1615                    output::emit_json_compact(&IngestFileEvent {
1616                        file: file_str,
1617                        name: derived_base,
1618                        status: "skip",
1619                        truncated: *name_truncated,
1620                        original_name: original_name.clone(),
1621                        original_filename: original_filename.as_deref(),
1622                        error: Some(reason.clone()),
1623                        memory_id: None,
1624                        action: None,
1625                        body_length: 0,
1626                        backend_invoked: None,
1627                    })?;
1628                }
1629                SlotMeta::Process {
1630                    file_str,
1631                    derived_name,
1632                    name_truncated,
1633                    original_name,
1634                    original_filename,
1635                } => {
1636                    output::emit_json_compact(&IngestFileEvent {
1637                        file: file_str,
1638                        name: derived_name,
1639                        status: "preview",
1640                        truncated: *name_truncated,
1641                        original_name: original_name.clone(),
1642                        original_filename: original_filename.as_deref(),
1643                        error: None,
1644                        memory_id: None,
1645                        action: None,
1646                        body_length: 0,
1647                        backend_invoked: None,
1648                    })?;
1649
1650                    // GAP-SG-06: report chunk + token counts and how many
1651                    // sub-memories an auto-split would create, so the operator
1652                    // detects chunk/token overflow before a real ingest.
1653                    match std::fs::read_to_string(file_str) {
1654                        Ok(body) => {
1655                            let budget = chunking::assess_body_budget(&body);
1656                            output::emit_json_compact(&IngestDryRunBudget {
1657                                budget: true,
1658                                file: file_str,
1659                                name: derived_name,
1660                                bytes: budget.bytes,
1661                                chunk_count: budget.chunk_count,
1662                                token_count: budget.approx_tokens,
1663                                partition_count: budget.partition_count,
1664                                exceeds_limits: budget.exceeds_limits,
1665                            })?;
1666                        }
1667                        Err(e) => {
1668                            tracing::warn!(
1669                                target: "ingest",
1670                                file = %file_str,
1671                                "dry-run: could not read file for budget assessment: {e}"
1672                            );
1673                        }
1674                    }
1675                }
1676            }
1677        }
1678        output::emit_json_compact(&IngestSummary {
1679            summary: true,
1680            dir: args.dir.to_string_lossy().into_owned(),
1681            pattern: args.pattern.clone(),
1682            recursive: args.recursive,
1683            files_total: total,
1684            files_succeeded: 0,
1685            files_failed: 0,
1686            files_skipped: 0,
1687            elapsed_ms: started.elapsed().as_millis() as u64,
1688        })?;
1689        return Ok(());
1690    }
1691
1692    // Reject contradictory flag combination: explicit parallelism > 1 with --low-memory.
1693    if args.low_memory {
1694        if let Some(n) = args.ingest_parallelism {
1695            if n > 1 {
1696                return Err(AppError::Validation(
1697                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1698                        .to_string(),
1699                ));
1700            }
1701        }
1702    }
1703
1704    // Determine rayon thread pool size, honoring --low-memory and the
1705    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
1706    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1707
1708    let pool = rayon::ThreadPoolBuilder::new()
1709        .num_threads(parallelism)
1710        .build()
1711        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1712
1713    if args.enable_ner && args.skip_extraction {
1714        return Err(AppError::Validation(
1715            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1716        ));
1717    }
1718    if args.skip_extraction && !args.enable_ner {
1719        // v1.0.74: revert to v1.0.45 hidden no-op behavior. The v1.0.67
1720        // commit (9ddb17b) promoted this to a hard validation error, which
1721        // broke the "kept as a hidden no-op for backwards compatibility"
1722        // promise documented in CHANGELOG v1.0.45 and started failing
1723        // 5+ CI jobs whose E2E tests use this flag to skip the
1724        // GLiNER-ONNX model download in CI environments.
1725        tracing::warn!(
1726            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1727        );
1728    }
1729    let enable_ner = args.enable_ner;
1730    let auto_describe = args.auto_describe && !args.no_auto_describe;
1731    let max_rss_mb = args.max_rss_mb;
1732    let llm_parallelism = args.llm_parallelism as usize;
1733    // v1.0.79: `--mode gliner` and `--gliner-variant` are no-ops kept for
1734    // compatibility (the GLiNER pipeline was removed); warn explicitly so
1735    // callers do not silently expect NER-quality extraction.
1736    if args.mode == IngestMode::Gliner {
1737        tracing::warn!(
1738            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1739        );
1740    }
1741    if args.gliner_variant != "fp32" {
1742        tracing::warn!(
1743            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1744        );
1745    }
1746    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1747        "int8" => crate::extraction::GlinerVariant::Int8,
1748        _ => crate::extraction::GlinerVariant::Fp32,
1749    };
1750
1751    let total_to_process = process_items.len();
1752    tracing::info!(
1753        target: "ingest",
1754        phase = "pipeline_start",
1755        files = total_to_process,
1756        ingest_parallelism = parallelism,
1757        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1758    );
1759
1760    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1761    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1762    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1763    let channel_bound = (parallelism * 2).max(1);
1764    let (tx, rx) = mpsc::sync_channel::<(usize, Result<Vec<StagedFile>, AppError>)>(channel_bound);
1765
1766    // Phase A: launched in a dedicated OS thread so the main thread can consume
1767    // the channel concurrently. pool.install() blocks the calling thread until
1768    // all rayon workers finish — if called on the main thread it would
1769    // reintroduce the 2-phase blocking behaviour we are eliminating.
1770    let paths_owned = paths.clone();
1771    let llm_backend_owned = llm_backend;
1772    let embedding_backend_owned = embedding_backend;
1773    let producer_handle = std::thread::spawn(move || {
1774        pool.install(|| {
1775            process_items.into_par_iter().for_each(|item| {
1776                if crate::shutdown_requested() {
1777                    return;
1778                }
1779                let t0 = std::time::Instant::now();
1780                let result = stage_file(
1781                    item.idx,
1782                    &item.path,
1783                    &item.derived_name,
1784                    &paths_owned,
1785                    enable_ner,
1786                    gliner_variant,
1787                    max_rss_mb,
1788                    llm_parallelism,
1789                    llm_backend_owned,
1790                    embedding_backend_owned,
1791                    auto_describe,
1792                );
1793                let elapsed_ms = t0.elapsed().as_millis() as u64;
1794
1795                // Emit NDJSON progress event to stderr so the user sees work
1796                // happening during long NER runs (e.g. 50 files × 27s each).
1797                let (n_entities, n_relationships) = match &result {
1798                    Ok(parts) => (
1799                        parts.iter().map(|sf| sf.entities.len()).sum::<usize>(),
1800                        parts.iter().map(|sf| sf.relationships.len()).sum::<usize>(),
1801                    ),
1802                    Err(_) => (0, 0),
1803                };
1804                let progress = StageProgressEvent {
1805                    schema_version: 1,
1806                    event: "file_extracted",
1807                    path: &item.file_str,
1808                    ms: elapsed_ms,
1809                    entities: n_entities,
1810                    relationships: n_relationships,
1811                };
1812                if let Ok(line) = serde_json::to_string(&progress) {
1813                    tracing::info!(target: "ingest_progress", "{}", line);
1814                }
1815
1816                // Blocking send applies backpressure: if Phase B is slower,
1817                // Phase A workers wait here instead of accumulating staged files
1818                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1819                let _ = tx.send((item.idx, result));
1820            });
1821            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1822            drop(tx);
1823        });
1824    });
1825
1826    // Phase B: main thread persists files as results arrive from the channel.
1827    // Results arrive in completion order (par_iter is unordered). We persist
1828    // each file immediately on arrival — this is the key fix for B1: with the
1829    // old 2-phase design the first DB write happened only after ALL files had
1830    // finished Phase A. Now the first commit happens as soon as the first file
1831    // completes Phase A, regardless of how many files remain.
1832    //
1833    // NDJSON output order follows completion order (not file-system sort order).
1834    // Skip slots are emitted at the end, after all Process results are consumed.
1835    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1836    // requirement than ensuring data is persisted before the user's timeout fires.
1837    let fail_fast = args.fail_fast;
1838
1839    // Emit pending Skip events first so agents see them early.
1840    for meta in &slots_meta {
1841        if let SlotMeta::Skip {
1842            file_str,
1843            derived_base,
1844            name_truncated,
1845            original_name,
1846            original_filename,
1847            reason,
1848        } = meta
1849        {
1850            output::emit_json_compact(&IngestFileEvent {
1851                file: file_str,
1852                name: derived_base,
1853                status: "skipped",
1854                truncated: *name_truncated,
1855                original_name: original_name.clone(),
1856                original_filename: original_filename.as_deref(),
1857                error: Some(reason.clone()),
1858                memory_id: None,
1859                action: None,
1860                body_length: 0,
1861                backend_invoked: None,
1862            })?;
1863            skipped += 1;
1864        }
1865    }
1866
1867    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1868    // as channel messages arrive in completion order.
1869    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1870        .iter()
1871        .enumerate()
1872        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1873        .collect();
1874
1875    tracing::info!(
1876        target: "ingest",
1877        phase = "persist_start",
1878        files = total_to_process,
1879        "phase B starting: persisting files incrementally as Phase A completes each one",
1880    );
1881
1882    // Drain channel and persist each file immediately — no accumulation into a
1883    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1884    // Phase B without applying backpressure.
1885    for (idx, stage_result) in rx {
1886        if crate::shutdown_requested() {
1887            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1888            break;
1889        }
1890        let meta = meta_index.get(&idx).ok_or_else(|| {
1891            AppError::Internal(anyhow::anyhow!(
1892                "channel idx {idx} has no corresponding Process slot"
1893            ))
1894        })?;
1895        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1896        {
1897            SlotMeta::Process {
1898                file_str,
1899                derived_name,
1900                name_truncated,
1901                original_name,
1902                original_filename,
1903            } => (
1904                file_str,
1905                derived_name,
1906                name_truncated,
1907                original_name,
1908                original_filename,
1909            ),
1910            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1911        };
1912
1913        // If storage init failed, every file fails with the same error.
1914        let conn = match conn_or_err.as_mut() {
1915            Ok(c) => c,
1916            Err(err_msg) => {
1917                let err_clone = err_msg.clone();
1918                output::emit_json_compact(&IngestFileEvent {
1919                    file: file_str,
1920                    name: derived_name,
1921                    status: "failed",
1922                    truncated: *name_truncated,
1923                    original_name: original_name.clone(),
1924                    original_filename: original_filename.as_deref(),
1925                    error: Some(err_clone.clone()),
1926                    memory_id: None,
1927                    action: None,
1928                    body_length: 0,
1929                    backend_invoked: None,
1930                })?;
1931                failed += 1;
1932                if fail_fast {
1933                    output::emit_json_compact(&IngestSummary {
1934                        summary: true,
1935                        dir: args.dir.display().to_string(),
1936                        pattern: args.pattern.clone(),
1937                        recursive: args.recursive,
1938                        files_total: total,
1939                        files_succeeded: succeeded,
1940                        files_failed: failed,
1941                        files_skipped: skipped,
1942                        elapsed_ms: started.elapsed().as_millis() as u64,
1943                    })?;
1944                    return Err(AppError::Validation(format!(
1945                        "ingest aborted on first failure: {err_clone}"
1946                    )));
1947                }
1948                continue;
1949            }
1950        };
1951
1952        match stage_result {
1953            Ok(parts) => {
1954                // GAP-SG-04/07: one source file can stage as multiple
1955                // sub-memories (auto-split partitions); persist and report each.
1956                for staged in parts {
1957                    let part_name = staged.name.clone();
1958                    match persist_staged(
1959                        conn,
1960                        &namespace,
1961                        &memory_type_str,
1962                        staged,
1963                        args.force_merge,
1964                    ) {
1965                        Ok(FileSuccess {
1966                            memory_id,
1967                            action,
1968                            body_length,
1969                            backend_invoked: file_backend_invoked,
1970                        }) => {
1971                            output::emit_json_compact(&IngestFileEvent {
1972                                file: file_str,
1973                                name: &part_name,
1974                                status: "indexed",
1975                                truncated: *name_truncated,
1976                                original_name: original_name.clone(),
1977                                original_filename: original_filename.as_deref(),
1978                                error: None,
1979                                memory_id: Some(memory_id),
1980                                action: Some(action),
1981                                body_length,
1982                                backend_invoked: file_backend_invoked,
1983                            })?;
1984                            succeeded += 1;
1985                        }
1986                        Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1987                            output::emit_json_compact(&IngestFileEvent {
1988                                file: file_str,
1989                                name: &part_name,
1990                                status: "skipped",
1991                                truncated: *name_truncated,
1992                                original_name: original_name.clone(),
1993                                original_filename: original_filename.as_deref(),
1994                                error: Some(format!("{e}")),
1995                                memory_id: None,
1996                                action: Some("duplicate".to_string()),
1997                                body_length: 0,
1998                                backend_invoked: None,
1999                            })?;
2000                            skipped += 1;
2001                        }
2002                        Err(e) => {
2003                            let err_msg = format!("{e}");
2004                            output::emit_json_compact(&IngestFileEvent {
2005                                file: file_str,
2006                                name: &part_name,
2007                                status: "failed",
2008                                truncated: *name_truncated,
2009                                original_name: original_name.clone(),
2010                                original_filename: original_filename.as_deref(),
2011                                error: Some(err_msg.clone()),
2012                                memory_id: None,
2013                                action: None,
2014                                body_length: 0,
2015                                backend_invoked: None,
2016                            })?;
2017                            failed += 1;
2018                            if fail_fast {
2019                                output::emit_json_compact(&IngestSummary {
2020                                    summary: true,
2021                                    dir: args.dir.display().to_string(),
2022                                    pattern: args.pattern.clone(),
2023                                    recursive: args.recursive,
2024                                    files_total: total,
2025                                    files_succeeded: succeeded,
2026                                    files_failed: failed,
2027                                    files_skipped: skipped,
2028                                    elapsed_ms: started.elapsed().as_millis() as u64,
2029                                })?;
2030                                return Err(AppError::Validation(format!(
2031                                    "ingest aborted on first failure: {err_msg}"
2032                                )));
2033                            }
2034                        }
2035                    }
2036                }
2037            }
2038            Err(e) => {
2039                let err_msg = format!("{e}");
2040                output::emit_json_compact(&IngestFileEvent {
2041                    file: file_str,
2042                    name: derived_name,
2043                    status: "failed",
2044                    truncated: *name_truncated,
2045                    original_name: original_name.clone(),
2046                    original_filename: original_filename.as_deref(),
2047                    error: Some(err_msg.clone()),
2048                    memory_id: None,
2049                    action: None,
2050                    body_length: 0,
2051                    backend_invoked: None,
2052                })?;
2053                failed += 1;
2054                if fail_fast {
2055                    output::emit_json_compact(&IngestSummary {
2056                        summary: true,
2057                        dir: args.dir.display().to_string(),
2058                        pattern: args.pattern.clone(),
2059                        recursive: args.recursive,
2060                        files_total: total,
2061                        files_succeeded: succeeded,
2062                        files_failed: failed,
2063                        files_skipped: skipped,
2064                        elapsed_ms: started.elapsed().as_millis() as u64,
2065                    })?;
2066                    return Err(AppError::Validation(format!(
2067                        "ingest aborted on first failure: {err_msg}"
2068                    )));
2069                }
2070            }
2071        }
2072    }
2073
2074    // Wait for the producer thread to finish cleanly.
2075    producer_handle
2076        .join()
2077        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
2078
2079    if let Ok(ref conn) = conn_or_err {
2080        if succeeded > 0 {
2081            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
2082        }
2083    }
2084
2085    output::emit_json_compact(&IngestSummary {
2086        summary: true,
2087        dir: args.dir.display().to_string(),
2088        pattern: args.pattern.clone(),
2089        recursive: args.recursive,
2090        files_total: total,
2091        files_succeeded: succeeded,
2092        files_failed: failed,
2093        files_skipped: skipped,
2094        elapsed_ms: started.elapsed().as_millis() as u64,
2095    })?;
2096
2097    if args.enrich_after && succeeded > 0 {
2098        output::emit_json_compact(&serde_json::json!({
2099            "event": "enrich_phase_started",
2100            "operation": "memory-bindings"
2101        }))?;
2102        let enrich_args = super::enrich::EnrichArgs {
2103            operation: Some(super::enrich::EnrichOperation::MemoryBindings),
2104            mode: Some(super::enrich::EnrichMode::ClaudeCode),
2105            limit: None,
2106            target: super::enrich::ReEmbedTarget::Memories,
2107            dry_run: false,
2108            namespace: args.namespace.clone(),
2109            claude_binary: args.claude_binary.clone(),
2110            claude_model: args.claude_model.clone(),
2111            claude_timeout: args.claude_timeout,
2112            codex_binary: args.codex_binary.clone(),
2113            codex_model: args.codex_model.clone(),
2114            codex_timeout: args.codex_timeout,
2115            opencode_binary: args.opencode_binary.clone(),
2116            opencode_model: args.opencode_model.clone(),
2117            opencode_timeout: args.opencode_timeout,
2118            openrouter_model: None,
2119            openrouter_api_key: None,
2120            openrouter_timeout: 300,
2121            openrouter_base_url: None,
2122            db: args.db.clone(),
2123            json: false,
2124            resume: false,
2125            retry_failed: false,
2126            max_cost_usd: args.max_cost_usd,
2127            llm_parallelism: args.llm_parallelism as u32,
2128            wait_job_singleton: args.wait_job_singleton,
2129            force_job_singleton: args.force_job_singleton,
2130            names: Vec::new(),
2131            names_file: None,
2132            preflight_check: false,
2133            fallback_mode: None,
2134            rate_limit_buffer: 300,
2135            max_load_check: true,
2136            circuit_breaker_threshold: 5,
2137            preserve_threshold: 0.7,
2138            codex_model_validate: true,
2139            codex_model_fallback: None,
2140            min_output_chars: 500,
2141            max_output_chars: 2000,
2142            preserve_check: true,
2143            prompt_template: None,
2144            until_empty: false,
2145            max_runtime: None,
2146            max_attempts: 5,
2147            status: false,
2148            rest_concurrency: None,
2149            // enrich-after runs a plain memory-bindings pass; dead-letter,
2150            // backoff-ignore and graph-only flags stay at their defaults.
2151            list_dead: false,
2152            requeue_dead: false,
2153            prune_dead_orphans: false,
2154            ignore_backoff: false,
2155            body_extract_graph_only: false,
2156        };
2157        match super::enrich::run(&enrich_args, llm_backend, embedding_backend) {
2158            Ok(()) => {
2159                output::emit_json_compact(&serde_json::json!({
2160                    "event": "enrich_phase_completed"
2161                }))?;
2162            }
2163            Err(e) => {
2164                tracing::warn!(error = %e, "enrich --operation memory-bindings failed after ingest");
2165                output::emit_json_compact(&serde_json::json!({
2166                    "event": "enrich_phase_failed",
2167                    "error": e.to_string()
2168                }))?;
2169            }
2170        }
2171    }
2172
2173    Ok(())
2174}
2175
2176/// Auto-initialises the database (matches the contract of every other CRUD
2177/// handler) and returns a fresh read/write connection ready for the ingest
2178/// loop. Errors here are recoverable per-file: the caller surfaces them as
2179/// failure events so `--fail-fast` and the continue-on-error path keep
2180/// working when, for example, the user points `--db` at an unwritable path.
2181fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
2182    ensure_db_ready(paths)?;
2183    let conn = open_rw(&paths.db)?;
2184    Ok(conn)
2185}
2186
2187pub(crate) fn collect_files(
2188    dir: &Path,
2189    pattern: &str,
2190    recursive: bool,
2191    out: &mut Vec<PathBuf>,
2192) -> Result<(), AppError> {
2193    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
2194    for entry in entries {
2195        let entry = entry.map_err(AppError::Io)?;
2196        let path = entry.path();
2197        let file_type = entry.file_type().map_err(AppError::Io)?;
2198        if file_type.is_file() {
2199            let name = entry.file_name();
2200            let name_str = name.to_string_lossy();
2201            if matches_pattern(&name_str, pattern) {
2202                out.push(path);
2203            }
2204        } else if file_type.is_dir() && recursive {
2205            collect_files(&path, pattern, recursive, out)?;
2206        }
2207    }
2208    Ok(())
2209}
2210
2211fn matches_pattern(name: &str, pattern: &str) -> bool {
2212    if let Some(suffix) = pattern.strip_prefix('*') {
2213        name.ends_with(suffix)
2214    } else if let Some(prefix) = pattern.strip_suffix('*') {
2215        name.starts_with(prefix)
2216    } else {
2217        name == pattern
2218    }
2219}
2220
2221/// Returns `(final_name, truncated, original_name)`.
2222/// `truncated` is true when the derived name exceeded `max_len`.
2223/// `original_name` holds the pre-truncation name only when `truncated=true`.
2224///
2225/// Non-ASCII characters are first decomposed via NFD and then stripped of
2226/// combining marks so accented letters fold to their base ASCII letter
2227/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
2228/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
2229/// preserves meaningful word content rather than collapsing the basename
2230/// to a few stray ASCII letters as the previous filter did.
2231/// v1.1.1 (P12): validates `--name-prefix` and returns the effective budget
2232/// for the DERIVED part of the name, so `prefix + derived` never exceeds
2233/// [`crate::constants::MAX_MEMORY_NAME_LEN`]. The prefix is applied verbatim
2234/// AFTER kebab normalization of the basename, so it must itself be a valid
2235/// slug head: starting with a lowercase letter and containing only
2236/// lowercase letters, digits and hyphens.
2237pub(crate) fn validate_name_prefix(
2238    prefix: &str,
2239    max_name_length: usize,
2240) -> Result<usize, AppError> {
2241    if prefix.is_empty() {
2242        return Err(AppError::Validation(
2243            "--name-prefix cannot be empty".to_string(),
2244        ));
2245    }
2246    let starts_lower = prefix
2247        .chars()
2248        .next()
2249        .is_some_and(|c| c.is_ascii_lowercase());
2250    let all_slug_chars = prefix
2251        .chars()
2252        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-');
2253    if !starts_lower || !all_slug_chars {
2254        return Err(AppError::Validation(format!(
2255            "--name-prefix '{prefix}' must start with a lowercase letter and contain \
2256             only lowercase letters, digits and hyphens (kebab-case)"
2257        )));
2258    }
2259    let cap = crate::constants::MAX_MEMORY_NAME_LEN;
2260    if prefix.len() >= cap {
2261        return Err(AppError::LimitExceeded(format!(
2262            "--name-prefix is {} chars; prefixed names would exceed the {cap}-char \
2263             name cap (MAX_MEMORY_NAME_LEN)",
2264            prefix.len()
2265        )));
2266    }
2267    Ok(max_name_length.min(cap - prefix.len()))
2268}
2269
2270pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
2271    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
2272    let lowered: String = stem
2273        .nfd()
2274        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
2275        .map(|c| {
2276            if c == '_' || c.is_whitespace() {
2277                '-'
2278            } else {
2279                c
2280            }
2281        })
2282        .map(|c| c.to_ascii_lowercase())
2283        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
2284        .collect();
2285    let collapsed = collapse_dashes(&lowered);
2286    let trimmed_raw = collapsed.trim_matches('-').to_string();
2287    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
2288    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
2289        format!("doc-{trimmed_raw}")
2290    } else {
2291        trimmed_raw
2292    };
2293    if trimmed.len() > max_len {
2294        let truncated = trimmed[..max_len].trim_matches('-').to_string();
2295        // GAP-SG-38: warn (not debug) so the operator sees that a derived name
2296        // was cut at the cap and that any collision will be resolved with a
2297        // numeric disambiguation suffix. The pre-truncation form is also
2298        // surfaced per-file via `IngestFileEvent.original_name`.
2299        tracing::warn!(
2300            target: "ingest",
2301            original = %trimmed,
2302            truncated_to = %truncated,
2303            max_len = max_len,
2304            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
2305        );
2306        (truncated, true, Some(trimmed))
2307    } else {
2308        (trimmed, false, None)
2309    }
2310}
2311
2312/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
2313/// numeric suffix (`-1`, `-2`, …) when needed.
2314///
2315/// `taken` is the set of names already consumed in the current ingest run.
2316/// The caller is expected to insert the returned name into `taken` so the
2317/// next call observes the consumption. Cross-run collisions are intentionally
2318/// surfaced by the per-file persistence path as duplicates so re-ingestion
2319/// of identical corpora stays idempotent.
2320///
2321/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
2322/// candidates collide, signalling a pathological corpus that should be
2323/// renamed manually.
2324fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
2325    if !taken.contains(base) {
2326        return Ok(base.to_string());
2327    }
2328    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
2329        let candidate = format!("{base}-{suffix}");
2330        if !taken.contains(&candidate) {
2331            tracing::warn!(
2332                target: "ingest",
2333                base = %base,
2334                resolved = %candidate,
2335                suffix,
2336                "memory name collision resolved with numeric suffix"
2337            );
2338            return Ok(candidate);
2339        }
2340    }
2341    Err(AppError::Validation(format!(
2342        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
2343    )))
2344}
2345
2346fn collapse_dashes(s: &str) -> String {
2347    let mut out = String::with_capacity(s.len());
2348    let mut prev_dash = false;
2349    for c in s.chars() {
2350        if c == '-' {
2351            if !prev_dash {
2352                out.push('-');
2353            }
2354            prev_dash = true;
2355        } else {
2356            out.push(c);
2357            prev_dash = false;
2358        }
2359    }
2360    out
2361}
2362
2363#[cfg(test)]
2364mod tests {
2365    use super::*;
2366    use std::path::PathBuf;
2367
2368    // v1.1.1 (P12): --name-prefix validation and budget arithmetic.
2369    #[test]
2370    fn validate_name_prefix_shrinks_budget_to_fit_name_cap() {
2371        // 80-char cap; a 10-char prefix leaves 70 for the derived part, but
2372        // the caller's budget (60) is smaller, so it wins.
2373        let budget = validate_name_prefix("projx-team", 60).unwrap();
2374        assert_eq!(budget, 60);
2375        // A long prefix shrinks the budget below the caller's 60.
2376        let long_prefix = "p".repeat(75);
2377        let budget = validate_name_prefix(&long_prefix, 60).unwrap();
2378        assert_eq!(budget, 5, "80-char cap minus 75-char prefix leaves 5");
2379    }
2380
2381    #[test]
2382    fn validate_name_prefix_rejects_invalid_slugs() {
2383        for bad in ["", "-lead", "Upper", "has_underscore", "acentuação", "1x"] {
2384            let err = validate_name_prefix(bad, 60).unwrap_err();
2385            assert_eq!(err.exit_code(), 1, "prefix '{bad}' must be Validation");
2386        }
2387    }
2388
2389    #[test]
2390    fn validate_name_prefix_too_long_is_limit_exceeded() {
2391        let huge = "p".repeat(crate::constants::MAX_MEMORY_NAME_LEN);
2392        let err = validate_name_prefix(&huge, 60).unwrap_err();
2393        assert_eq!(err.exit_code(), 6, "prefix >= name cap must be exit 6");
2394        assert!(
2395            err.to_string().contains("MAX_MEMORY_NAME_LEN"),
2396            "obtido: {err}"
2397        );
2398    }
2399
2400    #[test]
2401    fn name_prefix_applies_after_kebab_normalization_and_fits_cap() {
2402        let prefix = "projx-";
2403        let budget = validate_name_prefix(prefix, 60).unwrap();
2404        let (base, _, _) = derive_kebab_name(&PathBuf::from("My File Name.md"), budget);
2405        let final_name = format!("{prefix}{base}");
2406        assert_eq!(final_name, "projx-my-file-name");
2407        assert!(final_name.len() <= crate::constants::MAX_MEMORY_NAME_LEN);
2408        assert!(crate::constants::name_slug_regex().is_match(&final_name));
2409    }
2410
2411    /// GAP-SG-29: `ingest --mode none --resume` is rejected fail-fast by the
2412    /// mode-conditional validator, which `run()` invokes as its very first
2413    /// statement (before any DB/IO). clap 4.6 derive cannot express a
2414    /// value-conditional conflict (`--mode=none` vs `--resume`) without also
2415    /// breaking the valid `--mode claude-code --resume` combo, so the contract
2416    /// is enforced here instead of at the parser layer.
2417    #[test]
2418    fn ingest_mode_none_with_resume_is_rejected() {
2419        use crate::cli::{Cli, Commands};
2420        use clap::Parser;
2421
2422        let none_resume = Cli::try_parse_from([
2423            "sqlite-graphrag",
2424            "ingest",
2425            "./docs",
2426            "--mode",
2427            "none",
2428            "--resume",
2429        ])
2430        .expect("parse succeeds; the conflict is value-conditional");
2431        let args = match none_resume.command {
2432            Some(Commands::Ingest(a)) => a,
2433            other => panic!("expected ingest, got {other:?}"),
2434        };
2435        assert!(
2436            validate_mode_conditional_flags_ingest(&args).is_err(),
2437            "--mode none + --resume must be rejected fail-fast"
2438        );
2439
2440        // The valid LLM-mode combo is NOT rejected.
2441        let claude_resume = Cli::try_parse_from([
2442            "sqlite-graphrag",
2443            "ingest",
2444            "./docs",
2445            "--mode",
2446            "claude-code",
2447            "--resume",
2448        ])
2449        .expect("parse");
2450        let args = match claude_resume.command {
2451            Some(Commands::Ingest(a)) => a,
2452            other => panic!("expected ingest, got {other:?}"),
2453        };
2454        assert!(
2455            validate_mode_conditional_flags_ingest(&args).is_ok(),
2456            "--mode claude-code + --resume is valid and must pass"
2457        );
2458    }
2459
2460    fn setup_ingest_conn() -> Connection {
2461        crate::storage::connection::register_vec_extension();
2462        let mut conn = Connection::open_in_memory().unwrap();
2463        crate::migrations::runner().run(&mut conn).unwrap();
2464        conn
2465    }
2466
2467    fn make_staged(name: &str, body: &str) -> StagedFile {
2468        StagedFile {
2469            body: body.to_string(),
2470            body_hash: blake3::hash(body.as_bytes()).to_hex().to_string(),
2471            snippet: body.chars().take(200).collect(),
2472            name: name.to_string(),
2473            description: "desc".to_string(),
2474            embedding: None,
2475            chunk_embeddings: None,
2476            chunks_info: Vec::new(),
2477            entities: Vec::new(),
2478            relationships: Vec::new(),
2479            entity_embeddings: None,
2480            urls: Vec::new(),
2481            backend_invoked: None,
2482        }
2483    }
2484
2485    // GAP-SG-54: re-ingesting the same name without --force-merge is a duplicate
2486    // (skipped); with --force-merge it updates in place.
2487    #[test]
2488    fn persist_staged_force_merge_updates_existing() {
2489        let mut conn = setup_ingest_conn();
2490
2491        let first = persist_staged(
2492            &mut conn,
2493            "global",
2494            "document",
2495            make_staged("doc-a", "v1"),
2496            false,
2497        )
2498        .expect("create");
2499        assert_eq!(first.action, "created");
2500
2501        // Same name, no force_merge → Duplicate (skip).
2502        let dup = persist_staged(
2503            &mut conn,
2504            "global",
2505            "document",
2506            make_staged("doc-a", "v2-changed"),
2507            false,
2508        );
2509        assert!(matches!(dup, Err(AppError::Duplicate(_))));
2510
2511        // Same name, force_merge → updated, body refreshed.
2512        let upd = persist_staged(
2513            &mut conn,
2514            "global",
2515            "document",
2516            make_staged("doc-a", "v2-changed"),
2517            true,
2518        )
2519        .expect("update");
2520        assert_eq!(upd.action, "updated");
2521        assert_eq!(upd.memory_id, first.memory_id);
2522        let body: String = conn
2523            .query_row(
2524                "SELECT body FROM memories WHERE id = ?1",
2525                rusqlite::params![first.memory_id],
2526                |r| r.get(0),
2527            )
2528            .unwrap();
2529        assert_eq!(body, "v2-changed");
2530    }
2531
2532    // GAP-SG-55: identical body under a divergent name is deduped (skipped).
2533    #[test]
2534    fn persist_staged_dedupes_by_body_hash() {
2535        let mut conn = setup_ingest_conn();
2536        persist_staged(
2537            &mut conn,
2538            "global",
2539            "document",
2540            make_staged("parte-1", "identical content"),
2541            false,
2542        )
2543        .expect("create");
2544
2545        // Divergent derived name, same content → skipped as duplicate.
2546        let res = persist_staged(
2547            &mut conn,
2548            "global",
2549            "document",
2550            make_staged("part-01", "identical content"),
2551            false,
2552        );
2553        match res {
2554            Err(AppError::Duplicate(msg)) => assert!(msg.contains("body_hash")),
2555            other => panic!("expected body_hash dedup duplicate, got {other:?}"),
2556        }
2557        // Only one memory persisted.
2558        let n: i64 = conn
2559            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2560            .unwrap();
2561        assert_eq!(n, 1);
2562    }
2563
2564    // GAP-SG-54: `ingest --force-merge` parses and sets the update flag.
2565    #[test]
2566    fn ingest_force_merge_flag_parses() {
2567        use crate::cli::{Cli, Commands};
2568        use clap::Parser;
2569        let cli = Cli::try_parse_from(["sqlite-graphrag", "ingest", "./docs", "--force-merge"])
2570            .expect("parse");
2571        match cli.command {
2572            Some(Commands::Ingest(a)) => assert!(a.force_merge),
2573            other => panic!("expected ingest, got {other:?}"),
2574        }
2575        // Default is off.
2576        let cli2 = Cli::try_parse_from(["sqlite-graphrag", "ingest", "./docs"]).expect("parse");
2577        match cli2.command {
2578            Some(Commands::Ingest(a)) => assert!(!a.force_merge),
2579            other => panic!("expected ingest, got {other:?}"),
2580        }
2581    }
2582
2583    #[test]
2584    fn matches_pattern_suffix() {
2585        assert!(matches_pattern("foo.md", "*.md"));
2586        assert!(!matches_pattern("foo.txt", "*.md"));
2587        assert!(matches_pattern("foo.md", "*"));
2588    }
2589
2590    #[test]
2591    fn matches_pattern_prefix() {
2592        assert!(matches_pattern("README.md", "README*"));
2593        assert!(!matches_pattern("CHANGELOG.md", "README*"));
2594    }
2595
2596    #[test]
2597    fn matches_pattern_exact() {
2598        assert!(matches_pattern("README.md", "README.md"));
2599        assert!(!matches_pattern("readme.md", "README.md"));
2600    }
2601
2602    #[test]
2603    fn derive_kebab_underscore_to_dash() {
2604        let p = PathBuf::from("/tmp/claude_code_headless.md");
2605        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2606        assert_eq!(name, "claude-code-headless");
2607        assert!(!truncated);
2608        assert!(original.is_none());
2609    }
2610
2611    #[test]
2612    fn derive_kebab_uppercase_lowered() {
2613        let p = PathBuf::from("/tmp/README.md");
2614        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2615        assert_eq!(name, "readme");
2616        assert!(!truncated);
2617        assert!(original.is_none());
2618    }
2619
2620    #[test]
2621    fn derive_kebab_strips_non_kebab_chars() {
2622        let p = PathBuf::from("/tmp/some@weird#name!.md");
2623        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2624        assert_eq!(name, "someweirdname");
2625        assert!(!truncated);
2626        assert!(original.is_none());
2627    }
2628
2629    // Bug M-A3: NFD-based unicode normalization preserves base letters of
2630    // accented characters instead of dropping them entirely.
2631    #[test]
2632    fn derive_kebab_folds_accented_letters_to_ascii() {
2633        let p = PathBuf::from("/tmp/açaí.md");
2634        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2635        assert_eq!(name, "acai", "got '{name}'");
2636    }
2637
2638    #[test]
2639    fn derive_kebab_handles_naive_with_diaeresis() {
2640        let p = PathBuf::from("/tmp/naïve-test.md");
2641        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2642        assert_eq!(name, "naive-test", "got '{name}'");
2643    }
2644
2645    #[test]
2646    fn derive_kebab_drops_emoji_keeps_word() {
2647        let p = PathBuf::from("/tmp/🚀-rocket.md");
2648        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2649        assert_eq!(name, "rocket", "got '{name}'");
2650    }
2651
2652    #[test]
2653    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
2654        let p = PathBuf::from("/tmp/açaí🦜.md");
2655        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2656        assert_eq!(name, "acai", "got '{name}'");
2657    }
2658
2659    #[test]
2660    fn derive_kebab_pure_emoji_yields_empty() {
2661        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
2662        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2663        assert!(name.is_empty(), "got '{name}'");
2664    }
2665
2666    #[test]
2667    fn derive_kebab_collapses_consecutive_dashes() {
2668        let p = PathBuf::from("/tmp/a__b___c.md");
2669        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2670        assert_eq!(name, "a-b-c");
2671        assert!(!truncated);
2672        assert!(original.is_none());
2673    }
2674
2675    #[test]
2676    fn derive_kebab_truncates_to_60_chars() {
2677        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
2678        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2679        assert!(name.len() <= 60, "got len {}", name.len());
2680        assert!(truncated);
2681        assert!(original.is_some());
2682        assert!(original.unwrap().len() > 60);
2683    }
2684
2685    #[test]
2686    fn collect_files_finds_md_files() {
2687        let tmp = tempfile::tempdir().expect("tempdir");
2688        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2689        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
2690        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
2691        let mut out = Vec::new();
2692        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
2693        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
2694    }
2695
2696    #[test]
2697    fn collect_files_recursive_descends_subdirs() {
2698        let tmp = tempfile::tempdir().expect("tempdir");
2699        let sub = tmp.path().join("sub");
2700        std::fs::create_dir(&sub).unwrap();
2701        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2702        std::fs::write(sub.join("b.md"), "y").unwrap();
2703        let mut out = Vec::new();
2704        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
2705        assert_eq!(out.len(), 2);
2706    }
2707
2708    #[test]
2709    fn collect_files_non_recursive_skips_subdirs() {
2710        let tmp = tempfile::tempdir().expect("tempdir");
2711        let sub = tmp.path().join("sub");
2712        std::fs::create_dir(&sub).unwrap();
2713        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2714        std::fs::write(sub.join("b.md"), "y").unwrap();
2715        let mut out = Vec::new();
2716        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
2717        assert_eq!(out.len(), 1);
2718    }
2719
2720    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
2721
2722    #[test]
2723    fn derive_kebab_long_basename_truncated_within_cap() {
2724        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
2725        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2726        assert!(
2727            name.len() <= DERIVED_NAME_MAX_LEN,
2728            "truncated name must respect cap; got {} chars",
2729            name.len()
2730        );
2731        assert!(!name.is_empty());
2732        assert!(truncated);
2733        assert!(original.is_some());
2734    }
2735
2736    #[test]
2737    fn unique_name_returns_base_when_free() {
2738        let taken: BTreeSet<String> = BTreeSet::new();
2739        let resolved = unique_name("note", &taken).expect("must resolve");
2740        assert_eq!(resolved, "note");
2741    }
2742
2743    #[test]
2744    fn unique_name_appends_first_free_suffix_on_collision() {
2745        let mut taken: BTreeSet<String> = BTreeSet::new();
2746        taken.insert("note".to_string());
2747        taken.insert("note-1".to_string());
2748        let resolved = unique_name("note", &taken).expect("must resolve");
2749        assert_eq!(resolved, "note-2");
2750    }
2751
2752    #[test]
2753    fn unique_name_errors_after_collision_cap() {
2754        let mut taken: BTreeSet<String> = BTreeSet::new();
2755        taken.insert("note".to_string());
2756        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
2757            taken.insert(format!("note-{i}"));
2758        }
2759        let err = unique_name("note", &taken).expect_err("must surface error");
2760        assert!(matches!(err, AppError::Validation(_)));
2761    }
2762
2763    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
2764
2765    #[test]
2766    fn validate_relation_format_accepts_valid_relations() {
2767        use crate::parsers::{is_canonical_relation, validate_relation_format};
2768        assert!(validate_relation_format("applies_to").is_ok());
2769        assert!(validate_relation_format("depends_on").is_ok());
2770        assert!(validate_relation_format("implements").is_ok());
2771        assert!(validate_relation_format("").is_err());
2772        assert!(is_canonical_relation("applies_to"));
2773        assert!(!is_canonical_relation("implements"));
2774    }
2775
2776    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
2777
2778    use serial_test::serial;
2779
2780    /// Helper: scrubs the env var around a closure to keep tests deterministic.
2781    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
2782        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
2783        let prev = std::env::var(key).ok();
2784        match value {
2785            Some(v) => std::env::set_var(key, v),
2786            None => std::env::remove_var(key),
2787        }
2788        f();
2789        match prev {
2790            Some(p) => std::env::set_var(key, p),
2791            None => std::env::remove_var(key),
2792        }
2793    }
2794
2795    #[test]
2796    #[serial]
2797    fn env_low_memory_enabled_unset_returns_false() {
2798        with_env_var(None, || assert!(!env_low_memory_enabled()));
2799    }
2800
2801    #[test]
2802    #[serial]
2803    fn env_low_memory_enabled_empty_returns_false() {
2804        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
2805    }
2806
2807    #[test]
2808    #[serial]
2809    fn env_low_memory_enabled_truthy_values_return_true() {
2810        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
2811            with_env_var(Some(v), || {
2812                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
2813            });
2814        }
2815    }
2816
2817    #[test]
2818    #[serial]
2819    fn env_low_memory_enabled_falsy_values_return_false() {
2820        for v in ["0", "false", "FALSE", "no", "off"] {
2821            with_env_var(Some(v), || {
2822                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
2823            });
2824        }
2825    }
2826
2827    #[test]
2828    #[serial]
2829    fn env_low_memory_enabled_unrecognized_value_returns_false() {
2830        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
2831    }
2832
2833    #[test]
2834    #[serial]
2835    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
2836        with_env_var(None, || {
2837            assert_eq!(resolve_parallelism(true, Some(4)), 1);
2838            assert_eq!(resolve_parallelism(true, Some(8)), 1);
2839            assert_eq!(resolve_parallelism(true, None), 1);
2840        });
2841    }
2842
2843    #[test]
2844    #[serial]
2845    fn resolve_parallelism_env_forces_one_when_flag_off() {
2846        with_env_var(Some("1"), || {
2847            assert_eq!(resolve_parallelism(false, Some(4)), 1);
2848            assert_eq!(resolve_parallelism(false, None), 1);
2849        });
2850    }
2851
2852    #[test]
2853    #[serial]
2854    fn resolve_parallelism_falsy_env_does_not_override() {
2855        with_env_var(Some("0"), || {
2856            assert_eq!(resolve_parallelism(false, Some(4)), 4);
2857        });
2858    }
2859
2860    #[test]
2861    #[serial]
2862    fn resolve_parallelism_explicit_value_when_low_memory_off() {
2863        with_env_var(None, || {
2864            assert_eq!(resolve_parallelism(false, Some(3)), 3);
2865            assert_eq!(resolve_parallelism(false, Some(1)), 1);
2866        });
2867    }
2868
2869    #[test]
2870    #[serial]
2871    fn resolve_parallelism_default_when_unset() {
2872        with_env_var(None, || {
2873            let p = resolve_parallelism(false, None);
2874            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
2875        });
2876    }
2877
2878    #[test]
2879    fn ingest_args_parses_low_memory_flag_via_clap() {
2880        use clap::Parser;
2881        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
2882        // on the public `Cli` definition so the flag is wired end-to-end.
2883        let cli = crate::cli::Cli::try_parse_from([
2884            "sqlite-graphrag",
2885            "ingest",
2886            "/tmp/dummy",
2887            "--type",
2888            "document",
2889            "--low-memory",
2890        ])
2891        .expect("parse must succeed");
2892        match cli.command {
2893            Some(crate::cli::Commands::Ingest(args)) => {
2894                assert!(args.low_memory, "--low-memory must set field to true");
2895            }
2896            _ => panic!("expected Ingest subcommand"),
2897        }
2898    }
2899
2900    #[test]
2901    fn ingest_args_low_memory_defaults_false() {
2902        use clap::Parser;
2903        let cli = crate::cli::Cli::try_parse_from([
2904            "sqlite-graphrag",
2905            "ingest",
2906            "/tmp/dummy",
2907            "--type",
2908            "document",
2909        ])
2910        .expect("parse must succeed");
2911        match cli.command {
2912            Some(crate::cli::Commands::Ingest(args)) => {
2913                assert!(!args.low_memory, "default must be false");
2914            }
2915            _ => panic!("expected Ingest subcommand"),
2916        }
2917    }
2918
2919    // ── GAP-SG-06: --dry-run reports chunk and token counts ──
2920
2921    #[test]
2922    fn dry_run_budget_event_serializes_chunk_and_token_counts() {
2923        let ev = IngestDryRunBudget {
2924            budget: true,
2925            file: "/tmp/doc.md",
2926            name: "doc",
2927            bytes: 1234,
2928            chunk_count: 3,
2929            token_count: 567,
2930            partition_count: 1,
2931            exceeds_limits: false,
2932        };
2933        let json = serde_json::to_string(&ev).expect("serialize budget event");
2934        assert!(json.contains("\"chunk_count\":3"), "got: {json}");
2935        assert!(json.contains("\"token_count\":567"), "got: {json}");
2936        assert!(json.contains("\"partition_count\":1"), "got: {json}");
2937        assert!(json.contains("\"exceeds_limits\":false"), "got: {json}");
2938    }
2939
2940    #[test]
2941    fn assess_body_budget_feeds_dry_run_with_positive_counts() {
2942        // The dry-run path feeds chunking::assess_body_budget; a representative
2943        // body must report a positive chunk and token count.
2944        let body = "# Title\n\nsome representative body text for the budget.";
2945        let budget = chunking::assess_body_budget(body);
2946        assert!(budget.chunk_count >= 1);
2947        assert!(budget.approx_tokens >= 1);
2948        assert_eq!(budget.partition_count, 1);
2949    }
2950}