use crate::chunking;
use crate::cli::MemoryType;
use crate::entity_type::EntityType;
use crate::errors::AppError;
use crate::i18n::errors_msg;
use crate::output::{self, JsonOutputFormat};
use crate::paths::AppPaths;
use crate::storage::chunks as storage_chunks;
use crate::storage::connection::{ensure_db_ready, open_rw};
use crate::storage::entities::{NewEntity, NewRelationship};
use crate::storage::memories::NewMemory;
use crate::storage::{entities, memories, urls as storage_urls, versions};
use rayon::prelude::*;
use rusqlite::Connection;
use serde::Serialize;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use unicode_normalization::UnicodeNormalization;

use crate::constants::DERIVED_NAME_MAX_LEN;

const MAX_NAME_COLLISION_SUFFIX: usize = 1000;

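// Arguments for the `ingest` subcommand: bulk-load a directory of files, one
// memory per matching file, emitting NDJSON progress on stdout.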
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable GLiNER NER extraction (disabled by default, slower)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
    # Preview file-to-name mapping without ingesting\n \
    sqlite-graphrag ingest ./docs --dry-run\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
    )]
    pub gliner_variant: String,

    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    #[arg(long)]
    pub namespace: Option<String>,

    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    #[arg(
        long,
        default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
        help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)"
    )]
    pub max_rss_mb: u64,
}

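/// Reads `SQLITE_GRAPHRAG_LOW_MEMORY` and interprets it as a boolean.
/// `1`/`true`/`yes`/`on` (any case) enable low-memory mode; empty, falsy, or
/// unrecognized values disable it (unrecognized values also log a warning).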
fn env_low_memory_enabled() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
        Ok(v) if v.is_empty() => false,
        Ok(v) => match v.to_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            other => {
                tracing::warn!(
                    target: "ingest",
                    value = %other,
                    "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
                );
                false
            }
        },
        Err(_) => false,
    }
}

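/// Resolves the effective ingest parallelism. `--low-memory` (or the
/// `SQLITE_GRAPHRAG_LOW_MEMORY` env var) forces 1 and overrides any explicit
/// `--ingest-parallelism`; otherwise the explicit value is used, falling back
/// to `max(1, cpus / 2)` capped at 4.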
fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
    let env_flag = env_low_memory_enabled();
    let low_memory = low_memory_flag || env_flag;

    if low_memory {
        if let Some(n) = ingest_parallelism {
            if n > 1 {
                tracing::warn!(
                    target: "ingest",
                    requested = n,
                    "--ingest-parallelism overridden by --low-memory; using 1"
                );
            }
        }
        if low_memory_flag {
            tracing::info!(
                target: "ingest",
                source = "flag",
                "low-memory mode enabled: forcing --ingest-parallelism 1"
            );
        } else {
            tracing::info!(
                target: "ingest",
                source = "env",
                "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
            );
        }
        return 1;
    }

    ingest_parallelism
        .unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|v| v.get() / 2)
                .unwrap_or(1)
                .clamp(1, 4)
        })
        .max(1)
}

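/// Per-file NDJSON line emitted on stdout (preview, indexed, skipped, or failed).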
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    file: &'a str,
    name: &'a str,
    status: &'a str,
    truncated: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
}

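/// Final NDJSON summary line with aggregate counts for the run.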
#[derive(Serialize)]
struct IngestSummary {
    summary: bool,
    dir: String,
    pattern: String,
    recursive: bool,
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    elapsed_ms: u64,
}

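/// Outcome of persisting one staged file: the new memory id and the action taken.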
struct FileSuccess {
    memory_id: i64,
    action: String,
}

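/// Per-file progress line written to stderr once Phase A has finished
/// extracting and embedding a file.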
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    ms: u64,
    entities: usize,
    relationships: usize,
}

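/// Everything Phase A computes for one file (body, embeddings, extracted graph),
/// ready for Phase B to persist in a single transaction.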
struct StagedFile {
    body: String,
    body_hash: String,
    snippet: String,
    name: String,
    description: String,
    embedding: Vec<f32>,
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    chunks_info: Vec<crate::chunking::Chunk>,
    entities: Vec<NewEntity>,
    relationships: Vec<NewRelationship>,
    entity_embeddings: Vec<Vec<f32>>,
    urls: Vec<crate::extraction::ExtractedUrl>,
}

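/// Phase A: validate the derived name, read and validate the file body, run
/// optional GLiNER extraction, chunk, and embed. CPU/IO work with no database
/// access, so it can run on the rayon pool.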
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
            .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    let mut extracted_entities: Vec<NewEntity> = Vec::new();
    let mut extracted_relationships: Vec<NewRelationship> = Vec::new();
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::new();
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted.entities;
                extracted_relationships = extracted.relationships;

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
                    extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
                }
            }
            Err(e) => {
                tracing::warn!(
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before ingesting it",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let embedding = if chunks_info.len() == 1 {
        crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
    } else {
        let chunk_texts: Vec<&str> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c))
            .collect();
        let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
        for chunk_text in &chunk_texts {
            if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
                if rss > max_rss_mb {
                    tracing::error!(
                        rss_mb = rss,
                        max_rss_mb = max_rss_mb,
                        file = %path.display(),
                        "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                    );
                    return Err(AppError::LowMemory {
                        available_mb: crate::memory_guard::available_memory_mb(),
                        required_mb: max_rss_mb,
                    });
                }
            }
            chunk_embeddings.push(crate::daemon::embed_passage_or_local(
                &paths.models,
                chunk_text,
            )?);
        }
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        aggregated
    };

    let entity_embeddings = extracted_entities
        .iter()
        .map(|entity| {
            let entity_text = match &entity.description {
                Some(desc) => format!("{} {}", entity.name, desc),
                None => entity.name.clone(),
            };
            crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
    })
}

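/// Phase B: persist one staged file on the main thread. Enforces the active
/// namespace cap and the duplicate-name check, then writes the memory, version,
/// vectors, chunks, entities, and relationships inside one IMMEDIATE
/// transaction; URLs are inserted best-effort after commit.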
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
) -> Result<FileSuccess, AppError> {
    {
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    if existing_memory.is_some() {
        return Err(AppError::Duplicate(errors_msg::duplicate_memory(
            &staged.name,
            namespace,
        )));
    }
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body,
        body_hash: staged.body_hash,
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };

    if let Some(hash_id) = duplicate_hash_id {
        tracing::debug!(
            target: "ingest",
            duplicate_memory_id = hash_id,
            "identical body already exists; persisting a new memory anyway"
        );
    }

    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

    let memory_id = memories::insert(&tx, &new_memory)?;
    versions::insert_version(
        &tx,
        memory_id,
        1,
        &staged.name,
        memory_type,
        &staged.description,
        &new_memory.body,
        &serde_json::to_string(&new_memory.metadata)?,
        None,
        "create",
    )?;
    memories::upsert_vec(
        &tx,
        memory_id,
        namespace,
        memory_type,
        &staged.embedding,
        &staged.name,
        &staged.snippet,
    )?;

    if staged.chunks_info.len() > 1 {
        storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
        let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "missing chunk embeddings cache on multi-chunk ingest path"
            ))
        })?;
        for (i, emb) in chunk_embeddings.iter().enumerate() {
            storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
        }
    }

    if !staged.entities.is_empty() || !staged.relationships.is_empty() {
        for (idx, entity) in staged.entities.iter().enumerate() {
            let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
            let entity_embedding = &staged.entity_embeddings[idx];
            entities::upsert_entity_vec(
                &tx,
                entity_id,
                namespace,
                entity.entity_type,
                entity_embedding,
                &entity.name,
            )?;
            entities::link_memory_entity(&tx, memory_id, entity_id)?;
            entities::increment_degree(&tx, entity_id)?;
        }
        let entity_types: std::collections::HashMap<&str, EntityType> = staged
            .entities
            .iter()
            .map(|entity| (entity.name.as_str(), entity.entity_type))
            .collect();
        for rel in &staged.relationships {
            let source_entity = NewEntity {
                name: rel.source.clone(),
                entity_type: entity_types
                    .get(rel.source.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let target_entity = NewEntity {
                name: rel.target.clone(),
                entity_type: entity_types
                    .get(rel.target.as_str())
                    .copied()
                    .unwrap_or(EntityType::Concept),
                description: None,
            };
            let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
            let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
            let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
            entities::link_memory_relationship(&tx, memory_id, rel_id)?;
        }
    }

    tx.commit()?;

    if !staged.urls.is_empty() {
        let url_entries: Vec<storage_urls::MemoryUrl> = staged
            .urls
            .into_iter()
            .map(|u| storage_urls::MemoryUrl {
                url: u.url,
                offset: Some(u.offset as i64),
            })
            .collect();
        let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
    }

    Ok(FileSuccess {
        memory_id,
        action: "created".to_string(),
    })
}

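/// Entry point for `ingest`: collects and names matching files, streams them
/// through the incremental pipeline (Phase A staging on a rayon pool feeding a
/// bounded channel, Phase B persisting on the main thread as results arrive),
/// and finishes with an NDJSON summary line.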
pub fn run(args: IngestArgs) -> Result<(), AppError> {
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::Validation(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::new();
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    files.sort();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    let mut taken_names: BTreeSet<String> = BTreeSet::new();

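    // Planning pass output: each file maps to one SlotMeta (Skip or Process),
    // and files that will actually be staged also get a ProcessItem handed to
    // Phase A.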
    enum SlotMeta {
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            original_filename: Option<String>,
            reason: String,
        },
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
            original_filename: Option<String>,
        },
    }

    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
    let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
    let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());

    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) = derive_kebab_name(path);
        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        if derived_base.is_empty() {
            let orig_filename = if !original_basename.is_empty() {
                Some(original_basename.to_string())
            } else {
                None
            };
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                original_filename: orig_filename,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                let idx = slots_meta.len();
                let orig_filename = if original_basename != derived_name {
                    Some(original_basename.to_string())
                } else {
                    None
                };
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                    original_filename: orig_filename,
                });
            }
            Err(e) => {
                let orig_filename = if original_basename != derived_base {
                    Some(original_basename.to_string())
                } else {
                    None
                };
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    original_filename: orig_filename,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    if args.dry_run {
        for meta in &slots_meta {
            match meta {
                SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    original_filename,
                    reason,
                } => {
                    output::emit_json_compact(&IngestFileEvent {
                        file: file_str,
                        name: derived_base,
                        status: "skip",
                        truncated: *name_truncated,
                        original_name: original_name.clone(),
                        original_filename: original_filename.as_deref(),
                        error: Some(reason.clone()),
                        memory_id: None,
                        action: None,
                    })?;
                }
                SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                    original_filename,
                } => {
                    output::emit_json_compact(&IngestFileEvent {
                        file: file_str,
                        name: derived_name,
                        status: "preview",
                        truncated: *name_truncated,
                        original_name: original_name.clone(),
                        original_filename: original_filename.as_deref(),
                        error: None,
                        memory_id: None,
                        action: None,
                    })?;
                }
            }
        }
        output::emit_json_compact(&IngestSummary {
            summary: true,
            dir: args.dir.to_string_lossy().into_owned(),
            pattern: args.pattern.clone(),
            recursive: args.recursive,
            files_total: total,
            files_succeeded: 0,
            files_failed: 0,
            files_skipped: 0,
            elapsed_ms: started.elapsed().as_millis() as u64,
        })?;
        return Ok(());
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        tracing::warn!(
            "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
        );
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
    }
    let enable_ner = args.enable_ner;
    let max_rss_mb = args.max_rss_mb;
    let gliner_variant: crate::extraction::GlinerVariant =
        args.gliner_variant.parse().unwrap_or_else(|e| {
            tracing::warn!("invalid --gliner-variant: {e}; using fp32");
            crate::extraction::GlinerVariant::Fp32
        });

    let total_to_process = process_items.len();
    tracing::info!(
        target: "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

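    // Bounded channel between Phase A (staging on the rayon pool) and Phase B
    // (persistence on this thread); the small bound applies backpressure so
    // staged files, including their embeddings, cannot pile up in memory faster
    // than they are persisted.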
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                    max_rss_mb,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                let (n_entities, n_relationships) = match &result {
                    Ok(sf) => (sf.entities.len(), sf.relationships.len()),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    eprintln!("{line}");
                }

                let _ = tx.send((item.idx, result));
            });
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

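    // Emit "skipped" events for files that never entered Phase A, then drain
    // the channel and persist staged files in whatever order Phase A finishes
    // them.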
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            original_filename,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                original_filename: original_filename.as_deref(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
            })?;
            skipped += 1;
        }
    }

    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target: "ingest",
        phase = "persist_start",
        files = total_to_process,
        "Phase B starting: persisting files incrementally as Phase A completes each one",
    );

    for (idx, stage_result) in rx {
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
        {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
                original_filename,
            } => (
                file_str,
                derived_name,
                name_truncated,
                original_name,
                original_filename,
            ),
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        let outcome =
            stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));

        match outcome {
            Ok(FileSuccess { memory_id, action }) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "indexed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: None,
                    memory_id: Some(memory_id),
                    action: Some(action),
                })?;
                succeeded += 1;
            }
            Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "skipped",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(format!("{e}")),
                    memory_id: None,
                    action: Some("duplicate".to_string()),
                })?;
                skipped += 1;
            }
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    Ok(())
}

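/// Ensures the database is ready, then opens a read-write connection to it.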
fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
    ensure_db_ready(paths)?;
    let conn = open_rw(&paths.db)?;
    Ok(conn)
}

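/// Collects files directly under `dir` whose names match `pattern`, descending
/// into subdirectories only when `recursive` is set.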
fn collect_files(
    dir: &Path,
    pattern: &str,
    recursive: bool,
    out: &mut Vec<PathBuf>,
) -> Result<(), AppError> {
    let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
    for entry in entries {
        let entry = entry.map_err(AppError::Io)?;
        let path = entry.path();
        let file_type = entry.file_type().map_err(AppError::Io)?;
        if file_type.is_file() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            if matches_pattern(&name_str, pattern) {
                out.push(path);
            }
        } else if file_type.is_dir() && recursive {
            collect_files(&path, pattern, recursive, out)?;
        }
    }
    Ok(())
}

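/// Minimal glob matching: `*suffix`, `prefix*`, or an exact name. A `*`
/// anywhere else is treated literally.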
fn matches_pattern(name: &str, pattern: &str) -> bool {
    if let Some(suffix) = pattern.strip_prefix('*') {
        name.ends_with(suffix)
    } else if let Some(prefix) = pattern.strip_suffix('*') {
        name.starts_with(prefix)
    } else {
        name == pattern
    }
}

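/// Derives a lowercase ASCII kebab-case name from the file stem (accents folded,
/// underscores and whitespace become dashes, other characters dropped). Returns
/// `(name, truncated, original)`; when the name is cut to `DERIVED_NAME_MAX_LEN`,
/// `truncated` is true and `original` carries the untruncated form.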
fn derive_kebab_name(path: &Path) -> (String, bool, Option<String>) {
    let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
    let lowered: String = stem
        .nfd()
        .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
        .map(|c| {
            if c == '_' || c.is_whitespace() {
                '-'
            } else {
                c
            }
        })
        .map(|c| c.to_ascii_lowercase())
        .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
        .collect();
    let collapsed = collapse_dashes(&lowered);
    let trimmed = collapsed.trim_matches('-').to_string();
    if trimmed.len() > DERIVED_NAME_MAX_LEN {
        let truncated = trimmed[..DERIVED_NAME_MAX_LEN]
            .trim_matches('-')
            .to_string();
        tracing::debug!(
            target: "ingest",
            original = %trimmed,
            truncated_to = %truncated,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
        );
        (truncated, true, Some(trimmed))
    } else {
        (trimmed, false, None)
    }
}

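/// Returns `base` unchanged if it is free, otherwise the first available
/// `base-N` suffix; fails once `MAX_NAME_COLLISION_SUFFIX` candidates are
/// exhausted.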
fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
    if !taken.contains(base) {
        return Ok(base.to_string());
    }
    for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
        let candidate = format!("{base}-{suffix}");
        if !taken.contains(&candidate) {
            tracing::warn!(
                target: "ingest",
                base = %base,
                resolved = %candidate,
                suffix,
                "memory name collision resolved with numeric suffix"
            );
            return Ok(candidate);
        }
    }
    Err(AppError::Validation(format!(
        "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
    )))
}

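/// Collapses runs of consecutive dashes into a single dash.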
fn collapse_dashes(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut prev_dash = false;
    for c in s.chars() {
        if c == '-' {
            if !prev_dash {
                out.push('-');
            }
            prev_dash = true;
        } else {
            out.push(c);
            prev_dash = false;
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

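    // The tests above only exercise leading/trailing wildcards; this pins down
    // that a `*` in the middle of the pattern is not expanded and falls back to
    // exact comparison in matches_pattern.
    #[test]
    fn matches_pattern_mid_string_wildcard_falls_back_to_exact() {
        assert!(!matches_pattern("foobar.md", "foo*.md"));
        assert!(matches_pattern("foo*.md", "foo*.md"));
    }
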
    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

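    // Direct coverage for the collapse_dashes helper that derive_kebab_name
    // relies on: runs of dashes collapse to a single dash and dash-free input
    // passes through unchanged.
    #[test]
    fn collapse_dashes_squashes_runs() {
        assert_eq!(collapse_dashes("a--b---c"), "a-b-c");
        assert_eq!(collapse_dashes("abc"), "abc");
        assert_eq!(collapse_dashes("--"), "-");
        assert_eq!(collapse_dashes(""), "");
    }
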
    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    use serial_test::serial;

    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let prev = std::env::var(key).ok();
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
        match prev {
            Some(p) => std::env::set_var(key, p),
            None => std::env::remove_var(key),
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}