use crate::chunking;
use crate::cli::MemoryType;
use crate::entity_type::EntityType;
use crate::errors::AppError;
use crate::i18n::errors_msg;
use crate::output::{self, JsonOutputFormat};
use crate::paths::AppPaths;
use crate::storage::chunks as storage_chunks;
use crate::storage::connection::{ensure_db_ready, open_rw};
use crate::storage::entities::{NewEntity, NewRelationship};
use crate::storage::memories::NewMemory;
use crate::storage::{entities, memories, urls as storage_urls, versions};
use rayon::prelude::*;
use rusqlite::Connection;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use unicode_normalization::UnicodeNormalization;

use crate::constants::DERIVED_NAME_MAX_LEN;

const MAX_NAME_COLLISION_SUFFIX: usize = 1000;

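/// Arguments for the `ingest` subcommand: bulk-imports a directory of files as
/// memories, optionally running BERT NER auto-extraction of entities and
/// relationships for each file.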
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Skip BERT NER auto-extraction for faster bulk import\n \
    sqlite-graphrag ingest ./big-corpus --type reference --skip-extraction\n\n\
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(long, default_value_t = false)]
    pub skip_extraction: bool,

    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    #[arg(long)]
    pub namespace: Option<String>,

    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}

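/// Reads the `SQLITE_GRAPHRAG_LOW_MEMORY` environment variable and interprets
/// common truthy/falsy spellings. Unset, empty, and unrecognized values are
/// treated as disabled (a warning is logged for unrecognized values).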
fn env_low_memory_enabled() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
        Ok(v) if v.is_empty() => false,
        Ok(v) => match v.to_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            other => {
                tracing::warn!(
                    target: "ingest",
                    value = %other,
                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
                );
                false
            }
        },
        Err(_) => false,
    }
}

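/// Resolves the effective ingest parallelism. Low-memory mode (via the flag or
/// the environment variable) forces 1 and overrides any explicit
/// `--ingest-parallelism` value; otherwise the explicit value is used, falling
/// back to a default of `cpus / 2` clamped to the range [1, 4].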
fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
    let env_flag = env_low_memory_enabled();
    let low_memory = low_memory_flag || env_flag;

    if low_memory {
        if let Some(n) = ingest_parallelism {
            if n > 1 {
                tracing::warn!(
                    target: "ingest",
                    requested = n,
                    "--ingest-parallelism overridden by --low-memory; using 1"
                );
            }
        }
        if low_memory_flag {
            tracing::info!(
                target: "ingest",
                source = "flag",
                "low-memory mode enabled: forcing --ingest-parallelism 1"
            );
        } else {
            tracing::info!(
                target: "ingest",
                source = "env",
                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
            );
        }
        return 1;
    }

    ingest_parallelism
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|v| v.get() / 2)
                .unwrap_or(1)
                .clamp(1, 4)
        })
        .max(1)
}

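/// Per-file NDJSON event emitted on stdout for each processed file.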
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    truncated: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}

#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}

struct FileSuccess {
    memory_id: i64,
    action: String,
}

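/// Per-file progress event written to stderr as each file completes the
/// staging phase (extraction, chunking, embedding).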
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    ms: u64,
    entities: usize,
    relationships: usize,
}

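/// Everything produced by the CPU-heavy staging phase for a single file,
/// ready to be persisted in one database transaction.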
struct StagedFile {
    body: String,
    body_hash: String,
    snippet: String,
    name: String,
    description: String,
    embedding: Vec<f32>,
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}

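/// Phase A: validate the derived name, read and validate the file body,
/// optionally run auto-extraction, chunk the body, and compute embeddings.
/// Runs on the rayon pool and performs no database writes.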
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    skip_extraction: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    if !skip_extraction {
        match crate::extraction::extract_graph_auto(&raw_body, paths) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    for rel in &mut extracted_relationships {
        rel.relation = rel.relation.replace('-', "_");
        if !is_valid_relation(&rel.relation) {
            return Err(AppError::Validation(format!(
                "invalid relation '{}' for relationship '{}' -> '{}'",
                rel.relation, rel.source, rel.target
            )));
        }
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        aggregated
    };

    let entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
    })
}

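/// Phase B: persist a staged file (memory, version, vectors, chunks, entities,
/// relationships) inside a single immediate transaction, then record extracted
/// URLs best-effort outside the transaction. Runs on the main thread, which
/// owns the connection.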
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}

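/// Entry point for the `ingest` subcommand. Collects matching files, derives
/// unique kebab-case names, stages files in parallel (Phase A on the rayon
/// pool) and persists them sequentially on the main thread (Phase B), emitting
/// one NDJSON event per file plus a final summary line.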
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::NotFound(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::new();
    let mut truncations: Vec<(String, String)> = Vec::new();

    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    let skip_extraction = args.skip_extraction;

    let total_to_process = process_items.len();
    tracing::info!(
        target = "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    skip_extraction,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                let _ = tx.send((item.idx, result));
            });
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target = "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}

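/// Prepares the database via `ensure_db_ready`, then opens a read-write
/// connection to it.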
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}

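/// Whitelist of canonical relation names accepted for extracted relationships
/// (dashes are normalized to underscores before this check).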
fn is_valid_relation(relation: &str) -> bool {
    matches!(
        relation,
        "applies_to"
            | "uses"
            | "depends_on"
            | "causes"
            | "fixes"
            | "contradicts"
            | "supports"
            | "follows"
            | "related"
            | "mentions"
            | "replaces"
            | "tracked_in"
    )
}

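/// Collects files under `dir` whose names match `pattern`, descending into
/// subdirectories only when `recursive` is set.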
fn collect_files(
    dir: &Path,
    pattern: &str,
    recursive: bool,
    out: &mut Vec<PathBuf>,
) -> Result<(), AppError> {
    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
    for entry in entries {
        let entry = entry.map_err(AppError::Io)?;
        let path = entry.path();
        let file_type = entry.file_type().map_err(AppError::Io)?;
        if file_type.is_file() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            if matches_pattern(&name_str, pattern) {
                out.push(path);
            }
        } else if file_type.is_dir() && recursive {
            collect_files(&path, pattern, recursive, out)?;
        }
    }
    Ok(())
}

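/// Minimal glob matching: a single leading `*` matches by suffix, a single
/// trailing `*` matches by prefix, and anything else must match exactly.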
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else {
        name == pattern
    }
}

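/// Derives a kebab-case memory name from the file stem: NFD-normalizes and
/// drops combining marks, maps underscores and whitespace to dashes,
/// lowercases, keeps only ASCII lowercase letters, digits, and dashes,
/// collapses dash runs, trims leading/trailing dashes, and truncates to
/// `DERIVED_NAME_MAX_LEN`. Returns `(name, truncated, original_name)`, where
/// `original_name` is `Some` only when truncation occurred.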
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}

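/// Returns `base` if it is not already taken; otherwise appends the first free
/// numeric suffix (`base-1`, `base-2`, ...) up to `MAX_NAME_COLLISION_SUFFIX`,
/// and errors once that cap is exhausted.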
fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
    if !taken.contains(base) {
        return Ok(base.to_string());
    }
    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
        let candidate = format!("{base}-{suffix}");
        if !taken.contains(&candidate) {
            tracing::warn!(
                target: "ingest",
                base = %base,
                resolved = %candidate,
                suffix,
                "memory name collision resolved with numeric suffix"
            );
            return Ok(candidate);
        }
    }
    Err(AppError::Validation(format!(
        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
    )))
}

fn collapse_dashes(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut prev_dash = false;
    for c in s.chars() {
        if c == '-' {
            if !prev_dash {
                out.push('-');
            }
            prev_dash = true;
        } else {
            out.push(c);
            prev_dash = false;
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn is_valid_relation_accepts_canonical_relations() {
        assert!(is_valid_relation("applies_to"));
        assert!(is_valid_relation("depends_on"));
        assert!(!is_valid_relation("foo_bar"));
    }

    use serial_test::serial;

    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}