1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
56const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Arguments for the `ingest` subcommand: bulk-import a directory of files,
// creating one memory per matching file.
//
// NB: plain `//` comments are used deliberately throughout this struct —
// `///` doc comments on a clap derive are picked up as CLI help/about text
// and would change the command's behavior.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
 # Ingest every Markdown file under ./docs as `document` memories\n \
 sqlite-graphrag ingest ./docs --type document\n\n \
 # Ingest .txt files recursively under ./notes\n \
 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
 # Enable GLiNER NER extraction (disabled by default, slower)\n \
 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
NOTES:\n \
 Each file becomes a separate memory. Names derive from file basenames\n \
 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
 followed by a final summary line with counts. Per-file errors are reported\n \
 inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
 #[arg(
 value_name = "DIR",
 help = "Directory to ingest recursively (each matching file becomes a memory)"
 )]
 pub dir: PathBuf,

 // Memory type recorded on every ingested file.
 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
 pub r#type: MemoryType,

 // Filename filter: a leading or trailing '*' wildcard, or an exact name
 // (see matches_pattern below for the supported forms).
 #[arg(long, default_value = "*.md")]
 pub pattern: String,

 // Descend into subdirectories when collecting files.
 #[arg(long, default_value_t = false)]
 pub recursive: bool,

 #[arg(
 long,
 env = "SQLITE_GRAPHRAG_ENABLE_NER",
 value_parser = crate::parsers::parse_bool_flexible,
 action = clap::ArgAction::Set,
 num_args = 0..=1,
 default_missing_value = "true",
 default_value = "false",
 help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
 )]
 pub enable_ner: bool,
 #[arg(
 long,
 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
 default_value = "fp32",
 help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
 )]
 pub gliner_variant: String,

 // Hidden and deprecated: run() warns that this flag has no effect now
 // that NER extraction is opt-in.
 #[arg(long, default_value_t = false, hide = true)]
 pub skip_extraction: bool,

 // Abort the entire run on the first per-file failure (after emitting the
 // failure event and a summary line).
 #[arg(long, default_value_t = false)]
 pub fail_fast: bool,

 // Hard cap on matched files; exceeding it aborts before any work is done.
 #[arg(long, default_value_t = 10_000)]
 pub max_files: usize,

 // Target namespace; resolved via crate::namespace::resolve_namespace when
 // unset.
 #[arg(long)]
 pub namespace: Option<String>,

 // Database path override; falls back to AppPaths resolution when unset.
 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
 pub db: Option<String>,

 // JSON output format selection for emitted events.
 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
 pub format: JsonOutputFormat,

 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
 pub json: bool,

 #[arg(
 long,
 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
 )]
 pub ingest_parallelism: Option<usize>,

 #[arg(
 long,
 default_value_t = false,
 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
 Recommended for environments with <4 GB available RAM or container/cgroup \
 constraints. Trade-off: 3-4x longer wall time. Also honored via \
 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
 )]
 pub low_memory: bool,

 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
 pub max_rss_mb: u64,
}
168
169fn env_low_memory_enabled() -> bool {
174 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
175 Ok(v) if v.is_empty() => false,
176 Ok(v) => match v.to_lowercase().as_str() {
177 "1" | "true" | "yes" | "on" => true,
178 "0" | "false" | "no" | "off" => false,
179 other => {
180 tracing::warn!(
181 target: "ingest",
182 value = %other,
183 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
184 );
185 false
186 }
187 },
188 Err(_) => false,
189 }
190}
191
192fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
204 let env_flag = env_low_memory_enabled();
205 let low_memory = low_memory_flag || env_flag;
206
207 if low_memory {
208 if let Some(n) = ingest_parallelism {
209 if n > 1 {
210 tracing::warn!(
211 target: "ingest",
212 requested = n,
213 "--ingest-parallelism overridden by --low-memory; using 1"
214 );
215 }
216 }
217 if low_memory_flag {
218 tracing::info!(
219 target: "ingest",
220 source = "flag",
221 "low-memory mode enabled: forcing --ingest-parallelism 1"
222 );
223 } else {
224 tracing::info!(
225 target: "ingest",
226 source = "env",
227 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
228 );
229 }
230 return 1;
231 }
232
233 ingest_parallelism
234 .unwrap_or_else(|| {
235 std::thread::available_parallelism()
236 .map(|v| v.get() / 2)
237 .unwrap_or(1)
238 .clamp(1, 4)
239 })
240 .max(1)
241}
242
/// Per-file NDJSON event emitted on stdout: one JSON object per file.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path as collected from disk.
    file: &'a str,
    /// Derived kebab-case memory name (or the derived base for skips).
    name: &'a str,
    /// One of "indexed", "skipped", or "failed".
    status: &'a str,
    /// True when the derived name was truncated to the length cap.
    truncated: bool,
    /// Pre-truncation name; serialized only when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Failure or skip reason; serialized only when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Assigned memory id; present only on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// "created" on success, "duplicate" when skipped as a duplicate.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}
