Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory using the same
5//! validation, chunking, embedding and persistence pipeline as `remember`,
6//! but executed in-process so the ONNX model is loaded only once per
7//! invocation. This is the v1.0.32 Onda 4B (finding A2) refactor that
8//! replaced a fork-spawn-per-file pipeline (every file paid the ~17s ONNX
9//! cold-start cost) with an in-process loop reusing the warm embedder
10//! (daemon when available, in-process `Embedder::new` otherwise).
11//!
12//! Memory names are derived from file basenames (kebab-case, lowercase,
13//! ASCII alphanumerics + hyphens). Output is line-delimited JSON: one
14//! object per processed file (success or error), followed by a final
15//! summary object. Designed for streaming consumption by agents.
16
17use crate::chunking;
18use crate::cli::MemoryType;
19use crate::errors::AppError;
20use crate::i18n::errors_msg;
21use crate::output::{self, JsonOutputFormat};
22use crate::paths::AppPaths;
23use crate::storage::chunks as storage_chunks;
24use crate::storage::connection::{ensure_db_ready, open_rw};
25use crate::storage::entities::{NewEntity, NewRelationship};
26use crate::storage::memories::NewMemory;
27use crate::storage::{entities, memories, urls as storage_urls, versions};
28use rusqlite::Connection;
29use serde::Serialize;
30use std::collections::BTreeSet;
31use std::path::{Path, PathBuf};
32
33/// Maximum length of a derived kebab-case name. Longer basenames are truncated
34/// (with a `tracing::warn!`) to keep the `memories.name` column bounded.
35const DERIVED_NAME_MAX_LEN: usize = 60;
36
37/// Hard cap on the numeric suffix appended for collision resolution. If 1000
38/// candidates collide we surface an error rather than loop forever.
39const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
40
41#[derive(clap::Args)]
42#[command(after_long_help = "EXAMPLES:\n  \
43    # Ingest every Markdown file under ./docs as `document` memories\n  \
44    sqlite-graphrag ingest ./docs --type document\n\n  \
45    # Ingest .txt files recursively under ./notes\n  \
46    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
47    # Skip BERT NER auto-extraction for faster bulk import\n  \
48    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
49NOTES:\n  \
50    Each file becomes a separate memory. Names derive from file basenames\n  \
51    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
52    followed by a final summary line with counts. Per-file errors are reported\n  \
53    inline and processing continues unless --fail-fast is set.")]
54pub struct IngestArgs {
55    /// Directory containing files to ingest.
56    #[arg(
57        value_name = "DIR",
58        help = "Directory to ingest recursively (each matching file becomes a memory)"
59    )]
60    pub dir: PathBuf,
61
62    /// Memory type stored in `memories.type` for every ingested file.
63    #[arg(long, value_enum)]
64    pub r#type: MemoryType,
65
66    /// Glob pattern matched against file basenames (default: `*.md`). Supports
67    /// `*.<ext>`, `<prefix>*`, and exact filename match.
68    #[arg(long, default_value = "*.md")]
69    pub pattern: String,
70
71    /// Recurse into subdirectories.
72    #[arg(long, default_value_t = false)]
73    pub recursive: bool,
74
75    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
76    #[arg(long, default_value_t = false)]
77    pub skip_extraction: bool,
78
79    /// Stop on first per-file error instead of continuing with the next file.
80    #[arg(long, default_value_t = false)]
81    pub fail_fast: bool,
82
83    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
84    #[arg(long, default_value_t = 10_000)]
85    pub max_files: usize,
86
87    /// Namespace for the ingested memories.
88    #[arg(long)]
89    pub namespace: Option<String>,
90
91    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
92    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
93    pub db: Option<String>,
94
95    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
96    pub format: JsonOutputFormat,
97
98    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
99    pub json: bool,
100}
101
/// One NDJSON line describing the outcome of a single matched file.
///
/// Field order is the serialized key order, so it must not be rearranged.
/// Optional fields are omitted from the JSON entirely when `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the source file, rendered lossily as UTF-8.
    file: &'a str,
    /// Derived memory name (empty when no name could be derived).
    name: &'a str,
    /// Outcome label; `run` emits `"indexed"`, `"failed"` or `"skipped"`.
    status: &'a str,
    /// True when the derived name was truncated to fit `DERIVED_NAME_MAX_LEN`. False otherwise.
    truncated: bool,
    /// Original derived name before truncation; only present when `truncated=true`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Human-readable failure or skip reason; absent on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the persisted memory; only present on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// What the persistence step did (currently always `"created"`); only present on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
