// sqlite_graphrag/commands/ingest.rs
1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
/// Consumed by the name-collision resolver (presumably `unique_name`, defined
/// elsewhere in this file — confirm if the resolver is renamed).
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// NOTE: fields below intentionally use plain `//` comments rather than `///`
// where no doc comment existed before: clap derives `help`/`long_help` text
// from field doc comments, so adding `///` would change `--help` output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Ingest every Markdown file under ./docs as `document` memories\n  \
    sqlite-graphrag ingest ./docs --type document\n\n  \
    # Ingest .txt files recursively under ./notes\n  \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
NOTES:\n  \
    Each file becomes a separate memory. Names derive from file basenames\n  \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
    followed by a final summary line with counts. Per-file errors are reported\n  \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    /// Directory containing files to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    /// Glob pattern matched against file basenames (default: `*.md`). Supports
    /// `*.<ext>`, `<prefix>*`, and exact filename match.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    /// Recurse into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Opt-in NER. `num_args = 0..=1` + `default_missing_value` lets users write
    // either `--enable-ner` or `--enable-ner=true/false`; the env var provides
    // a non-CLI override.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    // Parsed later into `crate::extraction::GlinerVariant`; invalid values fall
    // back to fp32 with a warning rather than failing the command.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
    )]
    pub gliner_variant: String,

    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    /// Stop on first per-file error instead of continuing with the next file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    /// Namespace for the ingested memories.
    #[arg(long)]
    pub namespace: Option<String>,

    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output format for the NDJSON stream on stdout.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Hidden legacy flag retained so existing scripts passing --json keep working.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    /// Force single-threaded ingest to reduce RSS pressure.
    ///
    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
    /// explicit value. Recommended for environments with <4 GB available
    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
    /// (CLI flag has higher precedence than the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    /// Maximum process RSS in MiB; abort if exceeded during embedding.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,
}
168
169/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
170/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
171/// values evaluate to false. Unrecognized non-empty values emit a
172/// `tracing::warn!` and evaluate to false.
173fn env_low_memory_enabled() -> bool {
174    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
175        Ok(v) if v.is_empty() => false,
176        Ok(v) => match v.to_lowercase().as_str() {
177            "1" | "true" | "yes" | "on" => true,
178            "0" | "false" | "no" | "off" => false,
179            other => {
180                tracing::warn!(
181                    target: "ingest",
182                    value = %other,
183                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
184                );
185                false
186            }
187        },
188        Err(_) => false,
189    }
190}
191
192/// Resolves the effective ingest parallelism honoring `--low-memory` and the
193/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
194///
195/// Precedence:
196/// 1. `--low-memory` CLI flag forces parallelism = 1.
197/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
198/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
199/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
200///
201/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
202/// emits a `tracing::warn!` advertising the override.
203fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
204    let env_flag = env_low_memory_enabled();
205    let low_memory = low_memory_flag || env_flag;
206
207    if low_memory {
208        if let Some(n) = ingest_parallelism {
209            if n > 1 {
210                tracing::warn!(
211                    target: "ingest",
212                    requested = n,
213                    "--ingest-parallelism overridden by --low-memory; using 1"
214                );
215            }
216        }
217        if low_memory_flag {
218            tracing::info!(
219                target: "ingest",
220                source = "flag",
221                "low-memory mode enabled: forcing --ingest-parallelism 1"
222            );
223        } else {
224            tracing::info!(
225                target: "ingest",
226                source = "env",
227                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
228            );
229        }
230        return 1;
231    }
232
233    ingest_parallelism
234        .unwrap_or_else(|| {
235            std::thread::available_parallelism()
236                .map(|v| v.get() / 2)
237                .unwrap_or(1)
238                .clamp(1, 4)
239        })
240        .max(1)
241}
242
/// Per-file NDJSON object emitted for each processed file (success or error).
/// Serialized by the caller; exact `status`/`action` strings are assigned at
/// the emission site elsewhere in this file.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    // Source file path as discovered on disk.
    file: &'a str,
    // Derived kebab-case memory name.
    name: &'a str,
    // Outcome label for this file (set by the caller; values not visible here).
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    // Per-file error message; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    // Rowid of the persisted memory; omitted when the file failed/was skipped.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    // Persistence action (e.g. as produced by `FileSuccess.action`); omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
260
/// Final NDJSON summary line emitted after all per-file events.
/// The `summary: true` marker lets streaming consumers distinguish this
/// object from `IngestFileEvent` lines without schema negotiation.
#[derive(Serialize)]
struct IngestSummary {
    // Always the discriminator for the summary line.
    summary: bool,
    // Ingest root directory as given on the command line.
    dir: String,
    // Glob pattern used for file matching.
    pattern: String,
    // Whether subdirectories were traversed.
    recursive: bool,
    // Count of files matched (succeeded + failed + skipped).
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    // Wall-clock duration of the whole run in milliseconds.
    elapsed_ms: u64,
}
273
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    // Rowid of the newly inserted memory.
    memory_id: i64,
    // Persistence action label (currently always "created" in `persist_staged`).
    action: String,
}
279
/// NDJSON progress event emitted to stderr after each file completes Phase A.
/// Schema version 1; consumers should check `schema_version` before parsing.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    // Bump when the event shape changes; consumers gate on this.
    schema_version: u8,
    // Event discriminator (e.g. "file_extracted").
    event: &'a str,
    // Path of the file that finished Phase A.
    path: &'a str,
    // Phase A duration for this file in milliseconds.
    ms: u64,
    // Entities extracted by NER (0 when NER disabled or the file errored).
    entities: usize,
    // Relationships extracted by NER (0 when NER disabled or the file errored).
    relationships: usize,
}
291
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    // Full file contents (validated non-empty and within the body length limit).
    body: String,
    // blake3 hex digest of `body`; used for duplicate-content detection.
    body_hash: String,
    // First 200 chars of the body, stored alongside the memory vector.
    snippet: String,
    // Final kebab-case memory name (collision-resolved before Phase A).
    name: String,
    // Auto-generated "ingested from <path>" description.
    description: String,
    // Memory-level embedding: whole body (single chunk) or aggregate of chunks.
    embedding: Vec<f32>,
    // Per-chunk embeddings; `Some` only on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    // Chunk boundary metadata produced by hierarchical splitting.
    chunks_info: Vec<crate::chunking::Chunk>,
    // NER entities (already truncated to the per-memory cap).
    entities: Vec<NewEntity>,
    // NER relationships (normalized, validated, truncated to the cap).
    relationships: Vec<NewRelationship>,
    // One embedding per entry in `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    // URLs extracted from the body; persisted best-effort after commit.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
