// sqlite_graphrag/commands/ingest.rs
1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Two-phase pipeline (v1.0.39)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file, results stored in a pre-sized
21//! `Vec<Mutex<Option<Result<StagedFile>>>>` indexed by submission order.
22//!
23//! Phase B runs on the main thread sequentially by index: pulls each
24//! `StagedFile` and writes to SQLite. `Connection` is not `Sync` so it
25//! never crosses thread boundaries. NDJSON output order equals input order.
26
27use crate::chunking;
28use crate::cli::MemoryType;
29use crate::errors::AppError;
30use crate::i18n::errors_msg;
31use crate::output::{self, JsonOutputFormat};
32use crate::paths::AppPaths;
33use crate::storage::chunks as storage_chunks;
34use crate::storage::connection::{ensure_db_ready, open_rw};
35use crate::storage::entities::{NewEntity, NewRelationship};
36use crate::storage::memories::NewMemory;
37use crate::storage::{entities, memories, urls as storage_urls, versions};
38use rayon::prelude::*;
39use rusqlite::Connection;
40use serde::Serialize;
41use std::collections::BTreeSet;
42use std::path::{Path, PathBuf};
43use std::sync::Mutex;
44use unicode_normalization::UnicodeNormalization;
45
/// Maximum length of a derived kebab-case name. Longer basenames are truncated
/// (with a `tracing::warn!`) to keep the `memories.name` column bounded.
// NOTE: the truncation itself happens in `derive_kebab_name` (called from `run`).
const DERIVED_NAME_MAX_LEN: usize = 60;

/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
// NOTE: consumed by `unique_name` (called from `run`) when two files derive the same base name.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
53
// CLI arguments for `sqlite-graphrag ingest`.
//
// NOTE: clap renders `///` doc comments as --help text and emits options in
// field-declaration order, so comments added here deliberately use `//` and
// the field order is part of the user-facing contract — do not reorder.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Skip BERT NER auto-extraction for faster bulk import\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file.
    // `r#type` because `type` is a Rust keyword; clap still exposes it as --type.
    #[arg(long, value_enum)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
    #[arg(long, default_value_t = false)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output format selector; defaults to compact JSON.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Hidden compatibility flag; accepted but ignored (NDJSON is always emitted).
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    // Resolved (together with low-memory overrides) by `resolve_parallelism`.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
    /// explicit value. Recommended for environments with <4 GB available
    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
    /// (CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
138
139/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
140/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
141/// values evaluate to false. Unrecognized non-empty values emit a
142/// `tracing::warn!` and evaluate to false.
143fn env_low_memory_enabled() -> bool {
144    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
145        Ok(v) if v.is_empty() => false,
146        Ok(v) => match v.to_lowercase().as_str() {
147            "1" | "true" | "yes" | "on" => true,
148            "0" | "false" | "no" | "off" => false,
149            other => {
150                tracing::warn!(
151                    target: "ingest",
152                    value = %other,
153                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
154                );
155                false
156            }
157        },
158        Err(_) => false,
159    }
160}
161
162/// Resolves the effective ingest parallelism honoring `--low-memory` and the
163/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
164///
165/// Precedence:
166/// 1. `--low-memory` CLI flag forces parallelism = 1.
167/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
168/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
169/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
170///
171/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
172/// emits a `tracing::warn!` advertising the override.
173fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
174    let env_flag = env_low_memory_enabled();
175    let low_memory = low_memory_flag || env_flag;
176
177    if low_memory {
178        if let Some(n) = ingest_parallelism {
179            if n > 1 {
180                tracing::warn!(
181                    target: "ingest",
182                    requested = n,
183                    "--ingest-parallelism overridden by --low-memory; using 1"
184                );
185            }
186        }
187        if low_memory_flag {
188            tracing::info!(
189                target: "ingest",
190                source = "flag",
191                "low-memory mode enabled: forcing --ingest-parallelism 1"
192            );
193        } else {
194            tracing::info!(
195                target: "ingest",
196                source = "env",
197                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
198            );
199        }
200        return 1;
201    }
202
203    ingest_parallelism
204        .unwrap_or_else(|| {
205            std::thread::available_parallelism()
206                .map(|v| v.get() / 2)
207                .unwrap_or(1)
208                .clamp(1, 4)
209        })
210        .max(1)
211}
212
/// One NDJSON line per processed file (success, failure or skip).
/// `None` fields are omitted from the serialized object.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the source file as discovered on disk.
    file: &'a str,
    /// Derived kebab-case memory name (empty when name derivation failed).
    name: &'a str,
    /// One of "indexed", "failed" or "skipped".
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Error/skip reason; present only for "failed" and "skipped" events.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the persisted memory; present only on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action on success (currently always "created").
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
230
/// Final NDJSON line emitted after all per-file events: aggregate counts
/// plus the run parameters, for streaming consumers.
#[derive(Serialize)]
struct IngestSummary {
    /// Always set to true so consumers can distinguish the summary line
    /// from per-file events.
    summary: bool,
    /// Ingested directory (as displayed, not canonicalized here).
    dir: String,
    /// Glob pattern the run matched against basenames.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    /// Total files matched (succeeded + failed + skipped).
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Wall-clock duration of the whole run in milliseconds.
    elapsed_ms: u64,
}
243
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    /// Row id of the newly persisted memory.
    memory_id: i64,
    /// Action performed; `persist_staged` always sets "created".
    action: String,
}
249
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    /// Full file contents (validated non-empty and within size limits).
    body: String,
    /// blake3 hex digest of `body`; used for duplicate-body detection.
    body_hash: String,
    /// First 200 characters of `body`, stored alongside the memory vector.
    snippet: String,
    /// Validated kebab-case memory name.
    name: String,
    /// Auto-generated description ("ingested from <path>").
    description: String,
    /// Document-level embedding (aggregated across chunks when multi-chunk).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries from hierarchical splitting of `body`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty when --skip-extraction or extraction failed).
    entities: Vec<NewEntity>,
    /// Extracted relationships (relation/strength already validated).
    relationships: Vec<NewRelationship>,
    /// Embeddings index-aligned with `entities`.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs extracted from the body; persisted best-effort after commit.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
