Skip to main content

sqlite_graphrag/commands/
ingest.rs

1//! Handler for the `ingest` CLI subcommand.
2//!
3//! Bulk-ingests every file under a directory that matches a glob pattern.
4//! Each matched file is persisted as a separate memory by invoking the same
5//! `sqlite-graphrag remember --body-file` pipeline as a child process. Memory
6//! names are derived from file basenames (kebab-case, lowercase, ASCII
7//! alphanumerics + hyphens). Running each ingestion as a child process keeps
8//! `remember` untouched and naturally honours the same concurrency slot
9//! semantics as standalone `remember` invocations.
10//!
11//! Output is line-delimited JSON: one object per processed file (success or
12//! error), followed by a final summary object. Designed for streaming
13//! consumption by agents.
14
15use crate::cli::MemoryType;
16use crate::errors::AppError;
17use crate::output::{self, JsonOutputFormat};
18use serde::Serialize;
19use std::path::{Path, PathBuf};
20
// Command-line arguments accepted by the `ingest` subcommand, parsed by clap
// through the `clap::Args` derive below.
//
// Comments added in this struct are plain `//` on purpose: clap's derive turns
// `///` doc comments into user-visible help text, so a `///` here would change
// the CLI output.
21#[derive(clap::Args)]
22#[command(after_long_help = "EXAMPLES:\n  \
23    # Ingest every Markdown file under ./docs as `document` memories\n  \
24    sqlite-graphrag ingest ./docs --type document\n\n  \
25    # Ingest .txt files recursively under ./notes\n  \
26    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n  \
27    # Skip BERT NER auto-extraction for faster bulk import\n  \
28    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n  \
29NOTES:\n  \
30    Each file becomes a separate memory. Names derive from file basenames\n  \
31    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n  \
32    followed by a final summary line with counts. Per-file errors are reported\n  \
33    inline and processing continues unless --fail-fast is set.")]
34pub struct IngestArgs {
35    /// Directory containing files to ingest.
36    #[arg(value_name = "DIR")]
37    pub dir: PathBuf,
38
39    /// Memory type stored in `memories.type` for every ingested file.
40    #[arg(long, value_enum)]
41    pub r#type: MemoryType,
42
43    /// Glob pattern matched against file basenames (default: `*.md`). Supports
44    /// `*.<ext>`, `<prefix>*`, and exact filename match.
45    #[arg(long, default_value = "*.md")]
46    pub pattern: String,
47
48    /// Recurse into subdirectories.
49    #[arg(long, default_value_t = false)]
50    pub recursive: bool,
51
52    /// Disable automatic BERT NER entity/relationship extraction (faster bulk import).
53    #[arg(long, default_value_t = false)]
54    pub skip_extraction: bool,
55
56    /// Stop on first per-file error instead of continuing with the next file.
57    #[arg(long, default_value_t = false)]
58    pub fail_fast: bool,
59
60    /// Maximum number of files to ingest (safety cap to prevent runaway ingestion).
61    #[arg(long, default_value_t = 10_000)]
62    pub max_files: usize,
63
64    /// Namespace for the ingested memories.
65    #[arg(long)]
66    pub namespace: Option<String>,
67
68    /// Database path. Falls back to `SQLITE_GRAPHRAG_DB_PATH`, then `./graphrag.sqlite`.
69    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
70    pub db: Option<String>,
71
    // Output format selector; defaults to `JsonOutputFormat::Json`.
    // NOTE(review): `run` in this file never reads this field — confirm where
    // (or whether) it is honoured.
72    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
73    pub format: JsonOutputFormat,
74
    // Hidden flag that is accepted but has no effect (see its help string);
    // `run` in this file does not read it.
75    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
76    pub json: bool,
77}
78
/// One output line describing the outcome for a single matched file.
///
/// `run` emits one of these per file through `output::emit_json`. The three
/// optional fields are omitted from the serialized object when they are `None`.
79#[derive(Serialize)]
80struct IngestFileEvent<'a> {
    /// Path of the processed file, rendered lossily as a string by `run`.
81    file: &'a str,
    /// Memory name derived from the file stem; empty when the file was skipped
    /// because no name could be derived.
82    name: &'a str,
    /// Outcome marker; `run` uses `"indexed"`, `"failed"` and `"skipped"`.
83    status: &'a str,
    /// Human-readable reason, set for failed and skipped files.
84    #[serde(skip_serializing_if = "Option::is_none")]
85    error: Option<String>,
    /// `memory_id` parsed from the child process's stdout, when available.
