Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
56/// Hard cap on the numeric suffix appended for collision resolution. If 1000
57/// candidates collide we surface an error rather than loop forever.
58const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70    # LLM-curated extraction via Claude Code CLI\n  \
71    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
72    # Resume interrupted claude-code ingest\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
74    # Claude Code with budget cap and custom timeout\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
76AUTHENTICATION:\n  \
77    --mode claude-code: Uses existing Claude Code authentication.\n  \
78      OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n  \
79      API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n  \
80    --mode codex: Uses existing Codex CLI authentication.\n  \
81      Device auth: run `codex auth login` first\n  \
82      API key: set OPENAI_API_KEY (optional)\n\n  \
83NOTES:\n  \
84    Each file becomes a separate memory. Names derive from file basenames\n  \
85    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
86    followed by a final summary line with counts. Per-file errors are reported\n  \
87    inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89    /// Directory containing files to ingest.
90    #[arg(
91        value_name = "DIR",
92        help = "Directory to ingest recursively (each matching file becomes a memory)"
93    )]
94    pub dir: PathBuf,
95
96    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
97    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98    pub r#type: MemoryType,
99
100    /// Glob pattern matched against file basenames (default: `*.md`). Supports
101    /// `*.<ext>`, `<prefix>*`, and exact filename match.
102    #[arg(long, default_value = "*.md")]
103    pub pattern: String,
104
105    /// Recurse into subdirectories.
106    #[arg(long, default_value_t = false)]
107    pub recursive: bool,
108
109    #[arg(
110        long,
111        env = "SQLITE_GRAPHRAG_ENABLE_NER",
112        value_parser = crate::parsers::parse_bool_flexible,
113        action = clap::ArgAction::Set,
114        num_args = 0..=1,
115        default_missing_value = "true",
116        default_value = "false",
117        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
118    )]
119    pub enable_ner: bool,
120    #[arg(
121        long,
122        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
123        default_value = "fp32",
124        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
125    )]
126    pub gliner_variant: String,
127
128    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
129    #[arg(long, default_value_t = false, hide = true)]
130    pub skip_extraction: bool,
131
132    /// Stop on first per-file error instead of continuing with the next file.
133    #[arg(long, default_value_t = false)]
134    pub fail_fast: bool,
135
136    /// Preview file-to-name mapping without loading model or persisting.
137    #[arg(long, default_value_t = false)]
138    pub dry_run: bool,
139
140    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
141    #[arg(long, default_value_t = 10_000)]
142    pub max_files: usize,
143
144    /// Namespace for the ingested memories.
145    #[arg(long)]
146    pub namespace: Option<String>,
147
148    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
149    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
150    pub db: Option<String>,
151
152    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
153    pub format: JsonOutputFormat,
154
155    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
156    pub json: bool,
157
158    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
159    #[arg(
160        long,
161        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
162    )]
163    pub ingest_parallelism: Option<usize>,
164
165    /// Force single-threaded ingest to reduce RSS pressure.
166    ///
167    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
168    /// explicit value. Recommended for environments with <4 GB available
169    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
170    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
171    /// (CLI flag has higher precedence than the env var).
172    #[arg(
173        long,
174        default_value_t = false,
175        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
176                Recommended for environments with <4 GB available RAM or container/cgroup \
177                constraints. Trade-off: 3-4x longer wall time. Also honored via \
178                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
179    )]
180    pub low_memory: bool,
181
182    /// Maximum process RSS in MiB; abort if exceeded during embedding.
183    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
184          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
185    pub max_rss_mb: u64,
186
187    /// Maximum character length for derived memory names from file basenames.
188    ///
189    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
190    /// Shorter values leave more headroom for collision suffix resolution.
191    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
192          help = "Maximum length for derived memory names (default: 60)")]
193    pub max_name_length: usize,
194
195    /// Extraction mode: `none` (body-only, default), `gliner` (NER), or `claude-code` (LLM-curated via Claude Code CLI).
196    #[arg(long, value_enum, default_value_t = IngestMode::None)]
197    pub mode: IngestMode,
198
199    /// Explicit path to the Claude Code binary (only with --mode claude-code).
200    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
201    pub claude_binary: Option<std::path::PathBuf>,
202
203    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
204    #[arg(long)]
205    pub claude_model: Option<String>,
206
207    /// Resume a previously interrupted claude-code ingest from the queue DB.
208    #[arg(long, default_value_t = false)]
209    pub resume: bool,
210
211    /// Retry only failed files from a previous claude-code ingest.
212    #[arg(long, default_value_t = false)]
213    pub retry_failed: bool,
214
215    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
216    #[arg(long, default_value_t = false)]
217    pub keep_queue: bool,
218
219    /// Custom path for the claude-code ingest queue database.
220    #[arg(long, default_value = ".ingest-queue.sqlite")]
221    pub queue_db: String,
222
223    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
224    #[arg(long, default_value_t = 60)]
225    pub rate_limit_wait: u64,
226
227    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
228    #[arg(long)]
229    pub max_cost_usd: Option<f64>,
230
231    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
232    #[arg(
233        long,
234        default_value_t = 300,
235        help = "Timeout in seconds for each claude -p invocation (default: 300)"
236    )]
237    pub claude_timeout: u64,
238
239    /// Explicit path to the Codex CLI binary (only with --mode codex).
240    #[arg(
241        long,
242        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
243        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
244    )]
245    pub codex_binary: Option<PathBuf>,
246
247    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
248    #[arg(
249        long,
250        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
251    )]
252    pub codex_model: Option<String>,
253
254    /// Timeout in seconds for each codex exec invocation.
255    #[arg(
256        long,
257        default_value_t = 300,
258        help = "Timeout in seconds for each codex exec invocation (default: 300)"
259    )]
260    pub codex_timeout: u64,
261}
262
263/// Extraction mode for the ingest pipeline.
264#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
265pub enum IngestMode {
266    /// Body-only ingestion without entity/relationship extraction (default).
267    None,
268    /// GLiNER zero-shot NER extraction (requires --enable-ner).
269    Gliner,
270    /// LLM-curated extraction via locally installed Claude Code CLI.
271    ClaudeCode,
272    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
273    Codex,
274}
275
276/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
277/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
278/// values evaluate to false. Unrecognized non-empty values emit a
279/// `tracing::warn!` and evaluate to false.
280fn env_low_memory_enabled() -> bool {
281    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
282        Ok(v) if v.is_empty() => false,
283        Ok(v) => match v.to_lowercase().as_str() {
284            "1" | "true" | "yes" | "on" => true,
285            "0" | "false" | "no" | "off" => false,
286            other => {
287                tracing::warn!(
288                    target: "ingest",
289                    value = %other,
290                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
291                );
292                false
293            }
294        },
295        Err(_) => false,
296    }
297}
298
299/// Resolves the effective ingest parallelism honoring `--low-memory` and the
300/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
301///
302/// Precedence:
303/// 1. `--low-memory` CLI flag forces parallelism = 1.
304/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
305/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
306/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
307///
308/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
309/// emits a `tracing::warn!` advertising the override.
310fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
311    let env_flag = env_low_memory_enabled();
312    let low_memory = low_memory_flag || env_flag;
313
314    if low_memory {
315        if let Some(n) = ingest_parallelism {
316            if n > 1 {
317                tracing::warn!(
318                    target: "ingest",
319                    requested = n,
320                    "--ingest-parallelism overridden by --low-memory; using 1"
321                );
322            }
323        }
324        if low_memory_flag {
325            tracing::info!(
326                target: "ingest",
327                source = "flag",
328                "low-memory mode enabled: forcing --ingest-parallelism 1"
329            );
330        } else {
331            tracing::info!(
332                target: "ingest",
333                source = "env",
334                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
335            );
336        }
337        return 1;
338    }
339
340    ingest_parallelism
341        .unwrap_or_else(|| {
342            std::thread::available_parallelism()
343                .map(|v| v.get() / 2)
344                .unwrap_or(1)
345                .clamp(1, 4)
346        })
347        .max(1)
348}
349
350#[derive(Serialize)]
351struct IngestFileEvent<'a> {
352    file: &'a str,
353    name: &'a str,
354    status: &'a str,
355    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
356    truncated: bool,
357    /// Original derived name before truncation; only present when `truncated=true`.
358    #[serde(skip_serializing_if = "Option::is_none")]
359    original_name: Option<String>,
360    /// Original file basename (without extension); only present when it differs from `name`.
361    #[serde(skip_serializing_if = "Option::is_none")]
362    original_filename: Option<&'a str>,
363    #[serde(skip_serializing_if = "Option::is_none")]
364    error: Option<String>,
365    #[serde(skip_serializing_if = "Option::is_none")]
366    memory_id: Option<i64>,
367    #[serde(skip_serializing_if = "Option::is_none")]
368    action: Option<String>,
369    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
370    body_length: usize,
371}
372
373#[derive(Serialize)]
374struct IngestSummary {
375    summary: bool,
376    dir: String,
377    pattern: String,
378    recursive: bool,
379    files_total: usize,
380    files_succeeded: usize,
381    files_failed: usize,
382    files_skipped: usize,
383    elapsed_ms: u64,
384}
385
386/// Outcome of a successful per-file ingest, used to build the NDJSON event.
387struct FileSuccess {
388    memory_id: i64,
389    action: String,
390    body_length: usize,
391}
392
393/// NDJSON progress event emitted to stderr after each file completes Phase A.
394/// Schema version 1; consumers should check `schema_version` before parsing.
395#[derive(Serialize)]
396struct StageProgressEvent<'a> {
397    schema_version: u8,
398    event: &'a str,
399    path: &'a str,
400    ms: u64,
401    entities: usize,
402    relationships: usize,
403}
404
405/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
406/// Phase B persists these to SQLite on the main thread in submission order.
407struct StagedFile {
408    body: String,
409    body_hash: String,
410    snippet: String,
411    name: String,
412    description: String,
413    embedding: Vec<f32>,
414    chunk_embeddings: Option<Vec<Vec<f32>>>,
415    chunks_info: Vec<crate::chunking::Chunk>,
416    entities: Vec<NewEntity>,
417    relationships: Vec<NewRelationship>,
418    entity_embeddings: Vec<Vec<f32>>,
419    urls: Vec<crate::extraction::ExtractedUrl>,
420}
421
422/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
423/// Never touches the database — safe to run on any rayon thread.
424fn stage_file(
425    _idx: usize,
426    path: &Path,
427    name: &str,
428    paths: &AppPaths,
429    enable_ner: bool,
430    gliner_variant: crate::extraction::GlinerVariant,
431    max_rss_mb: u64,
432) -> Result<StagedFile, AppError> {
433    use crate::constants::*;
434
435    if name.len() > MAX_MEMORY_NAME_LEN {
436        return Err(AppError::LimitExceeded(
437            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
438        ));
439    }
440    if name.starts_with("__") {
441        return Err(AppError::Validation(
442            crate::i18n::validation::reserved_name(),
443        ));
444    }
445    {
446        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
447            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
448        if !slug_re.is_match(name) {
449            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
450                name,
451            )));
452        }
453    }
454
455    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
456    if raw_body.len() > MAX_MEMORY_BODY_LEN {
457        return Err(AppError::LimitExceeded(
458            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
459        ));
460    }
461    if raw_body.trim().is_empty() {
462        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
463    }
464
465    let description = format!("ingested from {}", path.display());
466    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
467        return Err(AppError::Validation(
468            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
469        ));
470    }
471
472    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
473    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
474    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
475    if enable_ner {
476        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
477            Ok(extracted) => {
478                extracted_urls = extracted.urls;
479                extracted_entities = extracted.entities;
480                extracted_relationships = extracted.relationships;
481
482                if extracted_entities.len() > max_entities_per_memory() {
483                    extracted_entities.truncate(max_entities_per_memory());
484                }
485                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
486                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
487                }
488            }
489            Err(e) => {
490                tracing::warn!(
491                    file = %path.display(),
492                    "auto-extraction failed (graceful degradation): {e:#}"
493                );
494            }
495        }
496    }
497
498    for rel in &mut extracted_relationships {
499        rel.relation = crate::parsers::normalize_relation(&rel.relation);
500        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
501            return Err(AppError::Validation(format!(
502                "{e} for relationship '{}' -> '{}'",
503                rel.source, rel.target
504            )));
505        }
506        crate::parsers::warn_if_non_canonical(&rel.relation);
507        if !(0.0..=1.0).contains(&rel.strength) {
508            return Err(AppError::Validation(format!(
509                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
510                rel.strength, rel.source, rel.target
511            )));
512        }
513    }
514
515    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
516    let snippet: String = raw_body.chars().take(200).collect();
517
518    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
519    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
520    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
521        return Err(AppError::LimitExceeded(format!(
522            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
523            chunks_info.len(),
524            REMEMBER_MAX_SAFE_MULTI_CHUNKS
525        )));
526    }
527
528    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
529    let embedding = if chunks_info.len() == 1 {
530        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
531    } else {
532        let chunk_texts: Vec<&str> = chunks_info
533            .iter()
534            .map(|c| chunking::chunk_text(&raw_body, c))
535            .collect();
536        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
537        for chunk_text in &chunk_texts {
538            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
539                if rss > max_rss_mb {
540                    tracing::error!(
541                        rss_mb = rss,
542                        max_rss_mb = max_rss_mb,
543                        file = %path.display(),
544                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
545                    );
546                    return Err(AppError::LowMemory {
547                        available_mb: crate::memory_guard::available_memory_mb(),
548                        required_mb: max_rss_mb,
549                    });
550                }
551            }
552            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
553                &paths.models,
554                chunk_text,
555            )?);
556        }
557        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
558        chunk_embeddings_opt = Some(chunk_embeddings);
559        aggregated
560    };
561
562    let entity_embeddings = extracted_entities
563        .iter()
564        .map(|entity| {
565            let entity_text = match &entity.description {
566                Some(desc) => format!("{} {}", entity.name, desc),
567                None => entity.name.clone(),
568            };
569            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
570        })
571        .collect::<Result<Vec<_>, _>>()?;
572
573    Ok(StagedFile {
574        body: raw_body,
575        body_hash,
576        snippet,
577        name: name.to_string(),
578        description,
579        embedding,
580        chunk_embeddings: chunk_embeddings_opt,
581        chunks_info,
582        entities: extracted_entities,
583        relationships: extracted_relationships,
584        entity_embeddings,
585        urls: extracted_urls,
586    })
587}
588
589/// Phase B: persists one `StagedFile` to the database on the main thread.
590fn persist_staged(
591    conn: &mut Connection,
592    namespace: &str,
593    memory_type: &str,
594    staged: StagedFile,
595) -> Result<FileSuccess, AppError> {
596    {
597        let active_count: u32 = conn.query_row(
598            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
599            [],
600            |r| r.get::<_, i64>(0).map(|v| v as u32),
601        )?;
602        let ns_exists: bool = conn.query_row(
603            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
604            rusqlite::params![namespace],
605            |r| r.get::<_, i64>(0).map(|v| v > 0),
606        )?;
607        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
608            return Err(AppError::NamespaceError(format!(
609                "active namespace limit of {} exceeded while creating '{namespace}'",
610                crate::constants::MAX_NAMESPACES_ACTIVE
611            )));
612        }
613    }
614
615    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
616    if existing_memory.is_some() {
617        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
618            &staged.name,
619            namespace,
620        )));
621    }
622    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
623
624    let new_memory = NewMemory {
625        namespace: namespace.to_string(),
626        name: staged.name.clone(),
627        memory_type: memory_type.to_string(),
628        description: staged.description.clone(),
629        body: staged.body,
630        body_hash: staged.body_hash,
631        session_id: None,
632        source: "agent".to_string(),
633        metadata: serde_json::json!({}),
634    };
635
636    if let Some(hash_id) = duplicate_hash_id {
637        tracing::debug!(
638            target: "ingest",
639            duplicate_memory_id = hash_id,
640            "identical body already exists; persisting a new memory anyway"
641        );
642    }
643
644    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
645
646    let memory_id = memories::insert(&tx, &new_memory)?;
647    versions::insert_version(
648        &tx,
649        memory_id,
650        1,
651        &staged.name,
652        memory_type,
653        &staged.description,
654        &new_memory.body,
655        &serde_json::to_string(&new_memory.metadata)?,
656        None,
657        "create",
658    )?;
659    memories::upsert_vec(
660        &tx,
661        memory_id,
662        namespace,
663        memory_type,
664        &staged.embedding,
665        &staged.name,
666        &staged.snippet,
667    )?;
668
669    if staged.chunks_info.len() > 1 {
670        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
671        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
672            AppError::Internal(anyhow::anyhow!(
673                "missing chunk embeddings cache on multi-chunk ingest path"
674            ))
675        })?;
676        for (i, emb) in chunk_embeddings.iter().enumerate() {
677            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
678        }
679    }
680
681    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
682        for (idx, entity) in staged.entities.iter().enumerate() {
683            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
684            let entity_embedding = &staged.entity_embeddings[idx];
685            entities::upsert_entity_vec(
686                &tx,
687                entity_id,
688                namespace,
689                entity.entity_type,
690                entity_embedding,
691                &entity.name,
692            )?;
693            entities::link_memory_entity(&tx, memory_id, entity_id)?;
694            entities::increment_degree(&tx, entity_id)?;
695        }
696        let entity_types: std::collections::HashMap<&str, EntityType> = staged
697            .entities
698            .iter()
699            .map(|entity| (entity.name.as_str(), entity.entity_type))
700            .collect();
701        for rel in &staged.relationships {
702            let source_entity = NewEntity {
703                name: rel.source.clone(),
704                entity_type: entity_types
705                    .get(rel.source.as_str())
706                    .copied()
707                    .unwrap_or(EntityType::Concept),
708                description: None,
709            };
710            let target_entity = NewEntity {
711                name: rel.target.clone(),
712                entity_type: entity_types
713                    .get(rel.target.as_str())
714                    .copied()
715                    .unwrap_or(EntityType::Concept),
716                description: None,
717            };
718            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
719            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
720            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
721            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
722        }
723    }
724
725    tx.commit()?;
726
727    if !staged.urls.is_empty() {
728        let url_entries: Vec<storage_urls::MemoryUrl> = staged
729            .urls
730            .into_iter()
731            .map(|u| storage_urls::MemoryUrl {
732                url: u.url,
733                offset: Some(u.offset as i64),
734            })
735            .collect();
736        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
737    }
738
739    Ok(FileSuccess {
740        memory_id,
741        action: "created".to_string(),
742        body_length: new_memory.body.len(),
743    })
744}
745
746pub fn run(args: IngestArgs) -> Result<(), AppError> {
747    if args.mode == IngestMode::ClaudeCode {
748        return super::ingest_claude::run_claude_ingest(&args);
749    }
750    if args.mode == IngestMode::Codex {
751        return super::ingest_codex::run_codex_ingest(&args);
752    }
753
754    let started = std::time::Instant::now();
755
756    if !args.dir.exists() {
757        return Err(AppError::Validation(format!(
758            "directory not found: {}",
759            args.dir.display()
760        )));
761    }
762    if !args.dir.is_dir() {
763        return Err(AppError::Validation(format!(
764            "path is not a directory: {}",
765            args.dir.display()
766        )));
767    }
768
769    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
770    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
771    files.sort();
772
773    if files.len() > args.max_files {
774        return Err(AppError::Validation(format!(
775            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
776            files.len(),
777            args.max_files
778        )));
779    }
780
781    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
782    let memory_type_str = args.r#type.as_str().to_string();
783
784    let paths = AppPaths::resolve(args.db.as_deref())?;
785    let mut conn_or_err = match init_storage(&paths) {
786        Ok(c) => Ok(c),
787        Err(e) => Err(format!("{e}")),
788    };
789
790    let mut succeeded: usize = 0;
791    let mut failed: usize = 0;
792    let mut skipped: usize = 0;
793    let total = files.len();
794
795    // Pre-resolve all names before parallelisation so Phase A workers see a
796    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
797    let mut taken_names: BTreeSet<String> = BTreeSet::new();
798
799    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
800    // ProcessItem: the data moved into the producer thread for Phase A computation.
801    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
802    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
803    // boundary into the rayon producer.
804    enum SlotMeta {
805        Skip {
806            file_str: String,
807            derived_base: String,
808            name_truncated: bool,
809            original_name: Option<String>,
810            original_filename: Option<String>,
811            reason: String,
812        },
813        Process {
814            file_str: String,
815            derived_name: String,
816            name_truncated: bool,
817            original_name: Option<String>,
818            original_filename: Option<String>,
819        },
820    }
821
822    struct ProcessItem {
823        idx: usize,
824        path: PathBuf,
825        file_str: String,
826        derived_name: String,
827    }
828
829    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
830    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
831    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
832
833    let max_name_length = args.max_name_length;
834    for path in &files {
835        let file_str = path.to_string_lossy().into_owned();
836        let (derived_base, name_truncated, original_name) =
837            derive_kebab_name(path, max_name_length);
838        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
839
840        if name_truncated {
841            if let Some(ref orig) = original_name {
842                truncations.push((orig.clone(), derived_base.clone()));
843            }
844        }
845
846        if derived_base.is_empty() {
847            // original_filename: always include when it differs from the empty derived name
848            let orig_filename = if !original_basename.is_empty() {
849                Some(original_basename.to_string())
850            } else {
851                None
852            };
853            slots_meta.push(SlotMeta::Skip {
854                file_str,
855                derived_base: String::new(),
856                name_truncated: false,
857                original_name: None,
858                original_filename: orig_filename,
859                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
860            });
861            continue;
862        }
863
864        match unique_name(&derived_base, &taken_names) {
865            Ok(derived_name) => {
866                taken_names.insert(derived_name.clone());
867                let idx = slots_meta.len();
868                // original_filename: present only when the raw basename differs from the derived name
869                let orig_filename = if original_basename != derived_name {
870                    Some(original_basename.to_string())
871                } else {
872                    None
873                };
874                process_items.push(ProcessItem {
875                    idx,
876                    path: path.clone(),
877                    file_str: file_str.clone(),
878                    derived_name: derived_name.clone(),
879                });
880                slots_meta.push(SlotMeta::Process {
881                    file_str,
882                    derived_name,
883                    name_truncated,
884                    original_name,
885                    original_filename: orig_filename,
886                });
887            }
888            Err(e) => {
889                let orig_filename = if original_basename != derived_base {
890                    Some(original_basename.to_string())
891                } else {
892                    None
893                };
894                slots_meta.push(SlotMeta::Skip {
895                    file_str,
896                    derived_base,
897                    name_truncated,
898                    original_name,
899                    original_filename: orig_filename,
900                    reason: e.to_string(),
901                });
902            }
903        }
904    }
905
906    if !truncations.is_empty() {
907        tracing::info!(
908            target: "ingest",
909            count = truncations.len(),
910            max_name_length = max_name_length,
911            max_len = DERIVED_NAME_MAX_LEN,
912            "derived names truncated; pass -vv (debug) for per-file detail"
913        );
914    }
915
916    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
917    if args.dry_run {
918        for meta in &slots_meta {
919            match meta {
920                SlotMeta::Skip {
921                    file_str,
922                    derived_base,
923                    name_truncated,
924                    original_name,
925                    original_filename,
926                    reason,
927                } => {
928                    output::emit_json_compact(&IngestFileEvent {
929                        file: file_str,
930                        name: derived_base,
931                        status: "skip",
932                        truncated: *name_truncated,
933                        original_name: original_name.clone(),
934                        original_filename: original_filename.as_deref(),
935                        error: Some(reason.clone()),
936                        memory_id: None,
937                        action: None,
938                        body_length: 0,
939                    })?;
940                }
941                SlotMeta::Process {
942                    file_str,
943                    derived_name,
944                    name_truncated,
945                    original_name,
946                    original_filename,
947                } => {
948                    output::emit_json_compact(&IngestFileEvent {
949                        file: file_str,
950                        name: derived_name,
951                        status: "preview",
952                        truncated: *name_truncated,
953                        original_name: original_name.clone(),
954                        original_filename: original_filename.as_deref(),
955                        error: None,
956                        memory_id: None,
957                        action: None,
958                        body_length: 0,
959                    })?;
960                }
961            }
962        }
963        output::emit_json_compact(&IngestSummary {
964            summary: true,
965            dir: args.dir.to_string_lossy().into_owned(),
966            pattern: args.pattern.clone(),
967            recursive: args.recursive,
968            files_total: total,
969            files_succeeded: 0,
970            files_failed: 0,
971            files_skipped: 0,
972            elapsed_ms: started.elapsed().as_millis() as u64,
973        })?;
974        return Ok(());
975    }
976
977    // Determine rayon thread pool size, honoring --low-memory and the
978    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
979    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
980
981    let pool = rayon::ThreadPoolBuilder::new()
982        .num_threads(parallelism)
983        .build()
984        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
985
986    if args.enable_ner && args.skip_extraction {
987        tracing::warn!(
988            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
989        );
990    }
991    if args.skip_extraction && !args.enable_ner {
992        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
993    }
994    let enable_ner = args.enable_ner;
995    let max_rss_mb = args.max_rss_mb;
996    let gliner_variant: crate::extraction::GlinerVariant =
997        args.gliner_variant.parse().unwrap_or_else(|e| {
998            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
999            crate::extraction::GlinerVariant::Fp32
1000        });
1001
1002    let total_to_process = process_items.len();
1003    tracing::info!(
1004        target = "ingest",
1005        phase = "pipeline_start",
1006        files = total_to_process,
1007        ingest_parallelism = parallelism,
1008        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1009    );
1010
1011    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1012    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1013    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1014    let channel_bound = (parallelism * 2).max(1);
1015    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1016
1017    // Phase A: launched in a dedicated OS thread so the main thread can consume
1018    // the channel concurrently. pool.install() blocks the calling thread until
1019    // all rayon workers finish — if called on the main thread it would
1020    // reintroduce the 2-phase blocking behaviour we are eliminating.
1021    let paths_owned = paths.clone();
1022    let producer_handle = std::thread::spawn(move || {
1023        pool.install(|| {
1024            process_items.into_par_iter().for_each(|item| {
1025                let t0 = std::time::Instant::now();
1026                let result = stage_file(
1027                    item.idx,
1028                    &item.path,
1029                    &item.derived_name,
1030                    &paths_owned,
1031                    enable_ner,
1032                    gliner_variant,
1033                    max_rss_mb,
1034                );
1035                let elapsed_ms = t0.elapsed().as_millis() as u64;
1036
1037                // Emit NDJSON progress event to stderr so the user sees work
1038                // happening during long NER runs (e.g. 50 files × 27s each).
1039                let (n_entities, n_relationships) = match &result {
1040                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1041                    Err(_) => (0, 0),
1042                };
1043                let progress = StageProgressEvent {
1044                    schema_version: 1,
1045                    event: "file_extracted",
1046                    path: &item.file_str,
1047                    ms: elapsed_ms,
1048                    entities: n_entities,
1049                    relationships: n_relationships,
1050                };
1051                if let Ok(line) = serde_json::to_string(&progress) {
1052                    eprintln!("{line}");
1053                }
1054
1055                // Blocking send applies backpressure: if Phase B is slower,
1056                // Phase A workers wait here instead of accumulating staged files
1057                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1058                let _ = tx.send((item.idx, result));
1059            });
1060            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1061            drop(tx);
1062        });
1063    });
1064
1065    // Phase B: main thread persists files as results arrive from the channel.
1066    // Results arrive in completion order (par_iter is unordered). We persist
1067    // each file immediately on arrival — this is the key fix for B1: with the
1068    // old 2-phase design the first DB write happened only after ALL files had
1069    // finished Phase A. Now the first commit happens as soon as the first file
1070    // completes Phase A, regardless of how many files remain.
1071    //
1072    // NDJSON output order follows completion order (not file-system sort order).
1073    // Skip slots are emitted at the end, after all Process results are consumed.
1074    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1075    // requirement than ensuring data is persisted before the user's timeout fires.
1076    let fail_fast = args.fail_fast;
1077
1078    // Emit pending Skip events first so agents see them early.
1079    for meta in &slots_meta {
1080        if let SlotMeta::Skip {
1081            file_str,
1082            derived_base,
1083            name_truncated,
1084            original_name,
1085            original_filename,
1086            reason,
1087        } = meta
1088        {
1089            output::emit_json_compact(&IngestFileEvent {
1090                file: file_str,
1091                name: derived_base,
1092                status: "skipped",
1093                truncated: *name_truncated,
1094                original_name: original_name.clone(),
1095                original_filename: original_filename.as_deref(),
1096                error: Some(reason.clone()),
1097                memory_id: None,
1098                action: None,
1099                body_length: 0,
1100            })?;
1101            skipped += 1;
1102        }
1103    }
1104
1105    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1106    // as channel messages arrive in completion order.
1107    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1108        .iter()
1109        .enumerate()
1110        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1111        .collect();
1112
1113    tracing::info!(
1114        target = "ingest",
1115        phase = "persist_start",
1116        files = total_to_process,
1117        "phase B starting: persisting files incrementally as Phase A completes each one",
1118    );
1119
1120    // Drain channel and persist each file immediately — no accumulation into a
1121    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1122    // Phase B without applying backpressure.
1123    for (idx, stage_result) in rx {
1124        let meta = meta_index.get(&idx).ok_or_else(|| {
1125            AppError::Internal(anyhow::anyhow!(
1126                "channel idx {idx} has no corresponding Process slot"
1127            ))
1128        })?;
1129        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1130        {
1131            SlotMeta::Process {
1132                file_str,
1133                derived_name,
1134                name_truncated,
1135                original_name,
1136                original_filename,
1137            } => (
1138                file_str,
1139                derived_name,
1140                name_truncated,
1141                original_name,
1142                original_filename,
1143            ),
1144            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1145        };
1146
1147        // If storage init failed, every file fails with the same error.
1148        let conn = match conn_or_err.as_mut() {
1149            Ok(c) => c,
1150            Err(err_msg) => {
1151                let err_clone = err_msg.clone();
1152                output::emit_json_compact(&IngestFileEvent {
1153                    file: file_str,
1154                    name: derived_name,
1155                    status: "failed",
1156                    truncated: *name_truncated,
1157                    original_name: original_name.clone(),
1158                    original_filename: original_filename.as_deref(),
1159                    error: Some(err_clone.clone()),
1160                    memory_id: None,
1161                    action: None,
1162                    body_length: 0,
1163                })?;
1164                failed += 1;
1165                if fail_fast {
1166                    output::emit_json_compact(&IngestSummary {
1167                        summary: true,
1168                        dir: args.dir.display().to_string(),
1169                        pattern: args.pattern.clone(),
1170                        recursive: args.recursive,
1171                        files_total: total,
1172                        files_succeeded: succeeded,
1173                        files_failed: failed,
1174                        files_skipped: skipped,
1175                        elapsed_ms: started.elapsed().as_millis() as u64,
1176                    })?;
1177                    return Err(AppError::Validation(format!(
1178                        "ingest aborted on first failure: {err_clone}"
1179                    )));
1180                }
1181                continue;
1182            }
1183        };
1184
1185        let outcome =
1186            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1187
1188        match outcome {
1189            Ok(FileSuccess {
1190                memory_id,
1191                action,
1192                body_length,
1193            }) => {
1194                output::emit_json_compact(&IngestFileEvent {
1195                    file: file_str,
1196                    name: derived_name,
1197                    status: "indexed",
1198                    truncated: *name_truncated,
1199                    original_name: original_name.clone(),
1200                    original_filename: original_filename.as_deref(),
1201                    error: None,
1202                    memory_id: Some(memory_id),
1203                    action: Some(action),
1204                    body_length,
1205                })?;
1206                succeeded += 1;
1207            }
1208            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1209                output::emit_json_compact(&IngestFileEvent {
1210                    file: file_str,
1211                    name: derived_name,
1212                    status: "skipped",
1213                    truncated: *name_truncated,
1214                    original_name: original_name.clone(),
1215                    original_filename: original_filename.as_deref(),
1216                    error: Some(format!("{e}")),
1217                    memory_id: None,
1218                    action: Some("duplicate".to_string()),
1219                    body_length: 0,
1220                })?;
1221                skipped += 1;
1222            }
1223            Err(e) => {
1224                let err_msg = format!("{e}");
1225                output::emit_json_compact(&IngestFileEvent {
1226                    file: file_str,
1227                    name: derived_name,
1228                    status: "failed",
1229                    truncated: *name_truncated,
1230                    original_name: original_name.clone(),
1231                    original_filename: original_filename.as_deref(),
1232                    error: Some(err_msg.clone()),
1233                    memory_id: None,
1234                    action: None,
1235                    body_length: 0,
1236                })?;
1237                failed += 1;
1238                if fail_fast {
1239                    output::emit_json_compact(&IngestSummary {
1240                        summary: true,
1241                        dir: args.dir.display().to_string(),
1242                        pattern: args.pattern.clone(),
1243                        recursive: args.recursive,
1244                        files_total: total,
1245                        files_succeeded: succeeded,
1246                        files_failed: failed,
1247                        files_skipped: skipped,
1248                        elapsed_ms: started.elapsed().as_millis() as u64,
1249                    })?;
1250                    return Err(AppError::Validation(format!(
1251                        "ingest aborted on first failure: {err_msg}"
1252                    )));
1253                }
1254            }
1255        }
1256    }
1257
1258    // Wait for the producer thread to finish cleanly.
1259    producer_handle
1260        .join()
1261        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1262
1263    if let Ok(ref conn) = conn_or_err {
1264        if succeeded > 0 {
1265            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1266        }
1267    }
1268
1269    output::emit_json_compact(&IngestSummary {
1270        summary: true,
1271        dir: args.dir.display().to_string(),
1272        pattern: args.pattern.clone(),
1273        recursive: args.recursive,
1274        files_total: total,
1275        files_succeeded: succeeded,
1276        files_failed: failed,
1277        files_skipped: skipped,
1278        elapsed_ms: started.elapsed().as_millis() as u64,
1279    })?;
1280
1281    Ok(())
1282}
1283
1284/// Auto-initialises the database (matches the contract of every other CRUD
1285/// handler) and returns a fresh read/write connection ready for the ingest
1286/// loop. Errors here are recoverable per-file: the caller surfaces them as
1287/// failure events so `--fail-fast` and the continue-on-error path keep
1288/// working when, for example, the user points `--db` at an unwritable path.
1289fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1290    ensure_db_ready(paths)?;
1291    let conn = open_rw(&paths.db)?;
1292    Ok(conn)
1293}
1294
1295pub(crate) fn collect_files(
1296    dir: &Path,
1297    pattern: &str,
1298    recursive: bool,
1299    out: &mut Vec<PathBuf>,
1300) -> Result<(), AppError> {
1301    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1302    for entry in entries {
1303        let entry = entry.map_err(AppError::Io)?;
1304        let path = entry.path();
1305        let file_type = entry.file_type().map_err(AppError::Io)?;
1306        if file_type.is_file() {
1307            let name = entry.file_name();
1308            let name_str = name.to_string_lossy();
1309            if matches_pattern(&name_str, pattern) {
1310                out.push(path);
1311            }
1312        } else if file_type.is_dir() && recursive {
1313            collect_files(&path, pattern, recursive, out)?;
1314        }
1315    }
1316    Ok(())
1317}
1318
1319fn matches_pattern(name: &str, pattern: &str) -> bool {
1320    if let Some(suffix) = pattern.strip_prefix('*') {
1321        name.ends_with(suffix)
1322    } else if let Some(prefix) = pattern.strip_suffix('*') {
1323        name.starts_with(prefix)
1324    } else {
1325        name == pattern
1326    }
1327}
1328
1329/// Returns `(final_name, truncated, original_name)`.
1330/// `truncated` is true when the derived name exceeded `max_len`.
1331/// `original_name` holds the pre-truncation name only when `truncated=true`.
1332///
1333/// Non-ASCII characters are first decomposed via NFD and then stripped of
1334/// combining marks so accented letters fold to their base ASCII letter
1335/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1336/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1337/// preserves meaningful word content rather than collapsing the basename
1338/// to a few stray ASCII letters as the previous filter did.
1339pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1340    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1341    let lowered: String = stem
1342        .nfd()
1343        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1344        .map(|c| {
1345            if c == '_' || c.is_whitespace() {
1346                '-'
1347            } else {
1348                c
1349            }
1350        })
1351        .map(|c| c.to_ascii_lowercase())
1352        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1353        .collect();
1354    let collapsed = collapse_dashes(&lowered);
1355    let trimmed_raw = collapsed.trim_matches('-').to_string();
1356    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1357    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1358        format!("doc-{trimmed_raw}")
1359    } else {
1360        trimmed_raw
1361    };
1362    if trimmed.len() > max_len {
1363        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1364        tracing::debug!(
1365            target: "ingest",
1366            original = %trimmed,
1367            truncated_to = %truncated,
1368            max_len = max_len,
1369            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1370        );
1371        (truncated, true, Some(trimmed))
1372    } else {
1373        (trimmed, false, None)
1374    }
1375}
1376
1377/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1378/// numeric suffix (`-1`, `-2`, …) when needed.
1379///
1380/// `taken` is the set of names already consumed in the current ingest run.
1381/// The caller is expected to insert the returned name into `taken` so the
1382/// next call observes the consumption. Cross-run collisions are intentionally
1383/// surfaced by the per-file persistence path as duplicates so re-ingestion
1384/// of identical corpora stays idempotent.
1385///
1386/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1387/// candidates collide, signalling a pathological corpus that should be
1388/// renamed manually.
1389fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1390    if !taken.contains(base) {
1391        return Ok(base.to_string());
1392    }
1393    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1394        let candidate = format!("{base}-{suffix}");
1395        if !taken.contains(&candidate) {
1396            tracing::warn!(
1397                target: "ingest",
1398                base = %base,
1399                resolved = %candidate,
1400                suffix,
1401                "memory name collision resolved with numeric suffix"
1402            );
1403            return Ok(candidate);
1404        }
1405    }
1406    Err(AppError::Validation(format!(
1407        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1408    )))
1409}
1410
1411fn collapse_dashes(s: &str) -> String {
1412    let mut out = String::with_capacity(s.len());
1413    let mut prev_dash = false;
1414    for c in s.chars() {
1415        if c == '-' {
1416            if !prev_dash {
1417                out.push('-');
1418            }
1419            prev_dash = true;
1420        } else {
1421            out.push(c);
1422            prev_dash = false;
1423        }
1424    }
1425    out
1426}
1427
1428#[cfg(test)]
1429mod tests {
1430    use super::*;
1431    use std::path::PathBuf;
1432
1433    #[test]
1434    fn matches_pattern_suffix() {
1435        assert!(matches_pattern("foo.md", "*.md"));
1436        assert!(!matches_pattern("foo.txt", "*.md"));
1437        assert!(matches_pattern("foo.md", "*"));
1438    }
1439
1440    #[test]
1441    fn matches_pattern_prefix() {
1442        assert!(matches_pattern("README.md", "README*"));
1443        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1444    }
1445
1446    #[test]
1447    fn matches_pattern_exact() {
1448        assert!(matches_pattern("README.md", "README.md"));
1449        assert!(!matches_pattern("readme.md", "README.md"));
1450    }
1451
1452    #[test]
1453    fn derive_kebab_underscore_to_dash() {
1454        let p = PathBuf::from("/tmp/claude_code_headless.md");
1455        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1456        assert_eq!(name, "claude-code-headless");
1457        assert!(!truncated);
1458        assert!(original.is_none());
1459    }
1460
1461    #[test]
1462    fn derive_kebab_uppercase_lowered() {
1463        let p = PathBuf::from("/tmp/README.md");
1464        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1465        assert_eq!(name, "readme");
1466        assert!(!truncated);
1467        assert!(original.is_none());
1468    }
1469
1470    #[test]
1471    fn derive_kebab_strips_non_kebab_chars() {
1472        let p = PathBuf::from("/tmp/some@weird#name!.md");
1473        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1474        assert_eq!(name, "someweirdname");
1475        assert!(!truncated);
1476        assert!(original.is_none());
1477    }
1478
1479    // Bug M-A3: NFD-based unicode normalization preserves base letters of
1480    // accented characters instead of dropping them entirely.
1481    #[test]
1482    fn derive_kebab_folds_accented_letters_to_ascii() {
1483        let p = PathBuf::from("/tmp/açaí.md");
1484        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1485        assert_eq!(name, "acai", "got '{name}'");
1486    }
1487
1488    #[test]
1489    fn derive_kebab_handles_naive_with_diaeresis() {
1490        let p = PathBuf::from("/tmp/naïve-test.md");
1491        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1492        assert_eq!(name, "naive-test", "got '{name}'");
1493    }
1494
1495    #[test]
1496    fn derive_kebab_drops_emoji_keeps_word() {
1497        let p = PathBuf::from("/tmp/🚀-rocket.md");
1498        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1499        assert_eq!(name, "rocket", "got '{name}'");
1500    }
1501
1502    #[test]
1503    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1504        let p = PathBuf::from("/tmp/açaí🦜.md");
1505        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1506        assert_eq!(name, "acai", "got '{name}'");
1507    }
1508
1509    #[test]
1510    fn derive_kebab_pure_emoji_yields_empty() {
1511        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1512        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1513        assert!(name.is_empty(), "got '{name}'");
1514    }
1515
1516    #[test]
1517    fn derive_kebab_collapses_consecutive_dashes() {
1518        let p = PathBuf::from("/tmp/a__b___c.md");
1519        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1520        assert_eq!(name, "a-b-c");
1521        assert!(!truncated);
1522        assert!(original.is_none());
1523    }
1524
1525    #[test]
1526    fn derive_kebab_truncates_to_60_chars() {
1527        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1528        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1529        assert!(name.len() <= 60, "got len {}", name.len());
1530        assert!(truncated);
1531        assert!(original.is_some());
1532        assert!(original.unwrap().len() > 60);
1533    }
1534
1535    #[test]
1536    fn collect_files_finds_md_files() {
1537        let tmp = tempfile::tempdir().expect("tempdir");
1538        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1539        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1540        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1541        let mut out = Vec::new();
1542        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1543        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1544    }
1545
1546    #[test]
1547    fn collect_files_recursive_descends_subdirs() {
1548        let tmp = tempfile::tempdir().expect("tempdir");
1549        let sub = tmp.path().join("sub");
1550        std::fs::create_dir(&sub).unwrap();
1551        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1552        std::fs::write(sub.join("b.md"), "y").unwrap();
1553        let mut out = Vec::new();
1554        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1555        assert_eq!(out.len(), 2);
1556    }
1557
1558    #[test]
1559    fn collect_files_non_recursive_skips_subdirs() {
1560        let tmp = tempfile::tempdir().expect("tempdir");
1561        let sub = tmp.path().join("sub");
1562        std::fs::create_dir(&sub).unwrap();
1563        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1564        std::fs::write(sub.join("b.md"), "y").unwrap();
1565        let mut out = Vec::new();
1566        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1567        assert_eq!(out.len(), 1);
1568    }
1569
1570    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
1571
1572    #[test]
1573    fn derive_kebab_long_basename_truncated_within_cap() {
1574        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1575        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1576        assert!(
1577            name.len() <= DERIVED_NAME_MAX_LEN,
1578            "truncated name must respect cap; got {} chars",
1579            name.len()
1580        );
1581        assert!(!name.is_empty());
1582        assert!(truncated);
1583        assert!(original.is_some());
1584    }
1585
1586    #[test]
1587    fn unique_name_returns_base_when_free() {
1588        let taken: BTreeSet<String> = BTreeSet::new();
1589        let resolved = unique_name("note", &taken).expect("must resolve");
1590        assert_eq!(resolved, "note");
1591    }
1592
1593    #[test]
1594    fn unique_name_appends_first_free_suffix_on_collision() {
1595        let mut taken: BTreeSet<String> = BTreeSet::new();
1596        taken.insert("note".to_string());
1597        taken.insert("note-1".to_string());
1598        let resolved = unique_name("note", &taken).expect("must resolve");
1599        assert_eq!(resolved, "note-2");
1600    }
1601
1602    #[test]
1603    fn unique_name_errors_after_collision_cap() {
1604        let mut taken: BTreeSet<String> = BTreeSet::new();
1605        taken.insert("note".to_string());
1606        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1607            taken.insert(format!("note-{i}"));
1608        }
1609        let err = unique_name("note", &taken).expect_err("must surface error");
1610        assert!(matches!(err, AppError::Validation(_)));
1611    }
1612
1613    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
1614
1615    #[test]
1616    fn validate_relation_format_accepts_valid_relations() {
1617        use crate::parsers::{is_canonical_relation, validate_relation_format};
1618        assert!(validate_relation_format("applies_to").is_ok());
1619        assert!(validate_relation_format("depends_on").is_ok());
1620        assert!(validate_relation_format("implements").is_ok());
1621        assert!(validate_relation_format("").is_err());
1622        assert!(is_canonical_relation("applies_to"));
1623        assert!(!is_canonical_relation("implements"));
1624    }
1625
1626    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
1627
1628    use serial_test::serial;
1629
1630    /// Helper: scrubs the env var around a closure to keep tests deterministic.
1631    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1632        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1633        let prev = std::env::var(key).ok();
1634        match value {
1635            Some(v) => std::env::set_var(key, v),
1636            None => std::env::remove_var(key),
1637        }
1638        f();
1639        match prev {
1640            Some(p) => std::env::set_var(key, p),
1641            None => std::env::remove_var(key),
1642        }
1643    }
1644
1645    #[test]
1646    #[serial]
1647    fn env_low_memory_enabled_unset_returns_false() {
1648        with_env_var(None, || assert!(!env_low_memory_enabled()));
1649    }
1650
1651    #[test]
1652    #[serial]
1653    fn env_low_memory_enabled_empty_returns_false() {
1654        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1655    }
1656
1657    #[test]
1658    #[serial]
1659    fn env_low_memory_enabled_truthy_values_return_true() {
1660        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1661            with_env_var(Some(v), || {
1662                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1663            });
1664        }
1665    }
1666
1667    #[test]
1668    #[serial]
1669    fn env_low_memory_enabled_falsy_values_return_false() {
1670        for v in ["0", "false", "FALSE", "no", "off"] {
1671            with_env_var(Some(v), || {
1672                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1673            });
1674        }
1675    }
1676
1677    #[test]
1678    #[serial]
1679    fn env_low_memory_enabled_unrecognized_value_returns_false() {
1680        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1681    }
1682
1683    #[test]
1684    #[serial]
1685    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1686        with_env_var(None, || {
1687            assert_eq!(resolve_parallelism(true, Some(4)), 1);
1688            assert_eq!(resolve_parallelism(true, Some(8)), 1);
1689            assert_eq!(resolve_parallelism(true, None), 1);
1690        });
1691    }
1692
1693    #[test]
1694    #[serial]
1695    fn resolve_parallelism_env_forces_one_when_flag_off() {
1696        with_env_var(Some("1"), || {
1697            assert_eq!(resolve_parallelism(false, Some(4)), 1);
1698            assert_eq!(resolve_parallelism(false, None), 1);
1699        });
1700    }
1701
1702    #[test]
1703    #[serial]
1704    fn resolve_parallelism_falsy_env_does_not_override() {
1705        with_env_var(Some("0"), || {
1706            assert_eq!(resolve_parallelism(false, Some(4)), 4);
1707        });
1708    }
1709
1710    #[test]
1711    #[serial]
1712    fn resolve_parallelism_explicit_value_when_low_memory_off() {
1713        with_env_var(None, || {
1714            assert_eq!(resolve_parallelism(false, Some(3)), 3);
1715            assert_eq!(resolve_parallelism(false, Some(1)), 1);
1716        });
1717    }
1718
1719    #[test]
1720    #[serial]
1721    fn resolve_parallelism_default_when_unset() {
1722        with_env_var(None, || {
1723            let p = resolve_parallelism(false, None);
1724            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1725        });
1726    }
1727
1728    #[test]
1729    fn ingest_args_parses_low_memory_flag_via_clap() {
1730        use clap::Parser;
1731        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
1732        // on the public `Cli` definition so the flag is wired end-to-end.
1733        let cli = crate::cli::Cli::try_parse_from([
1734            "sqlite-graphrag",
1735            "ingest",
1736            "/tmp/dummy",
1737            "--type",
1738            "document",
1739            "--low-memory",
1740        ])
1741        .expect("parse must succeed");
1742        match cli.command {
1743            crate::cli::Commands::Ingest(args) => {
1744                assert!(args.low_memory, "--low-memory must set field to true");
1745            }
1746            _ => panic!("expected Ingest subcommand"),
1747        }
1748    }
1749
1750    #[test]
1751    fn ingest_args_low_memory_defaults_false() {
1752        use clap::Parser;
1753        let cli = crate::cli::Cli::try_parse_from([
1754            "sqlite-graphrag",
1755            "ingest",
1756            "/tmp/dummy",
1757            "--type",
1758            "document",
1759        ])
1760        .expect("parse must succeed");
1761        match cli.command {
1762            crate::cli::Commands::Ingest(args) => {
1763                assert!(!args.low_memory, "default must be false");
1764            }
1765            _ => panic!("expected Ingest subcommand"),
1766        }
1767    }
1768}