119
/// Final NDJSON line emitted after the per-file events (also emitted right
/// before aborting under `--fail-fast`).
///
/// Field order is the serialized key order, so it must not be rearranged.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker distinguishing the summary line from per-file events; `run` always sets it to `true`.
    summary: bool,
    /// Directory that was scanned, as given on the command line.
    dir: String,
    /// Basename pattern used to select files.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Number of files persisted successfully.
    files_succeeded: usize,
    /// Number of files whose processing returned an error.
    files_failed: usize,
    /// Number of files skipped because no usable unique name could be derived.
    files_skipped: usize,
    /// Wall-clock time since `run` started, in milliseconds.
    elapsed_ms: u64,
}
132
/// Outcome of a successful per-file ingest, used to build the NDJSON event.
struct FileSuccess {
    /// Row id returned by `memories::insert` for the new memory.
    memory_id: i64,
    /// Label for what was done; `process_file` currently always reports `"created"`.
    action: String,
}
138
139pub fn run(args: IngestArgs) -> Result<(), AppError> {
140    let started = std::time::Instant::now();
141
142    if !args.dir.exists() {
143        return Err(AppError::NotFound(format!(
144            "directory not found: {}",
145            args.dir.display()
146        )));
147    }
148    if !args.dir.is_dir() {
149        return Err(AppError::Validation(format!(
150            "path is not a directory: {}",
151            args.dir.display()
152        )));
153    }
154
155    let mut files: Vec<PathBuf> = Vec::new();
156    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
157    files.sort();
158
159    if files.len() > args.max_files {
160        return Err(AppError::Validation(format!(
161            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
162            files.len(),
163            args.max_files
164        )));
165    }
166
167    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
168    let memory_type_str = args.r#type.as_str().to_string();
169
170    // v1.0.32 Onda 4B: open the DB once and reuse the connection (and the
171    // warm embedder via `crate::daemon::embed_passage_or_local`) across every
172    // file, eliminating the ~17s ONNX cold-start that the previous
173    // fork-spawn-per-file design paid on each iteration. We tolerate a startup
174    // failure (e.g. an unwritable `--db` path) by capturing the error string
175    // and surfacing it as a per-file failure event so callers preserve the
176    // existing fail-fast / continue-on-error contract.
177    let paths = AppPaths::resolve(args.db.as_deref())?;
178    let mut conn_or_err = match init_storage(&paths) {
179        Ok(c) => Ok(c),
180        Err(e) => Err(format!("{e}")),
181    };
182
183    let mut succeeded: usize = 0;
184    let mut failed: usize = 0;
185    let mut skipped: usize = 0;
186    let total = files.len();
187
188    // v1.0.31 A10: track names produced during this run so two files with the
189    // same kebab basename (after truncation, transliteration, etc.) get
190    // distinct `-1`, `-2` suffixes within the same ingest invocation.
191    // Cross-run collisions are intentionally left to the per-file persistence
192    // path so re-ingesting an identical corpus still surfaces duplicates
193    // instead of silently creating shadow copies.
194    let mut taken_names: BTreeSet<String> = BTreeSet::new();
195
196    for path in &files {
197        let file_str = path.to_string_lossy().into_owned();
198        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
199
200        if derived_base.is_empty() {
201            output::emit_json_compact(&IngestFileEvent {
202                file: &file_str,
203                name: "",
204                status: "skipped",
205                truncated: false,
206                original_name: None,
207                error: Some(
208                    "could not derive a non-empty kebab-case name from filename".to_string(),
209                ),
210                memory_id: None,
211                action: None,
212            })?;
213            skipped += 1;
214            continue;
215        }
216
217        let derived_name = match unique_name(&derived_base, &taken_names) {
218            Ok(n) => n,
219            Err(e) => {
220                output::emit_json_compact(&IngestFileEvent {
221                    file: &file_str,
222                    name: &derived_base,
223                    status: "skipped",
224                    truncated: name_truncated,
225                    original_name: original_name.clone(),
226                    error: Some(e.to_string()),
227                    memory_id: None,
228                    action: None,
229                })?;
230                skipped += 1;
231                continue;
232            }
233        };
234        taken_names.insert(derived_name.clone());
235
236        // If startup failed, every file inherits the same fatal error rather
237        // than silently succeeding against a non-existent database.
238        let conn = match conn_or_err.as_mut() {
239            Ok(c) => c,
240            Err(err_msg) => {
241                let err_clone = err_msg.clone();
242                output::emit_json_compact(&IngestFileEvent {
243                    file: &file_str,
244                    name: &derived_name,
245                    status: "failed",
246                    truncated: name_truncated,
247                    original_name: original_name.clone(),
248                    error: Some(err_clone.clone()),
249                    memory_id: None,
250                    action: None,
251                })?;
252                failed += 1;
253                if args.fail_fast {
254                    output::emit_json_compact(&IngestSummary {
255                        summary: true,
256                        dir: args.dir.display().to_string(),
257                        pattern: args.pattern.clone(),
258                        recursive: args.recursive,
259                        files_total: total,
260                        files_succeeded: succeeded,
261                        files_failed: failed,
262                        files_skipped: skipped,
263                        elapsed_ms: started.elapsed().as_millis() as u64,
264                    })?;
265                    return Err(AppError::Validation(format!(
266                        "ingest aborted on first failure: {err_clone}"
267                    )));
268                }
269                continue;
270            }
271        };
272
273        let outcome = process_file(
274            conn,
275            &paths,
276            &namespace,
277            &memory_type_str,
278            args.skip_extraction,
279            path,
280            &derived_name,
281        );
282
283        match outcome {
284            Ok(FileSuccess { memory_id, action }) => {
285                output::emit_json_compact(&IngestFileEvent {
286                    file: &file_str,
287                    name: &derived_name,
288                    status: "indexed",
289                    truncated: name_truncated,
290                    original_name: original_name.clone(),
291                    error: None,
292                    memory_id: Some(memory_id),
293                    action: Some(action),
294                })?;
295                succeeded += 1;
296            }
297            Err(e) => {
298                let err_msg = format!("{e}");
299                output::emit_json_compact(&IngestFileEvent {
300                    file: &file_str,
301                    name: &derived_name,
302                    status: "failed",
303                    truncated: name_truncated,
304                    original_name: original_name.clone(),
305                    error: Some(err_msg.clone()),
306                    memory_id: None,
307                    action: None,
308                })?;
309                failed += 1;
310                if args.fail_fast {
311                    output::emit_json_compact(&IngestSummary {
312                        summary: true,
313                        dir: args.dir.display().to_string(),
314                        pattern: args.pattern.clone(),
315                        recursive: args.recursive,
316                        files_total: total,
317                        files_succeeded: succeeded,
318                        files_failed: failed,
319                        files_skipped: skipped,
320                        elapsed_ms: started.elapsed().as_millis() as u64,
321                    })?;
322                    return Err(AppError::Validation(format!(
323                        "ingest aborted on first failure: {err_msg}"
324                    )));
325                }
326            }
327        }
328    }
329
330    output::emit_json_compact(&IngestSummary {
331        summary: true,
332        dir: args.dir.display().to_string(),
333        pattern: args.pattern.clone(),
334        recursive: args.recursive,
335        files_total: total,
336        files_succeeded: succeeded,
337        files_failed: failed,
338        files_skipped: skipped,
339        elapsed_ms: started.elapsed().as_millis() as u64,
340    })?;
341
342    Ok(())
343}
344
345/// Auto-initialises the database (matches the contract of every other CRUD
346/// handler) and returns a fresh read/write connection ready for the ingest
347/// loop. Errors here are recoverable per-file: the caller surfaces them as
348/// failure events so `--fail-fast` and the continue-on-error path keep
349/// working when, for example, the user points `--db` at an unwritable path.
350fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
351    ensure_db_ready(paths)?;
352    let conn = open_rw(&paths.db)?;
353    Ok(conn)
354}
355
/// In-process equivalent of `remember::run` for a single file. Mirrors the
/// canonical pipeline: validate the name, read the body, validate lengths,
/// optionally extract entities, chunk, embed via the daemon-or-local fallback
/// (the warm embedder is reused across every file), then persist memory +
/// version + vectors + chunks + entities + relationships in a single
/// immediate transaction. URLs are written after the commit on a best-effort
/// basis.
///
/// # Parameters
///
/// * `conn` - read/write connection shared across the whole ingest run.
/// * `paths` - resolved application paths; `paths.models` locates the
///   tokenizer and embedding model.
/// * `namespace` - namespace the memory is stored under.
/// * `memory_type` - value stored as the memory type.
/// * `skip_extraction` - when true, entity/relationship/URL extraction is
///   bypassed entirely.
/// * `path` - source file to read.
/// * `name` - already-derived memory name; re-validated here.
///
/// # Errors
///
/// Returns an error when the name or body fails validation, the file cannot
/// be read as UTF-8 text, the document produces too many chunks, embedding
/// fails, the active-namespace cap would be exceeded, a memory with the same
/// name already exists in the namespace, or any database write fails. A
/// failed extraction is NOT an error: it is logged and ingestion continues
/// without graph data.
#[allow(clippy::too_many_arguments)]
fn process_file(
    conn: &mut Connection,
    paths: &AppPaths,
    namespace: &str,
    memory_type: &str,
    skip_extraction: bool,
    path: &Path,
    name: &str,
) -> Result<FileSuccess, AppError> {
    use crate::constants::*;

    // --- Name validation (length, reserved prefix, slug shape) ---
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        // NOTE(review): the regex is recompiled on every call (once per
        // file); consider caching it if profiling shows it matters.
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // --- Body validation: must be readable UTF-8, within the byte cap, and
    // not whitespace-only ---
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    // The description is synthesised from the path, so a very deep path can
    // exceed the description cap and fail the file.
    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    // Auto-extraction is best-effort — failures degrade gracefully like in
    // `remember::run`. With `--skip-extraction` we bypass the BERT NER cost
    // entirely (the chunking + embedding cost is independent).
    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    let mut relationships_truncated = false;
    if !skip_extraction {
        match crate::extraction::extract_graph_auto(&raw_body, paths) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;
                relationships_truncated = extracted.relationships_truncated;

                // Clamp to the per-memory caps; only relationship truncation
                // is recorded in a flag.
                if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
                    extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    relationships_truncated = true;
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // Validate extracted graph types/relations to match `remember::run` rules.
    // Unlike an extraction failure, an invalid extracted value fails the file.
    for entity in &extracted_entities {
        if !is_valid_entity_type(&entity.entity_type) {
            return Err(AppError::Validation(format!(
                "invalid entity_type '{}' for entity '{}'",
                entity.entity_type, entity.name
            )));
        }
    }
    for rel in &mut extracted_relationships {
        // Normalise hyphenated relation names to the snake_case form that
        // `is_valid_relation` accepts.
        rel.relation = rel.relation.replace('-', "_");
        if !is_valid_relation(&rel.relation) {
            return Err(AppError::Validation(format!(
                "invalid relation '{}' for relationship '{}' -> '{}'",
                rel.relation, rel.source, rel.target
            )));
        }
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    // First 200 characters (not bytes) of the body, stored alongside the vector.
    let snippet: String = raw_body.chars().take(200).collect();

    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    // Reuse the warm embedder (daemon when available, in-process otherwise).
    // This is the load-bearing change of Onda 4B: the model is loaded ONCE
    // for the whole ingest run, not once per file.
    //
    // Single-chunk documents embed the whole body directly. Multi-chunk
    // documents embed each chunk, aggregate those into the memory-level
    // embedding, and keep the per-chunk vectors for persistence below.
    let mut chunk_embeddings_cache: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_cache = Some(chunk_embeddings);
        aggregated
    };

    // Namespace bookkeeping (mirrors remember::run): reject when active
    // namespaces already hit the cap and this file would create a new one.
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {MAX_NAMESPACES_ACTIVE} exceeded while creating '{namespace}'"
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, name)?;
    if existing_memory.is_some() {
        // Ingest does not implement merge semantics; surface the duplicate as
        // a per-file failure so the caller can decide whether to remove the
        // existing memory or rename the source file.
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            name, namespace,
        )));
    }
    // A body-hash match is informational only: it is logged further down and
    // the memory is still created.
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: name.to_string(),
        memory_type: memory_type.to_string(),
        description: description.clone(),
        body: raw_body,
        body_hash: body_hash.clone(),
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    // Pre-compute entity embeddings BEFORE the transaction so the embedder
    // (and any daemon socket) is touched outside the immediate write lock.
    // The resulting vector is index-aligned with `extracted_entities`.
    let graph_entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    let _ = relationships_truncated; // not surfaced in the per-file event today

    // Everything from here to `commit` is atomic: an early return via `?`
    // drops `tx`, which rolls the transaction back.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Record the initial version (version number 1, change kind "create").
    versions::insert_version(
        &tx,
        memory_id,
        1,
        name,
        memory_type,
        &description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &embedding,
        name,
        &snippet,
    )?;

    // Chunk rows and per-chunk vectors are only stored for multi-chunk bodies.
    if chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &chunks_info)?;
        // The cache is always populated on the multi-chunk branch above; a
        // missing value here would indicate a logic error, not bad input.
        let chunk_embeddings = chunk_embeddings_cache.take().ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the chunk index `i` is passed as both the second
            // and fourth arguments; confirm against `upsert_chunk_vec` that
            // the second is not meant to be a database-assigned chunk row id.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !extracted_entities.is_empty() || !extracted_relationships.is_empty() {
        for (idx, entity) in extracted_entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &graph_entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                &entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Relationship endpoints reuse the type of a same-named extracted
        // entity when there is one, and fall back to "concept" otherwise.
        let entity_types: std::collections::HashMap<&str, &str> = extracted_entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
            .collect();
        for rel in &extracted_relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URLs persistence is non-critical (failures don't propagate) and lives
    // outside the main transaction to mirror `remember::run` semantics.
    if !extracted_urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = extracted_urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        // Deliberately ignored: the memory is already committed at this point.
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
671
/// Reports whether `entity_type` is one of the entity kinds accepted for
/// persistence. The comparison is exact and case-sensitive.
fn is_valid_entity_type(entity_type: &str) -> bool {
    const ACCEPTED: [&str; 13] = [
        "project",
        "tool",
        "person",
        "file",
        "concept",
        "incident",
        "decision",
        "memory",
        "dashboard",
        "issue_tracker",
        "organization",
        "location",
        "date",
    ];
    ACCEPTED.contains(&entity_type)
}
690
/// Reports whether `relation` is one of the relationship labels accepted for
/// persistence. The comparison is exact and case-sensitive; callers are
/// expected to have normalised hyphens to underscores beforehand.
fn is_valid_relation(relation: &str) -> bool {
    const ACCEPTED: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    ACCEPTED.contains(&relation)
}
708
709fn collect_files(
710    dir: &Path,
711    pattern: &str,
712    recursive: bool,
713    out: &mut Vec<PathBuf>,
714) -> Result<(), AppError> {
715    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
716    for entry in entries {
717        let entry = entry.map_err(AppError::Io)?;
718        let path = entry.path();
719        let file_type = entry.file_type().map_err(AppError::Io)?;
720        if file_type.is_file() {
721            let name = entry.file_name();
722            let name_str = name.to_string_lossy();
723            if matches_pattern(&name_str, pattern) {
724                out.push(path);
725            }
726        } else if file_type.is_dir() && recursive {
727            collect_files(&path, pattern, recursive, out)?;
728        }
729    }
730    Ok(())
731}
732
/// Tests a file basename against a minimal glob pattern.
///
/// `*` matches any run of characters (including none) and may appear any
/// number of times, anywhere in the pattern: `*.md`, `README*`, `notes*.md`
/// and `*.test.*` all work. A pattern without `*` must equal the name
/// exactly. Matching is case-sensitive; `?` and character classes are not
/// supported and are treated as literal characters.
///
/// Previously only a single leading or trailing `*` was honoured; any other
/// placement was silently compared as a literal and therefore matched
/// nothing. Every name that matched under the old rules still matches.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: exact comparison.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // `tail` is the literal after the last `*`; `middle` holds whatever
    // literals sit between the first and last `*` (possibly none).
    let (middle, tail) = rest.rsplit_once('*').unwrap_or(("", rest));

    // Anchor the fixed prefix and suffix first. Stripping them sequentially
    // guarantees they do not overlap inside `name`.
    let mut remaining = match name
        .strip_prefix(head)
        .and_then(|after_head| after_head.strip_suffix(tail))
    {
        Some(inner) => inner,
        None => return false,
    };

    // The interior literals must then occur left-to-right in what is left;
    // taking the leftmost occurrence each time is sufficient for `*`-only globs.
    for segment in middle.split('*') {
        match remaining.find(segment) {
            Some(pos) => remaining = &remaining[pos + segment.len()..],
            None => return false,
        }
    }
    true
}
742
743/// Returns `(final_name, truncated, original_name)`.
744/// `truncated` is true when the derived name exceeded `DERIVED_NAME_MAX_LEN`.
745/// `original_name` holds the pre-truncation name only when `truncated=true`.
746fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
747    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
748    let lowered: String = stem
749        .chars()
750        .map(|c| {
751            if c == '_' || c.is_whitespace() {
752                '-'
753            } else {
754                c
755            }
756        })
757        .map(|c| c.to_ascii_lowercase())
758        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
759        .collect();
760    let collapsed = collapse_dashes(&lowered);
761    let trimmed = collapsed.trim_matches('-').to_string();
762    if trimmed.len() > DERIVED_NAME_MAX_LEN {
763        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
764            .trim_matches('-')
765            .to_string();
766        // v1.0.31 A10: surface the truncation so users can fix overly long file
767        // basenames before they collide with siblings sharing the same prefix.
768        tracing::warn!(
769            target: "ingest",
770            original = %trimmed,
771            truncated_to = %truncated,
772            max_len = DERIVED_NAME_MAX_LEN,
773            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
774        );
775        (truncated, true, Some(trimmed))
776    } else {
777        (trimmed, false, None)
778    }
779}
780
781/// v1.0.31 A10: returns the first non-colliding kebab name by appending a
782/// numeric suffix (`-1`, `-2`, …) when needed.
783///
784/// `taken` is the set of names already consumed in the current ingest run.
785/// The caller is expected to insert the returned name into `taken` so the
786/// next call observes the consumption. Cross-run collisions are intentionally
787/// surfaced by the per-file persistence path as duplicates so re-ingestion
788/// of identical corpora stays idempotent.
789///
790/// Returns `Err(AppError::Validation)` after `MAX_NAME_COLLISION_SUFFIX`
791/// candidates collide, signalling a pathological corpus that should be
792/// renamed manually.
793fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
794    if !taken.contains(base) {
795        return Ok(base.to_string());
796    }
797    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
798        let candidate = format!("{base}-{suffix}");
799        if !taken.contains(&candidate) {
800            tracing::warn!(
801                target: "ingest",
802                base = %base,
803                resolved = %candidate,
804                suffix,
805                "memory name collision resolved with numeric suffix"
806            );
807            return Ok(candidate);
808        }
809    }
810    Err(AppError::Validation(format!(
811        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
812    )))
813}
814
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied through unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut squeezed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A dash is redundant when the output already ends with one.
        if ch == '-' && squeezed.ends_with('-') {
            continue;
        }
        squeezed.push(ch);
    }
    squeezed
}
831
#[cfg(test)]
mod tests {
    //! Unit tests for pattern matching, kebab-name derivation, file
    //! collection, collision resolution and entity/relation validation.

