1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Numeric-suffix ceiling for name-collision resolution.
///
/// NOTE(review): not referenced in the visible part of this file; presumably `unique_name`
/// stops trying `-2`, `-3`, ... suffixes at this bound and returns an error (which `run`
/// reports as a skipped file) — confirm against `unique_name`.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable GLiNER NER extraction (disabled by default, slower)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70 # LLM-curated extraction via Claude Code CLI\n \
71 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
72 # Resume interrupted claude-code ingest\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
74 # Claude Code with budget cap and custom timeout\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
76NOTES:\n \
77 Each file becomes a separate memory. Names derive from file basenames\n \
78 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
79 followed by a final summary line with counts. Per-file errors are reported\n \
80 inline and processing continues unless --fail-fast is set.")]
81pub struct IngestArgs {
82 #[arg(
84 value_name = "DIR",
85 help = "Directory to ingest recursively (each matching file becomes a memory)"
86 )]
87 pub dir: PathBuf,
88
89 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
91 pub r#type: MemoryType,
92
93 #[arg(long, default_value = "*.md")]
96 pub pattern: String,
97
98 #[arg(long, default_value_t = false)]
100 pub recursive: bool,
101
102 #[arg(
103 long,
104 env = "SQLITE_GRAPHRAG_ENABLE_NER",
105 value_parser = crate::parsers::parse_bool_flexible,
106 action = clap::ArgAction::Set,
107 num_args = 0..=1,
108 default_missing_value = "true",
109 default_value = "false",
110 help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
111 )]
112 pub enable_ner: bool,
113 #[arg(
114 long,
115 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
116 default_value = "fp32",
117 help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
118 )]
119 pub gliner_variant: String,
120
121 #[arg(long, default_value_t = false, hide = true)]
123 pub skip_extraction: bool,
124
125 #[arg(long, default_value_t = false)]
127 pub fail_fast: bool,
128
129 #[arg(long, default_value_t = false)]
131 pub dry_run: bool,
132
133 #[arg(long, default_value_t = 10_000)]
135 pub max_files: usize,
136
137 #[arg(long)]
139 pub namespace: Option<String>,
140
141 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
143 pub db: Option<String>,
144
145 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
146 pub format: JsonOutputFormat,
147
148 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
149 pub json: bool,
150
151 #[arg(
153 long,
154 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
155 )]
156 pub ingest_parallelism: Option<usize>,
157
158 #[arg(
166 long,
167 default_value_t = false,
168 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
169 Recommended for environments with <4 GB available RAM or container/cgroup \
170 constraints. Trade-off: 3-4x longer wall time. Also honored via \
171 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
172 )]
173 pub low_memory: bool,
174
175 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
177 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
178 pub max_rss_mb: u64,
179
180 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
185 help = "Maximum length for derived memory names (default: 60)")]
186 pub max_name_length: usize,
187
188 #[arg(long, value_enum, default_value_t = IngestMode::None)]
190 pub mode: IngestMode,
191
192 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
194 pub claude_binary: Option<std::path::PathBuf>,
195
196 #[arg(long)]
198 pub claude_model: Option<String>,
199
200 #[arg(long, default_value_t = false)]
202 pub resume: bool,
203
204 #[arg(long, default_value_t = false)]
206 pub retry_failed: bool,
207
208 #[arg(long, default_value_t = false)]
210 pub keep_queue: bool,
211
212 #[arg(long, default_value = ".ingest-queue.sqlite")]
214 pub queue_db: String,
215
216 #[arg(long, default_value_t = 60)]
218 pub rate_limit_wait: u64,
219
220 #[arg(long)]
222 pub max_cost_usd: Option<f64>,
223
224 #[arg(
226 long,
227 default_value_t = 300,
228 help = "Timeout in seconds for each claude -p invocation (default: 300)"
229 )]
230 pub claude_timeout: u64,
231
232 #[arg(
234 long,
235 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
236 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
237 )]
238 pub codex_binary: Option<PathBuf>,
239
240 #[arg(
242 long,
243 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
244 )]
245 pub codex_model: Option<String>,
246
247 #[arg(
249 long,
250 default_value_t = 300,
251 help = "Timeout in seconds for each codex exec invocation (default: 300)"
252 )]
253 pub codex_timeout: u64,
254}
255
// Extraction backend selected with `--mode`.
//
// `//` comments are used on purpose: clap's `ValueEnum` derive turns `///` comments on variants
// into possible-value help text.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default: the local pipeline in `run` (GLiNER only via `--enable-ner`).
    None,
    // GLiNER-based extraction. NOTE(review): confirm how this interacts with `--enable-ner`.
    Gliner,
    // Delegated to `super::ingest_claude::run_claude_ingest`.
    ClaudeCode,
    // Delegated to `super::ingest_codex::run_codex_ingest`.
    Codex,
}
268
269fn env_low_memory_enabled() -> bool {
274 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
275 Ok(v) if v.is_empty() => false,
276 Ok(v) => match v.to_lowercase().as_str() {
277 "1" | "true" | "yes" | "on" => true,
278 "0" | "false" | "no" | "off" => false,
279 other => {
280 tracing::warn!(
281 target: "ingest",
282 value = %other,
283 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
284 );
285 false
286 }
287 },
288 Err(_) => false,
289 }
290}
291
292fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
304 let env_flag = env_low_memory_enabled();
305 let low_memory = low_memory_flag || env_flag;
306
307 if low_memory {
308 if let Some(n) = ingest_parallelism {
309 if n > 1 {
310 tracing::warn!(
311 target: "ingest",
312 requested = n,
313 "--ingest-parallelism overridden by --low-memory; using 1"
314 );
315 }
316 }
317 if low_memory_flag {
318 tracing::info!(
319 target: "ingest",
320 source = "flag",
321 "low-memory mode enabled: forcing --ingest-parallelism 1"
322 );
323 } else {
324 tracing::info!(
325 target: "ingest",
326 source = "env",
327 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
328 );
329 }
330 return 1;
331 }
332
333 ingest_parallelism
334 .unwrap_or_else(|| {
335 std::thread::available_parallelism()
336 .map(|v| v.get() / 2)
337 .unwrap_or(1)
338 .clamp(1, 4)
339 })
340 .max(1)
341}
342
/// One NDJSON line describing the outcome for a single input file.
///
/// `run` emits it with `status` values "preview"/"skip" (dry run) and
/// "indexed"/"skipped"/"failed" (real run).
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source file path, as a lossy UTF-8 string.
    file: &'a str,
    /// Memory name derived from the file name (empty when no name could be derived).
    name: &'a str,
    /// Outcome label (see the struct docs).
    status: &'a str,
    /// Whether `derive_kebab_name` truncated the derived name.
    truncated: bool,
    /// Name returned by `derive_kebab_name` alongside the derived one.
    /// NOTE(review): presumably the pre-truncation name — confirm in `derive_kebab_name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// File stem, included when it differs from the derived name (or no name could be derived).
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Failure or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the created memory (only on "indexed").
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// "created" for new memories, "duplicate" for name collisions with existing memories.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the stored body; 0 when nothing was stored.
    body_length: usize,
}
365
/// Final NDJSON line of an ingest run, always emitted (also on dry runs and fail-fast aborts).
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`, so consumers can tell this line apart from per-file events.
    summary: bool,
    /// Input directory as given on the command line.
    dir: String,
    /// File-name pattern used for matching.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Files persisted as new memories.
    files_succeeded: usize,
    /// Files that failed staging or persistence.
    files_failed: usize,
    /// Files skipped (no derivable name, name collision, or duplicate memory name).
    files_skipped: usize,
    /// Wall time since `run` started, in milliseconds.
    elapsed_ms: u64,
}
378
/// Result of a successful `persist_staged` call.
struct FileSuccess {
    /// Row id of the inserted memory.
    memory_id: i64,
    /// Action label reported in the per-file event ("created").
    action: String,
    /// Byte length of the stored body.
    body_length: usize,
}
385
/// Progress line printed to stderr (as JSON) each time Phase A finishes staging a file.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Schema version of this event; `run` emits 1.
    schema_version: u8,
    /// Event name; `run` emits "file_extracted".
    event: &'a str,
    /// Source file path.
    path: &'a str,
    /// Time spent in `stage_file`, in milliseconds.
    ms: u64,
    /// Entities extracted (0 when staging failed).
    entities: usize,
    /// Relationships extracted (0 when staging failed).
    relationships: usize,
}
397
/// Everything Phase A computes for one file, handed to `persist_staged` over the channel.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// BLAKE3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// "ingested from <path>".
    description: String,
    /// Memory-level embedding (whole body, or the aggregate of chunk embeddings).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only when the body produced more than one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by `split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty unless NER ran and succeeded).
    entities: Vec<NewEntity>,
    /// Extracted relationships with normalized relation names.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found by the extractor.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
