1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// Appears to bound the numeric suffix tried when de-duplicating derived memory
// names; presumably consumed by `unique_name`, whose body is outside this view
// (TODO confirm).
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable GLiNER NER extraction (disabled by default, slower)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70NOTES:\n \
71 Each file becomes a separate memory. Names derive from file basenames\n \
72 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
73 followed by a final summary line with counts. Per-file errors are reported\n \
74 inline and processing continues unless --fail-fast is set.")]
75pub struct IngestArgs {
76 #[arg(
78 value_name = "DIR",
79 help = "Directory to ingest recursively (each matching file becomes a memory)"
80 )]
81 pub dir: PathBuf,
82
83 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
85 pub r#type: MemoryType,
86
87 #[arg(long, default_value = "*.md")]
90 pub pattern: String,
91
92 #[arg(long, default_value_t = false)]
94 pub recursive: bool,
95
96 #[arg(
97 long,
98 env = "SQLITE_GRAPHRAG_ENABLE_NER",
99 value_parser = crate::parsers::parse_bool_flexible,
100 action = clap::ArgAction::Set,
101 num_args = 0..=1,
102 default_missing_value = "true",
103 default_value = "false",
104 help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
105 )]
106 pub enable_ner: bool,
107 #[arg(
108 long,
109 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
110 default_value = "fp32",
111 help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
112 )]
113 pub gliner_variant: String,
114
115 #[arg(long, default_value_t = false, hide = true)]
117 pub skip_extraction: bool,
118
119 #[arg(long, default_value_t = false)]
121 pub fail_fast: bool,
122
123 #[arg(long, default_value_t = false)]
125 pub dry_run: bool,
126
127 #[arg(long, default_value_t = 10_000)]
129 pub max_files: usize,
130
131 #[arg(long)]
133 pub namespace: Option<String>,
134
135 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
137 pub db: Option<String>,
138
139 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
140 pub format: JsonOutputFormat,
141
142 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
143 pub json: bool,
144
145 #[arg(
147 long,
148 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
149 )]
150 pub ingest_parallelism: Option<usize>,
151
152 #[arg(
160 long,
161 default_value_t = false,
162 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
163 Recommended for environments with <4 GB available RAM or container/cgroup \
164 constraints. Trade-off: 3-4x longer wall time. Also honored via \
165 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
166 )]
167 pub low_memory: bool,
168
169 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
171 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
172 pub max_rss_mb: u64,
173
174 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
179 help = "Maximum length for derived memory names (default: 60)")]
180 pub max_name_length: usize,
181
182 #[arg(long, value_enum, default_value_t = IngestMode::None)]
184 pub mode: IngestMode,
185
186 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
188 pub claude_binary: Option<std::path::PathBuf>,
189
190 #[arg(long)]
192 pub claude_model: Option<String>,
193
194 #[arg(long, default_value_t = false)]
196 pub resume: bool,
197
198 #[arg(long, default_value_t = false)]
200 pub retry_failed: bool,
201
202 #[arg(long, default_value_t = false)]
204 pub keep_queue: bool,
205
206 #[arg(long, default_value = ".ingest-queue.sqlite")]
208 pub queue_db: String,
209
210 #[arg(long, default_value_t = 60)]
212 pub rate_limit_wait: u64,
213
214 #[arg(long)]
216 pub max_cost_usd: Option<f64>,
217}
218
/// Extraction mode selected with `--mode`.
///
/// Variant comments below use `//` rather than `///` on purpose: clap's
/// `ValueEnum` derive turns variant doc comments into possible-value help.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default (`--mode none`).
    None,
    // `--mode gliner`, presumably selecting GLiNER-based extraction; confirm
    // how `run` honors it.
    Gliner,
    // `--mode claude-code`: `run` delegates the whole ingest to
    // `ingest_claude::run_claude_ingest`.
    ClaudeCode,
}
229
230fn env_low_memory_enabled() -> bool {
235 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
236 Ok(v) if v.is_empty() => false,
237 Ok(v) => match v.to_lowercase().as_str() {
238 "1" | "true" | "yes" | "on" => true,
239 "0" | "false" | "no" | "off" => false,
240 other => {
241 tracing::warn!(
242 target: "ingest",
243 value = %other,
244 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
245 );
246 false
247 }
248 },
249 Err(_) => false,
250 }
251}
252
253fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
265 let env_flag = env_low_memory_enabled();
266 let low_memory = low_memory_flag || env_flag;
267
268 if low_memory {
269 if let Some(n) = ingest_parallelism {
270 if n > 1 {
271 tracing::warn!(
272 target: "ingest",
273 requested = n,
274 "--ingest-parallelism overridden by --low-memory; using 1"
275 );
276 }
277 }
278 if low_memory_flag {
279 tracing::info!(
280 target: "ingest",
281 source = "flag",
282 "low-memory mode enabled: forcing --ingest-parallelism 1"
283 );
284 } else {
285 tracing::info!(
286 target: "ingest",
287 source = "env",
288 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
289 );
290 }
291 return 1;
292 }
293
294 ingest_parallelism
295 .unwrap_or_else(|| {
296 std::thread::available_parallelism()
297 .map(|v| v.get() / 2)
298 .unwrap_or(1)
299 .clamp(1, 4)
300 })
301 .max(1)
302}
303
/// One NDJSON line on stdout describing a single file: a dry-run preview/skip,
/// or the outcome of a real ingest. Field order is the serialized key order.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path (lossy UTF-8).
    file: &'a str,
    /// Derived memory name; for skipped files, the (possibly empty) base name.
    name: &'a str,
    /// Outcome label. `run` emits "preview"/"skip" in dry-run mode and
    /// "indexed"/"skipped"/"failed" otherwise.
    status: &'a str,
    /// True when the derived name was cut to `--max-name-length`.
    truncated: bool,
    /// Untruncated derived name; only set when the name was truncated.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// File stem, set when it differs from the emitted `name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Skip reason or failure message.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the memory created for this file (indexed files only).
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// "created" for indexed files, "duplicate" when skipped because the name
    /// already exists in the namespace.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Stored body length in bytes for indexed files; 0 otherwise.
    body_length: usize,
}
326
/// Final NDJSON line on stdout summarizing an ingest run (or dry run).
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`; lets consumers tell this line apart from per-file events.
    summary: bool,
    /// Input directory, as displayed.
    dir: String,
    /// File-name pattern that was applied.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Files persisted as new memories.
    files_succeeded: usize,
    /// Files that failed staging or persistence.
    files_failed: usize,
    /// Files skipped (name derivation/uniqueness problems or duplicate names).
    files_skipped: usize,
    /// Wall-clock time since the run started, in milliseconds.
    elapsed_ms: u64,
}
339
/// Result of persisting one staged file with `persist_staged`.
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Action label copied into the file event (`persist_staged` sets "created").
    action: String,
    /// Length in bytes of the stored body.
    body_length: usize,
}
346
/// Progress line that Phase A writes to stderr (as JSON) after staging each file.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Event schema version (`run` emits 1).
    schema_version: u8,
    /// Event name (`run` emits "file_extracted").
    event: &'a str,
    /// Source file path.
    path: &'a str,
    /// Wall-clock staging time in milliseconds.
    ms: u64,
    /// Entities kept after extraction and truncation (0 if staging failed).
    entities: usize,
    /// Relationships kept after extraction and truncation (0 if staging failed).
    relationships: usize,
}
358
/// Everything `stage_file` (Phase A) computes for one file, ready for
/// `persist_staged` (Phase B) to write without any further model calls.
struct StagedFile {
    /// Full file contents as read from disk.
    body: String,
    /// BLAKE3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// Generated description ("ingested from <path>").
    description: String,
    /// Whole-body embedding; for multi-chunk bodies, the aggregate of the chunk
    /// embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only when the body produced more than one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries of `body`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty unless NER ran).
    entities: Vec<NewEntity>,
    /// Extracted relationships, with normalized and validated relation names.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found during extraction.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
375
376fn stage_file(
379 _idx: usize,
380 path: &Path,
381 name: &str,
382 paths: &AppPaths,
383 enable_ner: bool,
384 gliner_variant: crate::extraction::GlinerVariant,
385 max_rss_mb: u64,
386) -> Result<StagedFile, AppError> {
387 use crate::constants::*;
388
389 if name.len() > MAX_MEMORY_NAME_LEN {
390 return Err(AppError::LimitExceeded(
391 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
392 ));
393 }
394 if name.starts_with("__") {
395 return Err(AppError::Validation(
396 crate::i18n::validation::reserved_name(),
397 ));
398 }
399 {
400 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
401 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
402 if !slug_re.is_match(name) {
403 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
404 name,
405 )));
406 }
407 }
408
409 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
410 if raw_body.len() > MAX_MEMORY_BODY_LEN {
411 return Err(AppError::LimitExceeded(
412 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
413 ));
414 }
415 if raw_body.trim().is_empty() {
416 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
417 }
418
419 let description = format!("ingested from {}", path.display());
420 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
421 return Err(AppError::Validation(
422 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
423 ));
424 }
425
426 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
427 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
428 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
429 if enable_ner {
430 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
431 Ok(extracted) => {
432 extracted_urls = extracted.urls;
433 extracted_entities = extracted.entities;
434 extracted_relationships = extracted.relationships;
435
436 if extracted_entities.len() > max_entities_per_memory() {
437 extracted_entities.truncate(max_entities_per_memory());
438 }
439 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
440 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
441 }
442 }
443 Err(e) => {
444 tracing::warn!(
445 file = %path.display(),
446 "auto-extraction failed (graceful degradation): {e:#}"
447 );
448 }
449 }
450 }
451
452 for rel in &mut extracted_relationships {
453 rel.relation = crate::parsers::normalize_relation(&rel.relation);
454 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
455 return Err(AppError::Validation(format!(
456 "{e} for relationship '{}' -> '{}'",
457 rel.source, rel.target
458 )));
459 }
460 crate::parsers::warn_if_non_canonical(&rel.relation);
461 if !(0.0..=1.0).contains(&rel.strength) {
462 return Err(AppError::Validation(format!(
463 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
464 rel.strength, rel.source, rel.target
465 )));
466 }
467 }
468
469 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
470 let snippet: String = raw_body.chars().take(200).collect();
471
472 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
473 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
474 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
475 return Err(AppError::LimitExceeded(format!(
476 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
477 chunks_info.len(),
478 REMEMBER_MAX_SAFE_MULTI_CHUNKS
479 )));
480 }
481
482 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
483 let embedding = if chunks_info.len() == 1 {
484 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
485 } else {
486 let chunk_texts: Vec<&str> = chunks_info
487 .iter()
488 .map(|c| chunking::chunk_text(&raw_body, c))
489 .collect();
490 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
491 for chunk_text in &chunk_texts {
492 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
493 if rss > max_rss_mb {
494 tracing::error!(
495 rss_mb = rss,
496 max_rss_mb = max_rss_mb,
497 file = %path.display(),
498 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
499 );
500 return Err(AppError::LowMemory {
501 available_mb: crate::memory_guard::available_memory_mb(),
502 required_mb: max_rss_mb,
503 });
504 }
505 }
506 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
507 &paths.models,
508 chunk_text,
509 )?);
510 }
511 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
512 chunk_embeddings_opt = Some(chunk_embeddings);
513 aggregated
514 };
515
516 let entity_embeddings = extracted_entities
517 .iter()
518 .map(|entity| {
519 let entity_text = match &entity.description {
520 Some(desc) => format!("{} {}", entity.name, desc),
521 None => entity.name.clone(),
522 };
523 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
524 })
525 .collect::<Result<Vec<_>, _>>()?;
526
527 Ok(StagedFile {
528 body: raw_body,
529 body_hash,
530 snippet,
531 name: name.to_string(),
532 description,
533 embedding,
534 chunk_embeddings: chunk_embeddings_opt,
535 chunks_info,
536 entities: extracted_entities,
537 relationships: extracted_relationships,
538 entity_embeddings,
539 urls: extracted_urls,
540 })
541}
542
543fn persist_staged(
545 conn: &mut Connection,
546 namespace: &str,
547 memory_type: &str,
548 staged: StagedFile,
549) -> Result<FileSuccess, AppError> {
550 {
551 let active_count: u32 = conn.query_row(
552 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
553 [],
554 |r| r.get::<_, i64>(0).map(|v| v as u32),
555 )?;
556 let ns_exists: bool = conn.query_row(
557 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
558 rusqlite::params![namespace],
559 |r| r.get::<_, i64>(0).map(|v| v > 0),
560 )?;
561 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
562 return Err(AppError::NamespaceError(format!(
563 "active namespace limit of {} exceeded while creating '{namespace}'",
564 crate::constants::MAX_NAMESPACES_ACTIVE
565 )));
566 }
567 }
568
569 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
570 if existing_memory.is_some() {
571 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
572 &staged.name,
573 namespace,
574 )));
575 }
576 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
577
578 let new_memory = NewMemory {
579 namespace: namespace.to_string(),
580 name: staged.name.clone(),
581 memory_type: memory_type.to_string(),
582 description: staged.description.clone(),
583 body: staged.body,
584 body_hash: staged.body_hash,
585 session_id: None,
586 source: "agent".to_string(),
587 metadata: serde_json::json!({}),
588 };
589
590 if let Some(hash_id) = duplicate_hash_id {
591 tracing::debug!(
592 target: "ingest",
593 duplicate_memory_id = hash_id,
594 "identical body already exists; persisting a new memory anyway"
595 );
596 }
597
598 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
599
600 let memory_id = memories::insert(&tx, &new_memory)?;
601 versions::insert_version(
602 &tx,
603 memory_id,
604 1,
605 &staged.name,
606 memory_type,
607 &staged.description,
608 &new_memory.body,
609 &serde_json::to_string(&new_memory.metadata)?,
610 None,
611 "create",
612 )?;
613 memories::upsert_vec(
614 &tx,
615 memory_id,
616 namespace,
617 memory_type,
618 &staged.embedding,
619 &staged.name,
620 &staged.snippet,
621 )?;
622
623 if staged.chunks_info.len() > 1 {
624 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
625 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
626 AppError::Internal(anyhow::anyhow!(
627 "missing chunk embeddings cache on multi-chunk ingest path"
628 ))
629 })?;
630 for (i, emb) in chunk_embeddings.iter().enumerate() {
631 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
632 }
633 }
634
635 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
636 for (idx, entity) in staged.entities.iter().enumerate() {
637 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
638 let entity_embedding = &staged.entity_embeddings[idx];
639 entities::upsert_entity_vec(
640 &tx,
641 entity_id,
642 namespace,
643 entity.entity_type,
644 entity_embedding,
645 &entity.name,
646 )?;
647 entities::link_memory_entity(&tx, memory_id, entity_id)?;
648 entities::increment_degree(&tx, entity_id)?;
649 }
650 let entity_types: std::collections::HashMap<&str, EntityType> = staged
651 .entities
652 .iter()
653 .map(|entity| (entity.name.as_str(), entity.entity_type))
654 .collect();
655 for rel in &staged.relationships {
656 let source_entity = NewEntity {
657 name: rel.source.clone(),
658 entity_type: entity_types
659 .get(rel.source.as_str())
660 .copied()
661 .unwrap_or(EntityType::Concept),
662 description: None,
663 };
664 let target_entity = NewEntity {
665 name: rel.target.clone(),
666 entity_type: entity_types
667 .get(rel.target.as_str())
668 .copied()
669 .unwrap_or(EntityType::Concept),
670 description: None,
671 };
672 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
673 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
674 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
675 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
676 }
677 }
678
679 tx.commit()?;
680
681 if !staged.urls.is_empty() {
682 let url_entries: Vec<storage_urls::MemoryUrl> = staged
683 .urls
684 .into_iter()
685 .map(|u| storage_urls::MemoryUrl {
686 url: u.url,
687 offset: Some(u.offset as i64),
688 })
689 .collect();
690 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
691 }
692
693 Ok(FileSuccess {
694 memory_id,
695 action: "created".to_string(),
696 body_length: new_memory.body.len(),
697 })
698}
699
700pub fn run(args: IngestArgs) -> Result<(), AppError> {
701 if args.mode == IngestMode::ClaudeCode {
702 return super::ingest_claude::run_claude_ingest(&args);
703 }
704
705 let started = std::time::Instant::now();
706
707 if !args.dir.exists() {
708 return Err(AppError::Validation(format!(
709 "directory not found: {}",
710 args.dir.display()
711 )));
712 }
713 if !args.dir.is_dir() {
714 return Err(AppError::Validation(format!(
715 "path is not a directory: {}",
716 args.dir.display()
717 )));
718 }
719
720 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
721 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
722 files.sort();
723
724 if files.len() > args.max_files {
725 return Err(AppError::Validation(format!(
726 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
727 files.len(),
728 args.max_files
729 )));
730 }
731
732 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
733 let memory_type_str = args.r#type.as_str().to_string();
734
735 let paths = AppPaths::resolve(args.db.as_deref())?;
736 let mut conn_or_err = match init_storage(&paths) {
737 Ok(c) => Ok(c),
738 Err(e) => Err(format!("{e}")),
739 };
740
741 let mut succeeded: usize = 0;
742 let mut failed: usize = 0;
743 let mut skipped: usize = 0;
744 let total = files.len();
745
746 let mut taken_names: BTreeSet<String> = BTreeSet::new();
749
750 enum SlotMeta {
756 Skip {
757 file_str: String,
758 derived_base: String,
759 name_truncated: bool,
760 original_name: Option<String>,
761 original_filename: Option<String>,
762 reason: String,
763 },
764 Process {
765 file_str: String,
766 derived_name: String,
767 name_truncated: bool,
768 original_name: Option<String>,
769 original_filename: Option<String>,
770 },
771 }
772
773 struct ProcessItem {
774 idx: usize,
775 path: PathBuf,
776 file_str: String,
777 derived_name: String,
778 }
779
780 let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
781 let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
782 let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
783
784 let max_name_length = args.max_name_length;
785 for path in &files {
786 let file_str = path.to_string_lossy().into_owned();
787 let (derived_base, name_truncated, original_name) =
788 derive_kebab_name(path, max_name_length);
789 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
790
791 if name_truncated {
792 if let Some(ref orig) = original_name {
793 truncations.push((orig.clone(), derived_base.clone()));
794 }
795 }
796
797 if derived_base.is_empty() {
798 let orig_filename = if !original_basename.is_empty() {
800 Some(original_basename.to_string())
801 } else {
802 None
803 };
804 slots_meta.push(SlotMeta::Skip {
805 file_str,
806 derived_base: String::new(),
807 name_truncated: false,
808 original_name: None,
809 original_filename: orig_filename,
810 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
811 });
812 continue;
813 }
814
815 match unique_name(&derived_base, &taken_names) {
816 Ok(derived_name) => {
817 taken_names.insert(derived_name.clone());
818 let idx = slots_meta.len();
819 let orig_filename = if original_basename != derived_name {
821 Some(original_basename.to_string())
822 } else {
823 None
824 };
825 process_items.push(ProcessItem {
826 idx,
827 path: path.clone(),
828 file_str: file_str.clone(),
829 derived_name: derived_name.clone(),
830 });
831 slots_meta.push(SlotMeta::Process {
832 file_str,
833 derived_name,
834 name_truncated,
835 original_name,
836 original_filename: orig_filename,
837 });
838 }
839 Err(e) => {
840 let orig_filename = if original_basename != derived_base {
841 Some(original_basename.to_string())
842 } else {
843 None
844 };
845 slots_meta.push(SlotMeta::Skip {
846 file_str,
847 derived_base,
848 name_truncated,
849 original_name,
850 original_filename: orig_filename,
851 reason: e.to_string(),
852 });
853 }
854 }
855 }
856
857 if !truncations.is_empty() {
858 tracing::info!(
859 target: "ingest",
860 count = truncations.len(),
861 max_name_length = max_name_length,
862 max_len = DERIVED_NAME_MAX_LEN,
863 "derived names truncated; pass -vv (debug) for per-file detail"
864 );
865 }
866
867 if args.dry_run {
869 for meta in &slots_meta {
870 match meta {
871 SlotMeta::Skip {
872 file_str,
873 derived_base,
874 name_truncated,
875 original_name,
876 original_filename,
877 reason,
878 } => {
879 output::emit_json_compact(&IngestFileEvent {
880 file: file_str,
881 name: derived_base,
882 status: "skip",
883 truncated: *name_truncated,
884 original_name: original_name.clone(),
885 original_filename: original_filename.as_deref(),
886 error: Some(reason.clone()),
887 memory_id: None,
888 action: None,
889 body_length: 0,
890 })?;
891 }
892 SlotMeta::Process {
893 file_str,
894 derived_name,
895 name_truncated,
896 original_name,
897 original_filename,
898 } => {
899 output::emit_json_compact(&IngestFileEvent {
900 file: file_str,
901 name: derived_name,
902 status: "preview",
903 truncated: *name_truncated,
904 original_name: original_name.clone(),
905 original_filename: original_filename.as_deref(),
906 error: None,
907 memory_id: None,
908 action: None,
909 body_length: 0,
910 })?;
911 }
912 }
913 }
914 output::emit_json_compact(&IngestSummary {
915 summary: true,
916 dir: args.dir.to_string_lossy().into_owned(),
917 pattern: args.pattern.clone(),
918 recursive: args.recursive,
919 files_total: total,
920 files_succeeded: 0,
921 files_failed: 0,
922 files_skipped: 0,
923 elapsed_ms: started.elapsed().as_millis() as u64,
924 })?;
925 return Ok(());
926 }
927
928 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
931
932 let pool = rayon::ThreadPoolBuilder::new()
933 .num_threads(parallelism)
934 .build()
935 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
936
937 if args.enable_ner && args.skip_extraction {
938 tracing::warn!(
939 "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
940 );
941 }
942 if args.skip_extraction && !args.enable_ner {
943 tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
944 }
945 let enable_ner = args.enable_ner;
946 let max_rss_mb = args.max_rss_mb;
947 let gliner_variant: crate::extraction::GlinerVariant =
948 args.gliner_variant.parse().unwrap_or_else(|e| {
949 tracing::warn!("invalid --gliner-variant: {e}; using fp32");
950 crate::extraction::GlinerVariant::Fp32
951 });
952
953 let total_to_process = process_items.len();
954 tracing::info!(
955 target = "ingest",
956 phase = "pipeline_start",
957 files = total_to_process,
958 ingest_parallelism = parallelism,
959 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
960 );
961
962 let channel_bound = (parallelism * 2).max(1);
966 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
967
968 let paths_owned = paths.clone();
973 let producer_handle = std::thread::spawn(move || {
974 pool.install(|| {
975 process_items.into_par_iter().for_each(|item| {
976 let t0 = std::time::Instant::now();
977 let result = stage_file(
978 item.idx,
979 &item.path,
980 &item.derived_name,
981 &paths_owned,
982 enable_ner,
983 gliner_variant,
984 max_rss_mb,
985 );
986 let elapsed_ms = t0.elapsed().as_millis() as u64;
987
988 let (n_entities, n_relationships) = match &result {
991 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
992 Err(_) => (0, 0),
993 };
994 let progress = StageProgressEvent {
995 schema_version: 1,
996 event: "file_extracted",
997 path: &item.file_str,
998 ms: elapsed_ms,
999 entities: n_entities,
1000 relationships: n_relationships,
1001 };
1002 if let Ok(line) = serde_json::to_string(&progress) {
1003 eprintln!("{line}");
1004 }
1005
1006 let _ = tx.send((item.idx, result));
1010 });
1011 drop(tx);
1013 });
1014 });
1015
1016 let fail_fast = args.fail_fast;
1028
1029 for meta in &slots_meta {
1031 if let SlotMeta::Skip {
1032 file_str,
1033 derived_base,
1034 name_truncated,
1035 original_name,
1036 original_filename,
1037 reason,
1038 } = meta
1039 {
1040 output::emit_json_compact(&IngestFileEvent {
1041 file: file_str,
1042 name: derived_base,
1043 status: "skipped",
1044 truncated: *name_truncated,
1045 original_name: original_name.clone(),
1046 original_filename: original_filename.as_deref(),
1047 error: Some(reason.clone()),
1048 memory_id: None,
1049 action: None,
1050 body_length: 0,
1051 })?;
1052 skipped += 1;
1053 }
1054 }
1055
1056 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1059 .iter()
1060 .enumerate()
1061 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1062 .collect();
1063
1064 tracing::info!(
1065 target = "ingest",
1066 phase = "persist_start",
1067 files = total_to_process,
1068 "phase B starting: persisting files incrementally as Phase A completes each one",
1069 );
1070
1071 for (idx, stage_result) in rx {
1075 let meta = meta_index.get(&idx).ok_or_else(|| {
1076 AppError::Internal(anyhow::anyhow!(
1077 "channel idx {idx} has no corresponding Process slot"
1078 ))
1079 })?;
1080 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1081 {
1082 SlotMeta::Process {
1083 file_str,
1084 derived_name,
1085 name_truncated,
1086 original_name,
1087 original_filename,
1088 } => (
1089 file_str,
1090 derived_name,
1091 name_truncated,
1092 original_name,
1093 original_filename,
1094 ),
1095 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1096 };
1097
1098 let conn = match conn_or_err.as_mut() {
1100 Ok(c) => c,
1101 Err(err_msg) => {
1102 let err_clone = err_msg.clone();
1103 output::emit_json_compact(&IngestFileEvent {
1104 file: file_str,
1105 name: derived_name,
1106 status: "failed",
1107 truncated: *name_truncated,
1108 original_name: original_name.clone(),
1109 original_filename: original_filename.as_deref(),
1110 error: Some(err_clone.clone()),
1111 memory_id: None,
1112 action: None,
1113 body_length: 0,
1114 })?;
1115 failed += 1;
1116 if fail_fast {
1117 output::emit_json_compact(&IngestSummary {
1118 summary: true,
1119 dir: args.dir.display().to_string(),
1120 pattern: args.pattern.clone(),
1121 recursive: args.recursive,
1122 files_total: total,
1123 files_succeeded: succeeded,
1124 files_failed: failed,
1125 files_skipped: skipped,
1126 elapsed_ms: started.elapsed().as_millis() as u64,
1127 })?;
1128 return Err(AppError::Validation(format!(
1129 "ingest aborted on first failure: {err_clone}"
1130 )));
1131 }
1132 continue;
1133 }
1134 };
1135
1136 let outcome =
1137 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1138
1139 match outcome {
1140 Ok(FileSuccess {
1141 memory_id,
1142 action,
1143 body_length,
1144 }) => {
1145 output::emit_json_compact(&IngestFileEvent {
1146 file: file_str,
1147 name: derived_name,
1148 status: "indexed",
1149 truncated: *name_truncated,
1150 original_name: original_name.clone(),
1151 original_filename: original_filename.as_deref(),
1152 error: None,
1153 memory_id: Some(memory_id),
1154 action: Some(action),
1155 body_length,
1156 })?;
1157 succeeded += 1;
1158 }
1159 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1160 output::emit_json_compact(&IngestFileEvent {
1161 file: file_str,
1162 name: derived_name,
1163 status: "skipped",
1164 truncated: *name_truncated,
1165 original_name: original_name.clone(),
1166 original_filename: original_filename.as_deref(),
1167 error: Some(format!("{e}")),
1168 memory_id: None,
1169 action: Some("duplicate".to_string()),
1170 body_length: 0,
1171 })?;
1172 skipped += 1;
1173 }
1174 Err(e) => {
1175 let err_msg = format!("{e}");
1176 output::emit_json_compact(&IngestFileEvent {
1177 file: file_str,
1178 name: derived_name,
1179 status: "failed",
1180 truncated: *name_truncated,
1181 original_name: original_name.clone(),
1182 original_filename: original_filename.as_deref(),
1183 error: Some(err_msg.clone()),
1184 memory_id: None,
1185 action: None,
1186 body_length: 0,
1187 })?;
1188 failed += 1;
1189 if fail_fast {
1190 output::emit_json_compact(&IngestSummary {
1191 summary: true,
1192 dir: args.dir.display().to_string(),
1193 pattern: args.pattern.clone(),
1194 recursive: args.recursive,
1195 files_total: total,
1196 files_succeeded: succeeded,
1197 files_failed: failed,
1198 files_skipped: skipped,
1199 elapsed_ms: started.elapsed().as_millis() as u64,
1200 })?;
1201 return Err(AppError::Validation(format!(
1202 "ingest aborted on first failure: {err_msg}"
1203 )));
1204 }
1205 }
1206 }
1207 }
1208
1209 producer_handle
1211 .join()
1212 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1213
1214 if let Ok(ref conn) = conn_or_err {
1215 if succeeded > 0 {
1216 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1217 }
1218 }
1219
1220 output::emit_json_compact(&IngestSummary {
1221 summary: true,
1222 dir: args.dir.display().to_string(),
1223 pattern: args.pattern.clone(),
1224 recursive: args.recursive,
1225 files_total: total,
1226 files_succeeded: succeeded,
1227 files_failed: failed,
1228 files_skipped: skipped,
1229 elapsed_ms: started.elapsed().as_millis() as u64,
1230 })?;
1231
1232 Ok(())
1233}
1234
1235fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1241 ensure_db_ready(paths)?;
1242 let conn = open_rw(&paths.db)?;
1243 Ok(conn)
1244}
1245
1246pub(crate) fn collect_files(
1247 dir: &Path,
1248 pattern: &str,
1249 recursive: bool,
1250 out: &mut Vec<PathBuf>,
1251) -> Result<(), AppError> {
1252 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1253 for entry in entries {
1254 let entry = entry.map_err(AppError::Io)?;
1255 let path = entry.path();
1256 let file_type = entry.file_type().map_err(AppError::Io)?;
1257 if file_type.is_file() {
1258 let name = entry.file_name();
1259 let name_str = name.to_string_lossy();
1260 if matches_pattern(&name_str, pattern) {
1261 out.push(path);
1262 }
1263 } else if file_type.is_dir() && recursive {
1264 collect_files(&path, pattern, recursive, out)?;
1265 }
1266 }
1267 Ok(())
1268}
1269
/// Returns whether the file name `name` matches the glob-like `pattern`.
///
/// `*` matches any (possibly empty) run of characters and may appear anywhere
/// in the pattern any number of times (`*.md`, `README*`, `*notes*`,
/// `report-*.md`). Every other character matches itself exactly, and
/// matching is case-sensitive. A pattern without `*` must equal `name`.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let mut pieces = pattern.split('*');
    // `split` always yields at least one piece: the literal before the first `*`.
    let head = pieces.next().unwrap_or("");
    let mut rest = match name.strip_prefix(head) {
        Some(r) => r,
        None => return false,
    };
    let mut middle: Vec<&str> = pieces.collect();
    let tail = match middle.pop() {
        Some(t) => t,
        // No `*` at all: the whole name must have been consumed by `head`.
        None => return rest.is_empty(),
    };
    // Leftmost matching of each interior literal leaves the most room for the
    // remaining pieces, which is optimal when the only wildcard is `*`.
    for piece in middle {
        match rest.find(piece) {
            Some(idx) => rest = &rest[idx + piece.len()..],
            None => return false,
        }
    }
    rest.ends_with(tail)
}
1279
1280fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1291 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1292 let lowered: String = stem
1293 .nfd()
1294 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1295 .map(|c| {
1296 if c == '_' || c.is_whitespace() {
1297 '-'
1298 } else {
1299 c
1300 }
1301 })
1302 .map(|c| c.to_ascii_lowercase())
1303 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1304 .collect();
1305 let collapsed = collapse_dashes(&lowered);
1306 let trimmed_raw = collapsed.trim_matches('-').to_string();
1307 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1309 format!("doc-{trimmed_raw}")
1310 } else {
1311 trimmed_raw
1312 };
1313 if trimmed.len() > max_len {
1314 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1315 tracing::debug!(
1316 target: "ingest",
1317 original = %trimmed,
1318 truncated_to = %truncated,
1319 max_len = max_len,
1320 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1321 );
1322 (truncated, true, Some(trimmed))
1323 } else {
1324 (trimmed, false, None)
1325 }
1326}
1327
1328fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1341 if !taken.contains(base) {
1342 return Ok(base.to_string());
1343 }
1344 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1345 let candidate = format!("{base}-{suffix}");
1346 if !taken.contains(&candidate) {
1347 tracing::warn!(
1348 target: "ingest",
1349 base = %base,
1350 resolved = %candidate,
1351 suffix,
1352 "memory name collision resolved with numeric suffix"
1353 );
1354 return Ok(candidate);
1355 }
1356 }
1357 Err(AppError::Validation(format!(
1358 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1359 )))
1360}
1361
/// Collapses every run of consecutive `-` characters in `s` into a single `-`.
///
/// All other characters, including leading and trailing dashes (reduced to
/// one each), are preserved in order.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // Skip a dash when the output already ends in one.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1378
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    use serial_test::serial;

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset when
    /// `None`), then restores the variable's previous state.
    ///
    /// Restoration happens in a `Drop` guard, so it also runs when `f` panics,
    /// e.g. on a failed assertion. Otherwise a failing test would leak its
    /// override into the next `#[serial]` test. A previous value that is not
    /// valid UTF-8 is treated as unset.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        // Holds the pre-test value and puts it back on drop (normal or unwind).
        struct RestoreEnv(Option<String>);
        impl Drop for RestoreEnv {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(p) => std::env::set_var(KEY, p),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        let _restore = RestoreEnv(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}