Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
56/// Hard cap on the numeric suffix appended for collision resolution. If 1000
57/// candidates collide we surface an error rather than loop forever.
58const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70NOTES:\n  \
71    Each file becomes a separate memory. Names derive from file basenames\n  \
72    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
73    followed by a final summary line with counts. Per-file errors are reported\n  \
74    inline and processing continues unless --fail-fast is set.")]
75pub struct IngestArgs {
76    /// Directory containing files to ingest.
77    #[arg(
78        value_name = "DIR",
79        help = "Directory to ingest recursively (each matching file becomes a memory)"
80    )]
81    pub dir: PathBuf,
82
83    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
84    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
85    pub r#type: MemoryType,
86
87    /// Glob pattern matched against file basenames (default: `*.md`). Supports
88    /// `*.<ext>`, `<prefix>*`, and exact filename match.
89    #[arg(long, default_value = "*.md")]
90    pub pattern: String,
91
92    /// Recurse into subdirectories.
93    #[arg(long, default_value_t = false)]
94    pub recursive: bool,
95
96    #[arg(
97        long,
98        env = "SQLITE_GRAPHRAG_ENABLE_NER",
99        value_parser = crate::parsers::parse_bool_flexible,
100        action = clap::ArgAction::Set,
101        num_args = 0..=1,
102        default_missing_value = "true",
103        default_value = "false",
104        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
105    )]
106    pub enable_ner: bool,
107    #[arg(
108        long,
109        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
110        default_value = "fp32",
111        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
112    )]
113    pub gliner_variant: String,
114
115    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
116    #[arg(long, default_value_t = false, hide = true)]
117    pub skip_extraction: bool,
118
119    /// Stop on first per-file error instead of continuing with the next file.
120    #[arg(long, default_value_t = false)]
121    pub fail_fast: bool,
122
123    /// Preview file-to-name mapping without loading model or persisting.
124    #[arg(long, default_value_t = false)]
125    pub dry_run: bool,
126
127    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
128    #[arg(long, default_value_t = 10_000)]
129    pub max_files: usize,
130
131    /// Namespace for the ingested memories.
132    #[arg(long)]
133    pub namespace: Option<String>,
134
135    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
136    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
137    pub db: Option<String>,
138
139    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
140    pub format: JsonOutputFormat,
141
142    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
143    pub json: bool,
144
145    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
146    #[arg(
147        long,
148        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
149    )]
150    pub ingest_parallelism: Option<usize>,
151
152    /// Force single-threaded ingest to reduce RSS pressure.
153    ///
154    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
155    /// explicit value. Recommended for environments with <4 GB available
156    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
157    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
158    /// (CLI flag has higher precedence than the env var).
159    #[arg(
160        long,
161        default_value_t = false,
162        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
163                Recommended for environments with <4 GB available RAM or container/cgroup \
164                constraints. Trade-off: 3-4x longer wall time. Also honored via \
165                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
166    )]
167    pub low_memory: bool,
168
169    /// Maximum process RSS in MiB; abort if exceeded during embedding.
170    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
171          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
172    pub max_rss_mb: u64,
173
174    /// Maximum character length for derived memory names from file basenames.
175    ///
176    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
177    /// Shorter values leave more headroom for collision suffix resolution.
178    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
179          help = "Maximum length for derived memory names (default: 60)")]
180    pub max_name_length: usize,
181}
182
183/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
184/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
185/// values evaluate to false. Unrecognized non-empty values emit a
186/// `tracing::warn!` and evaluate to false.
187fn env_low_memory_enabled() -> bool {
188    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
189        Ok(v) if v.is_empty() => false,
190        Ok(v) => match v.to_lowercase().as_str() {
191            "1" | "true" | "yes" | "on" => true,
192            "0" | "false" | "no" | "off" => false,
193            other => {
194                tracing::warn!(
195                    target: "ingest",
196                    value = %other,
197                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
198                );
199                false
200            }
201        },
202        Err(_) => false,
203    }
204}
205
206/// Resolves the effective ingest parallelism honoring `--low-memory` and the
207/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
208///
209/// Precedence:
210/// 1. `--low-memory` CLI flag forces parallelism = 1.
211/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
212/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
213/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
214///
215/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
216/// emits a `tracing::warn!` advertising the override.
217fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
218    let env_flag = env_low_memory_enabled();
219    let low_memory = low_memory_flag || env_flag;
220
221    if low_memory {
222        if let Some(n) = ingest_parallelism {
223            if n > 1 {
224                tracing::warn!(
225                    target: "ingest",
226                    requested = n,
227                    "--ingest-parallelism overridden by --low-memory; using 1"
228                );
229            }
230        }
231        if low_memory_flag {
232            tracing::info!(
233                target: "ingest",
234                source = "flag",
235                "low-memory mode enabled: forcing --ingest-parallelism 1"
236            );
237        } else {
238            tracing::info!(
239                target: "ingest",
240                source = "env",
241                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
242            );
243        }
244        return 1;
245    }
246
247    ingest_parallelism
248        .unwrap_or_else(|| {
249            std::thread::available_parallelism()
250                .map(|v| v.get() / 2)
251                .unwrap_or(1)
252                .clamp(1, 4)
253        })
254        .max(1)
255}
256
257#[derive(Serialize)]
258struct IngestFileEvent<'a> {
259    file: &'a str,
260    name: &'a str,
261    status: &'a str,
262    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
263    truncated: bool,
264    /// Original derived name before truncation; only present when `truncated=true`.
265    #[serde(skip_serializing_if = "Option::is_none")]
266    original_name: Option<String>,
267    /// Original file basename (without extension); only present when it differs from `name`.
268    #[serde(skip_serializing_if = "Option::is_none")]
269    original_filename: Option<&'a str>,
270    #[serde(skip_serializing_if = "Option::is_none")]
271    error: Option<String>,
272    #[serde(skip_serializing_if = "Option::is_none")]
273    memory_id: Option<i64>,
274    #[serde(skip_serializing_if = "Option::is_none")]
275    action: Option<String>,
276    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
277    body_length: usize,
278}
279
280#[derive(Serialize)]
281struct IngestSummary {
282    summary: bool,
283    dir: String,
284    pattern: String,
285    recursive: bool,
286    files_total: usize,
287    files_succeeded: usize,
288    files_failed: usize,
289    files_skipped: usize,
290    elapsed_ms: u64,
291}
292
293/// Outcome of a successful per-file ingest, used to build the NDJSON event.
294struct FileSuccess {
295    memory_id: i64,
296    action: String,
297    body_length: usize,
298}
299
300/// NDJSON progress event emitted to stderr after each file completes Phase A.
301/// Schema version 1; consumers should check `schema_version` before parsing.
302#[derive(Serialize)]
303struct StageProgressEvent<'a> {
304    schema_version: u8,
305    event: &'a str,
306    path: &'a str,
307    ms: u64,
308    entities: usize,
309    relationships: usize,
310}
311
312/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
313/// Phase B persists these to SQLite on the main thread in submission order.
314struct StagedFile {
315    body: String,
316    body_hash: String,
317    snippet: String,
318    name: String,
319    description: String,
320    embedding: Vec<f32>,
321    chunk_embeddings: Option<Vec<Vec<f32>>>,
322    chunks_info: Vec<crate::chunking::Chunk>,
323    entities: Vec<NewEntity>,
324    relationships: Vec<NewRelationship>,
325    entity_embeddings: Vec<Vec<f32>>,
326    urls: Vec<crate::extraction::ExtractedUrl>,
327}
328
329/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
330/// Never touches the database — safe to run on any rayon thread.
331fn stage_file(
332    _idx: usize,
333    path: &Path,
334    name: &str,
335    paths: &AppPaths,
336    enable_ner: bool,
337    gliner_variant: crate::extraction::GlinerVariant,
338    max_rss_mb: u64,
339) -> Result<StagedFile, AppError> {
340    use crate::constants::*;
341
342    if name.len() > MAX_MEMORY_NAME_LEN {
343        return Err(AppError::LimitExceeded(
344            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
345        ));
346    }
347    if name.starts_with("__") {
348        return Err(AppError::Validation(
349            crate::i18n::validation::reserved_name(),
350        ));
351    }
352    {
353        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
354            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
355        if !slug_re.is_match(name) {
356            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
357                name,
358            )));
359        }
360    }
361
362    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
363    if raw_body.len() > MAX_MEMORY_BODY_LEN {
364        return Err(AppError::LimitExceeded(
365            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
366        ));
367    }
368    if raw_body.trim().is_empty() {
369        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
370    }
371
372    let description = format!("ingested from {}", path.display());
373    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
374        return Err(AppError::Validation(
375            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
376        ));
377    }
378
379    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
380    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
381    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
382    if enable_ner {
383        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
384            Ok(extracted) => {
385                extracted_urls = extracted.urls;
386                extracted_entities = extracted.entities;
387                extracted_relationships = extracted.relationships;
388
389                if extracted_entities.len() > max_entities_per_memory() {
390                    extracted_entities.truncate(max_entities_per_memory());
391                }
392                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
393                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
394                }
395            }
396            Err(e) => {
397                tracing::warn!(
398                    file = %path.display(),
399                    "auto-extraction failed (graceful degradation): {e:#}"
400                );
401            }
402        }
403    }
404
405    for rel in &mut extracted_relationships {
406        rel.relation = crate::parsers::normalize_relation(&rel.relation);
407        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
408            return Err(AppError::Validation(format!(
409                "{e} for relationship '{}' -> '{}'",
410                rel.source, rel.target
411            )));
412        }
413        crate::parsers::warn_if_non_canonical(&rel.relation);
414        if !(0.0..=1.0).contains(&rel.strength) {
415            return Err(AppError::Validation(format!(
416                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
417                rel.strength, rel.source, rel.target
418            )));
419        }
420    }
421
422    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
423    let snippet: String = raw_body.chars().take(200).collect();
424
425    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
426    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
427    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
428        return Err(AppError::LimitExceeded(format!(
429            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
430            chunks_info.len(),
431            REMEMBER_MAX_SAFE_MULTI_CHUNKS
432        )));
433    }
434
435    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
436    let embedding = if chunks_info.len() == 1 {
437        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
438    } else {
439        let chunk_texts: Vec<&str> = chunks_info
440            .iter()
441            .map(|c| chunking::chunk_text(&raw_body, c))
442            .collect();
443        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
444        for chunk_text in &chunk_texts {
445            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
446                if rss > max_rss_mb {
447                    tracing::error!(
448                        rss_mb = rss,
449                        max_rss_mb = max_rss_mb,
450                        file = %path.display(),
451                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
452                    );
453                    return Err(AppError::LowMemory {
454                        available_mb: crate::memory_guard::available_memory_mb(),
455                        required_mb: max_rss_mb,
456                    });
457                }
458            }
459            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
460                &paths.models,
461                chunk_text,
462            )?);
463        }
464        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
465        chunk_embeddings_opt = Some(chunk_embeddings);
466        aggregated
467    };
468
469    let entity_embeddings = extracted_entities
470        .iter()
471        .map(|entity| {
472            let entity_text = match &entity.description {
473                Some(desc) => format!("{} {}", entity.name, desc),
474                None => entity.name.clone(),
475            };
476            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
477        })
478        .collect::<Result<Vec<_>, _>>()?;
479
480    Ok(StagedFile {
481        body: raw_body,
482        body_hash,
483        snippet,
484        name: name.to_string(),
485        description,
486        embedding,
487        chunk_embeddings: chunk_embeddings_opt,
488        chunks_info,
489        entities: extracted_entities,
490        relationships: extracted_relationships,
491        entity_embeddings,
492        urls: extracted_urls,
493    })
494}
495
496/// Phase B: persists one `StagedFile` to the database on the main thread.
497fn persist_staged(
498    conn: &mut Connection,
499    namespace: &str,
500    memory_type: &str,
501    staged: StagedFile,
502) -> Result<FileSuccess, AppError> {
503    {
504        let active_count: u32 = conn.query_row(
505            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
506            [],
507            |r| r.get::<_, i64>(0).map(|v| v as u32),
508        )?;
509        let ns_exists: bool = conn.query_row(
510            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
511            rusqlite::params![namespace],
512            |r| r.get::<_, i64>(0).map(|v| v > 0),
513        )?;
514        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
515            return Err(AppError::NamespaceError(format!(
516                "active namespace limit of {} exceeded while creating '{namespace}'",
517                crate::constants::MAX_NAMESPACES_ACTIVE
518            )));
519        }
520    }
521
522    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
523    if existing_memory.is_some() {
524        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
525            &staged.name,
526            namespace,
527        )));
528    }
529    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
530
531    let new_memory = NewMemory {
532        namespace: namespace.to_string(),
533        name: staged.name.clone(),
534        memory_type: memory_type.to_string(),
535        description: staged.description.clone(),
536        body: staged.body,
537        body_hash: staged.body_hash,
538        session_id: None,
539        source: "agent".to_string(),
540        metadata: serde_json::json!({}),
541    };
542
543    if let Some(hash_id) = duplicate_hash_id {
544        tracing::debug!(
545            target: "ingest",
546            duplicate_memory_id = hash_id,
547            "identical body already exists; persisting a new memory anyway"
548        );
549    }
550
551    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
552
553    let memory_id = memories::insert(&tx, &new_memory)?;
554    versions::insert_version(
555        &tx,
556        memory_id,
557        1,
558        &staged.name,
559        memory_type,
560        &staged.description,
561        &new_memory.body,
562        &serde_json::to_string(&new_memory.metadata)?,
563        None,
564        "create",
565    )?;
566    memories::upsert_vec(
567        &tx,
568        memory_id,
569        namespace,
570        memory_type,
571        &staged.embedding,
572        &staged.name,
573        &staged.snippet,
574    )?;
575
576    if staged.chunks_info.len() > 1 {
577        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
578        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
579            AppError::Internal(anyhow::anyhow!(
580                "missing chunk embeddings cache on multi-chunk ingest path"
581            ))
582        })?;
583        for (i, emb) in chunk_embeddings.iter().enumerate() {
584            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
585        }
586    }
587
588    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
589        for (idx, entity) in staged.entities.iter().enumerate() {
590            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
591            let entity_embedding = &staged.entity_embeddings[idx];
592            entities::upsert_entity_vec(
593                &tx,
594                entity_id,
595                namespace,
596                entity.entity_type,
597                entity_embedding,
598                &entity.name,
599            )?;
600            entities::link_memory_entity(&tx, memory_id, entity_id)?;
601            entities::increment_degree(&tx, entity_id)?;
602        }
603        let entity_types: std::collections::HashMap<&str, EntityType> = staged
604            .entities
605            .iter()
606            .map(|entity| (entity.name.as_str(), entity.entity_type))
607            .collect();
608        for rel in &staged.relationships {
609            let source_entity = NewEntity {
610                name: rel.source.clone(),
611                entity_type: entity_types
612                    .get(rel.source.as_str())
613                    .copied()
614                    .unwrap_or(EntityType::Concept),
615                description: None,
616            };
617            let target_entity = NewEntity {
618                name: rel.target.clone(),
619                entity_type: entity_types
620                    .get(rel.target.as_str())
621                    .copied()
622                    .unwrap_or(EntityType::Concept),
623                description: None,
624            };
625            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
626            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
627            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
628            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
629        }
630    }
631
632    tx.commit()?;
633
634    if !staged.urls.is_empty() {
635        let url_entries: Vec<storage_urls::MemoryUrl> = staged
636            .urls
637            .into_iter()
638            .map(|u| storage_urls::MemoryUrl {
639                url: u.url,
640                offset: Some(u.offset as i64),
641            })
642            .collect();
643        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
644    }
645
646    Ok(FileSuccess {
647        memory_id,
648        action: "created".to_string(),
649        body_length: new_memory.body.len(),
650    })
651}
652
653pub fn run(args: IngestArgs) -> Result<(), AppError> {
654    let started = std::time::Instant::now();
655
656    if !args.dir.exists() {
657        return Err(AppError::Validation(format!(
658            "directory not found: {}",
659            args.dir.display()
660        )));
661    }
662    if !args.dir.is_dir() {
663        return Err(AppError::Validation(format!(
664            "path is not a directory: {}",
665            args.dir.display()
666        )));
667    }
668
669    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
670    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
671    files.sort();
672
673    if files.len() > args.max_files {
674        return Err(AppError::Validation(format!(
675            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
676            files.len(),
677            args.max_files
678        )));
679    }
680
681    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
682    let memory_type_str = args.r#type.as_str().to_string();
683
684    let paths = AppPaths::resolve(args.db.as_deref())?;
685    let mut conn_or_err = match init_storage(&paths) {
686        Ok(c) => Ok(c),
687        Err(e) => Err(format!("{e}")),
688    };
689
690    let mut succeeded: usize = 0;
691    let mut failed: usize = 0;
692    let mut skipped: usize = 0;
693    let total = files.len();
694
695    // Pre-resolve all names before parallelisation so Phase A workers see a
696    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
697    let mut taken_names: BTreeSet<String> = BTreeSet::new();
698
699    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
700    // ProcessItem: the data moved into the producer thread for Phase A computation.
701    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
702    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
703    // boundary into the rayon producer.
704    enum SlotMeta {
705        Skip {
706            file_str: String,
707            derived_base: String,
708            name_truncated: bool,
709            original_name: Option<String>,
710            original_filename: Option<String>,
711            reason: String,
712        },
713        Process {
714            file_str: String,
715            derived_name: String,
716            name_truncated: bool,
717            original_name: Option<String>,
718            original_filename: Option<String>,
719        },
720    }
721
722    struct ProcessItem {
723        idx: usize,
724        path: PathBuf,
725        file_str: String,
726        derived_name: String,
727    }
728
729    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
730    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
731    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
732
733    let max_name_length = args.max_name_length;
734    for path in &files {
735        let file_str = path.to_string_lossy().into_owned();
736        let (derived_base, name_truncated, original_name) =
737            derive_kebab_name(path, max_name_length);
738        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
739
740        if name_truncated {
741            if let Some(ref orig) = original_name {
742                truncations.push((orig.clone(), derived_base.clone()));
743            }
744        }
745
746        if derived_base.is_empty() {
747            // original_filename: always include when it differs from the empty derived name
748            let orig_filename = if !original_basename.is_empty() {
749                Some(original_basename.to_string())
750            } else {
751                None
752            };
753            slots_meta.push(SlotMeta::Skip {
754                file_str,
755                derived_base: String::new(),
756                name_truncated: false,
757                original_name: None,
758                original_filename: orig_filename,
759                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
760            });
761            continue;
762        }
763
764        match unique_name(&derived_base, &taken_names) {
765            Ok(derived_name) => {
766                taken_names.insert(derived_name.clone());
767                let idx = slots_meta.len();
768                // original_filename: present only when the raw basename differs from the derived name
769                let orig_filename = if original_basename != derived_name {
770                    Some(original_basename.to_string())
771                } else {
772                    None
773                };
774                process_items.push(ProcessItem {
775                    idx,
776                    path: path.clone(),
777                    file_str: file_str.clone(),
778                    derived_name: derived_name.clone(),
779                });
780                slots_meta.push(SlotMeta::Process {
781                    file_str,
782                    derived_name,
783                    name_truncated,
784                    original_name,
785                    original_filename: orig_filename,
786                });
787            }
788            Err(e) => {
789                let orig_filename = if original_basename != derived_base {
790                    Some(original_basename.to_string())
791                } else {
792                    None
793                };
794                slots_meta.push(SlotMeta::Skip {
795                    file_str,
796                    derived_base,
797                    name_truncated,
798                    original_name,
799                    original_filename: orig_filename,
800                    reason: e.to_string(),
801                });
802            }
803        }
804    }
805
806    if !truncations.is_empty() {
807        tracing::info!(
808            target: "ingest",
809            count = truncations.len(),
810            max_name_length = max_name_length,
811            max_len = DERIVED_NAME_MAX_LEN,
812            "derived names truncated; pass -vv (debug) for per-file detail"
813        );
814    }
815
816    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
817    if args.dry_run {
818        for meta in &slots_meta {
819            match meta {
820                SlotMeta::Skip {
821                    file_str,
822                    derived_base,
823                    name_truncated,
824                    original_name,
825                    original_filename,
826                    reason,
827                } => {
828                    output::emit_json_compact(&IngestFileEvent {
829                        file: file_str,
830                        name: derived_base,
831                        status: "skip",
832                        truncated: *name_truncated,
833                        original_name: original_name.clone(),
834                        original_filename: original_filename.as_deref(),
835                        error: Some(reason.clone()),
836                        memory_id: None,
837                        action: None,
838                        body_length: 0,
839                    })?;
840                }
841                SlotMeta::Process {
842                    file_str,
843                    derived_name,
844                    name_truncated,
845                    original_name,
846                    original_filename,
847                } => {
848                    output::emit_json_compact(&IngestFileEvent {
849                        file: file_str,
850                        name: derived_name,
851                        status: "preview",
852                        truncated: *name_truncated,
853                        original_name: original_name.clone(),
854                        original_filename: original_filename.as_deref(),
855                        error: None,
856                        memory_id: None,
857                        action: None,
858                        body_length: 0,
859                    })?;
860                }
861            }
862        }
863        output::emit_json_compact(&IngestSummary {
864            summary: true,
865            dir: args.dir.to_string_lossy().into_owned(),
866            pattern: args.pattern.clone(),
867            recursive: args.recursive,
868            files_total: total,
869            files_succeeded: 0,
870            files_failed: 0,
871            files_skipped: 0,
872            elapsed_ms: started.elapsed().as_millis() as u64,
873        })?;
874        return Ok(());
875    }
876
877    // Determine rayon thread pool size, honoring --low-memory and the
878    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
879    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
880
881    let pool = rayon::ThreadPoolBuilder::new()
882        .num_threads(parallelism)
883        .build()
884        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
885
886    if args.enable_ner && args.skip_extraction {
887        tracing::warn!(
888            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
889        );
890    }
891    if args.skip_extraction && !args.enable_ner {
892        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
893    }
894    let enable_ner = args.enable_ner;
895    let max_rss_mb = args.max_rss_mb;
896    let gliner_variant: crate::extraction::GlinerVariant =
897        args.gliner_variant.parse().unwrap_or_else(|e| {
898            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
899            crate::extraction::GlinerVariant::Fp32
900        });
901
902    let total_to_process = process_items.len();
903    tracing::info!(
904        target = "ingest",
905        phase = "pipeline_start",
906        files = total_to_process,
907        ingest_parallelism = parallelism,
908        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
909    );
910
911    // Bounded channel: producer never gets more than parallelism*2 items ahead of
912    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
913    // Each message carries the slot index so Phase B can look up SlotMeta in order.
914    let channel_bound = (parallelism * 2).max(1);
915    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
916
917    // Phase A: launched in a dedicated OS thread so the main thread can consume
918    // the channel concurrently. pool.install() blocks the calling thread until
919    // all rayon workers finish — if called on the main thread it would
920    // reintroduce the 2-phase blocking behaviour we are eliminating.
921    let paths_owned = paths.clone();
922    let producer_handle = std::thread::spawn(move || {
923        pool.install(|| {
924            process_items.into_par_iter().for_each(|item| {
925                let t0 = std::time::Instant::now();
926                let result = stage_file(
927                    item.idx,
928                    &item.path,
929                    &item.derived_name,
930                    &paths_owned,
931                    enable_ner,
932                    gliner_variant,
933                    max_rss_mb,
934                );
935                let elapsed_ms = t0.elapsed().as_millis() as u64;
936
937                // Emit NDJSON progress event to stderr so the user sees work
938                // happening during long NER runs (e.g. 50 files × 27s each).
939                let (n_entities, n_relationships) = match &result {
940                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
941                    Err(_) => (0, 0),
942                };
943                let progress = StageProgressEvent {
944                    schema_version: 1,
945                    event: "file_extracted",
946                    path: &item.file_str,
947                    ms: elapsed_ms,
948                    entities: n_entities,
949                    relationships: n_relationships,
950                };
951                if let Ok(line) = serde_json::to_string(&progress) {
952                    eprintln!("{line}");
953                }
954
955                // Blocking send applies backpressure: if Phase B is slower,
956                // Phase A workers wait here instead of accumulating staged files
957                // in memory. If the receiver is dropped (fail_fast abort), ignore.
958                let _ = tx.send((item.idx, result));
959            });
960            // Explicit drop of tx signals Phase B (rx iteration) to stop.
961            drop(tx);
962        });
963    });
964
965    // Phase B: main thread persists files as results arrive from the channel.
966    // Results arrive in completion order (par_iter is unordered). We persist
967    // each file immediately on arrival — this is the key fix for B1: with the
968    // old 2-phase design the first DB write happened only after ALL files had
969    // finished Phase A. Now the first commit happens as soon as the first file
970    // completes Phase A, regardless of how many files remain.
971    //
972    // NDJSON output order follows completion order (not file-system sort order).
973    // Skip slots are emitted at the end, after all Process results are consumed.
974    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
975    // requirement than ensuring data is persisted before the user's timeout fires.
976    let fail_fast = args.fail_fast;
977
978    // Emit pending Skip events first so agents see them early.
979    for meta in &slots_meta {
980        if let SlotMeta::Skip {
981            file_str,
982            derived_base,
983            name_truncated,
984            original_name,
985            original_filename,
986            reason,
987        } = meta
988        {
989            output::emit_json_compact(&IngestFileEvent {
990                file: file_str,
991                name: derived_base,
992                status: "skipped",
993                truncated: *name_truncated,
994                original_name: original_name.clone(),
995                original_filename: original_filename.as_deref(),
996                error: Some(reason.clone()),
997                memory_id: None,
998                action: None,
999                body_length: 0,
1000            })?;
1001            skipped += 1;
1002        }
1003    }
1004
1005    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1006    // as channel messages arrive in completion order.
1007    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1008        .iter()
1009        .enumerate()
1010        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1011        .collect();
1012
1013    tracing::info!(
1014        target = "ingest",
1015        phase = "persist_start",
1016        files = total_to_process,
1017        "phase B starting: persisting files incrementally as Phase A completes each one",
1018    );
1019
1020    // Drain channel and persist each file immediately — no accumulation into a
1021    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1022    // Phase B without applying backpressure.
1023    for (idx, stage_result) in rx {
1024        let meta = meta_index.get(&idx).ok_or_else(|| {
1025            AppError::Internal(anyhow::anyhow!(
1026                "channel idx {idx} has no corresponding Process slot"
1027            ))
1028        })?;
1029        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1030        {
1031            SlotMeta::Process {
1032                file_str,
1033                derived_name,
1034                name_truncated,
1035                original_name,
1036                original_filename,
1037            } => (
1038                file_str,
1039                derived_name,
1040                name_truncated,
1041                original_name,
1042                original_filename,
1043            ),
1044            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1045        };
1046
1047        // If storage init failed, every file fails with the same error.
1048        let conn = match conn_or_err.as_mut() {
1049            Ok(c) => c,
1050            Err(err_msg) => {
1051                let err_clone = err_msg.clone();
1052                output::emit_json_compact(&IngestFileEvent {
1053                    file: file_str,
1054                    name: derived_name,
1055                    status: "failed",
1056                    truncated: *name_truncated,
1057                    original_name: original_name.clone(),
1058                    original_filename: original_filename.as_deref(),
1059                    error: Some(err_clone.clone()),
1060                    memory_id: None,
1061                    action: None,
1062                    body_length: 0,
1063                })?;
1064                failed += 1;
1065                if fail_fast {
1066                    output::emit_json_compact(&IngestSummary {
1067                        summary: true,
1068                        dir: args.dir.display().to_string(),
1069                        pattern: args.pattern.clone(),
1070                        recursive: args.recursive,
1071                        files_total: total,
1072                        files_succeeded: succeeded,
1073                        files_failed: failed,
1074                        files_skipped: skipped,
1075                        elapsed_ms: started.elapsed().as_millis() as u64,
1076                    })?;
1077                    return Err(AppError::Validation(format!(
1078                        "ingest aborted on first failure: {err_clone}"
1079                    )));
1080                }
1081                continue;
1082            }
1083        };
1084
1085        let outcome =
1086            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1087
1088        match outcome {
1089            Ok(FileSuccess {
1090                memory_id,
1091                action,
1092                body_length,
1093            }) => {
1094                output::emit_json_compact(&IngestFileEvent {
1095                    file: file_str,
1096                    name: derived_name,
1097                    status: "indexed",
1098                    truncated: *name_truncated,
1099                    original_name: original_name.clone(),
1100                    original_filename: original_filename.as_deref(),
1101                    error: None,
1102                    memory_id: Some(memory_id),
1103                    action: Some(action),
1104                    body_length,
1105                })?;
1106                succeeded += 1;
1107            }
1108            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1109                output::emit_json_compact(&IngestFileEvent {
1110                    file: file_str,
1111                    name: derived_name,
1112                    status: "skipped",
1113                    truncated: *name_truncated,
1114                    original_name: original_name.clone(),
1115                    original_filename: original_filename.as_deref(),
1116                    error: Some(format!("{e}")),
1117                    memory_id: None,
1118                    action: Some("duplicate".to_string()),
1119                    body_length: 0,
1120                })?;
1121                skipped += 1;
1122            }
1123            Err(e) => {
1124                let err_msg = format!("{e}");
1125                output::emit_json_compact(&IngestFileEvent {
1126                    file: file_str,
1127                    name: derived_name,
1128                    status: "failed",
1129                    truncated: *name_truncated,
1130                    original_name: original_name.clone(),
1131                    original_filename: original_filename.as_deref(),
1132                    error: Some(err_msg.clone()),
1133                    memory_id: None,
1134                    action: None,
1135                    body_length: 0,
1136                })?;
1137                failed += 1;
1138                if fail_fast {
1139                    output::emit_json_compact(&IngestSummary {
1140                        summary: true,
1141                        dir: args.dir.display().to_string(),
1142                        pattern: args.pattern.clone(),
1143                        recursive: args.recursive,
1144                        files_total: total,
1145                        files_succeeded: succeeded,
1146                        files_failed: failed,
1147                        files_skipped: skipped,
1148                        elapsed_ms: started.elapsed().as_millis() as u64,
1149                    })?;
1150                    return Err(AppError::Validation(format!(
1151                        "ingest aborted on first failure: {err_msg}"
1152                    )));
1153                }
1154            }
1155        }
1156    }
1157
1158    // Wait for the producer thread to finish cleanly.
1159    producer_handle
1160        .join()
1161        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1162
1163    if let Ok(ref conn) = conn_or_err {
1164        if succeeded > 0 {
1165            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1166        }
1167    }
1168
1169    output::emit_json_compact(&IngestSummary {
1170        summary: true,
1171        dir: args.dir.display().to_string(),
1172        pattern: args.pattern.clone(),
1173        recursive: args.recursive,
1174        files_total: total,
1175        files_succeeded: succeeded,
1176        files_failed: failed,
1177        files_skipped: skipped,
1178        elapsed_ms: started.elapsed().as_millis() as u64,
1179    })?;
1180
1181    Ok(())
1182}
1183
1184/// Auto-initialises the database (matches the contract of every other CRUD
1185/// handler) and returns a fresh read/write connection ready for the ingest
1186/// loop. Errors here are recoverable per-file: the caller surfaces them as
1187/// failure events so `--fail-fast` and the continue-on-error path keep
1188/// working when, for example, the user points `--db` at an unwritable path.
1189fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1190    ensure_db_ready(paths)?;
1191    let conn = open_rw(&paths.db)?;
1192    Ok(conn)
1193}
1194
1195fn collect_files(
1196    dir: &Path,
1197    pattern: &str,
1198    recursive: bool,
1199    out: &mut Vec<PathBuf>,
1200) -> Result<(), AppError> {
1201    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1202    for entry in entries {
1203        let entry = entry.map_err(AppError::Io)?;
1204        let path = entry.path();
1205        let file_type = entry.file_type().map_err(AppError::Io)?;
1206        if file_type.is_file() {
1207            let name = entry.file_name();
1208            let name_str = name.to_string_lossy();
1209            if matches_pattern(&name_str, pattern) {
1210                out.push(path);
1211            }
1212        } else if file_type.is_dir() && recursive {
1213            collect_files(&path, pattern, recursive, out)?;
1214        }
1215    }
1216    Ok(())
1217}
1218
1219fn matches_pattern(name: &str, pattern: &str) -> bool {
1220    if let Some(suffix) = pattern.strip_prefix('*') {
1221        name.ends_with(suffix)
1222    } else if let Some(prefix) = pattern.strip_suffix('*') {
1223        name.starts_with(prefix)
1224    } else {
1225        name == pattern
1226    }
1227}
1228
1229/// Returns `(final_name, truncated, original_name)`.
1230/// `truncated` is true when the derived name exceeded `max_len`.
1231/// `original_name` holds the pre-truncation name only when `truncated=true`.
1232///
1233/// Non-ASCII characters are first decomposed via NFD and then stripped of
1234/// combining marks so accented letters fold to their base ASCII letter
1235/// (e.g. `açaí` → `acai`, `naïve` → `naive`). Characters with no ASCII
1236/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1237/// preserves meaningful word content rather than collapsing the basename
1238/// to a few stray ASCII letters as the previous filter did.
1239fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1240    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1241    let lowered: String = stem
1242        .nfd()
1243        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1244        .map(|c| {
1245            if c == '_' || c.is_whitespace() {
1246                '-'
1247            } else {
1248                c
1249            }
1250        })
1251        .map(|c| c.to_ascii_lowercase())
1252        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1253        .collect();
1254    let collapsed = collapse_dashes(&lowered);
1255    let trimmed_raw = collapsed.trim_matches('-').to_string();
1256    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1257    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1258        format!("doc-{trimmed_raw}")
1259    } else {
1260        trimmed_raw
1261    };
1262    if trimmed.len() > max_len {
1263        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1264        tracing::debug!(
1265            target: "ingest",
1266            original = %trimmed,
1267            truncated_to = %truncated,
1268            max_len = max_len,
1269            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1270        );
1271        (truncated, true, Some(trimmed))
1272    } else {
1273        (trimmed, false, None)
1274    }
1275}
1276
1277/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1278/// numeric suffix (`-1`, `-2`, …) when needed.
1279///
1280/// `taken` is the set of names already consumed in the current ingest run.
1281/// The caller is expected to insert the returned name into `taken` so the
1282/// next call observes the consumption. Cross-run collisions are intentionally
1283/// surfaced by the per-file persistence path as duplicates so re-ingestion
1284/// of identical corpora stays idempotent.
1285///
1286/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1287/// candidates collide, signalling a pathological corpus that should be
1288/// renamed manually.
1289fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1290    if !taken.contains(base) {
1291        return Ok(base.to_string());
1292    }
1293    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1294        let candidate = format!("{base}-{suffix}");
1295        if !taken.contains(&candidate) {
1296            tracing::warn!(
1297                target: "ingest",
1298                base = %base,
1299                resolved = %candidate,
1300                suffix,
1301                "memory name collision resolved with numeric suffix"
1302            );
1303            return Ok(candidate);
1304        }
1305    }
1306    Err(AppError::Validation(format!(
1307        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1308    )))
1309}
1310
1311fn collapse_dashes(s: &str) -> String {
1312    let mut out = String::with_capacity(s.len());
1313    let mut prev_dash = false;
1314    for c in s.chars() {
1315        if c == '-' {
1316            if !prev_dash {
1317                out.push('-');
1318            }
1319            prev_dash = true;
1320        } else {
1321            out.push(c);
1322            prev_dash = false;
1323        }
1324    }
1325    out
1326}
1327
1328#[cfg(test)]
1329mod tests {
1330    use super::*;
1331    use std::path::PathBuf;
1332
1333    #[test]
1334    fn matches_pattern_suffix() {
1335        assert!(matches_pattern("foo.md", "*.md"));
1336        assert!(!matches_pattern("foo.txt", "*.md"));
1337        assert!(matches_pattern("foo.md", "*"));
1338    }
1339
1340    #[test]
1341    fn matches_pattern_prefix() {
1342        assert!(matches_pattern("README.md", "README*"));
1343        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1344    }
1345
1346    #[test]
1347    fn matches_pattern_exact() {
1348        assert!(matches_pattern("README.md", "README.md"));
1349        assert!(!matches_pattern("readme.md", "README.md"));
1350    }
1351
1352    #[test]
1353    fn derive_kebab_underscore_to_dash() {
1354        let p = PathBuf::from("/tmp/claude_code_headless.md");
1355        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1356        assert_eq!(name, "claude-code-headless");
1357        assert!(!truncated);
1358        assert!(original.is_none());
1359    }
1360
1361    #[test]
1362    fn derive_kebab_uppercase_lowered() {
1363        let p = PathBuf::from("/tmp/README.md");
1364        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1365        assert_eq!(name, "readme");
1366        assert!(!truncated);
1367        assert!(original.is_none());
1368    }
1369
1370    #[test]
1371    fn derive_kebab_strips_non_kebab_chars() {
1372        let p = PathBuf::from("/tmp/some@weird#name!.md");
1373        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1374        assert_eq!(name, "someweirdname");
1375        assert!(!truncated);
1376        assert!(original.is_none());
1377    }
1378
1379    // Bug M-A3: NFD-based unicode normalization preserves base letters of
1380    // accented characters instead of dropping them entirely.
1381    #[test]
1382    fn derive_kebab_folds_accented_letters_to_ascii() {
1383        let p = PathBuf::from("/tmp/açaí.md");
1384        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1385        assert_eq!(name, "acai", "got '{name}'");
1386    }
1387
1388    #[test]
1389    fn derive_kebab_handles_naive_with_diaeresis() {
1390        let p = PathBuf::from("/tmp/naïve-test.md");
1391        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1392        assert_eq!(name, "naive-test", "got '{name}'");
1393    }
1394
1395    #[test]
1396    fn derive_kebab_drops_emoji_keeps_word() {
1397        let p = PathBuf::from("/tmp/🚀-rocket.md");
1398        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1399        assert_eq!(name, "rocket", "got '{name}'");
1400    }
1401
1402    #[test]
1403    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1404        let p = PathBuf::from("/tmp/açaí🦜.md");
1405        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1406        assert_eq!(name, "acai", "got '{name}'");
1407    }
1408
1409    #[test]
1410    fn derive_kebab_pure_emoji_yields_empty() {
1411        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1412        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1413        assert!(name.is_empty(), "got '{name}'");
1414    }
1415
1416    #[test]
1417    fn derive_kebab_collapses_consecutive_dashes() {
1418        let p = PathBuf::from("/tmp/a__b___c.md");
1419        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1420        assert_eq!(name, "a-b-c");
1421        assert!(!truncated);
1422        assert!(original.is_none());
1423    }
1424
1425    #[test]
1426    fn derive_kebab_truncates_to_60_chars() {
1427        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1428        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1429        assert!(name.len() <= 60, "got len {}", name.len());
1430        assert!(truncated);
1431        assert!(original.is_some());
1432        assert!(original.unwrap().len() > 60);
1433    }
1434
1435    #[test]
1436    fn collect_files_finds_md_files() {
1437        let tmp = tempfile::tempdir().expect("tempdir");
1438        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1439        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1440        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1441        let mut out = Vec::new();
1442        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1443        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1444    }
1445
1446    #[test]
1447    fn collect_files_recursive_descends_subdirs() {
1448        let tmp = tempfile::tempdir().expect("tempdir");
1449        let sub = tmp.path().join("sub");
1450        std::fs::create_dir(&sub).unwrap();
1451        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1452        std::fs::write(sub.join("b.md"), "y").unwrap();
1453        let mut out = Vec::new();
1454        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1455        assert_eq!(out.len(), 2);
1456    }
1457
1458    #[test]
1459    fn collect_files_non_recursive_skips_subdirs() {
1460        let tmp = tempfile::tempdir().expect("tempdir");
1461        let sub = tmp.path().join("sub");
1462        std::fs::create_dir(&sub).unwrap();
1463        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1464        std::fs::write(sub.join("b.md"), "y").unwrap();
1465        let mut out = Vec::new();
1466        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1467        assert_eq!(out.len(), 1);
1468    }
1469
1470    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
1471
1472    #[test]
1473    fn derive_kebab_long_basename_truncated_within_cap() {
1474        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1475        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1476        assert!(
1477            name.len() <= DERIVED_NAME_MAX_LEN,
1478            "truncated name must respect cap; got {} chars",
1479            name.len()
1480        );
1481        assert!(!name.is_empty());
1482        assert!(truncated);
1483        assert!(original.is_some());
1484    }
1485
1486    #[test]
1487    fn unique_name_returns_base_when_free() {
1488        let taken: BTreeSet<String> = BTreeSet::new();
1489        let resolved = unique_name("note", &taken).expect("must resolve");
1490        assert_eq!(resolved, "note");
1491    }
1492
1493    #[test]
1494    fn unique_name_appends_first_free_suffix_on_collision() {
1495        let mut taken: BTreeSet<String> = BTreeSet::new();
1496        taken.insert("note".to_string());
1497        taken.insert("note-1".to_string());
1498        let resolved = unique_name("note", &taken).expect("must resolve");
1499        assert_eq!(resolved, "note-2");
1500    }
1501
1502    #[test]
1503    fn unique_name_errors_after_collision_cap() {
1504        let mut taken: BTreeSet<String> = BTreeSet::new();
1505        taken.insert("note".to_string());
1506        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1507            taken.insert(format!("note-{i}"));
1508        }
1509        let err = unique_name("note", &taken).expect_err("must surface error");
1510        assert!(matches!(err, AppError::Validation(_)));
1511    }
1512
1513    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
1514
1515    #[test]
1516    fn validate_relation_format_accepts_valid_relations() {
1517        use crate::parsers::{is_canonical_relation, validate_relation_format};
1518        assert!(validate_relation_format("applies_to").is_ok());
1519        assert!(validate_relation_format("depends_on").is_ok());
1520        assert!(validate_relation_format("implements").is_ok());
1521        assert!(validate_relation_format("").is_err());
1522        assert!(is_canonical_relation("applies_to"));
1523        assert!(!is_canonical_relation("implements"));
1524    }
1525
1526    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
1527
1528    use serial_test::serial;
1529
1530    /// Helper: scrubs the env var around a closure to keep tests deterministic.
1531    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1532        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1533        let prev = std::env::var(key).ok();
1534        match value {
1535            Some(v) => std::env::set_var(key, v),
1536            None => std::env::remove_var(key),
1537        }
1538        f();
1539        match prev {
1540            Some(p) => std::env::set_var(key, p),
1541            None => std::env::remove_var(key),
1542        }
1543    }
1544
1545    #[test]
1546    #[serial]
1547    fn env_low_memory_enabled_unset_returns_false() {
1548        with_env_var(None, || assert!(!env_low_memory_enabled()));
1549    }
1550
1551    #[test]
1552    #[serial]
1553    fn env_low_memory_enabled_empty_returns_false() {
1554        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1555    }
1556
1557    #[test]
1558    #[serial]
1559    fn env_low_memory_enabled_truthy_values_return_true() {
1560        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1561            with_env_var(Some(v), || {
1562                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1563            });
1564        }
1565    }
1566
1567    #[test]
1568    #[serial]
1569    fn env_low_memory_enabled_falsy_values_return_false() {
1570        for v in ["0", "false", "FALSE", "no", "off"] {
1571            with_env_var(Some(v), || {
1572                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1573            });
1574        }
1575    }
1576
1577    #[test]
1578    #[serial]
1579    fn env_low_memory_enabled_unrecognized_value_returns_false() {
1580        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1581    }
1582
1583    #[test]
1584    #[serial]
1585    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1586        with_env_var(None, || {
1587            assert_eq!(resolve_parallelism(true, Some(4)), 1);
1588            assert_eq!(resolve_parallelism(true, Some(8)), 1);
1589            assert_eq!(resolve_parallelism(true, None), 1);
1590        });
1591    }
1592
1593    #[test]
1594    #[serial]
1595    fn resolve_parallelism_env_forces_one_when_flag_off() {
1596        with_env_var(Some("1"), || {
1597            assert_eq!(resolve_parallelism(false, Some(4)), 1);
1598            assert_eq!(resolve_parallelism(false, None), 1);
1599        });
1600    }
1601
1602    #[test]
1603    #[serial]
1604    fn resolve_parallelism_falsy_env_does_not_override() {
1605        with_env_var(Some("0"), || {
1606            assert_eq!(resolve_parallelism(false, Some(4)), 4);
1607        });
1608    }
1609
1610    #[test]
1611    #[serial]
1612    fn resolve_parallelism_explicit_value_when_low_memory_off() {
1613        with_env_var(None, || {
1614            assert_eq!(resolve_parallelism(false, Some(3)), 3);
1615            assert_eq!(resolve_parallelism(false, Some(1)), 1);
1616        });
1617    }
1618
1619    #[test]
1620    #[serial]
1621    fn resolve_parallelism_default_when_unset() {
1622        with_env_var(None, || {
1623            let p = resolve_parallelism(false, None);
1624            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1625        });
1626    }
1627
1628    #[test]
1629    fn ingest_args_parses_low_memory_flag_via_clap() {
1630        use clap::Parser;
1631        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
1632        // on the public `Cli` definition so the flag is wired end-to-end.
1633        let cli = crate::cli::Cli::try_parse_from([
1634            "sqlite-graphrag",
1635            "ingest",
1636            "/tmp/dummy",
1637            "--type",
1638            "document",
1639            "--low-memory",
1640        ])
1641        .expect("parse must succeed");
1642        match cli.command {
1643            crate::cli::Commands::Ingest(args) => {
1644                assert!(args.low_memory, "--low-memory must set field to true");
1645            }
1646            _ => panic!("expected Ingest subcommand"),
1647        }
1648    }
1649
1650    #[test]
1651    fn ingest_args_low_memory_defaults_false() {
1652        use clap::Parser;
1653        let cli = crate::cli::Cli::try_parse_from([
1654            "sqlite-graphrag",
1655            "ingest",
1656            "/tmp/dummy",
1657            "--type",
1658            "document",
1659        ])
1660        .expect("parse must succeed");
1661        match cli.command {
1662            crate::cli::Commands::Ingest(args) => {
1663                assert!(!args.low_memory, "default must be false");
1664            }
1665            _ => panic!("expected Ingest subcommand"),
1666        }
1667    }
1668}