308
309/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
310/// Never touches the database — safe to run on any rayon thread.
311fn stage_file(
312    _idx: usize,
313    path: &Path,
314    name: &str,
315    paths: &AppPaths,
316    enable_ner: bool,
317    gliner_variant: crate::extraction::GlinerVariant,
318    max_rss_mb: u64,
319) -> Result<StagedFile, AppError> {
320    use crate::constants::*;
321
322    if name.len() > MAX_MEMORY_NAME_LEN {
323        return Err(AppError::LimitExceeded(
324            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
325        ));
326    }
327    if name.starts_with("__") {
328        return Err(AppError::Validation(
329            crate::i18n::validation::reserved_name(),
330        ));
331    }
332    {
333        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
334            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
335        if !slug_re.is_match(name) {
336            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
337                name,
338            )));
339        }
340    }
341
342    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
343    if raw_body.len() > MAX_MEMORY_BODY_LEN {
344        return Err(AppError::LimitExceeded(
345            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
346        ));
347    }
348    if raw_body.trim().is_empty() {
349        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
350    }
351
352    let description = format!("ingested from {}", path.display());
353    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
354        return Err(AppError::Validation(
355            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
356        ));
357    }
358
359    let mut extracted_entities: Vec<NewEntity> = Vec::new();
360    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
361    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
362    if enable_ner {
363        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
364            Ok(extracted) => {
365                extracted_urls = extracted.urls;
366                extracted_entities = extracted.entities;
367                extracted_relationships = extracted.relationships;
368
369                if extracted_entities.len() > max_entities_per_memory() {
370                    extracted_entities.truncate(max_entities_per_memory());
371                }
372                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
373                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
374                }
375            }
376            Err(e) => {
377                tracing::warn!(
378                    file = %path.display(),
379                    "auto-extraction failed (graceful degradation): {e:#}"
380                );
381            }
382        }
383    }
384
385    for rel in &mut extracted_relationships {
386        rel.relation = crate::parsers::normalize_relation(&rel.relation);
387        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
388            return Err(AppError::Validation(format!(
389                "{e} for relationship '{}' -> '{}'",
390                rel.source, rel.target
391            )));
392        }
393        crate::parsers::warn_if_non_canonical(&rel.relation);
394        if !(0.0..=1.0).contains(&rel.strength) {
395            return Err(AppError::Validation(format!(
396                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
397                rel.strength, rel.source, rel.target
398            )));
399        }
400    }
401
402    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
403    let snippet: String = raw_body.chars().take(200).collect();
404
405    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
406    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
407    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
408        return Err(AppError::LimitExceeded(format!(
409            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
410            chunks_info.len(),
411            REMEMBER_MAX_SAFE_MULTI_CHUNKS
412        )));
413    }
414
415    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
416    let embedding = if chunks_info.len() == 1 {
417        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
418    } else {
419        let chunk_texts: Vec<&str> = chunks_info
420            .iter()
421            .map(|c| chunking::chunk_text(&raw_body, c))
422            .collect();
423        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
424        for chunk_text in &chunk_texts {
425            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
426                if rss > max_rss_mb {
427                    tracing::error!(
428                        rss_mb = rss,
429                        max_rss_mb = max_rss_mb,
430                        file = %path.display(),
431                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
432                    );
433                    return Err(AppError::LowMemory {
434                        available_mb: crate::memory_guard::available_memory_mb(),
435                        required_mb: max_rss_mb,
436                    });
437                }
438            }
439            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
440                &paths.models,
441                chunk_text,
442            )?);
443        }
444        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
445        chunk_embeddings_opt = Some(chunk_embeddings);
446        aggregated
447    };
448
449    let entity_embeddings = extracted_entities
450        .iter()
451        .map(|entity| {
452            let entity_text = match &entity.description {
453                Some(desc) => format!("{} {}", entity.name, desc),
454                None => entity.name.clone(),
455            };
456            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
457        })
458        .collect::<Result<Vec<_>, _>>()?;
459
460    Ok(StagedFile {
461        body: raw_body,
462        body_hash,
463        snippet,
464        name: name.to_string(),
465        description,
466        embedding,
467        chunk_embeddings: chunk_embeddings_opt,
468        chunks_info,
469        entities: extracted_entities,
470        relationships: extracted_relationships,
471        entity_embeddings,
472        urls: extracted_urls,
473    })
474}
475
/// Phase B: persists one `StagedFile` to the database on the main thread.
///
/// Runs on the main thread only — `rusqlite::Connection` is not `Sync` (see
/// module docs). Order of operations:
/// 1. Enforce the active-namespace cap (only when this write would create a
///    brand-new namespace).
/// 2. Reject duplicate memory names within the namespace; duplicate bodies
///    (same hash) are allowed and merely logged.
/// 3. Inside one IMMEDIATE transaction: insert the memory, version 1, the
///    memory vector, chunk slices/vectors (multi-chunk only), and the
///    entity/relationship graph.
/// 4. After commit: best-effort URL insert (failure deliberately ignored).
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    // Namespace cap: only block when the namespace does not already exist,
    // i.e. this insert would push the distinct-active-namespace count up.
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Name collisions are a hard error; identical bodies are permitted (two
    // files may legitimately share content) and only logged below.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE takes the write lock up front (standard SQLite semantics), so
    // a busy database fails fast here instead of mid-transaction.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Every memory starts its version history at version 1, action "create".
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Multi-chunk memories additionally store per-chunk slices and vectors;
    // single-chunk memories carry only the memory-level vector above.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        // Phase A always populates chunk_embeddings on the multi-chunk path,
        // so a None here is an internal invariant violation, not a user error.
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the loop index is passed as both the chunk-vec id
            // and the chunk index — assumes positional ids are acceptable to
            // `upsert_chunk_vec`; confirm against the storage_chunks schema.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // Entities first: upsert, attach vector, link to the memory, and bump
        // the degree counter. `entity_embeddings[idx]` is index-aligned with
        // `entities` (built that way in Phase A).
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // name -> type lookup so relationship endpoints reuse the extracted
        // entity type; endpoints not in `entities` default to Concept.
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            // Upserting endpoints handles relationships whose source/target
            // never appeared in the extracted entity list.
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URLs are inserted outside the transaction and best-effort: a failure
    // here must not fail an already-committed memory (result ignored).
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
631
632pub fn run(args: IngestArgs) -> Result<(), AppError> {
633    let started = std::time::Instant::now();
634
635    if !args.dir.exists() {
636        return Err(AppError::Io(std::io::Error::new(
637            std::io::ErrorKind::NotFound,
638            format!("directory not found: {}", args.dir.display()),
639        )));
640    }
641    if !args.dir.is_dir() {
642        return Err(AppError::Validation(format!(
643            "path is not a directory: {}",
644            args.dir.display()
645        )));
646    }
647
648    let mut files: Vec<PathBuf> = Vec::new();
649    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
650    files.sort();
651
652    if files.len() > args.max_files {
653        return Err(AppError::Validation(format!(
654            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
655            files.len(),
656            args.max_files
657        )));
658    }
659
660    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
661    let memory_type_str = args.r#type.as_str().to_string();
662
663    let paths = AppPaths::resolve(args.db.as_deref())?;
664    let mut conn_or_err = match init_storage(&paths) {
665        Ok(c) => Ok(c),
666        Err(e) => Err(format!("{e}")),
667    };
668
669    let mut succeeded: usize = 0;
670    let mut failed: usize = 0;
671    let mut skipped: usize = 0;
672    let total = files.len();
673
674    // Pre-resolve all names before parallelisation so Phase A workers see a
675    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
676    let mut taken_names: BTreeSet<String> = BTreeSet::new();
677
678    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
679    // ProcessItem: the data moved into the producer thread for Phase A computation.
680    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
681    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
682    // boundary into the rayon producer.
683    enum SlotMeta {
684        Skip {
685            file_str: String,
686            derived_base: String,
687            name_truncated: bool,
688            original_name: Option<String>,
689            reason: String,
690        },
691        Process {
692            file_str: String,
693            derived_name: String,
694            name_truncated: bool,
695            original_name: Option<String>,
696        },
697    }
698
699    struct ProcessItem {
700        idx: usize,
701        path: PathBuf,
702        file_str: String,
703        derived_name: String,
704    }
705
706    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
707    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
708    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
709
710    for path in &files {
711        let file_str = path.to_string_lossy().into_owned();
712        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
713
714        if name_truncated {
715            if let Some(ref orig) = original_name {
716                truncations.push((orig.clone(), derived_base.clone()));
717            }
718        }
719
720        if derived_base.is_empty() {
721            slots_meta.push(SlotMeta::Skip {
722                file_str,
723                derived_base: String::new(),
724                name_truncated: false,
725                original_name: None,
726                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
727            });
728            continue;
729        }
730
731        match unique_name(&derived_base, &taken_names) {
732            Ok(derived_name) => {
733                taken_names.insert(derived_name.clone());
734                let idx = slots_meta.len();
735                process_items.push(ProcessItem {
736                    idx,
737                    path: path.clone(),
738                    file_str: file_str.clone(),
739                    derived_name: derived_name.clone(),
740                });
741                slots_meta.push(SlotMeta::Process {
742                    file_str,
743                    derived_name,
744                    name_truncated,
745                    original_name,
746                });
747            }
748            Err(e) => {
749                slots_meta.push(SlotMeta::Skip {
750                    file_str,
751                    derived_base,
752                    name_truncated,
753                    original_name,
754                    reason: e.to_string(),
755                });
756            }
757        }
758    }
759
760    if !truncations.is_empty() {
761        tracing::info!(
762            target: "ingest",
763            count = truncations.len(),
764            max_len = DERIVED_NAME_MAX_LEN,
765            "derived names truncated; pass -vv (debug) for per-file detail"
766        );
767    }
768
769    // Determine rayon thread pool size, honoring --low-memory and the
770    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
771    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
772
773    let pool = rayon::ThreadPoolBuilder::new()
774        .num_threads(parallelism)
775        .build()
776        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
777
778    if args.enable_ner && args.skip_extraction {
779        tracing::warn!(
780            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
781        );
782    }
783    if args.skip_extraction && !args.enable_ner {
784        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
785    }
786    let enable_ner = args.enable_ner;
787    let max_rss_mb = args.max_rss_mb;
788    let gliner_variant: crate::extraction::GlinerVariant =
789        args.gliner_variant.parse().unwrap_or_else(|e| {
790            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
791            crate::extraction::GlinerVariant::Fp32
792        });
793
794    let total_to_process = process_items.len();
795    tracing::info!(
796        target = "ingest",
797        phase = "pipeline_start",
798        files = total_to_process,
799        ingest_parallelism = parallelism,
800        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
801    );
802
803    // Bounded channel: producer never gets more than parallelism*2 items ahead of
804    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
805    // Each message carries the slot index so Phase B can look up SlotMeta in order.
806    let channel_bound = (parallelism * 2).max(1);
807    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
808
809    // Phase A: launched in a dedicated OS thread so the main thread can consume
810    // the channel concurrently. pool.install() blocks the calling thread until
811    // all rayon workers finish — if called on the main thread it would
812    // reintroduce the 2-phase blocking behaviour we are eliminating.
813    let paths_owned = paths.clone();
814    let producer_handle = std::thread::spawn(move || {
815        pool.install(|| {
816            process_items.into_par_iter().for_each(|item| {
817                let t0 = std::time::Instant::now();
818                let result = stage_file(
819                    item.idx,
820                    &item.path,
821                    &item.derived_name,
822                    &paths_owned,
823                    enable_ner,
824                    gliner_variant,
825                    max_rss_mb,
826                );
827                let elapsed_ms = t0.elapsed().as_millis() as u64;
828
829                // Emit NDJSON progress event to stderr so the user sees work
830                // happening during long NER runs (e.g. 50 files × 27s each).
831                let (n_entities, n_relationships) = match &result {
832                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
833                    Err(_) => (0, 0),
834                };
835                let progress = StageProgressEvent {
836                    schema_version: 1,
837                    event: "file_extracted",
838                    path: &item.file_str,
839                    ms: elapsed_ms,
840                    entities: n_entities,
841                    relationships: n_relationships,
842                };
843                if let Ok(line) = serde_json::to_string(&progress) {
844                    eprintln!("{line}");
845                }
846
847                // Blocking send applies backpressure: if Phase B is slower,
848                // Phase A workers wait here instead of accumulating staged files
849                // in memory. If the receiver is dropped (fail_fast abort), ignore.
850                let _ = tx.send((item.idx, result));
851            });
852            // Explicit drop of tx signals Phase B (rx iteration) to stop.
853            drop(tx);
854        });
855    });
856
857    // Phase B: main thread persists files as results arrive from the channel.
858    // Results arrive in completion order (par_iter is unordered). We persist
859    // each file immediately on arrival — this is the key fix for B1: with the
860    // old 2-phase design the first DB write happened only after ALL files had
861    // finished Phase A. Now the first commit happens as soon as the first file
862    // completes Phase A, regardless of how many files remain.
863    //
864    // NDJSON output order follows completion order (not file-system sort order).
865    // Skip slots are emitted at the end, after all Process results are consumed.
866    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
867    // requirement than ensuring data is persisted before the user's timeout fires.
868    let fail_fast = args.fail_fast;
869
870    // Emit pending Skip events first so agents see them early.
871    for meta in &slots_meta {
872        if let SlotMeta::Skip {
873            file_str,
874            derived_base,
875            name_truncated,
876            original_name,
877            reason,
878        } = meta
879        {
880            output::emit_json_compact(&IngestFileEvent {
881                file: file_str,
882                name: derived_base,
883                status: "skipped",
884                truncated: *name_truncated,
885                original_name: original_name.clone(),
886                error: Some(reason.clone()),
887                memory_id: None,
888                action: None,
889            })?;
890            skipped += 1;
891        }
892    }
893
894    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
895    // as channel messages arrive in completion order.
896    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
897        .iter()
898        .enumerate()
899        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
900        .collect();
901
902    tracing::info!(
903        target = "ingest",
904        phase = "persist_start",
905        files = total_to_process,
906        "phase B starting: persisting files incrementally as Phase A completes each one",
907    );
908
909    // Drain channel and persist each file immediately — no accumulation into a
910    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
911    // Phase B without applying backpressure.
912    for (idx, stage_result) in rx {
913        let meta = meta_index.get(&idx).ok_or_else(|| {
914            AppError::Internal(anyhow::anyhow!(
915                "channel idx {idx} has no corresponding Process slot"
916            ))
917        })?;
918        let (file_str, derived_name, name_truncated, original_name) = match meta {
919            SlotMeta::Process {
920                file_str,
921                derived_name,
922                name_truncated,
923                original_name,
924            } => (file_str, derived_name, name_truncated, original_name),
925            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
926        };
927
928        // If storage init failed, every file fails with the same error.
929        let conn = match conn_or_err.as_mut() {
930            Ok(c) => c,
931            Err(err_msg) => {
932                let err_clone = err_msg.clone();
933                output::emit_json_compact(&IngestFileEvent {
934                    file: file_str,
935                    name: derived_name,
936                    status: "failed",
937                    truncated: *name_truncated,
938                    original_name: original_name.clone(),
939                    error: Some(err_clone.clone()),
940                    memory_id: None,
941                    action: None,
942                })?;
943                failed += 1;
944                if fail_fast {
945                    output::emit_json_compact(&IngestSummary {
946                        summary: true,
947                        dir: args.dir.display().to_string(),
948                        pattern: args.pattern.clone(),
949                        recursive: args.recursive,
950                        files_total: total,
951                        files_succeeded: succeeded,
952                        files_failed: failed,
953                        files_skipped: skipped,
954                        elapsed_ms: started.elapsed().as_millis() as u64,
955                    })?;
956                    return Err(AppError::Validation(format!(
957                        "ingest aborted on first failure: {err_clone}"
958                    )));
959                }
960                continue;
961            }
962        };
963
964        let outcome =
965            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
966
967        match outcome {
968            Ok(FileSuccess { memory_id, action }) => {
969                output::emit_json_compact(&IngestFileEvent {
970                    file: file_str,
971                    name: derived_name,
972                    status: "indexed",
973                    truncated: *name_truncated,
974                    original_name: original_name.clone(),
975                    error: None,
976                    memory_id: Some(memory_id),
977                    action: Some(action),
978                })?;
979                succeeded += 1;
980            }
981            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
982                output::emit_json_compact(&IngestFileEvent {
983                    file: file_str,
984                    name: derived_name,
985                    status: "skipped",
986                    truncated: *name_truncated,
987                    original_name: original_name.clone(),
988                    error: Some(format!("{e}")),
989                    memory_id: None,
990                    action: Some("duplicate".to_string()),
991                })?;
992                skipped += 1;
993            }
994            Err(e) => {
995                let err_msg = format!("{e}");
996                output::emit_json_compact(&IngestFileEvent {
997                    file: file_str,
998                    name: derived_name,
999                    status: "failed",
1000                    truncated: *name_truncated,
1001                    original_name: original_name.clone(),
1002                    error: Some(err_msg.clone()),
1003                    memory_id: None,
1004                    action: None,
1005                })?;
1006                failed += 1;
1007                if fail_fast {
1008                    output::emit_json_compact(&IngestSummary {
1009                        summary: true,
1010                        dir: args.dir.display().to_string(),
1011                        pattern: args.pattern.clone(),
1012                        recursive: args.recursive,
1013                        files_total: total,
1014                        files_succeeded: succeeded,
1015                        files_failed: failed,
1016                        files_skipped: skipped,
1017                        elapsed_ms: started.elapsed().as_millis() as u64,
1018                    })?;
1019                    return Err(AppError::Validation(format!(
1020                        "ingest aborted on first failure: {err_msg}"
1021                    )));
1022                }
1023            }
1024        }
1025    }
1026
1027    // Wait for the producer thread to finish cleanly.
1028    producer_handle
1029        .join()
1030        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1031
1032    output::emit_json_compact(&IngestSummary {
1033        summary: true,
1034        dir: args.dir.display().to_string(),
1035        pattern: args.pattern.clone(),
1036        recursive: args.recursive,
1037        files_total: total,
1038        files_succeeded: succeeded,
1039        files_failed: failed,
1040        files_skipped: skipped,
1041        elapsed_ms: started.elapsed().as_millis() as u64,
1042    })?;
1043
1044    Ok(())
1045}
1046
1047/// Auto-initialises the database (matches the contract of every other CRUD
1048/// handler) and returns a fresh read/write connection ready for the ingest
1049/// loop. Errors here are recoverable per-file: the caller surfaces them as
1050/// failure events so `--fail-fast` and the continue-on-error path keep
1051/// working when, for example, the user points `--db` at an unwritable path.
1052fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1053    ensure_db_ready(paths)?;
1054    let conn = open_rw(&paths.db)?;
1055    Ok(conn)
1056}
1057
1058fn collect_files(
1059    dir: &Path,
1060    pattern: &str,
1061    recursive: bool,
1062    out: &mut Vec<PathBuf>,
1063) -> Result<(), AppError> {
1064    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1065    for entry in entries {
1066        let entry = entry.map_err(AppError::Io)?;
1067        let path = entry.path();
1068        let file_type = entry.file_type().map_err(AppError::Io)?;
1069        if file_type.is_file() {
1070            let name = entry.file_name();
1071            let name_str = name.to_string_lossy();
1072            if matches_pattern(&name_str, pattern) {
1073                out.push(path);
1074            }
1075        } else if file_type.is_dir() && recursive {
1076            collect_files(&path, pattern, recursive, out)?;
1077        }
1078    }
1079    Ok(())
1080}
1081
/// Minimal glob matcher for file basenames. Case-sensitive.
///
/// Supported pattern shapes, checked in order:
/// - `*suffix` — basename ends with `suffix` (e.g. `*.md`); a bare `*`
///   matches every name
/// - `prefix*` — basename starts with `prefix` (e.g. `README*`)
/// - `prefix*suffix` — one interior `*` matches any (possibly empty)
///   middle segment (e.g. `report-*.md`); any further `*` characters are
///   treated as literal characters of the suffix
/// - anything else is compared literally
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else if let Some((prefix, suffix)) = pattern.split_once('*') {
        // Interior wildcard. Previously this fell through to the exact
        // comparison below, so patterns like `report-*.md` matched nothing.
        // The length check stops the prefix and suffix from overlapping on
        // short names (e.g. `ab*ba` must not match `aba`).
        name.len() >= prefix.len() + suffix.len()
            && name.starts_with(prefix)
            && name.ends_with(suffix)
    } else {
        name == pattern
    }
}
1091
1092/// Returns `(final_name, truncated, original_name)`.
1093/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
1094/// `original_name` holds the pre-truncation name only when `truncated=true`.
1095///
1096/// Non-ASCII characters are first decomposed via NFD and then stripped of
1097/// combining marks so accented letters fold to their base ASCII letter
1098/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
1099/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1100/// preserves meaningful word content rather than collapsing the basename
1101/// to a few stray ASCII letters as the previous filter did.
1102fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
1103    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1104    let lowered: String = stem
1105        .nfd()
1106        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1107        .map(|c| {
1108            if c == '_' || c.is_whitespace() {
1109                '-'
1110            } else {
1111                c
1112            }
1113        })
1114        .map(|c| c.to_ascii_lowercase())
1115        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1116        .collect();
1117    let collapsed = collapse_dashes(&lowered);
1118    let trimmed = collapsed.trim_matches('-').to_string();
1119    if trimmed.len() > DERIVED_NAME_MAX_LEN {
1120        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
1121            .trim_matches('-')
1122            .to_string();
1123        tracing::debug!(
1124            target: "ingest",
1125            original = %trimmed,
1126            truncated_to = %truncated,
1127            max_len = DERIVED_NAME_MAX_LEN,
1128            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1129        );
1130        (truncated, true, Some(trimmed))
1131    } else {
1132        (trimmed, false, None)
1133    }
1134}
1135
1136/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1137/// numeric suffix (`-1`, `-2`, …) when needed.
1138///
1139/// `taken` is the set of names already consumed in the current ingest run.
1140/// The caller is expected to insert the returned name into `taken` so the
1141/// next call observes the consumption. Cross-run collisions are intentionally
1142/// surfaced by the per-file persistence path as duplicates so re-ingestion
1143/// of identical corpora stays idempotent.
1144///
1145/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1146/// candidates collide, signalling a pathological corpus that should be
1147/// renamed manually.
1148fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1149    if !taken.contains(base) {
1150        return Ok(base.to_string());
1151    }
1152    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1153        let candidate = format!("{base}-{suffix}");
1154        if !taken.contains(&candidate) {
1155            tracing::warn!(
1156                target: "ingest",
1157                base = %base,
1158                resolved = %candidate,
1159                suffix,
1160                "memory name collision resolved with numeric suffix"
1161            );
1162            return Ok(candidate);
1163        }
1164    }
1165    Err(AppError::Validation(format!(
1166        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1167    )))
1168}
1169
/// Collapses every run of consecutive `-` characters in `s` into a single
/// dash; all other characters pass through unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    let mut last_was_dash = false;
    for ch in s.chars() {
        let is_dash = ch == '-';
        // Emit the char unless it would extend an existing dash run.
        if !(is_dash && last_was_dash) {
            result.push(ch);
        }
        last_was_dash = is_dash;
    }
    result
}
1186
#[cfg(test)]
mod tests {
    // Unit tests for the pure helpers in this module: glob matching, name
    // derivation/uniquing, directory collection, the low-memory parallelism
    // resolution, and the clap wiring of `--low-memory`.
    use super::*;
    use std::path::PathBuf;

