1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound on numeric suffixes tried when de-duplicating derived memory
/// names (`base-1`, `base-2`, …); exceeding it fails the file with a
/// validation error rather than looping forever.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// CLI arguments for the `ingest` subcommand.
//
// NOTE: fields deliberately use `//` comments rather than `///` doc comments —
// clap's derive turns doc comments into help text, which would change the CLI
// surface for every field that has no explicit `help = …` attribute.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable GLiNER NER extraction (disabled by default, slower)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Root directory to scan for matching files.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type assigned to every ingested file (raw identifier: `type`).
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // Filename glob; see `matches_pattern` for supported forms.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Descend into subdirectories when set.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Optional-value flag: bare `--enable-ner` means true; also settable via env.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    // Parsed into `crate::extraction::GlinerVariant` in `run`, falling back to fp32.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (best quality, 1.1GB), fp16 (580MB), int8 (349MB, fastest)"
    )]
    pub gliner_variant: String,

    // Deprecated no-op kept for compatibility; warned about in `run`.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // Abort on the first per-file failure instead of continuing.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Hard cap on matched files; exceeding it aborts before any work starts.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace; resolved via `crate::namespace::resolve_namespace`.
    #[arg(long)]
    pub namespace: Option<String>,

    // Explicit database path override.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output formatting for the NDJSON event stream.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Accepted but ignored; output is always JSON.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // Explicit Phase-A worker count; see `resolve_parallelism` for defaulting.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // Forces parallelism 1; overrides --ingest-parallelism when both are given.
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
163
164fn env_low_memory_enabled() -> bool {
169 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
170 Ok(v) if v.is_empty() => false,
171 Ok(v) => match v.to_lowercase().as_str() {
172 "1" | "true" | "yes" | "on" => true,
173 "0" | "false" | "no" | "off" => false,
174 other => {
175 tracing::warn!(
176 target: "ingest",
177 value = %other,
178 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
179 );
180 false
181 }
182 },
183 Err(_) => false,
184 }
185}
186
187fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
199 let env_flag = env_low_memory_enabled();
200 let low_memory = low_memory_flag || env_flag;
201
202 if low_memory {
203 if let Some(n) = ingest_parallelism {
204 if n > 1 {
205 tracing::warn!(
206 target: "ingest",
207 requested = n,
208 "--ingest-parallelism overridden by --low-memory; using 1"
209 );
210 }
211 }
212 if low_memory_flag {
213 tracing::info!(
214 target: "ingest",
215 source = "flag",
216 "low-memory mode enabled: forcing --ingest-parallelism 1"
217 );
218 } else {
219 tracing::info!(
220 target: "ingest",
221 source = "env",
222 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
223 );
224 }
225 return 1;
226 }
227
228 ingest_parallelism
229 .unwrap_or_else(|| {
230 std::thread::available_parallelism()
231 .map(|v| v.get() / 2)
232 .unwrap_or(1)
233 .clamp(1, 4)
234 })
235 .max(1)
236}
237
/// One NDJSON line emitted on stdout per processed file.
///
/// `None` optional fields are omitted from the serialized JSON.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source path as found on disk (lossy UTF-8).
    file: &'a str,
    /// Derived kebab-case memory name (base name only for early skips).
    name: &'a str,
    /// One of "indexed", "skipped", "failed".
    status: &'a str,
    /// True when the derived name was cut to the length cap.
    truncated: bool,
    /// Pre-truncation name; present only when `truncated` is true.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Failure or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Rowid of the created memory on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// "created" on success, "duplicate" on duplicate-name skip.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
