1use crate::chunking;
28use crate::cli::MemoryType;
29use crate::errors::AppError;
30use crate::i18n::errors_msg;
31use crate::output::{self, JsonOutputFormat};
32use crate::paths::AppPaths;
33use crate::storage::chunks as storage_chunks;
34use crate::storage::connection::{ensure_db_ready, open_rw};
35use crate::storage::entities::{NewEntity, NewRelationship};
36use crate::storage::memories::NewMemory;
37use crate::storage::{entities, memories, urls as storage_urls, versions};
38use rayon::prelude::*;
39use rusqlite::Connection;
40use serde::Serialize;
41use std::collections::BTreeSet;
42use std::path::{Path, PathBuf};
43use std::sync::Mutex;
44use unicode_normalization::UnicodeNormalization;
45
/// Maximum length (in bytes; names are pure ASCII after derivation) of a
/// memory name derived from a file basename. Longer names are truncated and
/// flagged in the per-file NDJSON event.
const DERIVED_NAME_MAX_LEN: usize = 60;

/// Upper bound on `-N` numeric suffixes tried when a derived name collides
/// with an already-taken one before ingest gives up with a validation error.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
53
// CLI arguments for the `ingest` subcommand: bulk-imports a directory of text
// files, one memory per file, emitting one NDJSON event per file plus a final
// summary line. (Plain `//` comments are used deliberately: `///` doc comments
// on clap fields would become CLI help text and change behavior.)
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Skip BERT NER auto-extraction for faster bulk import\n \
    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Root directory scanned for files matching --pattern.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type applied to every ingested file.
    #[arg(long, value_enum)]
    pub r#type: MemoryType,

    // Simple glob: leading `*` = suffix match, trailing `*` = prefix match,
    // otherwise exact filename match (see `matches_pattern`).
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Descend into subdirectories when set.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Skip entity/relationship/URL auto-extraction for faster bulk import.
    #[arg(long, default_value_t = false)]
    pub skip_extraction: bool,

    // Abort on the first per-file failure instead of continuing.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Hard cap on the number of matched files; exceeding it aborts the run
    // before any file is processed.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace; resolved via crate::namespace::resolve_namespace
    // when omitted.
    #[arg(long)]
    pub namespace: Option<String>,

    // Database path override; also settable through the environment.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output format selector for the NDJSON stream.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Hidden backward-compatibility flag; output is always JSON regardless.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // Explicit worker count for the staging phase; see resolve_parallelism
    // for the default and for how --low-memory overrides this.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