260
/// Final NDJSON line emitted after all per-file events: run-level counts.
#[derive(Serialize)]
struct IngestSummary {
    /// Always true; lets consumers distinguish this line from file events.
    summary: bool,
    /// Directory that was ingested (display form).
    dir: String,
    /// Filename pattern that was applied.
    pattern: String,
    /// Whether subdirectories were traversed.
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Wall-clock duration of the whole run in milliseconds.
    elapsed_ms: u64,
}
273
/// Result of successfully persisting one staged file.
struct FileSuccess {
    memory_id: i64,
    /// Currently always "created" (persist_staged never updates in place).
    action: String,
}
279
/// Progress event written to stderr while Phase A runs (stdout is reserved
/// for the NDJSON result stream).
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Event schema version; currently always 1.
    schema_version: u8,
    /// Event name; currently always "file_extracted".
    event: &'a str,
    /// Path of the file that finished staging.
    path: &'a str,
    /// Staging duration for this file in milliseconds.
    ms: u64,
    /// Entity count extracted (0 on failure or when NER is disabled).
    entities: usize,
    /// Relationship count extracted (0 on failure or when NER is disabled).
    relationships: usize,
}
291
/// Everything Phase A (`stage_file`) computes for one file, handed over a
/// channel to Phase B (`persist_staged`) which does the database writes.
struct StagedFile {
    /// Full file body.
    body: String,
    /// blake3 hex digest of `body`.
    body_hash: String,
    /// First 200 chars of the body, stored alongside the memory vector.
    snippet: String,
    /// Derived kebab-case memory name.
    name: String,
    /// Auto-generated "ingested from <path>" description.
    description: String,
    /// Document-level embedding (whole body, or aggregated from chunks).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    /// NER-extracted entities (empty when NER is disabled or failed).
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    /// One embedding per entry in `entities`, same order.
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}
308
309fn stage_file(
312 _idx: usize,
313 path: &Path,
314 name: &str,
315 paths: &AppPaths,
316 enable_ner: bool,
317 gliner_variant: crate::extraction::GlinerVariant,
318 max_rss_mb: u64,
319) -> Result<StagedFile, AppError> {
320 use crate::constants::*;
321
322 if name.len() > MAX_MEMORY_NAME_LEN {
323 return Err(AppError::LimitExceeded(
324 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
325 ));
326 }
327 if name.starts_with("__") {
328 return Err(AppError::Validation(
329 crate::i18n::validation::reserved_name(),
330 ));
331 }
332 {
333 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
334 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
335 if !slug_re.is_match(name) {
336 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
337 name,
338 )));
339 }
340 }
341
342 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
343 if raw_body.len() > MAX_MEMORY_BODY_LEN {
344 return Err(AppError::LimitExceeded(
345 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
346 ));
347 }
348 if raw_body.trim().is_empty() {
349 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
350 }
351
352 let description = format!("ingested from {}", path.display());
353 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
354 return Err(AppError::Validation(
355 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
356 ));
357 }
358
359 let mut extracted_entities: Vec<NewEntity> = Vec::new();
360 let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
361 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
362 if enable_ner {
363 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
364 Ok(extracted) => {
365 extracted_urls = extracted.urls;
366 extracted_entities = extracted.entities;
367 extracted_relationships = extracted.relationships;
368
369 if extracted_entities.len() > max_entities_per_memory() {
370 extracted_entities.truncate(max_entities_per_memory());
371 }
372 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
373 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
374 }
375 }
376 Err(e) => {
377 tracing::warn!(
378 file = %path.display(),
379 "auto-extraction failed (graceful degradation): {e:#}"
380 );
381 }
382 }
383 }
384
385 for rel in &mut extracted_relationships {
386 rel.relation = crate::parsers::normalize_relation(&rel.relation);
387 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
388 return Err(AppError::Validation(format!(
389 "{e} for relationship '{}' -> '{}'",
390 rel.source, rel.target
391 )));
392 }
393 crate::parsers::warn_if_non_canonical(&rel.relation);
394 if !(0.0..=1.0).contains(&rel.strength) {
395 return Err(AppError::Validation(format!(
396 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
397 rel.strength, rel.source, rel.target
398 )));
399 }
400 }
401
402 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
403 let snippet: String = raw_body.chars().take(200).collect();
404
405 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
406 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
407 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
408 return Err(AppError::LimitExceeded(format!(
409 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
410 chunks_info.len(),
411 REMEMBER_MAX_SAFE_MULTI_CHUNKS
412 )));
413 }
414
415 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
416 let embedding = if chunks_info.len() == 1 {
417 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
418 } else {
419 let chunk_texts: Vec<&str> = chunks_info
420 .iter()
421 .map(|c| chunking::chunk_text(&raw_body, c))
422 .collect();
423 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
424 for chunk_text in &chunk_texts {
425 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
426 if rss > max_rss_mb {
427 tracing::error!(
428 rss_mb = rss,
429 max_rss_mb = max_rss_mb,
430 file = %path.display(),
431 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
432 );
433 return Err(AppError::LowMemory {
434 available_mb: crate::memory_guard::available_memory_mb(),
435 required_mb: max_rss_mb,
436 });
437 }
438 }
439 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
440 &paths.models,
441 chunk_text,
442 )?);
443 }
444 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
445 chunk_embeddings_opt = Some(chunk_embeddings);
446 aggregated
447 };
448
449 let entity_embeddings = extracted_entities
450 .iter()
451 .map(|entity| {
452 let entity_text = match &entity.description {
453 Some(desc) => format!("{} {}", entity.name, desc),
454 None => entity.name.clone(),
455 };
456 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
457 })
458 .collect::<Result<Vec<_>, _>>()?;
459
460 Ok(StagedFile {
461 body: raw_body,
462 body_hash,
463 snippet,
464 name: name.to_string(),
465 description,
466 embedding,
467 chunk_embeddings: chunk_embeddings_opt,
468 chunks_info,
469 entities: extracted_entities,
470 relationships: extracted_relationships,
471 entity_embeddings,
472 urls: extracted_urls,
473 })
474}
475
/// Phase B: persist one staged file into the database (main thread only).
///
/// All writes for a single memory happen inside one IMMEDIATE transaction:
/// the memory row, its initial version, its document vector, chunk slices +
/// chunk vectors (multi-chunk path only), and extracted entities and
/// relationships. URL rows are written after the commit on a best-effort
/// basis.
///
/// # Errors
/// `AppError::NamespaceError` when creating a new namespace would exceed the
/// active-namespace cap, `AppError::Duplicate` when a memory with the same
/// name already exists in the namespace, or any underlying storage error.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        // Enforce the active-namespace cap before inserting.
        // NOTE(review): these two reads run outside the transaction opened
        // below, so a concurrent writer could race the check — confirm
        // whether a DB-level constraint backs this up.
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    // A same-name memory is a hard duplicate (error); a same-hash memory is
    // merely logged and a new row is created anyway.
    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    // IMMEDIATE: take the write lock up front so the whole batch of inserts
    // below commits or rolls back as one unit.
    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    // Version 1, action "create": the initial history entry for this memory.
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    // Multi-chunk path: store chunk slices plus one vector per chunk.
    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            // NOTE(review): the first argument (`i as i64`) looks like a
            // per-memory chunk rowid derived from the index — confirm
            // against the storage_chunks::upsert_chunk_vec signature.
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        // Entities and entity_embeddings are parallel vectors: stage_file
        // produced one embedding per entity, in order.
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        // Relationship endpoints may reference entities that weren't in the
        // extracted entity list; those are upserted here with the Concept
        // type as a fallback.
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    // Best-effort: URL rows are written after the commit and insert errors
    // are deliberately ignored (the memory itself is already durable).
    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}