255
/// Final NDJSON summary line; `summary: true` distinguishes it from
/// per-file `IngestFileEvent` lines in the same stream.
#[derive(Serialize)]
struct IngestSummary {
    /// Always set to true by the emitter.
    summary: bool,
    /// The ingested directory, as displayed.
    dir: String,
    /// The filename pattern that was applied.
    pattern: String,
    /// Whether subdirectories were descended.
    recursive: bool,
    /// Count of all matched files (processed or not).
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Wall time for the whole run, in milliseconds.
    elapsed_ms: u64,
}
268
/// Outcome of successfully persisting one staged file.
struct FileSuccess {
    /// Rowid of the newly inserted memory.
    memory_id: i64,
    /// Currently always "created"; ingest never updates an existing memory.
    action: String,
}
274
/// Progress line written to stderr after each Phase-A staging attempt,
/// whether it succeeded or failed.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Event-shape version; currently always 1.
    schema_version: u8,
    /// Currently always "file_extracted".
    event: &'a str,
    /// Path of the file that was staged.
    path: &'a str,
    /// Staging wall time in milliseconds.
    ms: u64,
    /// Extracted entity count (0 on failure).
    entities: usize,
    /// Extracted relationship count (0 on failure).
    relationships: usize,
}
286
/// Everything Phase A computes for one file, handed over the channel to
/// Phase B for persistence. Holding full bodies plus embeddings is why the
/// channel is bounded (backpressure against RSS growth).
struct StagedFile {
    /// Full file contents (validated non-empty and within the size cap).
    body: String,
    /// blake3 hex digest of `body`, used for content-duplicate detection.
    body_hash: String,
    /// First 200 chars of the body, stored with the vector row.
    snippet: String,
    /// Validated kebab-case memory name.
    name: String,
    /// "ingested from <path>" provenance string.
    description: String,
    /// Document-level embedding (single-chunk: direct; multi-chunk: aggregated).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by hierarchical splitting.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// NER-extracted entities (empty unless --enable-ner).
    entities: Vec<NewEntity>,
    /// NER-extracted relationships (validated relation + strength).
    relationships: Vec<NewRelationship>,
    /// One embedding per entry in `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found in the body; persisted best-effort after commit.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
303
/// Phase-A worker: read, validate, optionally NER-extract, chunk, and embed a
/// single file into an in-memory [`StagedFile`]. No database access happens
/// here — persistence is Phase B's job (`persist_staged`).
///
/// `name` is the pre-derived kebab-case candidate; it is re-validated against
/// the same limits interactive memory creation enforces. Returns `AppError`
/// for any validation, I/O, tokenizer, or embedding failure. NER extraction
/// failures degrade gracefully (logged, empty graph) rather than failing the
/// file.
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    // --- Name validation: length cap, reserved "__" prefix, kebab slug. ---
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        // NOTE(review): this regex is recompiled once per file; if profiling
        // flags it, hoist into a std::sync::LazyLock (trade-off: a bad
        // NAME_SLUG_REGEX would then panic instead of returning Internal).
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // --- Body validation: size cap (bytes) and non-blank content. ---
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    // Provenance description; the cap check guards against pathological paths.
    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    // --- Optional NER extraction; failures never fail the file. ---
    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;

                // Silently cap per-memory graph size at the configured limits.
                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // --- Relationship validation: normalize dashes, check label + strength. ---
    for rel in &mut extracted_relationships {
        rel.relation = rel.relation.replace('-', "_");
        if !is_valid_relation(&rel.relation) {
            return Err(AppError::Validation(format!(
                "invalid relation '{}' for relationship '{}' -> '{}'",
                rel.relation, rel.source, rel.target
            )));
        }
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    // Content hash for duplicate detection; 200-char preview for the vec row.
    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    // --- Chunking, with a hard cap on chunk count. ---
    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    // --- Embeddings: single chunk embeds the body directly; multi-chunk
    // embeds each chunk and aggregates, caching per-chunk vectors for Phase B.
    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        aggregated
    };

