1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// NOTE(review): not referenced in the visible part of this file; presumably the upper
// bound on numeric suffixes `unique_name` tries when two files derive the same memory
// name — confirm against its definition.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Command-line arguments for `sqlite-graphrag ingest`.
//
// Plain `//` comments are used throughout this struct on purpose: clap's derive turns
// `///` doc comments into `--help` text, so adding them would change CLI output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable GLiNER NER extraction (disabled by default, slower)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
    # Preview file-to-name mapping without ingesting\n \
    sqlite-graphrag ingest ./docs --dry-run\n\n \
    # LLM-curated extraction via Claude Code CLI\n \
    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
    # Resume interrupted claude-code ingest\n \
    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
    # Claude Code with budget cap and custom timeout\n \
    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Root directory; `run` rejects it when it does not exist or is not a directory.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type stored on every created memory (via `MemoryType::as_str`).
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // File-name filter applied by `matches_pattern` to each entry's file name.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Descend into sub-directories when walking `dir`.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Enables graph extraction in `stage_file`; also settable through the env var.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    // Parsed into `GlinerVariant` by `run`; an unparsable value logs a warning and
    // falls back to fp32.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
    )]
    pub gliner_variant: String,

    // Deprecated and hidden: `run` only logs a warning about it.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // Abort (after emitting the summary line) on the first failed file.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Emit the file-to-name preview and a summary without persisting memories.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    // Upper bound on matching files; exceeding it is a validation error before any work.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace, resolved through `crate::namespace::resolve_namespace`.
    #[arg(long)]
    pub namespace: Option<String>,

    // Database location override, resolved through `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // NOTE(review): not read anywhere in this file — confirm it is consumed elsewhere.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Accepted for compatibility only; output is always JSON.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // Rayon worker count for the staging phase; see `resolve_parallelism`.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // Forces a single staging worker; the env var is read by `env_low_memory_enabled`.
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // Checked by `stage_file` before embedding each chunk of multi-chunk bodies;
    // exceeding it fails that file with `AppError::LowMemory`.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
          help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // Passed to `derive_kebab_name` when deriving memory names from file stems.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
          help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // `ClaudeCode` hands the whole run to `super::ingest_claude::run_claude_ingest`.
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    // NOTE(review): the fields below are not read in this file; presumably they
    // configure the `--mode claude-code` path in `ingest_claude` — confirm there.
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    #[arg(long)]
    pub claude_model: Option<String>,

    #[arg(long, default_value_t = false)]
    pub resume: bool,

    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    #[arg(long, default_value = ".ingest-queue.sqlite")]
    pub queue_db: String,

    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,
}
232
// Extraction backend selected with `--mode`. Plain `//` comments are used on purpose:
// clap's `ValueEnum` derive would turn `///` doc comments into help text.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default (`default_value_t = IngestMode::None`); handled in-process by `run`.
    None,
    // NOTE(review): this file never distinguishes `Gliner` from `None`; NER is gated by
    // `--enable-ner` instead — confirm whether this variant is meant to imply it.
    Gliner,
    // Delegates the entire ingest to `super::ingest_claude::run_claude_ingest`.
    ClaudeCode,
}
243
244fn env_low_memory_enabled() -> bool {
249 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
250 Ok(v) if v.is_empty() => false,
251 Ok(v) => match v.to_lowercase().as_str() {
252 "1" | "true" | "yes" | "on" => true,
253 "0" | "false" | "no" | "off" => false,
254 other => {
255 tracing::warn!(
256 target: "ingest",
257 value = %other,
258 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
259 );
260 false
261 }
262 },
263 Err(_) => false,
264 }
265}
266
267fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
279 let env_flag = env_low_memory_enabled();
280 let low_memory = low_memory_flag || env_flag;
281
282 if low_memory {
283 if let Some(n) = ingest_parallelism {
284 if n > 1 {
285 tracing::warn!(
286 target: "ingest",
287 requested = n,
288 "--ingest-parallelism overridden by --low-memory; using 1"
289 );
290 }
291 }
292 if low_memory_flag {
293 tracing::info!(
294 target: "ingest",
295 source = "flag",
296 "low-memory mode enabled: forcing --ingest-parallelism 1"
297 );
298 } else {
299 tracing::info!(
300 target: "ingest",
301 source = "env",
302 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
303 );
304 }
305 return 1;
306 }
307
308 ingest_parallelism
309 .unwrap_or_else(|| {
310 std::thread::available_parallelism()
311 .map(|v| v.get() / 2)
312 .unwrap_or(1)
313 .clamp(1, 4)
314 })
315 .max(1)
316}
317
/// One NDJSON line written to stdout for a single file (preview, skip, failure or success).
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the file as produced by the directory walk (lossy UTF-8).
    file: &'a str,
    /// Derived memory name, or the un-suffixed base name for files that were skipped.
    name: &'a str,
    /// Outcome label; this file emits "preview", "skip", "skipped", "failed" and "indexed".
    status: &'a str,
    /// Whether `derive_kebab_name` reported that the derived name was truncated.
    truncated: bool,
    /// Name reported by `derive_kebab_name` alongside a truncation
    /// (presumably the pre-truncation name — confirm in `derive_kebab_name`).
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file stem, included when it differs from the emitted name.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Human-readable reason for skip/failure outcomes.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the created memory on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// "created" on success, "duplicate" when the name already exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Stored body length in bytes on success; 0 for every other outcome.
    body_length: usize,
}
340
/// Final NDJSON line of an ingest run, carrying the aggregate counters.
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`; lets consumers tell the summary apart from per-file events.
    summary: bool,
    /// Ingested directory as displayed to the user.
    dir: String,
    /// File-name pattern that was applied.
    pattern: String,
    /// Whether sub-directories were walked.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Files persisted as new memories.
    files_succeeded: usize,
    /// Files whose staging or persistence failed.
    files_failed: usize,
    /// Files skipped (underivable or duplicate names).
    files_skipped: usize,
    /// Wall-clock time since the run started, in milliseconds.
    elapsed_ms: u64,
}
353
/// Result of successfully persisting one staged file (returned by `persist_staged`).
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Action label reported to the user; `persist_staged` always sets "created".
    action: String,
    /// Length in bytes of the stored body.
    body_length: usize,
}
360
/// Progress record written as one JSON line to stderr by the staging producer after each
/// file is processed (whether staging succeeded or failed).
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Format version of this record; the producer emits 1.
    schema_version: u8,
    /// Event name; the producer emits "file_extracted".
    event: &'a str,
    /// Path of the staged file.
    path: &'a str,
    /// Time spent in `stage_file`, in milliseconds.
    ms: u64,
    /// Extracted entity count (0 when staging failed).
    entities: usize,
    /// Extracted relationship count (0 when staging failed).
    relationships: usize,
}
372
/// Everything computed for one file in Phase A, ready to be written by `persist_staged`.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// BLAKE3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// "ingested from <path>".
    description: String,
    /// Whole-body embedding (the aggregate of chunk embeddings for multi-chunk bodies).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only when the body produced more than one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries from `split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty unless NER ran), capped per memory.
    entities: Vec<NewEntity>,
    /// Extracted, normalized and validated relationships.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found by extraction.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
