// sqlite_graphrag/commands/ingest.rs
1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
///
/// NOTE(review): presumably consumed by `unique_name` (called from `run`);
/// its definition is outside this chunk — confirm there.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// CLI arguments for the `ingest` subcommand.
//
// CAUTION: in clap derive structs, `///` doc comments and the strings inside
// `#[arg(help = ...)]` / `#[command(after_long_help = ...)]` become runtime
// help output. Editing them changes user-visible CLI behavior, so this block
// only adds `//` comments, which clap ignores.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Skip BERT NER auto-extraction for faster bulk import\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
    // `r#type` because `type` is a Rust keyword; clap still exposes it as `--type`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
    #[arg(long, default_value_t = false)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    // `run` errors out (rather than truncating) when the matched set exceeds this.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    // None falls through to `crate::namespace::resolve_namespace`'s default.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Kept for backwards CLI compatibility; intentionally a hidden no-op.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    // Resolved by `resolve_parallelism`, which may override this with 1 in
    // low-memory mode.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
    /// explicit value. Recommended for environments with <4 GB available
    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
    /// (CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
144
145/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
146/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
147/// values evaluate to false. Unrecognized non-empty values emit a
148/// `tracing::warn!` and evaluate to false.
149fn env_low_memory_enabled() -> bool {
150    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
151        Ok(v) if v.is_empty() => false,
152        Ok(v) => match v.to_lowercase().as_str() {
153            "1" | "true" | "yes" | "on" => true,
154            "0" | "false" | "no" | "off" => false,
155            other => {
156                tracing::warn!(
157                    target: "ingest",
158                    value = %other,
159                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
160                );
161                false
162            }
163        },
164        Err(_) => false,
165    }
166}
167
168/// Resolves the effective ingest parallelism honoring `--low-memory` and the
169/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
170///
171/// Precedence:
172/// 1. `--low-memory` CLI flag forces parallelism = 1.
173/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
174/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
175/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
176///
177/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
178/// emits a `tracing::warn!` advertising the override.
179fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
180    let env_flag = env_low_memory_enabled();
181    let low_memory = low_memory_flag || env_flag;
182
183    if low_memory {
184        if let Some(n) = ingest_parallelism {
185            if n > 1 {
186                tracing::warn!(
187                    target: "ingest",
188                    requested = n,
189                    "--ingest-parallelism overridden by --low-memory; using 1"
190                );
191            }
192        }
193        if low_memory_flag {
194            tracing::info!(
195                target: "ingest",
196                source = "flag",
197                "low-memory mode enabled: forcing --ingest-parallelism 1"
198            );
199        } else {
200            tracing::info!(
201                target: "ingest",
202                source = "env",
203                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
204            );
205        }
206        return 1;
207    }
208
209    ingest_parallelism
210        .unwrap_or_else(|| {
211            std::thread::available_parallelism()
212                .map(|v| v.get() / 2)
213                .unwrap_or(1)
214                .clamp(1, 4)
215        })
216        .max(1)
217}
218
/// Per-file NDJSON event emitted on stdout for each processed file.
///
/// Field order here is serialization order in the emitted JSON — do not
/// reorder without considering downstream consumers.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Filesystem path of the source file (lossy UTF-8).
    file: &'a str,
    /// Derived kebab-case memory name (may be empty for some skip events).
    name: &'a str,
    /// Outcome tag; "skipped" is used in this chunk — other values are set
    /// further down in `run` (outside this view); confirm the exact set there.
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Human-readable failure/skip reason; omitted on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Rowid of the persisted memory; omitted unless persistence succeeded.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action taken (e.g. "created" from `FileSuccess`); omitted on failure.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
236
/// Final NDJSON summary line emitted after all per-file events.
///
/// `summary: true` lets streaming consumers distinguish this object from the
/// per-file `IngestFileEvent` lines.
#[derive(Serialize)]
struct IngestSummary {
    /// Discriminator; always serialized so agents can detect the summary line.
    summary: bool,
    /// Ingest root directory as given on the CLI.
    dir: String,
    /// Basename glob pattern that was matched.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Wall-clock duration of the whole `run` invocation, in milliseconds.
    elapsed_ms: u64,
}
249
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    /// Rowid of the newly inserted memory.
    memory_id: i64,
    /// Action label reported to the consumer ("created" in this pipeline).
    action: String,
}
255
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
///
/// Emitted from the rayon producer thread (stderr), distinct from the
/// per-file result events on stdout.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Currently always 1; bump when the event shape changes.
    schema_version: u8,
    /// Event discriminator; "file_extracted" in this pipeline.
    event: &'a str,
    /// Path of the file that finished Phase A.
    path: &'a str,
    /// Phase-A wall time for this file, in milliseconds.
    ms: u64,
    /// Entity count extracted (0 on Phase-A error or with --skip-extraction).
    entities: usize,
    /// Relationship count extracted (0 on Phase-A error or with --skip-extraction).
    relationships: usize,
}
267
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    /// Full file contents as read from disk.
    body: String,
    /// blake3 hex digest of `body`, used for duplicate detection.
    body_hash: String,
    /// First 200 chars of the body (stored alongside the memory vector).
    snippet: String,
    /// Final (collision-resolved) kebab-case memory name.
    name: String,
    /// Auto-generated description ("ingested from <path>").
    description: String,
    /// Document-level embedding (single-chunk body, or aggregate of chunks).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; Some only when the body split into >1 chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by hierarchical splitting.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// NER-extracted entities (possibly truncated to the per-memory cap).
    entities: Vec<NewEntity>,
    /// NER-extracted relationships (possibly truncated; relations normalized).
    relationships: Vec<NewRelationship>,
    /// One embedding per entry in `entities`, in the same order — Phase B
    /// indexes these positionally.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs extracted from the body; persisted best-effort after commit.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