    // One embedding per entity ("name description" or just the name),
    // collected in the same order as `extracted_entities`.
    let entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
    })
}
454
/// Phase-B worker: persist one [`StagedFile`] into SQLite (runs on the main
/// thread, serialized across files).
///
/// Enforces the active-namespace cap, rejects duplicate names within the
/// namespace (`AppError::Duplicate`, which the caller downgrades to a skip),
/// then writes the memory row, its initial version, the vector-index row,
/// chunk slices/vectors (multi-chunk documents only), and the extracted
/// entity/relationship graph in a single IMMEDIATE transaction. URL rows are
/// inserted best-effort after commit.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    // --- Namespace cap: only enforced when this insert would create a
    // brand-new active namespace. ---
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Duplicate name in this namespace is a hard error here; the caller
    // reports it as a "skipped"/duplicate event rather than a failure.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    // Identical body content (same hash) under a different name is allowed —
    // logged below, not rejected.
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE takes the write lock up front, avoiding mid-transaction
    // SQLITE_BUSY upgrades.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Version 1 / "create": the initial history row for this memory.
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    // Document-level vector-index row.
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Multi-chunk documents additionally store per-chunk slices and vectors.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the loop index is passed both as the vec rowid
            // (i as i64) and as the chunk ordinal (i as i32) — confirm
            // upsert_chunk_vec expects a per-memory index as its id and not
            // a globally unique chunk id, or rows from different memories
            // could collide.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    // --- Graph persistence: entities, entity vectors, relationships. ---
    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            // entity_embeddings is built in lockstep with entities in
            // stage_file, so indexing by position is safe here.
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Relationship endpoints may name entities that weren't extracted as
        // standalone entities; those fall back to EntityType::Concept.
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // Best-effort URL rows after commit: a failure here must not undo the
    // already-committed memory, so the result is deliberately ignored.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