389
390fn stage_file(
393 _idx: usize,
394 path: &Path,
395 name: &str,
396 paths: &AppPaths,
397 enable_ner: bool,
398 gliner_variant: crate::extraction::GlinerVariant,
399 max_rss_mb: u64,
400) -> Result<StagedFile, AppError> {
401 use crate::constants::*;
402
403 if name.len() > MAX_MEMORY_NAME_LEN {
404 return Err(AppError::LimitExceeded(
405 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
406 ));
407 }
408 if name.starts_with("__") {
409 return Err(AppError::Validation(
410 crate::i18n::validation::reserved_name(),
411 ));
412 }
413 {
414 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
415 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
416 if !slug_re.is_match(name) {
417 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
418 name,
419 )));
420 }
421 }
422
423 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
424 if raw_body.len() > MAX_MEMORY_BODY_LEN {
425 return Err(AppError::LimitExceeded(
426 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
427 ));
428 }
429 if raw_body.trim().is_empty() {
430 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
431 }
432
433 let description = format!("ingested from {}", path.display());
434 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
435 return Err(AppError::Validation(
436 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
437 ));
438 }
439
440 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
441 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
442 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
443 if enable_ner {
444 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
445 Ok(extracted) => {
446 extracted_urls = extracted.urls;
447 extracted_entities = extracted.entities;
448 extracted_relationships = extracted.relationships;
449
450 if extracted_entities.len() > max_entities_per_memory() {
451 extracted_entities.truncate(max_entities_per_memory());
452 }
453 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
454 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
455 }
456 }
457 Err(e) => {
458 tracing::warn!(
459 file = %path.display(),
460 "auto-extraction failed (graceful degradation): {e:#}"
461 );
462 }
463 }
464 }
465
466 for rel in &mut extracted_relationships {
467 rel.relation = crate::parsers::normalize_relation(&rel.relation);
468 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
469 return Err(AppError::Validation(format!(
470 "{e} for relationship '{}' -> '{}'",
471 rel.source, rel.target
472 )));
473 }
474 crate::parsers::warn_if_non_canonical(&rel.relation);
475 if !(0.0..=1.0).contains(&rel.strength) {
476 return Err(AppError::Validation(format!(
477 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
478 rel.strength, rel.source, rel.target
479 )));
480 }
481 }
482
483 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
484 let snippet: String = raw_body.chars().take(200).collect();
485
486 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
487 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
488 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
489 return Err(AppError::LimitExceeded(format!(
490 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
491 chunks_info.len(),
492 REMEMBER_MAX_SAFE_MULTI_CHUNKS
493 )));
494 }
495
496 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
497 let embedding = if chunks_info.len() == 1 {
498 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
499 } else {
500 let chunk_texts: Vec<&str> = chunks_info
501 .iter()
502 .map(|c| chunking::chunk_text(&raw_body, c))
503 .collect();
504 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
505 for chunk_text in &chunk_texts {
506 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
507 if rss > max_rss_mb {
508 tracing::error!(
509 rss_mb = rss,
510 max_rss_mb = max_rss_mb,
511 file = %path.display(),
512 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
513 );
514 return Err(AppError::LowMemory {
515 available_mb: crate::memory_guard::available_memory_mb(),
516 required_mb: max_rss_mb,
517 });
518 }
519 }
520 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
521 &paths.models,
522 chunk_text,
523 )?);
524 }
525 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
526 chunk_embeddings_opt = Some(chunk_embeddings);
527 aggregated
528 };
529
530 let entity_embeddings = extracted_entities
531 .iter()
532 .map(|entity| {
533 let entity_text = match &entity.description {
534 Some(desc) => format!("{} {}", entity.name, desc),
535 None => entity.name.clone(),
536 };
537 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
538 })
539 .collect::<Result<Vec<_>, _>>()?;
540
541 Ok(StagedFile {
542 body: raw_body,
543 body_hash,
544 snippet,
545 name: name.to_string(),
546 description,
547 embedding,
548 chunk_embeddings: chunk_embeddings_opt,
549 chunks_info,
550 entities: extracted_entities,
551 relationships: extracted_relationships,
552 entity_embeddings,
553 urls: extracted_urls,
554 })
555}
556
557fn persist_staged(
559 conn: &mut Connection,
560 namespace: &str,
561 memory_type: &str,
562 staged: StagedFile,
563) -> Result<FileSuccess, AppError> {
564 {
565 let active_count: u32 = conn.query_row(
566 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
567 [],
568 |r| r.get::<_, i64>(0).map(|v| v as u32),
569 )?;
570 let ns_exists: bool = conn.query_row(
571 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
572 rusqlite::params![namespace],
573 |r| r.get::<_, i64>(0).map(|v| v > 0),
574 )?;
575 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
576 return Err(AppError::NamespaceError(format!(
577 "active namespace limit of {} exceeded while creating '{namespace}'",
578 crate::constants::MAX_NAMESPACES_ACTIVE
579 )));
580 }
581 }
582
583 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
584 if existing_memory.is_some() {
585 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
586 &staged.name,
587 namespace,
588 )));
589 }
590 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
591
592 let new_memory = NewMemory {
593 namespace: namespace.to_string(),
594 name: staged.name.clone(),
595 memory_type: memory_type.to_string(),
596 description: staged.description.clone(),
597 body: staged.body,
598 body_hash: staged.body_hash,
599 session_id: None,
600 source: "agent".to_string(),
601 metadata: serde_json::json!({}),
602 };
603
604 if let Some(hash_id) = duplicate_hash_id {
605 tracing::debug!(
606 target: "ingest",
607 duplicate_memory_id = hash_id,
608 "identical body already exists; persisting a new memory anyway"
609 );
610 }
611
612 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
613
614 let memory_id = memories::insert(&tx, &new_memory)?;
615 versions::insert_version(
616 &tx,
617 memory_id,
618 1,
619 &staged.name,
620 memory_type,
621 &staged.description,
622 &new_memory.body,
623 &serde_json::to_string(&new_memory.metadata)?,
624 None,
625 "create",
626 )?;
627 memories::upsert_vec(
628 &tx,
629 memory_id,
630 namespace,
631 memory_type,
632 &staged.embedding,
633 &staged.name,
634 &staged.snippet,
635 )?;
636
637 if staged.chunks_info.len() > 1 {
638 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
639 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
640 AppError::Internal(anyhow::anyhow!(
641 "missing chunk embeddings cache on multi-chunk ingest path"
642 ))
643 })?;
644 for (i, emb) in chunk_embeddings.iter().enumerate() {
645 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
646 }
647 }
648
649 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
650 for (idx, entity) in staged.entities.iter().enumerate() {
651 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
652 let entity_embedding = &staged.entity_embeddings[idx];
653 entities::upsert_entity_vec(
654 &tx,
655 entity_id,
656 namespace,
657 entity.entity_type,
658 entity_embedding,
659 &entity.name,
660 )?;
661 entities::link_memory_entity(&tx, memory_id, entity_id)?;
662 entities::increment_degree(&tx, entity_id)?;
663 }
664 let entity_types: std::collections::HashMap<&str, EntityType> = staged
665 .entities
666 .iter()
667 .map(|entity| (entity.name.as_str(), entity.entity_type))
668 .collect();
669 for rel in &staged.relationships {
670 let source_entity = NewEntity {
671 name: rel.source.clone(),
672 entity_type: entity_types
673 .get(rel.source.as_str())
674 .copied()
675 .unwrap_or(EntityType::Concept),
676 description: None,
677 };
678 let target_entity = NewEntity {
679 name: rel.target.clone(),
680 entity_type: entity_types
681 .get(rel.target.as_str())
682 .copied()
683 .unwrap_or(EntityType::Concept),
684 description: None,
685 };
686 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
687 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
688 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
689 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
690 }
691 }
692
693 tx.commit()?;
694
695 if !staged.urls.is_empty() {
696 let url_entries: Vec<storage_urls::MemoryUrl> = staged
697 .urls
698 .into_iter()
699 .map(|u| storage_urls::MemoryUrl {
700 url: u.url,
701 offset: Some(u.offset as i64),
702 })
703 .collect();
704 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
705 }
706
707 Ok(FileSuccess {
708 memory_id,
709 action: "created".to_string(),
710 body_length: new_memory.body.len(),
711 })
712}
713
714pub fn run(args: IngestArgs) -> Result<(), AppError> {
715 if args.mode == IngestMode::ClaudeCode {
716 return super::ingest_claude::run_claude_ingest(&args);
717 }
718
719 let started = std::time::Instant::now();
720
721 if !args.dir.exists() {
722 return Err(AppError::Validation(format!(
723 "directory not found: {}",
724 args.dir.display()
725 )));
726 }
727 if !args.dir.is_dir() {
728 return Err(AppError::Validation(format!(
729 "path is not a directory: {}",
730 args.dir.display()
731 )));
732 }
733
734 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
735 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
736 files.sort();
737
738 if files.len() > args.max_files {
739 return Err(AppError::Validation(format!(
740 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
741 files.len(),
742 args.max_files
743 )));
744 }
745
746 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
747 let memory_type_str = args.r#type.as_str().to_string();
748
749 let paths = AppPaths::resolve(args.db.as_deref())?;
750 let mut conn_or_err = match init_storage(&paths) {
751 Ok(c) => Ok(c),
752 Err(e) => Err(format!("{e}")),
753 };
754
755 let mut succeeded: usize = 0;
756 let mut failed: usize = 0;
757 let mut skipped: usize = 0;
758 let total = files.len();
759
760 let mut taken_names: BTreeSet<String> = BTreeSet::new();
763
764 enum SlotMeta {
770 Skip {
771 file_str: String,
772 derived_base: String,
773 name_truncated: bool,
774 original_name: Option<String>,
775 original_filename: Option<String>,
776 reason: String,
777 },
778 Process {
779 file_str: String,
780 derived_name: String,
781 name_truncated: bool,
782 original_name: Option<String>,
783 original_filename: Option<String>,
784 },
785 }
786
787 struct ProcessItem {
788 idx: usize,
789 path: PathBuf,
790 file_str: String,
791 derived_name: String,
792 }
793
794 let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
795 let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
796 let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
797
798 let max_name_length = args.max_name_length;
799 for path in &files {
800 let file_str = path.to_string_lossy().into_owned();
801 let (derived_base, name_truncated, original_name) =
802 derive_kebab_name(path, max_name_length);
803 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
804
805 if name_truncated {
806 if let Some(ref orig) = original_name {
807 truncations.push((orig.clone(), derived_base.clone()));
808 }
809 }
810
811 if derived_base.is_empty() {
812 let orig_filename = if !original_basename.is_empty() {
814 Some(original_basename.to_string())
815 } else {
816 None
817 };
818 slots_meta.push(SlotMeta::Skip {
819 file_str,
820 derived_base: String::new(),
821 name_truncated: false,
822 original_name: None,
823 original_filename: orig_filename,
824 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
825 });
826 continue;
827 }
828
829 match unique_name(&derived_base, &taken_names) {
830 Ok(derived_name) => {
831 taken_names.insert(derived_name.clone());
832 let idx = slots_meta.len();
833 let orig_filename = if original_basename != derived_name {
835 Some(original_basename.to_string())
836 } else {
837 None
838 };
839 process_items.push(ProcessItem {
840 idx,
841 path: path.clone(),
842 file_str: file_str.clone(),
843 derived_name: derived_name.clone(),
844 });
845 slots_meta.push(SlotMeta::Process {
846 file_str,
847 derived_name,
848 name_truncated,
849 original_name,
850 original_filename: orig_filename,
851 });
852 }
853 Err(e) => {
854 let orig_filename = if original_basename != derived_base {
855 Some(original_basename.to_string())
856 } else {
857 None
858 };
859 slots_meta.push(SlotMeta::Skip {
860 file_str,
861 derived_base,
862 name_truncated,
863 original_name,
864 original_filename: orig_filename,
865 reason: e.to_string(),
866 });
867 }
868 }
869 }
870
871 if !truncations.is_empty() {
872 tracing::info!(
873 target: "ingest",
874 count = truncations.len(),
875 max_name_length = max_name_length,
876 max_len = DERIVED_NAME_MAX_LEN,
877 "derived names truncated; pass -vv (debug) for per-file detail"
878 );
879 }
880
881 if args.dry_run {
883 for meta in &slots_meta {
884 match meta {
885 SlotMeta::Skip {
886 file_str,
887 derived_base,
888 name_truncated,
889 original_name,
890 original_filename,
891 reason,
892 } => {
893 output::emit_json_compact(&IngestFileEvent {
894 file: file_str,
895 name: derived_base,
896 status: "skip",
897 truncated: *name_truncated,
898 original_name: original_name.clone(),
899 original_filename: original_filename.as_deref(),
900 error: Some(reason.clone()),
901 memory_id: None,
902 action: None,
903 body_length: 0,
904 })?;
905 }
906 SlotMeta::Process {
907 file_str,
908 derived_name,
909 name_truncated,
910 original_name,
911 original_filename,
912 } => {
913 output::emit_json_compact(&IngestFileEvent {
914 file: file_str,
915 name: derived_name,
916 status: "preview",
917 truncated: *name_truncated,
918 original_name: original_name.clone(),
919 original_filename: original_filename.as_deref(),
920 error: None,
921 memory_id: None,
922 action: None,
923 body_length: 0,
924 })?;
925 }
926 }
927 }
928 output::emit_json_compact(&IngestSummary {
929 summary: true,
930 dir: args.dir.to_string_lossy().into_owned(),
931 pattern: args.pattern.clone(),
932 recursive: args.recursive,
933 files_total: total,
934 files_succeeded: 0,
935 files_failed: 0,
936 files_skipped: 0,
937 elapsed_ms: started.elapsed().as_millis() as u64,
938 })?;
939 return Ok(());
940 }
941
942 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
945
946 let pool = rayon::ThreadPoolBuilder::new()
947 .num_threads(parallelism)
948 .build()
949 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
950
951 if args.enable_ner && args.skip_extraction {
952 tracing::warn!(
953 "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
954 );
955 }
956 if args.skip_extraction && !args.enable_ner {
957 tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
958 }
959 let enable_ner = args.enable_ner;
960 let max_rss_mb = args.max_rss_mb;
961 let gliner_variant: crate::extraction::GlinerVariant =
962 args.gliner_variant.parse().unwrap_or_else(|e| {
963 tracing::warn!("invalid --gliner-variant: {e}; using fp32");
964 crate::extraction::GlinerVariant::Fp32
965 });
966
967 let total_to_process = process_items.len();
968 tracing::info!(
969 target = "ingest",
970 phase = "pipeline_start",
971 files = total_to_process,
972 ingest_parallelism = parallelism,
973 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
974 );
975
976 let channel_bound = (parallelism * 2).max(1);
980 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
981
982 let paths_owned = paths.clone();
987 let producer_handle = std::thread::spawn(move || {
988 pool.install(|| {
989 process_items.into_par_iter().for_each(|item| {
990 let t0 = std::time::Instant::now();
991 let result = stage_file(
992 item.idx,
993 &item.path,
994 &item.derived_name,
995 &paths_owned,
996 enable_ner,
997 gliner_variant,
998 max_rss_mb,
999 );
1000 let elapsed_ms = t0.elapsed().as_millis() as u64;
1001
1002 let (n_entities, n_relationships) = match &result {
1005 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1006 Err(_) => (0, 0),
1007 };
1008 let progress = StageProgressEvent {
1009 schema_version: 1,
1010 event: "file_extracted",
1011 path: &item.file_str,
1012 ms: elapsed_ms,
1013 entities: n_entities,
1014 relationships: n_relationships,
1015 };
1016 if let Ok(line) = serde_json::to_string(&progress) {
1017 eprintln!("{line}");
1018 }
1019
1020 let _ = tx.send((item.idx, result));
1024 });
1025 drop(tx);
1027 });
1028 });
1029
1030 let fail_fast = args.fail_fast;
1042
1043 for meta in &slots_meta {
1045 if let SlotMeta::Skip {
1046 file_str,
1047 derived_base,
1048 name_truncated,
1049 original_name,
1050 original_filename,
1051 reason,
1052 } = meta
1053 {
1054 output::emit_json_compact(&IngestFileEvent {
1055 file: file_str,
1056 name: derived_base,
1057 status: "skipped",
1058 truncated: *name_truncated,
1059 original_name: original_name.clone(),
1060 original_filename: original_filename.as_deref(),
1061 error: Some(reason.clone()),
1062 memory_id: None,
1063 action: None,
1064 body_length: 0,
1065 })?;
1066 skipped += 1;
1067 }
1068 }
1069
1070 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1073 .iter()
1074 .enumerate()
1075 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1076 .collect();
1077
1078 tracing::info!(
1079 target = "ingest",
1080 phase = "persist_start",
1081 files = total_to_process,
1082 "phase B starting: persisting files incrementally as Phase A completes each one",
1083 );
1084
1085 for (idx, stage_result) in rx {
1089 let meta = meta_index.get(&idx).ok_or_else(|| {
1090 AppError::Internal(anyhow::anyhow!(
1091 "channel idx {idx} has no corresponding Process slot"
1092 ))
1093 })?;
1094 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1095 {
1096 SlotMeta::Process {
1097 file_str,
1098 derived_name,
1099 name_truncated,
1100 original_name,
1101 original_filename,
1102 } => (
1103 file_str,
1104 derived_name,
1105 name_truncated,
1106 original_name,
1107 original_filename,
1108 ),
1109 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1110 };
1111
1112 let conn = match conn_or_err.as_mut() {
1114 Ok(c) => c,
1115 Err(err_msg) => {
1116 let err_clone = err_msg.clone();
1117 output::emit_json_compact(&IngestFileEvent {
1118 file: file_str,
1119 name: derived_name,
1120 status: "failed",
1121 truncated: *name_truncated,
1122 original_name: original_name.clone(),
1123 original_filename: original_filename.as_deref(),
1124 error: Some(err_clone.clone()),
1125 memory_id: None,
1126 action: None,
1127 body_length: 0,
1128 })?;
1129 failed += 1;
1130 if fail_fast {
1131 output::emit_json_compact(&IngestSummary {
1132 summary: true,
1133 dir: args.dir.display().to_string(),
1134 pattern: args.pattern.clone(),
1135 recursive: args.recursive,
1136 files_total: total,
1137 files_succeeded: succeeded,
1138 files_failed: failed,
1139 files_skipped: skipped,
1140 elapsed_ms: started.elapsed().as_millis() as u64,
1141 })?;
1142 return Err(AppError::Validation(format!(
1143 "ingest aborted on first failure: {err_clone}"
1144 )));
1145 }
1146 continue;
1147 }
1148 };
1149
1150 let outcome =
1151 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1152
1153 match outcome {
1154 Ok(FileSuccess {
1155 memory_id,
1156 action,
1157 body_length,
1158 }) => {
1159 output::emit_json_compact(&IngestFileEvent {
1160 file: file_str,
1161 name: derived_name,
1162 status: "indexed",
1163 truncated: *name_truncated,
1164 original_name: original_name.clone(),
1165 original_filename: original_filename.as_deref(),
1166 error: None,
1167 memory_id: Some(memory_id),
1168 action: Some(action),
1169 body_length,
1170 })?;
1171 succeeded += 1;
1172 }
1173 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1174 output::emit_json_compact(&IngestFileEvent {
1175 file: file_str,
1176 name: derived_name,
1177 status: "skipped",
1178 truncated: *name_truncated,
1179 original_name: original_name.clone(),
1180 original_filename: original_filename.as_deref(),
1181 error: Some(format!("{e}")),
1182 memory_id: None,
1183 action: Some("duplicate".to_string()),
1184 body_length: 0,
1185 })?;
1186 skipped += 1;
1187 }
1188 Err(e) => {
1189 let err_msg = format!("{e}");
1190 output::emit_json_compact(&IngestFileEvent {
1191 file: file_str,
1192 name: derived_name,
1193 status: "failed",
1194 truncated: *name_truncated,
1195 original_name: original_name.clone(),
1196 original_filename: original_filename.as_deref(),
1197 error: Some(err_msg.clone()),
1198 memory_id: None,
1199 action: None,
1200 body_length: 0,
1201 })?;
1202 failed += 1;
1203 if fail_fast {
1204 output::emit_json_compact(&IngestSummary {
1205 summary: true,
1206 dir: args.dir.display().to_string(),
1207 pattern: args.pattern.clone(),
1208 recursive: args.recursive,
1209 files_total: total,
1210 files_succeeded: succeeded,
1211 files_failed: failed,
1212 files_skipped: skipped,
1213 elapsed_ms: started.elapsed().as_millis() as u64,
1214 })?;
1215 return Err(AppError::Validation(format!(
1216 "ingest aborted on first failure: {err_msg}"
1217 )));
1218 }
1219 }
1220 }
1221 }
1222
1223 producer_handle
1225 .join()
1226 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1227
1228 if let Ok(ref conn) = conn_or_err {
1229 if succeeded > 0 {
1230 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1231 }
1232 }
1233
1234 output::emit_json_compact(&IngestSummary {
1235 summary: true,
1236 dir: args.dir.display().to_string(),
1237 pattern: args.pattern.clone(),
1238 recursive: args.recursive,
1239 files_total: total,
1240 files_succeeded: succeeded,
1241 files_failed: failed,
1242 files_skipped: skipped,
1243 elapsed_ms: started.elapsed().as_millis() as u64,
1244 })?;
1245
1246 Ok(())
1247}
1248
1249fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1255 ensure_db_ready(paths)?;
1256 let conn = open_rw(&paths.db)?;
1257 Ok(conn)
1258}
1259
1260pub(crate) fn collect_files(
1261 dir: &Path,
1262 pattern: &str,
1263 recursive: bool,
1264 out: &mut Vec<PathBuf>,
1265) -> Result<(), AppError> {
1266 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1267 for entry in entries {
1268 let entry = entry.map_err(AppError::Io)?;
1269 let path = entry.path();
1270 let file_type = entry.file_type().map_err(AppError::Io)?;
1271 if file_type.is_file() {
1272 let name = entry.file_name();
1273 let name_str = name.to_string_lossy();
1274 if matches_pattern(&name_str, pattern) {
1275 out.push(path);
1276 }
1277 } else if file_type.is_dir() && recursive {
1278 collect_files(&path, pattern, recursive, out)?;
1279 }
1280 }
1281 Ok(())
1282}
1283
1284fn matches_pattern(name: &str, pattern: &str) -> bool {
1285 if let Some(suffix) = pattern.strip_prefix('*') {
1286 name.ends_with(suffix)
1287 } else if let Some(prefix) = pattern.strip_suffix('*') {
1288 name.starts_with(prefix)
1289 } else {
1290 name == pattern
1291 }
1292}
1293
1294pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1305 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1306 let lowered: String = stem
1307 .nfd()
1308 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1309 .map(|c| {
1310 if c == '_' || c.is_whitespace() {
1311 '-'
1312 } else {
1313 c
1314 }
1315 })
1316 .map(|c| c.to_ascii_lowercase())
1317 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1318 .collect();
1319 let collapsed = collapse_dashes(&lowered);
1320 let trimmed_raw = collapsed.trim_matches('-').to_string();
1321 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1323 format!("doc-{trimmed_raw}")
1324 } else {
1325 trimmed_raw
1326 };
1327 if trimmed.len() > max_len {
1328 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1329 tracing::debug!(
1330 target: "ingest",
1331 original = %trimmed,
1332 truncated_to = %truncated,
1333 max_len = max_len,
1334 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1335 );
1336 (truncated, true, Some(trimmed))
1337 } else {
1338 (trimmed, false, None)
1339 }
1340}
1341
1342fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1355 if !taken.contains(base) {
1356 return Ok(base.to_string());
1357 }
1358 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1359 let candidate = format!("{base}-{suffix}");
1360 if !taken.contains(&candidate) {
1361 tracing::warn!(
1362 target: "ingest",
1363 base = %base,
1364 resolved = %candidate,
1365 suffix,
1366 "memory name collision resolved with numeric suffix"
1367 );
1368 return Ok(candidate);
1369 }
1370 }
1371 Err(AppError::Validation(format!(
1372 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1373 )))
1374}
1375
1376fn collapse_dashes(s: &str) -> String {
1377 let mut out = String::with_capacity(s.len());
1378 let mut prev_dash = false;
1379 for c in s.chars() {
1380 if c == '-' {
1381 if !prev_dash {
1382 out.push('-');
1383 }
1384 prev_dash = true;
1385 } else {
1386 out.push(c);
1387 prev_dash = false;
1388 }
1389 }
1390 out
1391}
1392
1393#[cfg(test)]
1394mod tests {
1395 use super::*;
1396 use std::path::PathBuf;
1397
1398 #[test]
1399 fn matches_pattern_suffix() {
1400 assert!(matches_pattern("foo.md", "*.md"));
1401 assert!(!matches_pattern("foo.txt", "*.md"));
1402 assert!(matches_pattern("foo.md", "*"));
1403 }
1404
1405 #[test]
1406 fn matches_pattern_prefix() {
1407 assert!(matches_pattern("README.md", "README*"));
1408 assert!(!matches_pattern("CHANGELOG.md", "README*"));
1409 }
1410
1411 #[test]
1412 fn matches_pattern_exact() {
1413 assert!(matches_pattern("README.md", "README.md"));
1414 assert!(!matches_pattern("readme.md", "README.md"));
1415 }
1416
1417 #[test]
1418 fn derive_kebab_underscore_to_dash() {
1419 let p = PathBuf::from("/tmp/claude_code_headless.md");
1420 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1421 assert_eq!(name, "claude-code-headless");
1422 assert!(!truncated);
1423 assert!(original.is_none());
1424 }
1425
1426 #[test]
1427 fn derive_kebab_uppercase_lowered() {
1428 let p = PathBuf::from("/tmp/README.md");
1429 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1430 assert_eq!(name, "readme");
1431 assert!(!truncated);
1432 assert!(original.is_none());
1433 }
1434
1435 #[test]
1436 fn derive_kebab_strips_non_kebab_chars() {
1437 let p = PathBuf::from("/tmp/some@weird#name!.md");
1438 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1439 assert_eq!(name, "someweirdname");
1440 assert!(!truncated);
1441 assert!(original.is_none());
1442 }
1443
1444 #[test]
1447 fn derive_kebab_folds_accented_letters_to_ascii() {
1448 let p = PathBuf::from("/tmp/açaí.md");
1449 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1450 assert_eq!(name, "acai", "got '{name}'");
1451 }
1452
1453 #[test]
1454 fn derive_kebab_handles_naive_with_diaeresis() {
1455 let p = PathBuf::from("/tmp/naïve-test.md");
1456 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1457 assert_eq!(name, "naive-test", "got '{name}'");
1458 }
1459
1460 #[test]
1461 fn derive_kebab_drops_emoji_keeps_word() {
1462 let p = PathBuf::from("/tmp/🚀-rocket.md");
1463 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1464 assert_eq!(name, "rocket", "got '{name}'");
1465 }
1466
1467 #[test]
1468 fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
1469 let p = PathBuf::from("/tmp/açaí🦜.md");
1470 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1471 assert_eq!(name, "acai", "got '{name}'");
1472 }
1473
1474 #[test]
1475 fn derive_kebab_pure_emoji_yields_empty() {
1476 let p = PathBuf::from("/tmp/🦜🚀🌟.md");
1477 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1478 assert!(name.is_empty(), "got '{name}'");
1479 }
1480
1481 #[test]
1482 fn derive_kebab_collapses_consecutive_dashes() {
1483 let p = PathBuf::from("/tmp/a__b___c.md");
1484 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1485 assert_eq!(name, "a-b-c");
1486 assert!(!truncated);
1487 assert!(original.is_none());
1488 }
1489
1490 #[test]
1491 fn derive_kebab_truncates_to_60_chars() {
1492 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
1493 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1494 assert!(name.len() <= 60, "got len {}", name.len());
1495 assert!(truncated);
1496 assert!(original.is_some());
1497 assert!(original.unwrap().len() > 60);
1498 }
1499
1500 #[test]
1501 fn collect_files_finds_md_files() {
1502 let tmp = tempfile::tempdir().expect("tempdir");
1503 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1504 std::fs::write(tmp.path().join("b.md"), "y").unwrap();
1505 std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
1506 let mut out = Vec::new();
1507 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1508 assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
1509 }
1510
1511 #[test]
1512 fn collect_files_recursive_descends_subdirs() {
1513 let tmp = tempfile::tempdir().expect("tempdir");
1514 let sub = tmp.path().join("sub");
1515 std::fs::create_dir(&sub).unwrap();
1516 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1517 std::fs::write(sub.join("b.md"), "y").unwrap();
1518 let mut out = Vec::new();
1519 collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
1520 assert_eq!(out.len(), 2);
1521 }
1522
1523 #[test]
1524 fn collect_files_non_recursive_skips_subdirs() {
1525 let tmp = tempfile::tempdir().expect("tempdir");
1526 let sub = tmp.path().join("sub");
1527 std::fs::create_dir(&sub).unwrap();
1528 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
1529 std::fs::write(sub.join("b.md"), "y").unwrap();
1530 let mut out = Vec::new();
1531 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
1532 assert_eq!(out.len(), 1);
1533 }
1534
1535 #[test]
1538 fn derive_kebab_long_basename_truncated_within_cap() {
1539 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
1540 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
1541 assert!(
1542 name.len() <= DERIVED_NAME_MAX_LEN,
1543 "truncated name must respect cap; got {} chars",
1544 name.len()
1545 );
1546 assert!(!name.is_empty());
1547 assert!(truncated);
1548 assert!(original.is_some());
1549 }
1550
1551 #[test]
1552 fn unique_name_returns_base_when_free() {
1553 let taken: BTreeSet<String> = BTreeSet::new();
1554 let resolved = unique_name("note", &taken).expect("must resolve");
1555 assert_eq!(resolved, "note");
1556 }
1557
1558 #[test]
1559 fn unique_name_appends_first_free_suffix_on_collision() {
1560 let mut taken: BTreeSet<String> = BTreeSet::new();
1561 taken.insert("note".to_string());
1562 taken.insert("note-1".to_string());
1563 let resolved = unique_name("note", &taken).expect("must resolve");
1564 assert_eq!(resolved, "note-2");
1565 }
1566
1567 #[test]
1568 fn unique_name_errors_after_collision_cap() {
1569 let mut taken: BTreeSet<String> = BTreeSet::new();
1570 taken.insert("note".to_string());
1571 for i in 1..=MAX_NAME_COLLISION_SUFFIX {
1572 taken.insert(format!("note-{i}"));
1573 }
1574 let err = unique_name("note", &taken).expect_err("must surface error");
1575 assert!(matches!(err, AppError::Validation(_)));
1576 }
1577
1578 #[test]
1581 fn validate_relation_format_accepts_valid_relations() {
1582 use crate::parsers::{is_canonical_relation, validate_relation_format};
1583 assert!(validate_relation_format("applies_to").is_ok());
1584 assert!(validate_relation_format("depends_on").is_ok());
1585 assert!(validate_relation_format("implements").is_ok());
1586 assert!(validate_relation_format("").is_err());
1587 assert!(is_canonical_relation("applies_to"));
1588 assert!(!is_canonical_relation("implements"));
1589 }
1590
1591 use serial_test::serial;
1594
1595 fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
1597 let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
1598 let prev = std::env::var(key).ok();
1599 match value {
1600 Some(v) => std::env::set_var(key, v),
1601 None => std::env::remove_var(key),
1602 }
1603 f();
1604 match prev {
1605 Some(p) => std::env::set_var(key, p),
1606 None => std::env::remove_var(key),
1607 }
1608 }
1609
1610 #[test]
1611 #[serial]
1612 fn env_low_memory_enabled_unset_returns_false() {
1613 with_env_var(None, || assert!(!env_low_memory_enabled()));
1614 }
1615
1616 #[test]
1617 #[serial]
1618 fn env_low_memory_enabled_empty_returns_false() {
1619 with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
1620 }
1621
1622 #[test]
1623 #[serial]
1624 fn env_low_memory_enabled_truthy_values_return_true() {
1625 for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
1626 with_env_var(Some(v), || {
1627 assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
1628 });
1629 }
1630 }
1631
1632 #[test]
1633 #[serial]
1634 fn env_low_memory_enabled_falsy_values_return_false() {
1635 for v in ["0", "false", "FALSE", "no", "off"] {
1636 with_env_var(Some(v), || {
1637 assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
1638 });
1639 }
1640 }
1641
1642 #[test]
1643 #[serial]
1644 fn env_low_memory_enabled_unrecognized_value_returns_false() {
1645 with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
1646 }
1647
1648 #[test]
1649 #[serial]
1650 fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
1651 with_env_var(None, || {
1652 assert_eq!(resolve_parallelism(true, Some(4)), 1);
1653 assert_eq!(resolve_parallelism(true, Some(8)), 1);
1654 assert_eq!(resolve_parallelism(true, None), 1);
1655 });
1656 }
1657
1658 #[test]
1659 #[serial]
1660 fn resolve_parallelism_env_forces_one_when_flag_off() {
1661 with_env_var(Some("1"), || {
1662 assert_eq!(resolve_parallelism(false, Some(4)), 1);
1663 assert_eq!(resolve_parallelism(false, None), 1);
1664 });
1665 }
1666
1667 #[test]
1668 #[serial]
1669 fn resolve_parallelism_falsy_env_does_not_override() {
1670 with_env_var(Some("0"), || {
1671 assert_eq!(resolve_parallelism(false, Some(4)), 4);
1672 });
1673 }
1674
1675 #[test]
1676 #[serial]
1677 fn resolve_parallelism_explicit_value_when_low_memory_off() {
1678 with_env_var(None, || {
1679 assert_eq!(resolve_parallelism(false, Some(3)), 3);
1680 assert_eq!(resolve_parallelism(false, Some(1)), 1);
1681 });
1682 }
1683
1684 #[test]
1685 #[serial]
1686 fn resolve_parallelism_default_when_unset() {
1687 with_env_var(None, || {
1688 let p = resolve_parallelism(false, None);
1689 assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
1690 });
1691 }
1692
1693 #[test]
1694 fn ingest_args_parses_low_memory_flag_via_clap() {
1695 use clap::Parser;
1696 let cli = crate::cli::Cli::try_parse_from([
1699 "sqlite-graphrag",
1700 "ingest",
1701 "/tmp/dummy",
1702 "--type",
1703 "document",
1704 "--low-memory",
1705 ])
1706 .expect("parse must succeed");
1707 match cli.command {
1708 crate::cli::Commands::Ingest(args) => {
1709 assert!(args.low_memory, "--low-memory must set field to true");
1710 }
1711 _ => panic!("expected Ingest subcommand"),
1712 }
1713 }
1714
1715 #[test]
1716 fn ingest_args_low_memory_defaults_false() {
1717 use clap::Parser;
1718 let cli = crate::cli::Cli::try_parse_from([
1719 "sqlite-graphrag",
1720 "ingest",
1721 "/tmp/dummy",
1722 "--type",
1723 "document",
1724 ])
1725 .expect("parse must succeed");
1726 match cli.command {
1727 crate::cli::Commands::Ingest(args) => {
1728 assert!(!args.low_memory, "default must be false");
1729 }
1730 _ => panic!("expected Ingest subcommand"),
1731 }
1732 }
1733}