266
267/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
268/// Never touches the database — safe to run on any rayon thread.
269fn stage_file(
270    _idx: usize,
271    path: &Path,
272    name: &str,
273    paths: &AppPaths,
274    skip_extraction: bool,
275) -> Result<StagedFile, AppError> {
276    use crate::constants::*;
277
278    if name.len() > MAX_MEMORY_NAME_LEN {
279        return Err(AppError::LimitExceeded(
280            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
281        ));
282    }
283    if name.starts_with("__") {
284        return Err(AppError::Validation(
285            crate::i18n::validation::reserved_name(),
286        ));
287    }
288    {
289        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
290            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
291        if !slug_re.is_match(name) {
292            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
293                name,
294            )));
295        }
296    }
297
298    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
299    if raw_body.len() > MAX_MEMORY_BODY_LEN {
300        return Err(AppError::LimitExceeded(
301            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
302        ));
303    }
304    if raw_body.trim().is_empty() {
305        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
306    }
307
308    let description = format!("ingested from {}", path.display());
309    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
310        return Err(AppError::Validation(
311            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
312        ));
313    }
314
315    let mut extracted_entities: Vec<NewEntity> = Vec::new();
316    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
317    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
318    if !skip_extraction {
319        match crate::extraction::extract_graph_auto(&raw_body, paths) {
320            Ok(extracted) => {
321                extracted_urls = extracted.urls;
322                extracted_entities = extracted.entities;
323                extracted_relationships = extracted.relationships;
324
325                if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
326                    extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
327                }
328                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
329                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
330                }
331            }
332            Err(e) => {
333                tracing::warn!(
334                    file = %path.display(),
335                    "auto-extraction failed (graceful degradation): {e:#}"
336                );
337            }
338        }
339    }
340
341    for entity in &extracted_entities {
342        if !is_valid_entity_type(&entity.entity_type) {
343            return Err(AppError::Validation(format!(
344                "invalid entity_type '{}' for entity '{}'",
345                entity.entity_type, entity.name
346            )));
347        }
348    }
349    for rel in &mut extracted_relationships {
350        rel.relation = rel.relation.replace('-', "_");
351        if !is_valid_relation(&rel.relation) {
352            return Err(AppError::Validation(format!(
353                "invalid relation '{}' for relationship '{}' -> '{}'",
354                rel.relation, rel.source, rel.target
355            )));
356        }
357        if !(0.0..=1.0).contains(&rel.strength) {
358            return Err(AppError::Validation(format!(
359                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
360                rel.strength, rel.source, rel.target
361            )));
362        }
363    }
364
365    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
366    let snippet: String = raw_body.chars().take(200).collect();
367
368    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
369    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
370    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
371        return Err(AppError::LimitExceeded(format!(
372            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
373            chunks_info.len(),
374            REMEMBER_MAX_SAFE_MULTI_CHUNKS
375        )));
376    }
377
378    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
379    let embedding = if chunks_info.len() == 1 {
380        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
381    } else {
382        let chunk_texts: Vec<&str> = chunks_info
383            .iter()
384            .map(|c| chunking::chunk_text(&raw_body, c))
385            .collect();
386        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
387        for chunk_text in &chunk_texts {
388            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
389                &paths.models,
390                chunk_text,
391            )?);
392        }
393        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
394        chunk_embeddings_opt = Some(chunk_embeddings);
395        aggregated
396    };
397
398    let entity_embeddings = extracted_entities
399        .iter()
400        .map(|entity| {
401            let entity_text = match &entity.description {
402                Some(desc) => format!("{} {}", entity.name, desc),
403                None => entity.name.clone(),
404            };
405            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
406        })
407        .collect::<Result<Vec<_>, _>>()?;
408
409    Ok(StagedFile {
410        body: raw_body,
411        body_hash,
412        snippet,
413        name: name.to_string(),
414        description,
415        embedding,
416        chunk_embeddings: chunk_embeddings_opt,
417        chunks_info,
418        entities: extracted_entities,
419        relationships: extracted_relationships,
420        entity_embeddings,
421        urls: extracted_urls,
422    })
423}
424
/// Phase B: persists one `StagedFile` to the database on the main thread.
///
/// Order of operations: namespace-cap check, duplicate-name check (hard
/// error), duplicate-body-hash lookup (debug log only), then inside a single
/// IMMEDIATE transaction: memory row, version 1, memory vector, chunk
/// slices/vectors (multi-chunk only), entities + relationships. URLs are
/// written after commit, best-effort. `Connection` is `!Sync`, which is why
/// this must stay on the main thread (see module docs).
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    // Enforce the active-namespace cap before creating a memory in a
    // namespace that does not exist yet. Existing namespaces always pass.
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Same (namespace, name) is a hard duplicate error; an identical body
    // hash is merely logged (debug) and a new memory is persisted anyway.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE acquires the write lock up front, so the multi-statement
    // write below cannot hit a busy writer halfway through.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Version 1 with operation "create": the ingest path never updates
    // an existing memory in place.
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Multi-chunk documents additionally persist per-chunk slices and
    // vectors; single-chunk documents only get the aggregated vector above.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        // Phase A guarantees Some(...) on the multi-chunk path; a None here
        // indicates a staging bug, hence Internal rather than Validation.
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the first argument (`i as i64`) looks like a row
            // id derived from the loop index alone — confirm upsert_chunk_vec
            // scopes it per memory, otherwise ids would collide across
            // memories.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // `entity_embeddings` is index-aligned with `entities` (built
        // together in Phase A), so positional indexing is safe here.
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                &entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Map entity name -> type so relationship endpoints that were also
        // extracted as entities keep their type; unknown endpoints fall
        // back to "concept".
        let entity_types: std::collections::HashMap<&str, &str> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URLs are written outside the transaction and the Result is discarded
    // (`let _ =`): a URL-index failure must not fail an already-committed
    // memory.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
