
sqlite_graphrag/commands/ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that replaced
8//! the fork-spawn-per-file pipeline, where every file paid the ~17s ONNX
9//! cold-start cost, with an in-process loop that reuses the warm embedder
10//! (the daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
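//!
//! A sketch of the stream shape (illustrative values; the field sets match
//! `IngestFileEvent` and `IngestSummary` below):
//!
//! ```text
//! {"file":"docs/a.md","name":"a","status":"indexed","truncated":false,"memory_id":42,"action":"created"}
//! {"file":"docs/b.md","name":"b","status":"failed","truncated":false,"error":"body is empty"}
//! {"summary":true,"dir":"docs","pattern":"*.md","recursive":false,"files_total":2,"files_succeeded":1,"files_failed":1,"files_skipped":0,"elapsed_ms":1234}
//! ```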
16//!
17//! ## Two-phase pipeline (v1.0.39)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file, results stored in a pre-sized
21//! `Vec<Mutex<Option<Result<StagedFile>>>>` indexed by submission order.
22//!
23//! Phase B runs on the main thread sequentially by index: pulls each
24//! `StagedFile` and writes to SQLite. `Connection` is not `Sync` so it
25//! never crosses thread boundaries. NDJSON output order equals input order.
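//!
//! A minimal sketch of the control flow (illustrative; per-slot `Mutex`es, skip
//! slots and error handling elided, see `run` below for the real loop):
//!
//! ```text
//! // Phase A: rayon pool, no database access
//! slots.par_iter().enumerate().for_each(|(idx, slot)| {
//!     staged[idx] = Some(stage_file(idx, path, name, &paths, skip_extraction));
//! });
//!
//! // Phase B: main thread owns the rusqlite Connection
//! for idx in 0..slots.len() {
//!     let file = staged[idx].take()?;
//!     persist_staged(&mut conn, &namespace, &memory_type, file)?;
//! }
//! ```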
26
27use crate::chunking;
28use crate::cli::MemoryType;
29use crate::errors::AppError;
30use crate::i18n::errors_msg;
31use crate::output::{self, JsonOutputFormat};
32use crate::paths::AppPaths;
33use crate::storage::chunks as storage_chunks;
34use crate::storage::connection::{ensure_db_ready, open_rw};
35use crate::storage::entities::{NewEntity, NewRelationship};
36use crate::storage::memories::NewMemory;
37use crate::storage::{entities, memories, urls as storage_urls, versions};
38use rayon::prelude::*;
39use rusqlite::Connection;
40use serde::Serialize;
41use std::collections::BTreeSet;
42use std::path::{Path, PathBuf};
43use std::sync::Mutex;
44use unicode_normalization::UnicodeNormalization;
45
46use crate::constants::DERIVED_NAME_MAX_LEN;
47
48/// Hard cap on the numeric suffix appended for collision resolution. If 1000
49/// candidates collide we surface an error rather than loop forever.
50const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
51
52#[derive(clap::Args)]
53#[command(after_long_help = "EXAMPLES:\n  \
54    # Ingest every Markdown file under ./docs as `document` memories\n  \
55    sqlite-graphrag ingest ./docs --type document\n\n  \
56    # Ingest .txt files recursively under ./notes\n  \
57    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
58    # Skip BERT NER auto-extraction for faster bulk import\n  \
59    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
60NOTES:\n  \
61    Each file becomes a separate memory. Names derive from file basenames\n  \
62    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
63    followed by a final summary line with counts. Per-file errors are reported\n  \
64    inline and processing continues unless --fail-fast is set.")]
65pub struct IngestArgs {
66    /// Directory containing files to ingest.
67    #[arg(
68        value_name = "DIR",
69        help = "Directory to ingest recursively (each matching file becomes a memory)"
70    )]
71    pub dir: PathBuf,
72
73    /// Memory type stored in `memories.type` for every ingested file.
74    #[arg(long, value_enum)]
75    pub r#type: MemoryType,
76
77    /// Glob pattern matched against file basenames (default: `*.md`). Supports
78    /// `*.<ext>`, `<prefix>*`, and exact filename match.
79    #[arg(long, default_value = "*.md")]
80    pub pattern: String,
81
82    /// Recurse into subdirectories.
83    #[arg(long, default_value_t = false)]
84    pub recursive: bool,
85
86    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
87    #[arg(long, default_value_t = false)]
88    pub skip_extraction: bool,
89
90    /// Stop on first per-file error instead of continuing with the next file.
91    #[arg(long, default_value_t = false)]
92    pub fail_fast: bool,
93
94    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
95    #[arg(long, default_value_t = 10_000)]
96    pub max_files: usize,
97
98    /// Namespace for the ingested memories.
99    #[arg(long)]
100    pub namespace: Option<String>,
101
102    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
103    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
104    pub db: Option<String>,
105
106    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
107    pub format: JsonOutputFormat,
108
109    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
110    pub json: bool,
111
112    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
113    #[arg(
114        long,
115        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
116    )]
117    pub ingest_parallelism: Option<usize>,
118
119    /// Force single-threaded ingest to reduce RSS pressure.
120    ///
121    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
122    /// explicit value. Recommended for environments with <4 GB available
123    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
124    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
125    /// (CLI flag has higher precedence than the env var).
126    #[arg(
127        long,
128        default_value_t = false,
129        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
130                Recommended for environments with <4 GB available RAM or container/cgroup \
131                constraints. Trade-off: 3-4x longer wall time. Also honored via \
132                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
133    )]
134    pub low_memory: bool,
135}
136
137/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
138/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
139/// values evaluate to false. Unrecognized non-empty values emit a
140/// `tracing::warn!` and evaluate to false.
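///
/// Illustrative behaviour (mirrors the unit tests at the bottom of this file;
/// the doc example is `ignore`d because it mutates process-wide env state):
///
/// ```ignore
/// std::env::set_var("SQLITE_GRAPHRAG_LOW_MEMORY", "yes");
/// assert!(env_low_memory_enabled());
/// std::env::set_var("SQLITE_GRAPHRAG_LOW_MEMORY", "0");
/// assert!(!env_low_memory_enabled());
/// ```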
141fn env_low_memory_enabled() -> bool {
142    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
143        Ok(v) if v.is_empty() => false,
144        Ok(v) => match v.to_lowercase().as_str() {
145            "1" | "true" | "yes" | "on" => true,
146            "0" | "false" | "no" | "off" => false,
147            other => {
148                tracing::warn!(
149                    target: "ingest",
150                    value = %other,
151                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
152                );
153                false
154            }
155        },
156        Err(_) => false,
157    }
158}
159
160/// Resolves the effective ingest parallelism honoring `--low-memory` and the
161/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
162///
163/// Precedence:
164/// 1. `--low-memory` CLI flag forces parallelism = 1.
165/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
166/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
167/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
168///
169/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
170/// emits a `tracing::warn!` advertising the override.
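///
/// Illustrative expectations (mirroring the unit tests below; `ignore`d so the
/// doc example does not depend on the caller's environment):
///
/// ```ignore
/// assert_eq!(resolve_parallelism(true, Some(8)), 1);  // --low-memory wins
/// assert_eq!(resolve_parallelism(false, Some(3)), 3); // explicit value honored
/// let n = resolve_parallelism(false, None);           // heuristic default
/// assert!((1..=4).contains(&n));
/// ```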
171fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
172    let env_flag = env_low_memory_enabled();
173    let low_memory = low_memory_flag || env_flag;
174
175    if low_memory {
176        if let Some(n) = ingest_parallelism {
177            if n > 1 {
178                tracing::warn!(
179                    target: "ingest",
180                    requested = n,
181                    "--ingest-parallelism overridden by --low-memory; using 1"
182                );
183            }
184        }
185        if low_memory_flag {
186            tracing::info!(
187                target: "ingest",
188                source = "flag",
189                "low-memory mode enabled: forcing --ingest-parallelism 1"
190            );
191        } else {
192            tracing::info!(
193                target: "ingest",
194                source = "env",
195                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
196            );
197        }
198        return 1;
199    }
200
201    ingest_parallelism
202        .unwrap_or_else(|| {
203            std::thread::available_parallelism()
204                .map(|v| v.get() / 2)
205                .unwrap_or(1)
206                .clamp(1, 4)
207        })
208        .max(1)
209}
210
211#[derive(Serialize)]
212struct IngestFileEvent<'a> {
213    file: &'a str,
214    name: &'a str,
215    status: &'a str,
216    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
217    truncated: bool,
218    /// Original derived name before truncation; only present when `truncated=true`.
219    #[serde(skip_serializing_if = "Option::is_none")]
220    original_name: Option<String>,
221    #[serde(skip_serializing_if = "Option::is_none")]
222    error: Option<String>,
223    #[serde(skip_serializing_if = "Option::is_none")]
224    memory_id: Option<i64>,
225    #[serde(skip_serializing_if = "Option::is_none")]
226    action: Option<String>,
227}
228
229#[derive(Serialize)]
230struct IngestSummary {
231    summary: bool,
232    dir: String,
233    pattern: String,
234    recursive: bool,
235    files_total: usize,
236    files_succeeded: usize,
237    files_failed: usize,
238    files_skipped: usize,
239    elapsed_ms: u64,
240}
241
242/// Outcome of a successful per-file ingest, used to build the NDJSON event.
243struct FileSuccess {
244    memory_id: i64,
245    action: String,
246}
247
248/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
249/// Phase B persists these to SQLite on the main thread in submission order.
250struct StagedFile {
251    body: String,
252    body_hash: String,
253    snippet: String,
254    name: String,
255    description: String,
256    embedding: Vec<f32>,
257    chunk_embeddings: Option<Vec<Vec<f32>>>,
258    chunks_info: Vec<crate::chunking::Chunk>,
259    entities: Vec<NewEntity>,
260    relationships: Vec<NewRelationship>,
261    entity_embeddings: Vec<Vec<f32>>,
262    urls: Vec<crate::extraction::ExtractedUrl>,
263}
264
265/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
266/// Never touches the database — safe to run on any rayon thread.
267fn stage_file(
268    _idx: usize,
269    path: &Path,
270    name: &str,
271    paths: &AppPaths,
272    skip_extraction: bool,
273) -> Result<StagedFile, AppError> {
274    use crate::constants::*;
275
276    if name.len() > MAX_MEMORY_NAME_LEN {
277        return Err(AppError::LimitExceeded(
278            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
279        ));
280    }
281    if name.starts_with("__") {
282        return Err(AppError::Validation(
283            crate::i18n::validation::reserved_name(),
284        ));
285    }
286    {
287        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
288            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
289        if !slug_re.is_match(name) {
290            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
291                name,
292            )));
293        }
294    }
295
296    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
297    if raw_body.len() > MAX_MEMORY_BODY_LEN {
298        return Err(AppError::LimitExceeded(
299            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
300        ));
301    }
302    if raw_body.trim().is_empty() {
303        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
304    }
305
306    let description = format!("ingested from {}", path.display());
307    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
308        return Err(AppError::Validation(
309            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
310        ));
311    }
312
313    let mut extracted_entities: Vec<NewEntity> = Vec::new();
314    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
315    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
316    if !skip_extraction {
317        match crate::extraction::extract_graph_auto(&raw_body, paths) {
318            Ok(extracted) => {
319                extracted_urls = extracted.urls;
320                extracted_entities = extracted.entities;
321                extracted_relationships = extracted.relationships;
322
323                if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
324                    extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
325                }
326                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
327                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
328                }
329            }
330            Err(e) => {
331                tracing::warn!(
332                    file = %path.display(),
333                    "auto-extraction failed (graceful degradation): {e:#}"
334                );
335            }
336        }
337    }
338
339    for entity in &extracted_entities {
340        if !is_valid_entity_type(&entity.entity_type) {
341            return Err(AppError::Validation(format!(
342                "invalid entity_type '{}' for entity '{}'",
343                entity.entity_type, entity.name
344            )));
345        }
346    }
347    for rel in &mut extracted_relationships {
348        rel.relation = rel.relation.replace('-', "_");
349        if !is_valid_relation(&rel.relation) {
350            return Err(AppError::Validation(format!(
351                "invalid relation '{}' for relationship '{}' -> '{}'",
352                rel.relation, rel.source, rel.target
353            )));
354        }
355        if !(0.0..=1.0).contains(&rel.strength) {
356            return Err(AppError::Validation(format!(
357                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
358                rel.strength, rel.source, rel.target
359            )));
360        }
361    }
362
363    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
364    let snippet: String = raw_body.chars().take(200).collect();
365
366    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
367    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
368    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
369        return Err(AppError::LimitExceeded(format!(
370            "document produces {} chunks; current safe operational limit is {} chunks; split the document before ingesting it",
371            chunks_info.len(),
372            REMEMBER_MAX_SAFE_MULTI_CHUNKS
373        )));
374    }
375
376    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
377    let embedding = if chunks_info.len() == 1 {
378        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
379    } else {
380        let chunk_texts: Vec<&str> = chunks_info
381            .iter()
382            .map(|c| chunking::chunk_text(&raw_body, c))
383            .collect();
384        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
385        for chunk_text in &chunk_texts {
386            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
387                &paths.models,
388                chunk_text,
389            )?);
390        }
391        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
392        chunk_embeddings_opt = Some(chunk_embeddings);
393        aggregated
394    };
395
396    let entity_embeddings = extracted_entities
397        .iter()
398        .map(|entity| {
399            let entity_text = match &entity.description {
400                Some(desc) => format!("{} {}", entity.name, desc),
401                None => entity.name.clone(),
402            };
403            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
404        })
405        .collect::<Result<Vec<_>, _>>()?;
406
407    Ok(StagedFile {
408        body: raw_body,
409        body_hash,
410        snippet,
411        name: name.to_string(),
412        description,
413        embedding,
414        chunk_embeddings: chunk_embeddings_opt,
415        chunks_info,
416        entities: extracted_entities,
417        relationships: extracted_relationships,
418        entity_embeddings,
419        urls: extracted_urls,
420    })
421}
422
423/// Phase B: persists one `StagedFile` to the database on the main thread.
424fn persist_staged(
425    conn: &mut Connection,
426    namespace: &str,
427    memory_type: &str,
428    staged: StagedFile,
429) -> Result<FileSuccess, AppError> {
430    {
431        let active_count: u32 = conn.query_row(
432            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
433            [],
434            |r| r.get::<_, i64>(0).map(|v| v as u32),
435        )?;
436        let ns_exists: bool = conn.query_row(
437            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
438            rusqlite::params![namespace],
439            |r| r.get::<_, i64>(0).map(|v| v > 0),
440        )?;
441        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
442            return Err(AppError::NamespaceError(format!(
443                "active namespace limit of {} exceeded while creating '{namespace}'",
444                crate::constants::MAX_NAMESPACES_ACTIVE
445            )));
446        }
447    }
448
449    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
450    if existing_memory.is_some() {
451        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
452            &staged.name,
453            namespace,
454        )));
455    }
456    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
457
458    let new_memory = NewMemory {
459        namespace: namespace.to_string(),
460        name: staged.name.clone(),
461        memory_type: memory_type.to_string(),
462        description: staged.description.clone(),
463        body: staged.body,
464        body_hash: staged.body_hash,
465        session_id: None,
466        source: "agent".to_string(),
467        metadata: serde_json::json!({}),
468    };
469
470    if let Some(hash_id) = duplicate_hash_id {
471        tracing::debug!(
472            target: "ingest",
473            duplicate_memory_id = hash_id,
474            "identical body already exists; persisting a new memory anyway"
475        );
476    }
477
478    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
479
480    let memory_id = memories::insert(&tx, &new_memory)?;
481    versions::insert_version(
482        &tx,
483        memory_id,
484        1,
485        &staged.name,
486        memory_type,
487        &staged.description,
488        &new_memory.body,
489        &serde_json::to_string(&new_memory.metadata)?,
490        None,
491        "create",
492    )?;
493    memories::upsert_vec(
494        &tx,
495        memory_id,
496        namespace,
497        memory_type,
498        &staged.embedding,
499        &staged.name,
500        &staged.snippet,
501    )?;
502
503    if staged.chunks_info.len() > 1 {
504        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
505        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
506            AppError::Internal(anyhow::anyhow!(
507                "missing chunk embeddings cache on multi-chunk ingest path"
508            ))
509        })?;
510        for (i, emb) in chunk_embeddings.iter().enumerate() {
511            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
512        }
513    }
514
515    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
516        for (idx, entity) in staged.entities.iter().enumerate() {
517            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
518            let entity_embedding = &staged.entity_embeddings[idx];
519            entities::upsert_entity_vec(
520                &tx,
521                entity_id,
522                namespace,
523                &entity.entity_type,
524                entity_embedding,
525                &entity.name,
526            )?;
527            entities::link_memory_entity(&tx, memory_id, entity_id)?;
528            entities::increment_degree(&tx, entity_id)?;
529        }
530        let entity_types: std::collections::HashMap<&str, &str> = staged
531            .entities
532            .iter()
533            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
534            .collect();
535        for rel in &staged.relationships {
536            let source_entity = NewEntity {
537                name: rel.source.clone(),
538                entity_type: entity_types
539                    .get(rel.source.as_str())
540                    .copied()
541                    .unwrap_or("concept")
542                    .to_string(),
543                description: None,
544            };
545            let target_entity = NewEntity {
546                name: rel.target.clone(),
547                entity_type: entity_types
548                    .get(rel.target.as_str())
549                    .copied()
550                    .unwrap_or("concept")
551                    .to_string(),
552                description: None,
553            };
554            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
555            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
556            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
557            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
558        }
559    }
560
561    tx.commit()?;
562
563    if !staged.urls.is_empty() {
564        let url_entries: Vec<storage_urls::MemoryUrl> = staged
565            .urls
566            .into_iter()
567            .map(|u| storage_urls::MemoryUrl {
568                url: u.url,
569                offset: Some(u.offset as i64),
570            })
571            .collect();
572        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
573    }
574
575    Ok(FileSuccess {
576        memory_id,
577        action: "created".to_string(),
578    })
579}
580
581pub fn run(args: IngestArgs) -> Result<(), AppError> {
582    let started = std::time::Instant::now();
583
584    if !args.dir.exists() {
585        return Err(AppError::NotFound(format!(
586            "directory not found: {}",
587            args.dir.display()
588        )));
589    }
590    if !args.dir.is_dir() {
591        return Err(AppError::Validation(format!(
592            "path is not a directory: {}",
593            args.dir.display()
594        )));
595    }
596
597    let mut files: Vec<PathBuf> = Vec::new();
598    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
599    files.sort();
600
601    if files.len() > args.max_files {
602        return Err(AppError::Validation(format!(
603            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
604            files.len(),
605            args.max_files
606        )));
607    }
608
609    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
610    let memory_type_str = args.r#type.as_str().to_string();
611
612    let paths = AppPaths::resolve(args.db.as_deref())?;
613    let mut conn_or_err = match init_storage(&paths) {
614        Ok(c) => Ok(c),
615        Err(e) => Err(format!("{e}")),
616    };
617
618    let mut succeeded: usize = 0;
619    let mut failed: usize = 0;
620    let mut skipped: usize = 0;
621    let total = files.len();
622
623    // Pre-resolve all names before parallelisation so Phase A workers see a
624    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
625    let mut taken_names: BTreeSet<String> = BTreeSet::new();
626
627    // Each slot is either `Slot::Process`, carrying everything Phase A needs for
628    // the file, or `Slot::Skip`, reported immediately as a skipped NDJSON event.
629    struct FileSlot {
630        path: PathBuf,
631        file_str: String,
632        derived_name: String,
633        name_truncated: bool,
634        original_name: Option<String>,
635    }
636    enum Slot {
637        Skip {
638            file_str: String,
639            derived_base: String,
640            name_truncated: bool,
641            original_name: Option<String>,
642            reason: String,
643        },
644        Process(FileSlot),
645    }
646
647    let slots: Vec<Slot> = files
648        .iter()
649        .map(|path| {
650            let file_str = path.to_string_lossy().into_owned();
651            let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
652
653            if derived_base.is_empty() {
654                return Slot::Skip {
655                    file_str,
656                    derived_base: String::new(),
657                    name_truncated: false,
658                    original_name: None,
659                    reason: "could not derive a non-empty kebab-case name from filename"
660                        .to_string(),
661                };
662            }
663
664            match unique_name(&derived_base, &taken_names) {
665                Ok(derived_name) => {
666                    taken_names.insert(derived_name.clone());
667                    Slot::Process(FileSlot {
668                        path: path.clone(),
669                        file_str,
670                        derived_name,
671                        name_truncated,
672                        original_name,
673                    })
674                }
675                Err(e) => Slot::Skip {
676                    file_str,
677                    derived_base,
678                    name_truncated,
679                    original_name,
680                    reason: e.to_string(),
681                },
682            }
683        })
684        .collect();
685
686    // Determine rayon thread pool size, honoring --low-memory and the
687    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
688    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
689
690    let pool = rayon::ThreadPoolBuilder::new()
691        .num_threads(parallelism)
692        .build()
693        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
694
695    // Phase A: parallel compute. Indexed slot matches `slots` index for ordering.
696    let staged: Vec<Mutex<Option<Result<StagedFile, AppError>>>> =
697        (0..slots.len()).map(|_| Mutex::new(None)).collect();
698
699    let skip_extraction = args.skip_extraction;
700    let paths_ref = &paths;
701
702    let total_to_process = slots
703        .iter()
704        .filter(|s| matches!(s, Slot::Process(_)))
705        .count();
706    tracing::info!(
707        target: "ingest",
708        phase = "stage_start",
709        files = total_to_process,
710        ingest_parallelism = parallelism,
711        "phase A (stage) starting: chunk + embed + NER on rayon pool"
712    );
713    let staged_done = std::sync::atomic::AtomicUsize::new(0);
714
715    pool.install(|| {
716        slots.par_iter().enumerate().for_each(|(idx, slot)| {
717            if let Slot::Process(fs) = slot {
718                let result =
719                    stage_file(idx, &fs.path, &fs.derived_name, paths_ref, skip_extraction);
720                // Each staged[idx] slot is written exactly once, by the worker that owns idx.
721                *staged[idx].lock().expect("staged slot poisoned") = Some(result);
722                let done = staged_done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
723                if done % 10 == 0 || done == total_to_process {
724                    tracing::info!(
725                        target: "ingest",
726                        phase = "stage_progress",
727                        done = done,
728                        total = total_to_process,
729                        "phase A progress"
730                    );
731                }
732            }
733        });
734    });
735
736    tracing::info!(
737        target: "ingest",
738        phase = "persist_start",
739        files = total_to_process,
740        "phase B (persist) starting: sequential writes to SQLite + NDJSON emit"
741    );
742
743    // Phase B: sequential persist on main thread (Connection is !Sync).
744    let fail_fast = args.fail_fast;
745    for (idx, slot) in slots.iter().enumerate() {
746        match slot {
747            Slot::Skip {
748                file_str,
749                derived_base,
750                name_truncated,
751                original_name,
752                reason,
753            } => {
754                output::emit_json_compact(&IngestFileEvent {
755                    file: file_str,
756                    name: derived_base,
757                    status: "skipped",
758                    truncated: *name_truncated,
759                    original_name: original_name.clone(),
760                    error: Some(reason.clone()),
761                    memory_id: None,
762                    action: None,
763                })?;
764                skipped += 1;
765            }
766            Slot::Process(fs) => {
767                // If storage init failed, every file fails with the same error.
768                let conn = match conn_or_err.as_mut() {
769                    Ok(c) => c,
770                    Err(err_msg) => {
771                        let err_clone = err_msg.clone();
772                        output::emit_json_compact(&IngestFileEvent {
773                            file: &fs.file_str,
774                            name: &fs.derived_name,
775                            status: "failed",
776                            truncated: fs.name_truncated,
777                            original_name: fs.original_name.clone(),
778                            error: Some(err_clone.clone()),
779                            memory_id: None,
780                            action: None,
781                        })?;
782                        failed += 1;
783                        if fail_fast {
784                            output::emit_json_compact(&IngestSummary {
785                                summary: true,
786                                dir: args.dir.display().to_string(),
787                                pattern: args.pattern.clone(),
788                                recursive: args.recursive,
789                                files_total: total,
790                                files_succeeded: succeeded,
791                                files_failed: failed,
792                                files_skipped: skipped,
793                                elapsed_ms: started.elapsed().as_millis() as u64,
794                            })?;
795                            return Err(AppError::Validation(format!(
796                                "ingest aborted on first failure: {err_clone}"
797                            )));
798                        }
799                        continue;
800                    }
801                };
802
803                // Take the Phase A result (always Some for Process slots).
804                let stage_result = staged[idx]
805                    .lock()
806                    .expect("staged slot poisoned")
807                    .take()
808                    .expect("staged slot empty for Process slot");
809
810                let outcome = stage_result
811                    .and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
812
813                match outcome {
814                    Ok(FileSuccess { memory_id, action }) => {
815                        output::emit_json_compact(&IngestFileEvent {
816                            file: &fs.file_str,
817                            name: &fs.derived_name,
818                            status: "indexed",
819                            truncated: fs.name_truncated,
820                            original_name: fs.original_name.clone(),
821                            error: None,
822                            memory_id: Some(memory_id),
823                            action: Some(action),
824                        })?;
825                        succeeded += 1;
826                    }
827                    Err(e) => {
828                        let err_msg = format!("{e}");
829                        output::emit_json_compact(&IngestFileEvent {
830                            file: &fs.file_str,
831                            name: &fs.derived_name,
832                            status: "failed",
833                            truncated: fs.name_truncated,
834                            original_name: fs.original_name.clone(),
835                            error: Some(err_msg.clone()),
836                            memory_id: None,
837                            action: None,
838                        })?;
839                        failed += 1;
840                        if fail_fast {
841                            output::emit_json_compact(&IngestSummary {
842                                summary: true,
843                                dir: args.dir.display().to_string(),
844                                pattern: args.pattern.clone(),
845                                recursive: args.recursive,
846                                files_total: total,
847                                files_succeeded: succeeded,
848                                files_failed: failed,
849                                files_skipped: skipped,
850                                elapsed_ms: started.elapsed().as_millis() as u64,
851                            })?;
852                            return Err(AppError::Validation(format!(
853                                "ingest aborted on first failure: {err_msg}"
854                            )));
855                        }
856                    }
857                }
858            }
859        }
860    }
861
862    output::emit_json_compact(&IngestSummary {
863        summary: true,
864        dir: args.dir.display().to_string(),
865        pattern: args.pattern.clone(),
866        recursive: args.recursive,
867        files_total: total,
868        files_succeeded: succeeded,
869        files_failed: failed,
870        files_skipped: skipped,
871        elapsed_ms: started.elapsed().as_millis() as u64,
872    })?;
873
874    Ok(())
875}
876
877/// Auto-initialises the database (matches the contract of every other CRUD
878/// handler) and returns a fresh read/write connection ready for the ingest
879/// loop. Errors here are recoverable per-file: the caller surfaces them as
880/// failure events so `--fail-fast` and the continue-on-error path keep
881/// working when, for example, the user points `--db` at an unwritable path.
882fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
883    ensure_db_ready(paths)?;
884    let conn = open_rw(&paths.db)?;
885    Ok(conn)
886}
887
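/// Closed set of entity types accepted from auto-extraction; `stage_file`
/// rejects anything outside this list with a validation error.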
888fn is_valid_entity_type(entity_type: &str) -> bool {
889    matches!(
890        entity_type,
891        "project"
892            | "tool"
893            | "person"
894            | "file"
895            | "concept"
896            | "incident"
897            | "decision"
898            | "memory"
899            | "dashboard"
900            | "issue_tracker"
901            | "organization"
902            | "location"
903            | "date"
904    )
905}
906
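/// Closed set of canonical relation names; `stage_file` normalises hyphens to
/// underscores before running this check.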
907fn is_valid_relation(relation: &str) -> bool {
908    matches!(
909        relation,
910        "applies_to"
911            | "uses"
912            | "depends_on"
913            | "causes"
914            | "fixes"
915            | "contradicts"
916            | "supports"
917            | "follows"
918            | "related"
919            | "mentions"
920            | "replaces"
921            | "tracked_in"
922    )
923}
924
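/// Collects files under `dir` whose basenames match `pattern`, optionally
/// recursing into subdirectories. Matches are appended to `out` in directory
/// iteration order; the caller sorts the final list for deterministic output.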
925fn collect_files(
926    dir: &Path,
927    pattern: &str,
928    recursive: bool,
929    out: &mut Vec<PathBuf>,
930) -> Result<(), AppError> {
931    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
932    for entry in entries {
933        let entry = entry.map_err(AppError::Io)?;
934        let path = entry.path();
935        let file_type = entry.file_type().map_err(AppError::Io)?;
936        if file_type.is_file() {
937            let name = entry.file_name();
938            let name_str = name.to_string_lossy();
939            if matches_pattern(&name_str, pattern) {
940                out.push(path);
941            }
942        } else if file_type.is_dir() && recursive {
943            collect_files(&path, pattern, recursive, out)?;
944        }
945    }
946    Ok(())
947}
948
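/// Minimal glob support: `*suffix`, `prefix*`, or an exact basename match.
/// Illustrative cases (mirroring the unit tests below):
///
/// ```ignore
/// assert!(matches_pattern("foo.md", "*.md"));
/// assert!(matches_pattern("README.md", "README*"));
/// assert!(!matches_pattern("readme.md", "README.md")); // exact match is case-sensitive
/// ```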
949fn matches_pattern(name: &str, pattern: &str) -> bool {
950    if let Some(suffix) = pattern.strip_prefix('*') {
951        name.ends_with(suffix)
952    } else if let Some(prefix) = pattern.strip_suffix('*') {
953        name.starts_with(prefix)
954    } else {
955        name == pattern
956    }
957}
958
959/// Returns `(final_name, truncated, original_name)`.
960/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
961/// `original_name` holds the pre-truncation name only when `truncated=true`.
962///
963/// Non-ASCII characters are first decomposed via NFD and then stripped of
964/// combining marks so accented letters fold to their base ASCII letter
965/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
966/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
967/// preserves meaningful word content rather than collapsing the basename
968/// to a few stray ASCII letters as the previous filter did.
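///
/// Illustrative mappings (taken from the unit tests below):
///
/// ```ignore
/// assert_eq!(derive_kebab_name(Path::new("claude_code_headless.md")).0, "claude-code-headless");
/// assert_eq!(derive_kebab_name(Path::new("naïve-test.md")).0, "naive-test");
/// assert_eq!(derive_kebab_name(Path::new("🦜🚀🌟.md")).0, ""); // caller skips empty names
/// ```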
969fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
970    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
971    let lowered: String = stem
972        .nfd()
973        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
974        .map(|c| {
975            if c == '_' || c.is_whitespace() {
976                '-'
977            } else {
978                c
979            }
980        })
981        .map(|c| c.to_ascii_lowercase())
982        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
983        .collect();
984    let collapsed = collapse_dashes(&lowered);
985    let trimmed = collapsed.trim_matches('-').to_string();
986    if trimmed.len() > DERIVED_NAME_MAX_LEN {
987        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
988            .trim_matches('-')
989            .to_string();
990        // v1.0.31 A10: surface the truncation so users can fix overly long file
991        // basenames before they collide with siblings sharing the same prefix.
992        tracing::warn!(
993            target: "ingest",
994            original = %trimmed,
995            truncated_to = %truncated,
996            max_len = DERIVED_NAME_MAX_LEN,
997            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
998        );
999        (truncated, true, Some(trimmed))
1000    } else {
1001        (trimmed, false, None)
1002    }
1003}
1004
1005/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1006/// numeric suffix (`-1`, `-2`, …) when needed.
1007///
1008/// `taken` is the set of names already consumed in the current ingest run.
1009/// The caller is expected to insert the returned name into `taken` so the
1010/// next call observes the consumption. Cross-run collisions are intentionally
1011/// surfaced by the per-file persistence path as duplicates so re-ingestion
1012/// of identical corpora stays idempotent.
1013///
1014/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1015/// candidates collide, signalling a pathological corpus that should be
1016/// renamed manually.
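///
/// Illustrative collision resolution (mirrors the unit tests below):
///
/// ```ignore
/// let mut taken = BTreeSet::new();
/// assert_eq!(unique_name("note", &taken).unwrap(), "note");
/// taken.insert("note".to_string());
/// taken.insert("note-1".to_string());
/// assert_eq!(unique_name("note", &taken).unwrap(), "note-2");
/// ```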
1017fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1018    if !taken.contains(base) {
1019        return Ok(base.to_string());
1020    }
1021    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1022        let candidate = format!("{base}-{suffix}");
1023        if !taken.contains(&candidate) {
1024            tracing::warn!(
1025                target: "ingest",
1026                base = %base,
1027                resolved = %candidate,
1028                suffix,
1029                "memory name collision resolved with numeric suffix"
1030            );
1031            return Ok(candidate);
1032        }
1033    }
1034    Err(AppError::Validation(format!(
1035        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1036    )))
1037}
1038
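/// Collapses runs of consecutive dashes into a single dash, e.g. `"a--b---c"`
/// becomes `"a-b-c"`; leading and trailing dashes are trimmed by the caller.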
1039fn collapse_dashes(s: &str) -> String {
1040    let mut out = String::with_capacity(s.len());
1041    let mut prev_dash = false;
1042    for c in s.chars() {
1043        if c == '-' {
1044            if !prev_dash {
1045                out.push('-');
1046            }
1047            prev_dash = true;
1048        } else {
1049            out.push(c);
1050            prev_dash = false;
1051        }
1052    }
1053    out
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use super::*;
1059    use std::path::PathBuf;
1060
1061    #[test]
1062    fn matches_pattern_suffix() {
1063        assert!(matches_pattern("foo.md", "*.md"));
1064        assert!(!matches_pattern("foo.txt", "*.md"));
1065        assert!(matches_pattern("foo.md", "*"));
1066    }
1067
1068    #[test]
1069    fn matches_pattern_prefix() {
1070        assert!(matches_pattern("README.md", "README*"));
1071        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1072    }
1073
1074    #[test]
1075    fn matches_pattern_exact() {
1076        assert!(matches_pattern("README.md", "README.md"));
1077        assert!(!matches_pattern("readme.md", "README.md"));
1078    }
1079
1080    #[test]
1081    fn derive_kebab_underscore_to_dash() {
1082        let p = PathBuf::from("/tmp/claude_code_headless.md");
1083        let (name, truncated, original) = derive_kebab_name(&p);
1084        assert_eq!(name, "claude-code-headless");
1085        assert!(!truncated);
1086        assert!(original.is_none());
1087    }
1088
1089    #[test]
1090    fn derive_kebab_uppercase_lowered() {
1091        let p = PathBuf::from("/tmp/README.md");
1092        let (name, truncated, original) = derive_kebab_name(&p);
1093        assert_eq!(name, "readme");
1094        assert!(!truncated);
1095        assert!(original.is_none());
1096    }
1097
1098    #[test]
1099    fn derive_kebab_strips_non_kebab_chars() {
1100        let p = PathBuf::from("/tmp/some@weird#name!.md");
1101        let (name, truncated, original) = derive_kebab_name(&p);
1102        assert_eq!(name, "someweirdname");
1103        assert!(!truncated);
1104        assert!(original.is_none());
1105    }
1106
1107    // Bug M-A3: NFD-based unicode normalization preserves base letters of
1108    // accented characters instead of dropping them entirely.
1109    #[test]
1110    fn derive_kebab_folds_accented_letters_to_ascii() {
1111        let p = PathBuf::from("/tmp/açaí.md");
1112        let (name, _, _) = derive_kebab_name(&p);
1113        assert_eq!(name, "acai", "got '{name}'");
1114    }
1115
1116    #[test]
1117    fn derive_kebab_handles_naive_with_diaeresis() {
1118        let p = PathBuf::from("/tmp/naïve-test.md");
1119        let (name, _, _) = derive_kebab_name(&p);
1120        assert_eq!(name, "naive-test", "got '{name}'");
1121    }
1122
1123    #[test]
1124    fn derive_kebab_drops_emoji_keeps_word() {
1125        let p = PathBuf::from("/tmp/🚀-rocket.md");
1126        let (name, _, _) = derive_kebab_name(&p);
1127        assert_eq!(name, "rocket", "got '{name}'");
1128    }
1129
1130    #[test]
1131    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1132        let p = PathBuf::from("/tmp/açaí🦜.md");
1133        let (name, _, _) = derive_kebab_name(&p);
1134        assert_eq!(name, "acai", "got '{name}'");
1135    }
1136
1137    #[test]
1138    fn derive_kebab_pure_emoji_yields_empty() {
1139        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1140        let (name, _, _) = derive_kebab_name(&p);
1141        assert!(name.is_empty(), "got '{name}'");
1142    }
1143
1144    #[test]
1145    fn derive_kebab_collapses_consecutive_dashes() {
1146        let p = PathBuf::from("/tmp/a__b___c.md");
1147        let (name, truncated, original) = derive_kebab_name(&p);
1148        assert_eq!(name, "a-b-c");
1149        assert!(!truncated);
1150        assert!(original.is_none());
1151    }
1152
1153    #[test]
1154    fn derive_kebab_truncates_to_60_chars() {
1155        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1156        let (name, truncated, original) = derive_kebab_name(&p);
1157        assert!(name.len() <= 60, "got len {}", name.len());
1158        assert!(truncated);
1159        assert!(original.is_some());
1160        assert!(original.unwrap().len() > 60);
1161    }
1162
1163    #[test]
1164    fn collect_files_finds_md_files() {
1165        let tmp = tempfile::tempdir().expect("tempdir");
1166        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1167        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1168        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1169        let mut out = Vec::new();
1170        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1171        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1172    }
1173
1174    #[test]
1175    fn collect_files_recursive_descends_subdirs() {
1176        let tmp = tempfile::tempdir().expect("tempdir");
1177        let sub = tmp.path().join("sub");
1178        std::fs::create_dir(&sub).unwrap();
1179        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1180        std::fs::write(sub.join("b.md"), "y").unwrap();
1181        let mut out = Vec::new();
1182        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1183        assert_eq!(out.len(), 2);
1184    }
1185
1186    #[test]
1187    fn collect_files_non_recursive_skips_subdirs() {
1188        let tmp = tempfile::tempdir().expect("tempdir");
1189        let sub = tmp.path().join("sub");
1190        std::fs::create_dir(&sub).unwrap();
1191        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1192        std::fs::write(sub.join("b.md"), "y").unwrap();
1193        let mut out = Vec::new();
1194        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1195        assert_eq!(out.len(), 1);
1196    }
1197
1198    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
1199
1200    #[test]
1201    fn derive_kebab_long_basename_truncated_within_cap() {
1202        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1203        let (name, truncated, original) = derive_kebab_name(&p);
1204        assert!(
1205            name.len() <= DERIVED_NAME_MAX_LEN,
1206            "truncated name must respect cap; got {} chars",
1207            name.len()
1208        );
1209        assert!(!name.is_empty());
1210        assert!(truncated);
1211        assert!(original.is_some());
1212    }
1213
1214    #[test]
1215    fn unique_name_returns_base_when_free() {
1216        let taken: BTreeSet<String> = BTreeSet::new();
1217        let resolved = unique_name("note", &taken).expect("must resolve");
1218        assert_eq!(resolved, "note");
1219    }
1220
1221    #[test]
1222    fn unique_name_appends_first_free_suffix_on_collision() {
1223        let mut taken: BTreeSet<String> = BTreeSet::new();
1224        taken.insert("note".to_string());
1225        taken.insert("note-1".to_string());
1226        let resolved = unique_name("note", &taken).expect("must resolve");
1227        assert_eq!(resolved, "note-2");
1228    }
1229
1230    #[test]
1231    fn unique_name_errors_after_collision_cap() {
1232        let mut taken: BTreeSet<String> = BTreeSet::new();
1233        taken.insert("note".to_string());
1234        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1235            taken.insert(format!("note-{i}"));
1236        }
1237        let err = unique_name("note", &taken).expect_err("must surface error");
1238        assert!(matches!(err, AppError::Validation(_)));
1239    }
1240
1241    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
1242
1243    #[test]
1244    fn is_valid_entity_type_accepts_v008_types() {
1245        assert!(is_valid_entity_type("organization"));
1246        assert!(is_valid_entity_type("location"));
1247        assert!(is_valid_entity_type("date"));
1248        assert!(!is_valid_entity_type("unknown"));
1249    }
1250
1251    #[test]
1252    fn is_valid_relation_accepts_canonical_relations() {
1253        assert!(is_valid_relation("applies_to"));
1254        assert!(is_valid_relation("depends_on"));
1255        assert!(!is_valid_relation("foo_bar"));
1256    }
1257
1258    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
1259
1260    use serial_test::serial;
1261
1262    /// Helper: scrubs the env var around a closure to keep tests deterministic.
1263    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1264        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1265        let prev = std::env::var(key).ok();
1266        match value {
1267            Some(v) => std::env::set_var(key, v),
1268            None => std::env::remove_var(key),
1269        }
1270        f();
1271        match prev {
1272            Some(p) => std::env::set_var(key, p),
1273            None => std::env::remove_var(key),
1274        }
1275    }
1276
1277    #[test]
1278    #[serial]
1279    fn env_low_memory_enabled_unset_returns_false() {
1280        with_env_var(None, || assert!(!env_low_memory_enabled()));
1281    }
1282
1283    #[test]
1284    #[serial]
1285    fn env_low_memory_enabled_empty_returns_false() {
1286        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1287    }
1288
1289    #[test]
1290    #[serial]
1291    fn env_low_memory_enabled_truthy_values_return_true() {
1292        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1293            with_env_var(Some(v), || {
1294                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1295            });
1296        }
1297    }
1298
1299    #[test]
1300    #[serial]
1301    fn env_low_memory_enabled_falsy_values_return_false() {
1302        for v in ["0", "false", "FALSE", "no", "off"] {
1303            with_env_var(Some(v), || {
1304                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1305            });
1306        }
1307    }
1308
1309    #[test]
1310    #[serial]
1311    fn env_low_memory_enabled_unrecognized_value_returns_false() {
1312        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1313    }
1314
1315    #[test]
1316    #[serial]
1317    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1318        with_env_var(None, || {
1319            assert_eq!(resolve_parallelism(true, Some(4)), 1);
1320            assert_eq!(resolve_parallelism(true, Some(8)), 1);
1321            assert_eq!(resolve_parallelism(true, None), 1);
1322        });
1323    }
1324
1325    #[test]
1326    #[serial]
1327    fn resolve_parallelism_env_forces_one_when_flag_off() {
1328        with_env_var(Some("1"), || {
1329            assert_eq!(resolve_parallelism(false, Some(4)), 1);
1330            assert_eq!(resolve_parallelism(false, None), 1);
1331        });
1332    }
1333
1334    #[test]
1335    #[serial]
1336    fn resolve_parallelism_falsy_env_does_not_override() {
1337        with_env_var(Some("0"), || {
1338            assert_eq!(resolve_parallelism(false, Some(4)), 4);
1339        });
1340    }
1341
1342    #[test]
1343    #[serial]
1344    fn resolve_parallelism_explicit_value_when_low_memory_off() {
1345        with_env_var(None, || {
1346            assert_eq!(resolve_parallelism(false, Some(3)), 3);
1347            assert_eq!(resolve_parallelism(false, Some(1)), 1);
1348        });
1349    }
1350
1351    #[test]
1352    #[serial]
1353    fn resolve_parallelism_default_when_unset() {
1354        with_env_var(None, || {
1355            let p = resolve_parallelism(false, None);
1356            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1357        });
1358    }
1359
1360    #[test]
1361    fn ingest_args_parses_low_memory_flag_via_clap() {
1362        use clap::Parser;
1363        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
1364        // on the public `Cli` definition so the flag is wired end-to-end.
1365        let cli = crate::cli::Cli::try_parse_from([
1366            "sqlite-graphrag",
1367            "ingest",
1368            "/tmp/dummy",
1369            "--type",
1370            "document",
1371            "--low-memory",
1372        ])
1373        .expect("parse must succeed");
1374        match cli.command {
1375            crate::cli::Commands::Ingest(args) => {
1376                assert!(args.low_memory, "--low-memory must set field to true");
1377            }
1378            _ => panic!("expected Ingest subcommand"),
1379        }
1380    }
1381
1382    #[test]
1383    fn ingest_args_low_memory_defaults_false() {
1384        use clap::Parser;
1385        let cli = crate::cli::Cli::try_parse_from([
1386            "sqlite-graphrag",
1387            "ingest",
1388            "/tmp/dummy",
1389            "--type",
1390            "document",
1391        ])
1392        .expect("parse must succeed");
1393        match cli.command {
1394            crate::cli::Commands::Ingest(args) => {
1395                assert!(!args.low_memory, "default must be false");
1396            }
1397            _ => panic!("expected Ingest subcommand"),
1398        }
1399    }
1400}