1use crate::chunking;
28use crate::cli::MemoryType;
29use crate::errors::AppError;
30use crate::i18n::errors_msg;
31use crate::output::{self, JsonOutputFormat};
32use crate::paths::AppPaths;
33use crate::storage::chunks as storage_chunks;
34use crate::storage::connection::{ensure_db_ready, open_rw};
35use crate::storage::entities::{NewEntity, NewRelationship};
36use crate::storage::memories::NewMemory;
37use crate::storage::{entities, memories, urls as storage_urls, versions};
38use rayon::prelude::*;
39use rusqlite::Connection;
40use serde::Serialize;
41use std::collections::BTreeSet;
42use std::path::{Path, PathBuf};
43use std::sync::Mutex;
44use unicode_normalization::UnicodeNormalization;
45
46use crate::constants::DERIVED_NAME_MAX_LEN;
47
/// Highest numeric suffix (`base-1` .. `base-1000`) tried when resolving
/// derived-name collisions before `unique_name` gives up with an error.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
51
// Arguments for the `ingest` subcommand: bulk-import every matching file under
// a directory, one memory per file.
//
// NOTE: field comments deliberately use `//` rather than `///` — clap derive
// turns doc comments into CLI help text, which would change runtime output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Skip BERT NER auto-extraction for faster bulk import\n \
    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Positional: the root directory to scan.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type applied uniformly to every ingested file.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // Naive glob: leading-`*` suffix match, trailing-`*` prefix match, else exact.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Descend into subdirectories when set.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Skip entity/relationship/URL auto-extraction (faster, fewer graph edges).
    #[arg(long, default_value_t = false)]
    pub skip_extraction: bool,

    // Abort on the first per-file failure instead of continuing.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Safety cap: refuse to run if the glob matches more files than this.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace; falls back to the resolver's default when omitted.
    #[arg(long)]
    pub namespace: Option<String>,

    // Explicit database path; otherwise resolved from the env var / defaults.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Kept for CLI compatibility; output is NDJSON regardless.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // Overrides --ingest-parallelism; also triggered by SQLITE_GRAPHRAG_LOW_MEMORY=1.
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
136
137fn env_low_memory_enabled() -> bool {
142 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
143 Ok(v) if v.is_empty() => false,
144 Ok(v) => match v.to_lowercase().as_str() {
145 "1" | "true" | "yes" | "on" => true,
146 "0" | "false" | "no" | "off" => false,
147 other => {
148 tracing::warn!(
149 target: "ingest",
150 value = %other,
151 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
152 );
153 false
154 }
155 },
156 Err(_) => false,
157 }
158}
159
160fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
172 let env_flag = env_low_memory_enabled();
173 let low_memory = low_memory_flag || env_flag;
174
175 if low_memory {
176 if let Some(n) = ingest_parallelism {
177 if n > 1 {
178 tracing::warn!(
179 target: "ingest",
180 requested = n,
181 "--ingest-parallelism overridden by --low-memory; using 1"
182 );
183 }
184 }
185 if low_memory_flag {
186 tracing::info!(
187 target: "ingest",
188 source = "flag",
189 "low-memory mode enabled: forcing --ingest-parallelism 1"
190 );
191 } else {
192 tracing::info!(
193 target: "ingest",
194 source = "env",
195 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
196 );
197 }
198 return 1;
199 }
200
201 ingest_parallelism
202 .unwrap_or_else(|| {
203 std::thread::available_parallelism()
204 .map(|v| v.get() / 2)
205 .unwrap_or(1)
206 .clamp(1, 4)
207 })
208 .max(1)
209}
210
/// One NDJSON event emitted per processed file.
///
/// Field order is the serialized JSON key order — do not reorder.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    /// One of "indexed", "failed", "skipped" (see `run`).
    status: &'a str,
    /// True when the derived name was cut to fit the length cap.
    truncated: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    /// Untruncated derived name, present only when `truncated` is true.
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
228
/// Final NDJSON line emitted after all files: aggregate counts for the run.
#[derive(Serialize)]
struct IngestSummary {
    /// Always true; lets consumers distinguish this line from per-file events.
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}
241
/// Outcome of successfully persisting one staged file.
struct FileSuccess {
    memory_id: i64,
    // Currently always "created" (see `persist_staged`).
    action: String,
}
247
/// Everything computed for one file during phase A (stage), ready to be
/// written to SQLite sequentially in phase B (persist).
struct StagedFile {
    body: String,
    // blake3 hex digest of `body`, used for duplicate-content detection.
    body_hash: String,
    // First 200 chars of the body, stored alongside the memory vector.
    snippet: String,
    name: String,
    description: String,
    // Memory-level embedding (whole body, or aggregate of chunk embeddings).
    embedding: Vec<f32>,
    // Per-chunk embeddings; Some only when the document split into >1 chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    // Parallel to `entities` by index.
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}
264
265fn stage_file(
268 _idx: usize,
269 path: &Path,
270 name: &str,
271 paths: &AppPaths,
272 skip_extraction: bool,
273) -> Result<StagedFile, AppError> {
274 use crate::constants::*;
275
276 if name.len() > MAX_MEMORY_NAME_LEN {
277 return Err(AppError::LimitExceeded(
278 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
279 ));
280 }
281 if name.starts_with("__") {
282 return Err(AppError::Validation(
283 crate::i18n::validation::reserved_name(),
284 ));
285 }
286 {
287 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
288 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
289 if !slug_re.is_match(name) {
290 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
291 name,
292 )));
293 }
294 }
295
296 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
297 if raw_body.len() > MAX_MEMORY_BODY_LEN {
298 return Err(AppError::LimitExceeded(
299 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
300 ));
301 }
302 if raw_body.trim().is_empty() {
303 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
304 }
305
306 let description = format!("ingested from {}", path.display());
307 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
308 return Err(AppError::Validation(
309 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
310 ));
311 }
312
313 let mut extracted_entities: Vec<NewEntity> = Vec::new();
314 let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
315 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
316 if !skip_extraction {
317 match crate::extraction::extract_graph_auto(&raw_body, paths) {
318 Ok(extracted) => {
319 extracted_urls = extracted.urls;
320 extracted_entities = extracted.entities;
321 extracted_relationships = extracted.relationships;
322
323 if extracted_entities.len() > MAX_ENTITIES_PER_MEMORY {
324 extracted_entities.truncate(MAX_ENTITIES_PER_MEMORY);
325 }
326 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
327 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
328 }
329 }
330 Err(e) => {
331 tracing::warn!(
332 file = %path.display(),
333 "auto-extraction failed (graceful degradation): {e:#}"
334 );
335 }
336 }
337 }
338
339 for entity in &extracted_entities {
340 if !is_valid_entity_type(&entity.entity_type) {
341 return Err(AppError::Validation(format!(
342 "invalid entity_type '{}' for entity '{}'",
343 entity.entity_type, entity.name
344 )));
345 }
346 }
347 for rel in &mut extracted_relationships {
348 rel.relation = rel.relation.replace('-', "_");
349 if !is_valid_relation(&rel.relation) {
350 return Err(AppError::Validation(format!(
351 "invalid relation '{}' for relationship '{}' -> '{}'",
352 rel.relation, rel.source, rel.target
353 )));
354 }
355 if !(0.0..=1.0).contains(&rel.strength) {
356 return Err(AppError::Validation(format!(
357 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
358 rel.strength, rel.source, rel.target
359 )));
360 }
361 }
362
363 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
364 let snippet: String = raw_body.chars().take(200).collect();
365
366 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
367 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
368 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
369 return Err(AppError::LimitExceeded(format!(
370 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
371 chunks_info.len(),
372 REMEMBER_MAX_SAFE_MULTI_CHUNKS
373 )));
374 }
375
376 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
377 let embedding = if chunks_info.len() == 1 {
378 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
379 } else {
380 let chunk_texts: Vec<&str> = chunks_info
381 .iter()
382 .map(|c| chunking::chunk_text(&raw_body, c))
383 .collect();
384 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
385 for chunk_text in &chunk_texts {
386 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
387 &paths.models,
388 chunk_text,
389 )?);
390 }
391 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
392 chunk_embeddings_opt = Some(chunk_embeddings);
393 aggregated
394 };
395
396 let entity_embeddings = extracted_entities
397 .iter()
398 .map(|entity| {
399 let entity_text = match &entity.description {
400 Some(desc) => format!("{} {}", entity.name, desc),
401 None => entity.name.clone(),
402 };
403 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
404 })
405 .collect::<Result<Vec<_>, _>>()?;
406
407 Ok(StagedFile {
408 body: raw_body,
409 body_hash,
410 snippet,
411 name: name.to_string(),
412 description,
413 embedding,
414 chunk_embeddings: chunk_embeddings_opt,
415 chunks_info,
416 entities: extracted_entities,
417 relationships: extracted_relationships,
418 entity_embeddings,
419 urls: extracted_urls,
420 })
421}
422
/// Phase B ("persist") for one staged file: sequentially writes the memory,
/// its initial version, vectors, chunks, entities, and relationships inside a
/// single IMMEDIATE transaction, then inserts URL rows best-effort after
/// commit.
///
/// # Errors
/// `AppError::NamespaceError` when creating this memory would exceed the
/// active-namespace cap; `AppError::Duplicate` when the name already exists in
/// the namespace; plus any storage-layer errors.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        // Enforce the active-namespace cap before writing into a brand-new
        // namespace. NOTE(review): this check runs outside the transaction
        // below, so concurrent writers could race past the cap — presumably
        // acceptable for a CLI; confirm if ingest ever runs multi-process.
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Same-name memories are rejected; same-content (hash) memories under a
    // different name are allowed and only logged below.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE takes the write lock up front so the reads/writes below see a
    // consistent database.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Version 1, action "create": the initial history row for this memory.
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Chunk rows/vectors only exist for multi-chunk documents; stage_file
    // caches the per-chunk embeddings in that case.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        // NOTE(review): the first argument appears to be the chunk-vec id,
        // derived from the chunk index — confirm against storage_chunks.
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // `entity_embeddings` is index-parallel to `entities` (built that way
        // in stage_file).
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                &entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Relationship endpoints may name entities that extraction did not
        // list; those are upserted with the fallback type "concept".
        let entity_types: std::collections::HashMap<&str, &str> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type.as_str()))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or("concept")
                    .to_string(),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URL rows are inserted after commit and errors are deliberately ignored:
    // a URL-index failure must not fail a memory that is already durable.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