138
139fn env_low_memory_enabled() -> bool {
144 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
145 Ok(v) if v.is_empty() => false,
146 Ok(v) => match v.to_lowercase().as_str() {
147 "1" | "true" | "yes" | "on" => true,
148 "0" | "false" | "no" | "off" => false,
149 other => {
150 tracing::warn!(
151 target: "ingest",
152 value = %other,
153 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
154 );
155 false
156 }
157 },
158 Err(_) => false,
159 }
160}
161
162fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
174 let env_flag = env_low_memory_enabled();
175 let low_memory = low_memory_flag || env_flag;
176
177 if low_memory {
178 if let Some(n) = ingest_parallelism {
179 if n > 1 {
180 tracing::warn!(
181 target: "ingest",
182 requested = n,
183 "--ingest-parallelism overridden by --low-memory; using 1"
184 );
185 }
186 }
187 if low_memory_flag {
188 tracing::info!(
189 target: "ingest",
190 source = "flag",
191 "low-memory mode enabled: forcing --ingest-parallelism 1"
192 );
193 } else {
194 tracing::info!(
195 target: "ingest",
196 source = "env",
197 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
198 );
199 }
200 return 1;
201 }
202
203 ingest_parallelism
204 .unwrap_or_else(|| {
205 std::thread::available_parallelism()
206 .map(|v| v.get() / 2)
207 .unwrap_or(1)
208 .clamp(1, 4)
209 })
210 .max(1)
211}
212
/// One NDJSON line emitted per processed file.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path as discovered on disk (lossy UTF-8).
    file: &'a str,
    /// Derived kebab-case memory name (the base name for skipped files).
    name: &'a str,
    /// One of "indexed", "failed", or "skipped" (see `run`).
    status: &'a str,
    /// True when the derived name was truncated to `DERIVED_NAME_MAX_LEN`.
    truncated: bool,
    /// Pre-truncation name; present only when truncation occurred.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Failure or skip reason; absent on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the created memory; present only on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action; present only on success (currently always "created").
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
230
/// Final NDJSON summary line emitted after all files are processed
/// (and also emitted before aborting when --fail-fast triggers).
#[derive(Serialize)]
struct IngestSummary {
    /// Always true; lets consumers distinguish this line from per-file events.
    summary: bool,
    /// Ingest root directory, as displayed.
    dir: String,
    /// The --pattern value used for matching.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files matched (before any per-file outcome).
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    /// Files skipped before staging (e.g. un-derivable or colliding names).
    files_skipped: usize,
    /// Wall-clock duration of the whole run.
    elapsed_ms: u64,
}
243
/// Outcome of successfully persisting one staged file.
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Action label reported in the NDJSON event (currently always "created").
    action: String,
}
249
/// Everything computed for one file during the parallel, read-only staging
/// phase (`stage_file`); consumed by `persist_staged` for the serial DB write.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// blake3 hex digest of `body`, used for duplicate-content detection.
    body_hash: String,
    /// First 200 characters of the body, stored with the vector entry.
    snippet: String,
    /// Validated kebab-case memory name.
    name: String,
    /// Provenance string ("ingested from <path>").
    description: String,
    /// Document-level embedding: the whole-body embedding for single-chunk
    /// documents, or the aggregate of per-chunk embeddings otherwise.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; Some only when the document split into >1 chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries from hierarchical splitting.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Auto-extracted entities (empty when extraction is skipped or fails).
    entities: Vec<NewEntity>,
    /// Auto-extracted relationships, normalized and validated.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry in `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found in the body; persisted best-effort after commit.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