86    #[serde(skip_serializing_if = "Option::is_none")]
87    memory_id: Option<i64>,
    /// `action` parsed from the child process's stdout, when available.
88    #[serde(skip_serializing_if = "Option::is_none")]
89    action: Option<String>,
90}
91
/// Final output line with aggregate counts for the whole ingest run.
///
/// `run` emits it once after the last processed file, including when it stops
/// early because of `--fail-fast`.
92#[derive(Serialize)]
93struct IngestSummary {
    /// Always set to `true` by `run`; distinguishes this line from per-file events.
94    summary: bool,
    /// The directory argument, rendered with `Path::display`.
95    dir: String,
    /// The basename pattern that was used to select files.
96    pattern: String,
    /// Whether subdirectories were traversed.
97    recursive: bool,
    /// Number of files that matched the pattern (not the number processed).
98    files_total: usize,
    /// Files whose child `remember` process exited successfully.
99    files_succeeded: usize,
    /// Files whose child `remember` process exited with a failure status.
100    files_failed: usize,
    /// Files skipped because no non-empty memory name could be derived.
101    files_skipped: usize,
    /// Milliseconds elapsed since `run` started.
102    elapsed_ms: u64,
103}
104
105pub fn run(args: IngestArgs) -> Result<(), AppError> {
106    let started = std::time::Instant::now();
107
108    if !args.dir.exists() {
109        return Err(AppError::NotFound(format!(
110            "directory not found: {}",
111            args.dir.display()
112        )));
113    }
114    if !args.dir.is_dir() {
115        return Err(AppError::Validation(format!(
116            "path is not a directory: {}",
117            args.dir.display()
118        )));
119    }
120
121    let mut files: Vec<PathBuf> = Vec::new();
122    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
123    files.sort();
124
125    if files.len() > args.max_files {
126        return Err(AppError::Validation(format!(
127            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
128            files.len(),
129            args.max_files
130        )));
131    }
132
133    let mut succeeded: usize = 0;
134    let mut failed: usize = 0;
135    let mut skipped: usize = 0;
136    let total = files.len();
137
138    let exe = std::env::current_exe().map_err(|e| {
139        AppError::Internal(anyhow::anyhow!("could not resolve current executable: {e}"))
140    })?;
141    let type_str = args.r#type.as_str();
142
143    for path in &files {
144        let file_str = path.to_string_lossy().into_owned();
145        let derived_name = derive_kebab_name(path);
146
147        if derived_name.is_empty() {
148            output::emit_json(&IngestFileEvent {
149                file: &file_str,
150                name: "",
151                status: "skipped",
152                error: Some(
153                    "could not derive a non-empty kebab-case name from filename".to_string(),
154                ),
155                memory_id: None,
156                action: None,
157            })?;
158            skipped += 1;
159            continue;
160        }
161
162        let description = format!("ingested from {}", path.display());
163
164        let mut cmd = std::process::Command::new(&exe);
165        cmd.arg("remember")
166            .arg("--name")
167            .arg(&derived_name)
168            .arg("--type")
169            .arg(type_str)
170            .arg("--description")
171            .arg(&description)
172            .arg("--body-file")
173            .arg(path);
174        if args.skip_extraction {
175            cmd.arg("--skip-extraction");
176        }
177        if let Some(ns) = &args.namespace {
178            cmd.arg("--namespace").arg(ns);
179        }
180        if let Some(db) = &args.db {
181            cmd.arg("--db").arg(db);
182        }
183        cmd.stdout(std::process::Stdio::piped())
184            .stderr(std::process::Stdio::piped());
185
186        let output_res = cmd.output().map_err(|e| {
187            AppError::Internal(anyhow::anyhow!(
188                "failed to spawn child remember process: {e}"
189            ))
190        })?;
191
192        if output_res.status.success() {
193            let memory_id = parse_memory_id(&output_res.stdout);
194            let action = parse_action(&output_res.stdout);
195            output::emit_json(&IngestFileEvent {
196                file: &file_str,
197                name: &derived_name,
198                status: "indexed",
199                error: None,
200                memory_id,
201                action,
202            })?;
203            succeeded += 1;
204        } else {
205            let err_msg = first_error_line(&output_res.stderr);
206            output::emit_json(&IngestFileEvent {
207                file: &file_str,
208                name: &derived_name,
209                status: "failed",
210                error: Some(err_msg.clone()),
211                memory_id: None,
212                action: None,
213            })?;
214            failed += 1;
215            if args.fail_fast {
216                output::emit_json(&IngestSummary {
217                    summary: true,
218                    dir: args.dir.display().to_string(),
219                    pattern: args.pattern.clone(),
220                    recursive: args.recursive,
221                    files_total: total,
222                    files_succeeded: succeeded,
223                    files_failed: failed,
224                    files_skipped: skipped,
225                    elapsed_ms: started.elapsed().as_millis() as u64,
226                })?;
227                return Err(AppError::Validation(format!(
228                    "ingest aborted on first failure: {err_msg}"
229                )));
230            }
231        }
232    }
233
234    output::emit_json(&IngestSummary {
235        summary: true,
236        dir: args.dir.display().to_string(),
237        pattern: args.pattern.clone(),
238        recursive: args.recursive,
239        files_total: total,
240        files_succeeded: succeeded,
241        files_failed: failed,
242        files_skipped: skipped,
243        elapsed_ms: started.elapsed().as_millis() as u64,
244    })?;
245
246    Ok(())
247}
248
249fn collect_files(
250    dir: &Path,
251    pattern: &str,
252    recursive: bool,
253    out: &mut Vec<PathBuf>,
254) -> Result<(), AppError> {
255    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
256    for entry in entries {
257        let entry = entry.map_err(AppError::Io)?;
258        let path = entry.path();
259        let file_type = entry.file_type().map_err(AppError::Io)?;
260        if file_type.is_file() {
261            let name = entry.file_name();
262            let name_str = name.to_string_lossy();
263            if matches_pattern(&name_str, pattern) {
264                out.push(path);
265            }
266        } else if file_type.is_dir() && recursive {
267            collect_files(&path, pattern, recursive, out)?;
268        }
269    }
270    Ok(())
271}
272
/// Returns `true` when `name` matches the glob-style `pattern`.
///
/// `*` matches any run of characters (including none) and may appear anywhere
/// in the pattern, any number of times; every other character matches itself,
/// case-sensitively. A pattern without `*` must equal `name` exactly.
///
/// The three historically supported forms — `*.<ext>`, `<prefix>*` and an
/// exact filename — behave as before. Patterns such as `chapter-*.md` or
/// `*notes*`, which used to be compared literally and therefore never matched
/// a real filename, now work as expected.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };

    // Text before the first `*` is anchored at the start of the name.
    let after_head = match name.strip_prefix(head) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Text after the last `*` is anchored at the end. It is stripped from what
    // is left after the head so the two anchors can never overlap.
    let (middle, tail) = match rest.rsplit_once('*') {
        Some((middle, tail)) => (middle, tail),
        None => ("", rest),
    };
    let mut remaining = match after_head.strip_suffix(tail) {
        Some(body) => body,
        None => return false,
    };

    // Segments between wildcards must appear in order; taking the leftmost
    // occurrence each time leaves the most room for the segments that follow.
    for segment in middle.split('*').filter(|s| !s.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }
    true
}
282
283fn derive_kebab_name(path: &Path) -> String {
284    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
285    let lowered: String = stem
286        .chars()
287        .map(|c| {
288            if c == '_' || c.is_whitespace() {
289                '-'
290            } else {
291                c
292            }
293        })
294        .map(|c| c.to_ascii_lowercase())
295        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
296        .collect();
297    let collapsed = collapse_dashes(&lowered);
298    let trimmed = collapsed.trim_matches('-').to_string();
299    let max_len = 60;
300    if trimmed.len() > max_len {
301        trimmed[..max_len].trim_matches('-').to_string()
302    } else {
303        trimmed
304    }
305}
306
/// Returns `s` with every run of consecutive `-` characters reduced to a
/// single `-`; all other characters are copied through unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A hyphen is redundant exactly when the output already ends in one.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
323
324fn parse_memory_id(stdout: &[u8]) -> Option<i64> {
325    let text = std::str::from_utf8(stdout).ok()?;
326    let value: serde_json::Value = serde_json::from_str(text).ok()?;
327    value.get("memory_id")?.as_i64()
328}
329
330fn parse_action(stdout: &[u8]) -> Option<String> {
331    let text = std::str::from_utf8(stdout).ok()?;
332    let value: serde_json::Value = serde_json::from_str(text).ok()?;
333    value.get("action")?.as_str().map(String::from)
334}
335
/// Returns the first line of `stderr` that is not blank, exactly as written
/// (surrounding whitespace is kept). Invalid UTF-8 is decoded lossily.
///
/// Falls back to `"(no stderr captured)"` when every line is empty or
/// whitespace-only.
fn first_error_line(stderr: &[u8]) -> String {
    let decoded = String::from_utf8_lossy(stderr);
    for line in decoded.lines() {
        if !line.trim().is_empty() {
            return line.to_string();
        }
    }
    "(no stderr captured)".to_string()
}
343
// Unit tests for the pure helpers in this module (pattern matching, name
// derivation, directory collection, child-output parsing). `run` itself is not
// exercised here.
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use std::path::PathBuf;
348
    // `*<suffix>` matches by suffix; a bare `*` matches any name.
