1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound on the numeric suffixes (`base-1` … `base-1000`) tried by
/// `unique_name` before giving up on a colliding derived memory name.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Clap argument set for the `ingest` subcommand.
//
// Maintainer notes use plain `//` comments deliberately: clap surfaces `///`
// doc comments as CLI help text, so adding doc comments here would change the
// user-facing interface.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable BERT NER extraction (disabled by default, slower)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type recorded on every created memory row.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // Minimal glob matched against basenames: `*suffix`, `prefix*`, or exact
    // literal (see `matches_pattern`).
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Descend into subdirectories when set.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic BERT NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,

    // Hidden flag; when combined with --enable-ner, `run` warns and
    // --enable-ner takes precedence.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // Abort the whole ingest on the first per-file failure.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Hard cap on matched files; exceeding it aborts before any work starts.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace; resolved via crate::namespace when omitted.
    #[arg(long)]
    pub namespace: Option<String>,

    // Database path override (also settable via the env var).
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output format for the emitted JSON events.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // See resolve_parallelism: overridden to 1 by low-memory mode.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
156
157fn env_low_memory_enabled() -> bool {
162 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
163 Ok(v) if v.is_empty() => false,
164 Ok(v) => match v.to_lowercase().as_str() {
165 "1" | "true" | "yes" | "on" => true,
166 "0" | "false" | "no" | "off" => false,
167 other => {
168 tracing::warn!(
169 target: "ingest",
170 value = %other,
171 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
172 );
173 false
174 }
175 },
176 Err(_) => false,
177 }
178}
179
/// Decides how many rayon workers Phase A (`stage_file`) may use.
///
/// Low-memory mode — from the `--low-memory` flag or the
/// SQLITE_GRAPHRAG_LOW_MEMORY env var — always forces 1 and overrides any
/// explicit `--ingest-parallelism`. Otherwise the explicit value is used
/// (floored at 1), defaulting to `clamp(cpus / 2, 1, 4)`.
fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
    let env_flag = env_low_memory_enabled();
    let low_memory = low_memory_flag || env_flag;

    if low_memory {
        // An explicit parallelism request loses to low-memory mode; tell the
        // user instead of silently dropping their flag.
        if let Some(n) = ingest_parallelism {
            if n > 1 {
                tracing::warn!(
                    target: "ingest",
                    requested = n,
                    "--ingest-parallelism overridden by --low-memory; using 1"
                );
            }
        }
        // Record which source (CLI flag vs env var) activated low-memory mode.
        if low_memory_flag {
            tracing::info!(
                target: "ingest",
                source = "flag",
                "low-memory mode enabled: forcing --ingest-parallelism 1"
            );
        } else {
            tracing::info!(
                target: "ingest",
                source = "env",
                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
            );
        }
        return 1;
    }

    // Default: half the available cores clamped to [1, 4]; the trailing
    // `.max(1)` also guards an explicit `--ingest-parallelism 0`.
    ingest_parallelism
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|v| v.get() / 2)
                .unwrap_or(1)
                .clamp(1, 4)
        })
        .max(1)
}
230
/// Per-file NDJSON event emitted on stdout for every matched path.
/// Field order is the wire format — do not reorder.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    // One of "indexed", "failed", or "skipped" (set in `run`).
    status: &'a str,
    // True when the derived name was cut to DERIVED_NAME_MAX_LEN.
    truncated: bool,
    // Pre-truncation name; present only when `truncated` is true.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