266
267fn stage_file(
270 _idx: usize,
271 path: &Path,
272 name: &str,
273 paths: &AppPaths,
274 skip_extraction: bool,
275) -> Result<StagedFile, AppError> {
276 use crate::constants::*;
277
278 if name.len() > MAX_MEMORY_NAME_LEN {
279 return Err(AppError::LimitExceeded(
280 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
281 ));
282 }
283 if name.starts_with("__") {
284 return Err(AppError::Validation(
285 crate::i18n::validation::reserved_name(),
286 ));
287 }
288 {
289 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
290 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
291 if !slug_re.is_match(name) {
292 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
293 name,
294 )));
295 }
296 }
297
298 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
299 if raw_body.len() > MAX_MEMORY_BODY_LEN {
300 return Err(AppError::LimitExceeded(
301 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
302 ));
303 }
304 if raw_body.trim().is_empty() {
305 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
306 }
307
308 let description = format!("ingested from {}", path.display());
309 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
310 return Err(AppError::Validation(
311 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
312 ));
313 }
314
315 let mut extracted_entities: Vec<NewEntity> = Vec::new();
316 let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
317 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
318 if !skip_extraction {
319 match crate::extraction::extract_graph_auto(&raw_body, paths) {
320 Ok(extracted) => {
321 extracted_urls = extracted.urls;
322 extracted_entities = extracted.entities;
323 extracted_relationships = extracted.relationships;
324
325 if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
326 extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
327 }
328 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
329 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
330 }
331 }
332 Err(e) => {
333 tracing::warn!(
334 file = %path.display(),
335 "auto-extraction failed (graceful degradation): {e:#}"
336 );
337 }
338 }
339 }
340
341 for entity in &extracted_entities {
342 if !is_valid_entity_type(&entity.entity_type) {
343 return Err(AppError::Validation(format!(
344 "invalid entity_type '{}' for entity '{}'",
345 entity.entity_type, entity.name
346 )));
347 }
348 }
349 for rel in &mut extracted_relationships {
350 rel.relation = rel.relation.replace('-', "_");
351 if !is_valid_relation(&rel.relation) {
352 return Err(AppError::Validation(format!(
353 "invalid relation '{}' for relationship '{}' -> '{}'",
354 rel.relation, rel.source, rel.target
355 )));
356 }
357 if !(0.0..=1.0).contains(&rel.strength) {
358 return Err(AppError::Validation(format!(
359 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
360 rel.strength, rel.source, rel.target
361 )));
362 }
363 }
364
365 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
366 let snippet: String = raw_body.chars().take(200).collect();
367
368 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
369 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
370 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
371 return Err(AppError::LimitExceeded(format!(
372 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
373 chunks_info.len(),
374 REMEMBER_MAX_SAFE_MULTI_CHUNKS
375 )));
376 }
377
378 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
379 let embedding = if chunks_info.len() == 1 {
380 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
381 } else {
382 let chunk_texts: Vec<&str> = chunks_info
383 .iter()
384 .map(|c| chunking::chunk_text(&raw_body, c))
385 .collect();
386 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
387 for chunk_text in &chunk_texts {
388 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
389 &paths.models,
390 chunk_text,
391 )?);
392 }
393 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
394 chunk_embeddings_opt = Some(chunk_embeddings);
395 aggregated
396 };
397
398 let entity_embeddings = extracted_entities
399 .iter()
400 .map(|entity| {
401 let entity_text = match &entity.description {
402 Some(desc) => format!("{} {}", entity.name, desc),
403 None => entity.name.clone(),
404 };
405 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
406 })
407 .collect::<Result<Vec<_>, _>>()?;
408
409 Ok(StagedFile {
410 body: raw_body,
411 body_hash,
412 snippet,
413 name: name.to_string(),
414 description,
415 embedding,
416 chunk_embeddings: chunk_embeddings_opt,
417 chunks_info,
418 entities: extracted_entities,
419 relationships: extracted_relationships,
420 entity_embeddings,
421 urls: extracted_urls,
422 })
423}
424
/// Persist one staged file as a new memory: the memory row, its first
/// version, its vector-index entry, optional chunk rows + chunk vectors,
/// extracted entities/relationships, and (best-effort, post-commit) URLs.
///
/// Pre-checks (namespace cap, duplicate name) run outside the transaction;
/// all required writes happen inside a single IMMEDIATE transaction so a
/// failure leaves no partial memory behind.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        // Enforce the active-namespace cap. Only namespaces with non-deleted
        // memories count, and the cap only blocks writes that would create a
        // brand-new namespace.
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Same-name memories in a namespace are a hard error...
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    // ...whereas identical *content* under a different name is allowed and
    // only logged (see below).
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE grabs the write lock up front, avoiding a mid-transaction
    // upgrade failure under concurrent access.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Record version 1 with operation "create".
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    // Document-level vector entry for semantic search.
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Chunk rows + per-chunk vectors only exist for multi-chunk documents.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        // stage_file fills chunk_embeddings exactly when chunks > 1, so a
        // None here is an internal invariant violation, not user error.
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the enumeration index is passed as both the vec
            // row id (i64) and the chunk position (i32) — confirm
            // upsert_chunk_vec expects an ordinal id here rather than the
            // chunks-table rowid.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // Entities: upsert + vector + memory link + degree bump, with
        // embeddings paired positionally (same order as staged.entities).
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                &entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Relationship endpoints may name entities that were not extracted as
        // entities themselves; look up their type, defaulting to "concept".
        let entity_types: std::collections::HashMap<&str, &str> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // Best-effort URL bookkeeping: runs after commit, and failures are
    // deliberately ignored so a URL-table hiccup cannot fail an
    // already-committed memory.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