349    #[test]
350    fn matches_pattern_suffix() {
351        assert!(matches_pattern("foo.md", "*.md"));
352        assert!(!matches_pattern("foo.txt", "*.md"));
353        assert!(matches_pattern("foo.md", "*"));
354    }
355
    // `<prefix>*` matches by prefix.
356    #[test]
357    fn matches_pattern_prefix() {
358        assert!(matches_pattern("README.md", "README*"));
359        assert!(!matches_pattern("CHANGELOG.md", "README*"));
360    }
361
    // A pattern without `*` must equal the name exactly, case-sensitively.
362    #[test]
363    fn matches_pattern_exact() {
364        assert!(matches_pattern("README.md", "README.md"));
365        assert!(!matches_pattern("readme.md", "README.md"));
366    }
367
    // Underscores in the stem become hyphens.
368    #[test]
369    fn derive_kebab_underscore_to_dash() {
370        let p = PathBuf::from("/tmp/claude_code_headless.md");
371        assert_eq!(derive_kebab_name(&p), "claude-code-headless");
372    }
373
    // ASCII uppercase letters are lowercased.
374    #[test]
375    fn derive_kebab_uppercase_lowered() {
376        let p = PathBuf::from("/tmp/README.md");
377        assert_eq!(derive_kebab_name(&p), "readme");
378    }
379
    // Characters outside `[a-z0-9-]` are dropped, not replaced.
