Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16
17use crate::chunking;
18use crate::cli::MemoryType;
19use crate::errors::AppError;
20use crate::i18n::errors_msg;
21use crate::output::{self, JsonOutputFormat};
22use crate::paths::AppPaths;
23use crate::storage::chunks as storage_chunks;
24use crate::storage::connection::{ensure_db_ready, open_rw};
25use crate::storage::entities::{NewEntity, NewRelationship};
26use crate::storage::memories::NewMemory;
27use crate::storage::{entities, memories, urls as storage_urls, versions};
28use rusqlite::Connection;
29use serde::Serialize;
30use std::collections::BTreeSet;
31use std::path::{Path, PathBuf};
32
/// Maximum length of a derived kebab-case name. Longer basenames are truncated
/// (with a `tracing::warn!`) to keep the `memories.name` column bounded.
///
/// The limit is compared against `str::len()` of the derived name, which by
/// then contains only ASCII lowercase letters, digits and hyphens.
const DERIVED_NAME_MAX_LEN: usize = 60;

/// Hard cap on the numeric suffix appended for collision resolution. If 1000
/// candidates collide we surface an error rather than loop forever.
///
/// Suffixes are tried from `1` up to and including this value.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
40
41#[derive(clap::Args)]
42#[command(after_long_help = "EXAMPLES:\n  \
43    # Ingest every Markdown file under ./docs as `document` memories\n  \
44    sqlite-graphrag ingest ./docs --type document\n\n  \
45    # Ingest .txt files recursively under ./notes\n  \
46    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
47    # Skip BERT NER auto-extraction for faster bulk import\n  \
48    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
49NOTES:\n  \
50    Each file becomes a separate memory. Names derive from file basenames\n  \
51    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
52    followed by a final summary line with counts. Per-file errors are reported\n  \
53    inline and processing continues unless --fail-fast is set.")]
54pub struct IngestArgs {
55    /// Directory containing files to ingest.
56    #[arg(
57        value_name = "DIR",
58        help = "Directory to ingest recursively (each matching file becomes a memory)"
59    )]
60    pub dir: PathBuf,
61
62    /// Memory type stored in `memories.type` for every ingested file.
63    #[arg(long, value_enum)]
64    pub r#type: MemoryType,
65
66    /// Glob pattern matched against file basenames (default: `*.md`). Supports
67    /// `*.<ext>`, `<prefix>*`, and exact filename match.
68    #[arg(long, default_value = "*.md")]
69    pub pattern: String,
70
71    /// Recurse into subdirectories.
72    #[arg(long, default_value_t = false)]
73    pub recursive: bool,
74
75    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
76    #[arg(long, default_value_t = false)]
77    pub skip_extraction: bool,
78
79    /// Stop on first per-file error instead of continuing with the next file.
80    #[arg(long, default_value_t = false)]
81    pub fail_fast: bool,
82
83    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
84    #[arg(long, default_value_t = 10_000)]
85    pub max_files: usize,
86
87    /// Namespace for the ingested memories.
88    #[arg(long)]
89    pub namespace: Option<String>,
90
91    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
92    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
93    pub db: Option<String>,
94
95    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
96    pub format: JsonOutputFormat,
97
98    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
99    pub json: bool,
100}
101
/// One per-file record of the NDJSON stream emitted by `run`.
///
/// String fields borrow from the caller. The three `Option` fields are omitted
/// from the serialized object when they are `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path as rendered by the caller.
    file: &'a str,
    /// Memory name derived for the file (empty when no name could be derived).
    name: &'a str,
    /// Outcome label; `run` emits `"indexed"`, `"failed"` or `"skipped"`.
    status: &'a str,
    /// Human-readable reason, present for failed and skipped files.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the persisted memory, present on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action taken for the memory, present on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
