
sqlite_graphrag/commands/ingest.rs

//! Handler for the `ingest` CLI subcommand.
//!
//! Bulk-ingests every file under a directory that matches a glob pattern.
//! Each matched file is persisted as a separate memory using the same
//! validation, chunking, embedding and persistence pipeline as `remember`,
//! but executed in-process so the ONNX model is loaded only once per
//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
//! cold-start cost) with an in-process loop reusing the warm embedder
//! (daemon when available, in-process `Embedder::new` otherwise).
//!
//! Memory names are derived from file basenames (kebab-case, lowercase,
//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
//! object per processed file (success or error), followed by a final
//! summary object. Designed for streaming consumption by agents.
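//!
//! An illustrative stdout stream (field values are examples, not real output;
//! see `IngestFileEvent` and `IngestSummary` below for the exact schema):
//!
//! ```text
//! {"file":"./docs/a.md","name":"a","status":"indexed","truncated":false,"memory_id":1,"action":"created"}
//! {"file":"./docs/b.md","name":"b","status":"skipped","truncated":false,"error":"...","action":"duplicate"}
//! {"summary":true,"dir":"./docs","pattern":"*.md","recursive":false,"files_total":2,"files_succeeded":1,"files_failed":0,"files_skipped":1,"elapsed_ms":1234}
//! ```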
//!
//! ## Incremental pipeline (v1.0.43)
//!
//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
//! read + chunk + embed + NER per file. Results are sent immediately via a
//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
//! as the first file completes — no waiting for all files to finish Phase A.
//!
//! Phase B runs on the main thread: receives staged files from the channel,
//! writes to SQLite per-file (WAL absorbs individual commits), and emits
//! NDJSON progress events to stderr as each file is persisted. `Connection`
//! is not `Sync` so it never crosses thread boundaries.
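//!
//! A sketch of the data flow (illustrative, not a strict call graph):
//!
//! ```text
//! files -> rayon pool: stage_file()       (Phase A: read/chunk/embed/NER)
//!             |  bounded sync_channel     (capacity = parallelism * 2)
//!             v
//!          main thread: persist_staged()  (Phase B: SQLite writes + NDJSON)
//! ```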
//!
//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
//! before Phase B (and any DB writes) could begin. With this pipeline, the
//! first file is committed within seconds of starting.

use crate::chunking;
use crate::cli::MemoryType;
use crate::entity_type::EntityType;
use crate::errors::AppError;
use crate::i18n::errors_msg;
use crate::output::{self, JsonOutputFormat};
use crate::paths::AppPaths;
use crate::storage::chunks as storage_chunks;
use crate::storage::connection::{ensure_db_ready, open_rw};
use crate::storage::entities::{NewEntity, NewRelationship};
use crate::storage::memories::NewMemory;
use crate::storage::{entities, memories, urls as storage_urls, versions};
use rayon::prelude::*;
use rusqlite::Connection;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use unicode_normalization::UnicodeNormalization;

use crate::constants::DERIVED_NAME_MAX_LEN;

/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;

#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest (each matching file becomes a memory; recursion is opt-in via --recursive)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (best quality, 1.1GB), fp16 (580MB), int8 (349MB, fastest)"
    )]
    pub gliner_variant: String,

    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1` and takes precedence over any
    /// explicit value. Recommended for environments with <4 GB available
    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
    /// time. Also honored via the `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
    /// (the CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}

/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
/// values evaluate to false. Unrecognized non-empty values emit a
/// `tracing::warn!` and evaluate to false.
fn env_low_memory_enabled() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
        Ok(v) if v.is_empty() => false,
        Ok(v) => match v.to_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            other => {
                tracing::warn!(
                    target: "ingest",
                    value = %other,
                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
                );
                false
            }
        },
        Err(_) => false,
    }
}

/// Resolves the effective ingest parallelism honoring `--low-memory` and the
/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
///
/// Precedence:
/// 1. `--low-memory` CLI flag forces parallelism = 1.
/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
///
/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
/// emits a `tracing::warn!` advertising the override.
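///
/// Illustrative outcomes, mirroring the unit tests below (env var unset
/// unless noted):
///
/// ```text
/// resolve_parallelism(true,  Some(8)) == 1   // flag wins, warn emitted
/// resolve_parallelism(false, Some(8)) == 1   // when SQLITE_GRAPHRAG_LOW_MEMORY=1
/// resolve_parallelism(false, Some(3)) == 3   // explicit value
/// resolve_parallelism(false, None)    == (cpus / 2).clamp(1, 4)
/// ```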
fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
    let env_flag = env_low_memory_enabled();
    let low_memory = low_memory_flag || env_flag;

    if low_memory {
        if let Some(n) = ingest_parallelism {
            if n > 1 {
                tracing::warn!(
                    target: "ingest",
                    requested = n,
                    "--ingest-parallelism overridden by --low-memory; using 1"
                );
            }
        }
        if low_memory_flag {
            tracing::info!(
                target: "ingest",
                source = "flag",
                "low-memory mode enabled: forcing --ingest-parallelism 1"
            );
        } else {
            tracing::info!(
                target: "ingest",
                source = "env",
                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
            );
        }
        return 1;
    }

    ingest_parallelism
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|v| v.get() / 2)
                .unwrap_or(1)
                .clamp(1, 4)
        })
        .max(1)
}

#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}

#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}

/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    memory_id: i64,
    action: String,
}

/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
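///
/// An illustrative line (values are examples):
///
/// ```text
/// {"schema_version":1,"event":"file_extracted","path":"./docs/a.md","ms":27000,"entities":12,"relationships":4}
/// ```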
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    ms: u64,
    entities: usize,
    relationships: usize,
}

/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    body: String,
    body_hash: String,
    snippet: String,
    name: String,
    description: String,
    embedding: Vec<f32>,
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}

/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
/// Never touches the database — safe to run on any rayon thread.
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    for rel in &mut extracted_relationships {
        rel.relation = rel.relation.replace('-', "_");
        if !is_valid_relation(&rel.relation) {
            return Err(AppError::Validation(format!(
                "invalid relation '{}' for relationship '{}' -> '{}'",
                rel.relation, rel.source, rel.target
            )));
        }
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before ingesting it",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        aggregated
    };

    let entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
    })
}

/// Phase B: persists one `StagedFile` to the database on the main thread.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

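    // Best-effort side table: the `let _` below deliberately discards insert
    // errors so URL metadata problems never fail a memory that was already
    // committed above.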
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}

pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("directory not found: {}", args.dir.display()),
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Pre-resolve all names before parallelisation so Phase A workers see a
    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
    // ProcessItem: the data moved into the producer thread for Phase A computation.
    // We split these so `slots_meta` stays on the main thread, where the NDJSON
    // events are emitted, while `process_items` (only `PathBuf` + `String`, all
    // trivially `Send`) crosses the thread boundary into the rayon producer.
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::new();
    let mut truncations: Vec<(String, String)> = Vec::new();

    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    // Determine rayon thread pool size, honoring --low-memory and the
    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        tracing::warn!(
            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
        );
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
    }
    let enable_ner = args.enable_ner;
    let gliner_variant: crate::extraction::GlinerVariant =
        args.gliner_variant.parse().unwrap_or_else(|e| {
            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
            crate::extraction::GlinerVariant::Fp32
        });

    let total_to_process = process_items.len();
    tracing::info!(
        target: "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    // Bounded channel: producer never gets more than parallelism*2 items ahead of
    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
    // Each message carries the slot index so Phase B can look up SlotMeta in order.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    // Phase A: launched in a dedicated OS thread so the main thread can consume
    // the channel concurrently. pool.install() blocks the calling thread until
    // all rayon workers finish — if called on the main thread it would
    // reintroduce the 2-phase blocking behaviour we are eliminating.
    let paths_owned = paths.clone();
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                // Emit NDJSON progress event to stderr so the user sees work
                // happening during long NER runs (e.g. 50 files × 27s each).
                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                // Blocking send applies backpressure: if Phase B is slower,
                // Phase A workers wait here instead of accumulating staged files
                // in memory. If the receiver is dropped (fail_fast abort), ignore.
                let _ = tx.send((item.idx, result));
            });
            // Explicit drop of tx signals Phase B (rx iteration) to stop.
            drop(tx);
        });
    });

    // Phase B: main thread persists files as results arrive from the channel.
    // Results arrive in completion order (par_iter is unordered). We persist
    // each file immediately on arrival — this is the key fix for B1: with the
    // old 2-phase design the first DB write happened only after ALL files had
    // finished Phase A. Now the first commit happens as soon as the first file
    // completes Phase A, regardless of how many files remain.
    //
    // NDJSON output order follows completion order (not file-system sort order).
    // Skip slots are emitted up front, before the channel is drained.
    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
    // requirement than ensuring data is persisted before the user's timeout fires.
    let fail_fast = args.fail_fast;

    // Emit pending Skip events first so agents see them early.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
    // as channel messages arrive in completion order.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target: "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    // Drain channel and persist each file immediately — no accumulation into a
    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
    // Phase B without applying backpressure.
    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // If storage init failed, every file fails with the same error.
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(format!("{e}")),
                    memory_id: None,
                    action: Some("duplicate".to_string()),
                })?;
                skipped += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // Wait for the producer thread to finish cleanly.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}

/// Auto-initialises the database (matches the contract of every other CRUD
/// handler) and returns a fresh read/write connection ready for the ingest
/// loop. Errors here are recoverable per-file: the caller surfaces them as
/// failure events so `--fail-fast` and the continue-on-error path keep
/// working when, for example, the user points `--db` at an unwritable path.
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}

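/// Canonical relation vocabulary accepted by the graph layer. Extracted
/// relations are normalized (`-` → `_`) in `stage_file` before this check,
/// so hyphenated variants of these names also pass.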
fn is_valid_relation(relation: &str) -> bool {
    matches!(
        relation,
        "applies_to"
            | "uses"
            | "depends_on"
            | "causes"
            | "fixes"
            | "contradicts"
            | "supports"
            | "follows"
            | "related"
            | "mentions"
            | "replaces"
            | "tracked_in"
    )
}

fn collect_files(
    dir: &Path,
    pattern: &str,
    recursive: bool,
    out: &mut Vec<PathBuf>,
) -> Result<(), AppError> {
    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
    for entry in entries {
        let entry = entry.map_err(AppError::Io)?;
        let path = entry.path();
        let file_type = entry.file_type().map_err(AppError::Io)?;
        if file_type.is_file() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            if matches_pattern(&name_str, pattern) {
                out.push(path);
            }
        } else if file_type.is_dir() && recursive {
            collect_files(&path, pattern, recursive, out)?;
        }
    }
    Ok(())
}

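/// Minimal glob matcher backing `--pattern`: a leading `*` matches any suffix
/// (`*.md`), a trailing `*` matches any prefix (`README*`), and anything else
/// must equal the basename exactly. `?`, character classes, and mid-string
/// `*` are not supported.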
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else {
        name == pattern
    }
}

/// Returns `(final_name, truncated, original_name)`.
/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
/// `original_name` holds the pre-truncation name only when `truncated=true`.
///
/// Non-ASCII characters are first decomposed via NFD and then stripped of
/// combining marks so accented letters fold to their base ASCII letter
/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
/// preserves meaningful word content rather than collapsing the basename
/// to a few stray ASCII letters as the previous filter did.
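///
/// Illustrative mappings (mirroring the unit tests below):
///
/// ```text
/// "claude_code_headless.md" → "claude-code-headless"
/// "açaí.md"                 → "acai"
/// "🦜🚀🌟.md"                 → ""    (caller skips empty names)
/// ```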
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}

/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
/// numeric suffix (`-1`, `-2`, …) when needed.
///
/// `taken` is the set of names already consumed in the current ingest run.
/// The caller is expected to insert the returned name into `taken` so the
/// next call observes the consumption. Cross-run collisions are intentionally
/// surfaced by the per-file persistence path as duplicates so re-ingestion
/// of identical corpora stays idempotent.
///
/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
/// candidates collide, signalling a pathological corpus that should be
/// renamed manually.
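///
/// For example, with `"note"` and `"note-1"` already in `taken`,
/// `unique_name("note", &taken)` returns `"note-2"` (mirroring the unit
/// tests below).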
fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
    if !taken.contains(base) {
        return Ok(base.to_string());
    }
    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
        let candidate = format!("{base}-{suffix}");
        if !taken.contains(&candidate) {
            tracing::warn!(
                target: "ingest",
                base = %base,
                resolved = %candidate,
                suffix,
                "memory name collision resolved with numeric suffix"
            );
            return Ok(candidate);
        }
    }
    Err(AppError::Validation(format!(
        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
    )))
}

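/// Collapses runs of consecutive `-` into a single `-` (e.g. `a---b` → `a-b`);
/// `derive_kebab_name` then trims any leading/trailing dash.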
fn collapse_dashes(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut prev_dash = false;
    for c in s.chars() {
        if c == '-' {
            if !prev_dash {
                out.push('-');
            }
            prev_dash = true;
        } else {
            out.push(c);
            prev_dash = false;
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: scrubs the env var around a closure to keep tests deterministic.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}