248
/// Final summary line emitted on stdout after all per-file events.
/// `summary: true` lets consumers distinguish it from IngestFileEvent lines.
#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}
261
/// Successful outcome of `persist_staged` for a single file.
struct FileSuccess {
    memory_id: i64,
    // Currently always "created"; surfaced in the per-file NDJSON event.
    action: String,
}
267
/// Progress line written to stderr (stdout is reserved for NDJSON results)
/// each time a Phase A worker finishes extracting/embedding one file.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    // Bump if this event's shape changes; currently always 1.
    schema_version: u8,
    // Currently always "file_extracted".
    event: &'a str,
    path: &'a str,
    // Wall-clock milliseconds spent staging this file.
    ms: u64,
    entities: usize,
    relationships: usize,
}
279
/// Everything produced for one file by Phase A (`stage_file`), ready to be
/// written to SQLite by `persist_staged` without further model work.
struct StagedFile {
    body: String,
    // blake3 hex digest of `body`, used for duplicate-content detection.
    body_hash: String,
    // First 200 chars of the body, stored alongside the memory vector.
    snippet: String,
    name: String,
    description: String,
    // Document-level embedding (aggregated over chunks when multi-chunk).
    embedding: Vec<f32>,
    // Per-chunk embeddings; Some only when the document split into >1 chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    // Positionally parallel to `entities`.
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}
296
297fn stage_file(
300 _idx: usize,
301 path: &Path,
302 name: &str,
303 paths: &AppPaths,
304 enable_ner: bool,
305) -> Result<StagedFile, AppError> {
306 use crate::constants::*;
307
308 if name.len() > MAX_MEMORY_NAME_LEN {
309 return Err(AppError::LimitExceeded(
310 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
311 ));
312 }
313 if name.starts_with("__") {
314 return Err(AppError::Validation(
315 crate::i18n::validation::reserved_name(),
316 ));
317 }
318 {
319 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
320 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
321 if !slug_re.is_match(name) {
322 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
323 name,
324 )));
325 }
326 }
327
328 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
329 if raw_body.len() > MAX_MEMORY_BODY_LEN {
330 return Err(AppError::LimitExceeded(
331 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
332 ));
333 }
334 if raw_body.trim().is_empty() {
335 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
336 }
337
338 let description = format!("ingested from {}", path.display());
339 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
340 return Err(AppError::Validation(
341 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
342 ));
343 }
344
345 let mut extracted_entities: Vec<NewEntity> = Vec::new();
346 let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
347 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
348 if enable_ner {
349 match crate::extraction::extract_graph_auto(&raw_body, paths) {
350 Ok(extracted) => {
351 extracted_urls = extracted.urls;
352 extracted_entities = extracted.entities;
353 extracted_relationships = extracted.relationships;
354
355 if extracted_entities.len() > max_entities_per_memory() {
356 extracted_entities.truncate(max_entities_per_memory());
357 }
358 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
359 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
360 }
361 }
362 Err(e) => {
363 tracing::warn!(
364 file = %path.display(),
365 "auto-extraction failed (graceful degradation): {e:#}"
366 );
367 }
368 }
369 }
370
371 for rel in &mut extracted_relationships {
372 rel.relation = rel.relation.replace('-', "_");
373 if !is_valid_relation(&rel.relation) {
374 return Err(AppError::Validation(format!(
375 "invalid relation '{}' for relationship '{}' -> '{}'",
376 rel.relation, rel.source, rel.target
377 )));
378 }
379 if !(0.0..=1.0).contains(&rel.strength) {
380 return Err(AppError::Validation(format!(
381 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
382 rel.strength, rel.source, rel.target
383 )));
384 }
385 }
386
387 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
388 let snippet: String = raw_body.chars().take(200).collect();
389
390 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
391 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
392 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
393 return Err(AppError::LimitExceeded(format!(
394 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
395 chunks_info.len(),
396 REMEMBER_MAX_SAFE_MULTI_CHUNKS
397 )));
398 }
399
400 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
401 let embedding = if chunks_info.len() == 1 {
402 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
403 } else {
404 let chunk_texts: Vec<&str> = chunks_info
405 .iter()
406 .map(|c| chunking::chunk_text(&raw_body, c))
407 .collect();
408 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
409 for chunk_text in &chunk_texts {
410 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
411 &paths.models,
412 chunk_text,
413 )?);
414 }
415 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
416 chunk_embeddings_opt = Some(chunk_embeddings);
417 aggregated
418 };
419
420 let entity_embeddings = extracted_entities
421 .iter()
422 .map(|entity| {
423 let entity_text = match &entity.description {
424 Some(desc) => format!("{} {}", entity.name, desc),
425 None => entity.name.clone(),
426 };
427 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
428 })
429 .collect::<Result<Vec<_>, _>>()?;
430
431 Ok(StagedFile {
432 body: raw_body,
433 body_hash,
434 snippet,
435 name: name.to_string(),
436 description,
437 embedding,
438 chunk_embeddings: chunk_embeddings_opt,
439 chunks_info,
440 entities: extracted_entities,
441 relationships: extracted_relationships,
442 entity_embeddings,
443 urls: extracted_urls,
444 })
445}
446
/// Phase B of the ingest pipeline: write one staged file to SQLite.
///
/// Runs on the main thread with the single read-write connection. Memory,
/// version, vector, chunk, and graph rows are all written inside one
/// IMMEDIATE transaction; URL rows are inserted best-effort after commit.
/// Returns the new memory id (action is always "created").
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    // Refuse to implicitly create a new namespace once the active-namespace
    // cap is reached.
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Same name in the namespace is a hard error; an identical body hash is
    // allowed (logged below) since distinct files may share content.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE: take the write lock up front rather than on first write.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Version 1, kind "create": the initial history row for this memory.
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Multi-chunk documents additionally persist per-chunk slices and vectors;
    // stage_file sets chunk_embeddings to Some exactly on this path.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the loop index is passed as both the chunk-vec id
            // and the chunk ordinal — confirm upsert_chunk_vec expects a
            // per-memory ordinal (not a global chunk rowid) as its id.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // Entities: upsert row + vector, link to this memory, bump degree.
        // entity_embeddings is positionally parallel to entities (stage_file).
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Relationship endpoints not found among the extracted entities are
        // upserted as bare Concept entities with no description.
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // URL rows are best-effort: a failure after commit is deliberately ignored.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
602
603pub fn run(args: IngestArgs) -> Result<(), AppError> {
604 let started = std::time::Instant::now();
605
606 if !args.dir.exists() {
607 return Err(AppError::NotFound(format!(
608 "directory not found: {}",
609 args.dir.display()
610 )));
611 }
612 if !args.dir.is_dir() {
613 return Err(AppError::Validation(format!(
614 "path is not a directory: {}",
615 args.dir.display()
616 )));
617 }
618
619 let mut files: Vec<PathBuf> = Vec::new();
620 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
621 files.sort();
622
623 if files.len() > args.max_files {
624 return Err(AppError::Validation(format!(
625 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
626 files.len(),
627 args.max_files
628 )));
629 }
630
631 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
632 let memory_type_str = args.r#type.as_str().to_string();
633
634 let paths = AppPaths::resolve(args.db.as_deref())?;
635 let mut conn_or_err = match init_storage(&paths) {
636 Ok(c) => Ok(c),
637 Err(e) => Err(format!("{e}")),
638 };
639
640 let mut succeeded: usize = 0;
641 let mut failed: usize = 0;
642 let mut skipped: usize = 0;
643 let total = files.len();
644
645 let mut taken_names: BTreeSet<String> = BTreeSet::new();
648
649 enum SlotMeta {
655 Skip {
656 file_str: String,
657 derived_base: String,
658 name_truncated: bool,
659 original_name: Option<String>,
660 reason: String,
661 },
662 Process {
663 file_str: String,
664 derived_name: String,
665 name_truncated: bool,
666 original_name: Option<String>,
667 },
668 }
669
670 struct ProcessItem {
671 idx: usize,
672 path: PathBuf,
673 file_str: String,
674 derived_name: String,
675 }
676
677 let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
678 let mut process_items: Vec<ProcessItem> = Vec::new();
679 let mut truncations: Vec<(String, String)> = Vec::new();
680
681 for path in &files {
682 let file_str = path.to_string_lossy().into_owned();
683 let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
684
685 if name_truncated {
686 if let Some(ref orig) = original_name {
687 truncations.push((orig.clone(), derived_base.clone()));
688 }
689 }
690
691 if derived_base.is_empty() {
692 slots_meta.push(SlotMeta::Skip {
693 file_str,
694 derived_base: String::new(),
695 name_truncated: false,
696 original_name: None,
697 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
698 });
699 continue;
700 }
701
702 match unique_name(&derived_base, &taken_names) {
703 Ok(derived_name) => {
704 taken_names.insert(derived_name.clone());
705 let idx = slots_meta.len();
706 process_items.push(ProcessItem {
707 idx,
708 path: path.clone(),
709 file_str: file_str.clone(),
710 derived_name: derived_name.clone(),
711 });
712 slots_meta.push(SlotMeta::Process {
713 file_str,
714 derived_name,
715 name_truncated,
716 original_name,
717 });
718 }
719 Err(e) => {
720 slots_meta.push(SlotMeta::Skip {
721 file_str,
722 derived_base,
723 name_truncated,
724 original_name,
725 reason: e.to_string(),
726 });
727 }
728 }
729 }
730
731 if !truncations.is_empty() {
732 tracing::info!(
733 target: "ingest",
734 count = truncations.len(),
735 max_len = DERIVED_NAME_MAX_LEN,
736 "derived names truncated; pass -vv (debug) for per-file detail"
737 );
738 }
739
740 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
743
744 let pool = rayon::ThreadPoolBuilder::new()
745 .num_threads(parallelism)
746 .build()
747 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
748
749 if args.enable_ner && args.skip_extraction {
750 tracing::warn!(
751 "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
752 );
753 }
754 let enable_ner = args.enable_ner;
755
756 let total_to_process = process_items.len();
757 tracing::info!(
758 target = "ingest",
759 phase = "pipeline_start",
760 files = total_to_process,
761 ingest_parallelism = parallelism,
762 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
763 );
764
765 let channel_bound = (parallelism * 2).max(1);
769 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
770
771 let paths_owned = paths.clone();
776 let producer_handle = std::thread::spawn(move || {
777 pool.install(|| {
778 process_items.into_par_iter().for_each(|item| {
779 let t0 = std::time::Instant::now();
780 let result = stage_file(
781 item.idx,
782 &item.path,
783 &item.derived_name,
784 &paths_owned,
785 enable_ner,
786 );
787 let elapsed_ms = t0.elapsed().as_millis() as u64;
788
789 let (n_entities, n_relationships) = match &result {
792 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
793 Err(_) => (0, 0),
794 };
795 let progress = StageProgressEvent {
796 schema_version: 1,
797 event: "file_extracted",
798 path: &item.file_str,
799 ms: elapsed_ms,
800 entities: n_entities,
801 relationships: n_relationships,
802 };
803 if let Ok(line) = serde_json::to_string(&progress) {
804 eprintln!("{line}");
805 }
806
807 let _ = tx.send((item.idx, result));
811 });
812 drop(tx);
814 });
815 });
816
817 let fail_fast = args.fail_fast;
829
830 for meta in &slots_meta {
832 if let SlotMeta::Skip {
833 file_str,
834 derived_base,
835 name_truncated,
836 original_name,
837 reason,
838 } = meta
839 {
840 output::emit_json_compact(&IngestFileEvent {
841 file: file_str,
842 name: derived_base,
843 status: "skipped",
844 truncated: *name_truncated,
845 original_name: original_name.clone(),
846 error: Some(reason.clone()),
847 memory_id: None,
848 action: None,
849 })?;
850 skipped += 1;
851 }
852 }
853
854 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
857 .iter()
858 .enumerate()
859 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
860 .collect();
861
862 tracing::info!(
863 target = "ingest",
864 phase = "persist_start",
865 files = total_to_process,
866 "phase B starting: persisting files incrementally as Phase A completes each one",
867 );
868
869 for (idx, stage_result) in rx {
873 let meta = meta_index.get(&idx).ok_or_else(|| {
874 AppError::Internal(anyhow::anyhow!(
875 "channel idx {idx} has no corresponding Process slot"
876 ))
877 })?;
878 let (file_str, derived_name, name_truncated, original_name) = match meta {
879 SlotMeta::Process {
880 file_str,
881 derived_name,
882 name_truncated,
883 original_name,
884 } => (file_str, derived_name, name_truncated, original_name),
885 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
886 };
887
888 let conn = match conn_or_err.as_mut() {
890 Ok(c) => c,
891 Err(err_msg) => {
892 let err_clone = err_msg.clone();
893 output::emit_json_compact(&IngestFileEvent {
894 file: file_str,
895 name: derived_name,
896 status: "failed",
897 truncated: *name_truncated,
898 original_name: original_name.clone(),
899 error: Some(err_clone.clone()),
900 memory_id: None,
901 action: None,
902 })?;
903 failed += 1;
904 if fail_fast {
905 output::emit_json_compact(&IngestSummary {
906 summary: true,
907 dir: args.dir.display().to_string(),
908 pattern: args.pattern.clone(),
909 recursive: args.recursive,
910 files_total: total,
911 files_succeeded: succeeded,
912 files_failed: failed,
913 files_skipped: skipped,
914 elapsed_ms: started.elapsed().as_millis() as u64,
915 })?;
916 return Err(AppError::Validation(format!(
917 "ingest aborted on first failure: {err_clone}"
918 )));
919 }
920 continue;
921 }
922 };
923
924 let outcome =
925 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
926
927 match outcome {
928 Ok(FileSuccess { memory_id, action }) => {
929 output::emit_json_compact(&IngestFileEvent {
930 file: file_str,
931 name: derived_name,
932 status: "indexed",
933 truncated: *name_truncated,
934 original_name: original_name.clone(),
935 error: None,
936 memory_id: Some(memory_id),
937 action: Some(action),
938 })?;
939 succeeded += 1;
940 }
941 Err(e) => {
942 let err_msg = format!("{e}");
943 output::emit_json_compact(&IngestFileEvent {
944 file: file_str,
945 name: derived_name,
946 status: "failed",
947 truncated: *name_truncated,
948 original_name: original_name.clone(),
949 error: Some(err_msg.clone()),
950 memory_id: None,
951 action: None,
952 })?;
953 failed += 1;
954 if fail_fast {
955 output::emit_json_compact(&IngestSummary {
956 summary: true,
957 dir: args.dir.display().to_string(),
958 pattern: args.pattern.clone(),
959 recursive: args.recursive,
960 files_total: total,
961 files_succeeded: succeeded,
962 files_failed: failed,
963 files_skipped: skipped,
964 elapsed_ms: started.elapsed().as_millis() as u64,
965 })?;
966 return Err(AppError::Validation(format!(
967 "ingest aborted on first failure: {err_msg}"
968 )));
969 }
970 }
971 }
972 }
973
974 producer_handle
976 .join()
977 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
978
979 output::emit_json_compact(&IngestSummary {
980 summary: true,
981 dir: args.dir.display().to_string(),
982 pattern: args.pattern.clone(),
983 recursive: args.recursive,
984 files_total: total,
985 files_succeeded: succeeded,
986 files_failed: failed,
987 files_skipped: skipped,
988 elapsed_ms: started.elapsed().as_millis() as u64,
989 })?;
990
991 Ok(())
992}
993
/// Opens the database read-write after ensuring the schema is ready.
/// Failures are captured (not propagated) by `run` and reported per-file.
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}
1004
/// Whitelist check for relationship predicates accepted by ingest.
/// Comparison is exact (case-sensitive, underscores only).
fn is_valid_relation(relation: &str) -> bool {
    const ALLOWED: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    ALLOWED.contains(&relation)
}
1022
1023fn collect_files(
1024 dir: &Path,
1025 pattern: &str,
1026 recursive: bool,
1027 out: &mut Vec<PathBuf>,
1028) -> Result<(), AppError> {
1029 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1030 for entry in entries {
1031 let entry = entry.map_err(AppError::Io)?;
1032 let path = entry.path();
1033 let file_type = entry.file_type().map_err(AppError::Io)?;
1034 if file_type.is_file() {
1035 let name = entry.file_name();
1036 let name_str = name.to_string_lossy();
1037 if matches_pattern(&name_str, pattern) {
1038 out.push(path);
1039 }
1040 } else if file_type.is_dir() && recursive {
1041 collect_files(&path, pattern, recursive, out)?;
1042 }
1043 }
1044 Ok(())
1045}
1046
/// Minimal glob support: `*suffix` (suffix match), `prefix*` (prefix match),
/// or an exact literal. A leading `*` takes priority, so a bare "*" matches
/// everything (empty suffix).
fn matches_pattern(name: &str, pattern: &str) -> bool {
    match (pattern.strip_prefix('*'), pattern.strip_suffix('*')) {
        (Some(tail), _) => name.ends_with(tail),
        (None, Some(head)) => name.starts_with(head),
        (None, None) => name == pattern,
    }
}
1056
1057fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
1068 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1069 let lowered: String = stem
1070 .nfd()
1071 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1072 .map(|c| {
1073 if c == '_' || c.is_whitespace() {
1074 '-'
1075 } else {
1076 c
1077 }
1078 })
1079 .map(|c| c.to_ascii_lowercase())
1080 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1081 .collect();
1082 let collapsed = collapse_dashes(&lowered);
1083 let trimmed = collapsed.trim_matches('-').to_string();
1084 if trimmed.len() > DERIVED_NAME_MAX_LEN {
1085 let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
1086 .trim_matches('-')
1087 .to_string();
1088 tracing::debug!(
1089 target: "ingest",
1090 original = %trimmed,
1091 truncated_to = %truncated,
1092 max_len = DERIVED_NAME_MAX_LEN,
1093 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1094 );
1095 (truncated, true, Some(trimmed))
1096 } else {
1097 (trimmed, false, None)
1098 }
1099}
1100
1101fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1114 if !taken.contains(base) {
1115 return Ok(base.to_string());
1116 }
1117 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1118 let candidate = format!("{base}-{suffix}");
1119 if !taken.contains(&candidate) {
1120 tracing::warn!(
1121 target: "ingest",
1122 base = %base,
1123 resolved = %candidate,
1124 suffix,
1125 "memory name collision resolved with numeric suffix"
1126 );
1127 return Ok(candidate);
1128 }
1129 }
1130 Err(AppError::Validation(format!(
1131 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1132 )))
1133}
1134
/// Replaces every run of consecutive '-' characters with a single '-'.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // Drop a dash only when the previously emitted char was also a dash.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1151
1152#[cfg(test)]
1153mod tests {
1154 use super::*;
1155 use std::path::PathBuf;
1156
    // --- matches_pattern: the three supported pattern shapes ---------------

    #[test]
    fn matches_pattern_suffix() {
        // A bare "*" is the empty-suffix case and matches everything.
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        // No wildcard: comparison is literal and case-sensitive.
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }
1175
    // --- derive_kebab_name: ASCII folding, filtering, and truncation -------

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // NFD decomposition + combining-mark removal folds accents to base ASCII.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    // Emoji are neither letters nor digits, so they are filtered out entirely.
    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    // An all-emoji stem yields an empty name, which `run` reports as skipped.
    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }
