use crate::chunking;
use crate::cli::MemoryType;
use crate::entity_type::EntityType;
use crate::errors::AppError;
use crate::i18n::errors_msg;
use crate::output::{self, JsonOutputFormat};
use crate::paths::AppPaths;
use crate::storage::chunks as storage_chunks;
use crate::storage::connection::{ensure_db_ready, open_rw};
use crate::storage::entities::{NewEntity, NewRelationship};
use crate::storage::memories::NewMemory;
use crate::storage::{entities, memories, urls as storage_urls, versions};
use rayon::prelude::*;
use rusqlite::Connection;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use unicode_normalization::UnicodeNormalization;

use crate::constants::DERIVED_NAME_MAX_LEN;

const MAX_NAME_COLLISION_SUFFIX: usize = 1000;

#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable BERT NER extraction (disabled by default, slower)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(long, env = "SQLITE_GRAPHRAG_ENABLE_NER", default_value_t = false)]
    pub enable_ner: bool,

    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    #[arg(long)]
    pub namespace: Option<String>,

    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}

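/// Returns `true` when `SQLITE_GRAPHRAG_LOW_MEMORY` is set to a truthy value
/// (`1`, `true`, `yes`, `on`, case-insensitive). Empty, falsy, and unset
/// values disable low-memory mode; unrecognized values are treated as
/// disabled and logged as a warning.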
fn env_low_memory_enabled() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
        Ok(v) if v.is_empty() => false,
        Ok(v) => match v.to_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            other => {
                tracing::warn!(
                    target: "ingest",
                    value = %other,
                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
                );
                false
            }
        },
        Err(_) => false,
    }
}

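/// Resolves the effective Phase A worker count. The `--low-memory` flag or
/// the env var force 1, overriding any explicit `--ingest-parallelism`;
/// otherwise the explicit value wins, with a fallback of
/// `available_parallelism / 2` clamped to the range [1, 4].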
fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
    let env_flag = env_low_memory_enabled();
    let low_memory = low_memory_flag || env_flag;

    if low_memory {
        if let Some(n) = ingest_parallelism {
            if n > 1 {
                tracing::warn!(
                    target: "ingest",
                    requested = n,
                    "--ingest-parallelism overridden by --low-memory; using 1"
                );
            }
        }
        if low_memory_flag {
            tracing::info!(
                target: "ingest",
                source = "flag",
                "low-memory mode enabled: forcing --ingest-parallelism 1"
            );
        } else {
            tracing::info!(
                target: "ingest",
                source = "env",
                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
            );
        }
        return 1;
    }

    ingest_parallelism
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|v| v.get() / 2)
                .unwrap_or(1)
                .clamp(1, 4)
        })
        .max(1)
}

#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    truncated: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}

#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}

struct FileSuccess {
    memory_id: i64,
    action: String,
}

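/// Per-file progress line emitted on stderr as each Phase A worker finishes,
/// keeping long extractions observable without mixing into the NDJSON
/// result stream on stdout.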
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    ms: u64,
    entities: usize,
    relationships: usize,
}

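/// Everything Phase A (parallel extract + embed) computes for one file,
/// handed over the channel to Phase B, which persists it on the main thread.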
struct StagedFile {
    body: String,
    body_hash: String,
    snippet: String,
    name: String,
    description: String,
    embedding: Vec<f32>,
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}

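/// Phase A worker: validates the derived name, reads and validates the file
/// body, optionally runs NER extraction, chunks the body, and computes all
/// embeddings. Pure with respect to the database: no connection is touched here.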
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    for rel in &mut extracted_relationships {
        rel.relation = rel.relation.replace('-', "_");
        if !is_valid_relation(&rel.relation) {
            return Err(AppError::Validation(format!(
                "invalid relation '{}' for relationship '{}' -> '{}'",
                rel.relation, rel.source, rel.target
            )));
        }
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document into smaller files before ingesting",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        aggregated
    };

    let entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
    })
}

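/// Phase B: persists one staged file inside a single IMMEDIATE transaction
/// (memory row, initial version, vectors, chunks, entities, relationships),
/// after enforcing the active-namespace cap and per-namespace name uniqueness.
/// URL rows are inserted best-effort after the commit.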
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}

pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::NotFound(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, which exceeds the --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    let mut taken_names: BTreeSet<String> = BTreeSet::new();

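    // One slot per discovered file, in sorted order: either a pre-flight skip
    // (emitted as an NDJSON event immediately) or a file queued for the staging
    // pipeline. ProcessItem carries the same data plus the path for Phase A
    // workers; `idx` indexes back into `slots_meta`.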
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::new();
    let mut truncations: Vec<(String, String)> = Vec::new();

    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    let enable_ner = args.enable_ner;

    let total_to_process = process_items.len();
    tracing::info!(
        target: "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

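    // Bounded channel between Phase A (parallel staging) and Phase B (serial
    // persistence): the bound caps how many fully staged files (bodies plus
    // embeddings) can queue ahead of the writer, providing backpressure that
    // keeps peak memory roughly proportional to the parallelism level.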
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                let _ = tx.send((item.idx, result));
            });
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target: "ingest",
        phase = "persist_start",
        files = total_to_process,
        "Phase B starting: persisting files incrementally as Phase A completes each one",
    );

    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}

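/// Ensures the database is initialized and ready, then opens a read-write
/// connection to it.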
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}

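/// Canonical relation whitelist; extraction output is normalized
/// (dashes mapped to underscores) before this check.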
fn is_valid_relation(relation: &str) -> bool {
    matches!(
        relation,
        "applies_to"
            | "uses"
            | "depends_on"
            | "causes"
            | "fixes"
            | "contradicts"
            | "supports"
            | "follows"
            | "related"
            | "mentions"
            | "replaces"
            | "tracked_in"
    )
}

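/// Collects files matching `pattern` under `dir` into `out`, descending into
/// subdirectories only when `recursive` is set. Symlinks are skipped, since
/// `DirEntry::file_type` does not follow them.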
fn collect_files(
    dir: &Path,
    pattern: &str,
    recursive: bool,
    out: &mut Vec<PathBuf>,
) -> Result<(), AppError> {
    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
    for entry in entries {
        let entry = entry.map_err(AppError::Io)?;
        let path = entry.path();
        let file_type = entry.file_type().map_err(AppError::Io)?;
        if file_type.is_file() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            if matches_pattern(&name_str, pattern) {
                out.push(path);
            }
        } else if file_type.is_dir() && recursive {
            collect_files(&path, pattern, recursive, out)?;
        }
    }
    Ok(())
}

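/// Minimal glob: a single leading `*` matches a suffix, a single trailing `*`
/// matches a prefix, `*` alone matches everything, and any other pattern
/// (including `*` elsewhere) is compared literally.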
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else {
        name == pattern
    }
}

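/// Derives a kebab-case ASCII memory name from the file stem: NFD-decomposes
/// and drops combining marks (so accented letters fold to ASCII), maps `_` and
/// whitespace to `-`, lowercases, strips everything outside `[a-z0-9-]`,
/// collapses dash runs, and trims edge dashes. Returns
/// `(name, was_truncated, original_if_truncated)`; names over
/// `DERIVED_NAME_MAX_LEN` are cut at the cap (safe to slice by bytes, since
/// the name is pure ASCII at that point).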
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}

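/// Returns `base` if unused, otherwise the first free `base-N` for
/// `N in 1..=MAX_NAME_COLLISION_SUFFIX`, erroring once the suffix space
/// is exhausted.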
fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
    if !taken.contains(base) {
        return Ok(base.to_string());
    }
    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
        let candidate = format!("{base}-{suffix}");
        if !taken.contains(&candidate) {
            tracing::warn!(
                target: "ingest",
                base = %base,
                resolved = %candidate,
                suffix,
                "memory name collision resolved with numeric suffix"
            );
            return Ok(candidate);
        }
    }
    Err(AppError::Validation(format!(
        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
    )))
}

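/// Collapses runs of consecutive `-` into a single `-`.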
fn collapse_dashes(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut prev_dash = false;
    for c in s.chars() {
        if c == '-' {
            if !prev_dash {
                out.push('-');
            }
            prev_dash = true;
        } else {
            out.push(c);
            prev_dash = false;
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

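    // Direct checks for the dash-collapsing helper, complementing the
    // derive_kebab_* tests that exercise it indirectly through derive_kebab_name.
    #[test]
    fn collapse_dashes_collapses_runs_and_keeps_singles() {
        assert_eq!(collapse_dashes("a--b---c"), "a-b-c");
        assert_eq!(collapse_dashes("a-b"), "a-b");
        assert_eq!(collapse_dashes("--"), "-");
        assert_eq!(collapse_dashes(""), "");
    }
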
    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    use serial_test::serial;

    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}