    // ── matches_pattern: the three supported glob shapes ──

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        // Bare `*` is the suffix form with an empty suffix → matches all.
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        // Exact comparison is case-sensitive.
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ── derive_kebab_name: basename → kebab-case memory name ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Bug M-A3: NFD-based unicode normalization preserves base letters of
    // accented characters instead of dropping them entirely.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        // Emoji has no ASCII fallback → dropped; leading dash then trimmed.
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        // All-emoji basenames legitimately derive to "" — callers must
        // handle the empty-name case (surfaced as a Skip slot upstream).
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        // Pre-truncation name is reported back only on truncation.
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ── collect_files: directory walking + pattern filtering ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        // `note` and `note-1` are taken → the first free candidate is `note-2`.
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──

    use serial_test::serial;

    /// Helper: scrubs the env var around a closure to keep tests deterministic.
    /// Must only be called from `#[serial]` tests — process-wide env mutation
    /// is not safe under the default parallel test runner.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        // Snapshot the previous value so it can be restored afterwards.
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        // Truthy values are matched case-insensitively (TRUE, YES, On, …).
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        // Unknown values fail safe: low-memory mode stays off.
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        // --low-memory wins even over an explicit --ingest-parallelism.
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        // The default is machine-dependent but clamped to [1, 4].
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
        // on the public `Cli` definition so the flag is wired end-to-end.
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}