582
/// Entry point for the `ingest` subcommand.
///
/// Pipeline: validate the directory, collect + sort matching files, derive
/// unique kebab-case names, stage files in parallel (CPU-heavy, read-only),
/// then persist sequentially on one connection. Emits one NDJSON event per
/// file and a final summary line; per-file errors do not abort the run
/// unless `--fail-fast` is set.
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::NotFound(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    // Sort for deterministic processing order (and deterministic collision
    // suffixes) regardless of directory iteration order.
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    // A storage-init failure is deferred rather than returned immediately:
    // it is reported per-file below so each file still gets an NDJSON event.
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names claimed so far; used to resolve collisions with numeric suffixes.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // A file that passed name derivation and will be staged/persisted.
    struct FileSlot {
        path: PathBuf,
        file_str: String,
        derived_name: String,
        name_truncated: bool,
        original_name: Option<String>,
    }
    // Pre-staging classification of each file.
    enum Slot {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process(FileSlot),
    }

    // Phase 1 (sequential): derive and reserve unique names up front, so the
    // parallel staging phase never races on name allocation.
    let slots: Vec<Slot> = files
        .iter()
        .map(|path| {
            let file_str = path.to_string_lossy().into_owned();
            let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

            if derived_base.is_empty() {
                return Slot::Skip {
                    file_str,
                    derived_base: String::new(),
                    name_truncated: false,
                    original_name: None,
                    reason: "could not derive a non-empty kebab-case name from filename"
                        .to_string(),
                };
            }

            match unique_name(&derived_base, &taken_names) {
                Ok(derived_name) => {
                    taken_names.insert(derived_name.clone());
                    Slot::Process(FileSlot {
                        path: path.clone(),
                        file_str,
                        derived_name,
                        name_truncated,
                        original_name,
                    })
                }
                Err(e) => Slot::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                },
            }
        })
        .collect();

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    // One result cell per slot, index-aligned with `slots`; Mutex<Option<..>>
    // lets each worker write its own cell without shared-vector contention.
    let staged: Vec<Mutex<Option<Result<StagedFile, AppError>>>> =
        (0..slots.len()).map(|_| Mutex::new(None)).collect();

    let skip_extraction = args.skip_extraction;
    let paths_ref = &paths;

    // Phase 2 (parallel): stage every processable file (read-only work).
    pool.install(|| {
        slots.par_iter().enumerate().for_each(|(idx, slot)| {
            if let Slot::Process(fs) = slot {
                let result =
                    stage_file(idx, &fs.path, &fs.derived_name, paths_ref, skip_extraction);
                *staged[idx].lock().expect("staged slot poisoned") = Some(result);
            }
        });
    });

    // Phase 3 (sequential): persist in input order and emit per-file events.
    let fail_fast = args.fail_fast;
    for (idx, slot) in slots.iter().enumerate() {
        match slot {
            Slot::Skip {
                file_str,
                derived_base,
                name_truncated,
                original_name,
                reason,
            } => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_base,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(reason.clone()),
                    memory_id: None,
                    action: None,
                })?;
                skipped += 1;
            }
            Slot::Process(fs) => {
                // If storage init failed, every processable file is reported
                // as failed with that same error.
                let conn = match conn_or_err.as_mut() {
                    Ok(c) => c,
                    Err(err_msg) => {
                        let err_clone = err_msg.clone();
                        output::emit_json_compact(&IngestFileEvent {
                            file: &fs.file_str,
                            name: &fs.derived_name,
                            status: "failed",
                            truncated: fs.name_truncated,
                            original_name: fs.original_name.clone(),
                            error: Some(err_clone.clone()),
                            memory_id: None,
                            action: None,
                        })?;
                        failed += 1;
                        if fail_fast {
                            // Emit the summary before aborting so consumers
                            // always see a final summary line.
                            output::emit_json_compact(&IngestSummary {
                                summary: true,
                                dir: args.dir.display().to_string(),
                                pattern: args.pattern.clone(),
                                recursive: args.recursive,
                                files_total: total,
                                files_succeeded: succeeded,
                                files_failed: failed,
                                files_skipped: skipped,
                                elapsed_ms: started.elapsed().as_millis() as u64,
                            })?;
                            return Err(AppError::Validation(format!(
                                "ingest aborted on first failure: {err_clone}"
                            )));
                        }
                        continue;
                    }
                };

                // take() moves the staged result out; a missing value for a
                // Process slot would be a bug in the staging phase.
                let stage_result = staged[idx]
                    .lock()
                    .expect("staged slot poisoned")
                    .take()
                    .expect("staged slot empty for Process slot");

                let outcome = stage_result
                    .and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

                match outcome {
                    Ok(FileSuccess { memory_id, action }) => {
                        output::emit_json_compact(&IngestFileEvent {
                            file: &fs.file_str,
                            name: &fs.derived_name,
                            status: "indexed",
                            truncated: fs.name_truncated,
                            original_name: fs.original_name.clone(),
                            error: None,
                            memory_id: Some(memory_id),
                            action: Some(action),
                        })?;
                        succeeded += 1;
                    }
                    Err(e) => {
                        let err_msg = format!("{e}");
                        output::emit_json_compact(&IngestFileEvent {
                            file: &fs.file_str,
                            name: &fs.derived_name,
                            status: "failed",
                            truncated: fs.name_truncated,
                            original_name: fs.original_name.clone(),
                            error: Some(err_msg.clone()),
                            memory_id: None,
                            action: None,
                        })?;
                        failed += 1;
                        if fail_fast {
                            output::emit_json_compact(&IngestSummary {
                                summary: true,
                                dir: args.dir.display().to_string(),
                                pattern: args.pattern.clone(),
                                recursive: args.recursive,
                                files_total: total,
                                files_succeeded: succeeded,
                                files_failed: failed,
                                files_skipped: skipped,
                                elapsed_ms: started.elapsed().as_millis() as u64,
                            })?;
                            return Err(AppError::Validation(format!(
                                "ingest aborted on first failure: {err_msg}"
                            )));
                        }
                    }
                }
            }
        }
    }

    // Final summary line for the normal (non-aborted) path.
    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}