580
/// Entry point for `ingest`: validates the directory, collects and sorts the
/// matching files, reserves unique memory names, stages files in parallel
/// (phase A: chunk + embed + NER), then persists them sequentially (phase B),
/// emitting one NDJSON event per file and a final summary line.
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::NotFound(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    // Sorting makes processing order (and thus collision-suffix assignment)
    // deterministic across runs.
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    // Storage-init failure is deferred rather than returned: each file still
    // gets a per-file "failed" NDJSON event carrying the same error message.
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names reserved so far; used to resolve collisions deterministically.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // A file that will be staged and persisted.
    struct FileSlot {
        path: PathBuf,
        file_str: String,
        derived_name: String,
        name_truncated: bool,
        original_name: Option<String>,
    }
    // Per-file decision made up front, before any parallel work.
    enum Slot {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process(FileSlot),
    }

    // Name derivation/reservation runs sequentially here (the closure mutates
    // `taken_names`), so suffix assignment follows sorted file order.
    let slots: Vec<Slot> = files
        .iter()
        .map(|path| {
            let file_str = path.to_string_lossy().into_owned();
            let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

            if derived_base.is_empty() {
                return Slot::Skip {
                    file_str,
                    derived_base: String::new(),
                    name_truncated: false,
                    original_name: None,
                    reason: "could not derive a non-empty kebab-case name from filename"
                        .to_string(),
                };
            }

            match unique_name(&derived_base, &taken_names) {
                Ok(derived_name) => {
                    taken_names.insert(derived_name.clone());
                    Slot::Process(FileSlot {
                        path: path.clone(),
                        file_str,
                        derived_name,
                        name_truncated,
                        original_name,
                    })
                }
                Err(e) => Slot::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                },
            }
        })
        .collect();

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    // One result cell per slot; workers write by index so results stay aligned
    // with `slots` regardless of completion order.
    let staged: Vec<Mutex<Option<Result<StagedFile, AppError>>>> =
        (0..slots.len()).map(|_| Mutex::new(None)).collect();

    let skip_extraction = args.skip_extraction;
    let paths_ref = &paths;

    let total_to_process = slots
        .iter()
        .filter(|s| matches!(s, Slot::Process(_)))
        .count();
    tracing::info!(
        target = "ingest",
        phase = "stage_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "phase A (stage) starting: chunk + embed + NER on rayon pool",
    );
    let staged_done = std::sync::atomic::AtomicUsize::new(0);

    // Phase A: CPU-heavy staging in parallel; Skip slots are left untouched.
    pool.install(|| {
        slots.par_iter().enumerate().for_each(|(idx, slot)| {
            if let Slot::Process(fs) = slot {
                let result =
                    stage_file(idx, &fs.path, &fs.derived_name, paths_ref, skip_extraction);
                *staged[idx].lock().expect("staged slot poisoned") = Some(result);
                // Progress log every 10 files and at completion.
                let done = staged_done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                if done % 10 == 0 || done == total_to_process {
                    tracing::info!(
                        target = "ingest",
                        phase = "stage_progress",
                        done = done,
                        total = total_to_process,
                        "phase A progress",
                    );
                }
            }
        });
    });

    tracing::info!(
        target = "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B (persist) starting: sequential writes to SQLite + NDJSON emit",
    );

    // Phase B: sequential persistence, in slot (sorted-file) order.
    let fail_fast = args.fail_fast;
    for (idx, slot) in slots.iter().enumerate() {
        match slot {
            Slot::Skip {
                file_str,
                derived_base,
                name_truncated,
                original_name,
                reason,
            } => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_base,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(reason.clone()),
                    memory_id: None,
                    action: None,
                })?;
                skipped += 1;
            }
            Slot::Process(fs) => {
                // Deferred storage-init failure: report it per file so the
                // NDJSON stream stays one-event-per-file.
                let conn = match conn_or_err.as_mut() {
                    Ok(c) => c,
                    Err(err_msg) => {
                        let err_clone = err_msg.clone();
                        output::emit_json_compact(&IngestFileEvent {
                            file: &fs.file_str,
                            name: &fs.derived_name,
                            status: "failed",
                            truncated: fs.name_truncated,
                            original_name: fs.original_name.clone(),
                            error: Some(err_clone.clone()),
                            memory_id: None,
                            action: None,
                        })?;
                        failed += 1;
                        // fail-fast still emits the summary line before erroring,
                        // so consumers always see a terminating summary.
                        if fail_fast {
                            output::emit_json_compact(&IngestSummary {
                                summary: true,
                                dir: args.dir.display().to_string(),
                                pattern: args.pattern.clone(),
                                recursive: args.recursive,
                                files_total: total,
                                files_succeeded: succeeded,
                                files_failed: failed,
                                files_skipped: skipped,
                                elapsed_ms: started.elapsed().as_millis() as u64,
                            })?;
                            return Err(AppError::Validation(format!(
                                "ingest aborted on first failure: {err_clone}"
                            )));
                        }
                        continue;
                    }
                };

                // Every Process slot was written by phase A, so `take` is safe;
                // an empty cell would be a scheduling bug.
                let stage_result = staged[idx]
                    .lock()
                    .expect("staged slot poisoned")
                    .take()
                    .expect("staged slot empty for Process slot");

                let outcome = stage_result
                    .and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

                match outcome {
                    Ok(FileSuccess { memory_id, action }) => {
                        output::emit_json_compact(&IngestFileEvent {
                            file: &fs.file_str,
                            name: &fs.derived_name,
                            status: "indexed",
                            truncated: fs.name_truncated,
                            original_name: fs.original_name.clone(),
                            error: None,
                            memory_id: Some(memory_id),
                            action: Some(action),
                        })?;
                        succeeded += 1;
                    }
                    Err(e) => {
                        let err_msg = format!("{e}");
                        output::emit_json_compact(&IngestFileEvent {
                            file: &fs.file_str,
                            name: &fs.derived_name,
                            status: "failed",
                            truncated: fs.name_truncated,
                            original_name: fs.original_name.clone(),
                            error: Some(err_msg.clone()),
                            memory_id: None,
                            action: None,
                        })?;
                        failed += 1;
                        if fail_fast {
                            output::emit_json_compact(&IngestSummary {
                                summary: true,
                                dir: args.dir.display().to_string(),
                                pattern: args.pattern.clone(),
                                recursive: args.recursive,
                                files_total: total,
                                files_succeeded: succeeded,
                                files_failed: failed,
                                files_skipped: skipped,
                                elapsed_ms: started.elapsed().as_millis() as u64,
                            })?;
                            return Err(AppError::Validation(format!(
                                "ingest aborted on first failure: {err_msg}"
                            )));
                        }
                    }
                }
            }
        }
    }

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}
876
877fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
883 ensure_db_ready(paths)?;
884 let conn = open_rw(&paths.db)?;
885 Ok(conn)
886}
887
/// Returns true when `entity_type` is in the canonical entity-type vocabulary
/// accepted by the graph schema.
fn is_valid_entity_type(entity_type: &str) -> bool {
    const VALID_ENTITY_TYPES: [&str; 13] = [
        "project",
        "tool",
        "person",
        "file",
        "concept",
        "incident",
        "decision",
        "memory",
        "dashboard",
        "issue_tracker",
        "organization",
        "location",
        "date",
    ];
    VALID_ENTITY_TYPES.contains(&entity_type)
}
906
/// Returns true when `relation` is in the canonical snake_case relation
/// vocabulary accepted by the graph schema.
fn is_valid_relation(relation: &str) -> bool {
    const VALID_RELATIONS: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    VALID_RELATIONS.contains(&relation)
}
924
925fn collect_files(
926 dir: &Path,
927 pattern: &str,
928 recursive: bool,
929 out: &mut Vec<PathBuf>,
930) -> Result<(), AppError> {
931 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
932 for entry in entries {
933 let entry = entry.map_err(AppError::Io)?;
934 let path = entry.path();
935 let file_type = entry.file_type().map_err(AppError::Io)?;
936 if file_type.is_file() {
937 let name = entry.file_name();
938 let name_str = name.to_string_lossy();
939 if matches_pattern(&name_str, pattern) {
940 out.push(path);
941 }
942 } else if file_type.is_dir() && recursive {
943 collect_files(&path, pattern, recursive, out)?;
944 }
945 }
946 Ok(())
947}
948
/// Naive glob match: a leading `*` means suffix match, a trailing `*` means
/// prefix match, anything else is compared exactly. (A leading `*` wins when
/// both are present; `*` alone matches everything.)
fn matches_pattern(name: &str, pattern: &str) -> bool {
    match (pattern.strip_prefix('*'), pattern.strip_suffix('*')) {
        (Some(suffix), _) => name.ends_with(suffix),
        (None, Some(prefix)) => name.starts_with(prefix),
        (None, None) => name == pattern,
    }
}
958
/// Derives a kebab-case, lowercase, ASCII-only memory name from the file's
/// basename (extension stripped).
///
/// Returns `(name, truncated, original_name)`: `truncated` is true when the
/// derived name exceeded `DERIVED_NAME_MAX_LEN` and was cut; `original_name`
/// carries the untruncated form in that case (for NDJSON reporting).
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    // NFD-decompose and drop combining marks so accented letters fold to their
    // ASCII base ("naïve" -> "naive"); underscores/whitespace become dashes;
    // everything outside [a-z0-9-] is dropped (emoji, punctuation, etc.).
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        // Byte slicing is safe: the filter above guarantees pure ASCII, so
        // every byte index is a char boundary. Re-trim in case the cut
        // landed just after a dash.
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::warn!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}
1004
1005fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1018 if !taken.contains(base) {
1019 return Ok(base.to_string());
1020 }
1021 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1022 let candidate = format!("{base}-{suffix}");
1023 if !taken.contains(&candidate) {
1024 tracing::warn!(
1025 target: "ingest",
1026 base = %base,
1027 resolved = %candidate,
1028 suffix,
1029 "memory name collision resolved with numeric suffix"
1030 );
1031 return Ok(candidate);
1032 }
1033 }
1034 Err(AppError::Validation(format!(
1035 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1036 )))
1037}
1038
/// Collapses every run of consecutive dashes in `s` into a single dash,
/// leaving all other characters untouched.
fn collapse_dashes(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    for c in s.chars() {
        // Skip a dash only when the output already ends with one.
        if c != '-' || !result.ends_with('-') {
            result.push(c);
        }
    }
    result
}
1055
1056#[cfg(test)]
1057mod tests {
1058 use super::*;
1059 use std::path::PathBuf;
1060
1061 #[test]
1062 fn matches_pattern_suffix() {
1063 assert!(matches_pattern("foo.md", "*.md"));
1064 assert!(!matches_pattern("foo.txt", "*.md"));
1065 assert!(matches_pattern("foo.md", "*"));
1066 }
1067
1068 #[test]
1069 fn matches_pattern_prefix() {
1070 assert!(matches_pattern("README.md", "README*"));
1071 assert!(!matches_pattern("CHANGELOG.md", "README*"));
1072 }
1073
1074 #[test]
1075 fn matches_pattern_exact() {
1076 assert!(matches_pattern("README.md", "README.md"));
1077 assert!(!matches_pattern("readme.md", "README.md"));
1078 }
1079
1080 #[test]
1081 fn derive_kebab_underscore_to_dash() {
1082 let p = PathBuf::from("/tmp/claude_code_headless.md");
1083 let (name, truncated, original) = derive_kebab_name(&p);
1084 assert_eq!(name, "claude-code-headless");
1085 assert!(!truncated);
1086 assert!(original.is_none());
1087 }
1088
1089 #[test]
1090 fn derive_kebab_uppercase_lowered() {
1091 let p = PathBuf::from("/tmp/README.md");
1092 let (name, truncated, original) = derive_kebab_name(&p);
1093 assert_eq!(name, "readme");
1094 assert!(!truncated);
1095 assert!(original.is_none());
1096 }
1097
1098 #[test]
1099 fn derive_kebab_strips_non_kebab_chars() {
1100 let p = PathBuf::from("/tmp/some@weird#name!.md");
1101 let (name, truncated, original) = derive_kebab_name(&p);
1102 assert_eq!(name, "someweirdname");
1103 assert!(!truncated);
1104 assert!(original.is_none());
1105 }
1106
1107 #[test]
1110 fn derive_kebab_folds_accented_letters_to_ascii() {
1111 let p = PathBuf::from("/tmp/açaí.md");
1112 let (name, _, _) = derive_kebab_name(&p);
1113 assert_eq!(name, "acai", "got '{name}'");
1114 }
1115
1116 #[test]
1117 fn derive_kebab_handles_naive_with_diaeresis() {
1118 let p = PathBuf::from("/tmp/naïve-test.md");
1119 let (name, _, _) = derive_kebab_name(&p);
1120 assert_eq!(name, "naive-test", "got '{name}'");
1121 }
1122
1123 #[test]
1124 fn derive_kebab_drops_emoji_keeps_word() {
1125 let p = PathBuf::from("/tmp/🚀-rocket.md");
1126 let (name, _, _) = derive_kebab_name(&p);
1127 assert_eq!(name, "rocket", "got '{name}'");
1128 }
1129
1130 #[test]
1131 fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1132 let p = PathBuf::from("/tmp/açaí🦜.md");
1133 let (name, _, _) = derive_kebab_name(&p);
1134 assert_eq!(name, "acai", "got '{name}'");
1135 }
1136
1137 #[test]
1138 fn derive_kebab_pure_emoji_yields_empty() {
1139 let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1140 let (name, _, _) = derive_kebab_name(&p);
1141 assert!(name.is_empty(), "got '{name}'");
1142 }
1143
1144 #[test]
1145 fn derive_kebab_collapses_consecutive_dashes() {
1146 let p = PathBuf::from("/tmp/a__b___c.md");
1147 let (name, truncated, original) = derive_kebab_name(&p);
1148 assert_eq!(name, "a-b-c");
1149 assert!(!truncated);
1150 assert!(original.is_none());
1151 }
1152
1153 #[test]
1154 fn derive_kebab_truncates_to_60_chars() {
1155 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1156 let (name, truncated, original) = derive_kebab_name(&p);
1157 assert!(name.len() <= 60, "got len {}", name.len());
1158 assert!(truncated);
1159 assert!(original.is_some());
1160 assert!(original.unwrap().len() > 60);
1161 }
1162
1163 #[test]
1164 fn collect_files_finds_md_files() {
1165 let tmp = tempfile::tempdir().expect("tempdir");
1166 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1167 std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1168 std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1169 let mut out = Vec::new();
1170 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1171 assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1172 }
1173
1174 #[test]
1175 fn collect_files_recursive_descends_subdirs() {
1176 let tmp = tempfile::tempdir().expect("tempdir");
1177 let sub = tmp.path().join("sub");
1178 std::fs::create_dir(&sub).unwrap();
1179 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1180 std::fs::write(sub.join("b.md"), "y").unwrap();
1181 let mut out = Vec::new();
1182 collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1183 assert_eq!(out.len(), 2);
1184 }
1185
1186 #[test]
1187 fn collect_files_non_recursive_skips_subdirs() {
1188 let tmp = tempfile::tempdir().expect("tempdir");
1189 let sub = tmp.path().join("sub");
1190 std::fs::create_dir(&sub).unwrap();
1191 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1192 std::fs::write(sub.join("b.md"), "y").unwrap();
1193 let mut out = Vec::new();
1194 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1195 assert_eq!(out.len(), 1);
1196 }
1197
1198 #[test]
1201 fn derive_kebab_long_basename_truncated_within_cap() {
1202 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1203 let (name, truncated, original) = derive_kebab_name(&p);
1204 assert!(
1205 name.len() <= DERIVED_NAME_MAX_LEN,
1206 "truncated name must respect cap; got {} chars",
1207 name.len()
1208 );
1209 assert!(!name.is_empty());
1210 assert!(truncated);
1211 assert!(original.is_some());
1212 }
1213
1214 #[test]
1215 fn unique_name_returns_base_when_free() {
1216 let taken: BTreeSet<String> = BTreeSet::new();
1217 let resolved = unique_name("note", &taken).expect("must resolve");
1218 assert_eq!(resolved, "note");
1219 }
1220
1221 #[test]
1222 fn unique_name_appends_first_free_suffix_on_collision() {
1223 let mut taken: BTreeSet<String> = BTreeSet::new();
1224 taken.insert("note".to_string());
1225 taken.insert("note-1".to_string());
1226 let resolved = unique_name("note", &taken).expect("must resolve");
1227 assert_eq!(resolved, "note-2");
1228 }
1229
1230 #[test]
1231 fn unique_name_errors_after_collision_cap() {
1232 let mut taken: BTreeSet<String> = BTreeSet::new();
1233 taken.insert("note".to_string());
1234 for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1235 taken.insert(format!("note-{i}"));
1236 }
1237 let err = unique_name("note", &taken).expect_err("must surface error");
1238 assert!(matches!(err, AppError::Validation(_)));
1239 }
1240
1241 #[test]
1244 fn is_valid_entity_type_accepts_v008_types() {
1245 assert!(is_valid_entity_type("organization"));
1246 assert!(is_valid_entity_type("location"));
1247 assert!(is_valid_entity_type("date"));
1248 assert!(!is_valid_entity_type("unknown"));
1249 }
1250
1251 #[test]
1252 fn is_valid_relation_accepts_canonical_relations() {
1253 assert!(is_valid_relation("applies_to"));
1254 assert!(is_valid_relation("depends_on"));
1255 assert!(!is_valid_relation("foo_bar"));
1256 }
1257
1258 use serial_test::serial;
1261
1262 fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1264 let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1265 let prev = std::env::var(key).ok();
1266 match value {
1267 Some(v) => std::env::set_var(key, v),
1268 None => std::env::remove_var(key),
1269 }
1270 f();
1271 match prev {
1272 Some(p) => std::env::set_var(key, p),
1273 None => std::env::remove_var(key),
1274 }
1275 }
1276
1277 #[test]
1278 #[serial]
1279 fn env_low_memory_enabled_unset_returns_false() {
1280 with_env_var(None, || assert!(!env_low_memory_enabled()));
1281 }
1282
1283 #[test]
1284 #[serial]
1285 fn env_low_memory_enabled_empty_returns_false() {
1286 with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1287 }
1288
1289 #[test]
1290 #[serial]
1291 fn env_low_memory_enabled_truthy_values_return_true() {
1292 for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1293 with_env_var(Some(v), || {
1294 assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1295 });
1296 }
1297 }
1298
1299 #[test]
1300 #[serial]
1301 fn env_low_memory_enabled_falsy_values_return_false() {
1302 for v in ["0", "false", "FALSE", "no", "off"] {
1303 with_env_var(Some(v), || {
1304 assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1305 });
1306 }
1307 }
1308
1309 #[test]
1310 #[serial]
1311 fn env_low_memory_enabled_unrecognized_value_returns_false() {
1312 with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1313 }
1314
1315 #[test]
1316 #[serial]
1317 fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1318 with_env_var(None, || {
1319 assert_eq!(resolve_parallelism(true, Some(4)), 1);
1320 assert_eq!(resolve_parallelism(true, Some(8)), 1);
1321 assert_eq!(resolve_parallelism(true, None), 1);
1322 });
1323 }
1324
1325 #[test]
1326 #[serial]
1327 fn resolve_parallelism_env_forces_one_when_flag_off() {
1328 with_env_var(Some("1"), || {
1329 assert_eq!(resolve_parallelism(false, Some(4)), 1);
1330 assert_eq!(resolve_parallelism(false, None), 1);
1331 });
1332 }
1333
1334 #[test]
1335 #[serial]
1336 fn resolve_parallelism_falsy_env_does_not_override() {
1337 with_env_var(Some("0"), || {
1338 assert_eq!(resolve_parallelism(false, Some(4)), 4);
1339 });
1340 }
1341
1342 #[test]
1343 #[serial]
1344 fn resolve_parallelism_explicit_value_when_low_memory_off() {
1345 with_env_var(None, || {
1346 assert_eq!(resolve_parallelism(false, Some(3)), 3);
1347 assert_eq!(resolve_parallelism(false, Some(1)), 1);
1348 });
1349 }
1350
1351 #[test]
1352 #[serial]
1353 fn resolve_parallelism_default_when_unset() {
1354 with_env_var(None, || {
1355 let p = resolve_parallelism(false, None);
1356 assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1357 });
1358 }
1359
1360 #[test]
1361 fn ingest_args_parses_low_memory_flag_via_clap() {
1362 use clap::Parser;
1363 let cli = crate::cli::Cli::try_parse_from([
1366 "sqlite-graphrag",
1367 "ingest",
1368 "/tmp/dummy",
1369 "--type",
1370 "document",
1371 "--low-memory",
1372 ])
1373 .expect("parse must succeed");
1374 match cli.command {
1375 crate::cli::Commands::Ingest(args) => {
1376 assert!(args.low_memory, "--low-memory must set field to true");
1377 }
1378 _ => panic!("expected Ingest subcommand"),
1379 }
1380 }
1381
1382 #[test]
1383 fn ingest_args_low_memory_defaults_false() {
1384 use clap::Parser;
1385 let cli = crate::cli::Cli::try_parse_from([
1386 "sqlite-graphrag",
1387 "ingest",
1388 "/tmp/dummy",
1389 "--type",
1390 "document",
1391 ])
1392 .expect("parse must succeed");
1393 match cli.command {
1394 crate::cli::Commands::Ingest(args) => {
1395 assert!(!args.low_memory, "default must be false");
1396 }
1397 _ => panic!("expected Ingest subcommand"),
1398 }
1399 }
1400}