    use super::*;
    use std::path::PathBuf;

    /// Asserts that `file` derives to `expected` without being truncated.
    fn assert_derives_untruncated(file: &str, expected: &str) {
        let (name, truncated, original) = derive_kebab_name(Path::new(file));
        assert_eq!(name, expected);
        assert!(!truncated);
        assert!(original.is_none());
    }

    /// Builds a path under `/tmp` whose stem is `len` repetitions of `a`.
    fn long_basename(len: usize) -> PathBuf {
        PathBuf::from(format!("/tmp/{}.md", "a".repeat(len)))
    }

    /// Writes a small fixture file named `name` inside `dir`.
    fn write_fixture(dir: &Path, name: &str, body: &str) {
        std::fs::write(dir.join(name), body).unwrap();
    }

    // ── glob-like pattern matching ──

    #[test]
    fn matches_pattern_suffix() {
        let cases = [
            ("foo.md", "*.md", true),
            ("foo.txt", "*.md", false),
            ("foo.md", "*", true),
        ];
        for (file, pattern, expected) in cases {
            assert_eq!(matches_pattern(file, pattern), expected);
        }
    }

    #[test]
    fn matches_pattern_prefix() {
        let cases = [
            ("README.md", "README*", true),
            ("CHANGELOG.md", "README*", false),
        ];
        for (file, pattern, expected) in cases {
            assert_eq!(matches_pattern(file, pattern), expected);
        }
    }

