Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Incremental pipeline (v1.0.43)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file. Results are sent immediately via a
21//! bounded `mpsc::sync_channel` to Phase B so persistence starts as soon
22//! as the first file completes — no waiting for all files to finish Phase A.
23//!
24//! Phase B runs on the main thread: receives staged files from the channel,
25//! writes to SQLite per-file (WAL absorbs individual commits), and emits
26//! NDJSON progress events to stderr as each file is persisted. `Connection`
27//! is not `Sync` so it never crosses thread boundaries.
28//!
29//! This fixes B1: with the old 2-phase design, a 50-file corpus with 27s/file
30//! NER would spend ~22min in Phase A alone, exceeding the user's 900s timeout
31//! before Phase B (and any DB writes) could begin. With this pipeline, the
32//! first file is committed within seconds of starting.
33
34use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
56/// Hard cap on the numeric suffix appended for collision resolution. If 1000
57/// candidates collide we surface an error rather than loop forever.
58const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n  \
62    # Ingest every Markdown file under ./docs as `document` memories\n  \
63    sqlite-graphrag ingest ./docs --type document\n\n  \
64    # Ingest .txt files recursively under ./notes\n  \
65    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
66    # Enable GLiNER NER extraction (disabled by default, slower)\n  \
67    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n  \
68    # Preview file-to-name mapping without ingesting\n  \
69    sqlite-graphrag ingest ./docs --dry-run\n\n  \
70NOTES:\n  \
71    Each file becomes a separate memory. Names derive from file basenames\n  \
72    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
73    followed by a final summary line with counts. Per-file errors are reported\n  \
74    inline and processing continues unless --fail-fast is set.")]
75pub struct IngestArgs {
76    /// Directory containing files to ingest.
77    #[arg(
78        value_name = "DIR",
79        help = "Directory to ingest recursively (each matching file becomes a memory)"
80    )]
81    pub dir: PathBuf,
82
83    /// Memory type stored in `memories.type` for every ingested file. Defaults to `document`.
84    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
85    pub r#type: MemoryType,
86
87    /// Glob pattern matched against file basenames (default: `*.md`). Supports
88    /// `*.<ext>`, `<prefix>*`, and exact filename match.
89    #[arg(long, default_value = "*.md")]
90    pub pattern: String,
91
92    /// Recurse into subdirectories.
93    #[arg(long, default_value_t = false)]
94    pub recursive: bool,
95
96    #[arg(
97        long,
98        env = "SQLITE_GRAPHRAG_ENABLE_NER",
99        value_parser = crate::parsers::parse_bool_flexible,
100        action = clap::ArgAction::Set,
101        num_args = 0..=1,
102        default_missing_value = "true",
103        default_value = "false",
104        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
105    )]
106    pub enable_ner: bool,
107    #[arg(
108        long,
109        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
110        default_value = "fp32",
111        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
112    )]
113    pub gliner_variant: String,
114
115    /// Deprecated: NER is now disabled by default. Kept for backwards compatibility.
116    #[arg(long, default_value_t = false, hide = true)]
117    pub skip_extraction: bool,
118
119    /// Stop on first per-file error instead of continuing with the next file.
120    #[arg(long, default_value_t = false)]
121    pub fail_fast: bool,
122
123    /// Preview file-to-name mapping without loading model or persisting.
124    #[arg(long, default_value_t = false)]
125    pub dry_run: bool,
126
127    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
128    #[arg(long, default_value_t = 10_000)]
129    pub max_files: usize,
130
131    /// Namespace for the ingested memories.
132    #[arg(long)]
133    pub namespace: Option<String>,
134
135    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
136    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
137    pub db: Option<String>,
138
139    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
140    pub format: JsonOutputFormat,
141
142    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
143    pub json: bool,
144
145    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
146    #[arg(
147        long,
148        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
149    )]
150    pub ingest_parallelism: Option<usize>,
151
152    /// Force single-threaded ingest to reduce RSS pressure.
153    ///
154    /// Equivalent to `--ingest-parallelism 1`, takes precedence over any
155    /// explicit value. Recommended for environments with <4 GB available
156    /// RAM or container/cgroup constraints. Trade-off: 3-4x longer wall
157    /// time. Also honored via `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var
158    /// (CLI flag has higher precedence than the env var).
159    #[arg(
160        long,
161        default_value_t = false,
162        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
163                Recommended for environments with <4 GB available RAM or container/cgroup \
164                constraints. Trade-off: 3-4x longer wall time. Also honored via \
165                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
166    )]
167    pub low_memory: bool,
168
169    /// Maximum process RSS in MiB; abort if exceeded during embedding.
170    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
171          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
172    pub max_rss_mb: u64,
173
174    /// Maximum character length for derived memory names from file basenames.
175    ///
176    /// Overrides the compile-time `DERIVED_NAME_MAX_LEN` constant (default 60).
177    /// Shorter values leave more headroom for collision suffix resolution.
178    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
179          help = "Maximum length for derived memory names (default: 60)")]
180    pub max_name_length: usize,
181
182    /// Extraction mode: `none` (body-only, default), `gliner` (NER), or `claude-code` (LLM-curated via Claude Code CLI).
183    #[arg(long, value_enum, default_value_t = IngestMode::None)]
184    pub mode: IngestMode,
185
186    /// Explicit path to the Claude Code binary (only with --mode claude-code).
187    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
188    pub claude_binary: Option<std::path::PathBuf>,
189
190    /// Model override for Claude Code extraction (e.g. claude-sonnet-4-6).
191    #[arg(long)]
192    pub claude_model: Option<String>,
193
194    /// Resume a previously interrupted claude-code ingest from the queue DB.
195    #[arg(long, default_value_t = false)]
196    pub resume: bool,
197
198    /// Retry only failed files from a previous claude-code ingest.
199    #[arg(long, default_value_t = false)]
200    pub retry_failed: bool,
201
202    /// Keep the queue DB (.ingest-queue.sqlite) after completion.
203    #[arg(long, default_value_t = false)]
204    pub keep_queue: bool,
205
206    /// Custom path for the claude-code ingest queue database.
207    #[arg(long, default_value = ".ingest-queue.sqlite")]
208    pub queue_db: String,
209
210    /// Initial wait time in seconds when rate-limited (only with --mode claude-code).
211    #[arg(long, default_value_t = 60)]
212    pub rate_limit_wait: u64,
213
214    /// Maximum cumulative cost in USD before aborting (only with --mode claude-code).
215    #[arg(long)]
216    pub max_cost_usd: Option<f64>,
217}
218
219/// Extraction mode for the ingest pipeline.
220#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
221pub enum IngestMode {
222    /// Body-only ingestion without entity/relationship extraction (default).
223    None,
224    /// GLiNER zero-shot NER extraction (requires --enable-ner).
225    Gliner,
226    /// LLM-curated extraction via locally installed Claude Code CLI.
227    ClaudeCode,
228}
229
230/// Returns true when the `SQLITE_GRAPHRAG_LOW_MEMORY` env var is set to a
231/// truthy value (`1`, `true`, `yes`, `on`, case-insensitive). Empty or unset
232/// values evaluate to false. Unrecognized non-empty values emit a
233/// `tracing::warn!` and evaluate to false.
234fn env_low_memory_enabled() -> bool {
235    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
236        Ok(v) if v.is_empty() => false,
237        Ok(v) => match v.to_lowercase().as_str() {
238            "1" | "true" | "yes" | "on" => true,
239            "0" | "false" | "no" | "off" => false,
240            other => {
241                tracing::warn!(
242                    target: "ingest",
243                    value = %other,
244                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
245                );
246                false
247            }
248        },
249        Err(_) => false,
250    }
251}
252
253/// Resolves the effective ingest parallelism honoring `--low-memory` and the
254/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var.
255///
256/// Precedence:
257/// 1. `--low-memory` CLI flag forces parallelism = 1.
258/// 2. `SQLITE_GRAPHRAG_LOW_MEMORY=1` env var forces parallelism = 1.
259/// 3. Explicit `--ingest-parallelism N` (when low-memory is off).
260/// 4. Default heuristic `(cpus/2).clamp(1, 4)`.
261///
262/// When low-memory wins and the user also passed `--ingest-parallelism N>1`,
263/// emits a `tracing::warn!` advertising the override.
264fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
265    let env_flag = env_low_memory_enabled();
266    let low_memory = low_memory_flag || env_flag;
267
268    if low_memory {
269        if let Some(n) = ingest_parallelism {
270            if n > 1 {
271                tracing::warn!(
272                    target: "ingest",
273                    requested = n,
274                    "--ingest-parallelism overridden by --low-memory; using 1"
275                );
276            }
277        }
278        if low_memory_flag {
279            tracing::info!(
280                target: "ingest",
281                source = "flag",
282                "low-memory mode enabled: forcing --ingest-parallelism 1"
283            );
284        } else {
285            tracing::info!(
286                target: "ingest",
287                source = "env",
288                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
289            );
290        }
291        return 1;
292    }
293
294    ingest_parallelism
295        .unwrap_or_else(|| {
296            std::thread::available_parallelism()
297                .map(|v| v.get() / 2)
298                .unwrap_or(1)
299                .clamp(1, 4)
300        })
301        .max(1)
302}
303
304#[derive(Serialize)]
305struct IngestFileEvent<'a> {
306    file: &'a str,
307    name: &'a str,
308    status: &'a str,
309    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
310    truncated: bool,
311    /// Original derived name before truncation; only present when `truncated=true`.
312    #[serde(skip_serializing_if = "Option::is_none")]
313    original_name: Option<String>,
314    /// Original file basename (without extension); only present when it differs from `name`.
315    #[serde(skip_serializing_if = "Option::is_none")]
316    original_filename: Option<&'a str>,
317    #[serde(skip_serializing_if = "Option::is_none")]
318    error: Option<String>,
319    #[serde(skip_serializing_if = "Option::is_none")]
320    memory_id: Option<i64>,
321    #[serde(skip_serializing_if = "Option::is_none")]
322    action: Option<String>,
323    /// Byte length of the body ingested; 0 when not yet read (e.g. skip or dry-run events).
324    body_length: usize,
325}
326
327#[derive(Serialize)]
328struct IngestSummary {
329    summary: bool,
330    dir: String,
331    pattern: String,
332    recursive: bool,
333    files_total: usize,
334    files_succeeded: usize,
335    files_failed: usize,
336    files_skipped: usize,
337    elapsed_ms: u64,
338}
339
340/// Outcome of a successful per-file ingest, used to build the NDJSON event.
341struct FileSuccess {
342    memory_id: i64,
343    action: String,
344    body_length: usize,
345}
346
347/// NDJSON progress event emitted to stderr after each file completes Phase A.
348/// Schema version 1; consumers should check `schema_version` before parsing.
349#[derive(Serialize)]
350struct StageProgressEvent<'a> {
351    schema_version: u8,
352    event: &'a str,
353    path: &'a str,
354    ms: u64,
355    entities: usize,
356    relationships: usize,
357}
358
359/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
360/// Phase B persists these to SQLite on the main thread in submission order.
361struct StagedFile {
362    body: String,
363    body_hash: String,
364    snippet: String,
365    name: String,
366    description: String,
367    embedding: Vec<f32>,
368    chunk_embeddings: Option<Vec<Vec<f32>>>,
369    chunks_info: Vec<crate::chunking::Chunk>,
370    entities: Vec<NewEntity>,
371    relationships: Vec<NewRelationship>,
372    entity_embeddings: Vec<Vec<f32>>,
373    urls: Vec<crate::extraction::ExtractedUrl>,
374}
375
376/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
377/// Never touches the database — safe to run on any rayon thread.
378fn stage_file(
379    _idx: usize,
380    path: &Path,
381    name: &str,
382    paths: &AppPaths,
383    enable_ner: bool,
384    gliner_variant: crate::extraction::GlinerVariant,
385    max_rss_mb: u64,
386) -> Result<StagedFile, AppError> {
387    use crate::constants::*;
388
389    if name.len() > MAX_MEMORY_NAME_LEN {
390        return Err(AppError::LimitExceeded(
391            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
392        ));
393    }
394    if name.starts_with("__") {
395        return Err(AppError::Validation(
396            crate::i18n::validation::reserved_name(),
397        ));
398    }
399    {
400        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
401            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
402        if !slug_re.is_match(name) {
403            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
404                name,
405            )));
406        }
407    }
408
409    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
410    if raw_body.len() > MAX_MEMORY_BODY_LEN {
411        return Err(AppError::LimitExceeded(
412            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
413        ));
414    }
415    if raw_body.trim().is_empty() {
416        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
417    }
418
419    let description = format!("ingested from {}", path.display());
420    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
421        return Err(AppError::Validation(
422            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
423        ));
424    }
425
426    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
427    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
428    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
429    if enable_ner {
430        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
431            Ok(extracted) => {
432                extracted_urls = extracted.urls;
433                extracted_entities = extracted.entities;
434                extracted_relationships = extracted.relationships;
435
436                if extracted_entities.len() > max_entities_per_memory() {
437                    extracted_entities.truncate(max_entities_per_memory());
438                }
439                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
440                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
441                }
442            }
443            Err(e) => {
444                tracing::warn!(
445                    file = %path.display(),
446                    "auto-extraction failed (graceful degradation): {e:#}"
447                );
448            }
449        }
450    }
451
452    for rel in &mut extracted_relationships {
453        rel.relation = crate::parsers::normalize_relation(&rel.relation);
454        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
455            return Err(AppError::Validation(format!(
456                "{e} for relationship '{}' -> '{}'",
457                rel.source, rel.target
458            )));
459        }
460        crate::parsers::warn_if_non_canonical(&rel.relation);
461        if !(0.0..=1.0).contains(&rel.strength) {
462            return Err(AppError::Validation(format!(
463                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
464                rel.strength, rel.source, rel.target
465            )));
466        }
467    }
468
469    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
470    let snippet: String = raw_body.chars().take(200).collect();
471
472    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
473    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
474    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
475        return Err(AppError::LimitExceeded(format!(
476            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
477            chunks_info.len(),
478            REMEMBER_MAX_SAFE_MULTI_CHUNKS
479        )));
480    }
481
482    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
483    let embedding = if chunks_info.len() == 1 {
484        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
485    } else {
486        let chunk_texts: Vec<&str> = chunks_info
487            .iter()
488            .map(|c| chunking::chunk_text(&raw_body, c))
489            .collect();
490        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
491        for chunk_text in &chunk_texts {
492            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
493                if rss > max_rss_mb {
494                    tracing::error!(
495                        rss_mb = rss,
496                        max_rss_mb = max_rss_mb,
497                        file = %path.display(),
498                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
499                    );
500                    return Err(AppError::LowMemory {
501                        available_mb: crate::memory_guard::available_memory_mb(),
502                        required_mb: max_rss_mb,
503                    });
504                }
505            }
506            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
507                &paths.models,
508                chunk_text,
509            )?);
510        }
511        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
512        chunk_embeddings_opt = Some(chunk_embeddings);
513        aggregated
514    };
515
516    let entity_embeddings = extracted_entities
517        .iter()
518        .map(|entity| {
519            let entity_text = match &entity.description {
520                Some(desc) => format!("{} {}", entity.name, desc),
521                None => entity.name.clone(),
522            };
523            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
524        })
525        .collect::<Result<Vec<_>, _>>()?;
526
527    Ok(StagedFile {
528        body: raw_body,
529        body_hash,
530        snippet,
531        name: name.to_string(),
532        description,
533        embedding,
534        chunk_embeddings: chunk_embeddings_opt,
535        chunks_info,
536        entities: extracted_entities,
537        relationships: extracted_relationships,
538        entity_embeddings,
539        urls: extracted_urls,
540    })
541}
542
543/// Phase B: persists one `StagedFile` to the database on the main thread.
544fn persist_staged(
545    conn: &mut Connection,
546    namespace: &str,
547    memory_type: &str,
548    staged: StagedFile,
549) -> Result<FileSuccess, AppError> {
550    {
551        let active_count: u32 = conn.query_row(
552            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
553            [],
554            |r| r.get::<_, i64>(0).map(|v| v as u32),
555        )?;
556        let ns_exists: bool = conn.query_row(
557            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
558            rusqlite::params![namespace],
559            |r| r.get::<_, i64>(0).map(|v| v > 0),
560        )?;
561        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
562            return Err(AppError::NamespaceError(format!(
563                "active namespace limit of {} exceeded while creating '{namespace}'",
564                crate::constants::MAX_NAMESPACES_ACTIVE
565            )));
566        }
567    }
568
569    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
570    if existing_memory.is_some() {
571        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
572            &staged.name,
573            namespace,
574        )));
575    }
576    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
577
578    let new_memory = NewMemory {
579        namespace: namespace.to_string(),
580        name: staged.name.clone(),
581        memory_type: memory_type.to_string(),
582        description: staged.description.clone(),
583        body: staged.body,
584        body_hash: staged.body_hash,
585        session_id: None,
586        source: "agent".to_string(),
587        metadata: serde_json::json!({}),
588    };
589
590    if let Some(hash_id) = duplicate_hash_id {
591        tracing::debug!(
592            target: "ingest",
593            duplicate_memory_id = hash_id,
594            "identical body already exists; persisting a new memory anyway"
595        );
596    }
597
598    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
599
600    let memory_id = memories::insert(&tx, &new_memory)?;
601    versions::insert_version(
602        &tx,
603        memory_id,
604        1,
605        &staged.name,
606        memory_type,
607        &staged.description,
608        &new_memory.body,
609        &serde_json::to_string(&new_memory.metadata)?,
610        None,
611        "create",
612    )?;
613    memories::upsert_vec(
614        &tx,
615        memory_id,
616        namespace,
617        memory_type,
618        &staged.embedding,
619        &staged.name,
620        &staged.snippet,
621    )?;
622
623    if staged.chunks_info.len() > 1 {
624        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
625        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
626            AppError::Internal(anyhow::anyhow!(
627                "missing chunk embeddings cache on multi-chunk ingest path"
628            ))
629        })?;
630        for (i, emb) in chunk_embeddings.iter().enumerate() {
631            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
632        }
633    }
634
635    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
636        for (idx, entity) in staged.entities.iter().enumerate() {
637            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
638            let entity_embedding = &staged.entity_embeddings[idx];
639            entities::upsert_entity_vec(
640                &tx,
641                entity_id,
642                namespace,
643                entity.entity_type,
644                entity_embedding,
645                &entity.name,
646            )?;
647            entities::link_memory_entity(&tx, memory_id, entity_id)?;
648            entities::increment_degree(&tx, entity_id)?;
649        }
650        let entity_types: std::collections::HashMap<&str, EntityType> = staged
651            .entities
652            .iter()
653            .map(|entity| (entity.name.as_str(), entity.entity_type))
654            .collect();
655        for rel in &staged.relationships {
656            let source_entity = NewEntity {
657                name: rel.source.clone(),
658                entity_type: entity_types
659                    .get(rel.source.as_str())
660                    .copied()
661                    .unwrap_or(EntityType::Concept),
662                description: None,
663            };
664            let target_entity = NewEntity {
665                name: rel.target.clone(),
666                entity_type: entity_types
667                    .get(rel.target.as_str())
668                    .copied()
669                    .unwrap_or(EntityType::Concept),
670                description: None,
671            };
672            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
673            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
674            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
675            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
676        }
677    }
678
679    tx.commit()?;
680
681    if !staged.urls.is_empty() {
682        let url_entries: Vec<storage_urls::MemoryUrl> = staged
683            .urls
684            .into_iter()
685            .map(|u| storage_urls::MemoryUrl {
686                url: u.url,
687                offset: Some(u.offset as i64),
688            })
689            .collect();
690        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
691    }
692
693    Ok(FileSuccess {
694        memory_id,
695        action: "created".to_string(),
696        body_length: new_memory.body.len(),
697    })
698}
699
700pub fn run(args: IngestArgs) -> Result<(), AppError> {
701    if args.mode == IngestMode::ClaudeCode {
702        return super::ingest_claude::run_claude_ingest(&args);
703    }
704
705    let started = std::time::Instant::now();
706
707    if !args.dir.exists() {
708        return Err(AppError::Validation(format!(
709            "directory not found: {}",
710            args.dir.display()
711        )));
712    }
713    if !args.dir.is_dir() {
714        return Err(AppError::Validation(format!(
715            "path is not a directory: {}",
716            args.dir.display()
717        )));
718    }
719
720    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
721    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
722    files.sort();
723
724    if files.len() > args.max_files {
725        return Err(AppError::Validation(format!(
726            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
727            files.len(),
728            args.max_files
729        )));
730    }
731
732    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
733    let memory_type_str = args.r#type.as_str().to_string();
734
735    let paths = AppPaths::resolve(args.db.as_deref())?;
736    let mut conn_or_err = match init_storage(&paths) {
737        Ok(c) => Ok(c),
738        Err(e) => Err(format!("{e}")),
739    };
740
741    let mut succeeded: usize = 0;
742    let mut failed: usize = 0;
743    let mut skipped: usize = 0;
744    let total = files.len();
745
746    // Pre-resolve all names before parallelisation so Phase A workers see a
747    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
748    let mut taken_names: BTreeSet<String> = BTreeSet::new();
749
750    // SlotMeta: per-slot output metadata retained on the main thread for NDJSON.
751    // ProcessItem: the data moved into the producer thread for Phase A computation.
752    // We split these so `slots_meta` (non-Send BTreeSet-dependent) stays on main
753    // thread while `process_items` (Send: only PathBuf + String) crosses the thread
754    // boundary into the rayon producer.
755    enum SlotMeta {
756        Skip {
757            file_str: String,
758            derived_base: String,
759            name_truncated: bool,
760            original_name: Option<String>,
761            original_filename: Option<String>,
762            reason: String,
763        },
764        Process {
765            file_str: String,
766            derived_name: String,
767            name_truncated: bool,
768            original_name: Option<String>,
769            original_filename: Option<String>,
770        },
771    }
772
773    struct ProcessItem {
774        idx: usize,
775        path: PathBuf,
776        file_str: String,
777        derived_name: String,
778    }
779
780    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
781    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
782    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
783
784    let max_name_length = args.max_name_length;
785    for path in &files {
786        let file_str = path.to_string_lossy().into_owned();
787        let (derived_base, name_truncated, original_name) =
788            derive_kebab_name(path, max_name_length);
789        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
790
791        if name_truncated {
792            if let Some(ref orig) = original_name {
793                truncations.push((orig.clone(), derived_base.clone()));
794            }
795        }
796
797        if derived_base.is_empty() {
798            // original_filename: always include when it differs from the empty derived name
799            let orig_filename = if !original_basename.is_empty() {
800                Some(original_basename.to_string())
801            } else {
802                None
803            };
804            slots_meta.push(SlotMeta::Skip {
805                file_str,
806                derived_base: String::new(),
807                name_truncated: false,
808                original_name: None,
809                original_filename: orig_filename,
810                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
811            });
812            continue;
813        }
814
815        match unique_name(&derived_base, &taken_names) {
816            Ok(derived_name) => {
817                taken_names.insert(derived_name.clone());
818                let idx = slots_meta.len();
819                // original_filename: present only when the raw basename differs from the derived name
820                let orig_filename = if original_basename != derived_name {
821                    Some(original_basename.to_string())
822                } else {
823                    None
824                };
825                process_items.push(ProcessItem {
826                    idx,
827                    path: path.clone(),
828                    file_str: file_str.clone(),
829                    derived_name: derived_name.clone(),
830                });
831                slots_meta.push(SlotMeta::Process {
832                    file_str,
833                    derived_name,
834                    name_truncated,
835                    original_name,
836                    original_filename: orig_filename,
837                });
838            }
839            Err(e) => {
840                let orig_filename = if original_basename != derived_base {
841                    Some(original_basename.to_string())
842                } else {
843                    None
844                };
845                slots_meta.push(SlotMeta::Skip {
846                    file_str,
847                    derived_base,
848                    name_truncated,
849                    original_name,
850                    original_filename: orig_filename,
851                    reason: e.to_string(),
852                });
853            }
854        }
855    }
856
857    if !truncations.is_empty() {
858        tracing::info!(
859            target: "ingest",
860            count = truncations.len(),
861            max_name_length = max_name_length,
862            max_len = DERIVED_NAME_MAX_LEN,
863            "derived names truncated; pass -vv (debug) for per-file detail"
864        );
865    }
866
867    // --dry-run: emit preview events and exit before loading ONNX or touching DB.
868    if args.dry_run {
869        for meta in &slots_meta {
870            match meta {
871                SlotMeta::Skip {
872                    file_str,
873                    derived_base,
874                    name_truncated,
875                    original_name,
876                    original_filename,
877                    reason,
878                } => {
879                    output::emit_json_compact(&IngestFileEvent {
880                        file: file_str,
881                        name: derived_base,
882                        status: "skip",
883                        truncated: *name_truncated,
884                        original_name: original_name.clone(),
885                        original_filename: original_filename.as_deref(),
886                        error: Some(reason.clone()),
887                        memory_id: None,
888                        action: None,
889                        body_length: 0,
890                    })?;
891                }
892                SlotMeta::Process {
893                    file_str,
894                    derived_name,
895                    name_truncated,
896                    original_name,
897                    original_filename,
898                } => {
899                    output::emit_json_compact(&IngestFileEvent {
900                        file: file_str,
901                        name: derived_name,
902                        status: "preview",
903                        truncated: *name_truncated,
904                        original_name: original_name.clone(),
905                        original_filename: original_filename.as_deref(),
906                        error: None,
907                        memory_id: None,
908                        action: None,
909                        body_length: 0,
910                    })?;
911                }
912            }
913        }
914        output::emit_json_compact(&IngestSummary {
915            summary: true,
916            dir: args.dir.to_string_lossy().into_owned(),
917            pattern: args.pattern.clone(),
918            recursive: args.recursive,
919            files_total: total,
920            files_succeeded: 0,
921            files_failed: 0,
922            files_skipped: 0,
923            elapsed_ms: started.elapsed().as_millis() as u64,
924        })?;
925        return Ok(());
926    }
927
928    // Determine rayon thread pool size, honoring --low-memory and the
929    // SQLITE_GRAPHRAG_LOW_MEMORY env var (both force parallelism = 1).
930    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
931
932    let pool = rayon::ThreadPoolBuilder::new()
933        .num_threads(parallelism)
934        .build()
935        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
936
937    if args.enable_ner && args.skip_extraction {
938        tracing::warn!(
939            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
940        );
941    }
942    if args.skip_extraction && !args.enable_ner {
943        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
944    }
945    let enable_ner = args.enable_ner;
946    let max_rss_mb = args.max_rss_mb;
947    let gliner_variant: crate::extraction::GlinerVariant =
948        args.gliner_variant.parse().unwrap_or_else(|e| {
949            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
950            crate::extraction::GlinerVariant::Fp32
951        });
952
953    let total_to_process = process_items.len();
954    tracing::info!(
955        target = "ingest",
956        phase = "pipeline_start",
957        files = total_to_process,
958        ingest_parallelism = parallelism,
959        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
960    );
961
962    // Bounded channel: producer never gets more than parallelism*2 items ahead of
963    // the consumer, preventing memory blowup when Phase A is faster than Phase B.
964    // Each message carries the slot index so Phase B can look up SlotMeta in order.
965    let channel_bound = (parallelism * 2).max(1);
966    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
967
968    // Phase A: launched in a dedicated OS thread so the main thread can consume
969    // the channel concurrently. pool.install() blocks the calling thread until
970    // all rayon workers finish — if called on the main thread it would
971    // reintroduce the 2-phase blocking behaviour we are eliminating.
972    let paths_owned = paths.clone();
973    let producer_handle = std::thread::spawn(move || {
974        pool.install(|| {
975            process_items.into_par_iter().for_each(|item| {
976                let t0 = std::time::Instant::now();
977                let result = stage_file(
978                    item.idx,
979                    &item.path,
980                    &item.derived_name,
981                    &paths_owned,
982                    enable_ner,
983                    gliner_variant,
984                    max_rss_mb,
985                );
986                let elapsed_ms = t0.elapsed().as_millis() as u64;
987
988                // Emit NDJSON progress event to stderr so the user sees work
989                // happening during long NER runs (e.g. 50 files × 27s each).
990                let (n_entities, n_relationships) = match &result {
991                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
992                    Err(_) => (0, 0),
993                };
994                let progress = StageProgressEvent {
995                    schema_version: 1,
996                    event: "file_extracted",
997                    path: &item.file_str,
998                    ms: elapsed_ms,
999                    entities: n_entities,
1000                    relationships: n_relationships,
1001                };
1002                if let Ok(line) = serde_json::to_string(&progress) {
1003                    eprintln!("{line}");
1004                }
1005
1006                // Blocking send applies backpressure: if Phase B is slower,
1007                // Phase A workers wait here instead of accumulating staged files
1008                // in memory. If the receiver is dropped (fail_fast abort), ignore.
1009                let _ = tx.send((item.idx, result));
1010            });
1011            // Explicit drop of tx signals Phase B (rx iteration) to stop.
1012            drop(tx);
1013        });
1014    });
1015
1016    // Phase B: main thread persists files as results arrive from the channel.
1017    // Results arrive in completion order (par_iter is unordered). We persist
1018    // each file immediately on arrival — this is the key fix for B1: with the
1019    // old 2-phase design the first DB write happened only after ALL files had
1020    // finished Phase A. Now the first commit happens as soon as the first file
1021    // completes Phase A, regardless of how many files remain.
1022    //
1023    // NDJSON output order follows completion order (not file-system sort order).
1024    // Skip slots are emitted at the end, after all Process results are consumed.
1025    // This trade-off is intentional: deterministic NDJSON ordering is a lesser
1026    // requirement than ensuring data is persisted before the user's timeout fires.
1027    let fail_fast = args.fail_fast;
1028
1029    // Emit pending Skip events first so agents see them early.
1030    for meta in &slots_meta {
1031        if let SlotMeta::Skip {
1032            file_str,
1033            derived_base,
1034            name_truncated,
1035            original_name,
1036            original_filename,
1037            reason,
1038        } = meta
1039        {
1040            output::emit_json_compact(&IngestFileEvent {
1041                file: file_str,
1042                name: derived_base,
1043                status: "skipped",
1044                truncated: *name_truncated,
1045                original_name: original_name.clone(),
1046                original_filename: original_filename.as_deref(),
1047                error: Some(reason.clone()),
1048                memory_id: None,
1049                action: None,
1050                body_length: 0,
1051            })?;
1052            skipped += 1;
1053        }
1054    }
1055
1056    // Build a quick index from slot index → SlotMeta reference for O(1) lookups
1057    // as channel messages arrive in completion order.
1058    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1059        .iter()
1060        .enumerate()
1061        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1062        .collect();
1063
1064    tracing::info!(
1065        target = "ingest",
1066        phase = "persist_start",
1067        files = total_to_process,
1068        "phase B starting: persisting files incrementally as Phase A completes each one",
1069    );
1070
1071    // Drain channel and persist each file immediately — no accumulation into a
1072    // HashMap. The bounded channel ensures Phase A cannot run too far ahead of
1073    // Phase B without applying backpressure.
1074    for (idx, stage_result) in rx {
1075        let meta = meta_index.get(&idx).ok_or_else(|| {
1076            AppError::Internal(anyhow::anyhow!(
1077                "channel idx {idx} has no corresponding Process slot"
1078            ))
1079        })?;
1080        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1081        {
1082            SlotMeta::Process {
1083                file_str,
1084                derived_name,
1085                name_truncated,
1086                original_name,
1087                original_filename,
1088            } => (
1089                file_str,
1090                derived_name,
1091                name_truncated,
1092                original_name,
1093                original_filename,
1094            ),
1095            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1096        };
1097
1098        // If storage init failed, every file fails with the same error.
1099        let conn = match conn_or_err.as_mut() {
1100            Ok(c) => c,
1101            Err(err_msg) => {
1102                let err_clone = err_msg.clone();
1103                output::emit_json_compact(&IngestFileEvent {
1104                    file: file_str,
1105                    name: derived_name,
1106                    status: "failed",
1107                    truncated: *name_truncated,
1108                    original_name: original_name.clone(),
1109                    original_filename: original_filename.as_deref(),
1110                    error: Some(err_clone.clone()),
1111                    memory_id: None,
1112                    action: None,
1113                    body_length: 0,
1114                })?;
1115                failed += 1;
1116                if fail_fast {
1117                    output::emit_json_compact(&IngestSummary {
1118                        summary: true,
1119                        dir: args.dir.display().to_string(),
1120                        pattern: args.pattern.clone(),
1121                        recursive: args.recursive,
1122                        files_total: total,
1123                        files_succeeded: succeeded,
1124                        files_failed: failed,
1125                        files_skipped: skipped,
1126                        elapsed_ms: started.elapsed().as_millis() as u64,
1127                    })?;
1128                    return Err(AppError::Validation(format!(
1129                        "ingest aborted on first failure: {err_clone}"
1130                    )));
1131                }
1132                continue;
1133            }
1134        };
1135
1136        let outcome =
1137            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1138
1139        match outcome {
1140            Ok(FileSuccess {
1141                memory_id,
1142                action,
1143                body_length,
1144            }) => {
1145                output::emit_json_compact(&IngestFileEvent {
1146                    file: file_str,
1147                    name: derived_name,
1148                    status: "indexed",
1149                    truncated: *name_truncated,
1150                    original_name: original_name.clone(),
1151                    original_filename: original_filename.as_deref(),
1152                    error: None,
1153                    memory_id: Some(memory_id),
1154                    action: Some(action),
1155                    body_length,
1156                })?;
1157                succeeded += 1;
1158            }
1159            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1160                output::emit_json_compact(&IngestFileEvent {
1161                    file: file_str,
1162                    name: derived_name,
1163                    status: "skipped",
1164                    truncated: *name_truncated,
1165                    original_name: original_name.clone(),
1166                    original_filename: original_filename.as_deref(),
1167                    error: Some(format!("{e}")),
1168                    memory_id: None,
1169                    action: Some("duplicate".to_string()),
1170                    body_length: 0,
1171                })?;
1172                skipped += 1;
1173            }
1174            Err(e) => {
1175                let err_msg = format!("{e}");
1176                output::emit_json_compact(&IngestFileEvent {
1177                    file: file_str,
1178                    name: derived_name,
1179                    status: "failed",
1180                    truncated: *name_truncated,
1181                    original_name: original_name.clone(),
1182                    original_filename: original_filename.as_deref(),
1183                    error: Some(err_msg.clone()),
1184                    memory_id: None,
1185                    action: None,
1186                    body_length: 0,
1187                })?;
1188                failed += 1;
1189                if fail_fast {
1190                    output::emit_json_compact(&IngestSummary {
1191                        summary: true,
1192                        dir: args.dir.display().to_string(),
1193                        pattern: args.pattern.clone(),
1194                        recursive: args.recursive,
1195                        files_total: total,
1196                        files_succeeded: succeeded,
1197                        files_failed: failed,
1198                        files_skipped: skipped,
1199                        elapsed_ms: started.elapsed().as_millis() as u64,
1200                    })?;
1201                    return Err(AppError::Validation(format!(
1202                        "ingest aborted on first failure: {err_msg}"
1203                    )));
1204                }
1205            }
1206        }
1207    }
1208
1209    // Wait for the producer thread to finish cleanly.
1210    producer_handle
1211        .join()
1212        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1213
1214    if let Ok(ref conn) = conn_or_err {
1215        if succeeded > 0 {
1216            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1217        }
1218    }
1219
1220    output::emit_json_compact(&IngestSummary {
1221        summary: true,
1222        dir: args.dir.display().to_string(),
1223        pattern: args.pattern.clone(),
1224        recursive: args.recursive,
1225        files_total: total,
1226        files_succeeded: succeeded,
1227        files_failed: failed,
1228        files_skipped: skipped,
1229        elapsed_ms: started.elapsed().as_millis() as u64,
1230    })?;
1231
1232    Ok(())
1233}
1234
1235/// Auto-initialises the database (matches the contract of every other CRUD
1236/// handler) and returns a fresh read/write connection ready for the ingest
1237/// loop. Errors here are recoverable per-file: the caller surfaces them as
1238/// failure events so `--fail-fast` and the continue-on-error path keep
1239/// working when, for example, the user points `--db` at an unwritable path.
1240fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1241    ensure_db_ready(paths)?;
1242    let conn = open_rw(&paths.db)?;
1243    Ok(conn)
1244}
1245
1246pub(crate) fn collect_files(
1247    dir: &Path,
1248    pattern: &str,
1249    recursive: bool,
1250    out: &mut Vec<PathBuf>,
1251) -> Result<(), AppError> {
1252    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1253    for entry in entries {
1254        let entry = entry.map_err(AppError::Io)?;
1255        let path = entry.path();
1256        let file_type = entry.file_type().map_err(AppError::Io)?;
1257        if file_type.is_file() {
1258            let name = entry.file_name();
1259            let name_str = name.to_string_lossy();
1260            if matches_pattern(&name_str, pattern) {
1261                out.push(path);
1262            }
1263        } else if file_type.is_dir() && recursive {
1264            collect_files(&path, pattern, recursive, out)?;
1265        }
1266    }
1267    Ok(())
1268}
1269
1270fn matches_pattern(name: &str, pattern: &str) -> bool {
1271    if let Some(suffix) = pattern.strip_prefix('*') {
1272        name.ends_with(suffix)
1273    } else if let Some(prefix) = pattern.strip_suffix('*') {
1274        name.starts_with(prefix)
1275    } else {
1276        name == pattern
1277    }
1278}
1279
1280/// Returns `(final_name, truncated, original_name)`.
1281/// `truncated` is true when the derived name exceeded `max_len`.
1282/// `original_name` holds the pre-truncation name only when `truncated=true`.
1283///
1284/// Non-ASCII characters are first decomposed via NFD and then stripped of
1285/// combining marks so accented letters fold to their base ASCII letter
1286/// (e.g. `acai` from accented input, `naive` from diaeresis). Characters with no ASCII
1287/// fallback (emoji, CJK ideographs, symbols) are dropped silently. This
1288/// preserves meaningful word content rather than collapsing the basename
1289/// to a few stray ASCII letters as the previous filter did.
1290fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1291    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1292    let lowered: String = stem
1293        .nfd()
1294        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1295        .map(|c| {
1296            if c == '_' || c.is_whitespace() {
1297                '-'
1298            } else {
1299                c
1300            }
1301        })
1302        .map(|c| c.to_ascii_lowercase())
1303        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1304        .collect();
1305    let collapsed = collapse_dashes(&lowered);
1306    let trimmed_raw = collapsed.trim_matches('-').to_string();
1307    // Prefix names that start with a digit to keep them valid kebab-case identifiers.
1308    let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1309        format!("doc-{trimmed_raw}")
1310    } else {
1311        trimmed_raw
1312    };
1313    if trimmed.len() > max_len {
1314        let truncated = trimmed[..max_len].trim_matches('-').to_string();
1315        tracing::debug!(
1316            target: "ingest",
1317            original = %trimmed,
1318            truncated_to = %truncated,
1319            max_len = max_len,
1320            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1321        );
1322        (truncated, true, Some(trimmed))
1323    } else {
1324        (trimmed, false, None)
1325    }
1326}
1327
1328/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
1329/// numeric suffix (`-1`, `-2`, …) when needed.
1330///
1331/// `taken` is the set of names already consumed in the current ingest run.
1332/// The caller is expected to insert the returned name into `taken` so the
1333/// next call observes the consumption. Cross-run collisions are intentionally
1334/// surfaced by the per-file persistence path as duplicates so re-ingestion
1335/// of identical corpora stays idempotent.
1336///
1337/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
1338/// candidates collide, signalling a pathological corpus that should be
1339/// renamed manually.
1340fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1341    if !taken.contains(base) {
1342        return Ok(base.to_string());
1343    }
1344    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1345        let candidate = format!("{base}-{suffix}");
1346        if !taken.contains(&candidate) {
1347            tracing::warn!(
1348                target: "ingest",
1349                base = %base,
1350                resolved = %candidate,
1351                suffix,
1352                "memory name collision resolved with numeric suffix"
1353            );
1354            return Ok(candidate);
1355        }
1356    }
1357    Err(AppError::Validation(format!(
1358        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1359    )))
1360}
1361
1362fn collapse_dashes(s: &str) -> String {
1363    let mut out = String::with_capacity(s.len());
1364    let mut prev_dash = false;
1365    for c in s.chars() {
1366        if c == '-' {
1367            if !prev_dash {
1368                out.push('-');
1369            }
1370            prev_dash = true;
1371        } else {
1372            out.push(c);
1373            prev_dash = false;
1374        }
1375    }
1376    out
1377}
1378
1379#[cfg(test)]
1380mod tests {
1381    use super::*;
1382    use std::path::PathBuf;
1383
1384    #[test]
1385    fn matches_pattern_suffix() {
1386        assert!(matches_pattern("foo.md", "*.md"));
1387        assert!(!matches_pattern("foo.txt", "*.md"));
1388        assert!(matches_pattern("foo.md", "*"));
1389    }
1390
1391    #[test]
1392    fn matches_pattern_prefix() {
1393        assert!(matches_pattern("README.md", "README*"));
1394        assert!(!matches_pattern("CHANGELOG.md", "README*"));
1395    }
1396
1397    #[test]
1398    fn matches_pattern_exact() {
1399        assert!(matches_pattern("README.md", "README.md"));
1400        assert!(!matches_pattern("readme.md", "README.md"));
1401    }
1402
1403    #[test]
1404    fn derive_kebab_underscore_to_dash() {
1405        let p = PathBuf::from("/tmp/claude_code_headless.md");
1406        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1407        assert_eq!(name, "claude-code-headless");
1408        assert!(!truncated);
1409        assert!(original.is_none());
1410    }
1411
1412    #[test]
1413    fn derive_kebab_uppercase_lowered() {
1414        let p = PathBuf::from("/tmp/README.md");
1415        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1416        assert_eq!(name, "readme");
1417        assert!(!truncated);
1418        assert!(original.is_none());
1419    }
1420
1421    #[test]
1422    fn derive_kebab_strips_non_kebab_chars() {
1423        let p = PathBuf::from("/tmp/some@weird#name!.md");
1424        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1425        assert_eq!(name, "someweirdname");
1426        assert!(!truncated);
1427        assert!(original.is_none());
1428    }
1429
1430    // Bug M-A3: NFD-based unicode normalization preserves base letters of
1431    // accented characters instead of dropping them entirely.
1432    #[test]
1433    fn derive_kebab_folds_accented_letters_to_ascii() {
1434        let p = PathBuf::from("/tmp/açaí.md");
1435        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1436        assert_eq!(name, "acai", "got '{name}'");
1437    }
1438
1439    #[test]
1440    fn derive_kebab_handles_naive_with_diaeresis() {
1441        let p = PathBuf::from("/tmp/naïve-test.md");
1442        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1443        assert_eq!(name, "naive-test", "got '{name}'");
1444    }
1445
1446    #[test]
1447    fn derive_kebab_drops_emoji_keeps_word() {
1448        let p = PathBuf::from("/tmp/🚀-rocket.md");
1449        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1450        assert_eq!(name, "rocket", "got '{name}'");
1451    }
1452
1453    #[test]
1454    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1455        let p = PathBuf::from("/tmp/açaí🦜.md");
1456        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1457        assert_eq!(name, "acai", "got '{name}'");
1458    }
1459
1460    #[test]
1461    fn derive_kebab_pure_emoji_yields_empty() {
1462        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1463        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1464        assert!(name.is_empty(), "got '{name}'");
1465    }
1466
1467    #[test]
1468    fn derive_kebab_collapses_consecutive_dashes() {
1469        let p = PathBuf::from("/tmp/a__b___c.md");
1470        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1471        assert_eq!(name, "a-b-c");
1472        assert!(!truncated);
1473        assert!(original.is_none());
1474    }
1475
1476    #[test]
1477    fn derive_kebab_truncates_to_60_chars() {
1478        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1479        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1480        assert!(name.len() <= 60, "got len {}", name.len());
1481        assert!(truncated);
1482        assert!(original.is_some());
1483        assert!(original.unwrap().len() > 60);
1484    }
1485
1486    #[test]
1487    fn collect_files_finds_md_files() {
1488        let tmp = tempfile::tempdir().expect("tempdir");
1489        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1490        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1491        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1492        let mut out = Vec::new();
1493        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1494        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1495    }
1496
1497    #[test]
1498    fn collect_files_recursive_descends_subdirs() {
1499        let tmp = tempfile::tempdir().expect("tempdir");
1500        let sub = tmp.path().join("sub");
1501        std::fs::create_dir(&sub).unwrap();
1502        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1503        std::fs::write(sub.join("b.md"), "y").unwrap();
1504        let mut out = Vec::new();
1505        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1506        assert_eq!(out.len(), 2);
1507    }
1508
1509    #[test]
1510    fn collect_files_non_recursive_skips_subdirs() {
1511        let tmp = tempfile::tempdir().expect("tempdir");
1512        let sub = tmp.path().join("sub");
1513        std::fs::create_dir(&sub).unwrap();
1514        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1515        std::fs::write(sub.join("b.md"), "y").unwrap();
1516        let mut out = Vec::new();
1517        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1518        assert_eq!(out.len(), 1);
1519    }
1520
1521    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──
1522
1523    #[test]
1524    fn derive_kebab_long_basename_truncated_within_cap() {
1525        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1526        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1527        assert!(
1528            name.len() <= DERIVED_NAME_MAX_LEN,
1529            "truncated name must respect cap; got {} chars",
1530            name.len()
1531        );
1532        assert!(!name.is_empty());
1533        assert!(truncated);
1534        assert!(original.is_some());
1535    }
1536
1537    #[test]
1538    fn unique_name_returns_base_when_free() {
1539        let taken: BTreeSet<String> = BTreeSet::new();
1540        let resolved = unique_name("note", &taken).expect("must resolve");
1541        assert_eq!(resolved, "note");
1542    }
1543
1544    #[test]
1545    fn unique_name_appends_first_free_suffix_on_collision() {
1546        let mut taken: BTreeSet<String> = BTreeSet::new();
1547        taken.insert("note".to_string());
1548        taken.insert("note-1".to_string());
1549        let resolved = unique_name("note", &taken).expect("must resolve");
1550        assert_eq!(resolved, "note-2");
1551    }
1552
1553    #[test]
1554    fn unique_name_errors_after_collision_cap() {
1555        let mut taken: BTreeSet<String> = BTreeSet::new();
1556        taken.insert("note".to_string());
1557        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1558            taken.insert(format!("note-{i}"));
1559        }
1560        let err = unique_name("note", &taken).expect_err("must surface error");
1561        assert!(matches!(err, AppError::Validation(_)));
1562    }
1563
1564    // ── v1.0.32 Onda 4B: in-process pipeline validation ──
1565
1566    #[test]
1567    fn validate_relation_format_accepts_valid_relations() {
1568        use crate::parsers::{is_canonical_relation, validate_relation_format};
1569        assert!(validate_relation_format("applies_to").is_ok());
1570        assert!(validate_relation_format("depends_on").is_ok());
1571        assert!(validate_relation_format("implements").is_ok());
1572        assert!(validate_relation_format("").is_err());
1573        assert!(is_canonical_relation("applies_to"));
1574        assert!(!is_canonical_relation("implements"));
1575    }
1576
1577    // ── v1.0.40 H-A1: --low-memory flag and SQLITE_GRAPHRAG_LOW_MEMORY env var ──
1578
1579    use serial_test::serial;
1580
1581    /// Helper: scrubs the env var around a closure to keep tests deterministic.
1582    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1583        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1584        let prev = std::env::var(key).ok();
1585        match value {
1586            Some(v) => std::env::set_var(key, v),
1587            None => std::env::remove_var(key),
1588        }
1589        f();
1590        match prev {
1591            Some(p) => std::env::set_var(key, p),
1592            None => std::env::remove_var(key),
1593        }
1594    }
1595
1596    #[test]
1597    #[serial]
1598    fn env_low_memory_enabled_unset_returns_false() {
1599        with_env_var(None, || assert!(!env_low_memory_enabled()));
1600    }
1601
1602    #[test]
1603    #[serial]
1604    fn env_low_memory_enabled_empty_returns_false() {
1605        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1606    }
1607
1608    #[test]
1609    #[serial]
1610    fn env_low_memory_enabled_truthy_values_return_true() {
1611        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1612            with_env_var(Some(v), || {
1613                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1614            });
1615        }
1616    }
1617
1618    #[test]
1619    #[serial]
1620    fn env_low_memory_enabled_falsy_values_return_false() {
1621        for v in ["0", "false", "FALSE", "no", "off"] {
1622            with_env_var(Some(v), || {
1623                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1624            });
1625        }
1626    }
1627
1628    #[test]
1629    #[serial]
1630    fn env_low_memory_enabled_unrecognized_value_returns_false() {
1631        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1632    }
1633
1634    #[test]
1635    #[serial]
1636    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1637        with_env_var(None, || {
1638            assert_eq!(resolve_parallelism(true, Some(4)), 1);
1639            assert_eq!(resolve_parallelism(true, Some(8)), 1);
1640            assert_eq!(resolve_parallelism(true, None), 1);
1641        });
1642    }
1643
1644    #[test]
1645    #[serial]
1646    fn resolve_parallelism_env_forces_one_when_flag_off() {
1647        with_env_var(Some("1"), || {
1648            assert_eq!(resolve_parallelism(false, Some(4)), 1);
1649            assert_eq!(resolve_parallelism(false, None), 1);
1650        });
1651    }
1652
1653    #[test]
1654    #[serial]
1655    fn resolve_parallelism_falsy_env_does_not_override() {
1656        with_env_var(Some("0"), || {
1657            assert_eq!(resolve_parallelism(false, Some(4)), 4);
1658        });
1659    }
1660
1661    #[test]
1662    #[serial]
1663    fn resolve_parallelism_explicit_value_when_low_memory_off() {
1664        with_env_var(None, || {
1665            assert_eq!(resolve_parallelism(false, Some(3)), 3);
1666            assert_eq!(resolve_parallelism(false, Some(1)), 1);
1667        });
1668    }
1669
1670    #[test]
1671    #[serial]
1672    fn resolve_parallelism_default_when_unset() {
1673        with_env_var(None, || {
1674            let p = resolve_parallelism(false, None);
1675            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1676        });
1677    }
1678
1679    #[test]
1680    fn ingest_args_parses_low_memory_flag_via_clap() {
1681        use clap::Parser;
1682        // Parse a synthetic Cli that contains the `ingest` subcommand. We rely
1683        // on the public `Cli` definition so the flag is wired end-to-end.
1684        let cli = crate::cli::Cli::try_parse_from([
1685            "sqlite-graphrag",
1686            "ingest",
1687            "/tmp/dummy",
1688            "--type",
1689            "document",
1690            "--low-memory",
1691        ])
1692        .expect("parse must succeed");
1693        match cli.command {
1694            crate::cli::Commands::Ingest(args) => {
1695                assert!(args.low_memory, "--low-memory must set field to true");
1696            }
1697            _ => panic!("expected Ingest subcommand"),
1698        }
1699    }
1700
1701    #[test]
1702    fn ingest_args_low_memory_defaults_false() {
1703        use clap::Parser;
1704        let cli = crate::cli::Cli::try_parse_from([
1705            "sqlite-graphrag",
1706            "ingest",
1707            "/tmp/dummy",
1708            "--type",
1709            "document",
1710        ])
1711        .expect("parse must succeed");
1712        match cli.command {
1713            crate::cli::Commands::Ingest(args) => {
1714                assert!(!args.low_memory, "default must be false");
1715            }
1716            _ => panic!("expected Ingest subcommand"),
1717        }
1718    }
1719}