1258
    // --- collect_files: pattern filtering and recursion ---------------------

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }
1293
1294 #[test]
1297 fn derive_kebab_long_basename_truncated_within_cap() {
1298 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1299 let (name, truncated, original) = derive_kebab_name(&p);
1300 assert!(
1301 name.len() <= DERIVED_NAME_MAX_LEN,
1302 "truncated name must respect cap; got {} chars",
1303 name.len()
1304 );
1305 assert!(!name.is_empty());
1306 assert!(truncated);
1307 assert!(original.is_some());
1308 }
1309
1310 #[test]
1311 fn unique_name_returns_base_when_free() {
1312 let taken: BTreeSet<String> = BTreeSet::new();
1313 let resolved = unique_name("note", &taken).expect("must resolve");
1314 assert_eq!(resolved, "note");
1315 }
1316
1317 #[test]
1318 fn unique_name_appends_first_free_suffix_on_collision() {
1319 let mut taken: BTreeSet<String> = BTreeSet::new();
1320 taken.insert("note".to_string());
1321 taken.insert("note-1".to_string());
1322 let resolved = unique_name("note", &taken).expect("must resolve");
1323 assert_eq!(resolved, "note-2");
1324 }
1325
1326 #[test]
1327 fn unique_name_errors_after_collision_cap() {
1328 let mut taken: BTreeSet<String> = BTreeSet::new();
1329 taken.insert("note".to_string());
1330 for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1331 taken.insert(format!("note-{i}"));
1332 }
1333 let err = unique_name("note", &taken).expect_err("must surface error");
1334 assert!(matches!(err, AppError::Validation(_)));
1335 }
1336
1337 #[test]
1340 fn is_valid_relation_accepts_canonical_relations() {
1341 assert!(is_valid_relation("applies_to"));
1342 assert!(is_valid_relation("depends_on"));
1343 assert!(!is_valid_relation("foo_bar"));
1344 }
1345
1346 use serial_test::serial;
1349
1350 fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1352 let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1353 let prev = std::env::var(key).ok();
1354 match value {
1355 Some(v) => std::env::set_var(key, v),
1356 None => std::env::remove_var(key),
1357 }
1358 f();
1359 match prev {
1360 Some(p) => std::env::set_var(key, p),
1361 None => std::env::remove_var(key),
1362 }
1363 }
1364
1365 #[test]
1366 #[serial]
1367 fn env_low_memory_enabled_unset_returns_false() {
1368 with_env_var(None, || assert!(!env_low_memory_enabled()));
1369 }
1370
1371 #[test]
1372 #[serial]
1373 fn env_low_memory_enabled_empty_returns_false() {
1374 with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1375 }
1376
1377 #[test]
1378 #[serial]
1379 fn env_low_memory_enabled_truthy_values_return_true() {
1380 for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1381 with_env_var(Some(v), || {
1382 assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1383 });
1384 }
1385 }
1386
1387 #[test]
1388 #[serial]
1389 fn env_low_memory_enabled_falsy_values_return_false() {
1390 for v in ["0", "false", "FALSE", "no", "off"] {
1391 with_env_var(Some(v), || {
1392 assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1393 });
1394 }
1395 }
1396
1397 #[test]
1398 #[serial]
1399 fn env_low_memory_enabled_unrecognized_value_returns_false() {
1400 with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1401 }
1402
1403 #[test]
1404 #[serial]
1405 fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1406 with_env_var(None, || {
1407 assert_eq!(resolve_parallelism(true, Some(4)), 1);
1408 assert_eq!(resolve_parallelism(true, Some(8)), 1);
1409 assert_eq!(resolve_parallelism(true, None), 1);
1410 });
1411 }
1412
1413 #[test]
1414 #[serial]
1415 fn resolve_parallelism_env_forces_one_when_flag_off() {
1416 with_env_var(Some("1"), || {
1417 assert_eq!(resolve_parallelism(false, Some(4)), 1);
1418 assert_eq!(resolve_parallelism(false, None), 1);
1419 });
1420 }
1421
1422 #[test]
1423 #[serial]
1424 fn resolve_parallelism_falsy_env_does_not_override() {
1425 with_env_var(Some("0"), || {
1426 assert_eq!(resolve_parallelism(false, Some(4)), 4);
1427 });
1428 }
1429
1430 #[test]
1431 #[serial]
1432 fn resolve_parallelism_explicit_value_when_low_memory_off() {
1433 with_env_var(None, || {
1434 assert_eq!(resolve_parallelism(false, Some(3)), 3);
1435 assert_eq!(resolve_parallelism(false, Some(1)), 1);
1436 });
1437 }
1438
1439 #[test]
1440 #[serial]
1441 fn resolve_parallelism_default_when_unset() {
1442 with_env_var(None, || {
1443 let p = resolve_parallelism(false, None);
1444 assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1445 });
1446 }
1447
1448 #[test]
1449 fn ingest_args_parses_low_memory_flag_via_clap() {
1450 use clap::Parser;
1451 let cli = crate::cli::Cli::try_parse_from([
1454 "sqlite-graphrag",
1455 "ingest",
1456 "/tmp/dummy",
1457 "--type",
1458 "document",
1459 "--low-memory",
1460 ])
1461 .expect("parse must succeed");
1462 match cli.command {
1463 crate::cli::Commands::Ingest(args) => {
1464 assert!(args.low_memory, "--low-memory must set field to true");
1465 }
1466 _ => panic!("expected Ingest subcommand"),
1467 }
1468 }
1469
1470 #[test]
1471 fn ingest_args_low_memory_defaults_false() {
1472 use clap::Parser;
1473 let cli = crate::cli::Cli::try_parse_from([
1474 "sqlite-graphrag",
1475 "ingest",
1476 "/tmp/dummy",
1477 "--type",
1478 "document",
1479 ])
1480 .expect("parse must succeed");
1481 match cli.command {
1482 crate::cli::Commands::Ingest(args) => {
1483 assert!(!args.low_memory, "default must be false");
1484 }
1485 _ => panic!("expected Ingest subcommand"),
1486 }
1487 }
1488}