Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound on the numeric suffix tried while de-duplicating a derived
/// name. Once this many candidates have collided, an error is surfaced
/// instead of searching indefinitely.
const MAX_NAME_COLLISION_SUFFIX: usize = 1_000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70NOTES:\n  \
71    Each file becomes a separate memory. Names derive from file basenames\n  \
72    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
73    followed by a final summary line with counts. Per-file errors are reported\n  \
74    inline and processing continues unless --fail-fast is set.")]
75pub struct IngestArgs {
76    /// Directory containing files to ingest.
77    #[arg(
78        value_name = "DIR",
79        help = "Directory to ingest recursively (each matching file becomes a memory)"
80    )]
81    pub dir: PathBuf,
82
83    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
84    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
85    pub r#type: MemoryType,
86
87    /// Glob pattern matched against file basenames (default: `*.md`). Supports
88    /// `*.<ext>`, `<prefix>*`, and exact filename match.
89    #[arg(long, default_value = "*.md")]
90    pub pattern: String,
91
92    /// Recurse into subdirectories.
93    #[arg(long, default_value_t = false)]
94    pub recursive: bool,
95
96    #[arg(
97        long,
98        env = "SQLITE_GRAPHRAG_ENABLE_NER",
99        value_parser = crate::parsers::parse_bool_flexible,
100        action = clap::ArgAction::Set,
101        num_args = 0..=1,
102        default_missing_value = "true",
103        default_value = "false",
104        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
105    )]
106    pub enable_ner: bool,
107    #[arg(
108        long,
109        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
110        default_value = "fp32",
111        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
112    )]
113    pub gliner_variant: String,
114
115    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
116    #[arg(long, default_value_t = false, hide = true)]
117    pub skip_extraction: bool,
118
119    /// Stop on first per-file error instead of continuing with the next file.
120    #[arg(long, default_value_t = false)]
121    pub fail_fast: bool,
122
123    /// Preview file-to-name mapping without loading model or persisting.
124    #[arg(long, default_value_t = false)]
125    pub dry_run: bool,
126
127    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
128    #[arg(long, default_value_t = 10_000)]
129    pub max_files: usize,
130
131    /// Namespace for the ingested memories.
132    #[arg(long)]
133    pub namespace: Option<String>,
134
135    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
136    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
137    pub db: Option<String>,
138
139    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
140    pub format: JsonOutputFormat,
141
142    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
143    pub json: bool,
144
145    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
146    #[arg(
147        long,
148        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
149    )]
150    pub ingest_parallelism: Option<usize>,
151
152    /// Force single-threaded ingest to reduce RSS pressure.
153    ///
154    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
155    /// explicit value. Recommended for environments with <4 GB available
156    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
157    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
158    /// (CLI flag has higher precedence than the env var).
159    #[arg(
160        long,
161        default_value_t = false,
162        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
163                Recommended for environments with <4 GB available RAM or container/cgroup \
164                constraints. Trade-off: 3-4x longer wall time. Also honored via \
165                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
166    )]
167    pub low_memory: bool,
168
169    /// Maximum process RSS in MiB; abort if exceeded during embedding.
170    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
171          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
172    pub max_rss_mb: u64,
173}
174
175/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
176/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
177/// values evaluate to false. Unrecognized non-empty values emit a
178/// `tracing::warn!` and evaluate to false.
179fn env_low_memory_enabled() -> bool {
180    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
181        Ok(v) if v.is_empty() => false,
182        Ok(v) => match v.to_lowercase().as_str() {
183            "1" | "true" | "yes" | "on" => true,
184            "0" | "false" | "no" | "off" => false,
185            other => {
186                tracing::warn!(
187                    target: "ingest",
188                    value = %other,
189                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
190                );
191                false
192            }
193        },
194        Err(_) => false,
195    }
196}
197
198/// Resolves the effective ingest parallelism honoring `--low-memory` and the
199/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
200///
201/// Precedence:
202/// 1. `--low-memory` CLI flag forces parallelism = 1.
203/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
204/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
205/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
206///
207/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
208/// emits a `tracing::warn!` advertising the override.
209fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
210    let env_flag = env_low_memory_enabled();
211    let low_memory = low_memory_flag || env_flag;
212
213    if low_memory {
214        if let Some(n) = ingest_parallelism {
215            if n > 1 {
216                tracing::warn!(
217                    target: "ingest",
218                    requested = n,
219                    "--ingest-parallelism overridden by --low-memory; using 1"
220                );
221            }
222        }
223        if low_memory_flag {
224            tracing::info!(
225                target: "ingest",
226                source = "flag",
227                "low-memory mode enabled: forcing --ingest-parallelism 1"
228            );
229        } else {
230            tracing::info!(
231                target: "ingest",
232                source = "env",
233                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
234            );
235        }
236        return 1;
237    }
238
239    ingest_parallelism
240        .unwrap_or_else(|| {
241            std::thread::available_parallelism()
242                .map(|v| v.get() / 2)
243                .unwrap_or(1)
244                .clamp(1, 4)
245        })
246        .max(1)
247}
248
249#[derive(Serialize)]
250struct IngestFileEvent<'a> {
251    file: &'a str,
252    name: &'a str,
253    status: &'a str,
254    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
255    truncated: bool,
256    /// Original derived name before truncation; only present when `truncated=true`.
257    #[serde(skip_serializing_if = "Option::is_none")]
258    original_name: Option<String>,
259    /// Original file basename (without extension); only present when it differs from `name`.
260    #[serde(skip_serializing_if = "Option::is_none")]
261    original_filename: Option<&'a str>,
262    #[serde(skip_serializing_if = "Option::is_none")]
263    error: Option<String>,
264    #[serde(skip_serializing_if = "Option::is_none")]
265    memory_id: Option<i64>,
266    #[serde(skip_serializing_if = "Option::is_none")]
267    action: Option<String>,
268}
269
270#[derive(Serialize)]
271struct IngestSummary {
272    summary: bool,
273    dir: String,
274    pattern: String,
275    recursive: bool,
276    files_total: usize,
277    files_succeeded: usize,
278    files_failed: usize,
279    files_skipped: usize,
280    elapsed_ms: u64,
281}
282
/// Result of persisting one file successfully; feeds the per-file NDJSON event.
struct FileSuccess {
    /// Row id of the memory that was written.
    memory_id: i64,
    /// Label of the action performed (e.g. `"created"`).
    action: String,
}
288
289/// NDJSON progress event emitted to stderr after each file completes Phase A.
290/// Schema version 1; consumers should check `schema_version` before parsing.
291#[derive(Serialize)]
292struct StageProgressEvent<'a> {
293    schema_version: u8,
294    event: &'a str,
295    path: &'a str,
296    ms: u64,
297    entities: usize,
298    relationships: usize,
299}
300
301/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
302/// Phase B persists these to SQLite on the main thread in submission order.
303struct StagedFile {
304    body: String,
305    body_hash: String,
306    snippet: String,
307    name: String,
308    description: String,
309    embedding: Vec<f32>,
310    chunk_embeddings: Option<Vec<Vec<f32>>>,
311    chunks_info: Vec<crate::chunking::Chunk>,
312    entities: Vec<NewEntity>,
313    relationships: Vec<NewRelationship>,
314    entity_embeddings: Vec<Vec<f32>>,
315    urls: Vec<crate::extraction::ExtractedUrl>,
316}
317
318/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
319/// Never touches the database — safe to run on any rayon thread.
320fn stage_file(
321    _idx: usize,
322    path: &Path,
323    name: &str,
324    paths: &AppPaths,
325    enable_ner: bool,
326    gliner_variant: crate::extraction::GlinerVariant,
327    max_rss_mb: u64,
328) -> Result<StagedFile, AppError> {
329    use crate::constants::*;
330
331    if name.len() > MAX_MEMORY_NAME_LEN {
332        return Err(AppError::LimitExceeded(
333            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
334        ));
335    }
336    if name.starts_with("__") {
337        return Err(AppError::Validation(
338            crate::i18n::validation::reserved_name(),
339        ));
340    }
341    {
342        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
343            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
344        if !slug_re.is_match(name) {
345            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
346                name,
347            )));
348        }
349    }
350
351    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
352    if raw_body.len() > MAX_MEMORY_BODY_LEN {
353        return Err(AppError::LimitExceeded(
354            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
355        ));
356    }
357    if raw_body.trim().is_empty() {
358        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
359    }
360
361    let description = format!("ingested from {}", path.display());
362    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
363        return Err(AppError::Validation(
364            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
365        ));
366    }
367
368    let mut extracted_entities: Vec<NewEntity> = Vec::new();
369    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
370    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
371    if enable_ner {
372        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
373            Ok(extracted) => {
374                extracted_urls = extracted.urls;
375                extracted_entities = extracted.entities;
376                extracted_relationships = extracted.relationships;
377
378                if extracted_entities.len() > max_entities_per_memory() {
379                    extracted_entities.truncate(max_entities_per_memory());
380                }
381                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
382                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
383                }
384            }
385            Err(e) => {
386                tracing::warn!(
387                    file = %path.display(),
388                    "auto-extraction failed (graceful degradation): {e:#}"
389                );
390            }
391        }
392    }
393
394    for rel in &mut extracted_relationships {
395        rel.relation = crate::parsers::normalize_relation(&rel.relation);
396        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
397            return Err(AppError::Validation(format!(
398                "{e} for relationship '{}' -> '{}'",
399                rel.source, rel.target
400            )));
401        }
402        crate::parsers::warn_if_non_canonical(&rel.relation);
403        if !(0.0..=1.0).contains(&rel.strength) {
404            return Err(AppError::Validation(format!(
405                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
406                rel.strength, rel.source, rel.target
407            )));
408        }
409    }
410
411    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
412    let snippet: String = raw_body.chars().take(200).collect();
413
414    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
415    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
416    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
417        return Err(AppError::LimitExceeded(format!(
418            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
419            chunks_info.len(),
420            REMEMBER_MAX_SAFE_MULTI_CHUNKS
421        )));
422    }
423
424    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
425    let embedding = if chunks_info.len() == 1 {
426        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
427    } else {
428        let chunk_texts: Vec<&str> = chunks_info
429            .iter()
430            .map(|c| chunking::chunk_text(&raw_body, c))
431            .collect();
432        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
433        for chunk_text in &chunk_texts {
434            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
435                if rss > max_rss_mb {
436                    tracing::error!(
437                        rss_mb = rss,
438                        max_rss_mb = max_rss_mb,
439                        file = %path.display(),
440                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
441                    );
442                    return Err(AppError::LowMemory {
443                        available_mb: crate::memory_guard::available_memory_mb(),
444                        required_mb: max_rss_mb,
445                    });
446                }
447            }
448            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
449                &paths.models,
450                chunk_text,
451            )?);
452        }
453        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
454        chunk_embeddings_opt = Some(chunk_embeddings);
455        aggregated
456    };
457
458    let entity_embeddings = extracted_entities
459        .iter()
460        .map(|entity| {
461            let entity_text = match &entity.description {
462                Some(desc) => format!("{} {}", entity.name, desc),
463                None => entity.name.clone(),
464            };
465            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
466        })
467        .collect::<Result<Vec<_>, _>>()?;
468
469    Ok(StagedFile {
470        body: raw_body,
471        body_hash,
472        snippet,
473        name: name.to_string(),
474        description,
475        embedding,
476        chunk_embeddings: chunk_embeddings_opt,
477        chunks_info,
478        entities: extracted_entities,
479        relationships: extracted_relationships,
480        entity_embeddings,
481        urls: extracted_urls,
482    })
483}
484
485/// Phase B: persists one `StagedFile` to the database on the main thread.
486fn persist_staged(
487    conn: &mut Connection,
488    namespace: &str,
489    memory_type: &str,
490    staged: StagedFile,
491) -> Result<FileSuccess, AppError> {
492    {
493        let active_count: u32 = conn.query_row(
494            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
495            [],
496            |r| r.get::<_, i64>(0).map(|v| v as u32),
497        )?;
498        let ns_exists: bool = conn.query_row(
499            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
500            rusqlite::params![namespace],
501            |r| r.get::<_, i64>(0).map(|v| v > 0),
502        )?;
503        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
504            return Err(AppError::NamespaceError(format!(
505                "active namespace limit of {} exceeded while creating '{namespace}'",
506                crate::constants::MAX_NAMESPACES_ACTIVE
507            )));
508        }
509    }
510
511    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
512    if existing_memory.is_some() {
513        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
514            &staged.name,
515            namespace,
516        )));
517    }
518    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
519
520    let new_memory = NewMemory {
521        namespace: namespace.to_string(),
522        name: staged.name.clone(),
523        memory_type: memory_type.to_string(),
524        description: staged.description.clone(),
525        body: staged.body,
526        body_hash: staged.body_hash,
527        session_id: None,
528        source: "agent".to_string(),
529        metadata: serde_json::json!({}),
530    };
531
532    if let Some(hash_id) = duplicate_hash_id {
533        tracing::debug!(
534            target: "ingest",
535            duplicate_memory_id = hash_id,
536            "identical body already exists; persisting a new memory anyway"
537        );
538    }
539
540    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
541
542    let memory_id = memories::insert(&tx, &new_memory)?;
543    versions::insert_version(
544        &tx,
545        memory_id,
546        1,
547        &staged.name,
548        memory_type,
549        &staged.description,
550        &new_memory.body,
551        &serde_json::to_string(&new_memory.metadata)?,
552        None,
553        "create",
554    )?;
555    memories::upsert_vec(
556        &tx,
557        memory_id,
558        namespace,
559        memory_type,
560        &staged.embedding,
561        &staged.name,
562        &staged.snippet,
563    )?;
564
565    if staged.chunks_info.len() > 1 {
566        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
567        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
568            AppError::Internal(anyhow::anyhow!(
569                "missing chunk embeddings cache on multi-chunk ingest path"
570            ))
571        })?;
572        for (i, emb) in chunk_embeddings.iter().enumerate() {
573            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
574        }
575    }
576
577    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
578        for (idx, entity) in staged.entities.iter().enumerate() {
579            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
580            let entity_embedding = &staged.entity_embeddings[idx];
581            entities::upsert_entity_vec(
582                &tx,
583                entity_id,
584                namespace,
585                entity.entity_type,
586                entity_embedding,
587                &entity.name,
588            )?;
589            entities::link_memory_entity(&tx, memory_id, entity_id)?;
590            entities::increment_degree(&tx, entity_id)?;
591        }
592        let entity_types: std::collections::HashMap<&str, EntityType> = staged
593            .entities
594            .iter()
595            .map(|entity| (entity.name.as_str(), entity.entity_type))
596            .collect();
597        for rel in &staged.relationships {
598            let source_entity = NewEntity {
599                name: rel.source.clone(),
600                entity_type: entity_types
601                    .get(rel.source.as_str())
602                    .copied()
603                    .unwrap_or(EntityType::Concept),
604                description: None,
605            };
606            let target_entity = NewEntity {
607                name: rel.target.clone(),
608                entity_type: entity_types
609                    .get(rel.target.as_str())
610                    .copied()
611                    .unwrap_or(EntityType::Concept),
612                description: None,
613            };
614            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
615            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
616            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
617            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
618        }
619    }
620
621    tx.commit()?;
622
623    if !staged.urls.is_empty() {
624        let url_entries: Vec<storage_urls::MemoryUrl> = staged
625            .urls
626            .into_iter()
627            .map(|u| storage_urls::MemoryUrl {
628                url: u.url,
629                offset: Some(u.offset as i64),
630            })
631            .collect();
632        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
633    }
634
635    Ok(FileSuccess {
636        memory_id,
637        action: "created".to_string(),
638    })
639}
640
641pub fn run(args: IngestArgs) -> Result<(), AppError> {
642    let started = std::time::Instant::now();
643
644    if !args.dir.exists() {
645        return Err(AppError::Validation(format!(
646            "directory not found: {}",
647            args.dir.display()
648        )));
649    }
650    if !args.dir.is_dir() {
651        return Err(AppError::Validation(format!(
652            "path is not a directory: {}",
653            args.dir.display()
654        )));
655    }
656
657    let mut files: Vec<PathBuf> = Vec::new();
658    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
659    files.sort();
660
661    if files.len() > args.max_files {
662        return Err(AppError::Validation(format!(
663            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
664            files.len(),
665            args.max_files
666        )));
667    }
668
669    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
670    let memory_type_str = args.r#type.as_str().to_string();
671
672    let paths = AppPaths::resolve(args.db.as_deref())?;
673    let mut conn_or_err = match init_storage(&paths) {
674        Ok(c) => Ok(c),
675        Err(e) => Err(format!("{e}")),
676    };
677
678    let mut succeeded: usize = 0;
679    let mut failed: usize = 0;
680    let mut skipped: usize = 0;
681    let total = files.len();
682
683    // Pre-resolve all names before parallelisation so Phase A workers see a
684    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
685    let mut taken_names: BTreeSet<String> = BTreeSet::new();
686
687    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
688    // ProcessItem: the data moved into the producer thread for Phase A computation.
689    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
690    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
691    // boundary into the rayon producer.
692    enum SlotMeta {
693        Skip {
694            file_str: String,
695            derived_base: String,
696            name_truncated: bool,
697            original_name: Option<String>,
698            original_filename: Option<String>,
699            reason: String,
700        },
701        Process {
702            file_str: String,
703            derived_name: String,
704            name_truncated: bool,
705            original_name: Option<String>,
706            original_filename: Option<String>,
707        },
708    }
709
710    struct ProcessItem {
711        idx: usize,
712        path: PathBuf,
713        file_str: String,
714        derived_name: String,
715    }
716
717    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
718    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
719    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
720
721    for path in &files {
722        let file_str = path.to_string_lossy().into_owned();
723        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
724        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
725
726        if name_truncated {
727            if let Some(ref orig) = original_name {
728                truncations.push((orig.clone(), derived_base.clone()));
729            }
730        }
731
732        if derived_base.is_empty() {
733            // original_filename: always include when it differs from the empty derived name
734            let orig_filename = if !original_basename.is_empty() {
735                Some(original_basename.to_string())
736            } else {
737                None
738            };
739            slots_meta.push(SlotMeta::Skip {
740                file_str,
741                derived_base: String::new(),
742                name_truncated: false,
743                original_name: None,
744                original_filename: orig_filename,
745                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
746            });
747            continue;
748        }
749
750        match unique_name(&derived_base, &taken_names) {
751            Ok(derived_name) => {
752                taken_names.insert(derived_name.clone());
753                let idx = slots_meta.len();
754                // original_filename: present only when the raw basename differs from the derived name
755                let orig_filename = if original_basename != derived_name {
756                    Some(original_basename.to_string())
757                } else {
758                    None
759                };
760                process_items.push(ProcessItem {
761                    idx,
762                    path: path.clone(),
763                    file_str: file_str.clone(),
764                    derived_name: derived_name.clone(),
765                });
766                slots_meta.push(SlotMeta::Process {
767                    file_str,
768                    derived_name,
769                    name_truncated,
770                    original_name,
771                    original_filename: orig_filename,
772                });
773            }
774            Err(e) => {
775                let orig_filename = if original_basename != derived_base {
776                    Some(original_basename.to_string())
777                } else {
778                    None
779                };
780                slots_meta.push(SlotMeta::Skip {
781                    file_str,
782                    derived_base,
783                    name_truncated,
784                    original_name,
785                    original_filename: orig_filename,
786                    reason: e.to_string(),
787                });
788            }
789        }
790    }
791
792    if !truncations.is_empty() {
793        tracing::info!(
794            target: "ingest",
795            count = truncations.len(),
796            max_len = DERIVED_NAME_MAX_LEN,
797            "derived names truncated; pass -vv (debug) for per-file detail"
798        );
799    }
800
801    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
802    if args.dry_run {
803        for meta in &slots_meta {
804            match meta {
805                SlotMeta::Skip {
806                    file_str,
807                    derived_base,
808                    name_truncated,
809                    original_name,
810                    original_filename,
811                    reason,
812                } => {
813                    output::emit_json_compact(&IngestFileEvent {
814                        file: file_str,
815                        name: derived_base,
816                        status: "skip",
817                        truncated: *name_truncated,
818                        original_name: original_name.clone(),
819                        original_filename: original_filename.as_deref(),
820                        error: Some(reason.clone()),
821                        memory_id: None,
822                        action: None,
823                    })?;
824                }
825                SlotMeta::Process {
826                    file_str,
827                    derived_name,
828                    name_truncated,
829                    original_name,
830                    original_filename,
831                } => {
832                    output::emit_json_compact(&IngestFileEvent {
833                        file: file_str,
834                        name: derived_name,
835                        status: "preview",
836                        truncated: *name_truncated,
837                        original_name: original_name.clone(),
838                        original_filename: original_filename.as_deref(),
839                        error: None,
840                        memory_id: None,
841                        action: None,
842                    })?;
843                }
844            }
845        }
846        output::emit_json_compact(&IngestSummary {
847            summary: true,
848            dir: args.dir.to_string_lossy().into_owned(),
849            pattern: args.pattern.clone(),
850            recursive: args.recursive,
851            files_total: total,
852            files_succeeded: 0,
853            files_failed: 0,
854            files_skipped: 0,
855            elapsed_ms: started.elapsed().as_millis() as u64,
856        })?;
857        return Ok(());
858    }
859
860    // Determine rayon thread pool size, honoring --low-memory and the
861    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
862    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
863
864    let pool = rayon::ThreadPoolBuilder::new()
865        .num_threads(parallelism)
866        .build()
867        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
868
869    if args.enable_ner && args.skip_extraction {
870        tracing::warn!(
871            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
872        );
873    }
874    if args.skip_extraction && !args.enable_ner {
875        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
876    }
877    let enable_ner = args.enable_ner;
878    let max_rss_mb = args.max_rss_mb;
879    let gliner_variant: crate::extraction::GlinerVariant =
880        args.gliner_variant.parse().unwrap_or_else(|e| {
881            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
882            crate::extraction::GlinerVariant::Fp32
883        });
884
885    let total_to_process = process_items.len();
886    tracing::info!(
887        target = "ingest",
888        phase = "pipeline_start",
889        files = total_to_process,
890        ingest_parallelism = parallelism,
891        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
892    );
893
894    // Bounded channel: producer never gets more than parallelism*2 items ahead of
895    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
896    // Each message carries the slot index so Phase B can look up SlotMeta in order.
897    let channel_bound = (parallelism * 2).max(1);
898    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
899
900    // Phase A: launched in a dedicated OS thread so the main thread can consume
901    // the channel concurrently. pool.install() blocks the calling thread until
902    // all rayon workers finish — if called on the main thread it would
903    // reintroduce the 2-phase blocking behaviour we are eliminating.
904    let paths_owned = paths.clone();
905    let producer_handle = std::thread::spawn(move || {
906        pool.install(|| {
907            process_items.into_par_iter().for_each(|item| {
908                let t0 = std::time::Instant::now();
909                let result = stage_file(
910                    item.idx,
911                    &item.path,
912                    &item.derived_name,
913                    &paths_owned,
914                    enable_ner,
915                    gliner_variant,
916                    max_rss_mb,
917                );
918                let elapsed_ms = t0.elapsed().as_millis() as u64;
919
920                // Emit NDJSON progress event to stderr so the user sees work
921                // happening during long NER runs (e.g. 50 files × 27s each).
922                let (n_entities, n_relationships) = match &result {
923                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
924                    Err(_) => (0, 0),
925                };
926                let progress = StageProgressEvent {
927                    schema_version: 1,
928                    event: "file_extracted",
929                    path: &item.file_str,
930                    ms: elapsed_ms,
931                    entities: n_entities,
932                    relationships: n_relationships,
933                };
934                if let Ok(line) = serde_json::to_string(&progress) {
935                    eprintln!("{line}");
936                }
937
938                // Blocking send applies backpressure: if Phase B is slower,
939                // Phase A workers wait here instead of accumulating staged files
940                // in memory. If the receiver is dropped (fail_fast abort), ignore.
941                let _ = tx.send((item.idx, result));
942            });
943            // Explicit drop of tx signals Phase B (rx iteration) to stop.
944            drop(tx);
945        });
946    });
947
948    // Phase B: main thread persists files as results arrive from the channel.
949    // Results arrive in completion order (par_iter is unordered). We persist
950    // each file immediately on arrival — this is the key fix for B1: with the
951    // old 2-phase design the first DB write happened only after ALL files had
952    // finished Phase A. Now the first commit happens as soon as the first file
953    // completes Phase A, regardless of how many files remain.
954    //
955    // NDJSON output order follows completion order (not file-system sort order).
    // Skip slots are emitted up front, before any Process result is consumed.
957    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
958    // requirement than ensuring data is persisted before the user's timeout fires.
959    let fail_fast = args.fail_fast;
960
961    // Emit pending Skip events first so agents see them early.
962    for meta in &slots_meta {
963        if let SlotMeta::Skip {
964            file_str,
965            derived_base,
966            name_truncated,
967            original_name,
968            original_filename,
969            reason,
970        } = meta
971        {
972            output::emit_json_compact(&IngestFileEvent {
973                file: file_str,
974                name: derived_base,
975                status: "skipped",
976                truncated: *name_truncated,
977                original_name: original_name.clone(),
978                original_filename: original_filename.as_deref(),
979                error: Some(reason.clone()),
980                memory_id: None,
981                action: None,
982            })?;
983            skipped += 1;
984        }
985    }
986
987    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
988    // as channel messages arrive in completion order.
989    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
990        .iter()
991        .enumerate()
992        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
993        .collect();
994
995    tracing::info!(
996        target = "ingest",
997        phase = "persist_start",
998        files = total_to_process,
999        "phase B starting: persisting files incrementally as Phase A completes each one",
1000    );
1001
1002    // Drain channel and persist each file immediately — no accumulation into a
1003    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1004    // Phase B without applying backpressure.
1005    for (idx, stage_result) in rx {
1006        let meta = meta_index.get(&idx).ok_or_else(|| {
1007            AppError::Internal(anyhow::anyhow!(
1008                "channel idx {idx} has no corresponding Process slot"
1009            ))
1010        })?;
1011        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1012        {
1013            SlotMeta::Process {
1014                file_str,
1015                derived_name,
1016                name_truncated,
1017                original_name,
1018                original_filename,
1019            } => (
1020                file_str,
1021                derived_name,
1022                name_truncated,
1023                original_name,
1024                original_filename,
1025            ),
1026            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1027        };
1028
1029        // If storage init failed, every file fails with the same error.
1030        let conn = match conn_or_err.as_mut() {
1031            Ok(c) => c,
1032            Err(err_msg) => {
1033                let err_clone = err_msg.clone();
1034                output::emit_json_compact(&IngestFileEvent {
1035                    file: file_str,
1036                    name: derived_name,
1037                    status: "failed",
1038                    truncated: *name_truncated,
1039                    original_name: original_name.clone(),
1040                    original_filename: original_filename.as_deref(),
1041                    error: Some(err_clone.clone()),
1042                    memory_id: None,
1043                    action: None,
1044                })?;
1045                failed += 1;
1046                if fail_fast {
1047                    output::emit_json_compact(&IngestSummary {
1048                        summary: true,
1049                        dir: args.dir.display().to_string(),
1050                        pattern: args.pattern.clone(),
1051                        recursive: args.recursive,
1052                        files_total: total,
1053                        files_succeeded: succeeded,
1054                        files_failed: failed,
1055                        files_skipped: skipped,
1056                        elapsed_ms: started.elapsed().as_millis() as u64,
1057                    })?;
1058                    return Err(AppError::Validation(format!(
1059                        "ingest aborted on first failure: {err_clone}"
1060                    )));
1061                }
1062                continue;
1063            }
1064        };
1065
1066        let outcome =
1067            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1068
1069        match outcome {
1070            Ok(FileSuccess { memory_id, action }) => {
1071                output::emit_json_compact(&IngestFileEvent {
1072                    file: file_str,
1073                    name: derived_name,
1074                    status: "indexed",
1075                    truncated: *name_truncated,
1076                    original_name: original_name.clone(),
1077                    original_filename: original_filename.as_deref(),
1078                    error: None,
1079                    memory_id: Some(memory_id),
1080                    action: Some(action),
1081                })?;
1082                succeeded += 1;
1083            }
1084            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1085                output::emit_json_compact(&IngestFileEvent {
1086                    file: file_str,
1087                    name: derived_name,
1088                    status: "skipped",
1089                    truncated: *name_truncated,
1090                    original_name: original_name.clone(),
1091                    original_filename: original_filename.as_deref(),
1092                    error: Some(format!("{e}")),
1093                    memory_id: None,
1094                    action: Some("duplicate".to_string()),
1095                })?;
1096                skipped += 1;
1097            }
1098            Err(e) => {
1099                let err_msg = format!("{e}");
1100                output::emit_json_compact(&IngestFileEvent {
1101                    file: file_str,
1102                    name: derived_name,
1103                    status: "failed",
1104                    truncated: *name_truncated,
1105                    original_name: original_name.clone(),
1106                    original_filename: original_filename.as_deref(),
1107                    error: Some(err_msg.clone()),
1108                    memory_id: None,
1109                    action: None,
1110                })?;
1111                failed += 1;
1112                if fail_fast {
1113                    output::emit_json_compact(&IngestSummary {
1114                        summary: true,
1115                        dir: args.dir.display().to_string(),
1116                        pattern: args.pattern.clone(),
1117                        recursive: args.recursive,
1118                        files_total: total,
1119                        files_succeeded: succeeded,
1120                        files_failed: failed,
1121                        files_skipped: skipped,
1122                        elapsed_ms: started.elapsed().as_millis() as u64,
1123                    })?;
1124                    return Err(AppError::Validation(format!(
1125                        "ingest aborted on first failure: {err_msg}"
1126                    )));
1127                }
1128            }
1129        }
1130    }
1131
1132    // Wait for the producer thread to finish cleanly.
1133    producer_handle
1134        .join()
1135        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1136
1137    output::emit_json_compact(&IngestSummary {
1138        summary: true,
1139        dir: args.dir.display().to_string(),
1140        pattern: args.pattern.clone(),
1141        recursive: args.recursive,
1142        files_total: total,
1143        files_succeeded: succeeded,
1144        files_failed: failed,
1145        files_skipped: skipped,
1146        elapsed_ms: started.elapsed().as_millis() as u64,
1147    })?;
1148
1149    Ok(())
1150}
1151
/// Auto-initialises the database (matches the contract of every other CRUD
/// handler) and returns a fresh read/write connection ready for the ingest
/// loop. Errors here are recoverable per-file: the caller surfaces them as
/// failure events so `--fail-fast` and the continue-on-error path keep
/// working when, for example, the user points `--db` at an unwritable path.
///
/// # Errors
///
/// Propagates whatever `ensure_db_ready` or `open_rw` report; nothing is
/// retried or swallowed here.
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    // Make sure the database is ready before attempting to open it.
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}
1162
1163fn collect_files(
1164    dir: &Path,
1165    pattern: &str,
1166    recursive: bool,
1167    out: &mut Vec<PathBuf>,
1168) -> Result<(), AppError> {
1169    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1170    for entry in entries {
1171        let entry = entry.map_err(AppError::Io)?;
1172        let path = entry.path();
1173        let file_type = entry.file_type().map_err(AppError::Io)?;
1174        if file_type.is_file() {
1175            let name = entry.file_name();
1176            let name_str = name.to_string_lossy();
1177            if matches_pattern(&name_str, pattern) {
1178                out.push(path);
1179            }
1180        } else if file_type.is_dir() && recursive {
1181            collect_files(&path, pattern, recursive, out)?;
1182        }
1183    }
1184    Ok(())
1185}
1186
/// Tests a file name against a minimal glob `pattern`.
///
/// `*` is the only special character and stands for any run of characters,
/// including an empty one. It may appear anywhere and any number of times:
/// `*.md`, `README*`, `chapter-*.md` and `*notes*` are all valid. A pattern
/// without `*` must equal `name` exactly. Matching is case-sensitive and
/// always covers the whole name.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: the pattern is a literal file name.
    let (head, after_head) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };

    // Text after the last `*` is anchored to the end of the name; anything
    // between the first and last `*` may float in between.
    let (floating, tail) = match after_head.rsplit_once('*') {
        Some(parts) => parts,
        None => ("", after_head),
    };

    // Strip the anchored ends first so they can never overlap each other.
    let mut rest = match name
        .strip_prefix(head)
        .and_then(|without_head| without_head.strip_suffix(tail))
    {
        Some(rest) => rest,
        None => return false,
    };

    // Floating segments must occur in order; taking the leftmost hit each
    // time leaves the most room for the segments that follow.
    for segment in floating.split('*').filter(|segment| !segment.is_empty()) {
        match rest.find(segment) {
            Some(at) => rest = &rest[at + segment.len()..],
            None => return false,
        }
    }
    true
}
1196
1197/// Returns `(final_name, truncated, original_name)`.
1198/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
1199/// `original_name` holds the pre-truncation name only when `truncated=true`.
1200///
1201/// Non-ASCII characters are first decomposed via NFD and then stripped of
1202/// combining marks so accented letters fold to their base ASCII letter
1203/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
1204/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1205/// preserves meaningful word content rather than collapsing the basename
1206/// to a few stray ASCII letters as the previous filter did.
1207fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
1208    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1209    let lowered: String = stem
1210        .nfd()
1211        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1212        .map(|c| {
1213            if c == '_' || c.is_whitespace() {
1214                '-'
1215            } else {
1216                c
1217            }
1218        })
1219        .map(|c| c.to_ascii_lowercase())
1220        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1221        .collect();
1222    let collapsed = collapse_dashes(&lowered);
1223    let trimmed = collapsed.trim_matches('-').to_string();
1224    if trimmed.len() > DERIVED_NAME_MAX_LEN {
1225        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
1226            .trim_matches('-')
1227            .to_string();
1228        tracing::debug!(
1229            target: "ingest",
1230            original = %trimmed,
1231            truncated_to = %truncated,
1232            max_len = DERIVED_NAME_MAX_LEN,
1233            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1234        );
1235        (truncated, true, Some(trimmed))
1236    } else {
1237        (trimmed, false, None)
1238    }
1239}
1240
1241/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1242/// numeric suffix (`-1`, `-2`, …) when needed.
1243///
1244/// `taken` is the set of names already consumed in the current ingest run.
1245/// The caller is expected to insert the returned name into `taken` so the
1246/// next call observes the consumption. Cross-run collisions are intentionally
1247/// surfaced by the per-file persistence path as duplicates so re-ingestion
1248/// of identical corpora stays idempotent.
1249///
1250/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1251/// candidates collide, signalling a pathological corpus that should be
1252/// renamed manually.
1253fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1254    if !taken.contains(base) {
1255        return Ok(base.to_string());
1256    }
1257    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1258        let candidate = format!("{base}-{suffix}");
1259        if !taken.contains(&candidate) {
1260            tracing::warn!(
1261                target: "ingest",
1262                base = %base,
1263                resolved = %candidate,
1264                suffix,
1265                "memory name collision resolved with numeric suffix"
1266            );
1267            return Ok(candidate);
1268        }
1269    }
1270    Err(AppError::Validation(format!(
1271        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1272    )))
1273}
1274
/// Returns `s` with every run of consecutive `-` squeezed into a single `-`.
/// All other characters are copied through unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut squeezed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A hyphen is redundant when the output already ends with one.
        if ch == '-' && squeezed.ends_with('-') {
            continue;
        }
        squeezed.push(ch);
    }
    squeezed
}
1291
1292#[cfg(test)]
1293mod tests {
1294    use super::*;
1295    use std::path::PathBuf;
1296
1297    #[test]
1298    fn matches_pattern_suffix() {
1299        assert!(matches_pattern("foo.md", "*.md"));
1300        assert!(!matches_pattern("foo.txt", "*.md"));
1301        assert!(matches_pattern("foo.md", "*"));
1302    }
1303
1304    #[test]
1305    fn matches_pattern_prefix() {
1306        assert!(matches_pattern("README.md", "README*"));
1307        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1308    }
1309
1310    #[test]
1311    fn matches_pattern_exact() {
1312        assert!(matches_pattern("README.md", "README.md"));
1313        assert!(!matches_pattern("readme.md", "README.md"));
1314    }
1315
1316    #[test]
1317    fn derive_kebab_underscore_to_dash() {
1318        let p = PathBuf::from("/tmp/claude_code_headless.md");
1319        let (name, truncated, original) = derive_kebab_name(&p);
1320        assert_eq!(name, "claude-code-headless");
1321        assert!(!truncated);
1322        assert!(original.is_none());
1323    }
1324
1325    #[test]
1326    fn derive_kebab_uppercase_lowered() {
1327        let p = PathBuf::from("/tmp/README.md");
1328        let (name, truncated, original) = derive_kebab_name(&p);
1329        assert_eq!(name, "readme");
1330        assert!(!truncated);
1331        assert!(original.is_none());
1332    }
1333
1334    #[test]
1335    fn derive_kebab_strips_non_kebab_chars() {
1336        let p = PathBuf::from("/tmp/some@weird#name!.md");
1337        let (name, truncated, original) = derive_kebab_name(&p);
1338        assert_eq!(name, "someweirdname");
1339        assert!(!truncated);
1340        assert!(original.is_none());
1341    }
1342
1343    // Bug M-A3: NFD-based unicode normalization preserves base letters of
1344    // accented characters instead of dropping them entirely.
1345    #[test]
1346    fn derive_kebab_folds_accented_letters_to_ascii() {
1347        let p = PathBuf::from("/tmp/açaí.md");
1348        let (name, _, _) = derive_kebab_name(&p);
1349        assert_eq!(name, "acai", "got '{name}'");
1350    }
1351
1352    #[test]
1353    fn derive_kebab_handles_naive_with_diaeresis() {
1354        let p = PathBuf::from("/tmp/naïve-test.md");
1355        let (name, _, _) = derive_kebab_name(&p);
1356        assert_eq!(name, "naive-test", "got '{name}'");
1357    }
1358
1359    #[test]
1360    fn derive_kebab_drops_emoji_keeps_word() {
1361        let p = PathBuf::from("/tmp/🚀-rocket.md");
1362        let (name, _, _) = derive_kebab_name(&p);
1363        assert_eq!(name, "rocket", "got '{name}'");
1364    }
1365
1366    #[test]
1367    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1368        let p = PathBuf::from("/tmp/açaí🦜.md");
1369        let (name, _, _) = derive_kebab_name(&p);
1370        assert_eq!(name, "acai", "got '{name}'");
1371    }
1372
1373    #[test]
1374    fn derive_kebab_pure_emoji_yields_empty() {
1375        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1376        let (name, _, _) = derive_kebab_name(&p);
1377        assert!(name.is_empty(), "got '{name}'");
1378    }
1379
1380    #[test]
1381    fn derive_kebab_collapses_consecutive_dashes() {
1382        let p = PathBuf::from("/tmp/a__b___c.md");
1383        let (name, truncated, original) = derive_kebab_name(&p);
1384        assert_eq!(name, "a-b-c");
1385        assert!(!truncated);
1386        assert!(original.is_none());
1387    }
1388
1389    #[test]
1390    fn derive_kebab_truncates_to_60_chars() {
1391        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1392        let (name, truncated, original) = derive_kebab_name(&p);
1393        assert!(name.len() <= 60, "got len {}", name.len());
1394        assert!(truncated);
1395        assert!(original.is_some());
1396        assert!(original.unwrap().len() > 60);
1397    }
1398
1399    #[test]
1400    fn collect_files_finds_md_files() {
1401        let tmp = tempfile::tempdir().expect("tempdir");
1402        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1403        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1404        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1405        let mut out = Vec::new();
1406        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1407        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1408    }
1409
1410    #[test]
1411    fn collect_files_recursive_descends_subdirs() {
1412        let tmp = tempfile::tempdir().expect("tempdir");
1413        let sub = tmp.path().join("sub");
1414        std::fs::create_dir(&sub).unwrap();
1415        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1416        std::fs::write(sub.join("b.md"), "y").unwrap();
1417        let mut out = Vec::new();
1418        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1419        assert_eq!(out.len(), 2);
1420    }
1421
1422    #[test]
1423    fn collect_files_non_recursive_skips_subdirs() {
1424        let tmp = tempfile::tempdir().expect("tempdir");
1425        let sub = tmp.path().join("sub");
1426        std::fs::create_dir(&sub).unwrap();
1427        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1428        std::fs::write(sub.join("b.md"), "y").unwrap();
1429        let mut out = Vec::new();
1430        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1431        assert_eq!(out.len(), 1);
1432    }
1433
1434    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
1435
1436    #[test]
1437    fn derive_kebab_long_basename_truncated_within_cap() {
1438        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1439        let (name, truncated, original) = derive_kebab_name(&p);
1440        assert!(
1441            name.len() <= DERIVED_NAME_MAX_LEN,
1442            "truncated name must respect cap; got {} chars",
1443            name.len()
1444        );
1445        assert!(!name.is_empty());
1446        assert!(truncated);
1447        assert!(original.is_some());
1448    }
1449
1450    #[test]
1451    fn unique_name_returns_base_when_free() {
1452        let taken: BTreeSet<String> = BTreeSet::new();
1453        let resolved = unique_name("note", &taken).expect("must resolve");
1454        assert_eq!(resolved, "note");
1455    }
1456
1457    #[test]
1458    fn unique_name_appends_first_free_suffix_on_collision() {
1459        let mut taken: BTreeSet<String> = BTreeSet::new();
1460        taken.insert("note".to_string());
1461        taken.insert("note-1".to_string());
1462        let resolved = unique_name("note", &taken).expect("must resolve");
1463        assert_eq!(resolved, "note-2");
1464    }
1465
1466    #[test]
1467    fn unique_name_errors_after_collision_cap() {
1468        let mut taken: BTreeSet<String> = BTreeSet::new();
1469        taken.insert("note".to_string());
1470        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1471            taken.insert(format!("note-{i}"));
1472        }
1473        let err = unique_name("note", &taken).expect_err("must surface error");
1474        assert!(matches!(err, AppError::Validation(_)));
1475    }
1476
1477    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
1478
1479    #[test]
1480    fn validate_relation_format_accepts_valid_relations() {
1481        use crate::parsers::{is_canonical_relation, validate_relation_format};
1482        assert!(validate_relation_format("applies_to").is_ok());
1483        assert!(validate_relation_format("depends_on").is_ok());
1484        assert!(validate_relation_format("implements").is_ok());
1485        assert!(validate_relation_format("").is_err());
1486        assert!(is_canonical_relation("applies_to"));
1487        assert!(!is_canonical_relation("implements"));
1488    }
1489
1490    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
1491
1492    use serial_test::serial;
1493
1494    /// Helper: scrubs the env var around a closure to keep tests deterministic.
1495    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1496        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1497        let prev = std::env::var(key).ok();
1498        match value {
1499            Some(v) => std::env::set_var(key, v),
1500            None => std::env::remove_var(key),
1501        }
1502        f();
1503        match prev {
1504            Some(p) => std::env::set_var(key, p),
1505            None => std::env::remove_var(key),
1506        }
1507    }
1508
1509    #[test]
1510    #[serial]
1511    fn env_low_memory_enabled_unset_returns_false() {
1512        with_env_var(None, || assert!(!env_low_memory_enabled()));
1513    }
1514
1515    #[test]
1516    #[serial]
1517    fn env_low_memory_enabled_empty_returns_false() {
1518        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1519    }
1520
1521    #[test]
1522    #[serial]
1523    fn env_low_memory_enabled_truthy_values_return_true() {
1524        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1525            with_env_var(Some(v), || {
1526                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1527            });
1528        }
1529    }
1530
1531    #[test]
1532    #[serial]
1533    fn env_low_memory_enabled_falsy_values_return_false() {
1534        for v in ["0", "false", "FALSE", "no", "off"] {
1535            with_env_var(Some(v), || {
1536                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1537            });
1538        }
1539    }
1540
1541    #[test]
1542    #[serial]
1543    fn env_low_memory_enabled_unrecognized_value_returns_false() {
1544        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1545    }
1546
1547    #[test]
1548    #[serial]
1549    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1550        with_env_var(None, || {
1551            assert_eq!(resolve_parallelism(true, Some(4)), 1);
1552            assert_eq!(resolve_parallelism(true, Some(8)), 1);
1553            assert_eq!(resolve_parallelism(true, None), 1);
1554        });
1555    }
1556
1557    #[test]
1558    #[serial]
1559    fn resolve_parallelism_env_forces_one_when_flag_off() {
1560        with_env_var(Some("1"), || {
1561            assert_eq!(resolve_parallelism(false, Some(4)), 1);
1562            assert_eq!(resolve_parallelism(false, None), 1);
1563        });
1564    }
1565
1566    #[test]
1567    #[serial]
1568    fn resolve_parallelism_falsy_env_does_not_override() {
1569        with_env_var(Some("0"), || {
1570            assert_eq!(resolve_parallelism(false, Some(4)), 4);
1571        });
1572    }
1573
1574    #[test]
1575    #[serial]
1576    fn resolve_parallelism_explicit_value_when_low_memory_off() {
1577        with_env_var(None, || {
1578            assert_eq!(resolve_parallelism(false, Some(3)), 3);
1579            assert_eq!(resolve_parallelism(false, Some(1)), 1);
1580        });
1581    }
1582
1583    #[test]
1584    #[serial]
1585    fn resolve_parallelism_default_when_unset() {
1586        with_env_var(None, || {
1587            let p = resolve_parallelism(false, None);
1588            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1589        });
1590    }
1591
1592    #[test]
1593    fn ingest_args_parses_low_memory_flag_via_clap() {
1594        use clap::Parser;
1595        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
1596        // on the public `Cli` definition so the flag is wired end-to-end.
1597        let cli = crate::cli::Cli::try_parse_from([
1598            "sqlite-graphrag",
1599            "ingest",
1600            "/tmp/dummy",
1601            "--type",
1602            "document",
1603            "--low-memory",
1604        ])
1605        .expect("parse must succeed");
1606        match cli.command {
1607            crate::cli::Commands::Ingest(args) => {
1608                assert!(args.low_memory, "--low-memory must set field to true");
1609            }
1610            _ => panic!("expected Ingest subcommand"),
1611        }
1612    }
1613
1614    #[test]
1615    fn ingest_args_low_memory_defaults_false() {
1616        use clap::Parser;
1617        let cli = crate::cli::Cli::try_parse_from([
1618            "sqlite-graphrag",
1619            "ingest",
1620            "/tmp/dummy",
1621            "--type",
1622            "document",
1623        ])
1624        .expect("parse must succeed");
1625        match cli.command {
1626            crate::cli::Commands::Ingest(args) => {
1627                assert!(!args.low_memory, "default must be false");
1628            }
1629            _ => panic!("expected Ingest subcommand"),
1630        }
1631    }
1632}