1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Largest numeric suffix `unique_name` tries (`base-1` through `base-1000`) before it
/// gives up on a colliding derived memory name.
const MAX_NAME_COLLISION_SUFFIX: usize = 1_000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable GLiNER NER extraction (disabled by default, slower)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70NOTES:\n \
71 Each file becomes a separate memory. Names derive from file basenames\n \
72 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
73 followed by a final summary line with counts. Per-file errors are reported\n \
74 inline and processing continues unless --fail-fast is set.")]
75pub struct IngestArgs {
76 #[arg(
78 value_name = "DIR",
79 help = "Directory to ingest recursively (each matching file becomes a memory)"
80 )]
81 pub dir: PathBuf,
82
83 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
85 pub r#type: MemoryType,
86
87 #[arg(long, default_value = "*.md")]
90 pub pattern: String,
91
92 #[arg(long, default_value_t = false)]
94 pub recursive: bool,
95
96 #[arg(
97 long,
98 env = "SQLITE_GRAPHRAG_ENABLE_NER",
99 value_parser = crate::parsers::parse_bool_flexible,
100 action = clap::ArgAction::Set,
101 num_args = 0..=1,
102 default_missing_value = "true",
103 default_value = "false",
104 help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
105 )]
106 pub enable_ner: bool,
107 #[arg(
108 long,
109 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
110 default_value = "fp32",
111 help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
112 )]
113 pub gliner_variant: String,
114
115 #[arg(long, default_value_t = false, hide = true)]
117 pub skip_extraction: bool,
118
119 #[arg(long, default_value_t = false)]
121 pub fail_fast: bool,
122
123 #[arg(long, default_value_t = false)]
125 pub dry_run: bool,
126
127 #[arg(long, default_value_t = 10_000)]
129 pub max_files: usize,
130
131 #[arg(long)]
133 pub namespace: Option<String>,
134
135 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
137 pub db: Option<String>,
138
139 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
140 pub format: JsonOutputFormat,
141
142 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
143 pub json: bool,
144
145 #[arg(
147 long,
148 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
149 )]
150 pub ingest_parallelism: Option<usize>,
151
152 #[arg(
160 long,
161 default_value_t = false,
162 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
163 Recommended for environments with <4 GB available RAM or container/cgroup \
164 constraints. Trade-off: 3-4x longer wall time. Also honored via \
165 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
166 )]
167 pub low_memory: bool,
168
169 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
171 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
172 pub max_rss_mb: u64,
173
174 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
179 help = "Maximum length for derived memory names (default: 60)")]
180 pub max_name_length: usize,
181}
182
183fn env_low_memory_enabled() -> bool {
188 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
189 Ok(v) if v.is_empty() => false,
190 Ok(v) => match v.to_lowercase().as_str() {
191 "1" | "true" | "yes" | "on" => true,
192 "0" | "false" | "no" | "off" => false,
193 other => {
194 tracing::warn!(
195 target: "ingest",
196 value = %other,
197 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
198 );
199 false
200 }
201 },
202 Err(_) => false,
203 }
204}
205
206fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
218 let env_flag = env_low_memory_enabled();
219 let low_memory = low_memory_flag || env_flag;
220
221 if low_memory {
222 if let Some(n) = ingest_parallelism {
223 if n > 1 {
224 tracing::warn!(
225 target: "ingest",
226 requested = n,
227 "--ingest-parallelism overridden by --low-memory; using 1"
228 );
229 }
230 }
231 if low_memory_flag {
232 tracing::info!(
233 target: "ingest",
234 source = "flag",
235 "low-memory mode enabled: forcing --ingest-parallelism 1"
236 );
237 } else {
238 tracing::info!(
239 target: "ingest",
240 source = "env",
241 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
242 );
243 }
244 return 1;
245 }
246
247 ingest_parallelism
248 .unwrap_or_else(|| {
249 std::thread::available_parallelism()
250 .map(|v| v.get() / 2)
251 .unwrap_or(1)
252 .clamp(1, 4)
253 })
254 .max(1)
255}
256
/// One NDJSON line describing the outcome (or dry-run preview) for a single file.
///
/// Field order is the serialized key order; fields marked `skip_serializing_if` are omitted
/// when `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path as discovered on disk (lossy UTF-8).
    file: &'a str,
    /// Memory name derived for the file (empty when none could be derived).
    name: &'a str,
    /// Outcome label emitted by `run`: `indexed`, `skipped`, `failed`, or in dry-run
    /// `skip` / `preview`.
    status: &'a str,
    /// Whether the derived name was shortened to fit `--max-name-length`.
    truncated: bool,
    /// Untruncated derived name; present only when the name was truncated.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// File stem, included when it differs from `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error or skip reason for non-indexed outcomes.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the created memory; set only for indexed files.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action taken: `created` for new memories, `duplicate` when the name already existed.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length in bytes for indexed files; 0 otherwise.
    body_length: usize,
}
279
/// Final NDJSON line emitted after all per-file events.
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`; lets consumers tell the summary apart from per-file lines.
    summary: bool,
    /// Directory argument as given on the command line.
    dir: String,
    /// Filename pattern used for discovery.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Files persisted as new memories.
    files_succeeded: usize,
    /// Files whose staging or persistence failed.
    files_failed: usize,
    /// Files not processed (no usable name) or rejected as duplicate names.
    files_skipped: usize,
    /// Wall-clock time since the command started, in milliseconds.
    elapsed_ms: u64,
}
292
/// Result of persisting one staged file.
struct FileSuccess {
    /// Id returned by `memories::insert` for the new memory.
    memory_id: i64,
    /// Action label reported in the file event (`persist_staged` always uses `created`).
    action: String,
    /// Length of the stored body in bytes.
    body_length: usize,
}
299
/// Progress line written to stderr as compact JSON by Phase A after each file is staged.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's shape (currently 1).
    schema_version: u8,
    /// Event kind (currently `file_extracted`).
    event: &'a str,
    /// Source file path.
    path: &'a str,
    /// Time spent staging the file, in milliseconds.
    ms: u64,
    /// Entities extracted (0 when NER is off or staging failed).
    entities: usize,
    /// Relationships extracted (0 when NER is off or staging failed).
    relationships: usize,
}
311
/// Everything Phase A (`stage_file`) computes for one file, sent over the channel to
/// `persist_staged`. Building it requires no database access.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// `ingested from <path>` description.
    description: String,
    /// Body embedding: the direct embedding for single-chunk bodies, otherwise the
    /// aggregate of `chunk_embeddings`.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only when the body produced more than one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk descriptors (slices of `body`) from the hierarchical chunker.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// NER entities (empty unless NER ran successfully), capped per memory.
    entities: Vec<NewEntity>,
    /// NER relationships with normalized, validated relations, capped per memory.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs reported by the extractor.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
328
329fn stage_file(
332 _idx: usize,
333 path: &Path,
334 name: &str,
335 paths: &AppPaths,
336 enable_ner: bool,
337 gliner_variant: crate::extraction::GlinerVariant,
338 max_rss_mb: u64,
339) -> Result<StagedFile, AppError> {
340 use crate::constants::*;
341
342 if name.len() > MAX_MEMORY_NAME_LEN {
343 return Err(AppError::LimitExceeded(
344 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
345 ));
346 }
347 if name.starts_with("__") {
348 return Err(AppError::Validation(
349 crate::i18n::validation::reserved_name(),
350 ));
351 }
352 {
353 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
354 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
355 if !slug_re.is_match(name) {
356 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
357 name,
358 )));
359 }
360 }
361
362 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
363 if raw_body.len() > MAX_MEMORY_BODY_LEN {
364 return Err(AppError::LimitExceeded(
365 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
366 ));
367 }
368 if raw_body.trim().is_empty() {
369 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
370 }
371
372 let description = format!("ingested from {}", path.display());
373 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
374 return Err(AppError::Validation(
375 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
376 ));
377 }
378
379 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
380 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
381 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
382 if enable_ner {
383 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
384 Ok(extracted) => {
385 extracted_urls = extracted.urls;
386 extracted_entities = extracted.entities;
387 extracted_relationships = extracted.relationships;
388
389 if extracted_entities.len() > max_entities_per_memory() {
390 extracted_entities.truncate(max_entities_per_memory());
391 }
392 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
393 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
394 }
395 }
396 Err(e) => {
397 tracing::warn!(
398 file = %path.display(),
399 "auto-extraction failed (graceful degradation): {e:#}"
400 );
401 }
402 }
403 }
404
405 for rel in &mut extracted_relationships {
406 rel.relation = crate::parsers::normalize_relation(&rel.relation);
407 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
408 return Err(AppError::Validation(format!(
409 "{e} for relationship '{}' -> '{}'",
410 rel.source, rel.target
411 )));
412 }
413 crate::parsers::warn_if_non_canonical(&rel.relation);
414 if !(0.0..=1.0).contains(&rel.strength) {
415 return Err(AppError::Validation(format!(
416 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
417 rel.strength, rel.source, rel.target
418 )));
419 }
420 }
421
422 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
423 let snippet: String = raw_body.chars().take(200).collect();
424
425 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
426 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
427 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
428 return Err(AppError::LimitExceeded(format!(
429 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
430 chunks_info.len(),
431 REMEMBER_MAX_SAFE_MULTI_CHUNKS
432 )));
433 }
434
435 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
436 let embedding = if chunks_info.len() == 1 {
437 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
438 } else {
439 let chunk_texts: Vec<&str> = chunks_info
440 .iter()
441 .map(|c| chunking::chunk_text(&raw_body, c))
442 .collect();
443 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
444 for chunk_text in &chunk_texts {
445 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
446 if rss > max_rss_mb {
447 tracing::error!(
448 rss_mb = rss,
449 max_rss_mb = max_rss_mb,
450 file = %path.display(),
451 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
452 );
453 return Err(AppError::LowMemory {
454 available_mb: crate::memory_guard::available_memory_mb(),
455 required_mb: max_rss_mb,
456 });
457 }
458 }
459 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
460 &paths.models,
461 chunk_text,
462 )?);
463 }
464 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
465 chunk_embeddings_opt = Some(chunk_embeddings);
466 aggregated
467 };
468
469 let entity_embeddings = extracted_entities
470 .iter()
471 .map(|entity| {
472 let entity_text = match &entity.description {
473 Some(desc) => format!("{} {}", entity.name, desc),
474 None => entity.name.clone(),
475 };
476 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
477 })
478 .collect::<Result<Vec<_>, _>>()?;
479
480 Ok(StagedFile {
481 body: raw_body,
482 body_hash,
483 snippet,
484 name: name.to_string(),
485 description,
486 embedding,
487 chunk_embeddings: chunk_embeddings_opt,
488 chunks_info,
489 entities: extracted_entities,
490 relationships: extracted_relationships,
491 entity_embeddings,
492 urls: extracted_urls,
493 })
494}
495
496fn persist_staged(
498 conn: &mut Connection,
499 namespace: &str,
500 memory_type: &str,
501 staged: StagedFile,
502) -> Result<FileSuccess, AppError> {
503 {
504 let active_count: u32 = conn.query_row(
505 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
506 [],
507 |r| r.get::<_, i64>(0).map(|v| v as u32),
508 )?;
509 let ns_exists: bool = conn.query_row(
510 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
511 rusqlite::params![namespace],
512 |r| r.get::<_, i64>(0).map(|v| v > 0),
513 )?;
514 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
515 return Err(AppError::NamespaceError(format!(
516 "active namespace limit of {} exceeded while creating '{namespace}'",
517 crate::constants::MAX_NAMESPACES_ACTIVE
518 )));
519 }
520 }
521
522 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
523 if existing_memory.is_some() {
524 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
525 &staged.name,
526 namespace,
527 )));
528 }
529 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
530
531 let new_memory = NewMemory {
532 namespace: namespace.to_string(),
533 name: staged.name.clone(),
534 memory_type: memory_type.to_string(),
535 description: staged.description.clone(),
536 body: staged.body,
537 body_hash: staged.body_hash,
538 session_id: None,
539 source: "agent".to_string(),
540 metadata: serde_json::json!({}),
541 };
542
543 if let Some(hash_id) = duplicate_hash_id {
544 tracing::debug!(
545 target: "ingest",
546 duplicate_memory_id = hash_id,
547 "identical body already exists; persisting a new memory anyway"
548 );
549 }
550
551 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
552
553 let memory_id = memories::insert(&tx, &new_memory)?;
554 versions::insert_version(
555 &tx,
556 memory_id,
557 1,
558 &staged.name,
559 memory_type,
560 &staged.description,
561 &new_memory.body,
562 &serde_json::to_string(&new_memory.metadata)?,
563 None,
564 "create",
565 )?;
566 memories::upsert_vec(
567 &tx,
568 memory_id,
569 namespace,
570 memory_type,
571 &staged.embedding,
572 &staged.name,
573 &staged.snippet,
574 )?;
575
576 if staged.chunks_info.len() > 1 {
577 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
578 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
579 AppError::Internal(anyhow::anyhow!(
580 "missing chunk embeddings cache on multi-chunk ingest path"
581 ))
582 })?;
583 for (i, emb) in chunk_embeddings.iter().enumerate() {
584 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
585 }
586 }
587
588 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
589 for (idx, entity) in staged.entities.iter().enumerate() {
590 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
591 let entity_embedding = &staged.entity_embeddings[idx];
592 entities::upsert_entity_vec(
593 &tx,
594 entity_id,
595 namespace,
596 entity.entity_type,
597 entity_embedding,
598 &entity.name,
599 )?;
600 entities::link_memory_entity(&tx, memory_id, entity_id)?;
601 entities::increment_degree(&tx, entity_id)?;
602 }
603 let entity_types: std::collections::HashMap<&str, EntityType> = staged
604 .entities
605 .iter()
606 .map(|entity| (entity.name.as_str(), entity.entity_type))
607 .collect();
608 for rel in &staged.relationships {
609 let source_entity = NewEntity {
610 name: rel.source.clone(),
611 entity_type: entity_types
612 .get(rel.source.as_str())
613 .copied()
614 .unwrap_or(EntityType::Concept),
615 description: None,
616 };
617 let target_entity = NewEntity {
618 name: rel.target.clone(),
619 entity_type: entity_types
620 .get(rel.target.as_str())
621 .copied()
622 .unwrap_or(EntityType::Concept),
623 description: None,
624 };
625 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
626 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
627 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
628 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
629 }
630 }
631
632 tx.commit()?;
633
634 if !staged.urls.is_empty() {
635 let url_entries: Vec<storage_urls::MemoryUrl> = staged
636 .urls
637 .into_iter()
638 .map(|u| storage_urls::MemoryUrl {
639 url: u.url,
640 offset: Some(u.offset as i64),
641 })
642 .collect();
643 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
644 }
645
646 Ok(FileSuccess {
647 memory_id,
648 action: "created".to_string(),
649 body_length: new_memory.body.len(),
650 })
651}
652
653pub fn run(args: IngestArgs) -> Result<(), AppError> {
654 let started = std::time::Instant::now();
655
656 if !args.dir.exists() {
657 return Err(AppError::Validation(format!(
658 "directory not found: {}",
659 args.dir.display()
660 )));
661 }
662 if !args.dir.is_dir() {
663 return Err(AppError::Validation(format!(
664 "path is not a directory: {}",
665 args.dir.display()
666 )));
667 }
668
669 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
670 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
671 files.sort();
672
673 if files.len() > args.max_files {
674 return Err(AppError::Validation(format!(
675 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
676 files.len(),
677 args.max_files
678 )));
679 }
680
681 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
682 let memory_type_str = args.r#type.as_str().to_string();
683
684 let paths = AppPaths::resolve(args.db.as_deref())?;
685 let mut conn_or_err = match init_storage(&paths) {
686 Ok(c) => Ok(c),
687 Err(e) => Err(format!("{e}")),
688 };
689
690 let mut succeeded: usize = 0;
691 let mut failed: usize = 0;
692 let mut skipped: usize = 0;
693 let total = files.len();
694
695 let mut taken_names: BTreeSet<String> = BTreeSet::new();
698
699 enum SlotMeta {
705 Skip {
706 file_str: String,
707 derived_base: String,
708 name_truncated: bool,
709 original_name: Option<String>,
710 original_filename: Option<String>,
711 reason: String,
712 },
713 Process {
714 file_str: String,
715 derived_name: String,
716 name_truncated: bool,
717 original_name: Option<String>,
718 original_filename: Option<String>,
719 },
720 }
721
722 struct ProcessItem {
723 idx: usize,
724 path: PathBuf,
725 file_str: String,
726 derived_name: String,
727 }
728
729 let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
730 let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
731 let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
732
733 let max_name_length = args.max_name_length;
734 for path in &files {
735 let file_str = path.to_string_lossy().into_owned();
736 let (derived_base, name_truncated, original_name) =
737 derive_kebab_name(path, max_name_length);
738 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
739
740 if name_truncated {
741 if let Some(ref orig) = original_name {
742 truncations.push((orig.clone(), derived_base.clone()));
743 }
744 }
745
746 if derived_base.is_empty() {
747 let orig_filename = if !original_basename.is_empty() {
749 Some(original_basename.to_string())
750 } else {
751 None
752 };
753 slots_meta.push(SlotMeta::Skip {
754 file_str,
755 derived_base: String::new(),
756 name_truncated: false,
757 original_name: None,
758 original_filename: orig_filename,
759 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
760 });
761 continue;
762 }
763
764 match unique_name(&derived_base, &taken_names) {
765 Ok(derived_name) => {
766 taken_names.insert(derived_name.clone());
767 let idx = slots_meta.len();
768 let orig_filename = if original_basename != derived_name {
770 Some(original_basename.to_string())
771 } else {
772 None
773 };
774 process_items.push(ProcessItem {
775 idx,
776 path: path.clone(),
777 file_str: file_str.clone(),
778 derived_name: derived_name.clone(),
779 });
780 slots_meta.push(SlotMeta::Process {
781 file_str,
782 derived_name,
783 name_truncated,
784 original_name,
785 original_filename: orig_filename,
786 });
787 }
788 Err(e) => {
789 let orig_filename = if original_basename != derived_base {
790 Some(original_basename.to_string())
791 } else {
792 None
793 };
794 slots_meta.push(SlotMeta::Skip {
795 file_str,
796 derived_base,
797 name_truncated,
798 original_name,
799 original_filename: orig_filename,
800 reason: e.to_string(),
801 });
802 }
803 }
804 }
805
806 if !truncations.is_empty() {
807 tracing::info!(
808 target: "ingest",
809 count = truncations.len(),
810 max_name_length = max_name_length,
811 max_len = DERIVED_NAME_MAX_LEN,
812 "derived names truncated; pass -vv (debug) for per-file detail"
813 );
814 }
815
816 if args.dry_run {
818 for meta in &slots_meta {
819 match meta {
820 SlotMeta::Skip {
821 file_str,
822 derived_base,
823 name_truncated,
824 original_name,
825 original_filename,
826 reason,
827 } => {
828 output::emit_json_compact(&IngestFileEvent {
829 file: file_str,
830 name: derived_base,
831 status: "skip",
832 truncated: *name_truncated,
833 original_name: original_name.clone(),
834 original_filename: original_filename.as_deref(),
835 error: Some(reason.clone()),
836 memory_id: None,
837 action: None,
838 body_length: 0,
839 })?;
840 }
841 SlotMeta::Process {
842 file_str,
843 derived_name,
844 name_truncated,
845 original_name,
846 original_filename,
847 } => {
848 output::emit_json_compact(&IngestFileEvent {
849 file: file_str,
850 name: derived_name,
851 status: "preview",
852 truncated: *name_truncated,
853 original_name: original_name.clone(),
854 original_filename: original_filename.as_deref(),
855 error: None,
856 memory_id: None,
857 action: None,
858 body_length: 0,
859 })?;
860 }
861 }
862 }
863 output::emit_json_compact(&IngestSummary {
864 summary: true,
865 dir: args.dir.to_string_lossy().into_owned(),
866 pattern: args.pattern.clone(),
867 recursive: args.recursive,
868 files_total: total,
869 files_succeeded: 0,
870 files_failed: 0,
871 files_skipped: 0,
872 elapsed_ms: started.elapsed().as_millis() as u64,
873 })?;
874 return Ok(());
875 }
876
877 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
880
881 let pool = rayon::ThreadPoolBuilder::new()
882 .num_threads(parallelism)
883 .build()
884 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
885
886 if args.enable_ner && args.skip_extraction {
887 tracing::warn!(
888 "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
889 );
890 }
891 if args.skip_extraction && !args.enable_ner {
892 tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
893 }
894 let enable_ner = args.enable_ner;
895 let max_rss_mb = args.max_rss_mb;
896 let gliner_variant: crate::extraction::GlinerVariant =
897 args.gliner_variant.parse().unwrap_or_else(|e| {
898 tracing::warn!("invalid --gliner-variant: {e}; using fp32");
899 crate::extraction::GlinerVariant::Fp32
900 });
901
902 let total_to_process = process_items.len();
903 tracing::info!(
904 target = "ingest",
905 phase = "pipeline_start",
906 files = total_to_process,
907 ingest_parallelism = parallelism,
908 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
909 );
910
911 let channel_bound = (parallelism * 2).max(1);
915 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
916
917 let paths_owned = paths.clone();
922 let producer_handle = std::thread::spawn(move || {
923 pool.install(|| {
924 process_items.into_par_iter().for_each(|item| {
925 let t0 = std::time::Instant::now();
926 let result = stage_file(
927 item.idx,
928 &item.path,
929 &item.derived_name,
930 &paths_owned,
931 enable_ner,
932 gliner_variant,
933 max_rss_mb,
934 );
935 let elapsed_ms = t0.elapsed().as_millis() as u64;
936
937 let (n_entities, n_relationships) = match &result {
940 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
941 Err(_) => (0, 0),
942 };
943 let progress = StageProgressEvent {
944 schema_version: 1,
945 event: "file_extracted",
946 path: &item.file_str,
947 ms: elapsed_ms,
948 entities: n_entities,
949 relationships: n_relationships,
950 };
951 if let Ok(line) = serde_json::to_string(&progress) {
952 eprintln!("{line}");
953 }
954
955 let _ = tx.send((item.idx, result));
959 });
960 drop(tx);
962 });
963 });
964
965 let fail_fast = args.fail_fast;
977
978 for meta in &slots_meta {
980 if let SlotMeta::Skip {
981 file_str,
982 derived_base,
983 name_truncated,
984 original_name,
985 original_filename,
986 reason,
987 } = meta
988 {
989 output::emit_json_compact(&IngestFileEvent {
990 file: file_str,
991 name: derived_base,
992 status: "skipped",
993 truncated: *name_truncated,
994 original_name: original_name.clone(),
995 original_filename: original_filename.as_deref(),
996 error: Some(reason.clone()),
997 memory_id: None,
998 action: None,
999 body_length: 0,
1000 })?;
1001 skipped += 1;
1002 }
1003 }
1004
1005 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1008 .iter()
1009 .enumerate()
1010 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1011 .collect();
1012
1013 tracing::info!(
1014 target = "ingest",
1015 phase = "persist_start",
1016 files = total_to_process,
1017 "phase B starting: persisting files incrementally as Phase A completes each one",
1018 );
1019
1020 for (idx, stage_result) in rx {
1024 let meta = meta_index.get(&idx).ok_or_else(|| {
1025 AppError::Internal(anyhow::anyhow!(
1026 "channel idx {idx} has no corresponding Process slot"
1027 ))
1028 })?;
1029 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1030 {
1031 SlotMeta::Process {
1032 file_str,
1033 derived_name,
1034 name_truncated,
1035 original_name,
1036 original_filename,
1037 } => (
1038 file_str,
1039 derived_name,
1040 name_truncated,
1041 original_name,
1042 original_filename,
1043 ),
1044 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1045 };
1046
1047 let conn = match conn_or_err.as_mut() {
1049 Ok(c) => c,
1050 Err(err_msg) => {
1051 let err_clone = err_msg.clone();
1052 output::emit_json_compact(&IngestFileEvent {
1053 file: file_str,
1054 name: derived_name,
1055 status: "failed",
1056 truncated: *name_truncated,
1057 original_name: original_name.clone(),
1058 original_filename: original_filename.as_deref(),
1059 error: Some(err_clone.clone()),
1060 memory_id: None,
1061 action: None,
1062 body_length: 0,
1063 })?;
1064 failed += 1;
1065 if fail_fast {
1066 output::emit_json_compact(&IngestSummary {
1067 summary: true,
1068 dir: args.dir.display().to_string(),
1069 pattern: args.pattern.clone(),
1070 recursive: args.recursive,
1071 files_total: total,
1072 files_succeeded: succeeded,
1073 files_failed: failed,
1074 files_skipped: skipped,
1075 elapsed_ms: started.elapsed().as_millis() as u64,
1076 })?;
1077 return Err(AppError::Validation(format!(
1078 "ingest aborted on first failure: {err_clone}"
1079 )));
1080 }
1081 continue;
1082 }
1083 };
1084
1085 let outcome =
1086 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1087
1088 match outcome {
1089 Ok(FileSuccess {
1090 memory_id,
1091 action,
1092 body_length,
1093 }) => {
1094 output::emit_json_compact(&IngestFileEvent {
1095 file: file_str,
1096 name: derived_name,
1097 status: "indexed",
1098 truncated: *name_truncated,
1099 original_name: original_name.clone(),
1100 original_filename: original_filename.as_deref(),
1101 error: None,
1102 memory_id: Some(memory_id),
1103 action: Some(action),
1104 body_length,
1105 })?;
1106 succeeded += 1;
1107 }
1108 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1109 output::emit_json_compact(&IngestFileEvent {
1110 file: file_str,
1111 name: derived_name,
1112 status: "skipped",
1113 truncated: *name_truncated,
1114 original_name: original_name.clone(),
1115 original_filename: original_filename.as_deref(),
1116 error: Some(format!("{e}")),
1117 memory_id: None,
1118 action: Some("duplicate".to_string()),
1119 body_length: 0,
1120 })?;
1121 skipped += 1;
1122 }
1123 Err(e) => {
1124 let err_msg = format!("{e}");
1125 output::emit_json_compact(&IngestFileEvent {
1126 file: file_str,
1127 name: derived_name,
1128 status: "failed",
1129 truncated: *name_truncated,
1130 original_name: original_name.clone(),
1131 original_filename: original_filename.as_deref(),
1132 error: Some(err_msg.clone()),
1133 memory_id: None,
1134 action: None,
1135 body_length: 0,
1136 })?;
1137 failed += 1;
1138 if fail_fast {
1139 output::emit_json_compact(&IngestSummary {
1140 summary: true,
1141 dir: args.dir.display().to_string(),
1142 pattern: args.pattern.clone(),
1143 recursive: args.recursive,
1144 files_total: total,
1145 files_succeeded: succeeded,
1146 files_failed: failed,
1147 files_skipped: skipped,
1148 elapsed_ms: started.elapsed().as_millis() as u64,
1149 })?;
1150 return Err(AppError::Validation(format!(
1151 "ingest aborted on first failure: {err_msg}"
1152 )));
1153 }
1154 }
1155 }
1156 }
1157
1158 producer_handle
1160 .join()
1161 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1162
1163 if let Ok(ref conn) = conn_or_err {
1164 if succeeded > 0 {
1165 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1166 }
1167 }
1168
1169 output::emit_json_compact(&IngestSummary {
1170 summary: true,
1171 dir: args.dir.display().to_string(),
1172 pattern: args.pattern.clone(),
1173 recursive: args.recursive,
1174 files_total: total,
1175 files_succeeded: succeeded,
1176 files_failed: failed,
1177 files_skipped: skipped,
1178 elapsed_ms: started.elapsed().as_millis() as u64,
1179 })?;
1180
1181 Ok(())
1182}
1183
/// Opens a read-write connection to the configured database after preparing it.
///
/// `ensure_db_ready` runs before `open_rw(&paths.db)`. Presumably it creates and/or
/// migrates the schema (NOTE(review): confirm in `storage::connection`).
///
/// # Errors
/// Propagates any error from `ensure_db_ready` or `open_rw`.
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}
1194
1195fn collect_files(
1196 dir: &Path,
1197 pattern: &str,
1198 recursive: bool,
1199 out: &mut Vec<PathBuf>,
1200) -> Result<(), AppError> {
1201 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1202 for entry in entries {
1203 let entry = entry.map_err(AppError::Io)?;
1204 let path = entry.path();
1205 let file_type = entry.file_type().map_err(AppError::Io)?;
1206 if file_type.is_file() {
1207 let name = entry.file_name();
1208 let name_str = name.to_string_lossy();
1209 if matches_pattern(&name_str, pattern) {
1210 out.push(path);
1211 }
1212 } else if file_type.is_dir() && recursive {
1213 collect_files(&path, pattern, recursive, out)?;
1214 }
1215 }
1216 Ok(())
1217}
1218
/// Glob-style file-name match supporting `*` (any run of characters, including none) and
/// `?` (exactly one character). Every other character matches itself.
///
/// The prefix/suffix/exact forms (`*.md`, `notes-*`, `README.md`) behave as before. Patterns
/// with a wildcard in the middle or more than one `*` (`notes-*.md`, `*draft*`) now work
/// too; a plain leading/trailing-`*` check treats such patterns literally, so they silently
/// match nothing.
///
/// Uses the classic greedy matcher that backtracks to the most recent `*`: O(n·m) in the
/// worst case, linear for typical patterns.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let text: Vec<char> = name.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();

    let (mut t, mut p) = (0usize, 0usize);
    // Position of the last `*` seen in `pat`, and where in `text` its match currently ends.
    let mut last_star: Option<usize> = None;
    let mut star_text = 0usize;

    while t < text.len() {
        if p < pat.len() && (pat[p] == '?' || pat[p] == text[t]) {
            t += 1;
            p += 1;
        } else if p < pat.len() && pat[p] == '*' {
            last_star = Some(p);
            star_text = t;
            p += 1;
        } else if let Some(star) = last_star {
            // Let the last `*` swallow one more character and retry after it.
            p = star + 1;
            star_text += 1;
            t = star_text;
        } else {
            return false;
        }
    }

    // Any trailing `*`s can match the empty remainder.
    while p < pat.len() && pat[p] == '*' {
        p += 1;
    }
    p == pat.len()
}
1228
1229fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1240 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1241 let lowered: String = stem
1242 .nfd()
1243 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1244 .map(|c| {
1245 if c == '_' || c.is_whitespace() {
1246 '-'
1247 } else {
1248 c
1249 }
1250 })
1251 .map(|c| c.to_ascii_lowercase())
1252 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1253 .collect();
1254 let collapsed = collapse_dashes(&lowered);
1255 let trimmed_raw = collapsed.trim_matches('-').to_string();
1256 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1258 format!("doc-{trimmed_raw}")
1259 } else {
1260 trimmed_raw
1261 };
1262 if trimmed.len() > max_len {
1263 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1264 tracing::debug!(
1265 target: "ingest",
1266 original = %trimmed,
1267 truncated_to = %truncated,
1268 max_len = max_len,
1269 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1270 );
1271 (truncated, true, Some(trimmed))
1272 } else {
1273 (trimmed, false, None)
1274 }
1275}
1276
1277fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1290 if !taken.contains(base) {
1291 return Ok(base.to_string());
1292 }
1293 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1294 let candidate = format!("{base}-{suffix}");
1295 if !taken.contains(&candidate) {
1296 tracing::warn!(
1297 target: "ingest",
1298 base = %base,
1299 resolved = %candidate,
1300 suffix,
1301 "memory name collision resolved with numeric suffix"
1302 );
1303 return Ok(candidate);
1304 }
1305 }
1306 Err(AppError::Validation(format!(
1307 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1308 )))
1309}
1310
/// Collapses every run of consecutive `-` characters in `s` into a single `-`.
///
/// All other characters, and a single leading or trailing dash, are kept as-is.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A dash directly after an emitted dash is part of a run; skip it.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1327
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    /// Truncation cuts exactly at the cap. The cap comes from `DERIVED_NAME_MAX_LEN`
    /// rather than a hard-coded literal, so the test tracks the constant.
    #[test]
    fn derive_kebab_truncates_to_name_cap() {
        let input_len = DERIVED_NAME_MAX_LEN + 20;
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(input_len)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a".repeat(DERIVED_NAME_MAX_LEN), "got len {}", name.len());
        assert!(truncated);
        let original = original.expect("untruncated name must be reported on truncation");
        assert_eq!(original.len(), input_len);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    /// Environment variable toggled by the low-memory / parallelism tests below.
    const LOW_MEMORY_ENV: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

    /// Restores an environment variable to its captured value when dropped.
    ///
    /// The restore runs in `Drop`, so it also happens while unwinding from a failed
    /// assertion. One failing test therefore cannot leak its override into the
    /// `#[serial]` tests that run after it.
    struct EnvVarGuard {
        key: &'static str,
        prev: Option<std::ffi::OsString>,
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(p) => std::env::set_var(self.key, p),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset when `None`),
    /// then restores the previous value, even if `f` panics.
    ///
    /// The process environment is global, so every caller must be `#[serial]`.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        // `var_os` also captures non-UTF-8 values, which `var(..).ok()` would lose.
        let _guard = EnvVarGuard {
            key: LOW_MEMORY_ENV,
            prev: std::env::var_os(LOW_MEMORY_ENV),
        };
        match value {
            Some(v) => std::env::set_var(LOW_MEMORY_ENV, v),
            None => std::env::remove_var(LOW_MEMORY_ENV),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}