631
/// CLI entry point for `ingest`: walks a directory, stages files in parallel
/// (Phase A, rayon producer thread) and persists results sequentially on the
/// calling thread (Phase B), connected by a bounded channel.
///
/// Output contract: one NDJSON event per file on stdout, then one final
/// summary line. Per-file failures are reported inline; processing continues
/// unless `--fail-fast` is set, in which case the summary is emitted before
/// returning the error.
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    // Validate the target directory up front.
    if !args.dir.exists() {
        return Err(AppError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("directory not found: {}", args.dir.display()),
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    // Collect matching files; sorting makes ordering (and therefore name
    // collision suffixes) deterministic across runs.
    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    // Storage init errors are captured as a String rather than returned, so
    // each file can still emit a per-file "failed" event downstream instead
    // of the whole run aborting silently.
    let paths = AppPaths::resolve(args.db.as_deref())?;
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names already assigned during this run; used for collision suffixes.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // Per-slot metadata: a file is either skipped in the pre-pass or queued
    // for Phase A processing. Slot order mirrors the sorted `files` order.
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
        },
    }

    // Work item handed to the rayon producer; `idx` is the index of the
    // corresponding Process entry in `slots_meta`.
    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());

    // Pre-pass: derive a kebab-case name for each file and decide skip vs
    // process, resolving name collisions with numeric suffixes.
    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                });
            }
            Err(e) => {
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    reason: e.to_string(),
                });
            }
        }
    }

    // One aggregate log line for truncations; per-file detail is at debug.
    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        tracing::warn!(
            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
        );
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
    }
    let enable_ner = args.enable_ner;
    let max_rss_mb = args.max_rss_mb;
    // Invalid variants degrade to fp32 with a warning rather than aborting.
    let gliner_variant: crate::extraction::GlinerVariant =
        args.gliner_variant.parse().unwrap_or_else(|e| {
            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
            crate::extraction::GlinerVariant::Fp32
        });

    let total_to_process = process_items.len();
    // NOTE(review): `target = "ingest"` records a log *field* named `target`,
    // unlike `target: "ingest"` used elsewhere in this file, which sets the
    // tracing target — confirm which was intended.
    tracing::info!(
        target = "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    // Bounded channel applies backpressure so staged embeddings cannot pile
    // up faster than Phase B persists them.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    // Producer thread: runs Phase A staging inside the rayon pool and feeds
    // results (in completion order, not submission order) into the channel.
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                    max_rss_mb,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                // Progress goes to stderr; stdout carries only NDJSON results.
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                // A failed send means the consumer hung up (e.g. fail-fast
                // abort); discard the result.
                let _ = tx.send((item.idx, result));
            });
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    // Emit events for pre-pass skips before consuming Phase A results.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    // Slot index -> metadata for Process slots, so out-of-order channel
    // results can be matched back to their file.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target = "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    // Phase B: consume staged results as they arrive and persist each one on
    // this thread. The loop ends when the producer drops its sender.
    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name) = match meta {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
            } => (file_str, derived_name, name_truncated, original_name),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // If storage never initialized, every staged file fails with the
        // captured init error (still honoring --fail-fast).
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    // Emit the summary even on abort so consumers always see
                    // a final summary line.
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            // Duplicates count as skips, not failures, and never trip
            // --fail-fast.
            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(format!("{e}")),
                    memory_id: None,
                    action: Some("duplicate".to_string()),
                })?;
                skipped += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // The channel is closed at this point, so join() should not block long;
    // a producer panic is surfaced as an internal error.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}