284
285/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
286/// Never touches the database — safe to run on any rayon thread.
287fn stage_file(
288    _idx: usize,
289    path: &Path,
290    name: &str,
291    paths: &AppPaths,
292    skip_extraction: bool,
293) -> Result<StagedFile, AppError> {
294    use crate::constants::*;
295
296    if name.len() > MAX_MEMORY_NAME_LEN {
297        return Err(AppError::LimitExceeded(
298            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
299        ));
300    }
301    if name.starts_with("__") {
302        return Err(AppError::Validation(
303            crate::i18n::validation::reserved_name(),
304        ));
305    }
306    {
307        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
308            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
309        if !slug_re.is_match(name) {
310            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
311                name,
312            )));
313        }
314    }
315
316    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
317    if raw_body.len() > MAX_MEMORY_BODY_LEN {
318        return Err(AppError::LimitExceeded(
319            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
320        ));
321    }
322    if raw_body.trim().is_empty() {
323        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
324    }
325
326    let description = format!("ingested from {}", path.display());
327    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
328        return Err(AppError::Validation(
329            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
330        ));
331    }
332
333    let mut extracted_entities: Vec<NewEntity> = Vec::new();
334    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
335    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
336    if !skip_extraction {
337        match crate::extraction::extract_graph_auto(&raw_body, paths) {
338            Ok(extracted) => {
339                extracted_urls = extracted.urls;
340                extracted_entities = extracted.entities;
341                extracted_relationships = extracted.relationships;
342
343                if extracted_entities.len() > max_entities_per_memory() {
344                    extracted_entities.truncate(max_entities_per_memory());
345                }
346                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
347                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
348                }
349            }
350            Err(e) => {
351                tracing::warn!(
352                    file = %path.display(),
353                    "auto-extraction failed (graceful degradation): {e:#}"
354                );
355            }
356        }
357    }
358
359    for rel in &mut extracted_relationships {
360        rel.relation = rel.relation.replace('-', "_");
361        if !is_valid_relation(&rel.relation) {
362            return Err(AppError::Validation(format!(
363                "invalid relation '{}' for relationship '{}' -> '{}'",
364                rel.relation, rel.source, rel.target
365            )));
366        }
367        if !(0.0..=1.0).contains(&rel.strength) {
368            return Err(AppError::Validation(format!(
369                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
370                rel.strength, rel.source, rel.target
371            )));
372        }
373    }
374
375    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
376    let snippet: String = raw_body.chars().take(200).collect();
377
378    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
379    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
380    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
381        return Err(AppError::LimitExceeded(format!(
382            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
383            chunks_info.len(),
384            REMEMBER_MAX_SAFE_MULTI_CHUNKS
385        )));
386    }
387
388    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
389    let embedding = if chunks_info.len() == 1 {
390        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
391    } else {
392        let chunk_texts: Vec<&str> = chunks_info
393            .iter()
394            .map(|c| chunking::chunk_text(&raw_body, c))
395            .collect();
396        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
397        for chunk_text in &chunk_texts {
398            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
399                &paths.models,
400                chunk_text,
401            )?);
402        }
403        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
404        chunk_embeddings_opt = Some(chunk_embeddings);
405        aggregated
406    };
407
408    let entity_embeddings = extracted_entities
409        .iter()
410        .map(|entity| {
411            let entity_text = match &entity.description {
412                Some(desc) => format!("{} {}", entity.name, desc),
413                None => entity.name.clone(),
414            };
415            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
416        })
417        .collect::<Result<Vec<_>, _>>()?;
418
419    Ok(StagedFile {
420        body: raw_body,
421        body_hash,
422        snippet,
423        name: name.to_string(),
424        description,
425        embedding,
426        chunk_embeddings: chunk_embeddings_opt,
427        chunks_info,
428        entities: extracted_entities,
429        relationships: extracted_relationships,
430        entity_embeddings,
431        urls: extracted_urls,
432    })
433}
434
/// Phase B: persists one `StagedFile` to the database on the main thread.
///
/// Write order inside one IMMEDIATE transaction: memory row → version 1 →
/// memory vector → (multi-chunk only) chunk slices + chunk vectors →
/// entities (vector, memory link, degree bump) → relationships (auto-creating
/// missing endpoint entities as `Concept`). URL rows are inserted after
/// commit, best-effort (errors deliberately ignored).
///
/// # Errors
/// Returns `AppError::NamespaceError` when creating this memory would exceed
/// the active-namespace cap, `AppError::Duplicate` when a memory with the
/// same name exists in the namespace, and propagates SQLite failures.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    // Namespace cap check: only counts when this write would CREATE a new
    // active namespace (an existing one never trips the limit).
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Name collision is a hard error; identical-body (hash) collision is not —
    // it is only logged below.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE acquires the write lock up-front, avoiding a mid-transaction
    // SQLITE_BUSY upgrade from a deferred read transaction.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Every ingest creates version 1 with a "create" changelog entry.
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Chunk rows exist only for multi-chunk documents; single-chunk bodies are
    // represented solely by the memory-level vector above.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the chunk index `i` is passed both as an i64 id and
            // as the i32 position — confirm against `upsert_chunk_vec`'s
            // signature that the first argument is indeed a per-memory index
            // and not a global rowid.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            // Positional pairing with `entities`; `stage_file` guarantees the
            // two vectors have equal length and matching order.
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Name → type lookup so relationship endpoints reuse the extracted
        // entity type; unknown endpoints fall back to `Concept`.
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URLs are supplementary metadata: written outside the transaction, after
    // commit, and failures are intentionally swallowed (best-effort).
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
590
591pub fn run(args: IngestArgs) -> Result<(), AppError> {
592    let started = std::time::Instant::now();
593
594    if !args.dir.exists() {
595        return Err(AppError::NotFound(format!(
596            "directory not found: {}",
597            args.dir.display()
598        )));
599    }
600    if !args.dir.is_dir() {
601        return Err(AppError::Validation(format!(
602            "path is not a directory: {}",
603            args.dir.display()
604        )));
605    }
606
607    let mut files: Vec<PathBuf> = Vec::new();
608    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
609    files.sort();
610
611    if files.len() > args.max_files {
612        return Err(AppError::Validation(format!(
613            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
614            files.len(),
615            args.max_files
616        )));
617    }
618
619    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
620    let memory_type_str = args.r#type.as_str().to_string();
621
622    let paths = AppPaths::resolve(args.db.as_deref())?;
623    let mut conn_or_err = match init_storage(&paths) {
624        Ok(c) => Ok(c),
625        Err(e) => Err(format!("{e}")),
626    };
627
628    let mut succeeded: usize = 0;
629    let mut failed: usize = 0;
630    let mut skipped: usize = 0;
631    let total = files.len();
632
633    // Pre-resolve all names before parallelisation so Phase A workers see a
634    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
635    let mut taken_names: BTreeSet<String> = BTreeSet::new();
636
637    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
638    // ProcessItem: the data moved into the producer thread for Phase A computation.
639    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
640    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
641    // boundary into the rayon producer.
642    enum SlotMeta {
643        Skip {
644            file_str: String,
645            derived_base: String,
646            name_truncated: bool,
647            original_name: Option<String>,
648            reason: String,
649        },
650        Process {
651            file_str: String,
652            derived_name: String,
653            name_truncated: bool,
654            original_name: Option<String>,
655        },
656    }
657
658    struct ProcessItem {
659        idx: usize,
660        path: PathBuf,
661        file_str: String,
662        derived_name: String,
663    }
664
665    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
666    let mut process_items: Vec<ProcessItem> = Vec::new();
667
668    for path in &files {
669        let file_str = path.to_string_lossy().into_owned();
670        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
671
672        if derived_base.is_empty() {
673            slots_meta.push(SlotMeta::Skip {
674                file_str,
675                derived_base: String::new(),
676                name_truncated: false,
677                original_name: None,
678                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
679            });
680            continue;
681        }
682
683        match unique_name(&derived_base, &taken_names) {
684            Ok(derived_name) => {
685                taken_names.insert(derived_name.clone());
686                let idx = slots_meta.len();
687                process_items.push(ProcessItem {
688                    idx,
689                    path: path.clone(),
690                    file_str: file_str.clone(),
691                    derived_name: derived_name.clone(),
692                });
693                slots_meta.push(SlotMeta::Process {
694                    file_str,
695                    derived_name,
696                    name_truncated,
697                    original_name,
698                });
699            }
700            Err(e) => {
701                slots_meta.push(SlotMeta::Skip {
702                    file_str,
703                    derived_base,
704                    name_truncated,
705                    original_name,
706                    reason: e.to_string(),
707                });
708            }
709        }
710    }
711
712    // Determine rayon thread pool size, honoring --low-memory and the
713    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
714    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
715
716    let pool = rayon::ThreadPoolBuilder::new()
717        .num_threads(parallelism)
718        .build()
719        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
720
721    let skip_extraction = args.skip_extraction;
722
723    let total_to_process = process_items.len();
724    tracing::info!(
725        target = "ingest",
726        phase = "pipeline_start",
727        files = total_to_process,
728        ingest_parallelism = parallelism,
729        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
730    );
731
732    // Bounded channel: producer never gets more than parallelism*2 items ahead of
733    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
734    // Each message carries the slot index so Phase B can look up SlotMeta in order.
735    let channel_bound = (parallelism * 2).max(1);
736    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
737
738    // Phase A: launched in a dedicated OS thread so the main thread can consume
739    // the channel concurrently. pool.install() blocks the calling thread until
740    // all rayon workers finish — if called on the main thread it would
741    // reintroduce the 2-phase blocking behaviour we are eliminating.
742    let paths_owned = paths.clone();
743    let producer_handle = std::thread::spawn(move || {
744        pool.install(|| {
745            process_items.into_par_iter().for_each(|item| {
746                let t0 = std::time::Instant::now();
747                let result = stage_file(
748                    item.idx,
749                    &item.path,
750                    &item.derived_name,
751                    &paths_owned,
752                    skip_extraction,
753                );
754                let elapsed_ms = t0.elapsed().as_millis() as u64;
755
756                // Emit NDJSON progress event to stderr so the user sees work
757                // happening during long NER runs (e.g. 50 files × 27s each).
758                let (n_entities, n_relationships) = match &result {
759                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
760                    Err(_) => (0, 0),
761                };
762                let progress = StageProgressEvent {
763                    schema_version: 1,
764                    event: "file_extracted",
765                    path: &item.file_str,
766                    ms: elapsed_ms,
767                    entities: n_entities,
768                    relationships: n_relationships,
769                };
770                if let Ok(line) = serde_json::to_string(&progress) {
771                    eprintln!("{line}");
772                }
773
774                // Blocking send applies backpressure: if Phase B is slower,
775                // Phase A workers wait here instead of accumulating staged files
776                // in memory. If the receiver is dropped (fail_fast abort), ignore.
777                let _ = tx.send((item.idx, result));
778            });
779            // Explicit drop of tx signals Phase B (rx iteration) to stop.
780            drop(tx);
781        });
782    });
783
784    // Phase B: main thread persists files as results arrive from the channel.
785    // Results arrive in completion order (par_iter is unordered). We persist
786    // each file immediately on arrival — this is the key fix for B1: with the
787    // old 2-phase design the first DB write happened only after ALL files had
788    // finished Phase A. Now the first commit happens as soon as the first file
789    // completes Phase A, regardless of how many files remain.
790    //
791    // NDJSON output order follows completion order (not file-system sort order).
792    // Skip slots are emitted at the end, after all Process results are consumed.
793    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
794    // requirement than ensuring data is persisted before the user's timeout fires.
795    let fail_fast = args.fail_fast;
796
797    // Emit pending Skip events first so agents see them early.
798    for meta in &slots_meta {
799        if let SlotMeta::Skip {
800            file_str,
801            derived_base,
802            name_truncated,
803            original_name,
804            reason,
805        } = meta
806        {
807            output::emit_json_compact(&IngestFileEvent {
808                file: file_str,
809                name: derived_base,
810                status: "skipped",
811                truncated: *name_truncated,
812                original_name: original_name.clone(),
813                error: Some(reason.clone()),
814                memory_id: None,
815                action: None,
816            })?;
817            skipped += 1;
818        }
819    }
820
821    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
822    // as channel messages arrive in completion order.
823    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
824        .iter()
825        .enumerate()
826        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
827        .collect();
828
829    tracing::info!(
830        target = "ingest",
831        phase = "persist_start",
832        files = total_to_process,
833        "phase B starting: persisting files incrementally as Phase A completes each one",
834    );
835
836    // Drain channel and persist each file immediately — no accumulation into a
837    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
838    // Phase B without applying backpressure.
839    for (idx, stage_result) in rx {
840        let meta = meta_index
841            .get(&idx)
842            .expect("channel idx must correspond to a Process slot");
843        let (file_str, derived_name, name_truncated, original_name) = match meta {
844            SlotMeta::Process {
845                file_str,
846                derived_name,
847                name_truncated,
848                original_name,
849            } => (file_str, derived_name, name_truncated, original_name),
850            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
851        };
852
853        // If storage init failed, every file fails with the same error.
854        let conn = match conn_or_err.as_mut() {
855            Ok(c) => c,
856            Err(err_msg) => {
857                let err_clone = err_msg.clone();
858                output::emit_json_compact(&IngestFileEvent {
859                    file: file_str,
860                    name: derived_name,
861                    status: "failed",
862                    truncated: *name_truncated,
863                    original_name: original_name.clone(),
864                    error: Some(err_clone.clone()),
865                    memory_id: None,
866                    action: None,
867                })?;
868                failed += 1;
869                if fail_fast {
870                    output::emit_json_compact(&IngestSummary {
871                        summary: true,
872                        dir: args.dir.display().to_string(),
873                        pattern: args.pattern.clone(),
874                        recursive: args.recursive,
875                        files_total: total,
876                        files_succeeded: succeeded,
877                        files_failed: failed,
878                        files_skipped: skipped,
879                        elapsed_ms: started.elapsed().as_millis() as u64,
880                    })?;
881                    return Err(AppError::Validation(format!(
882                        "ingest aborted on first failure: {err_clone}"
883                    )));
884                }
885                continue;
886            }
887        };
888
889        let outcome =
890            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
891
892        match outcome {
893            Ok(FileSuccess { memory_id, action }) => {
894                output::emit_json_compact(&IngestFileEvent {
895                    file: file_str,
896                    name: derived_name,
897                    status: "indexed",
898                    truncated: *name_truncated,
899                    original_name: original_name.clone(),
900                    error: None,
901                    memory_id: Some(memory_id),
902                    action: Some(action),
903                })?;
904                succeeded += 1;
905            }
906            Err(e) => {
907                let err_msg = format!("{e}");
908                output::emit_json_compact(&IngestFileEvent {
909                    file: file_str,
910                    name: derived_name,
911                    status: "failed",
912                    truncated: *name_truncated,
913                    original_name: original_name.clone(),
914                    error: Some(err_msg.clone()),
915                    memory_id: None,
916                    action: None,
917                })?;
918                failed += 1;
919                if fail_fast {
920                    output::emit_json_compact(&IngestSummary {
921                        summary: true,
922                        dir: args.dir.display().to_string(),
923                        pattern: args.pattern.clone(),
924                        recursive: args.recursive,
925                        files_total: total,
926                        files_succeeded: succeeded,
927                        files_failed: failed,
928                        files_skipped: skipped,
929                        elapsed_ms: started.elapsed().as_millis() as u64,
930                    })?;
931                    return Err(AppError::Validation(format!(
932                        "ingest aborted on first failure: {err_msg}"
933                    )));
934                }
935            }
936        }
937    }
938
939    // Wait for the producer thread to finish cleanly.
940    producer_handle
941        .join()
942        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
943
944    output::emit_json_compact(&IngestSummary {
945        summary: true,
946        dir: args.dir.display().to_string(),
947        pattern: args.pattern.clone(),
948        recursive: args.recursive,
949        files_total: total,
950        files_succeeded: succeeded,
951        files_failed: failed,
952        files_skipped: skipped,
953        elapsed_ms: started.elapsed().as_millis() as u64,
954    })?;
955
956    Ok(())
957}
958
959/// Auto-initialises the database (matches the contract of every other CRUD
960/// handler) and returns a fresh read/write connection ready for the ingest
961/// loop. Errors here are recoverable per-file: the caller surfaces them as
962/// failure events so `--fail-fast` and the continue-on-error path keep
963/// working when, for example, the user points `--db` at an unwritable path.
964fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
965    ensure_db_ready(paths)?;
966    let conn = open_rw(&paths.db)?;
967    Ok(conn)
968}
969
/// Whitelist check for relationship labels produced by NER extraction.
/// Only the canonical graph relations below are persisted.
fn is_valid_relation(relation: &str) -> bool {
    const CANONICAL_RELATIONS: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    CANONICAL_RELATIONS.contains(&relation)
}
987
988fn collect_files(
989    dir: &Path,
990    pattern: &str,
991    recursive: bool,
992    out: &mut Vec<PathBuf>,
993) -> Result<(), AppError> {
994    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
995    for entry in entries {
996        let entry = entry.map_err(AppError::Io)?;
997        let path = entry.path();
998        let file_type = entry.file_type().map_err(AppError::Io)?;
999        if file_type.is_file() {
1000            let name = entry.file_name();
1001            let name_str = name.to_string_lossy();
1002            if matches_pattern(&name_str, pattern) {
1003                out.push(path);
1004            }
1005        } else if file_type.is_dir() && recursive {
1006            collect_files(&path, pattern, recursive, out)?;
1007        }
1008    }
1009    Ok(())
1010}
1011
/// Minimal glob matcher for `--pattern`.
///
/// Supported forms, checked in order (existing behavior is preserved):
/// - `*suffix`  — basename must end with `suffix` (`*` alone matches all)
/// - `prefix*`  — basename must start with `prefix`
/// - `pre*suf`  — NEW: a single mid-pattern `*` matches any (possibly
///   empty) infix, e.g. `report-*.md`. Previously this form silently
///   degraded to an exact comparison and never matched, contradicting the
///   module's documented "glob pattern" contract.
/// - otherwise  — exact, case-sensitive comparison.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else if let Some((prefix, suffix)) = pattern.split_once('*') {
        // The length guard prevents prefix and suffix from overlapping in
        // `name` (e.g. pattern "ab*ba" must not match "aba"). Byte lengths
        // are correct here because starts_with/ends_with operate on the
        // same byte representation.
        name.len() >= prefix.len() + suffix.len()
            && name.starts_with(prefix)
            && name.ends_with(suffix)
    } else {
        name == pattern
    }
}
1021
1022/// Returns `(final_name, truncated, original_name)`.
1023/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
1024/// `original_name` holds the pre-truncation name only when `truncated=true`.
1025///
1026/// Non-ASCII characters are first decomposed via NFD and then stripped of
1027/// combining marks so accented letters fold to their base ASCII letter
1028/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
1029/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1030/// preserves meaningful word content rather than collapsing the basename
1031/// to a few stray ASCII letters as the previous filter did.
1032fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
1033    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1034    let lowered: String = stem
1035        .nfd()
1036        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1037        .map(|c| {
1038            if c == '_' || c.is_whitespace() {
1039                '-'
1040            } else {
1041                c
1042            }
1043        })
1044        .map(|c| c.to_ascii_lowercase())
1045        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1046        .collect();
1047    let collapsed = collapse_dashes(&lowered);
1048    let trimmed = collapsed.trim_matches('-').to_string();
1049    if trimmed.len() > DERIVED_NAME_MAX_LEN {
1050        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
1051            .trim_matches('-')
1052            .to_string();
1053        // v1.0.31 A10: surface the truncation so users can fix overly long file
1054        // basenames before they collide with siblings sharing the same prefix.
1055        tracing::warn!(
1056            target: "ingest",
1057            original = %trimmed,
1058            truncated_to = %truncated,
1059            max_len = DERIVED_NAME_MAX_LEN,
1060            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1061        );
1062        (truncated, true, Some(trimmed))
1063    } else {
1064        (trimmed, false, None)
1065    }
1066}
1067
1068/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1069/// numeric suffix (`-1`, `-2`, …) when needed.
1070///
1071/// `taken` is the set of names already consumed in the current ingest run.
1072/// The caller is expected to insert the returned name into `taken` so the
1073/// next call observes the consumption. Cross-run collisions are intentionally
1074/// surfaced by the per-file persistence path as duplicates so re-ingestion
1075/// of identical corpora stays idempotent.
1076///
1077/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1078/// candidates collide, signalling a pathological corpus that should be
1079/// renamed manually.
1080fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1081    if !taken.contains(base) {
1082        return Ok(base.to_string());
1083    }
1084    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1085        let candidate = format!("{base}-{suffix}");
1086        if !taken.contains(&candidate) {
1087            tracing::warn!(
1088                target: "ingest",
1089                base = %base,
1090                resolved = %candidate,
1091                suffix,
1092                "memory name collision resolved with numeric suffix"
1093            );
1094            return Ok(candidate);
1095        }
1096    }
1097    Err(AppError::Validation(format!(
1098        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1099    )))
1100}
1101
/// Collapses every run of consecutive `-` characters in `s` into a single
/// dash, leaving all other characters untouched.
fn collapse_dashes(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        // Skip a dash only when the previous emitted character was a dash;
        // any non-dash character resets the run.
        if ch != '-' || !acc.ends_with('-') {
            acc.push(ch);
        }
        acc
    })
}
1118
#[cfg(test)]
mod tests {
    // Unit tests for the pure helper functions of the ingest handler:
    // pattern matching, file collection, kebab-case name derivation,
    // collision resolution, relation validation, and the low-memory
    // parallelism knobs. Tests that touch process-global environment
    // variables are serialised via `#[serial]`.
    use super::*;
    use std::path::PathBuf;

    // ── matches_pattern: suffix / prefix / exact forms ──

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        // Exact matching is case-sensitive.
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ── derive_kebab_name: separator folding, lowering, filtering ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        // Characters with no ASCII fallback are dropped silently.
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        // A basename with no ASCII-representable content degrades to "".
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ── collect_files: pattern filtering and recursion behavior ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        // "note" and "note-1" are taken, so resolution skips to "note-2".
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: scrubs the env var around a closure to keep tests deterministic.
    // NOTE(review): `set_var`/`remove_var` mutate process-global state; every
    // caller below is `#[serial]` so these mutations never interleave.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        // Restore the previous value so later tests observe a clean slate.
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        // Truthy spellings are case-insensitive.
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        // --low-memory wins over any explicit --ingest-parallelism value.
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    // ── clap wiring: the --low-memory flag round-trips through the CLI ──

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}
1455}