610
/// Entry point for the `ingest` subcommand.
///
/// Pipeline: validate the directory and collect matching files; derive a
/// unique kebab-case name per file; then run a two-phase incremental
/// pipeline — Phase A stages files (read/validate/extract/embed) on a rayon
/// pool feeding a bounded channel, and Phase B (this thread) persists each
/// staged result to SQLite as it arrives. One NDJSON event is emitted on
/// stdout per file, plus a trailing summary line; Phase-A progress events go
/// to stderr.
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    // --- Input directory validation. ---
    if !args.dir.exists() {
        return Err(AppError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("directory not found: {}", args.dir.display()),
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    // Collect matches and sort for deterministic processing/name assignment.
    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    // DB-init failure is captured as a string instead of aborting so every
    // file can still be reported as "failed" in the NDJSON stream.
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names claimed so far during collision resolution.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // Per-file plan: either skip with a reason, or process under a derived name.
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    // Work item handed to the rayon producer; `idx` keys back into `slots_meta`.
    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::new();
    let mut truncations: Vec<(String, String)> = Vec::new();

    // --- Derive names and build the processing plan. ---
    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        // Names that reduce to nothing (e.g. all-emoji stems) are skipped.
        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                // Collision-suffix exhaustion: skip this file, keep going.
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    // Flag-combination diagnostics; neither combination is an error.
    if args.enable_ner && args.skip_extraction {
        tracing::warn!(
            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
        );
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
    }
    let enable_ner = args.enable_ner;
    // Invalid variant strings degrade to fp32 with a warning.
    let gliner_variant: crate::extraction::GlinerVariant =
        args.gliner_variant.parse().unwrap_or_else(|e| {
            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
            crate::extraction::GlinerVariant::Fp32
        });

    let total_to_process = process_items.len();
    tracing::info!(
        target = "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    // Bounded channel applies backpressure so staged bodies + embeddings
    // can't pile up faster than Phase B persists them.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    // Phase A producer thread: stage files in parallel, emit a stderr
    // progress line per file, and send (slot idx, result) into the channel.
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                // A send error means the consumer hung up (e.g. fail-fast
                // abort); dropping the result then is intentional.
                let _ = tx.send((item.idx, result));
            });
            // Close the channel so the consumer's `for … in rx` terminates.
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    // Emit skip events up front — skipped files never enter the pipeline.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    // Index from channel idx back to its Process metadata.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target = "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    // --- Phase B: consume staged results in completion order. ---
    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // If the DB never opened, every staged file is reported as failed
        // with the stored init error (and aborts early under --fail-fast).
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        // Staging error or persistence error both land in `outcome`.
        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            // Duplicate names are skips, not failures, and never trip fail-fast.
            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(format!("{e}")),
                    memory_id: None,
                    action: Some("duplicate".to_string()),
                })?;
                skipped += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    // Emit the summary before aborting so consumers always
                    // see a terminal summary line.
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // The channel is closed, so the producer is done; surface panics.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}
1023
1024fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1030 ensure_db_ready(paths)?;
1031 let conn = open_rw(&paths.db)?;
1032 Ok(conn)
1033}
1034
/// Whitelist check for relationship labels accepted by the graph schema.
fn is_valid_relation(relation: &str) -> bool {
    // Keep in sync with the schema's relation vocabulary.
    const ALLOWED: [&str; 12] = [
        "applies_to",
        "uses",
        "depends_on",
        "causes",
        "fixes",
        "contradicts",
        "supports",
        "follows",
        "related",
        "mentions",
        "replaces",
        "tracked_in",
    ];
    ALLOWED.contains(&relation)
}
1052
1053fn collect_files(
1054 dir: &Path,
1055 pattern: &str,
1056 recursive: bool,
1057 out: &mut Vec<PathBuf>,
1058) -> Result<(), AppError> {
1059 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1060 for entry in entries {
1061 let entry = entry.map_err(AppError::Io)?;
1062 let path = entry.path();
1063 let file_type = entry.file_type().map_err(AppError::Io)?;
1064 if file_type.is_file() {
1065 let name = entry.file_name();
1066 let name_str = name.to_string_lossy();
1067 if matches_pattern(&name_str, pattern) {
1068 out.push(path);
1069 }
1070 } else if file_type.is_dir() && recursive {
1071 collect_files(&path, pattern, recursive, out)?;
1072 }
1073 }
1074 Ok(())
1075}
1076
/// Match a filename against a minimal glob pattern containing at most one `*`.
///
/// Supported forms: `*suffix`, `prefix*`, `pre*suf` (new, generalized), a
/// bare `*` (matches everything), and literal equality when the pattern has
/// no `*`. The overlap guard ensures the prefix and suffix match disjoint
/// parts of the name (so `a*a` does not match `"a"`). Patterns with more
/// than one `*` are not expanded further: text after the first star is
/// treated as a literal suffix.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    match pattern.split_once('*') {
        // One star: name must start with the text before it and end with the
        // text after it, and be long enough that the halves don't overlap.
        Some((prefix, suffix)) => {
            name.len() >= prefix.len() + suffix.len()
                && name.starts_with(prefix)
                && name.ends_with(suffix)
        }
        // No star: exact literal match.
        None => name == pattern,
    }
}
1086
1087fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
1098 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1099 let lowered: String = stem
1100 .nfd()
1101 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1102 .map(|c| {
1103 if c == '_' || c.is_whitespace() {
1104 '-'
1105 } else {
1106 c
1107 }
1108 })
1109 .map(|c| c.to_ascii_lowercase())
1110 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1111 .collect();
1112 let collapsed = collapse_dashes(&lowered);
1113 let trimmed = collapsed.trim_matches('-').to_string();
1114 if trimmed.len() > DERIVED_NAME_MAX_LEN {
1115 let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
1116 .trim_matches('-')
1117 .to_string();
1118 tracing::debug!(
1119 target: "ingest",
1120 original = %trimmed,
1121 truncated_to = %truncated,
1122 max_len = DERIVED_NAME_MAX_LEN,
1123 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1124 );
1125 (truncated, true, Some(trimmed))
1126 } else {
1127 (trimmed, false, None)
1128 }
1129}
1130
1131fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1144 if !taken.contains(base) {
1145 return Ok(base.to_string());
1146 }
1147 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1148 let candidate = format!("{base}-{suffix}");
1149 if !taken.contains(&candidate) {
1150 tracing::warn!(
1151 target: "ingest",
1152 base = %base,
1153 resolved = %candidate,
1154 suffix,
1155 "memory name collision resolved with numeric suffix"
1156 );
1157 return Ok(candidate);
1158 }
1159 }
1160 Err(AppError::Validation(format!(
1161 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1162 )))
1163}
1164
/// Collapse every run of consecutive dashes in `s` into a single dash.
fn collapse_dashes(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    for ch in s.chars() {
        // Skip a dash only when the output already ends with one.
        if ch != '-' || !result.ends_with('-') {
            result.push(ch);
        }
    }
    result
}
1181
1182#[cfg(test)]
1183mod tests {
1184 use super::*;
1185 use std::path::PathBuf;
1186
1187 #[test]
1188 fn matches_pattern_suffix() {
1189 assert!(matches_pattern("foo.md", "*.md"));
1190 assert!(!matches_pattern("foo.txt", "*.md"));
1191 assert!(matches_pattern("foo.md", "*"));
1192 }
1193
1194 #[test]
1195 fn matches_pattern_prefix() {
1196 assert!(matches_pattern("README.md", "README*"));
1197 assert!(!matches_pattern("CHANGELOG.md", "README*"));
1198 }
1199
1200 #[test]
1201 fn matches_pattern_exact() {
1202 assert!(matches_pattern("README.md", "README.md"));
1203 assert!(!matches_pattern("readme.md", "README.md"));
1204 }
1205
1206 #[test]
1207 fn derive_kebab_underscore_to_dash() {
1208 let p = PathBuf::from("/tmp/claude_code_headless.md");
1209 let (name, truncated, original) = derive_kebab_name(&p);
1210 assert_eq!(name, "claude-code-headless");
1211 assert!(!truncated);
1212 assert!(original.is_none());
1213 }
1214
1215 #[test]
1216 fn derive_kebab_uppercase_lowered() {
1217 let p = PathBuf::from("/tmp/README.md");
1218 let (name, truncated, original) = derive_kebab_name(&p);
1219 assert_eq!(name, "readme");
1220 assert!(!truncated);
1221 assert!(original.is_none());
1222 }
1223
1224 #[test]
1225 fn derive_kebab_strips_non_kebab_chars() {
1226 let p = PathBuf::from("/tmp/some@weird#name!.md");
1227 let (name, truncated, original) = derive_kebab_name(&p);
1228 assert_eq!(name, "someweirdname");
1229 assert!(!truncated);
1230 assert!(original.is_none());
1231 }
1232
1233 #[test]
1236 fn derive_kebab_folds_accented_letters_to_ascii() {
1237 let p = PathBuf::from("/tmp/açaí.md");
1238 let (name, _, _) = derive_kebab_name(&p);
1239 assert_eq!(name, "acai", "got '{name}'");
1240 }
1241
1242 #[test]
1243 fn derive_kebab_handles_naive_with_diaeresis() {
1244 let p = PathBuf::from("/tmp/naïve-test.md");
1245 let (name, _, _) = derive_kebab_name(&p);
1246 assert_eq!(name, "naive-test", "got '{name}'");
1247 }
1248
1249 #[test]
1250 fn derive_kebab_drops_emoji_keeps_word() {
1251 let p = PathBuf::from("/tmp/🚀-rocket.md");
1252 let (name, _, _) = derive_kebab_name(&p);
1253 assert_eq!(name, "rocket", "got '{name}'");
1254 }
1255
1256 #[test]
1257 fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1258 let p = PathBuf::from("/tmp/açaí🦜.md");
1259 let (name, _, _) = derive_kebab_name(&p);
1260 assert_eq!(name, "acai", "got '{name}'");
1261 }
1262
1263 #[test]
1264 fn derive_kebab_pure_emoji_yields_empty() {
1265 let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1266 let (name, _, _) = derive_kebab_name(&p);
1267 assert!(name.is_empty(), "got '{name}'");
1268 }
1269
1270 #[test]
1271 fn derive_kebab_collapses_consecutive_dashes() {
1272 let p = PathBuf::from("/tmp/a__b___c.md");
1273 let (name, truncated, original) = derive_kebab_name(&p);
1274 assert_eq!(name, "a-b-c");
1275 assert!(!truncated);
1276 assert!(original.is_none());
1277 }
1278
1279 #[test]
1280 fn derive_kebab_truncates_to_60_chars() {
1281 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1282 let (name, truncated, original) = derive_kebab_name(&p);
1283 assert!(name.len() <= 60, "got len {}", name.len());
1284 assert!(truncated);
1285 assert!(original.is_some());
1286 assert!(original.unwrap().len() > 60);
1287 }
1288
1289 #[test]
1290 fn collect_files_finds_md_files() {
1291 let tmp = tempfile::tempdir().expect("tempdir");
1292 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1293 std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1294 std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1295 let mut out = Vec::new();
1296 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1297 assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1298 }
1299
1300 #[test]
1301 fn collect_files_recursive_descends_subdirs() {
1302 let tmp = tempfile::tempdir().expect("tempdir");
1303 let sub = tmp.path().join("sub");
1304 std::fs::create_dir(&sub).unwrap();
1305 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1306 std::fs::write(sub.join("b.md"), "y").unwrap();
1307 let mut out = Vec::new();
1308 collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1309 assert_eq!(out.len(), 2);
1310 }
1311
1312 #[test]
1313 fn collect_files_non_recursive_skips_subdirs() {
1314 let tmp = tempfile::tempdir().expect("tempdir");
1315 let sub = tmp.path().join("sub");
1316 std::fs::create_dir(&sub).unwrap();
1317 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1318 std::fs::write(sub.join("b.md"), "y").unwrap();
1319 let mut out = Vec::new();
1320 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1321 assert_eq!(out.len(), 1);
1322 }
1323
1324 #[test]
1327 fn derive_kebab_long_basename_truncated_within_cap() {
1328 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1329 let (name, truncated, original) = derive_kebab_name(&p);
1330 assert!(
1331 name.len() <= DERIVED_NAME_MAX_LEN,
1332 "truncated name must respect cap; got {} chars",
1333 name.len()
1334 );
1335 assert!(!name.is_empty());
1336 assert!(truncated);
1337 assert!(original.is_some());
1338 }
1339
1340 #[test]
1341 fn unique_name_returns_base_when_free() {
1342 let taken: BTreeSet<String> = BTreeSet::new();
1343 let resolved = unique_name("note", &taken).expect("must resolve");
1344 assert_eq!(resolved, "note");
1345 }
1346
1347 #[test]
1348 fn unique_name_appends_first_free_suffix_on_collision() {
1349 let mut taken: BTreeSet<String> = BTreeSet::new();
1350 taken.insert("note".to_string());
1351 taken.insert("note-1".to_string());
1352 let resolved = unique_name("note", &taken).expect("must resolve");
1353 assert_eq!(resolved, "note-2");
1354 }
1355
1356 #[test]
1357 fn unique_name_errors_after_collision_cap() {
1358 let mut taken: BTreeSet<String> = BTreeSet::new();
1359 taken.insert("note".to_string());
1360 for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1361 taken.insert(format!("note-{i}"));
1362 }
1363 let err = unique_name("note", &taken).expect_err("must surface error");
1364 assert!(matches!(err, AppError::Validation(_)));
1365 }
1366
1367 #[test]
1370 fn is_valid_relation_accepts_canonical_relations() {
1371 assert!(is_valid_relation("applies_to"));
1372 assert!(is_valid_relation("depends_on"));
1373 assert!(!is_valid_relation("foo_bar"));
1374 }
1375
1376 use serial_test::serial;
1379
1380 fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1382 let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1383 let prev = std::env::var(key).ok();
1384 match value {
1385 Some(v) => std::env::set_var(key, v),
1386 None => std::env::remove_var(key),
1387 }
1388 f();
1389 match prev {
1390 Some(p) => std::env::set_var(key, p),
1391 None => std::env::remove_var(key),
1392 }
1393 }
1394
1395 #[test]
1396 #[serial]
1397 fn env_low_memory_enabled_unset_returns_false() {
1398 with_env_var(None, || assert!(!env_low_memory_enabled()));
1399 }
1400
1401 #[test]
1402 #[serial]
1403 fn env_low_memory_enabled_empty_returns_false() {
1404 with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1405 }
1406
1407 #[test]
1408 #[serial]
1409 fn env_low_memory_enabled_truthy_values_return_true() {
1410 for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1411 with_env_var(Some(v), || {
1412 assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1413 });
1414 }
1415 }
1416
1417 #[test]
1418 #[serial]
1419 fn env_low_memory_enabled_falsy_values_return_false() {
1420 for v in ["0", "false", "FALSE", "no", "off"] {
1421 with_env_var(Some(v), || {
1422 assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1423 });
1424 }
1425 }
1426
1427 #[test]
1428 #[serial]
1429 fn env_low_memory_enabled_unrecognized_value_returns_false() {
1430 with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1431 }
1432
1433 #[test]
1434 #[serial]
1435 fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1436 with_env_var(None, || {
1437 assert_eq!(resolve_parallelism(true, Some(4)), 1);
1438 assert_eq!(resolve_parallelism(true, Some(8)), 1);
1439 assert_eq!(resolve_parallelism(true, None), 1);
1440 });
1441 }
1442
1443 #[test]
1444 #[serial]
1445 fn resolve_parallelism_env_forces_one_when_flag_off() {
1446 with_env_var(Some("1"), || {
1447 assert_eq!(resolve_parallelism(false, Some(4)), 1);
1448 assert_eq!(resolve_parallelism(false, None), 1);
1449 });
1450 }
1451
1452 #[test]
1453 #[serial]
1454 fn resolve_parallelism_falsy_env_does_not_override() {
1455 with_env_var(Some("0"), || {
1456 assert_eq!(resolve_parallelism(false, Some(4)), 4);
1457 });
1458 }
1459
1460 #[test]
1461 #[serial]
1462 fn resolve_parallelism_explicit_value_when_low_memory_off() {
1463 with_env_var(None, || {
1464 assert_eq!(resolve_parallelism(false, Some(3)), 3);
1465 assert_eq!(resolve_parallelism(false, Some(1)), 1);
1466 });
1467 }
1468
1469 #[test]
1470 #[serial]
1471 fn resolve_parallelism_default_when_unset() {
1472 with_env_var(None, || {
1473 let p = resolve_parallelism(false, None);
1474 assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1475 });
1476 }
1477
1478 #[test]
1479 fn ingest_args_parses_low_memory_flag_via_clap() {
1480 use clap::Parser;
1481 let cli = crate::cli::Cli::try_parse_from([
1484 "sqlite-graphrag",
1485 "ingest",
1486 "/tmp/dummy",
1487 "--type",
1488 "document",
1489 "--low-memory",
1490 ])
1491 .expect("parse must succeed");
1492 match cli.command {
1493 crate::cli::Commands::Ingest(args) => {
1494 assert!(args.low_memory, "--low-memory must set field to true");
1495 }
1496 _ => panic!("expected Ingest subcommand"),
1497 }
1498 }
1499
1500 #[test]
1501 fn ingest_args_low_memory_defaults_false() {
1502 use clap::Parser;
1503 let cli = crate::cli::Cli::try_parse_from([
1504 "sqlite-graphrag",
1505 "ingest",
1506 "/tmp/dummy",
1507 "--type",
1508 "document",
1509 ])
1510 .expect("parse must succeed");
1511 match cli.command {
1512 crate::cli::Commands::Ingest(args) => {
1513 assert!(!args.low_memory, "default must be false");
1514 }
1515 _ => panic!("expected Ingest subcommand"),
1516 }
1517 }
1518}