414
415fn stage_file(
418 _idx: usize,
419 path: &Path,
420 name: &str,
421 paths: &AppPaths,
422 enable_ner: bool,
423 gliner_variant: crate::extraction::GlinerVariant,
424 max_rss_mb: u64,
425) -> Result<StagedFile, AppError> {
426 use crate::constants::*;
427
428 if name.len() > MAX_MEMORY_NAME_LEN {
429 return Err(AppError::LimitExceeded(
430 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
431 ));
432 }
433 if name.starts_with("__") {
434 return Err(AppError::Validation(
435 crate::i18n::validation::reserved_name(),
436 ));
437 }
438 {
439 let slug_re = regex::Regex::new(NAME_SLUG_REGEX)
440 .map_err(|e| AppError::Internal(anyhow::anyhow!("regex: {e}")))?;
441 if !slug_re.is_match(name) {
442 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
443 name,
444 )));
445 }
446 }
447
448 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
449 if raw_body.len() > MAX_MEMORY_BODY_LEN {
450 return Err(AppError::LimitExceeded(
451 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
452 ));
453 }
454 if raw_body.trim().is_empty() {
455 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
456 }
457
458 let description = format!("ingested from {}", path.display());
459 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
460 return Err(AppError::Validation(
461 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
462 ));
463 }
464
465 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
466 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
467 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
468 if enable_ner {
469 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
470 Ok(extracted) => {
471 extracted_urls = extracted.urls;
472 extracted_entities = extracted.entities;
473 extracted_relationships = extracted.relationships;
474
475 if extracted_entities.len() > max_entities_per_memory() {
476 extracted_entities.truncate(max_entities_per_memory());
477 }
478 if extracted_relationships.len() > MAX_RELATIONSHIPS_PER_MEMORY {
479 extracted_relationships.truncate(MAX_RELATIONSHIPS_PER_MEMORY);
480 }
481 }
482 Err(e) => {
483 tracing::warn!(
484 file = %path.display(),
485 "auto-extraction failed (graceful degradation): {e:#}"
486 );
487 }
488 }
489 }
490
491 for rel in &mut extracted_relationships {
492 rel.relation = crate::parsers::normalize_relation(&rel.relation);
493 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
494 return Err(AppError::Validation(format!(
495 "{e} for relationship '{}' -> '{}'",
496 rel.source, rel.target
497 )));
498 }
499 crate::parsers::warn_if_non_canonical(&rel.relation);
500 if !(0.0..=1.0).contains(&rel.strength) {
501 return Err(AppError::Validation(format!(
502 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
503 rel.strength, rel.source, rel.target
504 )));
505 }
506 }
507
508 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
509 let snippet: String = raw_body.chars().take(200).collect();
510
511 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
512 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
513 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
514 return Err(AppError::LimitExceeded(format!(
515 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
516 chunks_info.len(),
517 REMEMBER_MAX_SAFE_MULTI_CHUNKS
518 )));
519 }
520
521 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
522 let embedding = if chunks_info.len() == 1 {
523 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
524 } else {
525 let chunk_texts: Vec<&str> = chunks_info
526 .iter()
527 .map(|c| chunking::chunk_text(&raw_body, c))
528 .collect();
529 let mut chunk_embeddings = Vec::with_capacity(chunk_texts.len());
530 for chunk_text in &chunk_texts {
531 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
532 if rss > max_rss_mb {
533 tracing::error!(
534 rss_mb = rss,
535 max_rss_mb = max_rss_mb,
536 file = %path.display(),
537 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
538 );
539 return Err(AppError::LowMemory {
540 available_mb: crate::memory_guard::available_memory_mb(),
541 required_mb: max_rss_mb,
542 });
543 }
544 }
545 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
546 &paths.models,
547 chunk_text,
548 )?);
549 }
550 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
551 chunk_embeddings_opt = Some(chunk_embeddings);
552 aggregated
553 };
554
555 let entity_embeddings = extracted_entities
556 .iter()
557 .map(|entity| {
558 let entity_text = match &entity.description {
559 Some(desc) => format!("{} {}", entity.name, desc),
560 None => entity.name.clone(),
561 };
562 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
563 })
564 .collect::<Result<Vec<_>, _>>()?;
565
566 Ok(StagedFile {
567 body: raw_body,
568 body_hash,
569 snippet,
570 name: name.to_string(),
571 description,
572 embedding,
573 chunk_embeddings: chunk_embeddings_opt,
574 chunks_info,
575 entities: extracted_entities,
576 relationships: extracted_relationships,
577 entity_embeddings,
578 urls: extracted_urls,
579 })
580}
581
582fn persist_staged(
584 conn: &mut Connection,
585 namespace: &str,
586 memory_type: &str,
587 staged: StagedFile,
588) -> Result<FileSuccess, AppError> {
589 {
590 let active_count: u32 = conn.query_row(
591 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
592 [],
593 |r| r.get::<_, i64>(0).map(|v| v as u32),
594 )?;
595 let ns_exists: bool = conn.query_row(
596 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
597 rusqlite::params![namespace],
598 |r| r.get::<_, i64>(0).map(|v| v > 0),
599 )?;
600 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
601 return Err(AppError::NamespaceError(format!(
602 "active namespace limit of {} exceeded while creating '{namespace}'",
603 crate::constants::MAX_NAMESPACES_ACTIVE
604 )));
605 }
606 }
607
608 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
609 if existing_memory.is_some() {
610 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
611 &staged.name,
612 namespace,
613 )));
614 }
615 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
616
617 let new_memory = NewMemory {
618 namespace: namespace.to_string(),
619 name: staged.name.clone(),
620 memory_type: memory_type.to_string(),
621 description: staged.description.clone(),
622 body: staged.body,
623 body_hash: staged.body_hash,
624 session_id: None,
625 source: "agent".to_string(),
626 metadata: serde_json::json!({}),
627 };
628
629 if let Some(hash_id) = duplicate_hash_id {
630 tracing::debug!(
631 target: "ingest",
632 duplicate_memory_id = hash_id,
633 "identical body already exists; persisting a new memory anyway"
634 );
635 }
636
637 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
638
639 let memory_id = memories::insert(&tx, &new_memory)?;
640 versions::insert_version(
641 &tx,
642 memory_id,
643 1,
644 &staged.name,
645 memory_type,
646 &staged.description,
647 &new_memory.body,
648 &serde_json::to_string(&new_memory.metadata)?,
649 None,
650 "create",
651 )?;
652 memories::upsert_vec(
653 &tx,
654 memory_id,
655 namespace,
656 memory_type,
657 &staged.embedding,
658 &staged.name,
659 &staged.snippet,
660 )?;
661
662 if staged.chunks_info.len() > 1 {
663 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
664 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
665 AppError::Internal(anyhow::anyhow!(
666 "missing chunk embeddings cache on multi-chunk ingest path"
667 ))
668 })?;
669 for (i, emb) in chunk_embeddings.iter().enumerate() {
670 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
671 }
672 }
673
674 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
675 for (idx, entity) in staged.entities.iter().enumerate() {
676 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
677 let entity_embedding = &staged.entity_embeddings[idx];
678 entities::upsert_entity_vec(
679 &tx,
680 entity_id,
681 namespace,
682 entity.entity_type,
683 entity_embedding,
684 &entity.name,
685 )?;
686 entities::link_memory_entity(&tx, memory_id, entity_id)?;
687 entities::increment_degree(&tx, entity_id)?;
688 }
689 let entity_types: std::collections::HashMap<&str, EntityType> = staged
690 .entities
691 .iter()
692 .map(|entity| (entity.name.as_str(), entity.entity_type))
693 .collect();
694 for rel in &staged.relationships {
695 let source_entity = NewEntity {
696 name: rel.source.clone(),
697 entity_type: entity_types
698 .get(rel.source.as_str())
699 .copied()
700 .unwrap_or(EntityType::Concept),
701 description: None,
702 };
703 let target_entity = NewEntity {
704 name: rel.target.clone(),
705 entity_type: entity_types
706 .get(rel.target.as_str())
707 .copied()
708 .unwrap_or(EntityType::Concept),
709 description: None,
710 };
711 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
712 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
713 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
714 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
715 }
716 }
717
718 tx.commit()?;
719
720 if !staged.urls.is_empty() {
721 let url_entries: Vec<storage_urls::MemoryUrl> = staged
722 .urls
723 .into_iter()
724 .map(|u| storage_urls::MemoryUrl {
725 url: u.url,
726 offset: Some(u.offset as i64),
727 })
728 .collect();
729 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
730 }
731
732 Ok(FileSuccess {
733 memory_id,
734 action: "created".to_string(),
735 body_length: new_memory.body.len(),
736 })
737}
738
739pub fn run(args: IngestArgs) -> Result<(), AppError> {
740 if args.mode == IngestMode::ClaudeCode {
741 return super::ingest_claude::run_claude_ingest(&args);
742 }
743 if args.mode == IngestMode::Codex {
744 return super::ingest_codex::run_codex_ingest(&args);
745 }
746
747 let started = std::time::Instant::now();
748
749 if !args.dir.exists() {
750 return Err(AppError::Validation(format!(
751 "directory not found: {}",
752 args.dir.display()
753 )));
754 }
755 if !args.dir.is_dir() {
756 return Err(AppError::Validation(format!(
757 "path is not a directory: {}",
758 args.dir.display()
759 )));
760 }
761
762 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
763 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
764 files.sort();
765
766 if files.len() > args.max_files {
767 return Err(AppError::Validation(format!(
768 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
769 files.len(),
770 args.max_files
771 )));
772 }
773
774 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
775 let memory_type_str = args.r#type.as_str().to_string();
776
777 let paths = AppPaths::resolve(args.db.as_deref())?;
778 let mut conn_or_err = match init_storage(&paths) {
779 Ok(c) => Ok(c),
780 Err(e) => Err(format!("{e}")),
781 };
782
783 let mut succeeded: usize = 0;
784 let mut failed: usize = 0;
785 let mut skipped: usize = 0;
786 let total = files.len();
787
788 let mut taken_names: BTreeSet<String> = BTreeSet::new();
791
792 enum SlotMeta {
798 Skip {
799 file_str: String,
800 derived_base: String,
801 name_truncated: bool,
802 original_name: Option<String>,
803 original_filename: Option<String>,
804 reason: String,
805 },
806 Process {
807 file_str: String,
808 derived_name: String,
809 name_truncated: bool,
810 original_name: Option<String>,
811 original_filename: Option<String>,
812 },
813 }
814
815 struct ProcessItem {
816 idx: usize,
817 path: PathBuf,
818 file_str: String,
819 derived_name: String,
820 }
821
822 let mut slots_meta: Vec<SlotMeta> = Vec::with_capacity(files.len());
823 let mut process_items: Vec<ProcessItem> = Vec::with_capacity(files.len());
824 let mut truncations: Vec<(String, String)> = Vec::with_capacity(files.len());
825
826 let max_name_length = args.max_name_length;
827 for path in &files {
828 let file_str = path.to_string_lossy().into_owned();
829 let (derived_base, name_truncated, original_name) =
830 derive_kebab_name(path, max_name_length);
831 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
832
833 if name_truncated {
834 if let Some(ref orig) = original_name {
835 truncations.push((orig.clone(), derived_base.clone()));
836 }
837 }
838
839 if derived_base.is_empty() {
840 let orig_filename = if !original_basename.is_empty() {
842 Some(original_basename.to_string())
843 } else {
844 None
845 };
846 slots_meta.push(SlotMeta::Skip {
847 file_str,
848 derived_base: String::new(),
849 name_truncated: false,
850 original_name: None,
851 original_filename: orig_filename,
852 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
853 });
854 continue;
855 }
856
857 match unique_name(&derived_base, &taken_names) {
858 Ok(derived_name) => {
859 taken_names.insert(derived_name.clone());
860 let idx = slots_meta.len();
861 let orig_filename = if original_basename != derived_name {
863 Some(original_basename.to_string())
864 } else {
865 None
866 };
867 process_items.push(ProcessItem {
868 idx,
869 path: path.clone(),
870 file_str: file_str.clone(),
871 derived_name: derived_name.clone(),
872 });
873 slots_meta.push(SlotMeta::Process {
874 file_str,
875 derived_name,
876 name_truncated,
877 original_name,
878 original_filename: orig_filename,
879 });
880 }
881 Err(e) => {
882 let orig_filename = if original_basename != derived_base {
883 Some(original_basename.to_string())
884 } else {
885 None
886 };
887 slots_meta.push(SlotMeta::Skip {
888 file_str,
889 derived_base,
890 name_truncated,
891 original_name,
892 original_filename: orig_filename,
893 reason: e.to_string(),
894 });
895 }
896 }
897 }
898
899 if !truncations.is_empty() {
900 tracing::info!(
901 target: "ingest",
902 count = truncations.len(),
903 max_name_length = max_name_length,
904 max_len = DERIVED_NAME_MAX_LEN,
905 "derived names truncated; pass -vv (debug) for per-file detail"
906 );
907 }
908
909 if args.dry_run {
911 for meta in &slots_meta {
912 match meta {
913 SlotMeta::Skip {
914 file_str,
915 derived_base,
916 name_truncated,
917 original_name,
918 original_filename,
919 reason,
920 } => {
921 output::emit_json_compact(&IngestFileEvent {
922 file: file_str,
923 name: derived_base,
924 status: "skip",
925 truncated: *name_truncated,
926 original_name: original_name.clone(),
927 original_filename: original_filename.as_deref(),
928 error: Some(reason.clone()),
929 memory_id: None,
930 action: None,
931 body_length: 0,
932 })?;
933 }
934 SlotMeta::Process {
935 file_str,
936 derived_name,
937 name_truncated,
938 original_name,
939 original_filename,
940 } => {
941 output::emit_json_compact(&IngestFileEvent {
942 file: file_str,
943 name: derived_name,
944 status: "preview",
945 truncated: *name_truncated,
946 original_name: original_name.clone(),
947 original_filename: original_filename.as_deref(),
948 error: None,
949 memory_id: None,
950 action: None,
951 body_length: 0,
952 })?;
953 }
954 }
955 }
956 output::emit_json_compact(&IngestSummary {
957 summary: true,
958 dir: args.dir.to_string_lossy().into_owned(),
959 pattern: args.pattern.clone(),
960 recursive: args.recursive,
961 files_total: total,
962 files_succeeded: 0,
963 files_failed: 0,
964 files_skipped: 0,
965 elapsed_ms: started.elapsed().as_millis() as u64,
966 })?;
967 return Ok(());
968 }
969
970 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
973
974 let pool = rayon::ThreadPoolBuilder::new()
975 .num_threads(parallelism)
976 .build()
977 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
978
979 if args.enable_ner && args.skip_extraction {
980 tracing::warn!(
981 "--enable-ner and --skip-extraction are contradictory; --enable-ner takes precedence"
982 );
983 }
984 if args.skip_extraction && !args.enable_ner {
985 tracing::warn!("--skip-extraction is deprecated and has no effect (NER is disabled by default since v1.0.45); remove this flag");
986 }
987 let enable_ner = args.enable_ner;
988 let max_rss_mb = args.max_rss_mb;
989 let gliner_variant: crate::extraction::GlinerVariant =
990 args.gliner_variant.parse().unwrap_or_else(|e| {
991 tracing::warn!("invalid --gliner-variant: {e}; using fp32");
992 crate::extraction::GlinerVariant::Fp32
993 });
994
995 let total_to_process = process_items.len();
996 tracing::info!(
997 target = "ingest",
998 phase = "pipeline_start",
999 files = total_to_process,
1000 ingest_parallelism = parallelism,
1001 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1002 );
1003
1004 let channel_bound = (parallelism * 2).max(1);
1008 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1009
1010 let paths_owned = paths.clone();
1015 let producer_handle = std::thread::spawn(move || {
1016 pool.install(|| {
1017 process_items.into_par_iter().for_each(|item| {
1018 let t0 = std::time::Instant::now();
1019 let result = stage_file(
1020 item.idx,
1021 &item.path,
1022 &item.derived_name,
1023 &paths_owned,
1024 enable_ner,
1025 gliner_variant,
1026 max_rss_mb,
1027 );
1028 let elapsed_ms = t0.elapsed().as_millis() as u64;
1029
1030 let (n_entities, n_relationships) = match &result {
1033 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1034 Err(_) => (0, 0),
1035 };
1036 let progress = StageProgressEvent {
1037 schema_version: 1,
1038 event: "file_extracted",
1039 path: &item.file_str,
1040 ms: elapsed_ms,
1041 entities: n_entities,
1042 relationships: n_relationships,
1043 };
1044 if let Ok(line) = serde_json::to_string(&progress) {
1045 eprintln!("{line}");
1046 }
1047
1048 let _ = tx.send((item.idx, result));
1052 });
1053 drop(tx);
1055 });
1056 });
1057
1058 let fail_fast = args.fail_fast;
1070
1071 for meta in &slots_meta {
1073 if let SlotMeta::Skip {
1074 file_str,
1075 derived_base,
1076 name_truncated,
1077 original_name,
1078 original_filename,
1079 reason,
1080 } = meta
1081 {
1082 output::emit_json_compact(&IngestFileEvent {
1083 file: file_str,
1084 name: derived_base,
1085 status: "skipped",
1086 truncated: *name_truncated,
1087 original_name: original_name.clone(),
1088 original_filename: original_filename.as_deref(),
1089 error: Some(reason.clone()),
1090 memory_id: None,
1091 action: None,
1092 body_length: 0,
1093 })?;
1094 skipped += 1;
1095 }
1096 }
1097
1098 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1101 .iter()
1102 .enumerate()
1103 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1104 .collect();
1105
1106 tracing::info!(
1107 target = "ingest",
1108 phase = "persist_start",
1109 files = total_to_process,
1110 "phase B starting: persisting files incrementally as Phase A completes each one",
1111 );
1112
1113 for (idx, stage_result) in rx {
1117 let meta = meta_index.get(&idx).ok_or_else(|| {
1118 AppError::Internal(anyhow::anyhow!(
1119 "channel idx {idx} has no corresponding Process slot"
1120 ))
1121 })?;
1122 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1123 {
1124 SlotMeta::Process {
1125 file_str,
1126 derived_name,
1127 name_truncated,
1128 original_name,
1129 original_filename,
1130 } => (
1131 file_str,
1132 derived_name,
1133 name_truncated,
1134 original_name,
1135 original_filename,
1136 ),
1137 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1138 };
1139
1140 let conn = match conn_or_err.as_mut() {
1142 Ok(c) => c,
1143 Err(err_msg) => {
1144 let err_clone = err_msg.clone();
1145 output::emit_json_compact(&IngestFileEvent {
1146 file: file_str,
1147 name: derived_name,
1148 status: "failed",
1149 truncated: *name_truncated,
1150 original_name: original_name.clone(),
1151 original_filename: original_filename.as_deref(),
1152 error: Some(err_clone.clone()),
1153 memory_id: None,
1154 action: None,
1155 body_length: 0,
1156 })?;
1157 failed += 1;
1158 if fail_fast {
1159 output::emit_json_compact(&IngestSummary {
1160 summary: true,
1161 dir: args.dir.display().to_string(),
1162 pattern: args.pattern.clone(),
1163 recursive: args.recursive,
1164 files_total: total,
1165 files_succeeded: succeeded,
1166 files_failed: failed,
1167 files_skipped: skipped,
1168 elapsed_ms: started.elapsed().as_millis() as u64,
1169 })?;
1170 return Err(AppError::Validation(format!(
1171 "ingest aborted on first failure: {err_clone}"
1172 )));
1173 }
1174 continue;
1175 }
1176 };
1177
1178 let outcome =
1179 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1180
1181 match outcome {
1182 Ok(FileSuccess {
1183 memory_id,
1184 action,
1185 body_length,
1186 }) => {
1187 output::emit_json_compact(&IngestFileEvent {
1188 file: file_str,
1189 name: derived_name,
1190 status: "indexed",
1191 truncated: *name_truncated,
1192 original_name: original_name.clone(),
1193 original_filename: original_filename.as_deref(),
1194 error: None,
1195 memory_id: Some(memory_id),
1196 action: Some(action),
1197 body_length,
1198 })?;
1199 succeeded += 1;
1200 }
1201 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1202 output::emit_json_compact(&IngestFileEvent {
1203 file: file_str,
1204 name: derived_name,
1205 status: "skipped",
1206 truncated: *name_truncated,
1207 original_name: original_name.clone(),
1208 original_filename: original_filename.as_deref(),
1209 error: Some(format!("{e}")),
1210 memory_id: None,
1211 action: Some("duplicate".to_string()),
1212 body_length: 0,
1213 })?;
1214 skipped += 1;
1215 }
1216 Err(e) => {
1217 let err_msg = format!("{e}");
1218 output::emit_json_compact(&IngestFileEvent {
1219 file: file_str,
1220 name: derived_name,
1221 status: "failed",
1222 truncated: *name_truncated,
1223 original_name: original_name.clone(),
1224 original_filename: original_filename.as_deref(),
1225 error: Some(err_msg.clone()),
1226 memory_id: None,
1227 action: None,
1228 body_length: 0,
1229 })?;
1230 failed += 1;
1231 if fail_fast {
1232 output::emit_json_compact(&IngestSummary {
1233 summary: true,
1234 dir: args.dir.display().to_string(),
1235 pattern: args.pattern.clone(),
1236 recursive: args.recursive,
1237 files_total: total,
1238 files_succeeded: succeeded,
1239 files_failed: failed,
1240 files_skipped: skipped,
1241 elapsed_ms: started.elapsed().as_millis() as u64,
1242 })?;
1243 return Err(AppError::Validation(format!(
1244 "ingest aborted on first failure: {err_msg}"
1245 )));
1246 }
1247 }
1248 }
1249 }
1250
1251 producer_handle
1253 .join()
1254 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1255
1256 if let Ok(ref conn) = conn_or_err {
1257 if succeeded > 0 {
1258 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1259 }
1260 }
1261
1262 output::emit_json_compact(&IngestSummary {
1263 summary: true,
1264 dir: args.dir.display().to_string(),
1265 pattern: args.pattern.clone(),
1266 recursive: args.recursive,
1267 files_total: total,
1268 files_succeeded: succeeded,
1269 files_failed: failed,
1270 files_skipped: skipped,
1271 elapsed_ms: started.elapsed().as_millis() as u64,
1272 })?;
1273
1274 Ok(())
1275}
1276
1277fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1283 ensure_db_ready(paths)?;
1284 let conn = open_rw(&paths.db)?;
1285 Ok(conn)
1286}
1287
1288pub(crate) fn collect_files(
1289 dir: &Path,
1290 pattern: &str,
1291 recursive: bool,
1292 out: &mut Vec<PathBuf>,
1293) -> Result<(), AppError> {
1294 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1295 for entry in entries {
1296 let entry = entry.map_err(AppError::Io)?;
1297 let path = entry.path();
1298 let file_type = entry.file_type().map_err(AppError::Io)?;
1299 if file_type.is_file() {
1300 let name = entry.file_name();
1301 let name_str = name.to_string_lossy();
1302 if matches_pattern(&name_str, pattern) {
1303 out.push(path);
1304 }
1305 } else if file_type.is_dir() && recursive {
1306 collect_files(&path, pattern, recursive, out)?;
1307 }
1308 }
1309 Ok(())
1310}
1311
/// Returns `true` when `name` matches the glob-style `pattern`.
///
/// `*` matches any (possibly empty) run of characters and may appear anywhere in the
/// pattern, any number of times (`*.md`, `README*`, `*notes*`, `draft-*.md`).
/// Every other character, including `?` and `[`, matches itself literally, and
/// matching is case-sensitive. A pattern without `*` therefore requires an exact match.
///
/// Matching runs on UTF-8 bytes. This is sound because `*` is ASCII, so each literal
/// segment between stars is itself valid UTF-8. Such a segment can only match
/// `name` on character boundaries, since UTF-8 is self-synchronizing.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let text = name.as_bytes();
    let pat = pattern.as_bytes();
    let (mut ti, mut pi) = (0usize, 0usize);
    // Most recent `*`: (pattern index just after it, text index it is currently
    // allowed to have consumed up to). Backtracking only ever revisits this star,
    // which keeps the worst case at O(len(name) * len(pattern)).
    let mut last_star: Option<(usize, usize)> = None;

    while ti < text.len() {
        if pi < pat.len() && pat[pi] == b'*' {
            // Tentatively let the star match nothing; widen it on a later mismatch.
            last_star = Some((pi + 1, ti));
            pi += 1;
        } else if pi < pat.len() && pat[pi] == text[ti] {
            pi += 1;
            ti += 1;
        } else if let Some((after_star, star_ti)) = last_star {
            // Mismatch: let the last star absorb one more byte and retry the segment.
            pi = after_star;
            ti = star_ti + 1;
            last_star = Some((after_star, star_ti + 1));
        } else {
            return false;
        }
    }
    // The text is exhausted; any pattern left over must consist solely of stars.
    pat[pi..].iter().all(|&b| b == b'*')
}
1321
1322pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1333 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1334 let lowered: String = stem
1335 .nfd()
1336 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1337 .map(|c| {
1338 if c == '_' || c.is_whitespace() {
1339 '-'
1340 } else {
1341 c
1342 }
1343 })
1344 .map(|c| c.to_ascii_lowercase())
1345 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1346 .collect();
1347 let collapsed = collapse_dashes(&lowered);
1348 let trimmed_raw = collapsed.trim_matches('-').to_string();
1349 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1351 format!("doc-{trimmed_raw}")
1352 } else {
1353 trimmed_raw
1354 };
1355 if trimmed.len() > max_len {
1356 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1357 tracing::debug!(
1358 target: "ingest",
1359 original = %trimmed,
1360 truncated_to = %truncated,
1361 max_len = max_len,
1362 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1363 );
1364 (truncated, true, Some(trimmed))
1365 } else {
1366 (trimmed, false, None)
1367 }
1368}
1369
1370fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1383 if !taken.contains(base) {
1384 return Ok(base.to_string());
1385 }
1386 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1387 let candidate = format!("{base}-{suffix}");
1388 if !taken.contains(&candidate) {
1389 tracing::warn!(
1390 target: "ingest",
1391 base = %base,
1392 resolved = %candidate,
1393 suffix,
1394 "memory name collision resolved with numeric suffix"
1395 );
1396 return Ok(candidate);
1397 }
1398 }
1399 Err(AppError::Validation(format!(
1400 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1401 )))
1402}
1403
/// Returns `s` with every run of consecutive `-` characters reduced to a single `-`.
///
/// Leading and trailing dashes are collapsed but not removed. All other characters
/// (including non-ASCII ones) are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        // The accumulator ends with '-' exactly when the previous input char was '-',
        // so skipping here drops the 2nd, 3rd, ... dash of a run.
        if !(ch == '-' && acc.ends_with('-')) {
            acc.push(ch);
        }
        acc
    })
}
1420
#[cfg(test)]
mod tests {
    // Unit tests for the ingest helpers: file discovery, name derivation,
    // collision resolution, and the low-memory parallelism switches.
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    /// Environment variable toggled by the low-memory / parallelism tests below.
    const LOW_MEMORY_ENV: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