582
583pub fn run(args: IngestArgs) -> Result<(), AppError> {
584    let started = std::time::Instant::now();
585
586    if !args.dir.exists() {
587        return Err(AppError::NotFound(format!(
588            "directory not found: {}",
589            args.dir.display()
590        )));
591    }
592    if !args.dir.is_dir() {
593        return Err(AppError::Validation(format!(
594            "path is not a directory: {}",
595            args.dir.display()
596        )));
597    }
598
599    let mut files: Vec<PathBuf> = Vec::new();
600    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
601    files.sort();
602
603    if files.len() > args.max_files {
604        return Err(AppError::Validation(format!(
605            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
606            files.len(),
607            args.max_files
608        )));
609    }
610
611    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
612    let memory_type_str = args.r#type.as_str().to_string();
613
614    let paths = AppPaths::resolve(args.db.as_deref())?;
615    let mut conn_or_err = match init_storage(&paths) {
616        Ok(c) => Ok(c),
617        Err(e) => Err(format!("{e}")),
618    };
619
620    let mut succeeded: usize = 0;
621    let mut failed: usize = 0;
622    let mut skipped: usize = 0;
623    let total = files.len();
624
625    // Pre-resolve all names before parallelisation so Phase A workers see a
626    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
627    let mut taken_names: BTreeSet<String> = BTreeSet::new();
628
629    // Each entry: (path, file_str, derived_name, name_truncated, original_name)
630    // or None when the file should be skipped immediately.
631    struct FileSlot {
632        path: PathBuf,
633        file_str: String,
634        derived_name: String,
635        name_truncated: bool,
636        original_name: Option<String>,
637    }
638    enum Slot {
639        Skip {
640            file_str: String,
641            derived_base: String,
642            name_truncated: bool,
643            original_name: Option<String>,
644            reason: String,
645        },
646        Process(FileSlot),
647    }
648
649    let slots: Vec<Slot> = files
650        .iter()
651        .map(|path| {
652            let file_str = path.to_string_lossy().into_owned();
653            let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
654
655            if derived_base.is_empty() {
656                return Slot::Skip {
657                    file_str,
658                    derived_base: String::new(),
659                    name_truncated: false,
660                    original_name: None,
661                    reason: "could not derive a non-empty kebab-case name from filename"
662                        .to_string(),
663                };
664            }
665
666            match unique_name(&derived_base, &taken_names) {
667                Ok(derived_name) => {
668                    taken_names.insert(derived_name.clone());
669                    Slot::Process(FileSlot {
670                        path: path.clone(),
671                        file_str,
672                        derived_name,
673                        name_truncated,
674                        original_name,
675                    })
676                }
677                Err(e) => Slot::Skip {
678                    file_str,
679                    derived_base,
680                    name_truncated,
681                    original_name,
682                    reason: e.to_string(),
683                },
684            }
685        })
686        .collect();
687
688    // Determine rayon thread pool size, honoring --low-memory and the
689    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
690    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
691
692    let pool = rayon::ThreadPoolBuilder::new()
693        .num_threads(parallelism)
694        .build()
695        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
696
697    // Phase A: parallel compute. Indexed slot matches `slots` index for ordering.
698    let staged: Vec<Mutex<Option<Result<StagedFile, AppError>>>> =
699        (0..slots.len()).map(|_| Mutex::new(None)).collect();
700
701    let skip_extraction = args.skip_extraction;
702    let paths_ref = &paths;
703
704    pool.install(|| {
705        slots.par_iter().enumerate().for_each(|(idx, slot)| {
706            if let Slot::Process(fs) = slot {
707                let result =
708                    stage_file(idx, &fs.path, &fs.derived_name, paths_ref, skip_extraction);
709                // SAFETY: staged[idx] is only written once by this worker.
710                *staged[idx].lock().expect("staged slot poisoned") = Some(result);
711            }
712        });
713    });
714
715    // Phase B: sequential persist on main thread (Connection is !Sync).
716    let fail_fast = args.fail_fast;
717    for (idx, slot) in slots.iter().enumerate() {
718        match slot {
719            Slot::Skip {
720                file_str,
721                derived_base,
722                name_truncated,
723                original_name,
724                reason,
725            } => {
726                output::emit_json_compact(&IngestFileEvent {
727                    file: file_str,
728                    name: derived_base,
729                    status: "skipped",
730                    truncated: *name_truncated,
731                    original_name: original_name.clone(),
732                    error: Some(reason.clone()),
733                    memory_id: None,
734                    action: None,
735                })?;
736                skipped += 1;
737            }
738            Slot::Process(fs) => {
739                // If storage init failed, every file fails with the same error.
740                let conn = match conn_or_err.as_mut() {
741                    Ok(c) => c,
742                    Err(err_msg) => {
743                        let err_clone = err_msg.clone();
744                        output::emit_json_compact(&IngestFileEvent {
745                            file: &fs.file_str,
746                            name: &fs.derived_name,
747                            status: "failed",
748                            truncated: fs.name_truncated,
749                            original_name: fs.original_name.clone(),
750                            error: Some(err_clone.clone()),
751                            memory_id: None,
752                            action: None,
753                        })?;
754                        failed += 1;
755                        if fail_fast {
756                            output::emit_json_compact(&IngestSummary {
757                                summary: true,
758                                dir: args.dir.display().to_string(),
759                                pattern: args.pattern.clone(),
760                                recursive: args.recursive,
761                                files_total: total,
762                                files_succeeded: succeeded,
763                                files_failed: failed,
764                                files_skipped: skipped,
765                                elapsed_ms: started.elapsed().as_millis() as u64,
766                            })?;
767                            return Err(AppError::Validation(format!(
768                                "ingest aborted on first failure: {err_clone}"
769                            )));
770                        }
771                        continue;
772                    }
773                };
774
775                // Take the Phase A result (always Some for Process slots).
776                let stage_result = staged[idx]
777                    .lock()
778                    .expect("staged slot poisoned")
779                    .take()
780                    .expect("staged slot empty for Process slot");
781
782                let outcome = stage_result
783                    .and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
784
785                match outcome {
786                    Ok(FileSuccess { memory_id, action }) => {
787                        output::emit_json_compact(&IngestFileEvent {
788                            file: &fs.file_str,
789                            name: &fs.derived_name,
790                            status: "indexed",
791                            truncated: fs.name_truncated,
792                            original_name: fs.original_name.clone(),
793                            error: None,
794                            memory_id: Some(memory_id),
795                            action: Some(action),
796                        })?;
797                        succeeded += 1;
798                    }
799                    Err(e) => {
800                        let err_msg = format!("{e}");
801                        output::emit_json_compact(&IngestFileEvent {
802                            file: &fs.file_str,
803                            name: &fs.derived_name,
804                            status: "failed",
805                            truncated: fs.name_truncated,
806                            original_name: fs.original_name.clone(),
807                            error: Some(err_msg.clone()),
808                            memory_id: None,
809                            action: None,
810                        })?;
811                        failed += 1;
812                        if fail_fast {
813                            output::emit_json_compact(&IngestSummary {
814                                summary: true,
815                                dir: args.dir.display().to_string(),
816                                pattern: args.pattern.clone(),
817                                recursive: args.recursive,
818                                files_total: total,
819                                files_succeeded: succeeded,
820                                files_failed: failed,
821                                files_skipped: skipped,
822                                elapsed_ms: started.elapsed().as_millis() as u64,
823                            })?;
824                            return Err(AppError::Validation(format!(
825                                "ingest aborted on first failure: {err_msg}"
826                            )));
827                        }
828                    }
829                }
830            }
831        }
832    }
833
834    output::emit_json_compact(&IngestSummary {
835        summary: true,
836        dir: args.dir.display().to_string(),
837        pattern: args.pattern.clone(),
838        recursive: args.recursive,
839        files_total: total,
840        files_succeeded: succeeded,
841        files_failed: failed,
842        files_skipped: skipped,
843        elapsed_ms: started.elapsed().as_millis() as u64,
844    })?;
845
846    Ok(())
847}
848
849/// Auto-initialises the database (matches the contract of every other CRUD
850/// handler) and returns a fresh read/write connection ready for the ingest
851/// loop. Errors here are recoverable per-file: the caller surfaces them as
852/// failure events so `--fail-fast` and the continue-on-error path keep
853/// working when, for example, the user points `--db` at an unwritable path.
854fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
855    ensure_db_ready(paths)?;
856    let conn = open_rw(&paths.db)?;
857    Ok(conn)
858}
859
/// Returns true when `entity_type` is one of the canonical entity types
/// accepted by the graph schema (including the v0.0.8 additions
/// `organization`, `location`, `date`).
fn is_valid_entity_type(entity_type: &str) -> bool {
    // Canonical whitelist; keep in sync with the schema's entity taxonomy.
    const VALID_TYPES: [&str; 13] = [
        "project",
        "tool",
        "person",
        "file",
        "concept",
        "incident",
        "decision",
        "memory",
        "dashboard",
        "issue_tracker",
        "organization",
        "location",
        "date",
    ];
    VALID_TYPES.contains(&entity_type)
}
878
/// Returns true when `relation` is one of the canonical relationship
/// kinds accepted by the graph schema.
fn is_valid_relation(relation: &str) -> bool {
    // Canonical whitelist; keep in sync with the schema's relation taxonomy.
    const VALID_RELATIONS: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    VALID_RELATIONS.contains(&relation)
}
896
897fn collect_files(
898    dir: &Path,
899    pattern: &str,
900    recursive: bool,
901    out: &mut Vec<PathBuf>,
902) -> Result<(), AppError> {
903    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
904    for entry in entries {
905        let entry = entry.map_err(AppError::Io)?;
906        let path = entry.path();
907        let file_type = entry.file_type().map_err(AppError::Io)?;
908        if file_type.is_file() {
909            let name = entry.file_name();
910            let name_str = name.to_string_lossy();
911            if matches_pattern(&name_str, pattern) {
912                out.push(path);
913            }
914        } else if file_type.is_dir() && recursive {
915            collect_files(&path, pattern, recursive, out)?;
916        }
917    }
918    Ok(())
919}
920
/// Minimal glob matcher supporting exactly three shapes: a leading `*`
/// (suffix match, checked first, so the bare pattern `*` matches every
/// name), a trailing `*` (prefix match), or a literal (exact match).
fn matches_pattern(name: &str, pattern: &str) -> bool {
    match (pattern.strip_prefix('*'), pattern.strip_suffix('*')) {
        // Leading '*' wins, mirroring the original check order.
        (Some(suffix), _) => name.ends_with(suffix),
        (None, Some(prefix)) => name.starts_with(prefix),
        (None, None) => name == pattern,
    }
}
930
931/// Returns `(final_name, truncated, original_name)`.
932/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
933/// `original_name` holds the pre-truncation name only when `truncated=true`.
934///
935/// Non-ASCII characters are first decomposed via NFD and then stripped of
936/// combining marks so accented letters fold to their base ASCII letter
937/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
938/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
939/// preserves meaningful word content rather than collapsing the basename
940/// to a few stray ASCII letters as the previous filter did.
941fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
942    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
943    let lowered: String = stem
944        .nfd()
945        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
946        .map(|c| {
947            if c == '_' || c.is_whitespace() {
948                '-'
949            } else {
950                c
951            }
952        })
953        .map(|c| c.to_ascii_lowercase())
954        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
955        .collect();
956    let collapsed = collapse_dashes(&lowered);
957    let trimmed = collapsed.trim_matches('-').to_string();
958    if trimmed.len() > DERIVED_NAME_MAX_LEN {
959        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
960            .trim_matches('-')
961            .to_string();
962        // v1.0.31 A10: surface the truncation so users can fix overly long file
963        // basenames before they collide with siblings sharing the same prefix.
964        tracing::warn!(
965            target: "ingest",
966            original = %trimmed,
967            truncated_to = %truncated,
968            max_len = DERIVED_NAME_MAX_LEN,
969            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
970        );
971        (truncated, true, Some(trimmed))
972    } else {
973        (trimmed, false, None)
974    }
975}
976
977/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
978/// numeric suffix (`-1`, `-2`, …) when needed.
979///
980/// `taken` is the set of names already consumed in the current ingest run.
981/// The caller is expected to insert the returned name into `taken` so the
982/// next call observes the consumption. Cross-run collisions are intentionally
983/// surfaced by the per-file persistence path as duplicates so re-ingestion
984/// of identical corpora stays idempotent.
985///
986/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
987/// candidates collide, signalling a pathological corpus that should be
988/// renamed manually.
989fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
990    if !taken.contains(base) {
991        return Ok(base.to_string());
992    }
993    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
994        let candidate = format!("{base}-{suffix}");
995        if !taken.contains(&candidate) {
996            tracing::warn!(
997                target: "ingest",
998                base = %base,
999                resolved = %candidate,
1000                suffix,
1001                "memory name collision resolved with numeric suffix"
1002            );
1003            return Ok(candidate);
1004        }
1005    }
1006    Err(AppError::Validation(format!(
1007        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1008    )))
1009}
1010
/// Collapses every run of consecutive `-` characters in `s` to a single
/// `-`, leaving all other characters untouched.
fn collapse_dashes(s: &str) -> String {
    let mut last_seen: Option<char> = None;
    s.chars()
        .filter(|&c| {
            // Drop a dash only when the immediately preceding char was one.
            let keep = c != '-' || last_seen != Some('-');
            last_seen = Some(c);
            keep
        })
        .collect()
}
1027
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_entity_type_accepts_v008_types() {
        assert!(is_valid_entity_type("organization"));
        assert!(is_valid_entity_type("location"));
        assert!(is_valid_entity_type("date"));
        assert!(!is_valid_entity_type("unknown"));
    }

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: scrubs the env var around a closure to keep tests deterministic.
    ///
    /// Fix: the previous value is restored by a `Drop` guard instead of
    /// straight-line code after `f()`, so a panicking test body (i.e. a
    /// failed assertion) can no longer leak the temporary value into the
    /// `#[serial]` tests that run afterwards.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Restores the captured previous value of `KEY` on drop — which
        /// runs during unwinding too, unlike the old post-call restore.
        struct RestoreEnv {
            prev: Option<String>,
        }
        impl Drop for RestoreEnv {
            fn drop(&mut self) {
                match self.prev.take() {
                    Some(p) => std::env::set_var(KEY, p),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // Capture the previous value before mutating, exactly as before.
        let _guard = RestoreEnv {
            prev: std::env::var(KEY).ok(),
        };
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
        // `_guard` drops here (or during unwinding) and restores `KEY`.
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}