Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16//!
17//! ## Two-phase pipeline (v1.0.39)
18//!
19//! Phase A runs on a rayon thread pool (size = `--ingest-parallelism`):
20//! read + chunk + embed + NER per file, results stored in a pre-sized
21//! `Vec<Mutex<Option<Result<StagedFile>>>>` indexed by submission order.
22//!
23//! Phase B runs on the main thread sequentially by index: pulls each
24//! `StagedFile` and writes to SQLite. `Connection` is not `Sync` so it
25//! never crosses thread boundaries. NDJSON output order equals input order.
26
27use crate::chunking;
28use crate::cli::MemoryType;
29use crate::errors::AppError;
30use crate::i18n::errors_msg;
31use crate::output::{self, JsonOutputFormat};
32use crate::paths::AppPaths;
33use crate::storage::chunks as storage_chunks;
34use crate::storage::connection::{ensure_db_ready, open_rw};
35use crate::storage::entities::{NewEntity, NewRelationship};
36use crate::storage::memories::NewMemory;
37use crate::storage::{entities, memories, urls as storage_urls, versions};
38use rayon::prelude::*;
39use rusqlite::Connection;
40use serde::Serialize;
41use std::collections::BTreeSet;
42use std::path::{Path, PathBuf};
43use std::sync::Mutex;
44
/// Maximum length of a derived kebab-case name. Longer basenames are truncated
/// (with a `tracing::warn!`) to keep the `memories.name` column bounded.
///
/// Derived names are filtered down to ASCII lowercase letters, digits and
/// hyphens, so byte length and character count coincide for them.
const DERIVED_NAME_MAX_LEN: usize = 60;

/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
///
/// NOTE(review): presumably consumed by `unique_name`, which is defined
/// outside the portion of the file reviewed here; confirm there.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
52
53#[derive(clap::Args)]
54#[command(after_long_help = "EXAMPLES:\n  \
55    # Ingest every Markdown file under ./docs as `document` memories\n  \
56    sqlite-graphrag ingest ./docs --type document\n\n  \
57    # Ingest .txt files recursively under ./notes\n  \
58    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
59    # Skip BERT NER auto-extraction for faster bulk import\n  \
60    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
61NOTES:\n  \
62    Each file becomes a separate memory. Names derive from file basenames\n  \
63    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
64    followed by a final summary line with counts. Per-file errors are reported\n  \
65    inline and processing continues unless --fail-fast is set.")]
66pub struct IngestArgs {
67    /// Directory containing files to ingest.
68    #[arg(
69        value_name = "DIR",
70        help = "Directory to ingest recursively (each matching file becomes a memory)"
71    )]
72    pub dir: PathBuf,
73
74    /// Memory type stored in `memories.type` for every ingested file.
75    #[arg(long, value_enum)]
76    pub r#type: MemoryType,
77
78    /// Glob pattern matched against file basenames (default: `*.md`). Supports
79    /// `*.<ext>`, `<prefix>*`, and exact filename match.
80    #[arg(long, default_value = "*.md")]
81    pub pattern: String,
82
83    /// Recurse into subdirectories.
84    #[arg(long, default_value_t = false)]
85    pub recursive: bool,
86
87    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
88    #[arg(long, default_value_t = false)]
89    pub skip_extraction: bool,
90
91    /// Stop on first per-file error instead of continuing with the next file.
92    #[arg(long, default_value_t = false)]
93    pub fail_fast: bool,
94
95    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
96    #[arg(long, default_value_t = 10_000)]
97    pub max_files: usize,
98
99    /// Namespace for the ingested memories.
100    #[arg(long)]
101    pub namespace: Option<String>,
102
103    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
104    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
105    pub db: Option<String>,
106
107    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
108    pub format: JsonOutputFormat,
109
110    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
111    pub json: bool,
112
113    /// Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4).
114    #[arg(
115        long,
116        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
117    )]
118    pub ingest_parallelism: Option<usize>,
119}
120
/// One NDJSON line describing the outcome for a single matched file.
///
/// Emitted once per file by `run`, in input order. Optional fields are
/// omitted from the JSON entirely when they are `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the source file (lossy UTF-8 rendering of the filesystem path).
    file: &'a str,
    /// Memory name derived from the file basename. For skipped files this is
    /// the base name before collision resolution and may be empty.
    name: &'a str,
    /// Outcome label: `"indexed"`, `"failed"` or `"skipped"`.
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Human-readable reason; set for `"failed"` and `"skipped"` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the inserted memory; set only for `"indexed"` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action taken; set only for `"indexed"` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
138
/// Final NDJSON line of an ingest run, carrying aggregate counts.
///
/// Also emitted just before aborting when `--fail-fast` stops the run early,
/// in which case the counts reflect only the files handled so far.
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`; lets consumers tell the summary apart from per-file events.
    summary: bool,
    /// The directory argument as displayed.
    dir: String,
    /// The filename pattern that was applied.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Files persisted successfully.
    files_succeeded: usize,
    /// Files that failed during staging or persistence.
    files_failed: usize,
    /// Files skipped because no usable unique name could be derived.
    files_skipped: usize,
    /// Wall-clock time since `run` started, in milliseconds.
    elapsed_ms: u64,
}
151
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    /// Row id returned by the memory insert.
    memory_id: i64,
    /// Label of the persistence action; `persist_staged` always reports `"created"`.
    action: String,
}
157
/// All artefacts pre-computed by Phase A (CPU-bound, runs on rayon thread pool).
/// Phase B persists these to SQLite on the main thread in submission order.
struct StagedFile {
    /// Full file contents as read from disk.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Validated memory name assigned to this file.
    name: String,
    /// Generated description of the form `ingested from <path>`.
    description: String,
    /// Memory-level embedding: the body embedding when there is exactly one
    /// chunk, otherwise the aggregate of the per-chunk embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` whenever the chunk count is not exactly one.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk descriptors produced by the hierarchical splitter.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty when extraction is skipped or fails),
    /// capped at `MAX_ENTITIES_PER_MEMORY`.
    entities: Vec<NewEntity>,
    /// Extracted relationships with normalised relation names,
    /// capped at `MAX_RELATIONSHIPS_PER_MEMORY`.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found in the body by the extraction step.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
