1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// NOTE(review): presumably the largest numeric suffix `unique_name` (defined
// outside this view) will try when a derived memory name collides with one
// already taken in the same ingest run; confirm against its definition.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable GLiNER NER extraction (disabled by default, slower)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70 # LLM-curated extraction via Claude Code CLI\n \
71 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
72 # Resume interrupted claude-code ingest\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
74 # Claude Code with budget cap and custom timeout\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
76AUTHENTICATION:\n \
77 --mode claude-code: Uses existing Claude Code authentication.\n \
78 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
79 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
80 --mode codex: Uses existing Codex CLI authentication.\n \
81 Device auth: run `codex auth login` first\n \
82 API key: set OPENAI_API_KEY (optional)\n\n \
83NOTES:\n \
84 Each file becomes a separate memory. Names derive from file basenames\n \
85 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
86 followed by a final summary line with counts. Per-file errors are reported\n \
87 inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89 #[arg(
91 value_name = "DIR",
92 help = "Directory to ingest recursively (each matching file becomes a memory)"
93 )]
94 pub dir: PathBuf,
95
96 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98 pub r#type: MemoryType,
99
100 #[arg(long, default_value = "*.md")]
103 pub pattern: String,
104
105 #[arg(long, default_value_t = false)]
107 pub recursive: bool,
108
109 #[arg(
110 long,
111 env = "SQLITE_GRAPHRAG_ENABLE_NER",
112 value_parser = crate::parsers::parse_bool_flexible,
113 action = clap::ArgAction::Set,
114 num_args = 0..=1,
115 default_missing_value = "true",
116 default_value = "false",
117 help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
118 )]
119 pub enable_ner: bool,
120 #[arg(
121 long,
122 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
123 default_value = "fp32",
124 help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
125 )]
126 pub gliner_variant: String,
127
128 #[arg(long, default_value_t = false, hide = true)]
130 pub skip_extraction: bool,
131
132 #[arg(long, default_value_t = false)]
134 pub fail_fast: bool,
135
136 #[arg(long, default_value_t = false)]
138 pub dry_run: bool,
139
140 #[arg(long, default_value_t = 10_000)]
142 pub max_files: usize,
143
144 #[arg(long)]
146 pub namespace: Option<String>,
147
148 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
150 pub db: Option<String>,
151
152 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
153 pub format: JsonOutputFormat,
154
155 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
156 pub json: bool,
157
158 #[arg(
160 long,
161 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
162 )]
163 pub ingest_parallelism: Option<usize>,
164
165 #[arg(
173 long,
174 default_value_t = false,
175 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
176 Recommended for environments with <4 GB available RAM or container/cgroup \
177 constraints. Trade-off: 3-4x longer wall time. Also honored via \
178 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
179 )]
180 pub low_memory: bool,
181
182 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
184 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
185 pub max_rss_mb: u64,
186
187 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
192 help = "Maximum length for derived memory names (default: 60)")]
193 pub max_name_length: usize,
194
195 #[arg(long, value_enum, default_value_t = IngestMode::None)]
197 pub mode: IngestMode,
198
199 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
201 pub claude_binary: Option<std::path::PathBuf>,
202
203 #[arg(long)]
205 pub claude_model: Option<String>,
206
207 #[arg(long, default_value_t = false)]
209 pub resume: bool,
210
211 #[arg(long, default_value_t = false)]
213 pub retry_failed: bool,
214
215 #[arg(long, default_value_t = false)]
217 pub keep_queue: bool,
218
219 #[arg(long, default_value = ".ingest-queue.sqlite")]
221 pub queue_db: String,
222
223 #[arg(long, default_value_t = 60)]
225 pub rate_limit_wait: u64,
226
227 #[arg(long)]
229 pub max_cost_usd: Option<f64>,
230
231 #[arg(
233 long,
234 default_value_t = 300,
235 help = "Timeout in seconds for each claude -p invocation (default: 300)"
236 )]
237 pub claude_timeout: u64,
238
239 #[arg(
241 long,
242 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
243 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
244 )]
245 pub codex_binary: Option<PathBuf>,
246
247 #[arg(
249 long,
250 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
251 )]
252 pub codex_model: Option<String>,
253
254 #[arg(
256 long,
257 default_value_t = 300,
258 help = "Timeout in seconds for each codex exec invocation (default: 300)"
259 )]
260 pub codex_timeout: u64,
261}
262
// Extraction backend selected by `--mode`.
//
// Plain `//` comments are used deliberately: clap's ValueEnum derive turns
// `///` doc comments on variants into user-visible help for each value.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default local pipeline (`run`'s Phase A/Phase B); NER runs only when
    // --enable-ner is set.
    None,
    // GLiNER extraction on the local pipeline (not dispatched to a separate
    // runner by `run`).
    Gliner,
    // Delegated to `super::ingest_claude::run_claude_ingest`.
    ClaudeCode,
    // Delegated to `super::ingest_codex::run_codex_ingest`.
    Codex,
}
275
276fn env_low_memory_enabled() -> bool {
281 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
282 Ok(v) if v.is_empty() => false,
283 Ok(v) => match v.to_lowercase().as_str() {
284 "1" | "true" | "yes" | "on" => true,
285 "0" | "false" | "no" | "off" => false,
286 other => {
287 tracing::warn!(
288 target: "ingest",
289 value = %other,
290 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
291 );
292 false
293 }
294 },
295 Err(_) => false,
296 }
297}
298
299fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
311 let env_flag = env_low_memory_enabled();
312 let low_memory = low_memory_flag || env_flag;
313
314 if low_memory {
315 if let Some(n) = ingest_parallelism {
316 if n > 1 {
317 tracing::warn!(
318 target: "ingest",
319 requested = n,
320 "--ingest-parallelism overridden by --low-memory; using 1"
321 );
322 }
323 }
324 if low_memory_flag {
325 tracing::info!(
326 target: "ingest",
327 source = "flag",
328 "low-memory mode enabled: forcing --ingest-parallelism 1"
329 );
330 } else {
331 tracing::info!(
332 target: "ingest",
333 source = "env",
334 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
335 );
336 }
337 return 1;
338 }
339
340 ingest_parallelism
341 .unwrap_or_else(|| {
342 std::thread::available_parallelism()
343 .map(|v| v.get() / 2)
344 .unwrap_or(1)
345 .clamp(1, 4)
346 })
347 .max(1)
348}
349
/// One NDJSON line on stdout describing what happened to a single input file.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Input path as a (lossy UTF-8) string.
    file: &'a str,
    /// Memory name assigned to the file. For skipped files this is the derived
    /// base name, which is empty when no name could be derived.
    name: &'a str,
    /// Outcome label. `run` emits "preview" / "skip" in dry-run mode and
    /// "indexed" / "skipped" / "failed" otherwise.
    status: &'a str,
    /// True when the derived name had to be truncated.
    truncated: bool,
    // NOTE(review): presumably the untruncated name reported by
    // `derive_kebab_name`; confirm against its definition.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// File stem, included only when it differs from the assigned name.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Failure or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the created memory (set only for indexed files).
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// "created" for indexed files. "duplicate" when the name already exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length in bytes for indexed files; 0 otherwise.
    body_length: usize,
}
372
/// Final NDJSON line of an ingest run (also written on dry-run and fail-fast exits).
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`; lets consumers tell the summary apart from per-file events.
    summary: bool,
    /// Directory argument as given on the command line.
    dir: String,
    /// Filename pattern used for collection.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files that matched the pattern (including skipped ones).
    files_total: usize,
    /// Files persisted as new memories.
    files_succeeded: usize,
    /// Files that failed staging or persistence.
    files_failed: usize,
    /// Files skipped (name could not be derived, name collision, or duplicate memory).
    files_skipped: usize,
    /// Wall-clock milliseconds since `run` started the local pipeline.
    elapsed_ms: u64,
}
385
/// Result of a successful `persist_staged` call.
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Action label reported in the event; `persist_staged` always sets "created".
    action: String,
    /// Length in bytes of the persisted body.
    body_length: usize,
}
392
/// Progress line written to stderr by the Phase A producer after each file is staged.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Schema version of this event; `run` currently emits 1.
    schema_version: u8,
    /// Event name; `run` emits "file_extracted".
    event: &'a str,
    /// Input path as a (lossy UTF-8) string.
    path: &'a str,
    /// Milliseconds spent in `stage_file` for this file.
    ms: u64,
    /// Entities extracted (0 when staging failed).
    entities: usize,
    /// Relationships extracted (0 when staging failed).
    relationships: usize,
}
404
/// Everything Phase A (`stage_file`) computes for one file. Phase B
/// (`persist_staged`) consumes it without doing further model work.
struct StagedFile {
    /// Raw file contents.
    body: String,
    /// BLAKE3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// `"ingested from <path>"`.
    description: String,
    /// Memory-level embedding. For single-chunk bodies it is the whole-body
    /// embedding; otherwise `chunking::aggregate_embeddings` of the chunk embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only when the body split into more than one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by `chunking::split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// NER entities, truncated to `max_entities_per_memory()`.
    entities: Vec<NewEntity>,
    /// Normalized and validated relationships, truncated to `MAX_RELATIONSHIPS_PER_MEMORY`.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs returned by auto-extraction (empty when NER is off or failed).
    urls: Vec<crate::extraction::ExtractedUrl>,
}
421
422fn stage_file(
425 _idx: usize,
426 path: &Path,
427 name: &str,
428 paths: &AppPaths,
429 enable_ner: bool,
430 gliner_variant: crate::extraction::GlinerVariant,
431 max_rss_mb: u64,
432) -> Result<StagedFile, AppError> {
433 use crate::constants::*;
434
435 if name.len() > MAX_MEMORY_NAME_LEN {
436 return Err(AppError::LimitExceeded(
437 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
438 ));
439 }
440 if name.starts_with("__") {
441 return Err(AppError::Validation(
442 crate::i18n::validation::reserved_name(),
443 ));
444 }
445 {
446 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
447 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
448 if !slug_re.is_match(name) {
449 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
450 name,
451 )));
452 }
453 }
454
455 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
456 if raw_body.len() > MAX_MEMORY_BODY_LEN {
457 return Err(AppError::LimitExceeded(
458 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
459 ));
460 }
461 if raw_body.trim().is_empty() {
462 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
463 }
464
465 let description = format!("ingested from {}", path.display());
466 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
467 return Err(AppError::Validation(
468 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
469 ));
470 }
471
472 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
473 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
474 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
475 if enable_ner {
476 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
477 Ok(extracted) => {
478 extracted_urls = extracted.urls;
479 extracted_entities = extracted.entities;
480 extracted_relationships = extracted.relationships;
481
482 if extracted_entities.len() > max_entities_per_memory() {
483 extracted_entities.truncate(max_entities_per_memory());
484 }
485 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
486 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
487 }
488 }
489 Err(e) => {
490 tracing::warn!(
491 file = %path.display(),
492 "auto-extraction failed (graceful degradation): {e:#}"
493 );
494 }
495 }
496 }
497
498 for rel in &mut extracted_relationships {
499 rel.relation = crate::parsers::normalize_relation(&rel.relation);
500 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
501 return Err(AppError::Validation(format!(
502 "{e} for relationship '{}' -> '{}'",
503 rel.source, rel.target
504 )));
505 }
506 crate::parsers::warn_if_non_canonical(&rel.relation);
507 if !(0.0..=1.0).contains(&rel.strength) {
508 return Err(AppError::Validation(format!(
509 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
510 rel.strength, rel.source, rel.target
511 )));
512 }
513 }
514
515 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
516 let snippet: String = raw_body.chars().take(200).collect();
517
518 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
519 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
520 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
521 return Err(AppError::LimitExceeded(format!(
522 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
523 chunks_info.len(),
524 REMEMBER_MAX_SAFE_MULTI_CHUNKS
525 )));
526 }
527
528 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
529 let embedding = if chunks_info.len() == 1 {
530 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
531 } else {
532 let chunk_texts: Vec<&str> = chunks_info
533 .iter()
534 .map(|c| chunking::chunk_text(&raw_body, c))
535 .collect();
536 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
537 for chunk_text in &chunk_texts {
538 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
539 if rss > max_rss_mb {
540 tracing::error!(
541 rss_mb = rss,
542 max_rss_mb = max_rss_mb,
543 file = %path.display(),
544 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
545 );
546 return Err(AppError::LowMemory {
547 available_mb: crate::memory_guard::available_memory_mb(),
548 required_mb: max_rss_mb,
549 });
550 }
551 }
552 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
553 &paths.models,
554 chunk_text,
555 )?);
556 }
557 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
558 chunk_embeddings_opt = Some(chunk_embeddings);
559 aggregated
560 };
561
562 let entity_embeddings = extracted_entities
563 .iter()
564 .map(|entity| {
565 let entity_text = match &entity.description {
566 Some(desc) => format!("{} {}", entity.name, desc),
567 None => entity.name.clone(),
568 };
569 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
570 })
571 .collect::<Result<Vec<_>, _>>()?;
572
573 Ok(StagedFile {
574 body: raw_body,
575 body_hash,
576 snippet,
577 name: name.to_string(),
578 description,
579 embedding,
580 chunk_embeddings: chunk_embeddings_opt,
581 chunks_info,
582 entities: extracted_entities,
583 relationships: extracted_relationships,
584 entity_embeddings,
585 urls: extracted_urls,
586 })
587}
588
589fn persist_staged(
591 conn: &mut Connection,
592 namespace: &str,
593 memory_type: &str,
594 staged: StagedFile,
595) -> Result<FileSuccess, AppError> {
596 {
597 let active_count: u32 = conn.query_row(
598 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
599 [],
600 |r| r.get::<_, i64>(0).map(|v| v as u32),
601 )?;
602 let ns_exists: bool = conn.query_row(
603 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
604 rusqlite::params![namespace],
605 |r| r.get::<_, i64>(0).map(|v| v > 0),
606 )?;
607 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
608 return Err(AppError::NamespaceError(format!(
609 "active namespace limit of {} exceeded while creating '{namespace}'",
610 crate::constants::MAX_NAMESPACES_ACTIVE
611 )));
612 }
613 }
614
615 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
616 if existing_memory.is_some() {
617 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
618 &staged.name,
619 namespace,
620 )));
621 }
622 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
623
624 let new_memory = NewMemory {
625 namespace: namespace.to_string(),
626 name: staged.name.clone(),
627 memory_type: memory_type.to_string(),
628 description: staged.description.clone(),
629 body: staged.body,
630 body_hash: staged.body_hash,
631 session_id: None,
632 source: "agent".to_string(),
633 metadata: serde_json::json!({}),
634 };
635
636 if let Some(hash_id) = duplicate_hash_id {
637 tracing::debug!(
638 target: "ingest",
639 duplicate_memory_id = hash_id,
640 "identical body already exists; persisting a new memory anyway"
641 );
642 }
643
644 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
645
646 let memory_id = memories::insert(&tx, &new_memory)?;
647 versions::insert_version(
648 &tx,
649 memory_id,
650 1,
651 &staged.name,
652 memory_type,
653 &staged.description,
654 &new_memory.body,
655 &serde_json::to_string(&new_memory.metadata)?,
656 None,
657 "create",
658 )?;
659 memories::upsert_vec(
660 &tx,
661 memory_id,
662 namespace,
663 memory_type,
664 &staged.embedding,
665 &staged.name,
666 &staged.snippet,
667 )?;
668
669 if staged.chunks_info.len() > 1 {
670 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
671 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
672 AppError::Internal(anyhow::anyhow!(
673 "missing chunk embeddings cache on multi-chunk ingest path"
674 ))
675 })?;
676 for (i, emb) in chunk_embeddings.iter().enumerate() {
677 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
678 }
679 }
680
681 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
682 for (idx, entity) in staged.entities.iter().enumerate() {
683 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
684 let entity_embedding = &staged.entity_embeddings[idx];
685 entities::upsert_entity_vec(
686 &tx,
687 entity_id,
688 namespace,
689 entity.entity_type,
690 entity_embedding,
691 &entity.name,
692 )?;
693 entities::link_memory_entity(&tx, memory_id, entity_id)?;
694 entities::increment_degree(&tx, entity_id)?;
695 }
696 let entity_types: std::collections::HashMap<&str, EntityType> = staged
697 .entities
698 .iter()
699 .map(|entity| (entity.name.as_str(), entity.entity_type))
700 .collect();
701 for rel in &staged.relationships {
702 let source_entity = NewEntity {
703 name: rel.source.clone(),
704 entity_type: entity_types
705 .get(rel.source.as_str())
706 .copied()
707 .unwrap_or(EntityType::Concept),
708 description: None,
709 };
710 let target_entity = NewEntity {
711 name: rel.target.clone(),
712 entity_type: entity_types
713 .get(rel.target.as_str())
714 .copied()
715 .unwrap_or(EntityType::Concept),
716 description: None,
717 };
718 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
719 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
720 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
721 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
722 }
723 }
724
725 tx.commit()?;
726
727 if !staged.urls.is_empty() {
728 let url_entries: Vec<storage_urls::MemoryUrl> = staged
729 .urls
730 .into_iter()
731 .map(|u| storage_urls::MemoryUrl {
732 url: u.url,
733 offset: Some(u.offset as i64),
734 })
735 .collect();
736 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
737 }
738
739 Ok(FileSuccess {
740 memory_id,
741 action: "created".to_string(),
742 body_length: new_memory.body.len(),
743 })
744}
745
746pub fn run(args: IngestArgs) -> Result<(), AppError> {
747 if args.mode == IngestMode::ClaudeCode {
748 return super::ingest_claude::run_claude_ingest(&args);
749 }
750 if args.mode == IngestMode::Codex {
751 return super::ingest_codex::run_codex_ingest(&args);
752 }
753
754 let started = std::time::Instant::now();
755
756 if !args.dir.exists() {
757 return Err(AppError::Validation(format!(
758 "directory not found: {}",
759 args.dir.display()
760 )));
761 }
762 if !args.dir.is_dir() {
763 return Err(AppError::Validation(format!(
764 "path is not a directory: {}",
765 args.dir.display()
766 )));
767 }
768
769 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
770 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
771 files.sort();
772
773 if files.len() > args.max_files {
774 return Err(AppError::Validation(format!(
775 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
776 files.len(),
777 args.max_files
778 )));
779 }
780
781 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
782 let memory_type_str = args.r#type.as_str().to_string();
783
784 let paths = AppPaths::resolve(args.db.as_deref())?;
785 let mut conn_or_err = match init_storage(&paths) {
786 Ok(c) => Ok(c),
787 Err(e) => Err(format!("{e}")),
788 };
789
790 let mut succeeded: usize = 0;
791 let mut failed: usize = 0;
792 let mut skipped: usize = 0;
793 let total = files.len();
794
795 let mut taken_names: BTreeSet<String> = BTreeSet::new();
798
799 enum SlotMeta {
805 Skip {
806 file_str: String,
807 derived_base: String,
808 name_truncated: bool,
809 original_name: Option<String>,
810 original_filename: Option<String>,
811 reason: String,
812 },
813 Process {
814 file_str: String,
815 derived_name: String,
816 name_truncated: bool,
817 original_name: Option<String>,
818 original_filename: Option<String>,
819 },
820 }
821
822 struct ProcessItem {
823 idx: usize,
824 path: PathBuf,
825 file_str: String,
826 derived_name: String,
827 }
828
829 let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
830 let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
831 let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
832
833 let max_name_length = args.max_name_length;
834 for path in &files {
835 let file_str = path.to_string_lossy().into_owned();
836 let (derived_base, name_truncated, original_name) =
837 derive_kebab_name(path, max_name_length);
838 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
839
840 if name_truncated {
841 if let Some(ref orig) = original_name {
842 truncations.push((orig.clone(), derived_base.clone()));
843 }
844 }
845
846 if derived_base.is_empty() {
847 let orig_filename = if !original_basename.is_empty() {
849 Some(original_basename.to_string())
850 } else {
851 None
852 };
853 slots_meta.push(SlotMeta::Skip {
854 file_str,
855 derived_base: String::new(),
856 name_truncated: false,
857 original_name: None,
858 original_filename: orig_filename,
859 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
860 });
861 continue;
862 }
863
864 match unique_name(&derived_base, &taken_names) {
865 Ok(derived_name) => {
866 taken_names.insert(derived_name.clone());
867 let idx = slots_meta.len();
868 let orig_filename = if original_basename != derived_name {
870 Some(original_basename.to_string())
871 } else {
872 None
873 };
874 process_items.push(ProcessItem {
875 idx,
876 path: path.clone(),
877 file_str: file_str.clone(),
878 derived_name: derived_name.clone(),
879 });
880 slots_meta.push(SlotMeta::Process {
881 file_str,
882 derived_name,
883 name_truncated,
884 original_name,
885 original_filename: orig_filename,
886 });
887 }
888 Err(e) => {
889 let orig_filename = if original_basename != derived_base {
890 Some(original_basename.to_string())
891 } else {
892 None
893 };
894 slots_meta.push(SlotMeta::Skip {
895 file_str,
896 derived_base,
897 name_truncated,
898 original_name,
899 original_filename: orig_filename,
900 reason: e.to_string(),
901 });
902 }
903 }
904 }
905
906 if !truncations.is_empty() {
907 tracing::info!(
908 target: "ingest",
909 count = truncations.len(),
910 max_name_length = max_name_length,
911 max_len = DERIVED_NAME_MAX_LEN,
912 "derived names truncated; pass -vv (debug) for per-file detail"
913 );
914 }
915
916 if args.dry_run {
918 for meta in &slots_meta {
919 match meta {
920 SlotMeta::Skip {
921 file_str,
922 derived_base,
923 name_truncated,
924 original_name,
925 original_filename,
926 reason,
927 } => {
928 output::emit_json_compact(&IngestFileEvent {
929 file: file_str,
930 name: derived_base,
931 status: "skip",
932 truncated: *name_truncated,
933 original_name: original_name.clone(),
934 original_filename: original_filename.as_deref(),
935 error: Some(reason.clone()),
936 memory_id: None,
937 action: None,
938 body_length: 0,
939 })?;
940 }
941 SlotMeta::Process {
942 file_str,
943 derived_name,
944 name_truncated,
945 original_name,
946 original_filename,
947 } => {
948 output::emit_json_compact(&IngestFileEvent {
949 file: file_str,
950 name: derived_name,
951 status: "preview",
952 truncated: *name_truncated,
953 original_name: original_name.clone(),
954 original_filename: original_filename.as_deref(),
955 error: None,
956 memory_id: None,
957 action: None,
958 body_length: 0,
959 })?;
960 }
961 }
962 }
963 output::emit_json_compact(&IngestSummary {
964 summary: true,
965 dir: args.dir.to_string_lossy().into_owned(),
966 pattern: args.pattern.clone(),
967 recursive: args.recursive,
968 files_total: total,
969 files_succeeded: 0,
970 files_failed: 0,
971 files_skipped: 0,
972 elapsed_ms: started.elapsed().as_millis() as u64,
973 })?;
974 return Ok(());
975 }
976
977 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
980
981 let pool = rayon::ThreadPoolBuilder::new()
982 .num_threads(parallelism)
983 .build()
984 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
985
986 if args.enable_ner && args.skip_extraction {
987 tracing::warn!(
988 "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
989 );
990 }
991 if args.skip_extraction && !args.enable_ner {
992 tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
993 }
994 let enable_ner = args.enable_ner;
995 let max_rss_mb = args.max_rss_mb;
996 let gliner_variant: crate::extraction::GlinerVariant =
997 args.gliner_variant.parse().unwrap_or_else(|e| {
998 tracing::warn!("invalid --gliner-variant: {e}; using fp32");
999 crate::extraction::GlinerVariant::Fp32
1000 });
1001
1002 let total_to_process = process_items.len();
1003 tracing::info!(
1004 target = "ingest",
1005 phase = "pipeline_start",
1006 files = total_to_process,
1007 ingest_parallelism = parallelism,
1008 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1009 );
1010
1011 let channel_bound = (parallelism * 2).max(1);
1015 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1016
1017 let paths_owned = paths.clone();
1022 let producer_handle = std::thread::spawn(move || {
1023 pool.install(|| {
1024 process_items.into_par_iter().for_each(|item| {
1025 let t0 = std::time::Instant::now();
1026 let result = stage_file(
1027 item.idx,
1028 &item.path,
1029 &item.derived_name,
1030 &paths_owned,
1031 enable_ner,
1032 gliner_variant,
1033 max_rss_mb,
1034 );
1035 let elapsed_ms = t0.elapsed().as_millis() as u64;
1036
1037 let (n_entities, n_relationships) = match &result {
1040 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1041 Err(_) => (0, 0),
1042 };
1043 let progress = StageProgressEvent {
1044 schema_version: 1,
1045 event: "file_extracted",
1046 path: &item.file_str,
1047 ms: elapsed_ms,
1048 entities: n_entities,
1049 relationships: n_relationships,
1050 };
1051 if let Ok(line) = serde_json::to_string(&progress) {
1052 eprintln!("{line}");
1053 }
1054
1055 let _ = tx.send((item.idx, result));
1059 });
1060 drop(tx);
1062 });
1063 });
1064
1065 let fail_fast = args.fail_fast;
1077
1078 for meta in &slots_meta {
1080 if let SlotMeta::Skip {
1081 file_str,
1082 derived_base,
1083 name_truncated,
1084 original_name,
1085 original_filename,
1086 reason,
1087 } = meta
1088 {
1089 output::emit_json_compact(&IngestFileEvent {
1090 file: file_str,
1091 name: derived_base,
1092 status: "skipped",
1093 truncated: *name_truncated,
1094 original_name: original_name.clone(),
1095 original_filename: original_filename.as_deref(),
1096 error: Some(reason.clone()),
1097 memory_id: None,
1098 action: None,
1099 body_length: 0,
1100 })?;
1101 skipped += 1;
1102 }
1103 }
1104
1105 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1108 .iter()
1109 .enumerate()
1110 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1111 .collect();
1112
1113 tracing::info!(
1114 target = "ingest",
1115 phase = "persist_start",
1116 files = total_to_process,
1117 "phase B starting: persisting files incrementally as Phase A completes each one",
1118 );
1119
1120 for (idx, stage_result) in rx {
1124 let meta = meta_index.get(&idx).ok_or_else(|| {
1125 AppError::Internal(anyhow::anyhow!(
1126 "channel idx {idx} has no corresponding Process slot"
1127 ))
1128 })?;
1129 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1130 {
1131 SlotMeta::Process {
1132 file_str,
1133 derived_name,
1134 name_truncated,
1135 original_name,
1136 original_filename,
1137 } => (
1138 file_str,
1139 derived_name,
1140 name_truncated,
1141 original_name,
1142 original_filename,
1143 ),
1144 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1145 };
1146
1147 let conn = match conn_or_err.as_mut() {
1149 Ok(c) => c,
1150 Err(err_msg) => {
1151 let err_clone = err_msg.clone();
1152 output::emit_json_compact(&IngestFileEvent {
1153 file: file_str,
1154 name: derived_name,
1155 status: "failed",
1156 truncated: *name_truncated,
1157 original_name: original_name.clone(),
1158 original_filename: original_filename.as_deref(),
1159 error: Some(err_clone.clone()),
1160 memory_id: None,
1161 action: None,
1162 body_length: 0,
1163 })?;
1164 failed += 1;
1165 if fail_fast {
1166 output::emit_json_compact(&IngestSummary {
1167 summary: true,
1168 dir: args.dir.display().to_string(),
1169 pattern: args.pattern.clone(),
1170 recursive: args.recursive,
1171 files_total: total,
1172 files_succeeded: succeeded,
1173 files_failed: failed,
1174 files_skipped: skipped,
1175 elapsed_ms: started.elapsed().as_millis() as u64,
1176 })?;
1177 return Err(AppError::Validation(format!(
1178 "ingest aborted on first failure: {err_clone}"
1179 )));
1180 }
1181 continue;
1182 }
1183 };
1184
1185 let outcome =
1186 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1187
1188 match outcome {
1189 Ok(FileSuccess {
1190 memory_id,
1191 action,
1192 body_length,
1193 }) => {
1194 output::emit_json_compact(&IngestFileEvent {
1195 file: file_str,
1196 name: derived_name,
1197 status: "indexed",
1198 truncated: *name_truncated,
1199 original_name: original_name.clone(),
1200 original_filename: original_filename.as_deref(),
1201 error: None,
1202 memory_id: Some(memory_id),
1203 action: Some(action),
1204 body_length,
1205 })?;
1206 succeeded += 1;
1207 }
1208 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1209 output::emit_json_compact(&IngestFileEvent {
1210 file: file_str,
1211 name: derived_name,
1212 status: "skipped",
1213 truncated: *name_truncated,
1214 original_name: original_name.clone(),
1215 original_filename: original_filename.as_deref(),
1216 error: Some(format!("{e}")),
1217 memory_id: None,
1218 action: Some("duplicate".to_string()),
1219 body_length: 0,
1220 })?;
1221 skipped += 1;
1222 }
1223 Err(e) => {
1224 let err_msg = format!("{e}");
1225 output::emit_json_compact(&IngestFileEvent {
1226 file: file_str,
1227 name: derived_name,
1228 status: "failed",
1229 truncated: *name_truncated,
1230 original_name: original_name.clone(),
1231 original_filename: original_filename.as_deref(),
1232 error: Some(err_msg.clone()),
1233 memory_id: None,
1234 action: None,
1235 body_length: 0,
1236 })?;
1237 failed += 1;
1238 if fail_fast {
1239 output::emit_json_compact(&IngestSummary {
1240 summary: true,
1241 dir: args.dir.display().to_string(),
1242 pattern: args.pattern.clone(),
1243 recursive: args.recursive,
1244 files_total: total,
1245 files_succeeded: succeeded,
1246 files_failed: failed,
1247 files_skipped: skipped,
1248 elapsed_ms: started.elapsed().as_millis() as u64,
1249 })?;
1250 return Err(AppError::Validation(format!(
1251 "ingest aborted on first failure: {err_msg}"
1252 )));
1253 }
1254 }
1255 }
1256 }
1257
1258 producer_handle
1260 .join()
1261 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1262
1263 if let Ok(ref conn) = conn_or_err {
1264 if succeeded > 0 {
1265 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1266 }
1267 }
1268
1269 output::emit_json_compact(&IngestSummary {
1270 summary: true,
1271 dir: args.dir.display().to_string(),
1272 pattern: args.pattern.clone(),
1273 recursive: args.recursive,
1274 files_total: total,
1275 files_succeeded: succeeded,
1276 files_failed: failed,
1277 files_skipped: skipped,
1278 elapsed_ms: started.elapsed().as_millis() as u64,
1279 })?;
1280
1281 Ok(())
1282}
1283
1284fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1290 ensure_db_ready(paths)?;
1291 let conn = open_rw(&paths.db)?;
1292 Ok(conn)
1293}
1294
1295pub(crate) fn collect_files(
1296 dir: &Path,
1297 pattern: &str,
1298 recursive: bool,
1299 out: &mut Vec<PathBuf>,
1300) -> Result<(), AppError> {
1301 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1302 for entry in entries {
1303 let entry = entry.map_err(AppError::Io)?;
1304 let path = entry.path();
1305 let file_type = entry.file_type().map_err(AppError::Io)?;
1306 if file_type.is_file() {
1307 let name = entry.file_name();
1308 let name_str = name.to_string_lossy();
1309 if matches_pattern(&name_str, pattern) {
1310 out.push(path);
1311 }
1312 } else if file_type.is_dir() && recursive {
1313 collect_files(&path, pattern, recursive, out)?;
1314 }
1315 }
1316 Ok(())
1317}
1318
/// Case-sensitive glob match of a file `name` against `pattern`, where `*` matches
/// any (possibly empty) run of characters and every other character is literal.
///
/// Examples: `*.md`, `README*`, `*notes*`, `draft-*.md`, `a*b*c`. A pattern without
/// `*` requires exact equality; `*` alone matches every name.
///
/// Earlier versions only honoured a single leading or trailing `*`. With that logic
/// `*notes*` and `draft-*.md` silently matched nothing. Matches for those simple
/// forms are unchanged.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // Literal text before the first `*`; no `*` at all means exact comparison.
    let (head, after_head) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // Literal text after the last `*` must end the name; anything between the first
    // and last `*` is a sequence of literal segments that must appear in order.
    let (middle, tail) = after_head.rsplit_once('*').unwrap_or(("", after_head));

    let mut remaining = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };
    // Leftmost matching of each middle segment leaves the longest possible remainder,
    // which is optimal for `*`-only globs (no backtracking needed).
    for segment in middle.split('*').filter(|s| !s.is_empty()) {
        match remaining.find(segment) {
            Some(idx) => remaining = &remaining[idx + segment.len()..],
            None => return false,
        }
    }
    // Checking the tail against what is left (not the whole name) prevents the
    // prefix and suffix from overlapping, e.g. `ab*ba` must not match `aba`.
    remaining.ends_with(tail)
}
1328
1329pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1340 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1341 let lowered: String = stem
1342 .nfd()
1343 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1344 .map(|c| {
1345 if c == '_' || c.is_whitespace() {
1346 '-'
1347 } else {
1348 c
1349 }
1350 })
1351 .map(|c| c.to_ascii_lowercase())
1352 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1353 .collect();
1354 let collapsed = collapse_dashes(&lowered);
1355 let trimmed_raw = collapsed.trim_matches('-').to_string();
1356 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1358 format!("doc-{trimmed_raw}")
1359 } else {
1360 trimmed_raw
1361 };
1362 if trimmed.len() > max_len {
1363 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1364 tracing::debug!(
1365 target: "ingest",
1366 original = %trimmed,
1367 truncated_to = %truncated,
1368 max_len = max_len,
1369 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1370 );
1371 (truncated, true, Some(trimmed))
1372 } else {
1373 (trimmed, false, None)
1374 }
1375}
1376
1377fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1390 if !taken.contains(base) {
1391 return Ok(base.to_string());
1392 }
1393 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1394 let candidate = format!("{base}-{suffix}");
1395 if !taken.contains(&candidate) {
1396 tracing::warn!(
1397 target: "ingest",
1398 base = %base,
1399 resolved = %candidate,
1400 suffix,
1401 "memory name collision resolved with numeric suffix"
1402 );
1403 return Ok(candidate);
1404 }
1405 }
1406 Err(AppError::Validation(format!(
1407 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1408 )))
1409}
1410
/// Returns `s` with every run of consecutive `-` characters replaced by a single `-`.
/// All other characters, including leading and trailing dashes, are kept as-is.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // The output ends with '-' exactly when the previous input char was '-'.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1427
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    use serial_test::serial;

    /// Restores an environment variable to the value captured at construction when
    /// dropped. Because `Drop` also runs while a panic unwinds, a failing assertion
    /// inside the test closure cannot leak a modified value into later `#[serial]` tests.
    struct EnvVarRestore {
        key: &'static str,
        prev: Option<String>,
    }

    impl Drop for EnvVarRestore {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(p) => std::env::set_var(self.key, p),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset for `None`),
    /// then restores the previous value even if `f` panics.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let _restore = EnvVarRestore {
            key,
            prev: std::env::var(key).ok(),
        };
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}