848
849fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
855 ensure_db_ready(paths)?;
856 let conn = open_rw(&paths.db)?;
857 Ok(conn)
858}
859
/// Returns true when `entity_type` is one of the canonical entity types
/// accepted by the graph schema (case-sensitive).
fn is_valid_entity_type(entity_type: &str) -> bool {
    const VALID_ENTITY_TYPES: [&str; 13] = [
        "project",
        "tool",
        "person",
        "file",
        "concept",
        "incident",
        "decision",
        "memory",
        "dashboard",
        "issue_tracker",
        "organization",
        "location",
        "date",
    ];
    VALID_ENTITY_TYPES.contains(&entity_type)
}
878
/// Returns true when `relation` is one of the canonical snake_case relation
/// names accepted by the graph schema (case-sensitive).
fn is_valid_relation(relation: &str) -> bool {
    const VALID_RELATIONS: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    VALID_RELATIONS.contains(&relation)
}
896
897fn collect_files(
898 dir: &Path,
899 pattern: &str,
900 recursive: bool,
901 out: &mut Vec<PathBuf>,
902) -> Result<(), AppError> {
903 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
904 for entry in entries {
905 let entry = entry.map_err(AppError::Io)?;
906 let path = entry.path();
907 let file_type = entry.file_type().map_err(AppError::Io)?;
908 if file_type.is_file() {
909 let name = entry.file_name();
910 let name_str = name.to_string_lossy();
911 if matches_pattern(&name_str, pattern) {
912 out.push(path);
913 }
914 } else if file_type.is_dir() && recursive {
915 collect_files(&path, pattern, recursive, out)?;
916 }
917 }
918 Ok(())
919}
920
/// Minimal glob matcher supporting exactly three forms:
/// `*suffix` (suffix match — `*` alone matches everything),
/// `prefix*` (prefix match), and anything else as an exact comparison.
/// A leading `*` takes precedence over a trailing one.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(required_suffix) = pattern.strip_prefix('*') {
        return name.ends_with(required_suffix);
    }
    if let Some(required_prefix) = pattern.strip_suffix('*') {
        return name.starts_with(required_prefix);
    }
    name == pattern
}
930
931fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
942 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
943 let lowered: String = stem
944 .nfd()
945 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
946 .map(|c| {
947 if c == '_' || c.is_whitespace() {
948 '-'
949 } else {
950 c
951 }
952 })
953 .map(|c| c.to_ascii_lowercase())
954 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
955 .collect();
956 let collapsed = collapse_dashes(&lowered);
957 let trimmed = collapsed.trim_matches('-').to_string();
958 if trimmed.len() > DERIVED_NAME_MAX_LEN {
959 let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
960 .trim_matches('-')
961 .to_string();
962 tracing::warn!(
965 target: "ingest",
966 original = %trimmed,
967 truncated_to = %truncated,
968 max_len = DERIVED_NAME_MAX_LEN,
969 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
970 );
971 (truncated, true, Some(trimmed))
972 } else {
973 (trimmed, false, None)
974 }
975}
976
977fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
990 if !taken.contains(base) {
991 return Ok(base.to_string());
992 }
993 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
994 let candidate = format!("{base}-{suffix}");
995 if !taken.contains(&candidate) {
996 tracing::warn!(
997 target: "ingest",
998 base = %base,
999 resolved = %candidate,
1000 suffix,
1001 "memory name collision resolved with numeric suffix"
1002 );
1003 return Ok(candidate);
1004 }
1005 }
1006 Err(AppError::Validation(format!(
1007 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1008 )))
1009}
1010
/// Collapses every run of consecutive dashes in `s` into a single dash;
/// all other characters pass through unchanged.
fn collapse_dashes(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        // Append unless this would create a second consecutive dash.
        if ch != '-' || !acc.ends_with('-') {
            acc.push(ch);
        }
        acc
    })
}
1027
1028#[cfg(test)]
1029mod tests {
1030 use super::*;
1031 use std::path::PathBuf;
1032
1033 #[test]
1034 fn matches_pattern_suffix() {
1035 assert!(matches_pattern("foo.md", "*.md"));
1036 assert!(!matches_pattern("foo.txt", "*.md"));
1037 assert!(matches_pattern("foo.md", "*"));
1038 }
1039
1040 #[test]
1041 fn matches_pattern_prefix() {
1042 assert!(matches_pattern("README.md", "README*"));
1043 assert!(!matches_pattern("CHANGELOG.md", "README*"));
1044 }
1045
1046 #[test]
1047 fn matches_pattern_exact() {
1048 assert!(matches_pattern("README.md", "README.md"));
1049 assert!(!matches_pattern("readme.md", "README.md"));
1050 }
1051
1052 #[test]
1053 fn derive_kebab_underscore_to_dash() {
1054 let p = PathBuf::from("/tmp/claude_code_headless.md");
1055 let (name, truncated, original) = derive_kebab_name(&p);
1056 assert_eq!(name, "claude-code-headless");
1057 assert!(!truncated);
1058 assert!(original.is_none());
1059 }
1060
1061 #[test]
1062 fn derive_kebab_uppercase_lowered() {
1063 let p = PathBuf::from("/tmp/README.md");
1064 let (name, truncated, original) = derive_kebab_name(&p);
1065 assert_eq!(name, "readme");
1066 assert!(!truncated);
1067 assert!(original.is_none());
1068 }
1069
1070 #[test]
1071 fn derive_kebab_strips_non_kebab_chars() {
1072 let p = PathBuf::from("/tmp/some@weird#name!.md");
1073 let (name, truncated, original) = derive_kebab_name(&p);
1074 assert_eq!(name, "someweirdname");
1075 assert!(!truncated);
1076 assert!(original.is_none());
1077 }
1078
1079 #[test]
1082 fn derive_kebab_folds_accented_letters_to_ascii() {
1083 let p = PathBuf::from("/tmp/açaí.md");
1084 let (name, _, _) = derive_kebab_name(&p);
1085 assert_eq!(name, "acai", "got '{name}'");
1086 }
1087
1088 #[test]
1089 fn derive_kebab_handles_naive_with_diaeresis() {
1090 let p = PathBuf::from("/tmp/naïve-test.md");
1091 let (name, _, _) = derive_kebab_name(&p);
1092 assert_eq!(name, "naive-test", "got '{name}'");
1093 }
1094
1095 #[test]
1096 fn derive_kebab_drops_emoji_keeps_word() {
1097 let p = PathBuf::from("/tmp/🚀-rocket.md");
1098 let (name, _, _) = derive_kebab_name(&p);
1099 assert_eq!(name, "rocket", "got '{name}'");
1100 }
1101
1102 #[test]
1103 fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1104 let p = PathBuf::from("/tmp/açaí🦜.md");
1105 let (name, _, _) = derive_kebab_name(&p);
1106 assert_eq!(name, "acai", "got '{name}'");
1107 }
1108
1109 #[test]
1110 fn derive_kebab_pure_emoji_yields_empty() {
1111 let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1112 let (name, _, _) = derive_kebab_name(&p);
1113 assert!(name.is_empty(), "got '{name}'");
1114 }
1115
1116 #[test]
1117 fn derive_kebab_collapses_consecutive_dashes() {
1118 let p = PathBuf::from("/tmp/a__b___c.md");
1119 let (name, truncated, original) = derive_kebab_name(&p);
1120 assert_eq!(name, "a-b-c");
1121 assert!(!truncated);
1122 assert!(original.is_none());
1123 }
1124
1125 #[test]
1126 fn derive_kebab_truncates_to_60_chars() {
1127 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1128 let (name, truncated, original) = derive_kebab_name(&p);
1129 assert!(name.len() <= 60, "got len {}", name.len());
1130 assert!(truncated);
1131 assert!(original.is_some());
1132 assert!(original.unwrap().len() > 60);
1133 }
1134
1135 #[test]
1136 fn collect_files_finds_md_files() {
1137 let tmp = tempfile::tempdir().expect("tempdir");
1138 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1139 std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1140 std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1141 let mut out = Vec::new();
1142 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1143 assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1144 }
1145
1146 #[test]
1147 fn collect_files_recursive_descends_subdirs() {
1148 let tmp = tempfile::tempdir().expect("tempdir");
1149 let sub = tmp.path().join("sub");
1150 std::fs::create_dir(&sub).unwrap();
1151 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1152 std::fs::write(sub.join("b.md"), "y").unwrap();
1153 let mut out = Vec::new();
1154 collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1155 assert_eq!(out.len(), 2);
1156 }
1157
1158 #[test]
1159 fn collect_files_non_recursive_skips_subdirs() {
1160 let tmp = tempfile::tempdir().expect("tempdir");
1161 let sub = tmp.path().join("sub");
1162 std::fs::create_dir(&sub).unwrap();
1163 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1164 std::fs::write(sub.join("b.md"), "y").unwrap();
1165 let mut out = Vec::new();
1166 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1167 assert_eq!(out.len(), 1);
1168 }
1169
1170 #[test]
1173 fn derive_kebab_long_basename_truncated_within_cap() {
1174 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1175 let (name, truncated, original) = derive_kebab_name(&p);
1176 assert!(
1177 name.len() <= DERIVED_NAME_MAX_LEN,
1178 "truncated name must respect cap; got {} chars",
1179 name.len()
1180 );
1181 assert!(!name.is_empty());
1182 assert!(truncated);
1183 assert!(original.is_some());
1184 }
1185
1186 #[test]
1187 fn unique_name_returns_base_when_free() {
1188 let taken: BTreeSet<String> = BTreeSet::new();
1189 let resolved = unique_name("note", &taken).expect("must resolve");
1190 assert_eq!(resolved, "note");
1191 }
1192
1193 #[test]
1194 fn unique_name_appends_first_free_suffix_on_collision() {
1195 let mut taken: BTreeSet<String> = BTreeSet::new();
1196 taken.insert("note".to_string());
1197 taken.insert("note-1".to_string());
1198 let resolved = unique_name("note", &taken).expect("must resolve");
1199 assert_eq!(resolved, "note-2");
1200 }
1201
1202 #[test]
1203 fn unique_name_errors_after_collision_cap() {
1204 let mut taken: BTreeSet<String> = BTreeSet::new();
1205 taken.insert("note".to_string());
1206 for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1207 taken.insert(format!("note-{i}"));
1208 }
1209 let err = unique_name("note", &taken).expect_err("must surface error");
1210 assert!(matches!(err, AppError::Validation(_)));
1211 }
1212
1213 #[test]
1216 fn is_valid_entity_type_accepts_v008_types() {
1217 assert!(is_valid_entity_type("organization"));
1218 assert!(is_valid_entity_type("location"));
1219 assert!(is_valid_entity_type("date"));
1220 assert!(!is_valid_entity_type("unknown"));
1221 }
1222
1223 #[test]
1224 fn is_valid_relation_accepts_canonical_relations() {
1225 assert!(is_valid_relation("applies_to"));
1226 assert!(is_valid_relation("depends_on"));
1227 assert!(!is_valid_relation("foo_bar"));
1228 }
1229
1230 use serial_test::serial;
1233
1234 fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1236 let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1237 let prev = std::env::var(key).ok();
1238 match value {
1239 Some(v) => std::env::set_var(key, v),
1240 None => std::env::remove_var(key),
1241 }
1242 f();
1243 match prev {
1244 Some(p) => std::env::set_var(key, p),
1245 None => std::env::remove_var(key),
1246 }
1247 }
1248
1249 #[test]
1250 #[serial]
1251 fn env_low_memory_enabled_unset_returns_false() {
1252 with_env_var(None, || assert!(!env_low_memory_enabled()));
1253 }
1254
1255 #[test]
1256 #[serial]
1257 fn env_low_memory_enabled_empty_returns_false() {
1258 with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1259 }
1260
1261 #[test]
1262 #[serial]
1263 fn env_low_memory_enabled_truthy_values_return_true() {
1264 for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1265 with_env_var(Some(v), || {
1266 assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1267 });
1268 }
1269 }
1270
1271 #[test]
1272 #[serial]
1273 fn env_low_memory_enabled_falsy_values_return_false() {
1274 for v in ["0", "false", "FALSE", "no", "off"] {
1275 with_env_var(Some(v), || {
1276 assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1277 });
1278 }
1279 }
1280
1281 #[test]
1282 #[serial]
1283 fn env_low_memory_enabled_unrecognized_value_returns_false() {
1284 with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1285 }
1286
1287 #[test]
1288 #[serial]
1289 fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1290 with_env_var(None, || {
1291 assert_eq!(resolve_parallelism(true, Some(4)), 1);
1292 assert_eq!(resolve_parallelism(true, Some(8)), 1);
1293 assert_eq!(resolve_parallelism(true, None), 1);
1294 });
1295 }
1296
1297 #[test]
1298 #[serial]
1299 fn resolve_parallelism_env_forces_one_when_flag_off() {
1300 with_env_var(Some("1"), || {
1301 assert_eq!(resolve_parallelism(false, Some(4)), 1);
1302 assert_eq!(resolve_parallelism(false, None), 1);
1303 });
1304 }
1305
1306 #[test]
1307 #[serial]
1308 fn resolve_parallelism_falsy_env_does_not_override() {
1309 with_env_var(Some("0"), || {
1310 assert_eq!(resolve_parallelism(false, Some(4)), 4);
1311 });
1312 }
1313
1314 #[test]
1315 #[serial]
1316 fn resolve_parallelism_explicit_value_when_low_memory_off() {
1317 with_env_var(None, || {
1318 assert_eq!(resolve_parallelism(false, Some(3)), 3);
1319 assert_eq!(resolve_parallelism(false, Some(1)), 1);
1320 });
1321 }
1322
1323 #[test]
1324 #[serial]
1325 fn resolve_parallelism_default_when_unset() {
1326 with_env_var(None, || {
1327 let p = resolve_parallelism(false, None);
1328 assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1329 });
1330 }
1331
1332 #[test]
1333 fn ingest_args_parses_low_memory_flag_via_clap() {
1334 use clap::Parser;
1335 let cli = crate::cli::Cli::try_parse_from([
1338 "sqlite-graphrag",
1339 "ingest",
1340 "/tmp/dummy",
1341 "--type",
1342 "document",
1343 "--low-memory",
1344 ])
1345 .expect("parse must succeed");
1346 match cli.command {
1347 crate::cli::Commands::Ingest(args) => {
1348 assert!(args.low_memory, "--low-memory must set field to true");
1349 }
1350 _ => panic!("expected Ingest subcommand"),
1351 }
1352 }
1353
1354 #[test]
1355 fn ingest_args_low_memory_defaults_false() {
1356 use clap::Parser;
1357 let cli = crate::cli::Cli::try_parse_from([
1358 "sqlite-graphrag",
1359 "ingest",
1360 "/tmp/dummy",
1361 "--type",
1362 "document",
1363 ])
1364 .expect("parse must succeed");
1365 match cli.command {
1366 crate::cli::Commands::Ingest(args) => {
1367 assert!(!args.low_memory, "default must be false");
1368 }
1369 _ => panic!("expected Ingest subcommand"),
1370 }
1371 }
1372}