114
/// Final record of the NDJSON stream: run parameters plus aggregate counters.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker field; `run` always sets it to `true` so consumers can tell the
    /// summary line apart from per-file events.
    summary: bool,
    /// Directory that was scanned, as displayed from the CLI argument.
    dir: String,
    /// Basename pattern that was applied.
    pattern: String,
    /// Whether subdirectories were visited.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Number of files persisted successfully.
    files_succeeded: usize,
    /// Number of files that failed.
    files_failed: usize,
    /// Number of files skipped because no usable name could be produced.
    files_skipped: usize,
    /// Wall-clock duration of the run in milliseconds.
    elapsed_ms: u64,
}
127
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    /// Identifier returned when the memory row was inserted.
    memory_id: i64,
    /// Action label reported in the event (`process_file` returns `"created"`).
    action: String,
}
133
134pub fn run(args: IngestArgs) -> Result<(), AppError> {
135    let started = std::time::Instant::now();
136
137    if !args.dir.exists() {
138        return Err(AppError::NotFound(format!(
139            "directory not found: {}",
140            args.dir.display()
141        )));
142    }
143    if !args.dir.is_dir() {
144        return Err(AppError::Validation(format!(
145            "path is not a directory: {}",
146            args.dir.display()
147        )));
148    }
149
150    let mut files: Vec<PathBuf> = Vec::new();
151    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
152    files.sort();
153
154    if files.len() > args.max_files {
155        return Err(AppError::Validation(format!(
156            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
157            files.len(),
158            args.max_files
159        )));
160    }
161
162    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
163    let memory_type_str = args.r#type.as_str().to_string();
164
165    // v1.0.32 Onda 4B: open the DB once and reuse the connection (and the
166    // warm embedder via `crate::daemon::embed_passage_or_local`) across every
167    // file, eliminating the ~17s ONNX cold-start that the previous
168    // fork-spawn-per-file design paid on each iteration. We tolerate a startup
169    // failure (e.g. an unwritable `--db` path) by capturing the error string
170    // and surfacing it as a per-file failure event so callers preserve the
171    // existing fail-fast / continue-on-error contract.
172    let paths = AppPaths::resolve(args.db.as_deref())?;
173    let mut conn_or_err = match init_storage(&paths) {
174        Ok(c) => Ok(c),
175        Err(e) => Err(format!("{e}")),
176    };
177
178    let mut succeeded: usize = 0;
179    let mut failed: usize = 0;
180    let mut skipped: usize = 0;
181    let total = files.len();
182
183    // v1.0.31 A10: track names produced during this run so two files with the
184    // same kebab basename (after truncation, transliteration, etc.) get
185    // distinct `-1`, `-2` suffixes within the same ingest invocation.
186    // Cross-run collisions are intentionally left to the per-file persistence
187    // path so re-ingesting an identical corpus still surfaces duplicates
188    // instead of silently creating shadow copies.
189    let mut taken_names: BTreeSet<String> = BTreeSet::new();
190
191    for path in &files {
192        let file_str = path.to_string_lossy().into_owned();
193        let derived_base = derive_kebab_name(path);
194
195        if derived_base.is_empty() {
196            output::emit_json_compact(&IngestFileEvent {
197                file: &file_str,
198                name: "",
199                status: "skipped",
200                error: Some(
201                    "could not derive a non-empty kebab-case name from filename".to_string(),
202                ),
203                memory_id: None,
204                action: None,
205            })?;
206            skipped += 1;
207            continue;
208        }
209
210        let derived_name = match unique_name(&derived_base, &taken_names) {
211            Ok(n) => n,
212            Err(e) => {
213                output::emit_json_compact(&IngestFileEvent {
214                    file: &file_str,
215                    name: &derived_base,
216                    status: "skipped",
217                    error: Some(e.to_string()),
218                    memory_id: None,
219                    action: None,
220                })?;
221                skipped += 1;
222                continue;
223            }
224        };
225        taken_names.insert(derived_name.clone());
226
227        // If startup failed, every file inherits the same fatal error rather
228        // than silently succeeding against a non-existent database.
229        let conn = match conn_or_err.as_mut() {
230            Ok(c) => c,
231            Err(err_msg) => {
232                let err_clone = err_msg.clone();
233                output::emit_json_compact(&IngestFileEvent {
234                    file: &file_str,
235                    name: &derived_name,
236                    status: "failed",
237                    error: Some(err_clone.clone()),
238                    memory_id: None,
239                    action: None,
240                })?;
241                failed += 1;
242                if args.fail_fast {
243                    output::emit_json_compact(&IngestSummary {
244                        summary: true,
245                        dir: args.dir.display().to_string(),
246                        pattern: args.pattern.clone(),
247                        recursive: args.recursive,
248                        files_total: total,
249                        files_succeeded: succeeded,
250                        files_failed: failed,
251                        files_skipped: skipped,
252                        elapsed_ms: started.elapsed().as_millis() as u64,
253                    })?;
254                    return Err(AppError::Validation(format!(
255                        "ingest aborted on first failure: {err_clone}"
256                    )));
257                }
258                continue;
259            }
260        };
261
262        let outcome = process_file(
263            conn,
264            &paths,
265            &namespace,
266            &memory_type_str,
267            args.skip_extraction,
268            path,
269            &derived_name,
270        );
271
272        match outcome {
273            Ok(FileSuccess { memory_id, action }) => {
274                output::emit_json_compact(&IngestFileEvent {
275                    file: &file_str,
276                    name: &derived_name,
277                    status: "indexed",
278                    error: None,
279                    memory_id: Some(memory_id),
280                    action: Some(action),
281                })?;
282                succeeded += 1;
283            }
284            Err(e) => {
285                let err_msg = format!("{e}");
286                output::emit_json_compact(&IngestFileEvent {
287                    file: &file_str,
288                    name: &derived_name,
289                    status: "failed",
290                    error: Some(err_msg.clone()),
291                    memory_id: None,
292                    action: None,
293                })?;
294                failed += 1;
295                if args.fail_fast {
296                    output::emit_json_compact(&IngestSummary {
297                        summary: true,
298                        dir: args.dir.display().to_string(),
299                        pattern: args.pattern.clone(),
300                        recursive: args.recursive,
301                        files_total: total,
302                        files_succeeded: succeeded,
303                        files_failed: failed,
304                        files_skipped: skipped,
305                        elapsed_ms: started.elapsed().as_millis() as u64,
306                    })?;
307                    return Err(AppError::Validation(format!(
308                        "ingest aborted on first failure: {err_msg}"
309                    )));
310                }
311            }
312        }
313    }
314
315    output::emit_json_compact(&IngestSummary {
316        summary: true,
317        dir: args.dir.display().to_string(),
318        pattern: args.pattern.clone(),
319        recursive: args.recursive,
320        files_total: total,
321        files_succeeded: succeeded,
322        files_failed: failed,
323        files_skipped: skipped,
324        elapsed_ms: started.elapsed().as_millis() as u64,
325    })?;
326
327    Ok(())
328}
329
330/// Auto-initialises the database (matches the contract of every other CRUD
331/// handler) and returns a fresh read/write connection ready for the ingest
332/// loop. Errors here are recoverable per-file: the caller surfaces them as
333/// failure events so `--fail-fast` and the continue-on-error path keep
334/// working when, for example, the user points `--db` at an unwritable path.
335fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
336    ensure_db_ready(paths)?;
337    let conn = open_rw(&paths.db)?;
338    Ok(conn)
339}
340
341/// In-process equivalent of `remember::run` for a single file. Mirrors the
342/// canonical pipeline: read body, validate length, chunk, embed via the
343/// daemon-or-local fallback (the warm embedder is reused across every file),
344/// optionally extract entities, then persist memory + chunks + entities +
345/// URLs in a single immediate transaction.
346#[allow(clippy::too_many_arguments)]
347fn process_file(
348    conn: &mut Connection,
349    paths: &AppPaths,
350    namespace: &str,
351    memory_type: &str,
352    skip_extraction: bool,
353    path: &Path,
354    name: &str,
355) -> Result<FileSuccess, AppError> {
356    use crate::constants::*;
357
358    if name.len() > MAX_MEMORY_NAME_LEN {
359        return Err(AppError::LimitExceeded(
360            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
361        ));
362    }
363    if name.starts_with("__") {
364        return Err(AppError::Validation(
365            crate::i18n::validation::reserved_name(),
366        ));
367    }
368    {
369        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
370            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
371        if !slug_re.is_match(name) {
372            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
373                name,
374            )));
375        }
376    }
377
378    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
379    if raw_body.len() > MAX_MEMORY_BODY_LEN {
380        return Err(AppError::LimitExceeded(
381            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
382        ));
383    }
384    if raw_body.trim().is_empty() {
385        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
386    }
387
388    let description = format!("ingested from {}", path.display());
389    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
390        return Err(AppError::Validation(
391            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
392        ));
393    }
394
395    // Auto-extraction is best-effort — failures degrade gracefully like in
396    // `remember::run`. With `--skip-extraction` we bypass the BERT NER cost
397    // entirely (the chunking + embedding cost is independent).
398    let mut extracted_entities: Vec<NewEntity> = Vec::new();
399    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
400    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
401    let mut relationships_truncated = false;
402    if !skip_extraction {
403        match crate::extraction::extract_graph_auto(&raw_body, paths) {
404            Ok(extracted) => {
405                extracted_urls = extracted.urls;
406                extracted_entities = extracted.entities;
407                extracted_relationships = extracted.relationships;
408                relationships_truncated = extracted.relationships_truncated;
409
410                if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
411                    extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
412                }
413                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
414                    relationships_truncated = true;
415                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
416                }
417            }
418            Err(e) => {
419                tracing::warn!(
420                    file = %path.display(),
421                    "auto-extraction failed (graceful degradation): {e:#}"
422                );
423            }
424        }
425    }
426
427    // Validate extracted graph types/relations to match `remember::run` rules.
428    for entity in &extracted_entities {
429        if !is_valid_entity_type(&entity.entity_type) {
430            return Err(AppError::Validation(format!(
431                "invalid entity_type '{}' for entity '{}'",
432                entity.entity_type, entity.name
433            )));
434        }
435    }
436    for rel in &mut extracted_relationships {
437        rel.relation = rel.relation.replace('-', "_");
438        if !is_valid_relation(&rel.relation) {
439            return Err(AppError::Validation(format!(
440                "invalid relation '{}' for relationship '{}' -> '{}'",
441                rel.relation, rel.source, rel.target
442            )));
443        }
444        if !(0.0..=1.0).contains(&rel.strength) {
445            return Err(AppError::Validation(format!(
446                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
447                rel.strength, rel.source, rel.target
448            )));
449        }
450    }
451
452    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
453    let snippet: String = raw_body.chars().take(200).collect();
454
455    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
456    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
457    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
458        return Err(AppError::LimitExceeded(format!(
459            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
460            chunks_info.len(),
461            REMEMBER_MAX_SAFE_MULTI_CHUNKS
462        )));
463    }
464
465    // Reuse the warm embedder (daemon when available, in-process otherwise).
466    // This is the load-bearing change of Onda 4B: the model is loaded ONCE
467    // for the whole ingest run, not once per file.
468    let mut chunk_embeddings_cache: Option<Vec<Vec<f32>>> = None;
469    let embedding = if chunks_info.len() == 1 {
470        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
471    } else {
472        let chunk_texts: Vec<&str> = chunks_info
473            .iter()
474            .map(|c| chunking::chunk_text(&raw_body, c))
475            .collect();
476        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
477        for chunk_text in &chunk_texts {
478            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
479                &paths.models,
480                chunk_text,
481            )?);
482        }
483        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
484        chunk_embeddings_cache = Some(chunk_embeddings);
485        aggregated
486    };
487
488    // Namespace bookkeeping (mirrors remember::run): reject when active
489    // namespaces already hit the cap and this file would create a new one.
490    {
491        let active_count: u32 = conn.query_row(
492            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
493            [],
494            |r| r.get::<_, i64>(0).map(|v| v as u32),
495        )?;
496        let ns_exists: bool = conn.query_row(
497            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
498            rusqlite::params![namespace],
499            |r| r.get::<_, i64>(0).map(|v| v > 0),
500        )?;
501        if !ns_exists && active_count >= MAX_NAMESPACES_ACTIVE {
502            return Err(AppError::NamespaceError(format!(
503                "active namespace limit of {MAX_NAMESPACES_ACTIVE} exceeded while creating '{namespace}'"
504            )));
505        }
506    }
507
508    let existing_memory = memories::find_by_name(conn, namespace, name)?;
509    if existing_memory.is_some() {
510        // Ingest does not implement merge semantics; surface the duplicate as
511        // a per-file failure so the caller can decide whether to remove the
512        // existing memory or rename the source file.
513        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
514            name, namespace,
515        )));
516    }
517    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &body_hash)?;
518
519    let new_memory = NewMemory {
520        namespace: namespace.to_string(),
521        name: name.to_string(),
522        memory_type: memory_type.to_string(),
523        description: description.clone(),
524        body: raw_body,
525        body_hash: body_hash.clone(),
526        session_id: None,
527        source: "agent".to_string(),
528        metadata: serde_json::json!({}),
529    };
530
531    // Pre-compute entity embeddings BEFORE the transaction so the embedder
532    // (and any daemon socket) is touched outside the immediate write lock.
533    let graph_entity_embeddings = extracted_entities
534        .iter()
535        .map(|entity| {
536            let entity_text = match &entity.description {
537                Some(desc) => format!("{} {}", entity.name, desc),
538                None => entity.name.clone(),
539            };
540            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
541        })
542        .collect::<Result<Vec<_>, _>>()?;
543
544    let _ = relationships_truncated; // not surfaced in the per-file event today
545
546    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
547
548    if let Some(hash_id) = duplicate_hash_id {
549        tracing::debug!(
550            target: "ingest",
551            duplicate_memory_id = hash_id,
552            "identical body already exists; persisting a new memory anyway"
553        );
554    }
555
556    let memory_id = memories::insert(&tx, &new_memory)?;
557    versions::insert_version(
558        &tx,
559        memory_id,
560        1,
561        name,
562        memory_type,
563        &description,
564        &new_memory.body,
565        &serde_json::to_string(&new_memory.metadata)?,
566        None,
567        "create",
568    )?;
569    memories::upsert_vec(
570        &tx,
571        memory_id,
572        namespace,
573        memory_type,
574        &embedding,
575        name,
576        &snippet,
577    )?;
578
579    if chunks_info.len() > 1 {
580        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &chunks_info)?;
581        let chunk_embeddings = chunk_embeddings_cache.take().ok_or_else(|| {
582            AppError::Internal(anyhow::anyhow!(
583                "missing chunk embeddings cache on multi-chunk ingest path"
584            ))
585        })?;
586        for (i, emb) in chunk_embeddings.iter().enumerate() {
587            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
588        }
589    }
590
591    if !extracted_entities.is_empty() || !extracted_relationships.is_empty() {
592        for (idx, entity) in extracted_entities.iter().enumerate() {
593            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
594            let entity_embedding = &graph_entity_embeddings[idx];
595            entities::upsert_entity_vec(
596                &tx,
597                entity_id,
598                namespace,
599                &entity.entity_type,
600                entity_embedding,
601                &entity.name,
602            )?;
603            entities::link_memory_entity(&tx, memory_id, entity_id)?;
604            entities::increment_degree(&tx, entity_id)?;
605        }
606        let entity_types: std::collections::HashMap<&str, &str> = extracted_entities
607            .iter()
608            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
609            .collect();
610        for rel in &extracted_relationships {
611            let source_entity = NewEntity {
612                name: rel.source.clone(),
613                entity_type: entity_types
614                    .get(rel.source.as_str())
615                    .copied()
616                    .unwrap_or("concept")
617                    .to_string(),
618                description: None,
619            };
620            let target_entity = NewEntity {
621                name: rel.target.clone(),
622                entity_type: entity_types
623                    .get(rel.target.as_str())
624                    .copied()
625                    .unwrap_or("concept")
626                    .to_string(),
627                description: None,
628            };
629            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
630            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
631            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
632            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
633        }
634    }
635
636    tx.commit()?;
637
638    // URLs persistence is non-critical (failures don't propagate) and lives
639    // outside the main transaction to mirror `remember::run` semantics.
640    if !extracted_urls.is_empty() {
641        let url_entries: Vec<storage_urls::MemoryUrl> = extracted_urls
642            .into_iter()
643            .map(|u| storage_urls::MemoryUrl {
644                url: u.url,
645                offset: Some(u.offset as i64),
646            })
647            .collect();
648        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
649    }
650
651    Ok(FileSuccess {
652        memory_id,
653        action: "created".to_string(),
654    })
655}
656
/// Reports whether `entity_type` is one of the entity types accepted for
/// persisted graph entities. The comparison is exact and case-sensitive.
fn is_valid_entity_type(entity_type: &str) -> bool {
    const ACCEPTED_ENTITY_TYPES: &[&str] = &[
        "project",
        "tool",
        "person",
        "file",
        "concept",
        "incident",
        "decision",
        "memory",
        "dashboard",
        "issue_tracker",
        "organization",
        "location",
        "date",
    ];
    ACCEPTED_ENTITY_TYPES
        .iter()
        .any(|accepted| *accepted == entity_type)
}
675
/// Reports whether `relation` is one of the relationship labels accepted for
/// persisted graph edges. The comparison is exact and case-sensitive, so
/// callers must normalise the label (e.g. hyphens to underscores) beforehand.
fn is_valid_relation(relation: &str) -> bool {
    const ACCEPTED_RELATIONS: &[&str] = &[
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    ACCEPTED_RELATIONS
        .iter()
        .any(|accepted| *accepted == relation)
}
693
694fn collect_files(
695    dir: &Path,
696    pattern: &str,
697    recursive: bool,
698    out: &mut Vec<PathBuf>,
699) -> Result<(), AppError> {
700    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
701    for entry in entries {
702        let entry = entry.map_err(AppError::Io)?;
703        let path = entry.path();
704        let file_type = entry.file_type().map_err(AppError::Io)?;
705        if file_type.is_file() {
706            let name = entry.file_name();
707            let name_str = name.to_string_lossy();
708            if matches_pattern(&name_str, pattern) {
709                out.push(path);
710            }
711        } else if file_type.is_dir() && recursive {
712            collect_files(&path, pattern, recursive, out)?;
713        }
714    }
715    Ok(())
716}
717
/// Tests a file basename against a minimal glob pattern.
///
/// The only metacharacter is `*`, which matches any run of characters
/// (including none) and may appear anywhere, any number of times:
///
/// * `*.md`       — suffix match
/// * `README*`    — prefix match
/// * `README*.md` — prefix and suffix match
/// * `*draft*`    — substring match
/// * `README.md`  — exact match (no `*` present)
///
/// Matching is case-sensitive. Previously only a single leading or trailing
/// `*` was honoured and any other placement was compared literally, so a
/// pattern such as `README*.md` silently matched nothing.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let mut literals = pattern.split('*');

    // `split` always yields at least one piece; the first one anchors the
    // start of the name.
    let head = literals.next().unwrap_or("");
    let mut remaining = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };

    // The last piece anchors the end of the name. When there is none the
    // pattern contained no `*` and must equal the name exactly.
    let tail = match literals.next_back() {
        Some(tail) => tail,
        None => return remaining.is_empty(),
    };

    // Interior pieces must appear in order. Taking the leftmost occurrence
    // each time leaves the largest possible remainder for later pieces.
    for piece in literals {
        match remaining.find(piece) {
            Some(at) => remaining = &remaining[at + piece.len()..],
            None => return false,
        }
    }

    // Checking the suffix against the remainder (not the whole name) keeps
    // the tail from overlapping text already consumed by earlier pieces.
    remaining.ends_with(tail)
}
727
728fn derive_kebab_name(path: &Path) -> String {
729    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
730    let lowered: String = stem
731        .chars()
732        .map(|c| {
733            if c == '_' || c.is_whitespace() {
734                '-'
735            } else {
736                c
737            }
738        })
739        .map(|c| c.to_ascii_lowercase())
740        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
741        .collect();
742    let collapsed = collapse_dashes(&lowered);
743    let trimmed = collapsed.trim_matches('-').to_string();
744    if trimmed.len() > DERIVED_NAME_MAX_LEN {
745        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
746            .trim_matches('-')
747            .to_string();
748        // v1.0.31 A10: surface the truncation so users can fix overly long file
749        // basenames before they collide with siblings sharing the same prefix.
750        tracing::warn!(
751            target: "ingest",
752            original = %trimmed,
753            truncated_to = %truncated,
754            max_len = DERIVED_NAME_MAX_LEN,
755            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
756        );
757        truncated
758    } else {
759        trimmed
760    }
761}
762
763/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
764/// numeric suffix (`-1`, `-2`, …) when needed.
765///
766/// `taken` is the set of names already consumed in the current ingest run.
767/// The caller is expected to insert the returned name into `taken` so the
768/// next call observes the consumption. Cross-run collisions are intentionally
769/// surfaced by the per-file persistence path as duplicates so re-ingestion
770/// of identical corpora stays idempotent.
771///
772/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
773/// candidates collide, signalling a pathological corpus that should be
774/// renamed manually.
775fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
776    if !taken.contains(base) {
777        return Ok(base.to_string());
778    }
779    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
780        let candidate = format!("{base}-{suffix}");
781        if !taken.contains(&candidate) {
782            tracing::warn!(
783                target: "ingest",
784                base = %base,
785                resolved = %candidate,
786                suffix,
787                "memory name collision resolved with numeric suffix"
788            );
789            return Ok(candidate);
790        }
791    }
792    Err(AppError::Validation(format!(
793        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
794    )))
795}
796
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A hyphen is redundant exactly when the previous kept character is
        // already a hyphen.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
