sqlite_graphrag/commands/ingest.rs

//! Handler for the `ingest` CLI subcommand.
//!
//! Bulk-ingests every file under a directory that matches a glob pattern.
//! Each matched file is persisted as a separate memory using the same
//! validation, chunking, embedding and persistence pipeline as `remember`,
//! but executed in-process so the ONNX model is loaded only once per
//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
//! cold-start cost) with an in-process loop reusing the warm embedder
//! (daemon when available, in-process `Embedder::new` otherwise).
//!
//! Memory names are derived from file basenames (kebab-case, lowercase,
//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
//! object per processed file (success or error), followed by a final
//! summary object. Designed for streaming consumption by agents.
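//!
//! An illustrative stream (field values are hypothetical; see
//! `IngestFileEvent` and `IngestSummary` below for the exact schema):
//!
//! ```text
//! {"file":"docs/intro.md","name":"intro","status":"indexed","truncated":false,"memory_id":42,"action":"created"}
//! {"file":"docs/empty.md","name":"empty","status":"failed","truncated":false,"error":"body is empty"}
//! {"summary":true,"dir":"docs","pattern":"*.md","recursive":false,"files_total":2,"files_succeeded":1,"files_failed":1,"files_skipped":0,"elapsed_ms":1234}
//! ```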
//!
//! ## Incremental pipeline (v1.0.43)
//!
//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
//! read + chunk + embed + NER per file. Results are sent immediately via a
//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
//! as the first file completes — no waiting for all files to finish Phase A.
//!
//! Phase B runs on the main thread: receives staged files from the channel,
//! writes to SQLite per-file (WAL absorbs individual commits), and emits
//! NDJSON progress events to stderr as each file is persisted. `Connection`
//! is not `Sync` so it never crosses thread boundaries.
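//!
//! Schematically (channel capacity is the bound computed in `run`):
//!
//! ```text
//! files → rayon pool (Phase A: read + chunk + embed + NER)
//!           │  bounded sync_channel (cap = 2 × parallelism)
//!           ▼
//!         main thread (Phase B: SQLite writes + NDJSON progress)
//! ```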
//!
//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
//! before Phase B (and any DB writes) could begin. With this pipeline, the
//! first file is committed within seconds of starting.

use crate::chunking;
use crate::cli::MemoryType;
use crate::entity_type::EntityType;
use crate::errors::AppError;
use crate::i18n::errors_msg;
use crate::output::{self, JsonOutputFormat};
use crate::paths::AppPaths;
use crate::storage::chunks as storage_chunks;
use crate::storage::connection::{ensure_db_ready, open_rw};
use crate::storage::entities::{NewEntity, NewRelationship};
use crate::storage::memories::NewMemory;
use crate::storage::{entities, memories, urls as storage_urls, versions};
use rayon::prelude::*;
use rusqlite::Connection;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use unicode_normalization::UnicodeNormalization;

use crate::constants::DERIVED_NAME_MAX_LEN;

/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;

#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Enable BERT NER extraction (disabled by default, slower)\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic BERT NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,

    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1`; it takes precedence over any
    /// explicit value. Recommended for environments with <4 GB of available
    /// RAM or under container/cgroup constraints. Trade-off: 3-4x longer
    /// wall time. Also honored via the `SQLITE_GRAPHRAG_LOW_MEMORY=1` env
    /// var (the CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}

/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
/// values evaluate to false. Unrecognized non-empty values emit a
/// `tracing::warn!` and evaluate to false.
fn env_low_memory_enabled() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
        Ok(v) if v.is_empty() => false,
        Ok(v) => match v.to_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            other => {
                tracing::warn!(
                    target: "ingest",
                    value = %other,
                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
                );
                false
            }
        },
        Err(_) => false,
    }
}

/// Resolves the effective ingest parallelism honoring `--low-memory` and the
/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
///
/// Precedence:
/// 1. `--low-memory` CLI flag forces parallelism = 1.
/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
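///    (e.g. 8 logical CPUs → 4 workers; 2 CPUs → 1 worker).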
///
/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
/// emits a `tracing::warn!` advertising the override.
fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
    let env_flag = env_low_memory_enabled();
    let low_memory = low_memory_flag || env_flag;

    if low_memory {
        if let Some(n) = ingest_parallelism {
            if n > 1 {
                tracing::warn!(
                    target: "ingest",
                    requested = n,
                    "--ingest-parallelism overridden by --low-memory; using 1"
                );
            }
        }
        if low_memory_flag {
            tracing::info!(
                target: "ingest",
                source = "flag",
                "low-memory mode enabled: forcing --ingest-parallelism 1"
            );
        } else {
            tracing::info!(
                target: "ingest",
                source = "env",
                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
            );
        }
        return 1;
    }

    ingest_parallelism
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|v| v.get() / 2)
                .unwrap_or(1)
                .clamp(1, 4)
        })
        .max(1)
}

#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}

#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}

/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    memory_id: i64,
    action: String,
}

/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
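/// Example line (illustrative values):
/// `{"schema_version":1,"event":"file_extracted","path":"docs/a.md","ms":27000,"entities":12,"relationships":4}`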
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    ms: u64,
    entities: usize,
    relationships: usize,
}

/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    body: String,
    body_hash: String,
    snippet: String,
    name: String,
    description: String,
    embedding: Vec<f32>,
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}

/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
/// Never touches the database — safe to run on any rayon thread.
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    for rel in &mut extracted_relationships {
        rel.relation = rel.relation.replace('-', "_");
        if !is_valid_relation(&rel.relation) {
            return Err(AppError::Validation(format!(
                "invalid relation '{}' for relationship '{}' -> '{}'",
                rel.relation, rel.source, rel.target
            )));
        }
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before ingesting it",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        aggregated
    };

    let entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
    })
}

/// Phase B: persists one `StagedFile` to the database on the main thread.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
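        // Best-effort by design: this runs after tx.commit(), so the result is
        // intentionally discarded — a URL insert failure must not fail the
        // already-persisted memory.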
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}

pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::NotFound(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Pre-resolve all names before parallelisation so Phase A workers see a
    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
    // ProcessItem: the data moved into the producer thread for Phase A computation.
    // We split these so `slots_meta` stays borrowed on the main thread for event
    // emission, while `process_items` (owned `PathBuf` + `String`, trivially Send)
    // moves across the thread boundary into the rayon producer.
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::new();
    let mut truncations: Vec<(String, String)> = Vec::new();

    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    // Determine rayon thread pool size, honoring --low-memory and the
    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        tracing::warn!(
            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
        );
    }
    let enable_ner = args.enable_ner;

    let total_to_process = process_items.len();
    tracing::info!(
        target: "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)"
    );

    // Bounded channel: producer never gets more than parallelism*2 items ahead of
    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
    // Each message carries the slot index so Phase B can look up SlotMeta in order.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    // Phase A: launched in a dedicated OS thread so the main thread can consume
    // the channel concurrently. pool.install() blocks the calling thread until
    // all rayon workers finish — if called on the main thread it would
    // reintroduce the 2-phase blocking behaviour we are eliminating.
    let paths_owned = paths.clone();
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                // Emit NDJSON progress event to stderr so the user sees work
                // happening during long NER runs (e.g. 50 files × 27s each).
                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                // Blocking send applies backpressure: if Phase B is slower,
                // Phase A workers wait here instead of accumulating staged files
                // in memory. If the receiver is dropped (fail_fast abort), ignore.
                let _ = tx.send((item.idx, result));
            });
            // Explicit drop of tx signals Phase B (rx iteration) to stop.
            drop(tx);
        });
    });

    // Phase B: main thread persists files as results arrive from the channel.
    // Results arrive in completion order (par_iter is unordered). We persist
    // each file immediately on arrival — this is the key fix for B1: with the
    // old 2-phase design the first DB write happened only after ALL files had
    // finished Phase A. Now the first commit happens as soon as the first file
    // completes Phase A, regardless of how many files remain.
    //
    // NDJSON output order follows completion order (not file-system sort order).
    // Skip slots are emitted up front, before the first Process result is
    // consumed. This trade-off is intentional: deterministic NDJSON ordering is
    // a lesser requirement than persisting data before the user's timeout fires.
    let fail_fast = args.fail_fast;

    // Emit pending Skip events first so agents see them early.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
    // as channel messages arrive in completion order.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target: "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one"
    );

    // Drain channel and persist each file immediately — no accumulation into a
    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
    // Phase B without applying backpressure.
    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // If storage init failed, every file fails with the same error.
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // Wait for the producer thread to finish cleanly.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}

/// Auto-initialises the database (matches the contract of every other CRUD
/// handler) and returns a fresh read/write connection ready for the ingest
/// loop. Errors here are recoverable per-file: the caller surfaces them as
/// failure events so `--fail-fast` and the continue-on-error path keep
/// working when, for example, the user points `--db` at an unwritable path.
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}

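/// Whitelist of canonical relation names. `stage_file` normalises NER output
/// (`-` → `_`) before running relationships through this check.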
fn is_valid_relation(relation: &str) -> bool {
    matches!(
        relation,
        "applies_to"
            | "uses"
            | "depends_on"
            | "causes"
            | "fixes"
            | "contradicts"
            | "supports"
            | "follows"
            | "related"
            | "mentions"
            | "replaces"
            | "tracked_in"
    )
}

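/// Walks `dir` collecting regular files whose basename matches `pattern`,
/// descending into subdirectories when `recursive` is set. Symlinks are not
/// followed: `DirEntry::file_type` reports the link itself, so symlinked
/// files and directories are skipped.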
fn collect_files(
    dir: &Path,
    pattern: &str,
    recursive: bool,
    out: &mut Vec<PathBuf>,
) -> Result<(), AppError> {
    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
    for entry in entries {
        let entry = entry.map_err(AppError::Io)?;
        let path = entry.path();
        let file_type = entry.file_type().map_err(AppError::Io)?;
        if file_type.is_file() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            if matches_pattern(&name_str, pattern) {
                out.push(path);
            }
        } else if file_type.is_dir() && recursive {
            collect_files(&path, pattern, recursive, out)?;
        }
    }
    Ok(())
}

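/// Minimal glob support: `*suffix` (e.g. `*.md`), `prefix*`, or an exact
/// basename match. A bare `*` matches every file (the empty-suffix case).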
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else {
        name == pattern
    }
}

/// Returns `(final_name, truncated, original_name)`.
/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
/// `original_name` holds the pre-truncation name only when `truncated=true`.
///
/// Non-ASCII characters are first decomposed via NFD and then stripped of
/// combining marks so accented letters fold to their base ASCII letter
/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
/// preserves meaningful word content rather than collapsing the basename
/// to a few stray ASCII letters as the previous filter did.
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}

/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
/// numeric suffix (`-1`, `-2`, …) when needed.
///
/// `taken` is the set of names already consumed in the current ingest run.
/// The caller is expected to insert the returned name into `taken` so the
/// next call observes the consumption. Cross-run collisions are intentionally
/// surfaced by the per-file persistence path as duplicates so re-ingestion
/// of identical corpora stays idempotent.
///
/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
/// candidates collide, signalling a pathological corpus that should be
/// renamed manually.
fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
    if !taken.contains(base) {
        return Ok(base.to_string());
    }
    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
        let candidate = format!("{base}-{suffix}");
        if !taken.contains(&candidate) {
            tracing::warn!(
                target: "ingest",
                base = %base,
                resolved = %candidate,
                suffix,
                "memory name collision resolved with numeric suffix"
            );
            return Ok(candidate);
        }
    }
    Err(AppError::Validation(format!(
        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
    )))
}

fn collapse_dashes(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut prev_dash = false;
    for c in s.chars() {
        if c == '-' {
            if !prev_dash {
                out.push('-');
            }
            prev_dash = true;
        } else {
            out.push(c);
            prev_dash = false;
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: scrubs the env var around a closure to keep tests deterministic.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}