Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
// NOTE(review): the code that consumes this cap is outside the visible part of
// the file; confirm there whether the bound is applied inclusively.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70    # LLM-curated extraction via Claude Code CLI\n  \
71    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n  \
72    # Resume interrupted claude-code ingest\n  \
73    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n  \
74    # Claude Code with budget cap and custom timeout\n  \
75    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n  \
76NOTES:\n  \
77    Each file becomes a separate memory. Names derive from file basenames\n  \
78    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
79    followed by a final summary line with counts. Per-file errors are reported\n  \
80    inline and processing continues unless --fail-fast is set.")]
81pub struct IngestArgs {
82    /// Directory containing files to ingest.
83    #[arg(
84        value_name = "DIR",
85        help = "Directory to ingest recursively (each matching file becomes a memory)"
86    )]
87    pub dir: PathBuf,
88
89    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
90    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
91    pub r#type: MemoryType,
92
93    /// Glob pattern matched against file basenames (default: `*.md`). Supports
94    /// `*.<ext>`, `<prefix>*`, and exact filename match.
95    #[arg(long, default_value = "*.md")]
96    pub pattern: String,
97
98    /// Recurse into subdirectories.
99    #[arg(long, default_value_t = false)]
100    pub recursive: bool,
101
102    #[arg(
103        long,
104        env = "SQLITE_GRAPHRAG_ENABLE_NER",
105        value_parser = crate::parsers::parse_bool_flexible,
106        action = clap::ArgAction::Set,
107        num_args = 0..=1,
108        default_missing_value = "true",
109        default_value = "false",
110        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
111    )]
112    pub enable_ner: bool,
113    #[arg(
114        long,
115        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
116        default_value = "fp32",
117        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
118    )]
119    pub gliner_variant: String,
120
121    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
122    #[arg(long, default_value_t = false, hide = true)]
123    pub skip_extraction: bool,
124
125    /// Stop on first per-file error instead of continuing with the next file.
126    #[arg(long, default_value_t = false)]
127    pub fail_fast: bool,
128
129    /// Preview file-to-name mapping without loading model or persisting.
130    #[arg(long, default_value_t = false)]
131    pub dry_run: bool,
132
133    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
134    #[arg(long, default_value_t = 10_000)]
135    pub max_files: usize,
136
137    /// Namespace for the ingested memories.
138    #[arg(long)]
139    pub namespace: Option<String>,
140
141    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
142    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
143    pub db: Option<String>,
144
145    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
146    pub format: JsonOutputFormat,
147
148    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
149    pub json: bool,
150
151    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
152    #[arg(
153        long,
154        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
155    )]
156    pub ingest_parallelism: Option<usize>,
157
158    /// Force single-threaded ingest to reduce RSS pressure.
159    ///
160    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
161    /// explicit value. Recommended for environments with <4 GB available
162    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
163    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
164    /// (CLI flag has higher precedence than the env var).
165    #[arg(
166        long,
167        default_value_t = false,
168        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
169                Recommended for environments with <4 GB available RAM or container/cgroup \
170                constraints. Trade-off: 3-4x longer wall time. Also honored via \
171                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
172    )]
173    pub low_memory: bool,
174
175    /// Maximum process RSS in MiB; abort if exceeded during embedding.
176    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
177          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
178    pub max_rss_mb: u64,
179
180    /// Maximum character length for derived memory names from file basenames.
181    ///
182    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
183    /// Shorter values leave more headroom for collision suffix resolution.
184    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
185          help = "Maximum length for derived memory names (default: 60)")]
186    pub max_name_length: usize,
187
188    /// Extraction mode: `none` (body-only, default), `gliner` (NER), or `claude-code` (LLM-curated via Claude Code CLI).
189    #[arg(long, value_enum, default_value_t = IngestMode::None)]
190    pub mode: IngestMode,
191
192    /// Explicit path to the Claude Code binary (only with --mode claude-code).
193    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
194    pub claude_binary: Option<std::path::PathBuf>,
195
196    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
197    #[arg(long)]
198    pub claude_model: Option<String>,
199
200    /// Resume a previously interrupted claude-code ingest from the queue DB.
201    #[arg(long, default_value_t = false)]
202    pub resume: bool,
203
204    /// Retry only failed files from a previous claude-code ingest.
205    #[arg(long, default_value_t = false)]
206    pub retry_failed: bool,
207
208    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
209    #[arg(long, default_value_t = false)]
210    pub keep_queue: bool,
211
212    /// Custom path for the claude-code ingest queue database.
213    #[arg(long, default_value = ".ingest-queue.sqlite")]
214    pub queue_db: String,
215
216    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
217    #[arg(long, default_value_t = 60)]
218    pub rate_limit_wait: u64,
219
220    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
221    #[arg(long)]
222    pub max_cost_usd: Option<f64>,
223
224    /// Timeout in seconds for each claude -p invocation (only with --mode claude-code).
225    #[arg(
226        long,
227        default_value_t = 300,
228        help = "Timeout in seconds for each claude -p invocation (default: 300)"
229    )]
230    pub claude_timeout: u64,
231
232    /// Explicit path to the Codex CLI binary (only with --mode codex).
233    #[arg(
234        long,
235        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
236        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
237    )]
238    pub codex_binary: Option<PathBuf>,
239
240    /// Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex).
241    #[arg(
242        long,
243        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
244    )]
245    pub codex_model: Option<String>,
246
247    /// Timeout in seconds for each codex exec invocation.
248    #[arg(
249        long,
250        default_value_t = 300,
251        help = "Timeout in seconds for each codex exec invocation (default: 300)"
252    )]
253    pub codex_timeout: u64,
254}
255
/// Extraction mode for the ingest pipeline.
// Selected with `--mode`. `run` hands `ClaudeCode` and `Codex` off to their
// dedicated handlers (`ingest_claude` / `ingest_codex`) before doing any local
// work; the remaining variants continue through the in-process pipeline.
// The `///` comments on the variants double as CLI help text for the possible
// values (clap `ValueEnum`), so reword them with care.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    /// Body-only ingestion without entity/relationship extraction (default).
    None,
    /// GLiNER zero-shot NER extraction (requires --enable-ner).
    // NOTE(review): how this variant interacts with `--enable-ner` is decided
    // outside the visible part of the file — confirm the "requires" wording.
    Gliner,
    /// LLM-curated extraction via locally installed Claude Code CLI.
    ClaudeCode,
    /// LLM-curated extraction via locally installed OpenAI Codex CLI.
    Codex,
}
268
269/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
270/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
271/// values evaluate to false. Unrecognized non-empty values emit a
272/// `tracing::warn!` and evaluate to false.
273fn env_low_memory_enabled() -> bool {
274    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
275        Ok(v) if v.is_empty() => false,
276        Ok(v) => match v.to_lowercase().as_str() {
277            "1" | "true" | "yes" | "on" => true,
278            "0" | "false" | "no" | "off" => false,
279            other => {
280                tracing::warn!(
281                    target: "ingest",
282                    value = %other,
283                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
284                );
285                false
286            }
287        },
288        Err(_) => false,
289    }
290}
291
292/// Resolves the effective ingest parallelism honoring `--low-memory` and the
293/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
294///
295/// Precedence:
296/// 1. `--low-memory` CLI flag forces parallelism = 1.
297/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
298/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
299/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
300///
301/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
302/// emits a `tracing::warn!` advertising the override.
303fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
304    let env_flag = env_low_memory_enabled();
305    let low_memory = low_memory_flag || env_flag;
306
307    if low_memory {
308        if let Some(n) = ingest_parallelism {
309            if n > 1 {
310                tracing::warn!(
311                    target: "ingest",
312                    requested = n,
313                    "--ingest-parallelism overridden by --low-memory; using 1"
314                );
315            }
316        }
317        if low_memory_flag {
318            tracing::info!(
319                target: "ingest",
320                source = "flag",
321                "low-memory mode enabled: forcing --ingest-parallelism 1"
322            );
323        } else {
324            tracing::info!(
325                target: "ingest",
326                source = "env",
327                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
328            );
329        }
330        return 1;
331    }
332
333    ingest_parallelism
334        .unwrap_or_else(|| {
335            std::thread::available_parallelism()
336                .map(|v| v.get() / 2)
337                .unwrap_or(1)
338                .clamp(1, 4)
339        })
340        .max(1)
341}
342
/// Per-file NDJSON record. Fields marked with `skip_serializing_if` are left
/// out of the serialized object when they are `None`; the remaining fields are
/// always present. Field order here is the order in the serialized output.
// NOTE(review): the code that builds and emits this event is outside the
// visible part of the file; field meanings below that are not spelled out by
// the struct itself are hedged accordingly.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path (presumably the lossy string form of the matched path).
    file: &'a str,
    /// Memory name associated with the file.
    name: &'a str,
    /// Outcome label for this file (the set of values is defined at the emit sites).
    status: &'a str,
    /// True when the derived name was truncated to fit the configured maximum
    /// name length (`--max-name-length`, which defaults to `DERIVED_NAME_MAX_LEN`).
    /// False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file basename (without extension); only present when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error description; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the persisted memory; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action label (e.g. `"created"`); omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
    body_length: usize,
}
365
/// Final summary record serialized after the per-file events.
// NOTE(review): the construction site is outside the visible part of the file;
// the notes below follow the field names and the module-level description.
#[derive(Serialize)]
struct IngestSummary {
    // Marker distinguishing the summary line from per-file events
    // (presumably always `true` — confirm at the emit site).
    summary: bool,
    // Echo of the ingest inputs.
    dir: String,
    pattern: String,
    recursive: bool,
    // Per-outcome file counters.
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    // Elapsed time in milliseconds.
    elapsed_ms: u64,
}
378
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
/// Returned by `persist_staged` once a file has been written.
struct FileSuccess {
    /// Id returned by `memories::insert` for the new memory.
    memory_id: i64,
    /// What was done with the memory; `persist_staged` always reports `"created"`.
    action: String,
    /// Length of the stored body in bytes.
    body_length: usize,
}
385
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
// NOTE(review): the emit site is outside the visible part of the file. The
// module-level docs describe progress events being written as each file is
// persisted (Phase B) — confirm which phase this event actually reports.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    // Version tag of this event's schema.
    schema_version: u8,
    // Event name.
    event: &'a str,
    // Path of the file the event refers to.
    path: &'a str,
    // Duration in milliseconds (presumably the per-file processing time — confirm).
    ms: u64,
    // Entity and relationship counts associated with the file.
    entities: usize,
    relationships: usize,
}
397
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
// NOTE(review): "in submission order" cannot be confirmed from the visible
// code; the module docs describe results flowing through a channel as each
// file finishes, which suggests completion order — verify against `run`.
struct StagedFile {
    /// Full file contents as read from disk.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name assigned to this file.
    name: String,
    /// Generated description of the form `ingested from <path>`.
    description: String,
    /// Memory-level embedding: the embedding of the whole body when there is
    /// exactly one chunk, otherwise the aggregate of the per-chunk embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `None` on the single-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk descriptors produced by the hierarchical splitter.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty when NER is disabled or extraction failed).
    entities: Vec<NewEntity>,
    /// Extracted relationships with normalized relation labels.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found by the extractor.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
414
415/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
416/// Never touches the database — safe to run on any rayon thread.
417fn stage_file(
418    _idx: usize,
419    path: &Path,
420    name: &str,
421    paths: &AppPaths,
422    enable_ner: bool,
423    gliner_variant: crate::extraction::GlinerVariant,
424    max_rss_mb: u64,
425) -> Result<StagedFile, AppError> {
426    use crate::constants::*;
427
428    if name.len() > MAX_MEMORY_NAME_LEN {
429        return Err(AppError::LimitExceeded(
430            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
431        ));
432    }
433    if name.starts_with("__") {
434        return Err(AppError::Validation(
435            crate::i18n::validation::reserved_name(),
436        ));
437    }
438    {
439        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
440            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
441        if !slug_re.is_match(name) {
442            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
443                name,
444            )));
445        }
446    }
447
448    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
449    if raw_body.len() > MAX_MEMORY_BODY_LEN {
450        return Err(AppError::LimitExceeded(
451            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
452        ));
453    }
454    if raw_body.trim().is_empty() {
455        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
456    }
457
458    let description = format!("ingested from {}", path.display());
459    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
460        return Err(AppError::Validation(
461            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
462        ));
463    }
464
465    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
466    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
467    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
468    if enable_ner {
469        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
470            Ok(extracted) => {
471                extracted_urls = extracted.urls;
472                extracted_entities = extracted.entities;
473                extracted_relationships = extracted.relationships;
474
475                if extracted_entities.len() > max_entities_per_memory() {
476                    extracted_entities.truncate(max_entities_per_memory());
477                }
478                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
479                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
480                }
481            }
482            Err(e) => {
483                tracing::warn!(
484                    file = %path.display(),
485                    "auto-extraction failed (graceful degradation): {e:#}"
486                );
487            }
488        }
489    }
490
491    for rel in &mut extracted_relationships {
492        rel.relation = crate::parsers::normalize_relation(&rel.relation);
493        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
494            return Err(AppError::Validation(format!(
495                "{e} for relationship '{}' -> '{}'",
496                rel.source, rel.target
497            )));
498        }
499        crate::parsers::warn_if_non_canonical(&rel.relation);
500        if !(0.0..=1.0).contains(&rel.strength) {
501            return Err(AppError::Validation(format!(
502                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
503                rel.strength, rel.source, rel.target
504            )));
505        }
506    }
507
508    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
509    let snippet: String = raw_body.chars().take(200).collect();
510
511    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
512    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
513    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
514        return Err(AppError::LimitExceeded(format!(
515            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
516            chunks_info.len(),
517            REMEMBER_MAX_SAFE_MULTI_CHUNKS
518        )));
519    }
520
521    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
522    let embedding = if chunks_info.len() == 1 {
523        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
524    } else {
525        let chunk_texts: Vec<&str> = chunks_info
526            .iter()
527            .map(|c| chunking::chunk_text(&raw_body, c))
528            .collect();
529        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
530        for chunk_text in &chunk_texts {
531            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
532                if rss > max_rss_mb {
533                    tracing::error!(
534                        rss_mb = rss,
535                        max_rss_mb = max_rss_mb,
536                        file = %path.display(),
537                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
538                    );
539                    return Err(AppError::LowMemory {
540                        available_mb: crate::memory_guard::available_memory_mb(),
541                        required_mb: max_rss_mb,
542                    });
543                }
544            }
545            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
546                &paths.models,
547                chunk_text,
548            )?);
549        }
550        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
551        chunk_embeddings_opt = Some(chunk_embeddings);
552        aggregated
553    };
554
555    let entity_embeddings = extracted_entities
556        .iter()
557        .map(|entity| {
558            let entity_text = match &entity.description {
559                Some(desc) => format!("{} {}", entity.name, desc),
560                None => entity.name.clone(),
561            };
562            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
563        })
564        .collect::<Result<Vec<_>, _>>()?;
565
566    Ok(StagedFile {
567        body: raw_body,
568        body_hash,
569        snippet,
570        name: name.to_string(),
571        description,
572        embedding,
573        chunk_embeddings: chunk_embeddings_opt,
574        chunks_info,
575        entities: extracted_entities,
576        relationships: extracted_relationships,
577        entity_embeddings,
578        urls: extracted_urls,
579    })
580}
581
582/// Phase B: persists one `StagedFile` to the database on the main thread.
583fn persist_staged(
584    conn: &mut Connection,
585    namespace: &str,
586    memory_type: &str,
587    staged: StagedFile,
588) -> Result<FileSuccess, AppError> {
589    {
590        let active_count: u32 = conn.query_row(
591            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
592            [],
593            |r| r.get::<_, i64>(0).map(|v| v as u32),
594        )?;
595        let ns_exists: bool = conn.query_row(
596            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
597            rusqlite::params![namespace],
598            |r| r.get::<_, i64>(0).map(|v| v > 0),
599        )?;
600        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
601            return Err(AppError::NamespaceError(format!(
602                "active namespace limit of {} exceeded while creating '{namespace}'",
603                crate::constants::MAX_NAMESPACES_ACTIVE
604            )));
605        }
606    }
607
608    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
609    if existing_memory.is_some() {
610        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
611            &staged.name,
612            namespace,
613        )));
614    }
615    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
616
617    let new_memory = NewMemory {
618        namespace: namespace.to_string(),
619        name: staged.name.clone(),
620        memory_type: memory_type.to_string(),
621        description: staged.description.clone(),
622        body: staged.body,
623        body_hash: staged.body_hash,
624        session_id: None,
625        source: "agent".to_string(),
626        metadata: serde_json::json!({}),
627    };
628
629    if let Some(hash_id) = duplicate_hash_id {
630        tracing::debug!(
631            target: "ingest",
632            duplicate_memory_id = hash_id,
633            "identical body already exists; persisting a new memory anyway"
634        );
635    }
636
637    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
638
639    let memory_id = memories::insert(&tx, &new_memory)?;
640    versions::insert_version(
641        &tx,
642        memory_id,
643        1,
644        &staged.name,
645        memory_type,
646        &staged.description,
647        &new_memory.body,
648        &serde_json::to_string(&new_memory.metadata)?,
649        None,
650        "create",
651    )?;
652    memories::upsert_vec(
653        &tx,
654        memory_id,
655        namespace,
656        memory_type,
657        &staged.embedding,
658        &staged.name,
659        &staged.snippet,
660    )?;
661
662    if staged.chunks_info.len() > 1 {
663        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
664        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
665            AppError::Internal(anyhow::anyhow!(
666                "missing chunk embeddings cache on multi-chunk ingest path"
667            ))
668        })?;
669        for (i, emb) in chunk_embeddings.iter().enumerate() {
670            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
671        }
672    }
673
674    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
675        for (idx, entity) in staged.entities.iter().enumerate() {
676            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
677            let entity_embedding = &staged.entity_embeddings[idx];
678            entities::upsert_entity_vec(
679                &tx,
680                entity_id,
681                namespace,
682                entity.entity_type,
683                entity_embedding,
684                &entity.name,
685            )?;
686            entities::link_memory_entity(&tx, memory_id, entity_id)?;
687            entities::increment_degree(&tx, entity_id)?;
688        }
689        let entity_types: std::collections::HashMap<&str, EntityType> = staged
690            .entities
691            .iter()
692            .map(|entity| (entity.name.as_str(), entity.entity_type))
693            .collect();
694        for rel in &staged.relationships {
695            let source_entity = NewEntity {
696                name: rel.source.clone(),
697                entity_type: entity_types
698                    .get(rel.source.as_str())
699                    .copied()
700                    .unwrap_or(EntityType::Concept),
701                description: None,
702            };
703            let target_entity = NewEntity {
704                name: rel.target.clone(),
705                entity_type: entity_types
706                    .get(rel.target.as_str())
707                    .copied()
708                    .unwrap_or(EntityType::Concept),
709                description: None,
710            };
711            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
712            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
713            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
714            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
715        }
716    }
717
718    tx.commit()?;
719
720    if !staged.urls.is_empty() {
721        let url_entries: Vec<storage_urls::MemoryUrl> = staged
722            .urls
723            .into_iter()
724            .map(|u| storage_urls::MemoryUrl {
725                url: u.url,
726                offset: Some(u.offset as i64),
727            })
728            .collect();
729        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
730    }
731
732    Ok(FileSuccess {
733        memory_id,
734        action: "created".to_string(),
735        body_length: new_memory.body.len(),
736    })
737}
738
739pub fn run(args: IngestArgs) -> Result<(), AppError> {
740    if args.mode == IngestMode::ClaudeCode {
741        return super::ingest_claude::run_claude_ingest(&args);
742    }
743    if args.mode == IngestMode::Codex {
744        return super::ingest_codex::run_codex_ingest(&args);
745    }
746
747    let started = std::time::Instant::now();
748
749    if !args.dir.exists() {
750        return Err(AppError::Validation(format!(
751            "directory not found: {}",
752            args.dir.display()
753        )));
754    }
755    if !args.dir.is_dir() {
756        return Err(AppError::Validation(format!(
757            "path is not a directory: {}",
758            args.dir.display()
759        )));
760    }
761
762    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
763    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
764    files.sort();
765
766    if files.len() > args.max_files {
767        return Err(AppError::Validation(format!(
768            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
769            files.len(),
770            args.max_files
771        )));
772    }
773
774    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
775    let memory_type_str = args.r#type.as_str().to_string();
776
777    let paths = AppPaths::resolve(args.db.as_deref())?;
778    let mut conn_or_err = match init_storage(&paths) {
779        Ok(c) => Ok(c),
780        Err(e) => Err(format!("{e}")),
781    };
782
783    let mut succeeded: usize = 0;
784    let mut failed: usize = 0;
785    let mut skipped: usize = 0;
786    let total = files.len();
787
788    // Pre-resolve all names before parallelisation so Phase A workers see a
789    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
790    let mut taken_names: BTreeSet<String> = BTreeSet::new();
791
792    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
793    // ProcessItem: the data moved into the producer thread for Phase A computation.
794    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
795    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
796    // boundary into the rayon producer.
797    enum SlotMeta {
798        Skip {
799            file_str: String,
800            derived_base: String,
801            name_truncated: bool,
802            original_name: Option<String>,
803            original_filename: Option<String>,
804            reason: String,
805        },
806        Process {
807            file_str: String,
808            derived_name: String,
809            name_truncated: bool,
810            original_name: Option<String>,
811            original_filename: Option<String>,
812        },
813    }
814
815    struct ProcessItem {
816        idx: usize,
817        path: PathBuf,
818        file_str: String,
819        derived_name: String,
820    }
821
822    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
823    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
824    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
825
826    let max_name_length = args.max_name_length;
827    for path in &files {
828        let file_str = path.to_string_lossy().into_owned();
829        let (derived_base, name_truncated, original_name) =
830            derive_kebab_name(path, max_name_length);
831        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
832
833        if name_truncated {
834            if let Some(ref orig) = original_name {
835                truncations.push((orig.clone(), derived_base.clone()));
836            }
837        }
838
839        if derived_base.is_empty() {
840            // original_filename: always include when it differs from the empty derived name
841            let orig_filename = if !original_basename.is_empty() {
842                Some(original_basename.to_string())
843            } else {
844                None
845            };
846            slots_meta.push(SlotMeta::Skip {
847                file_str,
848                derived_base: String::new(),
849                name_truncated: false,
850                original_name: None,
851                original_filename: orig_filename,
852                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
853            });
854            continue;
855        }
856
857        match unique_name(&derived_base, &taken_names) {
858            Ok(derived_name) => {
859                taken_names.insert(derived_name.clone());
860                let idx = slots_meta.len();
861                // original_filename: present only when the raw basename differs from the derived name
862                let orig_filename = if original_basename != derived_name {
863                    Some(original_basename.to_string())
864                } else {
865                    None
866                };
867                process_items.push(ProcessItem {
868                    idx,
869                    path: path.clone(),
870                    file_str: file_str.clone(),
871                    derived_name: derived_name.clone(),
872                });
873                slots_meta.push(SlotMeta::Process {
874                    file_str,
875                    derived_name,
876                    name_truncated,
877                    original_name,
878                    original_filename: orig_filename,
879                });
880            }
881            Err(e) => {
882                let orig_filename = if original_basename != derived_base {
883                    Some(original_basename.to_string())
884                } else {
885                    None
886                };
887                slots_meta.push(SlotMeta::Skip {
888                    file_str,
889                    derived_base,
890                    name_truncated,
891                    original_name,
892                    original_filename: orig_filename,
893                    reason: e.to_string(),
894                });
895            }
896        }
897    }
898
899    if !truncations.is_empty() {
900        tracing::info!(
901            target: "ingest",
902            count = truncations.len(),
903            max_name_length = max_name_length,
904            max_len = DERIVED_NAME_MAX_LEN,
905            "derived names truncated; pass -vv (debug) for per-file detail"
906        );
907    }
908
909    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
910    if args.dry_run {
911        for meta in &slots_meta {
912            match meta {
913                SlotMeta::Skip {
914                    file_str,
915                    derived_base,
916                    name_truncated,
917                    original_name,
918                    original_filename,
919                    reason,
920                } => {
921                    output::emit_json_compact(&IngestFileEvent {
922                        file: file_str,
923                        name: derived_base,
924                        status: "skip",
925                        truncated: *name_truncated,
926                        original_name: original_name.clone(),
927                        original_filename: original_filename.as_deref(),
928                        error: Some(reason.clone()),
929                        memory_id: None,
930                        action: None,
931                        body_length: 0,
932                    })?;
933                }
934                SlotMeta::Process {
935                    file_str,
936                    derived_name,
937                    name_truncated,
938                    original_name,
939                    original_filename,
940                } => {
941                    output::emit_json_compact(&IngestFileEvent {
942                        file: file_str,
943                        name: derived_name,
944                        status: "preview",
945                        truncated: *name_truncated,
946                        original_name: original_name.clone(),
947                        original_filename: original_filename.as_deref(),
948                        error: None,
949                        memory_id: None,
950                        action: None,
951                        body_length: 0,
952                    })?;
953                }
954            }
955        }
956        output::emit_json_compact(&IngestSummary {
957            summary: true,
958            dir: args.dir.to_string_lossy().into_owned(),
959            pattern: args.pattern.clone(),
960            recursive: args.recursive,
961            files_total: total,
962            files_succeeded: 0,
963            files_failed: 0,
964            files_skipped: 0,
965            elapsed_ms: started.elapsed().as_millis() as u64,
966        })?;
967        return Ok(());
968    }
969
970    // Determine rayon thread pool size, honoring --low-memory and the
971    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
972    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
973
974    let pool = rayon::ThreadPoolBuilder::new()
975        .num_threads(parallelism)
976        .build()
977        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
978
979    if args.enable_ner && args.skip_extraction {
980        tracing::warn!(
981            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
982        );
983    }
984    if args.skip_extraction && !args.enable_ner {
985        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
986    }
987    let enable_ner = args.enable_ner;
988    let max_rss_mb = args.max_rss_mb;
989    let gliner_variant: crate::extraction::GlinerVariant =
990        args.gliner_variant.parse().unwrap_or_else(|e| {
991            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
992            crate::extraction::GlinerVariant::Fp32
993        });
994
995    let total_to_process = process_items.len();
996    tracing::info!(
997        target = "ingest",
998        phase = "pipeline_start",
999        files = total_to_process,
1000        ingest_parallelism = parallelism,
1001        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1002    );
1003
1004    // Bounded channel: producer never gets more than parallelism*2 items ahead of
1005    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
1006    // Each message carries the slot index so Phase B can look up SlotMeta in order.
1007    let channel_bound = (parallelism * 2).max(1);
1008    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1009
1010    // Phase A: launched in a dedicated OS thread so the main thread can consume
1011    // the channel concurrently. pool.install() blocks the calling thread until
1012    // all rayon workers finish — if called on the main thread it would
1013    // reintroduce the 2-phase blocking behaviour we are eliminating.
1014    let paths_owned = paths.clone();
1015    let producer_handle = std::thread::spawn(move || {
1016        pool.install(|| {
1017            process_items.into_par_iter().for_each(|item| {
1018                let t0 = std::time::Instant::now();
1019                let result = stage_file(
1020                    item.idx,
1021                    &item.path,
1022                    &item.derived_name,
1023                    &paths_owned,
1024                    enable_ner,
1025                    gliner_variant,
1026                    max_rss_mb,
1027                );
1028                let elapsed_ms = t0.elapsed().as_millis() as u64;
1029
1030                // Emit NDJSON progress event to stderr so the user sees work
1031                // happening during long NER runs (e.g. 50 files × 27s each).
1032                let (n_entities, n_relationships) = match &result {
1033                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1034                    Err(_) => (0, 0),
1035                };
1036                let progress = StageProgressEvent {
1037                    schema_version: 1,
1038                    event: "file_extracted",
1039                    path: &item.file_str,
1040                    ms: elapsed_ms,
1041                    entities: n_entities,
1042                    relationships: n_relationships,
1043                };
1044                if let Ok(line) = serde_json::to_string(&progress) {
1045                    eprintln!("{line}");
1046                }
1047
1048                // Blocking send applies backpressure: if Phase B is slower,
1049                // Phase A workers wait here instead of accumulating staged files
1050                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1051                let _ = tx.send((item.idx, result));
1052            });
1053            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1054            drop(tx);
1055        });
1056    });
1057
1058    // Phase B: main thread persists files as results arrive from the channel.
1059    // Results arrive in completion order (par_iter is unordered). We persist
1060    // each file immediately on arrival — this is the key fix for B1: with the
1061    // old 2-phase design the first DB write happened only after ALL files had
1062    // finished Phase A. Now the first commit happens as soon as the first file
1063    // completes Phase A, regardless of how many files remain.
1064    //
1065    // NDJSON output order follows completion order (not file-system sort order).
1066    // Skip slots are emitted at the end, after all Process results are consumed.
1067    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1068    // requirement than ensuring data is persisted before the user's timeout fires.
1069    let fail_fast = args.fail_fast;
1070
1071    // Emit pending Skip events first so agents see them early.
1072    for meta in &slots_meta {
1073        if let SlotMeta::Skip {
1074            file_str,
1075            derived_base,
1076            name_truncated,
1077            original_name,
1078            original_filename,
1079            reason,
1080        } = meta
1081        {
1082            output::emit_json_compact(&IngestFileEvent {
1083                file: file_str,
1084                name: derived_base,
1085                status: "skipped",
1086                truncated: *name_truncated,
1087                original_name: original_name.clone(),
1088                original_filename: original_filename.as_deref(),
1089                error: Some(reason.clone()),
1090                memory_id: None,
1091                action: None,
1092                body_length: 0,
1093            })?;
1094            skipped += 1;
1095        }
1096    }
1097
1098    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1099    // as channel messages arrive in completion order.
1100    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1101        .iter()
1102        .enumerate()
1103        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1104        .collect();
1105
1106    tracing::info!(
1107        target = "ingest",
1108        phase = "persist_start",
1109        files = total_to_process,
1110        "phase B starting: persisting files incrementally as Phase A completes each one",
1111    );
1112
1113    // Drain channel and persist each file immediately — no accumulation into a
1114    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1115    // Phase B without applying backpressure.
1116    for (idx, stage_result) in rx {
1117        let meta = meta_index.get(&idx).ok_or_else(|| {
1118            AppError::Internal(anyhow::anyhow!(
1119                "channel idx {idx} has no corresponding Process slot"
1120            ))
1121        })?;
1122        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1123        {
1124            SlotMeta::Process {
1125                file_str,
1126                derived_name,
1127                name_truncated,
1128                original_name,
1129                original_filename,
1130            } => (
1131                file_str,
1132                derived_name,
1133                name_truncated,
1134                original_name,
1135                original_filename,
1136            ),
1137            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1138        };
1139
1140        // If storage init failed, every file fails with the same error.
1141        let conn = match conn_or_err.as_mut() {
1142            Ok(c) => c,
1143            Err(err_msg) => {
1144                let err_clone = err_msg.clone();
1145                output::emit_json_compact(&IngestFileEvent {
1146                    file: file_str,
1147                    name: derived_name,
1148                    status: "failed",
1149                    truncated: *name_truncated,
1150                    original_name: original_name.clone(),
1151                    original_filename: original_filename.as_deref(),
1152                    error: Some(err_clone.clone()),
1153                    memory_id: None,
1154                    action: None,
1155                    body_length: 0,
1156                })?;
1157                failed += 1;
1158                if fail_fast {
1159                    output::emit_json_compact(&IngestSummary {
1160                        summary: true,
1161                        dir: args.dir.display().to_string(),
1162                        pattern: args.pattern.clone(),
1163                        recursive: args.recursive,
1164                        files_total: total,
1165                        files_succeeded: succeeded,
1166                        files_failed: failed,
1167                        files_skipped: skipped,
1168                        elapsed_ms: started.elapsed().as_millis() as u64,
1169                    })?;
1170                    return Err(AppError::Validation(format!(
1171                        "ingest aborted on first failure: {err_clone}"
1172                    )));
1173                }
1174                continue;
1175            }
1176        };
1177
1178        let outcome =
1179            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1180
1181        match outcome {
1182            Ok(FileSuccess {
1183                memory_id,
1184                action,
1185                body_length,
1186            }) => {
1187                output::emit_json_compact(&IngestFileEvent {
1188                    file: file_str,
1189                    name: derived_name,
1190                    status: "indexed",
1191                    truncated: *name_truncated,
1192                    original_name: original_name.clone(),
1193                    original_filename: original_filename.as_deref(),
1194                    error: None,
1195                    memory_id: Some(memory_id),
1196                    action: Some(action),
1197                    body_length,
1198                })?;
1199                succeeded += 1;
1200            }
1201            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1202                output::emit_json_compact(&IngestFileEvent {
1203                    file: file_str,
1204                    name: derived_name,
1205                    status: "skipped",
1206                    truncated: *name_truncated,
1207                    original_name: original_name.clone(),
1208                    original_filename: original_filename.as_deref(),
1209                    error: Some(format!("{e}")),
1210                    memory_id: None,
1211                    action: Some("duplicate".to_string()),
1212                    body_length: 0,
1213                })?;
1214                skipped += 1;
1215            }
1216            Err(e) => {
1217                let err_msg = format!("{e}");
1218                output::emit_json_compact(&IngestFileEvent {
1219                    file: file_str,
1220                    name: derived_name,
1221                    status: "failed",
1222                    truncated: *name_truncated,
1223                    original_name: original_name.clone(),
1224                    original_filename: original_filename.as_deref(),
1225                    error: Some(err_msg.clone()),
1226                    memory_id: None,
1227                    action: None,
1228                    body_length: 0,
1229                })?;
1230                failed += 1;
1231                if fail_fast {
1232                    output::emit_json_compact(&IngestSummary {
1233                        summary: true,
1234                        dir: args.dir.display().to_string(),
1235                        pattern: args.pattern.clone(),
1236                        recursive: args.recursive,
1237                        files_total: total,
1238                        files_succeeded: succeeded,
1239                        files_failed: failed,
1240                        files_skipped: skipped,
1241                        elapsed_ms: started.elapsed().as_millis() as u64,
1242                    })?;
1243                    return Err(AppError::Validation(format!(
1244                        "ingest aborted on first failure: {err_msg}"
1245                    )));
1246                }
1247            }
1248        }
1249    }
1250
1251    // Wait for the producer thread to finish cleanly.
1252    producer_handle
1253        .join()
1254        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1255
1256    if let Ok(ref conn) = conn_or_err {
1257        if succeeded > 0 {
1258            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1259        }
1260    }
1261
1262    output::emit_json_compact(&IngestSummary {
1263        summary: true,
1264        dir: args.dir.display().to_string(),
1265        pattern: args.pattern.clone(),
1266        recursive: args.recursive,
1267        files_total: total,
1268        files_succeeded: succeeded,
1269        files_failed: failed,
1270        files_skipped: skipped,
1271        elapsed_ms: started.elapsed().as_millis() as u64,
1272    })?;
1273
1274    Ok(())
1275}
1276
1277/// Auto-initialises the database (matches the contract of every other CRUD
1278/// handler) and returns a fresh read/write connection ready for the ingest
1279/// loop. Errors here are recoverable per-file: the caller surfaces them as
1280/// failure events so `--fail-fast` and the continue-on-error path keep
1281/// working when, for example, the user points `--db` at an unwritable path.
1282fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1283    ensure_db_ready(paths)?;
1284    let conn = open_rw(&paths.db)?;
1285    Ok(conn)
1286}
1287
1288pub(crate) fn collect_files(
1289    dir: &Path,
1290    pattern: &str,
1291    recursive: bool,
1292    out: &mut Vec<PathBuf>,
1293) -> Result<(), AppError> {
1294    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1295    for entry in entries {
1296        let entry = entry.map_err(AppError::Io)?;
1297        let path = entry.path();
1298        let file_type = entry.file_type().map_err(AppError::Io)?;
1299        if file_type.is_file() {
1300            let name = entry.file_name();
1301            let name_str = name.to_string_lossy();
1302            if matches_pattern(&name_str, pattern) {
1303                out.push(path);
1304            }
1305        } else if file_type.is_dir() && recursive {
1306            collect_files(&path, pattern, recursive, out)?;
1307        }
1308    }
1309    Ok(())
1310}
1311
/// Returns `true` when `name` matches the glob-style `pattern`.
///
/// `*` matches any run of characters (including none) and may appear anywhere
/// in the pattern, any number of times: `*.md`, `README*`, `report*.md` and
/// `*notes*` all work. Every other character is compared literally and
/// case-sensitively. A pattern without `*` must equal `name` exactly.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (prefix, tail) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // Text after the last `*` is a fixed suffix; anything between the first
    // and last `*` is a list of segments that must occur in order.
    let (middle, suffix) = tail.rsplit_once('*').unwrap_or(("", tail));

    // Strip prefix and suffix one after the other so they can never overlap
    // (e.g. `ab*ba` must not match `aba`).
    let mut rest = match name
        .strip_prefix(prefix)
        .and_then(|after_prefix| after_prefix.strip_suffix(suffix))
    {
        Some(rest) => rest,
        None => return false,
    };

    // Leftmost matching of each middle segment is sufficient for `*`-only globs.
    for segment in middle.split('*').filter(|segment| !segment.is_empty()) {
        match rest.find(segment) {
            Some(at) => rest = &rest[at + segment.len()..],
            None => return false,
        }
    }
    true
}
1321
1322/// Returns `(final_name, truncated, original_name)`.
1323/// `truncated` is true when the derived name exceeded `max_len`.
1324/// `original_name` holds the pre-truncation name only when `truncated=true`.
1325///
1326/// Non-ASCII characters are first decomposed via NFD and then stripped of
1327/// combining marks so accented letters fold to their base ASCII letter
1328/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1329/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1330/// preserves meaningful word content rather than collapsing the basename
1331/// to a few stray ASCII letters as the previous filter did.
1332pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1333    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1334    let lowered: String = stem
1335        .nfd()
1336        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1337        .map(|c| {
1338            if c == '_' || c.is_whitespace() {
1339                '-'
1340            } else {
1341                c
1342            }
1343        })
1344        .map(|c| c.to_ascii_lowercase())
1345        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1346        .collect();
1347    let collapsed = collapse_dashes(&lowered);
1348    let trimmed_raw = collapsed.trim_matches('-').to_string();
1349    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1350    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1351        format!("doc-{trimmed_raw}")
1352    } else {
1353        trimmed_raw
1354    };
1355    if trimmed.len() > max_len {
1356        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1357        tracing::debug!(
1358            target: "ingest",
1359            original = %trimmed,
1360            truncated_to = %truncated,
1361            max_len = max_len,
1362            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1363        );
1364        (truncated, true, Some(trimmed))
1365    } else {
1366        (trimmed, false, None)
1367    }
1368}
1369
1370/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1371/// numeric suffix (`-1`, `-2`, …) when needed.
1372///
1373/// `taken` is the set of names already consumed in the current ingest run.
1374/// The caller is expected to insert the returned name into `taken` so the
1375/// next call observes the consumption. Cross-run collisions are intentionally
1376/// surfaced by the per-file persistence path as duplicates so re-ingestion
1377/// of identical corpora stays idempotent.
1378///
1379/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1380/// candidates collide, signalling a pathological corpus that should be
1381/// renamed manually.
1382fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1383    if !taken.contains(base) {
1384        return Ok(base.to_string());
1385    }
1386    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1387        let candidate = format!("{base}-{suffix}");
1388        if !taken.contains(&candidate) {
1389            tracing::warn!(
1390                target: "ingest",
1391                base = %base,
1392                resolved = %candidate,
1393                suffix,
1394                "memory name collision resolved with numeric suffix"
1395            );
1396            return Ok(candidate);
1397        }
1398    }
1399    Err(AppError::Validation(format!(
1400        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1401    )))
1402}
1403
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut squeezed, ch| {
            // A dash is dropped only when the output already ends with one.
            if ch != '-' || !squeezed.ends_with('-') {
                squeezed.push(ch);
            }
            squeezed
        })
}
1420
1421#[cfg(test)]
1422mod tests {
1423    use super::*;
1424    use std::path::PathBuf;
1425
1426    #[test]
1427    fn matches_pattern_suffix() {
1428        assert!(matches_pattern("foo.md", "*.md"));
1429        assert!(!matches_pattern("foo.txt", "*.md"));
1430        assert!(matches_pattern("foo.md", "*"));
1431    }
1432
1433    #[test]
1434    fn matches_pattern_prefix() {
1435        assert!(matches_pattern("README.md", "README*"));
1436        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1437    }
1438
1439    #[test]
1440    fn matches_pattern_exact() {
1441        assert!(matches_pattern("README.md", "README.md"));
1442        assert!(!matches_pattern("readme.md", "README.md"));
1443    }
1444
1445    #[test]
1446    fn derive_kebab_underscore_to_dash() {
1447        let p = PathBuf::from("/tmp/claude_code_headless.md");
1448        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1449        assert_eq!(name, "claude-code-headless");
1450        assert!(!truncated);
1451        assert!(original.is_none());
1452    }
1453
1454    #[test]
1455    fn derive_kebab_uppercase_lowered() {
1456        let p = PathBuf::from("/tmp/README.md");
1457        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1458        assert_eq!(name, "readme");
1459        assert!(!truncated);
1460        assert!(original.is_none());
1461    }
1462
1463    #[test]
1464    fn derive_kebab_strips_non_kebab_chars() {
1465        let p = PathBuf::from("/tmp/some@weird#name!.md");
1466        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1467        assert_eq!(name, "someweirdname");
1468        assert!(!truncated);
1469        assert!(original.is_none());
1470    }
1471
1472    // Bug M-A3: NFD-based unicode normalization preserves base letters of
1473    // accented characters instead of dropping them entirely.
1474    #[test]
1475    fn derive_kebab_folds_accented_letters_to_ascii() {
1476        let p = PathBuf::from("/tmp/açaí.md");
1477        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1478        assert_eq!(name, "acai", "got '{name}'");
1479    }
1480
1481    #[test]
1482    fn derive_kebab_handles_naive_with_diaeresis() {
1483        let p = PathBuf::from("/tmp/naïve-test.md");
1484        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1485        assert_eq!(name, "naive-test", "got '{name}'");
1486    }
1487
1488    #[test]
1489    fn derive_kebab_drops_emoji_keeps_word() {
1490        let p = PathBuf::from("/tmp/🚀-rocket.md");
1491        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1492        assert_eq!(name, "rocket", "got '{name}'");
1493    }
1494
1495    #[test]
1496    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1497        let p = PathBuf::from("/tmp/açaí🦜.md");
1498        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1499        assert_eq!(name, "acai", "got '{name}'");
1500    }
1501
1502    #[test]
1503    fn derive_kebab_pure_emoji_yields_empty() {
1504        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1505        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1506        assert!(name.is_empty(), "got '{name}'");
1507    }
1508
1509    #[test]
1510    fn derive_kebab_collapses_consecutive_dashes() {
1511        let p = PathBuf::from("/tmp/a__b___c.md");
1512        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1513        assert_eq!(name, "a-b-c");
1514        assert!(!truncated);
1515        assert!(original.is_none());
1516    }
1517
1518    #[test]
1519    fn derive_kebab_truncates_to_60_chars() {
1520        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1521        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1522        assert!(name.len() <= 60, "got len {}", name.len());
1523        assert!(truncated);
1524        assert!(original.is_some());
1525        assert!(original.unwrap().len() > 60);
1526    }
1527
1528    #[test]
1529    fn collect_files_finds_md_files() {
1530        let tmp = tempfile::tempdir().expect("tempdir");
1531        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1532        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1533        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1534        let mut out = Vec::new();
1535        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1536        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1537    }
1538
1539    #[test]
1540    fn collect_files_recursive_descends_subdirs() {
1541        let tmp = tempfile::tempdir().expect("tempdir");
1542        let sub = tmp.path().join("sub");
1543        std::fs::create_dir(&sub).unwrap();
1544        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1545        std::fs::write(sub.join("b.md"), "y").unwrap();
1546        let mut out = Vec::new();
1547        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1548        assert_eq!(out.len(), 2);
1549    }
1550
1551    #[test]
1552    fn collect_files_non_recursive_skips_subdirs() {
1553        let tmp = tempfile::tempdir().expect("tempdir");
1554        let sub = tmp.path().join("sub");
1555        std::fs::create_dir(&sub).unwrap();
1556        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1557        std::fs::write(sub.join("b.md"), "y").unwrap();
1558        let mut out = Vec::new();
1559        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1560        assert_eq!(out.len(), 1);
1561    }
1562
1563    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
1564
1565    #[test]
1566    fn derive_kebab_long_basename_truncated_within_cap() {
1567        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1568        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1569        assert!(
1570            name.len() <= DERIVED_NAME_MAX_LEN,
1571            "truncated name must respect cap; got {} chars",
1572            name.len()
1573        );
1574        assert!(!name.is_empty());
1575        assert!(truncated);
1576        assert!(original.is_some());
1577    }
1578
1579    #[test]
1580    fn unique_name_returns_base_when_free() {
1581        let taken: BTreeSet<String> = BTreeSet::new();
1582        let resolved = unique_name("note", &taken).expect("must resolve");
1583        assert_eq!(resolved, "note");
1584    }
1585
1586    #[test]
1587    fn unique_name_appends_first_free_suffix_on_collision() {
1588        let mut taken: BTreeSet<String> = BTreeSet::new();
1589        taken.insert("note".to_string());
1590        taken.insert("note-1".to_string());
1591        let resolved = unique_name("note", &taken).expect("must resolve");
1592        assert_eq!(resolved, "note-2");
1593    }
1594
1595    #[test]
1596    fn unique_name_errors_after_collision_cap() {
1597        let mut taken: BTreeSet<String> = BTreeSet::new();
1598        taken.insert("note".to_string());
1599        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1600            taken.insert(format!("note-{i}"));
1601        }
1602        let err = unique_name("note", &taken).expect_err("must surface error");
1603        assert!(matches!(err, AppError::Validation(_)));
1604    }
1605
1606    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
1607
1608    #[test]
1609    fn validate_relation_format_accepts_valid_relations() {
1610        use crate::parsers::{is_canonical_relation, validate_relation_format};
1611        assert!(validate_relation_format("applies_to").is_ok());
1612        assert!(validate_relation_format("depends_on").is_ok());
1613        assert!(validate_relation_format("implements").is_ok());
1614        assert!(validate_relation_format("").is_err());
1615        assert!(is_canonical_relation("applies_to"));
1616        assert!(!is_canonical_relation("implements"));
1617    }
1618
1619    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
1620
1621    use serial_test::serial;
1622
1623    /// Helper: scrubs the env var around a closure to keep tests deterministic.
1624    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1625        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1626        let prev = std::env::var(key).ok();
1627        match value {
1628            Some(v) => std::env::set_var(key, v),
1629            None => std::env::remove_var(key),
1630        }
1631        f();
1632        match prev {
1633            Some(p) => std::env::set_var(key, p),
1634            None => std::env::remove_var(key),
1635        }
1636    }
1637
1638    #[test]
1639    #[serial]
1640    fn env_low_memory_enabled_unset_returns_false() {
1641        with_env_var(None, || assert!(!env_low_memory_enabled()));
1642    }
1643
1644    #[test]
1645    #[serial]
1646    fn env_low_memory_enabled_empty_returns_false() {
1647        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1648    }
1649
1650    #[test]
1651    #[serial]
1652    fn env_low_memory_enabled_truthy_values_return_true() {
1653        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1654            with_env_var(Some(v), || {
1655                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1656            });
1657        }
1658    }
1659
1660    #[test]
1661    #[serial]
1662    fn env_low_memory_enabled_falsy_values_return_false() {
1663        for v in ["0", "false", "FALSE", "no", "off"] {
1664            with_env_var(Some(v), || {
1665                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1666            });
1667        }
1668    }
1669
1670    #[test]
1671    #[serial]
1672    fn env_low_memory_enabled_unrecognized_value_returns_false() {
1673        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1674    }
1675
1676    #[test]
1677    #[serial]
1678    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1679        with_env_var(None, || {
1680            assert_eq!(resolve_parallelism(true, Some(4)), 1);
1681            assert_eq!(resolve_parallelism(true, Some(8)), 1);
1682            assert_eq!(resolve_parallelism(true, None), 1);
1683        });
1684    }
1685
1686    #[test]
1687    #[serial]
1688    fn resolve_parallelism_env_forces_one_when_flag_off() {
1689        with_env_var(Some("1"), || {
1690            assert_eq!(resolve_parallelism(false, Some(4)), 1);
1691            assert_eq!(resolve_parallelism(false, None), 1);
1692        });
1693    }
1694
1695    #[test]
1696    #[serial]
1697    fn resolve_parallelism_falsy_env_does_not_override() {
1698        with_env_var(Some("0"), || {
1699            assert_eq!(resolve_parallelism(false, Some(4)), 4);
1700        });
1701    }
1702
1703    #[test]
1704    #[serial]
1705    fn resolve_parallelism_explicit_value_when_low_memory_off() {
1706        with_env_var(None, || {
1707            assert_eq!(resolve_parallelism(false, Some(3)), 3);
1708            assert_eq!(resolve_parallelism(false, Some(1)), 1);
1709        });
1710    }
1711
1712    #[test]
1713    #[serial]
1714    fn resolve_parallelism_default_when_unset() {
1715        with_env_var(None, || {
1716            let p = resolve_parallelism(false, None);
1717            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1718        });
1719    }
1720
1721    #[test]
1722    fn ingest_args_parses_low_memory_flag_via_clap() {
1723        use clap::Parser;
1724        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
1725        // on the public `Cli` definition so the flag is wired end-to-end.
1726        let cli = crate::cli::Cli::try_parse_from([
1727            "sqlite-graphrag",
1728            "ingest",
1729            "/tmp/dummy",
1730            "--type",
1731            "document",
1732            "--low-memory",
1733        ])
1734        .expect("parse must succeed");
1735        match cli.command {
1736            crate::cli::Commands::Ingest(args) => {
1737                assert!(args.low_memory, "--low-memory must set field to true");
1738            }
1739            _ => panic!("expected Ingest subcommand"),
1740        }
1741    }
1742
1743    #[test]
1744    fn ingest_args_low_memory_defaults_false() {
1745        use clap::Parser;
1746        let cli = crate::cli::Cli::try_parse_from([
1747            "sqlite-graphrag",
1748            "ingest",
1749            "/tmp/dummy",
1750            "--type",
1751            "document",
1752        ])
1753        .expect("parse must succeed");
1754        match cli.command {
1755            crate::cli::Commands::Ingest(args) => {
1756                assert!(!args.low_memory, "default must be false");
1757            }
1758            _ => panic!("expected Ingest subcommand"),
1759        }
1760    }
1761}