1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// Upper bound on numeric suffixes tried when de-duplicating derived memory
// names (`base-1` … `base-1000`); past this, `unique_name` returns an error.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Arguments for the `ingest` subcommand.
//
// Plain `//` comments are used throughout this struct on purpose: clap's
// derive turns `///` doc comments into CLI help text, so doc comments here
// would change the command's user-visible help output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable GLiNER NER extraction (disabled by default, slower)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type recorded for every ingested file.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // Filename pattern: leading-star suffix match, trailing-star prefix
    // match, or exact name (see `matches_pattern`).
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Descend into subdirectories when set.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (best quality, 1.1GB), fp16 (580MB), int8 (349MB, fastest)"
    )]
    pub gliner_variant: String,

    // Deprecated no-op (NER is disabled by default); hidden but accepted for
    // backward compatibility — see the warning emitted in `run`.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // Abort the whole run on the first per-file failure.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Safety cap on the number of matching files accepted per run.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace; resolved via crate::namespace when absent.
    #[arg(long)]
    pub namespace: Option<String>,

    // Database path override.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,
}
163
164fn env_low_memory_enabled() -> bool {
169 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
170 Ok(v) if v.is_empty() => false,
171 Ok(v) => match v.to_lowercase().as_str() {
172 "1" | "true" | "yes" | "on" => true,
173 "0" | "false" | "no" | "off" => false,
174 other => {
175 tracing::warn!(
176 target: "ingest",
177 value = %other,
178 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
179 );
180 false
181 }
182 },
183 Err(_) => false,
184 }
185}
186
187fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
199 let env_flag = env_low_memory_enabled();
200 let low_memory = low_memory_flag || env_flag;
201
202 if low_memory {
203 if let Some(n) = ingest_parallelism {
204 if n > 1 {
205 tracing::warn!(
206 target: "ingest",
207 requested = n,
208 "--ingest-parallelism overridden by --low-memory; using 1"
209 );
210 }
211 }
212 if low_memory_flag {
213 tracing::info!(
214 target: "ingest",
215 source = "flag",
216 "low-memory mode enabled: forcing --ingest-parallelism 1"
217 );
218 } else {
219 tracing::info!(
220 target: "ingest",
221 source = "env",
222 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
223 );
224 }
225 return 1;
226 }
227
228 ingest_parallelism
229 .unwrap_or_else(|| {
230 std::thread::available_parallelism()
231 .map(|v| v.get() / 2)
232 .unwrap_or(1)
233 .clamp(1, 4)
234 })
235 .max(1)
236}
237
/// One NDJSON line emitted on stdout per processed file.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path as discovered on disk.
    file: &'a str,
    /// Derived kebab-case memory name (may be empty on skip events).
    name: &'a str,
    /// "indexed", "skipped", or "failed".
    status: &'a str,
    /// True when the derived name was cut to the length cap.
    truncated: bool,
    /// Pre-truncation name; present only when `truncated` is true.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Failure/skip reason, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// ID of the persisted memory on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Outcome label, e.g. "created" or "duplicate".
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
255
/// Final NDJSON summary line for the whole run.
#[derive(Serialize)]
struct IngestSummary {
    /// Always true; distinguishes the summary from per-file events.
    summary: bool,
    /// Directory that was ingested.
    dir: String,
    /// Filename pattern used for matching.
    pattern: String,
    /// Whether subdirectories were searched.
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Total wall-clock time for the run, in milliseconds.
    elapsed_ms: u64,
}
268
/// Outcome of successfully persisting one staged file.
struct FileSuccess {
    // ID of the newly inserted memory row.
    memory_id: i64,
    // Action label for the NDJSON event (currently always "created").
    action: String,
}
274
/// NDJSON progress line written to stderr after Phase A finishes one file.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Event schema version (currently 1).
    schema_version: u8,
    /// Event discriminator (emitted as "file_extracted").
    event: &'a str,
    /// Source file path.
    path: &'a str,
    /// Wall-clock staging time in milliseconds.
    ms: u64,
    /// Extracted entity count (0 on failure or with NER disabled).
    entities: usize,
    /// Extracted relationship count (0 on failure or with NER disabled).
    relationships: usize,
}
286
/// Everything Phase A (`stage_file`) computes for one file, ready for
/// `persist_staged` to write without further model or filesystem work.
struct StagedFile {
    // Full file contents.
    body: String,
    // blake3 hex digest of `body`, used for duplicate detection.
    body_hash: String,
    // First 200 chars of the body, stored alongside the vector entry.
    snippet: String,
    // Validated kebab-case memory name.
    name: String,
    // "ingested from <path>" provenance string.
    description: String,
    // Document-level embedding (aggregate of chunk embeddings when multi-chunk).
    embedding: Vec<f32>,
    // Per-chunk embeddings; Some only when the document split into >1 chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    // Chunk boundaries produced by hierarchical splitting.
    chunks_info: Vec<crate::chunking::Chunk>,
    // NER-extracted entities (empty unless NER was enabled).
    entities: Vec<NewEntity>,
    // NER-extracted relationships (empty unless NER was enabled).
    relationships: Vec<NewRelationship>,
    // One embedding per entry in `entities`, same order.
    entity_embeddings: Vec<Vec<f32>>,
    // URLs found in the body; persisted best-effort after commit.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
303
304fn stage_file(
307 _idx: usize,
308 path: &Path,
309 name: &str,
310 paths: &AppPaths,
311 enable_ner: bool,
312 gliner_variant: crate::extraction::GlinerVariant,
313) -> Result<StagedFile, AppError> {
314 use crate::constants::*;
315
316 if name.len() > MAX_MEMORY_NAME_LEN {
317 return Err(AppError::LimitExceeded(
318 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
319 ));
320 }
321 if name.starts_with("__") {
322 return Err(AppError::Validation(
323 crate::i18n::validation::reserved_name(),
324 ));
325 }
326 {
327 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
328 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
329 if !slug_re.is_match(name) {
330 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
331 name,
332 )));
333 }
334 }
335
336 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
337 if raw_body.len() > MAX_MEMORY_BODY_LEN {
338 return Err(AppError::LimitExceeded(
339 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
340 ));
341 }
342 if raw_body.trim().is_empty() {
343 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
344 }
345
346 let description = format!("ingested from {}", path.display());
347 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
348 return Err(AppError::Validation(
349 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
350 ));
351 }
352
353 let mut extracted_entities: Vec<NewEntity> = Vec::new();
354 let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
355 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
356 if enable_ner {
357 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
358 Ok(extracted) => {
359 extracted_urls = extracted.urls;
360 extracted_entities = extracted.entities;
361 extracted_relationships = extracted.relationships;
362
363 if extracted_entities.len() > max_entities_per_memory() {
364 extracted_entities.truncate(max_entities_per_memory());
365 }
366 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
367 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
368 }
369 }
370 Err(e) => {
371 tracing::warn!(
372 file = %path.display(),
373 "auto-extraction failed (graceful degradation): {e:#}"
374 );
375 }
376 }
377 }
378
379 for rel in &mut extracted_relationships {
380 rel.relation = crate::parsers::normalize_relation(&rel.relation);
381 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
382 return Err(AppError::Validation(format!(
383 "{e} for relationship '{}' -> '{}'",
384 rel.source, rel.target
385 )));
386 }
387 crate::parsers::warn_if_non_canonical(&rel.relation);
388 if !(0.0..=1.0).contains(&rel.strength) {
389 return Err(AppError::Validation(format!(
390 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
391 rel.strength, rel.source, rel.target
392 )));
393 }
394 }
395
396 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
397 let snippet: String = raw_body.chars().take(200).collect();
398
399 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
400 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
401 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
402 return Err(AppError::LimitExceeded(format!(
403 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
404 chunks_info.len(),
405 REMEMBER_MAX_SAFE_MULTI_CHUNKS
406 )));
407 }
408
409 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
410 let embedding = if chunks_info.len() == 1 {
411 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
412 } else {
413 let chunk_texts: Vec<&str> = chunks_info
414 .iter()
415 .map(|c| chunking::chunk_text(&raw_body, c))
416 .collect();
417 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
418 for chunk_text in &chunk_texts {
419 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
420 &paths.models,
421 chunk_text,
422 )?);
423 }
424 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
425 chunk_embeddings_opt = Some(chunk_embeddings);
426 aggregated
427 };
428
429 let entity_embeddings = extracted_entities
430 .iter()
431 .map(|entity| {
432 let entity_text = match &entity.description {
433 Some(desc) => format!("{} {}", entity.name, desc),
434 None => entity.name.clone(),
435 };
436 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
437 })
438 .collect::<Result<Vec<_>, _>>()?;
439
440 Ok(StagedFile {
441 body: raw_body,
442 body_hash,
443 snippet,
444 name: name.to_string(),
445 description,
446 embedding,
447 chunk_embeddings: chunk_embeddings_opt,
448 chunks_info,
449 entities: extracted_entities,
450 relationships: extracted_relationships,
451 entity_embeddings,
452 urls: extracted_urls,
453 })
454}
455
/// Phase B of ingest: persist one staged file into SQLite.
///
/// Enforces the active-namespace cap and the unique-name constraint, then
/// inserts the memory, its initial version, its vector entry, per-chunk
/// slices/vectors (multi-chunk docs only), and any extracted graph data in a
/// single IMMEDIATE transaction. URL rows are inserted best-effort after the
/// commit. Identical bodies (same hash) are allowed — only logged at debug.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        // Enforce the cap on active namespaces only when this write would
        // create a brand-new one.
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // Same name in the same namespace is a hard duplicate (caller maps this
    // to a "skipped" event); same body hash is allowed and only logged.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE: take the write lock up front so the multi-statement insert
    // below cannot deadlock on lock upgrade.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Record version 1 with action "create".
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        // Phase A guarantees chunk embeddings exist whenever there is more
        // than one chunk; a missing cache here is an internal invariant bug.
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the chunk index is passed both as i64 and i32 —
            // presumably (id, memory_id, chunk_idx); confirm against the
            // `storage::chunks::upsert_chunk_vec` signature.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // Entities first: upsert, vector, memory link, degree bump.
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Name → type lookup so relationship endpoints reuse the extracted
        // entity type; unknown endpoints default to Concept.
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // Best-effort after commit: URL indexing errors are ignored (`let _`)
    // since the memory itself is already durable.
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
611
/// Entry point for the `ingest` subcommand.
///
/// Pipeline:
///  1. Validate the directory, discover matching files (sorted for
///     deterministic order), and derive a unique kebab-case name per file.
///  2. Phase A: a rayon pool runs `stage_file` per file (read / validate /
///     extract / embed) and streams results over a bounded channel.
///  3. Phase B: this thread — sole owner of the SQLite connection — persists
///     each staged file as it arrives, emitting one NDJSON event per file.
///  4. A final NDJSON summary line reports totals.
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    // --- Input validation --------------------------------------------------
    if !args.dir.exists() {
        return Err(AppError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("directory not found: {}", args.dir.display()),
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    // --- File discovery ----------------------------------------------------
    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    // Storage-init failure is held as Err(String) instead of aborting so the
    // per-file loop can still emit a "failed" NDJSON event for every file.
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names claimed so far this run; collision suffixes resolve against it.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // Per-file plan: skip with a reason, or process under a derived name.
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    // Work item handed to Phase A; `idx` keys back into `slots_meta`.
    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::new();
    let mut truncations: Vec<(String, String)> = Vec::new();

    // --- Name derivation / skip planning -----------------------------------
    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    // --- Phase A setup ------------------------------------------------------
    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        tracing::warn!(
            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
        );
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
    }
    let enable_ner = args.enable_ner;
    // Invalid variant strings degrade to fp32 with a warning instead of failing.
    let gliner_variant: crate::extraction::GlinerVariant =
        args.gliner_variant.parse().unwrap_or_else(|e| {
            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
            crate::extraction::GlinerVariant::Fp32
        });

    let total_to_process = process_items.len();
    // NOTE(review): `target = "ingest"` records a *field* named `target`;
    // other call sites in this file use `target: "ingest"` (the log target) —
    // these probably should match.
    tracing::info!(
        target = "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    // Bounded channel applies backpressure so staged files (with embeddings)
    // cannot pile up in memory faster than Phase B persists them.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    // Producer thread drives the rayon pool; each worker stages one file,
    // prints a progress line to stderr, then sends the result over the channel.
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                // The receiver may already be gone on a fail-fast abort;
                // a failed send is not an error here.
                let _ = tx.send((item.idx, result));
            });
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    // Emit skip events immediately; they never enter the pipeline.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    // Map from channel idx back to its Process metadata.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target = "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    // --- Phase B: persist results as they stream in -------------------------
    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // No connection: every file fails (still honoring --fail-fast).
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    // Emit the summary before aborting so consumers always see one.
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            // Duplicate names count as skips, not failures.
            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(format!("{e}")),
                    memory_id: None,
                    action: Some("duplicate".to_string()),
                })?;
                skipped += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    // Emit the summary before aborting so consumers always see one.
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // Surface producer panics instead of silently dropping them.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}
1024
1025fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1031 ensure_db_ready(paths)?;
1032 let conn = open_rw(&paths.db)?;
1033 Ok(conn)
1034}
1035
1036fn collect_files(
1037 dir: &Path,
1038 pattern: &str,
1039 recursive: bool,
1040 out: &mut Vec<PathBuf>,
1041) -> Result<(), AppError> {
1042 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1043 for entry in entries {
1044 let entry = entry.map_err(AppError::Io)?;
1045 let path = entry.path();
1046 let file_type = entry.file_type().map_err(AppError::Io)?;
1047 if file_type.is_file() {
1048 let name = entry.file_name();
1049 let name_str = name.to_string_lossy();
1050 if matches_pattern(&name_str, pattern) {
1051 out.push(path);
1052 }
1053 } else if file_type.is_dir() && recursive {
1054 collect_files(&path, pattern, recursive, out)?;
1055 }
1056 }
1057 Ok(())
1058}
1059
/// Minimal glob support: `*suffix` matches by suffix, `prefix*` by prefix,
/// anything else requires an exact match. A bare `*` matches every name
/// (empty-suffix match). Leading-star handling takes precedence.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    match pattern.strip_prefix('*') {
        Some(suffix) => name.ends_with(suffix),
        None => match pattern.strip_suffix('*') {
            Some(prefix) => name.starts_with(prefix),
            None => name == pattern,
        },
    }
}
1069
/// Derive a kebab-case memory name from a file's stem.
///
/// Steps: NFD-decompose and drop combining marks (so "naïve" → "naive"),
/// map '_' and whitespace to '-', ASCII-lowercase, keep only [a-z0-9-],
/// collapse dash runs, trim edge dashes, then truncate to
/// `DERIVED_NAME_MAX_LEN`.
///
/// Returns `(name, truncated, original)` where `original` is the full
/// pre-truncation name (Some only when truncation occurred). `name` may be
/// empty when nothing survives filtering (e.g. emoji-only stems).
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        // Drop diacritics exposed by NFD decomposition.
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        // Byte slicing cannot split a char here: every surviving char is ASCII.
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}
1113
1114fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1127 if !taken.contains(base) {
1128 return Ok(base.to_string());
1129 }
1130 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1131 let candidate = format!("{base}-{suffix}");
1132 if !taken.contains(&candidate) {
1133 tracing::warn!(
1134 target: "ingest",
1135 base = %base,
1136 resolved = %candidate,
1137 suffix,
1138 "memory name collision resolved with numeric suffix"
1139 );
1140 return Ok(candidate);
1141 }
1142 }
1143 Err(AppError::Validation(format!(
1144 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1145 )))
1146}
1147
/// Collapse every run of consecutive '-' characters in `s` into a single '-'.
fn collapse_dashes(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    let mut last_was_dash = false;
    for ch in s.chars() {
        let is_dash = ch == '-';
        // Skip a dash only when it directly follows another dash.
        if !(is_dash && last_was_dash) {
            result.push(ch);
        }
        last_was_dash = is_dash;
    }
    result
}
1164
1165#[cfg(test)]
1166mod tests {
1167 use super::*;
1168 use std::path::PathBuf;
1169
    #[test]
    fn matches_pattern_suffix() {
        // Leading-star patterns match by suffix; bare "*" matches any name.
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }
1176
    #[test]
    fn matches_pattern_prefix() {
        // Trailing-star patterns match by prefix.
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }
1182
    #[test]
    fn matches_pattern_exact() {
        // Star-free patterns require an exact, case-sensitive match.
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }
1188
    #[test]
    fn derive_kebab_underscore_to_dash() {
        // Underscores in the stem become dashes; extension is dropped.
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }
1197
    #[test]
    fn derive_kebab_uppercase_lowered() {
        // Uppercase stems are lowercased.
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }
1206
    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        // Characters outside [a-z0-9-] are silently dropped.
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }
1215
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        // NFD decomposition + combining-mark removal folds accents to ASCII.
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }
1224
    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        // Diaeresis is stripped while the surrounding dash survives.
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }
1231
    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        // Emoji are filtered out; the leftover edge dash is trimmed.
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }
1238
    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        // Accented letters fold to ASCII while trailing emoji are dropped.
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }
1245
#[test]
fn derive_kebab_pure_emoji_yields_empty() {
    // A basename made only of emoji leaves nothing to derive a name from.
    let path = PathBuf::from("/tmp/🦜🚀🌟.md");
    let (name, _, _) = derive_kebab_name(&path);
    assert!(name.is_empty(), "got '{name}'");
}
1252
#[test]
fn derive_kebab_collapses_consecutive_dashes() {
    // Runs of separator characters collapse into a single dash.
    let path = PathBuf::from("/tmp/a__b___c.md");
    let (name, was_truncated, full_original) = derive_kebab_name(&path);
    assert_eq!(name, "a-b-c");
    assert!(!was_truncated);
    assert!(full_original.is_none());
}
1261
#[test]
fn derive_kebab_truncates_to_60_chars() {
    // An 80-char basename exceeds the cap, so the derived name is truncated
    // and the untruncated original is reported back to the caller.
    //
    // Uses the shared DERIVED_NAME_MAX_LEN constant (as the sibling
    // long-basename test already does) rather than a hard-coded 60, so this
    // test cannot silently drift from the production cap.
    let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
    let (name, truncated, original) = derive_kebab_name(&p);
    assert!(name.len() <= DERIVED_NAME_MAX_LEN, "got len {}", name.len());
    assert!(truncated);
    assert!(original.is_some());
    assert!(original.unwrap().len() > DERIVED_NAME_MAX_LEN);
}
1271
#[test]
fn collect_files_finds_md_files() {
    let dir = tempfile::tempdir().expect("tempdir");
    // Two matching .md files plus one .txt decoy that must be ignored.
    std::fs::write(dir.path().join("a.md"), "x").unwrap();
    std::fs::write(dir.path().join("b.md"), "y").unwrap();
    std::fs::write(dir.path().join("c.txt"), "z").unwrap();
    let mut found = Vec::new();
    collect_files(dir.path(), "*.md", false, &mut found).expect("collect");
    assert_eq!(found.len(), 2, "should find 2 .md files, got {found:?}");
}
1282
#[test]
fn collect_files_recursive_descends_subdirs() {
    let dir = tempfile::tempdir().expect("tempdir");
    let nested = dir.path().join("sub");
    std::fs::create_dir(&nested).unwrap();
    // One file at the top level, one inside the subdirectory.
    std::fs::write(dir.path().join("a.md"), "x").unwrap();
    std::fs::write(nested.join("b.md"), "y").unwrap();
    let mut found = Vec::new();
    collect_files(dir.path(), "*.md", true, &mut found).expect("collect");
    // Recursive mode must pick up both files.
    assert_eq!(found.len(), 2);
}
1294
#[test]
fn collect_files_non_recursive_skips_subdirs() {
    let dir = tempfile::tempdir().expect("tempdir");
    let nested = dir.path().join("sub");
    std::fs::create_dir(&nested).unwrap();
    // Same layout as the recursive test, but recursion is disabled here.
    std::fs::write(dir.path().join("a.md"), "x").unwrap();
    std::fs::write(nested.join("b.md"), "y").unwrap();
    let mut found = Vec::new();
    collect_files(dir.path(), "*.md", false, &mut found).expect("collect");
    // Only the top-level file may be found.
    assert_eq!(found.len(), 1);
}
1306
#[test]
fn derive_kebab_long_basename_truncated_within_cap() {
    // A 120-char basename is well past the cap; the derived name must be
    // non-empty, truncated, and never exceed DERIVED_NAME_MAX_LEN.
    let path = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
    let (name, was_truncated, full_original) = derive_kebab_name(&path);
    assert!(
        name.len() <= DERIVED_NAME_MAX_LEN,
        "truncated name must respect cap; got {} chars",
        name.len()
    );
    assert!(!name.is_empty());
    assert!(was_truncated);
    assert!(full_original.is_some());
}
1322
#[test]
fn unique_name_returns_base_when_free() {
    // With nothing taken, the base name comes back untouched.
    let taken = BTreeSet::<String>::new();
    let resolved = unique_name("note", &taken).expect("must resolve");
    assert_eq!(resolved, "note");
}
1329
#[test]
fn unique_name_appends_first_free_suffix_on_collision() {
    // "note" and "note-1" are taken, so "note-2" is the first free slot.
    let taken: BTreeSet<String> = ["note", "note-1"].iter().map(|s| s.to_string()).collect();
    let resolved = unique_name("note", &taken).expect("must resolve");
    assert_eq!(resolved, "note-2");
}
1338
#[test]
fn unique_name_errors_after_collision_cap() {
    // Occupy the base name plus every numbered suffix up to the cap, so
    // there is no free slot left and the function must surface an error.
    let mut taken: BTreeSet<String> = BTreeSet::new();
    taken.insert("note".to_string());
    taken.extend((1..=MAX_NAME_COLLISION_SUFFIX).map(|i| format!("note-{i}")));
    let err = unique_name("note", &taken).expect_err("must surface error");
    assert!(matches!(err, AppError::Validation(_)));
}
1349
#[test]
fn validate_relation_format_accepts_valid_relations() {
    use crate::parsers::{is_canonical_relation, validate_relation_format};
    // Well-formed snake_case relations pass; the empty string does not.
    for relation in ["applies_to", "depends_on", "implements"] {
        assert!(validate_relation_format(relation).is_ok());
    }
    assert!(validate_relation_format("").is_err());
    // Being well-formed is not the same as being canonical.
    assert!(is_canonical_relation("applies_to"));
    assert!(!is_canonical_relation("implements"));
}
1362
1363 use serial_test::serial;
1366
/// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset for
/// `None`), restoring the previous state afterwards.
///
/// Restoration is handled by an RAII drop guard, so a panicking `f` (e.g. a
/// failed assertion) still restores the variable instead of leaking the
/// temporary value into subsequent `#[serial]` tests — the original
/// implementation skipped restoration on unwind.
fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
    const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

    // Restores the saved previous value (or removes the key) on drop,
    // which also runs during unwinding.
    struct Restore(Option<String>);
    impl Drop for Restore {
        fn drop(&mut self) {
            match self.0.take() {
                Some(prev) => std::env::set_var(KEY, prev),
                None => std::env::remove_var(KEY),
            }
        }
    }

    let _guard = Restore(std::env::var(KEY).ok());
    match value {
        Some(v) => std::env::set_var(KEY, v),
        None => std::env::remove_var(KEY),
    }
    f();
}
1381
#[test]
#[serial]
fn env_low_memory_enabled_unset_returns_false() {
    // No env var at all means the low-memory mode is off.
    with_env_var(None, || {
        assert!(!env_low_memory_enabled());
    });
}
1387
#[test]
#[serial]
fn env_low_memory_enabled_empty_returns_false() {
    // An empty value is treated the same as unset.
    with_env_var(Some(""), || {
        assert!(!env_low_memory_enabled());
    });
}
1393
#[test]
#[serial]
fn env_low_memory_enabled_truthy_values_return_true() {
    // Every accepted spelling of "enabled", in assorted letter cases.
    let truthy = ["1", "true", "TRUE", "yes", "YES", "on", "On"];
    for candidate in truthy {
        with_env_var(Some(candidate), || {
            assert!(
                env_low_memory_enabled(),
                "value {:?} should be truthy",
                candidate
            )
        });
    }
}
1403
#[test]
#[serial]
fn env_low_memory_enabled_falsy_values_return_false() {
    // Explicit "disabled" spellings must all read as off.
    let falsy = ["0", "false", "FALSE", "no", "off"];
    for candidate in falsy {
        with_env_var(Some(candidate), || {
            assert!(
                !env_low_memory_enabled(),
                "value {:?} should be falsy",
                candidate
            )
        });
    }
}
1413
#[test]
#[serial]
fn env_low_memory_enabled_unrecognized_value_returns_false() {
    // Unknown strings are conservatively treated as "off".
    with_env_var(Some("maybe"), || {
        assert!(!env_low_memory_enabled());
    });
}
1419
#[test]
#[serial]
fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
    // The low-memory flag wins over any explicit parallelism request.
    with_env_var(None, || {
        for explicit in [Some(4), Some(8), None] {
            assert_eq!(resolve_parallelism(true, explicit), 1);
        }
    });
}
1429
#[test]
#[serial]
fn resolve_parallelism_env_forces_one_when_flag_off() {
    // A truthy env value forces single-threaded mode even without the flag.
    with_env_var(Some("1"), || {
        for explicit in [Some(4), None] {
            assert_eq!(resolve_parallelism(false, explicit), 1);
        }
    });
}
1438
#[test]
#[serial]
fn resolve_parallelism_falsy_env_does_not_override() {
    // A falsy env value must not clamp an explicit parallelism request.
    with_env_var(Some("0"), || {
        assert_eq!(resolve_parallelism(false, Some(4)), 4);
    });
}
1446
#[test]
#[serial]
fn resolve_parallelism_explicit_value_when_low_memory_off() {
    // With low-memory off, explicit requests are honored verbatim.
    with_env_var(None, || {
        for requested in [3, 1] {
            assert_eq!(resolve_parallelism(false, Some(requested)), requested);
        }
    });
}
1455
#[test]
#[serial]
fn resolve_parallelism_default_when_unset() {
    // No flag, no env, no explicit value: the default must stay in [1, 4].
    with_env_var(None, || {
        let chosen = resolve_parallelism(false, None);
        assert!(
            (1..=4).contains(&chosen),
            "default must be in [1, 4]; got {chosen}"
        );
    });
}
1464
#[test]
fn ingest_args_parses_low_memory_flag_via_clap() {
    use clap::Parser;
    // End-to-end parse through the real CLI definition with the flag present.
    let argv = [
        "sqlite-graphrag",
        "ingest",
        "/tmp/dummy",
        "--type",
        "document",
        "--low-memory",
    ];
    let cli = crate::cli::Cli::try_parse_from(argv).expect("parse must succeed");
    match cli.command {
        crate::cli::Commands::Ingest(args) => {
            assert!(args.low_memory, "--low-memory must set field to true");
        }
        _ => panic!("expected Ingest subcommand"),
    }
}
1486
#[test]
fn ingest_args_low_memory_defaults_false() {
    use clap::Parser;
    // No --low-memory on the command line: the field must default to false.
    let argv = ["sqlite-graphrag", "ingest", "/tmp/dummy", "--type", "document"];
    let cli = crate::cli::Cli::try_parse_from(argv).expect("parse must succeed");
    match cli.command {
        crate::cli::Commands::Ingest(args) => {
            assert!(!args.low_memory, "default must be false");
        }
        _ => panic!("expected Ingest subcommand"),
    }
}
1505}