    /// Restores `LOW_MEMORY_ENV` to its captured previous value when dropped.
    ///
    /// The restore runs in `Drop`, so it also happens while unwinding from a failed
    /// assertion inside the closure. Without this, one failing test would leak its
    /// override into every later `#[serial]` test. The previous value is captured
    /// as an `OsString` so that non-UTF-8 values survive the round trip.
    struct EnvRestore(Option<std::ffi::OsString>);

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            match self.0.take() {
                Some(prev) => std::env::set_var(LOW_MEMORY_ENV, prev),
                None => std::env::remove_var(LOW_MEMORY_ENV),
            }
        }
    }

    /// Runs `f` with `LOW_MEMORY_ENV` set to `value` (or unset for `None`).
    ///
    /// The original value is restored afterwards, even if `f` panics. Callers must be
    /// `#[serial]` because the process environment is global.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let _restore = EnvRestore(std::env::var_os(LOW_MEMORY_ENV));
        match value {
            Some(v) => std::env::set_var(LOW_MEMORY_ENV, v),
            None => std::env::remove_var(LOW_MEMORY_ENV),
        }
        f();
    }

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn collapse_dashes_merges_runs_only() {
        assert_eq!(collapse_dashes("a--b---c"), "a-b-c");
        assert_eq!(collapse_dashes("--a--"), "-a-");
        assert_eq!(collapse_dashes(""), "");
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_prefixes_leading_digit() {
        let p = PathBuf::from("/tmp/2024_report.md");
        let (name, truncated, original) = derive_kebab_name(&p, 60);
        assert_eq!(name, "doc-2024-report");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        // Pass the cap explicitly so this test checks what its name says,
        // independent of the current value of DERIVED_NAME_MAX_LEN.
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, 60);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    #[test]
    #[serial]
    fn with_env_var_restores_previous_value_after_panic() {
        with_env_var(Some("1"), || {
            let outcome = std::panic::catch_unwind(|| with_env_var(Some("0"), || panic!("boom")));
            assert!(outcome.is_err(), "inner closure must have panicked");
            assert_eq!(
                std::env::var(LOW_MEMORY_ENV).ok().as_deref(),
                Some("1"),
                "override must be rolled back even when the closure panics"
            );
        });
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}