174
175/// Phase A worker: reads, chunks, embeds and extracts NER for one file.
176/// Never touches the database — safe to run on any rayon thread.
177fn stage_file(
178    _idx: usize,
179    path: &Path,
180    name: &str,
181    paths: &AppPaths,
182    skip_extraction: bool,
183) -> Result<StagedFile, AppError> {
184    use crate::constants::*;
185
186    if name.len() > MAX_MEMORY_NAME_LEN {
187        return Err(AppError::LimitExceeded(
188            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
189        ));
190    }
191    if name.starts_with("__") {
192        return Err(AppError::Validation(
193            crate::i18n::validation::reserved_name(),
194        ));
195    }
196    {
197        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
198            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
199        if !slug_re.is_match(name) {
200            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
201                name,
202            )));
203        }
204    }
205
206    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
207    if raw_body.len() > MAX_MEMORY_BODY_LEN {
208        return Err(AppError::LimitExceeded(
209            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
210        ));
211    }
212    if raw_body.trim().is_empty() {
213        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
214    }
215
216    let description = format!("ingested from {}", path.display());
217    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
218        return Err(AppError::Validation(
219            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
220        ));
221    }
222
223    let mut extracted_entities: Vec<NewEntity> = Vec::new();
224    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
225    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
226    if !skip_extraction {
227        match crate::extraction::extract_graph_auto(&raw_body, paths) {
228            Ok(extracted) => {
229                extracted_urls = extracted.urls;
230                extracted_entities = extracted.entities;
231                extracted_relationships = extracted.relationships;
232
233                if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
234                    extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
235                }
236                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
237                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
238                }
239            }
240            Err(e) => {
241                tracing::warn!(
242                    file = %path.display(),
243                    "auto-extraction failed (graceful degradation): {e:#}"
244                );
245            }
246        }
247    }
248
249    for entity in &extracted_entities {
250        if !is_valid_entity_type(&entity.entity_type) {
251            return Err(AppError::Validation(format!(
252                "invalid entity_type '{}' for entity '{}'",
253                entity.entity_type, entity.name
254            )));
255        }
256    }
257    for rel in &mut extracted_relationships {
258        rel.relation = rel.relation.replace('-', "_");
259        if !is_valid_relation(&rel.relation) {
260            return Err(AppError::Validation(format!(
261                "invalid relation '{}' for relationship '{}' -> '{}'",
262                rel.relation, rel.source, rel.target
263            )));
264        }
265        if !(0.0..=1.0).contains(&rel.strength) {
266            return Err(AppError::Validation(format!(
267                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
268                rel.strength, rel.source, rel.target
269            )));
270        }
271    }
272
273    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
274    let snippet: String = raw_body.chars().take(200).collect();
275
276    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
277    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
278    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
279        return Err(AppError::LimitExceeded(format!(
280            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
281            chunks_info.len(),
282            REMEMBER_MAX_SAFE_MULTI_CHUNKS
283        )));
284    }
285
286    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
287    let embedding = if chunks_info.len() == 1 {
288        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
289    } else {
290        let chunk_texts: Vec<&str> = chunks_info
291            .iter()
292            .map(|c| chunking::chunk_text(&raw_body, c))
293            .collect();
294        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
295        for chunk_text in &chunk_texts {
296            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
297                &paths.models,
298                chunk_text,
299            )?);
300        }
301        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
302        chunk_embeddings_opt = Some(chunk_embeddings);
303        aggregated
304    };
305
306    let entity_embeddings = extracted_entities
307        .iter()
308        .map(|entity| {
309            let entity_text = match &entity.description {
310                Some(desc) => format!("{} {}", entity.name, desc),
311                None => entity.name.clone(),
312            };
313            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
314        })
315        .collect::<Result<Vec<_>, _>>()?;
316
317    Ok(StagedFile {
318        body: raw_body,
319        body_hash,
320        snippet,
321        name: name.to_string(),
322        description,
323        embedding,
324        chunk_embeddings: chunk_embeddings_opt,
325        chunks_info,
326        entities: extracted_entities,
327        relationships: extracted_relationships,
328        entity_embeddings,
329        urls: extracted_urls,
330    })
331}
332
/// Phase B: persists one `StagedFile` to the database on the main thread.
///
/// Steps, in order:
/// 1. Enforce the active-namespace limit (only when `namespace` has no live
///    memories yet).
/// 2. Reject the file if a memory with the same name already exists in the
///    namespace.
/// 3. Inside a single `IMMEDIATE` transaction: insert the memory, its first
///    version, its vector row, chunk rows/vectors (multi-chunk bodies only),
///    and the extracted entities and relationships.
/// 4. After commit, store extracted URLs on a best-effort basis.
///
/// # Errors
///
/// Returns `AppError::NamespaceError` when the namespace limit would be
/// exceeded, `AppError::Duplicate` on a name clash, `AppError::Internal` when
/// a multi-chunk file arrives without chunk embeddings, and propagates any
/// storage error. On error the transaction is dropped without being committed.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        // Namespace quota: count distinct namespaces that still have live
        // (non-soft-deleted) memories, and check whether the target is one of them.
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        // Only a brand-new namespace can push the count over the limit.
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Ingest never overwrites: an existing name is reported as a duplicate.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    // An identical body under a different name is allowed; it is only logged below.
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    // `body` and `body_hash` are moved out of `staged` here; later code reads
    // the body through `new_memory.body`.
    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // Everything up to `commit` is atomic: dropping `tx` on an early `?`
    // return leaves no partial rows behind.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Record the initial version (version number 1, change kind "create").
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Chunk rows are written only for bodies that split into more than one chunk.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        // NOTE(review): the loop index is passed both as the second argument
        // and as the chunk position. If the second argument is a row id, it
        // would repeat across memories; confirm against `upsert_chunk_vec`.
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // Upsert each extracted entity with its embedding and link it to the memory.
        // `entity_embeddings` is indexed in lockstep with `entities`.
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                &entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Name -> type lookup so relationship endpoints reuse the extracted type.
        let entity_types: std::collections::HashMap<&str, &str> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
            .collect();
        for rel in &staged.relationships {
            // Endpoints not present among the extracted entities fall back to
            // the "concept" type.
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URLs are stored after the commit, outside the transaction, and the
    // result of the insert is deliberately discarded.
    // NOTE(review): a failure here is silent; consider logging it.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
490
491pub fn run(args: IngestArgs) -> Result<(), AppError> {
492    let started = std::time::Instant::now();
493
494    if !args.dir.exists() {
495        return Err(AppError::NotFound(format!(
496            "directory not found: {}",
497            args.dir.display()
498        )));
499    }
500    if !args.dir.is_dir() {
501        return Err(AppError::Validation(format!(
502            "path is not a directory: {}",
503            args.dir.display()
504        )));
505    }
506
507    let mut files: Vec<PathBuf> = Vec::new();
508    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
509    files.sort();
510
511    if files.len() > args.max_files {
512        return Err(AppError::Validation(format!(
513            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
514            files.len(),
515            args.max_files
516        )));
517    }
518
519    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
520    let memory_type_str = args.r#type.as_str().to_string();
521
522    let paths = AppPaths::resolve(args.db.as_deref())?;
523    let mut conn_or_err = match init_storage(&paths) {
524        Ok(c) => Ok(c),
525        Err(e) => Err(format!("{e}")),
526    };
527
528    let mut succeeded: usize = 0;
529    let mut failed: usize = 0;
530    let mut skipped: usize = 0;
531    let total = files.len();
532
533    // Pre-resolve all names before parallelisation so Phase A workers see a
534    // consistent, immutable name assignment (v1.0.31 A10 contract preserved).
535    let mut taken_names: BTreeSet<String> = BTreeSet::new();
536
537    // Each entry: (path, file_str, derived_name, name_truncated, original_name)
538    // or None when the file should be skipped immediately.
539    struct FileSlot {
540        path: PathBuf,
541        file_str: String,
542        derived_name: String,
543        name_truncated: bool,
544        original_name: Option<String>,
545    }
546    enum Slot {
547        Skip {
548            file_str: String,
549            derived_base: String,
550            name_truncated: bool,
551            original_name: Option<String>,
552            reason: String,
553        },
554        Process(FileSlot),
555    }
556
557    let slots: Vec<Slot> = files
558        .iter()
559        .map(|path| {
560            let file_str = path.to_string_lossy().into_owned();
561            let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
562
563            if derived_base.is_empty() {
564                return Slot::Skip {
565                    file_str,
566                    derived_base: String::new(),
567                    name_truncated: false,
568                    original_name: None,
569                    reason: "could not derive a non-empty kebab-case name from filename"
570                        .to_string(),
571                };
572            }
573
574            match unique_name(&derived_base, &taken_names) {
575                Ok(derived_name) => {
576                    taken_names.insert(derived_name.clone());
577                    Slot::Process(FileSlot {
578                        path: path.clone(),
579                        file_str,
580                        derived_name,
581                        name_truncated,
582                        original_name,
583                    })
584                }
585                Err(e) => Slot::Skip {
586                    file_str,
587                    derived_base,
588                    name_truncated,
589                    original_name,
590                    reason: e.to_string(),
591                },
592            }
593        })
594        .collect();
595
596    // Determine rayon thread pool size.
597    let parallelism = args
598        .ingest_parallelism
599        .unwrap_or_else(|| {
600            std::thread::available_parallelism()
601                .map(|v| v.get() / 2)
602                .unwrap_or(1)
603                .clamp(1, 4)
604        })
605        .max(1);
606
607    let pool = rayon::ThreadPoolBuilder::new()
608        .num_threads(parallelism)
609        .build()
610        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
611
612    // Phase A: parallel compute. Indexed slot matches `slots` index for ordering.
613    let staged: Vec<Mutex<Option<Result<StagedFile, AppError>>>> =
614        (0..slots.len()).map(|_| Mutex::new(None)).collect();
615
616    let skip_extraction = args.skip_extraction;
617    let paths_ref = &paths;
618
619    pool.install(|| {
620        slots.par_iter().enumerate().for_each(|(idx, slot)| {
621            if let Slot::Process(fs) = slot {
622                let result =
623                    stage_file(idx, &fs.path, &fs.derived_name, paths_ref, skip_extraction);
624                // SAFETY: staged[idx] is only written once by this worker.
625                *staged[idx].lock().expect("staged slot poisoned") = Some(result);
626            }
627        });
628    });
629
630    // Phase B: sequential persist on main thread (Connection is !Sync).
631    let fail_fast = args.fail_fast;
632    for (idx, slot) in slots.iter().enumerate() {
633        match slot {
634            Slot::Skip {
635                file_str,
636                derived_base,
637                name_truncated,
638                original_name,
639                reason,
640            } => {
641                output::emit_json_compact(&IngestFileEvent {
642                    file: file_str,
643                    name: derived_base,
644                    status: "skipped",
645                    truncated: *name_truncated,
646                    original_name: original_name.clone(),
647                    error: Some(reason.clone()),
648                    memory_id: None,
649                    action: None,
650                })?;
651                skipped += 1;
652            }
653            Slot::Process(fs) => {
654                // If storage init failed, every file fails with the same error.
655                let conn = match conn_or_err.as_mut() {
656                    Ok(c) => c,
657                    Err(err_msg) => {
658                        let err_clone = err_msg.clone();
659                        output::emit_json_compact(&IngestFileEvent {
660                            file: &fs.file_str,
661                            name: &fs.derived_name,
662                            status: "failed",
663                            truncated: fs.name_truncated,
664                            original_name: fs.original_name.clone(),
665                            error: Some(err_clone.clone()),
666                            memory_id: None,
667                            action: None,
668                        })?;
669                        failed += 1;
670                        if fail_fast {
671                            output::emit_json_compact(&IngestSummary {
672                                summary: true,
673                                dir: args.dir.display().to_string(),
674                                pattern: args.pattern.clone(),
675                                recursive: args.recursive,
676                                files_total: total,
677                                files_succeeded: succeeded,
678                                files_failed: failed,
679                                files_skipped: skipped,
680                                elapsed_ms: started.elapsed().as_millis() as u64,
681                            })?;
682                            return Err(AppError::Validation(format!(
683                                "ingest aborted on first failure: {err_clone}"
684                            )));
685                        }
686                        continue;
687                    }
688                };
689
690                // Take the Phase A result (always Some for Process slots).
691                let stage_result = staged[idx]
692                    .lock()
693                    .expect("staged slot poisoned")
694                    .take()
695                    .expect("staged slot empty for Process slot");
696
697                let outcome = stage_result
698                    .and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
699
700                match outcome {
701                    Ok(FileSuccess { memory_id, action }) => {
702                        output::emit_json_compact(&IngestFileEvent {
703                            file: &fs.file_str,
704                            name: &fs.derived_name,
705                            status: "indexed",
706                            truncated: fs.name_truncated,
707                            original_name: fs.original_name.clone(),
708                            error: None,
709                            memory_id: Some(memory_id),
710                            action: Some(action),
711                        })?;
712                        succeeded += 1;
713                    }
714                    Err(e) => {
715                        let err_msg = format!("{e}");
716                        output::emit_json_compact(&IngestFileEvent {
717                            file: &fs.file_str,
718                            name: &fs.derived_name,
719                            status: "failed",
720                            truncated: fs.name_truncated,
721                            original_name: fs.original_name.clone(),
722                            error: Some(err_msg.clone()),
723                            memory_id: None,
724                            action: None,
725                        })?;
726                        failed += 1;
727                        if fail_fast {
728                            output::emit_json_compact(&IngestSummary {
729                                summary: true,
730                                dir: args.dir.display().to_string(),
731                                pattern: args.pattern.clone(),
732                                recursive: args.recursive,
733                                files_total: total,
734                                files_succeeded: succeeded,
735                                files_failed: failed,
736                                files_skipped: skipped,
737                                elapsed_ms: started.elapsed().as_millis() as u64,
738                            })?;
739                            return Err(AppError::Validation(format!(
740                                "ingest aborted on first failure: {err_msg}"
741                            )));
742                        }
743                    }
744                }
745            }
746        }
747    }
748
749    output::emit_json_compact(&IngestSummary {
750        summary: true,
751        dir: args.dir.display().to_string(),
752        pattern: args.pattern.clone(),
753        recursive: args.recursive,
754        files_total: total,
755        files_succeeded: succeeded,
756        files_failed: failed,
757        files_skipped: skipped,
758        elapsed_ms: started.elapsed().as_millis() as u64,
759    })?;
760
761    Ok(())
762}
763
764/// Auto-initialises the database (matches the contract of every other CRUD
765/// handler) and returns a fresh read/write connection ready for the ingest
766/// loop. Errors here are recoverable per-file: the caller surfaces them as
767/// failure events so `--fail-fast` and the continue-on-error path keep
768/// working when, for example, the user points `--db` at an unwritable path.
769fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
770    ensure_db_ready(paths)?;
771    let conn = open_rw(&paths.db)?;
772    Ok(conn)
773}
774
/// Reports whether `entity_type` is one of the entity kinds accepted for
/// ingested graph data. The comparison is exact and case-sensitive.
fn is_valid_entity_type(entity_type: &str) -> bool {
    const ACCEPTED_ENTITY_TYPES: &[&str] = &[
        "project",
        "tool",
        "person",
        "file",
        "concept",
        "incident",
        "decision",
        "memory",
        "dashboard",
        "issue_tracker",
        "organization",
        "location",
        "date",
    ];
    ACCEPTED_ENTITY_TYPES.contains(&entity_type)
}
793
/// Reports whether `relation` is one of the relationship labels accepted for
/// ingested graph data. The comparison is exact and case-sensitive; callers
/// are expected to have normalised hyphens to underscores beforehand.
fn is_valid_relation(relation: &str) -> bool {
    const ACCEPTED_RELATIONS: &[&str] = &[
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    ACCEPTED_RELATIONS.contains(&relation)
}
811
812fn collect_files(
813    dir: &Path,
814    pattern: &str,
815    recursive: bool,
816    out: &mut Vec<PathBuf>,
817) -> Result<(), AppError> {
818    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
819    for entry in entries {
820        let entry = entry.map_err(AppError::Io)?;
821        let path = entry.path();
822        let file_type = entry.file_type().map_err(AppError::Io)?;
823        if file_type.is_file() {
824            let name = entry.file_name();
825            let name_str = name.to_string_lossy();
826            if matches_pattern(&name_str, pattern) {
827                out.push(path);
828            }
829        } else if file_type.is_dir() && recursive {
830            collect_files(&path, pattern, recursive, out)?;
831        }
832    }
833    Ok(())
834}
835
/// Tests a file name against a minimal glob `pattern`.
///
/// `*` is the only wildcard and matches any run of characters, including an
/// empty one; every other character (including `?` and `[`) is literal.
/// Matching is case-sensitive and must cover the whole of `name`.
///
/// The pattern may contain any number of `*`, so `*.md`, `README*`,
/// `report-*.md` and `*draft*` all behave as expected. Earlier versions only
/// honoured a single leading or trailing `*` and silently treated every other
/// `*` as a literal character; every name accepted by that behaviour is still
/// accepted here.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };

    // The text before the first `*` is anchored at the start of the name.
    let mut remaining = match name.strip_prefix(head) {
        Some(after_head) => after_head,
        None => return false,
    };

    // The text after the last `*` is anchored at the end; whatever sits
    // between the first and last `*` is a sequence of floating segments.
    let (middle, tail) = rest.rsplit_once('*').unwrap_or(("", rest));

    // Consume floating segments left to right, always taking the earliest
    // occurrence so the largest possible remainder is left for the tail.
    for segment in middle.split('*').filter(|s| !s.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }

    // Checking against `remaining` (not `name`) keeps the tail from
    // overlapping text already claimed by the head or a middle segment.
    remaining.ends_with(tail)
}
845
846/// Returns `(final_name, truncated, original_name)`.
847/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
848/// `original_name` holds the pre-truncation name only when `truncated=true`.
849fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
850    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
851    let lowered: String = stem
852        .chars()
853        .map(|c| {
854            if c == '_' || c.is_whitespace() {
855                '-'
856            } else {
857                c
858            }
859        })
860        .map(|c| c.to_ascii_lowercase())
861        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
862        .collect();
863    let collapsed = collapse_dashes(&lowered);
864    let trimmed = collapsed.trim_matches('-').to_string();
865    if trimmed.len() > DERIVED_NAME_MAX_LEN {
866        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
867            .trim_matches('-')
868            .to_string();
869        // v1.0.31 A10: surface the truncation so users can fix overly long file
870        // basenames before they collide with siblings sharing the same prefix.
871        tracing::warn!(
872            target: "ingest",
873            original = %trimmed,
874            truncated_to = %truncated,
875            max_len = DERIVED_NAME_MAX_LEN,
876            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
877        );
878        (truncated, true, Some(trimmed))
879    } else {
880        (trimmed, false, None)
881    }
882}
883
884/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
885/// numeric suffix (`-1`, `-2`, …) when needed.
886///
887/// `taken` is the set of names already consumed in the current ingest run.
888/// The caller is expected to insert the returned name into `taken` so the
889/// next call observes the consumption. Cross-run collisions are intentionally
890/// surfaced by the per-file persistence path as duplicates so re-ingestion
891/// of identical corpora stays idempotent.
892///
893/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
894/// candidates collide, signalling a pathological corpus that should be
895/// renamed manually.
896fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
897    if !taken.contains(base) {
898        return Ok(base.to_string());
899    }
900    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
901        let candidate = format!("{base}-{suffix}");
902        if !taken.contains(&candidate) {
903            tracing::warn!(
904                target: "ingest",
905                base = %base,
906                resolved = %candidate,
907                suffix,
908                "memory name collision resolved with numeric suffix"
909            );
910            return Ok(candidate);
911        }
912    }
913    Err(AppError::Validation(format!(
914        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
915    )))
916}
917
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut acc, ch| {
            // A dash is dropped only when the output already ends with one,
            // i.e. when the previous input character was also a dash.
            if ch != '-' || !acc.ends_with('-') {
                acc.push(ch);
            }
            acc
        })
}
934
/// Unit tests for the private helpers of this module: pattern matching,
/// kebab-name derivation, directory collection, collision-free naming and the
/// entity-type / relation allow-lists.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// A leading `*` matches by suffix; a lone `*` matches any name.
    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    /// A trailing `*` matches by prefix.
    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    /// A pattern without `*` must equal the name exactly, including case.
    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    /// Underscores in the file stem are turned into dashes.
    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    /// Uppercase ASCII letters are lowercased and the extension is dropped.
    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    /// Characters outside `[a-z0-9-]` are removed, not replaced.
    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    /// Runs of underscores collapse into a single dash.
    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    /// An 80-character stem is reported as truncated, the result is at most
    /// 60 bytes long, and the original (longer) name is returned alongside.
    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    /// Only files matching the pattern are collected from a flat directory.
    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    /// With `recursive = true`, matching files in sub-directories are included.
    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    /// With `recursive = false`, sub-directories are not visited.
    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    /// A 120-character stem is truncated to a non-empty name that respects
    /// `DERIVED_NAME_MAX_LEN`.
    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    /// When nothing is taken, the base name is returned unchanged.
    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    /// With `note` and `note-1` taken, the first free candidate is `note-2`.
    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    /// Once the base and every suffix up to `MAX_NAME_COLLISION_SUFFIX` are
    /// taken, a validation error is returned instead of a name.
    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    /// `organization`, `location` and `date` are accepted; unknown labels are not.
    #[test]
    fn is_valid_entity_type_accepts_v008_types() {
        assert!(is_valid_entity_type("organization"));
        assert!(is_valid_entity_type("location"));
        assert!(is_valid_entity_type("date"));
        assert!(!is_valid_entity_type("unknown"));
    }

    /// Known relation labels are accepted; arbitrary labels are rejected.
    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }
}