Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound on the numeric suffix tried while de-duplicating derived
/// memory names. Once this many candidates have collided, resolution gives up
/// with an error instead of searching indefinitely.
const MAX_NAME_COLLISION_SUFFIX: usize = 1_000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable BERT NER extraction (disabled by default, slower)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68NOTES:\n  \
69    Each file becomes a separate memory. Names derive from file basenames\n  \
70    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
71    followed by a final summary line with counts. Per-file errors are reported\n  \
72    inline and processing continues unless --fail-fast is set.")]
73pub struct IngestArgs {
74    /// Directory containing files to ingest.
75    #[arg(
76        value_name = "DIR",
77        help = "Directory to ingest recursively (each matching file becomes a memory)"
78    )]
79    pub dir: PathBuf,
80
81    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
82    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
83    pub r#type: MemoryType,
84
85    /// Glob pattern matched against file basenames (default: `*.md`). Supports
86    /// `*.<ext>`, `<prefix>*`, and exact filename match.
87    #[arg(long, default_value = "*.md")]
88    pub pattern: String,
89
90    /// Recurse into subdirectories.
91    #[arg(long, default_value_t = false)]
92    pub recursive: bool,
93
94    /// Enable automatic BERT NER entity/relationship extraction (disabled by default).
95    #[arg(long, env = "SQLITE_GRAPHRAG_ENABLE_NER", default_value_t = false)]
96    pub enable_ner: bool,
97
98    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
99    #[arg(long, default_value_t = false, hide = true)]
100    pub skip_extraction: bool,
101
102    /// Stop on first per-file error instead of continuing with the next file.
103    #[arg(long, default_value_t = false)]
104    pub fail_fast: bool,
105
106    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
107    #[arg(long, default_value_t = 10_000)]
108    pub max_files: usize,
109
110    /// Namespace for the ingested memories.
111    #[arg(long)]
112    pub namespace: Option<String>,
113
114    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
115    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
116    pub db: Option<String>,
117
118    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
119    pub format: JsonOutputFormat,
120
121    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
122    pub json: bool,
123
124    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
125    #[arg(
126        long,
127        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
128    )]
129    pub ingest_parallelism: Option<usize>,
130
131    /// Force single-threaded ingest to reduce RSS pressure.
132    ///
133    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
134    /// explicit value. Recommended for environments with <4 GB available
135    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
136    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
137    /// (CLI flag has higher precedence than the env var).
138    #[arg(
139        long,
140        default_value_t = false,
141        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
142                Recommended for environments with <4 GB available RAM or container/cgroup \
143                constraints. Trade-off: 3-4x longer wall time. Also honored via \
144                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
145    )]
146    pub low_memory: bool,
147}
148
149/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
150/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
151/// values evaluate to false. Unrecognized non-empty values emit a
152/// `tracing::warn!` and evaluate to false.
153fn env_low_memory_enabled() -> bool {
154    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
155        Ok(v) if v.is_empty() => false,
156        Ok(v) => match v.to_lowercase().as_str() {
157            "1" | "true" | "yes" | "on" => true,
158            "0" | "false" | "no" | "off" => false,
159            other => {
160                tracing::warn!(
161                    target: "ingest",
162                    value = %other,
163                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
164                );
165                false
166            }
167        },
168        Err(_) => false,
169    }
170}
171
172/// Resolves the effective ingest parallelism honoring `--low-memory` and the
173/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
174///
175/// Precedence:
176/// 1. `--low-memory` CLI flag forces parallelism = 1.
177/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
178/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
179/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
180///
181/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
182/// emits a `tracing::warn!` advertising the override.
183fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
184    let env_flag = env_low_memory_enabled();
185    let low_memory = low_memory_flag || env_flag;
186
187    if low_memory {
188        if let Some(n) = ingest_parallelism {
189            if n > 1 {
190                tracing::warn!(
191                    target: "ingest",
192                    requested = n,
193                    "--ingest-parallelism overridden by --low-memory; using 1"
194                );
195            }
196        }
197        if low_memory_flag {
198            tracing::info!(
199                target: "ingest",
200                source = "flag",
201                "low-memory mode enabled: forcing --ingest-parallelism 1"
202            );
203        } else {
204            tracing::info!(
205                target: "ingest",
206                source = "env",
207                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
208            );
209        }
210        return 1;
211    }
212
213    ingest_parallelism
214        .unwrap_or_else(|| {
215            std::thread::available_parallelism()
216                .map(|v| v.get() / 2)
217                .unwrap_or(1)
218                .clamp(1, 4)
219        })
220        .max(1)
221}
222
223#[derive(Serialize)]
224struct IngestFileEvent<'a> {
225    file: &'a str,
226    name: &'a str,
227    status: &'a str,
228    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
229    truncated: bool,
230    /// Original derived name before truncation; only present when `truncated=true`.
231    #[serde(skip_serializing_if = "Option::is_none")]
232    original_name: Option<String>,
233    #[serde(skip_serializing_if = "Option::is_none")]
234    error: Option<String>,
235    #[serde(skip_serializing_if = "Option::is_none")]
236    memory_id: Option<i64>,
237    #[serde(skip_serializing_if = "Option::is_none")]
238    action: Option<String>,
239}
240
241#[derive(Serialize)]
242struct IngestSummary {
243    summary: bool,
244    dir: String,
245    pattern: String,
246    recursive: bool,
247    files_total: usize,
248    files_succeeded: usize,
249    files_failed: usize,
250    files_skipped: usize,
251    elapsed_ms: u64,
252}
253
/// Result of persisting one file; feeds the `memory_id` and `action` fields
/// of the per-file NDJSON event.
struct FileSuccess {
    /// Row id returned when the memory was inserted.
    memory_id: i64,
    /// Label describing what was done (e.g. `"created"`).
    action: String,
}
259
260/// NDJSON progress event emitted to stderr after each file completes Phase A.
261/// Schema version 1; consumers should check `schema_version` before parsing.
262#[derive(Serialize)]
263struct StageProgressEvent<'a> {
264    schema_version: u8,
265    event: &'a str,
266    path: &'a str,
267    ms: u64,
268    entities: usize,
269    relationships: usize,
270}
271
272/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
273/// Phase B persists these to SQLite on the main thread in submission order.
274struct StagedFile {
275    body: String,
276    body_hash: String,
277    snippet: String,
278    name: String,
279    description: String,
280    embedding: Vec<f32>,
281    chunk_embeddings: Option<Vec<Vec<f32>>>,
282    chunks_info: Vec<crate::chunking::Chunk>,
283    entities: Vec<NewEntity>,
284    relationships: Vec<NewRelationship>,
285    entity_embeddings: Vec<Vec<f32>>,
286    urls: Vec<crate::extraction::ExtractedUrl>,
287}
288
289/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
290/// Never touches the database — safe to run on any rayon thread.
291fn stage_file(
292    _idx: usize,
293    path: &Path,
294    name: &str,
295    paths: &AppPaths,
296    enable_ner: bool,
297) -> Result<StagedFile, AppError> {
298    use crate::constants::*;
299
300    if name.len() > MAX_MEMORY_NAME_LEN {
301        return Err(AppError::LimitExceeded(
302            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
303        ));
304    }
305    if name.starts_with("__") {
306        return Err(AppError::Validation(
307            crate::i18n::validation::reserved_name(),
308        ));
309    }
310    {
311        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
312            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
313        if !slug_re.is_match(name) {
314            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
315                name,
316            )));
317        }
318    }
319
320    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
321    if raw_body.len() > MAX_MEMORY_BODY_LEN {
322        return Err(AppError::LimitExceeded(
323            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
324        ));
325    }
326    if raw_body.trim().is_empty() {
327        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
328    }
329
330    let description = format!("ingested from {}", path.display());
331    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
332        return Err(AppError::Validation(
333            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
334        ));
335    }
336
337    let mut extracted_entities: Vec<NewEntity> = Vec::new();
338    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
339    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
340    if enable_ner {
341        match crate::extraction::extract_graph_auto(&raw_body, paths) {
342            Ok(extracted) => {
343                extracted_urls = extracted.urls;
344                extracted_entities = extracted.entities;
345                extracted_relationships = extracted.relationships;
346
347                if extracted_entities.len() > max_entities_per_memory() {
348                    extracted_entities.truncate(max_entities_per_memory());
349                }
350                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
351                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
352                }
353            }
354            Err(e) => {
355                tracing::warn!(
356                    file = %path.display(),
357                    "auto-extraction failed (graceful degradation): {e:#}"
358                );
359            }
360        }
361    }
362
363    for rel in &mut extracted_relationships {
364        rel.relation = rel.relation.replace('-', "_");
365        if !is_valid_relation(&rel.relation) {
366            return Err(AppError::Validation(format!(
367                "invalid relation '{}' for relationship '{}' -> '{}'",
368                rel.relation, rel.source, rel.target
369            )));
370        }
371        if !(0.0..=1.0).contains(&rel.strength) {
372            return Err(AppError::Validation(format!(
373                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
374                rel.strength, rel.source, rel.target
375            )));
376        }
377    }
378
379    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
380    let snippet: String = raw_body.chars().take(200).collect();
381
382    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
383    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
384    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
385        return Err(AppError::LimitExceeded(format!(
386            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
387            chunks_info.len(),
388            REMEMBER_MAX_SAFE_MULTI_CHUNKS
389        )));
390    }
391
392    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
393    let embedding = if chunks_info.len() == 1 {
394        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
395    } else {
396        let chunk_texts: Vec<&str> = chunks_info
397            .iter()
398            .map(|c| chunking::chunk_text(&raw_body, c))
399            .collect();
400        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
401        for chunk_text in &chunk_texts {
402            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
403                &paths.models,
404                chunk_text,
405            )?);
406        }
407        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
408        chunk_embeddings_opt = Some(chunk_embeddings);
409        aggregated
410    };
411
412    let entity_embeddings = extracted_entities
413        .iter()
414        .map(|entity| {
415            let entity_text = match &entity.description {
416                Some(desc) => format!("{} {}", entity.name, desc),
417                None => entity.name.clone(),
418            };
419            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
420        })
421        .collect::<Result<Vec<_>, _>>()?;
422
423    Ok(StagedFile {
424        body: raw_body,
425        body_hash,
426        snippet,
427        name: name.to_string(),
428        description,
429        embedding,
430        chunk_embeddings: chunk_embeddings_opt,
431        chunks_info,
432        entities: extracted_entities,
433        relationships: extracted_relationships,
434        entity_embeddings,
435        urls: extracted_urls,
436    })
437}
438
439/// Phase B: persists one `StagedFile` to the database on the main thread.
440fn persist_staged(
441    conn: &mut Connection,
442    namespace: &str,
443    memory_type: &str,
444    staged: StagedFile,
445) -> Result<FileSuccess, AppError> {
446    {
447        let active_count: u32 = conn.query_row(
448            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
449            [],
450            |r| r.get::<_, i64>(0).map(|v| v as u32),
451        )?;
452        let ns_exists: bool = conn.query_row(
453            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
454            rusqlite::params![namespace],
455            |r| r.get::<_, i64>(0).map(|v| v > 0),
456        )?;
457        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
458            return Err(AppError::NamespaceError(format!(
459                "active namespace limit of {} exceeded while creating '{namespace}'",
460                crate::constants::MAX_NAMESPACES_ACTIVE
461            )));
462        }
463    }
464
465    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
466    if existing_memory.is_some() {
467        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
468            &staged.name,
469            namespace,
470        )));
471    }
472    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
473
474    let new_memory = NewMemory {
475        namespace: namespace.to_string(),
476        name: staged.name.clone(),
477        memory_type: memory_type.to_string(),
478        description: staged.description.clone(),
479        body: staged.body,
480        body_hash: staged.body_hash,
481        session_id: None,
482        source: "agent".to_string(),
483        metadata: serde_json::json!({}),
484    };
485
486    if let Some(hash_id) = duplicate_hash_id {
487        tracing::debug!(
488            target: "ingest",
489            duplicate_memory_id = hash_id,
490            "identical body already exists; persisting a new memory anyway"
491        );
492    }
493
494    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
495
496    let memory_id = memories::insert(&tx, &new_memory)?;
497    versions::insert_version(
498        &tx,
499        memory_id,
500        1,
501        &staged.name,
502        memory_type,
503        &staged.description,
504        &new_memory.body,
505        &serde_json::to_string(&new_memory.metadata)?,
506        None,
507        "create",
508    )?;
509    memories::upsert_vec(
510        &tx,
511        memory_id,
512        namespace,
513        memory_type,
514        &staged.embedding,
515        &staged.name,
516        &staged.snippet,
517    )?;
518
519    if staged.chunks_info.len() > 1 {
520        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
521        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
522            AppError::Internal(anyhow::anyhow!(
523                "missing chunk embeddings cache on multi-chunk ingest path"
524            ))
525        })?;
526        for (i, emb) in chunk_embeddings.iter().enumerate() {
527            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
528        }
529    }
530
531    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
532        for (idx, entity) in staged.entities.iter().enumerate() {
533            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
534            let entity_embedding = &staged.entity_embeddings[idx];
535            entities::upsert_entity_vec(
536                &tx,
537                entity_id,
538                namespace,
539                entity.entity_type,
540                entity_embedding,
541                &entity.name,
542            )?;
543            entities::link_memory_entity(&tx, memory_id, entity_id)?;
544            entities::increment_degree(&tx, entity_id)?;
545        }
546        let entity_types: std::collections::HashMap<&str, EntityType> = staged
547            .entities
548            .iter()
549            .map(|entity| (entity.name.as_str(), entity.entity_type))
550            .collect();
551        for rel in &staged.relationships {
552            let source_entity = NewEntity {
553                name: rel.source.clone(),
554                entity_type: entity_types
555                    .get(rel.source.as_str())
556                    .copied()
557                    .unwrap_or(EntityType::Concept),
558                description: None,
559            };
560            let target_entity = NewEntity {
561                name: rel.target.clone(),
562                entity_type: entity_types
563                    .get(rel.target.as_str())
564                    .copied()
565                    .unwrap_or(EntityType::Concept),
566                description: None,
567            };
568            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
569            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
570            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
571            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
572        }
573    }
574
575    tx.commit()?;
576
577    if !staged.urls.is_empty() {
578        let url_entries: Vec<storage_urls::MemoryUrl> = staged
579            .urls
580            .into_iter()
581            .map(|u| storage_urls::MemoryUrl {
582                url: u.url,
583                offset: Some(u.offset as i64),
584            })
585            .collect();
586        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
587    }
588
589    Ok(FileSuccess {
590        memory_id,
591        action: "created".to_string(),
592    })
593}
594
595pub fn run(args: IngestArgs) -> Result<(), AppError> {
596    let started = std::time::Instant::now();
597
598    if !args.dir.exists() {
599        return Err(AppError::NotFound(format!(
600            "directory not found: {}",
601            args.dir.display()
602        )));
603    }
604    if !args.dir.is_dir() {
605        return Err(AppError::Validation(format!(
606            "path is not a directory: {}",
607            args.dir.display()
608        )));
609    }
610
611    let mut files: Vec<PathBuf> = Vec::new();
612    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
613    files.sort();
614
615    if files.len() > args.max_files {
616        return Err(AppError::Validation(format!(
617            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
618            files.len(),
619            args.max_files
620        )));
621    }
622
623    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
624    let memory_type_str = args.r#type.as_str().to_string();
625
626    let paths = AppPaths::resolve(args.db.as_deref())?;
627    let mut conn_or_err = match init_storage(&paths) {
628        Ok(c) => Ok(c),
629        Err(e) => Err(format!("{e}")),
630    };
631
632    let mut succeeded: usize = 0;
633    let mut failed: usize = 0;
634    let mut skipped: usize = 0;
635    let total = files.len();
636
637    // Pre-resolve all names before parallelisation so Phase A workers see a
638    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
639    let mut taken_names: BTreeSet<String> = BTreeSet::new();
640
641    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
642    // ProcessItem: the data moved into the producer thread for Phase A computation.
643    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
644    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
645    // boundary into the rayon producer.
646    enum SlotMeta {
647        Skip {
648            file_str: String,
649            derived_base: String,
650            name_truncated: bool,
651            original_name: Option<String>,
652            reason: String,
653        },
654        Process {
655            file_str: String,
656            derived_name: String,
657            name_truncated: bool,
658            original_name: Option<String>,
659        },
660    }
661
662    struct ProcessItem {
663        idx: usize,
664        path: PathBuf,
665        file_str: String,
666        derived_name: String,
667    }
668
669    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
670    let mut process_items: Vec<ProcessItem> = Vec::new();
671    let mut truncations: Vec<(String, String)> = Vec::new();
672
673    for path in &files {
674        let file_str = path.to_string_lossy().into_owned();
675        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
676
677        if name_truncated {
678            if let Some(ref orig) = original_name {
679                truncations.push((orig.clone(), derived_base.clone()));
680            }
681        }
682
683        if derived_base.is_empty() {
684            slots_meta.push(SlotMeta::Skip {
685                file_str,
686                derived_base: String::new(),
687                name_truncated: false,
688                original_name: None,
689                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
690            });
691            continue;
692        }
693
694        match unique_name(&derived_base, &taken_names) {
695            Ok(derived_name) => {
696                taken_names.insert(derived_name.clone());
697                let idx = slots_meta.len();
698                process_items.push(ProcessItem {
699                    idx,
700                    path: path.clone(),
701                    file_str: file_str.clone(),
702                    derived_name: derived_name.clone(),
703                });
704                slots_meta.push(SlotMeta::Process {
705                    file_str,
706                    derived_name,
707                    name_truncated,
708                    original_name,
709                });
710            }
711            Err(e) => {
712                slots_meta.push(SlotMeta::Skip {
713                    file_str,
714                    derived_base,
715                    name_truncated,
716                    original_name,
717                    reason: e.to_string(),
718                });
719            }
720        }
721    }
722
723    if !truncations.is_empty() {
724        tracing::info!(
725            target: "ingest",
726            count = truncations.len(),
727            max_len = DERIVED_NAME_MAX_LEN,
728            "derived names truncated; pass -vv (debug) for per-file detail"
729        );
730    }
731
732    // Determine rayon thread pool size, honoring --low-memory and the
733    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
734    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
735
736    let pool = rayon::ThreadPoolBuilder::new()
737        .num_threads(parallelism)
738        .build()
739        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
740
741    let enable_ner = args.enable_ner;
742
743    let total_to_process = process_items.len();
744    tracing::info!(
745        target = "ingest",
746        phase = "pipeline_start",
747        files = total_to_process,
748        ingest_parallelism = parallelism,
749        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
750    );
751
752    // Bounded channel: producer never gets more than parallelism*2 items ahead of
753    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
754    // Each message carries the slot index so Phase B can look up SlotMeta in order.
755    let channel_bound = (parallelism * 2).max(1);
756    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
757
758    // Phase A: launched in a dedicated OS thread so the main thread can consume
759    // the channel concurrently. pool.install() blocks the calling thread until
760    // all rayon workers finish — if called on the main thread it would
761    // reintroduce the 2-phase blocking behaviour we are eliminating.
762    let paths_owned = paths.clone();
763    let producer_handle = std::thread::spawn(move || {
764        pool.install(|| {
765            process_items.into_par_iter().for_each(|item| {
766                let t0 = std::time::Instant::now();
767                let result = stage_file(
768                    item.idx,
769                    &item.path,
770                    &item.derived_name,
771                    &paths_owned,
772                    enable_ner,
773                );
774                let elapsed_ms = t0.elapsed().as_millis() as u64;
775
776                // Emit NDJSON progress event to stderr so the user sees work
777                // happening during long NER runs (e.g. 50 files × 27s each).
778                let (n_entities, n_relationships) = match &result {
779                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
780                    Err(_) => (0, 0),
781                };
782                let progress = StageProgressEvent {
783                    schema_version: 1,
784                    event: "file_extracted",
785                    path: &item.file_str,
786                    ms: elapsed_ms,
787                    entities: n_entities,
788                    relationships: n_relationships,
789                };
790                if let Ok(line) = serde_json::to_string(&progress) {
791                    eprintln!("{line}");
792                }
793
794                // Blocking send applies backpressure: if Phase B is slower,
795                // Phase A workers wait here instead of accumulating staged files
796                // in memory. If the receiver is dropped (fail_fast abort), ignore.
797                let _ = tx.send((item.idx, result));
798            });
799            // Explicit drop of tx signals Phase B (rx iteration) to stop.
800            drop(tx);
801        });
802    });
803
804    // Phase B: main thread persists files as results arrive from the channel.
805    // Results arrive in completion order (par_iter is unordered). We persist
806    // each file immediately on arrival — this is the key fix for B1: with the
807    // old 2-phase design the first DB write happened only after ALL files had
808    // finished Phase A. Now the first commit happens as soon as the first file
809    // completes Phase A, regardless of how many files remain.
810    //
811    // NDJSON output order follows completion order (not file-system sort order).
812    // Skip slots are emitted up front, before any Process result is consumed.
813    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
814    // requirement than ensuring data is persisted before the user's timeout fires.
815    let fail_fast = args.fail_fast;
816
817    // Emit pending Skip events first so agents see them early.
818    for meta in &slots_meta {
819        if let SlotMeta::Skip {
820            file_str,
821            derived_base,
822            name_truncated,
823            original_name,
824            reason,
825        } = meta
826        {
827            output::emit_json_compact(&IngestFileEvent {
828                file: file_str,
829                name: derived_base,
830                status: "skipped",
831                truncated: *name_truncated,
832                original_name: original_name.clone(),
833                error: Some(reason.clone()),
834                memory_id: None,
835                action: None,
836            })?;
837            skipped += 1;
838        }
839    }
840
841    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
842    // as channel messages arrive in completion order.
843    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
844        .iter()
845        .enumerate()
846        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
847        .collect();
848
849    tracing::info!(
850        target = "ingest",
851        phase = "persist_start",
852        files = total_to_process,
853        "phase B starting: persisting files incrementally as Phase A completes each one",
854    );
855
856    // Drain channel and persist each file immediately — no accumulation into a
857    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
858    // Phase B without applying backpressure.
859    for (idx, stage_result) in rx {
860        let meta = meta_index.get(&idx).ok_or_else(|| {
861            AppError::Internal(anyhow::anyhow!(
862                "channel idx {idx} has no corresponding Process slot"
863            ))
864        })?;
865        let (file_str, derived_name, name_truncated, original_name) = match meta {
866            SlotMeta::Process {
867                file_str,
868                derived_name,
869                name_truncated,
870                original_name,
871            } => (file_str, derived_name, name_truncated, original_name),
872            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
873        };
874
875        // If storage init failed, every file fails with the same error.
876        let conn = match conn_or_err.as_mut() {
877            Ok(c) => c,
878            Err(err_msg) => {
879                let err_clone = err_msg.clone();
880                output::emit_json_compact(&IngestFileEvent {
881                    file: file_str,
882                    name: derived_name,
883                    status: "failed",
884                    truncated: *name_truncated,
885                    original_name: original_name.clone(),
886                    error: Some(err_clone.clone()),
887                    memory_id: None,
888                    action: None,
889                })?;
890                failed += 1;
891                if fail_fast {
892                    output::emit_json_compact(&IngestSummary {
893                        summary: true,
894                        dir: args.dir.display().to_string(),
895                        pattern: args.pattern.clone(),
896                        recursive: args.recursive,
897                        files_total: total,
898                        files_succeeded: succeeded,
899                        files_failed: failed,
900                        files_skipped: skipped,
901                        elapsed_ms: started.elapsed().as_millis() as u64,
902                    })?;
903                    return Err(AppError::Validation(format!(
904                        "ingest aborted on first failure: {err_clone}"
905                    )));
906                }
907                continue;
908            }
909        };
910
911        let outcome =
912            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
913
914        match outcome {
915            Ok(FileSuccess { memory_id, action }) => {
916                output::emit_json_compact(&IngestFileEvent {
917                    file: file_str,
918                    name: derived_name,
919                    status: "indexed",
920                    truncated: *name_truncated,
921                    original_name: original_name.clone(),
922                    error: None,
923                    memory_id: Some(memory_id),
924                    action: Some(action),
925                })?;
926                succeeded += 1;
927            }
928            Err(e) => {
929                let err_msg = format!("{e}");
930                output::emit_json_compact(&IngestFileEvent {
931                    file: file_str,
932                    name: derived_name,
933                    status: "failed",
934                    truncated: *name_truncated,
935                    original_name: original_name.clone(),
936                    error: Some(err_msg.clone()),
937                    memory_id: None,
938                    action: None,
939                })?;
940                failed += 1;
941                if fail_fast {
942                    output::emit_json_compact(&IngestSummary {
943                        summary: true,
944                        dir: args.dir.display().to_string(),
945                        pattern: args.pattern.clone(),
946                        recursive: args.recursive,
947                        files_total: total,
948                        files_succeeded: succeeded,
949                        files_failed: failed,
950                        files_skipped: skipped,
951                        elapsed_ms: started.elapsed().as_millis() as u64,
952                    })?;
953                    return Err(AppError::Validation(format!(
954                        "ingest aborted on first failure: {err_msg}"
955                    )));
956                }
957            }
958        }
959    }
960
961    // Wait for the producer thread to finish cleanly.
962    producer_handle
963        .join()
964        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
965
966    output::emit_json_compact(&IngestSummary {
967        summary: true,
968        dir: args.dir.display().to_string(),
969        pattern: args.pattern.clone(),
970        recursive: args.recursive,
971        files_total: total,
972        files_succeeded: succeeded,
973        files_failed: failed,
974        files_skipped: skipped,
975        elapsed_ms: started.elapsed().as_millis() as u64,
976    })?;
977
978    Ok(())
979}
980
981/// Auto-initialises the database (matches the contract of every other CRUD
982/// handler) and returns a fresh read/write connection ready for the ingest
983/// loop. Errors here are recoverable per-file: the caller surfaces them as
984/// failure events so `--fail-fast` and the continue-on-error path keep
985/// working when, for example, the user points `--db` at an unwritable path.
986fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
987    ensure_db_ready(paths)?;
988    let conn = open_rw(&paths.db)?;
989    Ok(conn)
990}
991
/// Returns `true` when `relation` is one of the relation labels accepted by
/// the ingest pipeline.
///
/// The comparison is an exact, case-sensitive string match: `"uses"` is
/// accepted, `"Uses"` and `""` are not.
fn is_valid_relation(relation: &str) -> bool {
    matches!(
        relation,
        "applies_to"
            | "uses"
            | "depends_on"
            | "causes"
            | "fixes"
            | "contradicts"
            | "supports"
            | "follows"
            | "related"
            | "mentions"
            | "replaces"
            | "tracked_in"
    )
}
1009
1010fn collect_files(
1011    dir: &Path,
1012    pattern: &str,
1013    recursive: bool,
1014    out: &mut Vec<PathBuf>,
1015) -> Result<(), AppError> {
1016    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1017    for entry in entries {
1018        let entry = entry.map_err(AppError::Io)?;
1019        let path = entry.path();
1020        let file_type = entry.file_type().map_err(AppError::Io)?;
1021        if file_type.is_file() {
1022            let name = entry.file_name();
1023            let name_str = name.to_string_lossy();
1024            if matches_pattern(&name_str, pattern) {
1025                out.push(path);
1026            }
1027        } else if file_type.is_dir() && recursive {
1028            collect_files(&path, pattern, recursive, out)?;
1029        }
1030    }
1031    Ok(())
1032}
1033
/// Tests a file name against a minimal glob where `*` matches any run of
/// characters (including none) and every other character matches itself.
///
/// Any number of `*` wildcards may appear anywhere in `pattern`:
/// `*.md`, `README*`, `README*.md` and `*notes*` are all supported. A pattern
/// without `*` must equal `name` exactly. Matching is case-sensitive.
///
/// Patterns with a single leading or trailing `*` behave exactly as before;
/// patterns with additional stars used to treat them as literal characters,
/// which made globs such as `README*.md` impossible to express.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (prefix, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // Everything after the last `*` is a fixed suffix; whatever sits between
    // the first and last `*` is a sequence of fragments that must appear in
    // order somewhere in the middle of the name.
    let (middle, suffix) = match rest.rsplit_once('*') {
        Some((middle, suffix)) => (middle, suffix),
        None => ("", rest),
    };
    // Strip the suffix from what remains after the prefix so the two anchors
    // can never overlap (e.g. `ab*ba` must not match `aba`).
    let mut remaining = match name
        .strip_prefix(prefix)
        .and_then(|tail| tail.strip_suffix(suffix))
    {
        Some(inner) => inner,
        None => return false,
    };
    // Leftmost matching of each fragment is sufficient for star-only globs.
    for fragment in middle.split('*').filter(|f| !f.is_empty()) {
        match remaining.find(fragment) {
            Some(at) => remaining = &remaining[at + fragment.len()..],
            None => return false,
        }
    }
    true
}
1043
1044/// Returns `(final_name, truncated, original_name)`.
1045/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
1046/// `original_name` holds the pre-truncation name only when `truncated=true`.
1047///
1048/// Non-ASCII characters are first decomposed via NFD and then stripped of
1049/// combining marks so accented letters fold to their base ASCII letter
1050/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
1051/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1052/// preserves meaningful word content rather than collapsing the basename
1053/// to a few stray ASCII letters as the previous filter did.
1054fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
1055    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1056    let lowered: String = stem
1057        .nfd()
1058        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1059        .map(|c| {
1060            if c == '_' || c.is_whitespace() {
1061                '-'
1062            } else {
1063                c
1064            }
1065        })
1066        .map(|c| c.to_ascii_lowercase())
1067        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1068        .collect();
1069    let collapsed = collapse_dashes(&lowered);
1070    let trimmed = collapsed.trim_matches('-').to_string();
1071    if trimmed.len() > DERIVED_NAME_MAX_LEN {
1072        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
1073            .trim_matches('-')
1074            .to_string();
1075        tracing::debug!(
1076            target: "ingest",
1077            original = %trimmed,
1078            truncated_to = %truncated,
1079            max_len = DERIVED_NAME_MAX_LEN,
1080            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1081        );
1082        (truncated, true, Some(trimmed))
1083    } else {
1084        (trimmed, false, None)
1085    }
1086}
1087
1088/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1089/// numeric suffix (`-1`, `-2`, …) when needed.
1090///
1091/// `taken` is the set of names already consumed in the current ingest run.
1092/// The caller is expected to insert the returned name into `taken` so the
1093/// next call observes the consumption. Cross-run collisions are intentionally
1094/// surfaced by the per-file persistence path as duplicates so re-ingestion
1095/// of identical corpora stays idempotent.
1096///
1097/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1098/// candidates collide, signalling a pathological corpus that should be
1099/// renamed manually.
1100fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1101    if !taken.contains(base) {
1102        return Ok(base.to_string());
1103    }
1104    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1105        let candidate = format!("{base}-{suffix}");
1106        if !taken.contains(&candidate) {
1107            tracing::warn!(
1108                target: "ingest",
1109                base = %base,
1110                resolved = %candidate,
1111                suffix,
1112                "memory name collision resolved with numeric suffix"
1113            );
1114            return Ok(candidate);
1115        }
1116    }
1117    Err(AppError::Validation(format!(
1118        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1119    )))
1120}
1121
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied through unchanged;
/// leading and trailing dashes are kept (collapsed, not removed).
fn collapse_dashes(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        // The first dash of a run is always pushed, so `out` ends with `-`
        // exactly when the previous input character was a dash.
        if c == '-' && out.ends_with('-') {
            continue;
        }
        out.push(c);
    }
    out
}
1138
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or
    /// removed when `value` is `None`) and then puts the previous value back.
    ///
    /// The restore happens in a `Drop` guard, so it also runs when an
    /// assertion inside `f` panics and unwinds; otherwise one failing test
    /// would leak its override into every test that runs after it. The
    /// previous value is captured with `var_os` so a non-UTF-8 value is
    /// restored verbatim instead of being silently dropped.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the pre-test value and reinstates it when dropped.
        struct Restore(Option<std::ffi::OsString>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(prev) => std::env::set_var(KEY, prev),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // Capture before mutating; the guard lives until the end of scope.
        let _restore = Restore(std::env::var_os(KEY));
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}