1046
1047fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1053 ensure_db_ready(paths)?;
1054 let conn = open_rw(&paths.db)?;
1055 Ok(conn)
1056}
1057
1058fn collect_files(
1059 dir: &Path,
1060 pattern: &str,
1061 recursive: bool,
1062 out: &mut Vec<PathBuf>,
1063) -> Result<(), AppError> {
1064 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1065 for entry in entries {
1066 let entry = entry.map_err(AppError::Io)?;
1067 let path = entry.path();
1068 let file_type = entry.file_type().map_err(AppError::Io)?;
1069 if file_type.is_file() {
1070 let name = entry.file_name();
1071 let name_str = name.to_string_lossy();
1072 if matches_pattern(&name_str, pattern) {
1073 out.push(path);
1074 }
1075 } else if file_type.is_dir() && recursive {
1076 collect_files(&path, pattern, recursive, out)?;
1077 }
1078 }
1079 Ok(())
1080}
1081
1082fn matches_pattern(name: &str, pattern: &str) -> bool {
1083 if let Some(suffix) = pattern.strip_prefix('*') {
1084 name.ends_with(suffix)
1085 } else if let Some(prefix) = pattern.strip_suffix('*') {
1086 name.starts_with(prefix)
1087 } else {
1088 name == pattern
1089 }
1090}
1091
/// Derives a kebab-case, lowercase, ASCII memory name from a file's basename.
///
/// Returns `(name, was_truncated, original_if_truncated)`. Unicode is NFD-
/// decomposed and combining marks are dropped (so "naïve" folds to "naive"),
/// underscores and whitespace become dashes, anything outside `[a-z0-9-]` is
/// removed, dash runs are collapsed, and leading/trailing dashes trimmed.
/// The result may be empty (e.g. an all-emoji filename); callers must handle
/// that case.
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        // Byte slicing here cannot panic: the filter above guarantees
        // `trimmed` is pure ASCII, so every byte index is a char boundary.
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}
1135
1136fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1149 if !taken.contains(base) {
1150 return Ok(base.to_string());
1151 }
1152 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1153 let candidate = format!("{base}-{suffix}");
1154 if !taken.contains(&candidate) {
1155 tracing::warn!(
1156 target: "ingest",
1157 base = %base,
1158 resolved = %candidate,
1159 suffix,
1160 "memory name collision resolved with numeric suffix"
1161 );
1162 return Ok(candidate);
1163 }
1164 }
1165 Err(AppError::Validation(format!(
1166 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1167 )))
1168}
1169
/// Collapses every run of consecutive '-' characters in `s` into a single
/// '-', leaving all other characters untouched.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // Skip a dash only when the output already ends with one; this is
        // equivalent to tracking a "previous char was a dash" flag, since
        // the output ends with '-' exactly when the last kept char was '-'.
        if ch != '-' || !collapsed.ends_with('-') {
            collapsed.push(ch);
        }
    }
    collapsed
}
1186
// Unit tests for the ingest helpers: pattern matching, kebab-case name
// derivation, directory collection, name-collision resolution, relation
// validation, and low-memory parallelism resolution.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // --- matches_pattern: glob-lite matching (suffix, prefix, exact) ---

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        // Exact (non-wildcard) matching is case-sensitive.
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // --- derive_kebab_name: file basename -> lowercase ASCII kebab name ---

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        // NFD decomposition should strip combining marks: a-cedilla -> a, etc.
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        // A basename with no foldable letters produces an empty name; the
        // caller is responsible for rejecting/handling empty names.
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        // Use the shared constant rather than a hard-coded 60 so this test
        // stays in sync with DERIVED_NAME_MAX_LEN if the cap ever changes.
        assert!(name.len() <= DERIVED_NAME_MAX_LEN, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > DERIVED_NAME_MAX_LEN);
    }

    // --- collect_files: directory traversal honoring pattern + recursion ---

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    // --- unique_name: collision resolution via numeric suffixes ---

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        // Saturate base and every suffix up to the cap; the next request
        // must surface a validation error instead of looping forever.
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // --- relation format validation ---

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // --- low-memory env var + parallelism resolution ---
    // Env-var tests are #[serial] because process environment is global.

    use serial_test::serial;

    // Runs `f` with SQLITE_GRAPHRAG_LOW_MEMORY set to `value` (or unset for
    // None), restoring the previous value afterwards.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        // The --low-memory flag wins over any explicit --parallelism value.
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    // --- CLI argument parsing ---

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}