    #[test]
    fn matches_pattern_exact() {
        let cases = [
            ("README.md", "README.md", true),
            ("readme.md", "README.md", false),
        ];
        for (file, pattern, expected) in cases {
            assert_eq!(matches_pattern(file, pattern), expected);
        }
    }

    // ── kebab-case name derivation ──

    #[test]
    fn derive_kebab_underscore_to_dash() {
        assert_derives_untruncated("/tmp/claude_code_headless.md", "claude-code-headless");
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        assert_derives_untruncated("/tmp/README.md", "readme");
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        assert_derives_untruncated("/tmp/some@weird#name!.md", "someweirdname");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        assert_derives_untruncated("/tmp/a__b___c.md", "a-b-c");
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let source = long_basename(80);
        let (name, truncated, original) = derive_kebab_name(&source);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        // The pre-truncation name must be reported and must exceed the cap.
        assert!(matches!(original, Some(ref full) if full.len() > 60));
    }

    // ── directory traversal ──

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        for (file, body) in [("a.md", "x"), ("b.md", "y"), ("c.txt", "z")] {
            write_fixture(tmp.path(), file, body);
        }
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let nested = tmp.path().join("sub");
        std::fs::create_dir(&nested).unwrap();
        write_fixture(tmp.path(), "a.md", "x");
        write_fixture(&nested, "b.md", "y");
        let mut found = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut found).expect("collect");
        assert_eq!(found.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let nested = tmp.path().join("sub");
        std::fs::create_dir(&nested).unwrap();
        write_fixture(tmp.path(), "a.md", "x");
        write_fixture(&nested, "b.md", "y");
        let mut found = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut found).expect("collect");
        assert_eq!(found.len(), 1);
    }

    // ── v1.0.31 A10: name truncation warns and collisions are auto-resolved ──

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let source = long_basename(120);
        let (name, truncated, original) = derive_kebab_name(&source);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let nothing_taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &nothing_taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let taken: BTreeSet<String> = ["note", "note-1"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        // Occupy the base name and every suffix up to the cap.
        let taken: BTreeSet<String> = std::iter::once("note".to_string())
            .chain((1..=MAX_NAME_COLLISION_SUFFIX).map(|i| format!("note-{i}")))
            .collect();
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ── v1.0.32 Onda 4B: in-process pipeline validation ──

    #[test]
    fn is_valid_entity_type_accepts_v008_types() {
        for accepted in ["organization", "location", "date"] {
            assert!(is_valid_entity_type(accepted));
        }
        assert!(!is_valid_entity_type("unknown"));
    }

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        for accepted in ["applies_to", "depends_on"] {
            assert!(is_valid_relation(accepted));
        }
        assert!(!is_valid_relation("foo_bar"));
    }
}