380    #[test]
381    fn derive_kebab_strips_non_kebab_chars() {
382        let p = PathBuf::from("/tmp/some@weird#name!.md");
383        assert_eq!(derive_kebab_name(&p), "someweirdname");
384    }
385
    // Runs of separators collapse to a single hyphen.
386    #[test]
387    fn derive_kebab_collapses_consecutive_dashes() {
388        let p = PathBuf::from("/tmp/a__b___c.md");
389        assert_eq!(derive_kebab_name(&p), "a-b-c");
390    }
391
    // Names longer than the cap are truncated to at most 60 bytes.
392    #[test]
393    fn derive_kebab_truncates_to_60_chars() {
394        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
395        let name = derive_kebab_name(&p);
396        assert!(name.len() <= 60, "got len {}", name.len());
397    }
398
    // Only files matching the pattern are collected from a flat directory.
399    #[test]
400    fn collect_files_finds_md_files() {
401        let tmp = tempfile::tempdir().expect("tempdir");
402        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
403        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
404        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
405        let mut out = Vec::new();
406        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
407        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
408    }
409
    // With `recursive = true`, matching files in subdirectories are included.
410    #[test]
411    fn collect_files_recursive_descends_subdirs() {
412        let tmp = tempfile::tempdir().expect("tempdir");
413        let sub = tmp.path().join("sub");
414        std::fs::create_dir(&sub).unwrap();
415        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
416        std::fs::write(sub.join("b.md"), "y").unwrap();
417        let mut out = Vec::new();
418        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
419        assert_eq!(out.len(), 2);
420    }
421
    // With `recursive = false`, subdirectories are not entered.
422    #[test]
423    fn collect_files_non_recursive_skips_subdirs() {
424        let tmp = tempfile::tempdir().expect("tempdir");
425        let sub = tmp.path().join("sub");
426        std::fs::create_dir(&sub).unwrap();
427        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
428        std::fs::write(sub.join("b.md"), "y").unwrap();
429        let mut out = Vec::new();
430        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
431        assert_eq!(out.len(), 1);
432    }
433
    // `memory_id` is read from a JSON object on stdout.
434    #[test]
435    fn parse_memory_id_extracts_field() {
436        let stdout = br#"{"memory_id": 42, "name": "x"}"#;
437        assert_eq!(parse_memory_id(stdout), Some(42));
438    }
439
    // Non-JSON stdout yields `None` rather than an error.
440    #[test]
441    fn parse_memory_id_returns_none_for_invalid_json() {
442        let stdout = b"not json";
443        assert_eq!(parse_memory_id(stdout), None);
444    }
445}
445}