813
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // ── matches_pattern: suffix, prefix and exact patterns ──

    #[test]
    fn matches_pattern_suffix() {
        let cases = [
            ("foo.md", "*.md", true),
            ("foo.txt", "*.md", false),
            ("foo.md", "*", true),
        ];
        for (file_name, pattern, expected) in cases {
            assert_eq!(matches_pattern(file_name, pattern), expected);
        }
    }

    #[test]
    fn matches_pattern_prefix() {
        let pattern = "README*";
        assert!(matches_pattern("README.md", pattern));
        assert!(!matches_pattern("CHANGELOG.md", pattern));
    }

    #[test]
    fn matches_pattern_exact() {
        // A differently-cased file name must not match an exact pattern.
        let pattern = "README.md";
        assert!(matches_pattern("README.md", pattern));
        assert!(!matches_pattern("readme.md", pattern));
    }

    // ── derive_kebab_name: normalisation of file basenames ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let source = PathBuf::from("/tmp/claude_code_headless.md");
        let derived = derive_kebab_name(&source);
        assert_eq!(derived, "claude-code-headless");
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let source = PathBuf::from("/tmp/README.md");
        let derived = derive_kebab_name(&source);
        assert_eq!(derived, "readme");
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let source = PathBuf::from("/tmp/some@weird#name!.md");
        let derived = derive_kebab_name(&source);
        assert_eq!(derived, "someweirdname");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let source = PathBuf::from("/tmp/a__b___c.md");
        let derived = derive_kebab_name(&source);
        assert_eq!(derived, "a-b-c");
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let long_stem = "a".repeat(80);
        let source = PathBuf::from(format!("/tmp/{long_stem}.md"));
        let derived_len = derive_kebab_name(&source).len();
        assert!(derived_len <= 60, "got len {}", derived_len);
    }

    // ── collect_files: directory walking with and without recursion ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        // Two matching files plus one that the `*.md` pattern must reject.
        for (file_name, body) in [("a.md", "x"), ("b.md", "y"), ("c.txt", "z")] {
            std::fs::write(tmp.path().join(file_name), body).unwrap();
        }
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let nested = tmp.path().join("sub");
        std::fs::create_dir(&nested).unwrap();
        // One file at the root and one inside the nested directory.
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(nested.join("b.md"), "y").unwrap();
        let mut found = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut found).expect("collect");
        assert_eq!(found.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let nested = tmp.path().join("sub");
        std::fs::create_dir(&nested).unwrap();
        // Same layout as the recursive test; only the root file should count.
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(nested.join("b.md"), "y").unwrap();
        let mut found = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut found).expect("collect");
        assert_eq!(found.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let long_stem = "a".repeat(120);
        let source = PathBuf::from(format!("/tmp/{long_stem}.md"));
        let derived = derive_kebab_name(&source);
        assert!(
            derived.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            derived.len()
        );
        assert!(!derived.is_empty());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let nothing_taken = BTreeSet::<String>::new();
        let resolved = unique_name("note", &nothing_taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        // Base and `-1` are occupied, so `-2` is the first free candidate.
        let taken: BTreeSet<String> = ["note", "note-1"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        // Occupy the base plus every suffix the resolver is allowed to try.
        let taken: BTreeSet<String> = std::iter::once("note".to_string())
            .chain((1..=MAX_NAME_COLLISION_SUFFIX).map(|i| format!("note-{i}")))
            .collect();
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_entity_type_accepts_v008_types() {
        for accepted in ["organization", "location", "date"] {
            assert!(is_valid_entity_type(accepted));
        }
        assert!(!is_valid_entity_type("unknown"));
    }

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        for accepted in ["applies_to", "depends_on"] {
            assert!(is_valid_relation(accepted));
        }
        assert!(!is_valid_relation("foo_bar"));
    }
}