1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// Presumably the largest numeric suffix tried when two files derive the same
// memory name. NOTE(review): the consumer is not visible in this section
// (likely `unique_name`) — confirm before relying on this description.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Command-line arguments accepted by the `ingest` subcommand.
//
// Plain `//` comments are used throughout this struct on purpose: clap's derive
// macros turn `///` doc comments into help text, so adding them would change
// the rendered `--help` output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
 # Ingest every Markdown file under ./docs as `document` memories\n \
 sqlite-graphrag ingest ./docs --type document\n\n \
 # Ingest .txt files recursively under ./notes\n \
 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
 # Enable GLiNER NER extraction (disabled by default, slower)\n \
 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
 # Preview file-to-name mapping without ingesting\n \
 sqlite-graphrag ingest ./docs --dry-run\n\n \
 # LLM-curated extraction via Claude Code CLI\n \
 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
 # Resume interrupted claude-code ingest\n \
 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
 # Claude Code with budget cap and custom timeout\n \
 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
AUTHENTICATION:\n \
 --mode claude-code: Uses existing Claude Code authentication.\n \
 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
 --mode codex: Uses existing Codex CLI authentication.\n \
 Device auth: run `codex auth login` first\n \
 API key: set OPENAI_API_KEY (optional)\n\n \
NOTES:\n \
 Each file becomes a separate memory. Names derive from file basenames\n \
 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
 followed by a final summary line with counts. Per-file errors are reported\n \
 inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Positional argument: the directory whose files are ingested. `run`
    // rejects paths that do not exist or are not directories.
    // NOTE(review): the help text says "recursively", but `--recursive`
    // defaults to false — confirm which one matches `collect_files`.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // `--type`: memory type recorded for every ingested file; `run` passes its
    // `as_str()` form to the persistence step.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // `--pattern`: file-name pattern handed to `collect_files`.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // `--recursive`: forwarded to `collect_files`.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // `--enable-ner[=BOOL]`: when true, staging runs automatic graph extraction
    // on each file body. Can also be set through the environment variable.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
    )]
    pub enable_ner: bool,
    // `--gliner-variant`: kept as a raw string here; `run` parses it and falls
    // back to the fp32 variant (with a warning) when parsing fails.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
    )]
    pub gliner_variant: String,

    // Hidden legacy flag: `run` returns a validation error whenever it is set
    // (either as conflicting with `--enable-ner` or as deprecated).
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // `--fail-fast`: abort the run on the first per-file failure instead of
    // continuing with the remaining files.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // `--dry-run`: `run` emits the file-to-name mapping plus a summary and
    // returns before any file is staged or persisted.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    // `--max-files`: `run` fails with a validation error when more files than
    // this match the pattern.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // `--namespace`: optional namespace, resolved via
    // `crate::namespace::resolve_namespace`.
    #[arg(long)]
    pub namespace: Option<String>,

    // `--db`: optional database path passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // `--format`: output format selector.
    // NOTE(review): not read by the code visible in this section — confirm usage.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // `--json`: accepted for compatibility; per its help text it is a no-op.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // `--ingest-parallelism`: explicit worker count; see `resolve_parallelism`
    // for the default and for how low-memory mode overrides it.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // `--low-memory`: forces a parallelism of 1. `run` rejects the combination
    // of this flag with `--ingest-parallelism N` where N > 1.
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
 Recommended for environments with <4 GB available RAM or container/cgroup \
 constraints. Trade-off: 3-4x longer wall time. Also honored via \
 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // `--max-rss-mb`: staging checks process memory before embedding each chunk
    // of a multi-chunk file and fails with a low-memory error above this value.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
        help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // `--max-name-length`: passed to `derive_kebab_name` when deriving memory
    // names from file names.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
        help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // `--mode`: selects the ingest pipeline; `run` hands off to the Claude Code
    // or Codex implementations for the matching variants.
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    // The options from here down to `codex_timeout` are not read by the code
    // visible in this section; presumably they are consumed by the
    // `--mode claude-code` / `--mode codex` pipelines, which receive the whole
    // `IngestArgs` — confirm against those modules.
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    #[arg(long)]
    pub claude_model: Option<String>,

    #[arg(long, default_value_t = false)]
    pub resume: bool,

    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    #[arg(long, default_value = ".ingest-queue.sqlite")]
    pub queue_db: String,

    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
    )]
    pub codex_binary: Option<PathBuf>,

    #[arg(
        long,
        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
    )]
    pub codex_model: Option<String>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each codex exec invocation (default: 300)"
    )]
    pub codex_timeout: u64,

    // NOTE(review): the two job-singleton options are not read in this section;
    // their semantics cannot be determined from here.
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,
}
272
// Pipeline selected with `--mode`.
//
// Plain `//` comments are used because clap's `ValueEnum` derive would expose
// `///` doc comments as help text for the possible values.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default value of `--mode`; `run` handles it with the local pipeline.
    None,
    // NOTE(review): not matched explicitly in the visible part of `run`, so it
    // appears to follow the same local pipeline as `None` — confirm.
    Gliner,
    // `run` delegates to `ingest_claude::run_claude_ingest`.
    ClaudeCode,
    // `run` delegates to `ingest_codex::run_codex_ingest`.
    Codex,
}
285
286fn env_low_memory_enabled() -> bool {
291 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
292 Ok(v) if v.is_empty() => false,
293 Ok(v) => match v.to_lowercase().as_str() {
294 "1" | "true" | "yes" | "on" => true,
295 "0" | "false" | "no" | "off" => false,
296 other => {
297 tracing::warn!(
298 target: "ingest",
299 value = %other,
300 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
301 );
302 false
303 }
304 },
305 Err(_) => false,
306 }
307}
308
309fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
321 let env_flag = env_low_memory_enabled();
322 let low_memory = low_memory_flag || env_flag;
323
324 if low_memory {
325 if let Some(n) = ingest_parallelism {
326 if n > 1 {
327 tracing::warn!(
328 target: "ingest",
329 requested = n,
330 "--ingest-parallelism overridden by --low-memory; using 1"
331 );
332 }
333 }
334 if low_memory_flag {
335 tracing::info!(
336 target: "ingest",
337 source = "flag",
338 "low-memory mode enabled: forcing --ingest-parallelism 1"
339 );
340 } else {
341 tracing::info!(
342 target: "ingest",
343 source = "env",
344 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
345 );
346 }
347 return 1;
348 }
349
350 ingest_parallelism
351 .unwrap_or_else(|| {
352 std::thread::available_parallelism()
353 .map(|v| v.get() / 2)
354 .unwrap_or(1)
355 .clamp(1, 4)
356 })
357 .max(1)
358}
359
/// One per-file line of the ingest output, serialized as compact JSON.
///
/// Optional fields are omitted from the JSON when they are `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the source file as a (lossy) string.
    file: &'a str,
    /// Derived memory name (may be empty when no name could be derived).
    name: &'a str,
    /// Outcome label; the visible callers use `"preview"`, `"skip"`,
    /// `"skipped"`, `"indexed"` and `"failed"`.
    status: &'a str,
    /// Whether the derived name was truncated.
    truncated: bool,
    /// Pre-truncation name as reported by `derive_kebab_name`, when available.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file stem, set by the callers when it differs from the derived name.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error or skip reason, when there is one.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the persisted memory on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action label such as `"created"` or `"duplicate"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length in bytes on success; the visible callers pass 0 otherwise.
    body_length: usize,
}
382
/// Final summary line of an ingest run, serialized as compact JSON.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker distinguishing the summary from per-file events; the visible
    /// callers always set it to `true`.
    summary: bool,
    /// Directory that was ingested.
    dir: String,
    /// File pattern used for the run.
    pattern: String,
    /// Whether the run was recursive.
    recursive: bool,
    /// Number of files matched.
    files_total: usize,
    /// Number of files persisted successfully.
    files_succeeded: usize,
    /// Number of files that failed.
    files_failed: usize,
    /// Number of files skipped.
    files_skipped: usize,
    /// Wall-clock time since the run started, in milliseconds.
    elapsed_ms: u64,
}
395
/// Result of successfully persisting one staged file (see `persist_staged`).
struct FileSuccess {
    /// Row id returned by the memory insert.
    memory_id: i64,
    /// Action label; `persist_staged` sets it to `"created"`.
    action: String,
    /// Length of the stored body in bytes.
    body_length: usize,
}
402
/// Progress record produced after each file is staged; `run` serializes it to
/// JSON and logs it under the `ingest_progress` tracing target.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this record's schema; `run` sets it to 1.
    schema_version: u8,
    /// Event name; `run` uses `"file_extracted"`.
    event: &'a str,
    /// Path of the staged file.
    path: &'a str,
    /// Time spent staging the file, in milliseconds.
    ms: u64,
    /// Number of extracted entities (0 when staging failed).
    entities: usize,
    /// Number of extracted relationships (0 when staging failed).
    relationships: usize,
}
414
/// Everything computed for one file before touching the database.
///
/// Built by `stage_file` and consumed by `persist_staged`.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name the file will be stored under.
    name: String,
    /// Generated description (`"ingested from <path>"`).
    description: String,
    /// Embedding for the whole memory; aggregated from the chunk embeddings
    /// unless the body is exactly one chunk.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `None` when the body is exactly one chunk.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by the hierarchical splitter.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty when extraction is disabled or failed).
    entities: Vec<NewEntity>,
    /// Extracted relationships, with normalized and validated relation names.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found during extraction.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
431
432fn stage_file(
435 _idx: usize,
436 path: &Path,
437 name: &str,
438 paths: &AppPaths,
439 enable_ner: bool,
440 gliner_variant: crate::extraction::GlinerVariant,
441 max_rss_mb: u64,
442) -> Result<StagedFile, AppError> {
443 use crate::constants::*;
444
445 if name.len() > MAX_MEMORY_NAME_LEN {
446 return Err(AppError::LimitExceeded(
447 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
448 ));
449 }
450 if name.starts_with("__") {
451 return Err(AppError::Validation(
452 crate::i18n::validation::reserved_name(),
453 ));
454 }
455 {
456 let slug_re = crate::constants::name_slug_regex();
457 if !slug_re.is_match(name) {
458 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
459 name,
460 )));
461 }
462 }
463
464 let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
465 if file_size > MAX_MEMORY_BODY_LEN as u64 {
466 return Err(AppError::LimitExceeded(
467 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
468 ));
469 }
470 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
471 if raw_body.len() > MAX_MEMORY_BODY_LEN {
472 return Err(AppError::LimitExceeded(
473 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
474 ));
475 }
476 if raw_body.trim().is_empty() {
477 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
478 }
479
480 let description = format!("ingested from {}", path.display());
481 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
482 return Err(AppError::Validation(
483 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
484 ));
485 }
486
487 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
488 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
489 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
490 if enable_ner {
491 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
492 Ok(extracted) => {
493 extracted_urls = extracted.urls;
494 extracted_entities = extracted.entities;
495 extracted_relationships = extracted.relationships;
496
497 if extracted_entities.len() > max_entities_per_memory() {
498 extracted_entities.truncate(max_entities_per_memory());
499 }
500 if extracted_relationships.len() > max_relationships_per_memory() {
501 extracted_relationships.truncate(max_relationships_per_memory());
502 }
503 }
504 Err(e) => {
505 tracing::warn!(
506 target: "ingest",
507 file = %path.display(),
508 "auto-extraction failed (graceful degradation): {e:#}"
509 );
510 }
511 }
512 }
513
514 for rel in &mut extracted_relationships {
515 rel.relation = crate::parsers::normalize_relation(&rel.relation);
516 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
517 return Err(AppError::Validation(format!(
518 "{e} for relationship '{}' -> '{}'",
519 rel.source, rel.target
520 )));
521 }
522 crate::parsers::warn_if_non_canonical(&rel.relation);
523 if !(0.0..=1.0).contains(&rel.strength) {
524 return Err(AppError::Validation(format!(
525 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
526 rel.strength, rel.source, rel.target
527 )));
528 }
529 }
530
531 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
532 let snippet: String = raw_body.chars().take(200).collect();
533
534 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
535 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
536 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
537 return Err(AppError::LimitExceeded(format!(
538 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
539 chunks_info.len(),
540 REMEMBER_MAX_SAFE_MULTI_CHUNKS
541 )));
542 }
543
544 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
545 let embedding = if chunks_info.len() == 1 {
546 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
547 } else {
548 let chunk_texts: Vec<&str> = chunks_info
549 .iter()
550 .map(|c| chunking::chunk_text(&raw_body, c))
551 .collect();
552 let embed_cap = chunk_texts.len();
553 let mut chunk_embeddings = Vec::new();
554 chunk_embeddings.try_reserve(embed_cap).map_err(|_| {
555 AppError::LimitExceeded(format!(
556 "allocation of {embed_cap} chunk embeddings would exceed available memory"
557 ))
558 })?;
559 for chunk_text in &chunk_texts {
560 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
561 if rss > max_rss_mb {
562 tracing::error!(
563 target: "ingest",
564 rss_mb = rss,
565 max_rss_mb = max_rss_mb,
566 file = %path.display(),
567 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
568 );
569 return Err(AppError::LowMemory {
570 available_mb: crate::memory_guard::available_memory_mb(),
571 required_mb: max_rss_mb,
572 });
573 }
574 }
575 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
576 &paths.models,
577 chunk_text,
578 )?);
579 }
580 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
581 chunk_embeddings_opt = Some(chunk_embeddings);
582 aggregated
583 };
584
585 let entity_embeddings = extracted_entities
586 .iter()
587 .map(|entity| {
588 let entity_text = match &entity.description {
589 Some(desc) => format!("{} {}", entity.name, desc),
590 None => entity.name.clone(),
591 };
592 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
593 })
594 .collect::<Result<Vec<_>, _>>()?;
595
596 Ok(StagedFile {
597 body: raw_body,
598 body_hash,
599 snippet,
600 name: name.to_string(),
601 description,
602 embedding,
603 chunk_embeddings: chunk_embeddings_opt,
604 chunks_info,
605 entities: extracted_entities,
606 relationships: extracted_relationships,
607 entity_embeddings,
608 urls: extracted_urls,
609 })
610}
611
612fn persist_staged(
614 conn: &mut Connection,
615 namespace: &str,
616 memory_type: &str,
617 staged: StagedFile,
618) -> Result<FileSuccess, AppError> {
619 {
620 let active_count: u32 = conn.query_row(
621 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
622 [],
623 |r| r.get::<_, i64>(0).map(|v| v as u32),
624 )?;
625 let ns_exists: bool = conn.query_row(
626 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
627 rusqlite::params![namespace],
628 |r| r.get::<_, i64>(0).map(|v| v > 0),
629 )?;
630 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
631 return Err(AppError::NamespaceError(format!(
632 "active namespace limit of {} exceeded while creating '{namespace}'",
633 crate::constants::MAX_NAMESPACES_ACTIVE
634 )));
635 }
636 }
637
638 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
639 if existing_memory.is_some() {
640 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
641 &staged.name,
642 namespace,
643 )));
644 }
645 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
646
647 let new_memory = NewMemory {
648 namespace: namespace.to_string(),
649 name: staged.name.clone(),
650 memory_type: memory_type.to_string(),
651 description: staged.description.clone(),
652 body: staged.body,
653 body_hash: staged.body_hash,
654 session_id: None,
655 source: "agent".to_string(),
656 metadata: serde_json::json!({}),
657 };
658
659 if let Some(hash_id) = duplicate_hash_id {
660 tracing::debug!(
661 target: "ingest",
662 duplicate_memory_id = hash_id,
663 "identical body already exists; persisting a new memory anyway"
664 );
665 }
666
667 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
668
669 let memory_id = memories::insert(&tx, &new_memory)?;
670 versions::insert_version(
671 &tx,
672 memory_id,
673 1,
674 &staged.name,
675 memory_type,
676 &staged.description,
677 &new_memory.body,
678 &serde_json::to_string(&new_memory.metadata)?,
679 None,
680 "create",
681 )?;
682 memories::upsert_vec(
683 &tx,
684 memory_id,
685 namespace,
686 memory_type,
687 &staged.embedding,
688 &staged.name,
689 &staged.snippet,
690 )?;
691
692 if staged.chunks_info.len() > 1 {
693 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
694 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
695 AppError::Internal(anyhow::anyhow!(
696 "missing chunk embeddings cache on multi-chunk ingest path"
697 ))
698 })?;
699 for (i, emb) in chunk_embeddings.iter().enumerate() {
700 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
701 }
702 }
703
704 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
705 for (idx, entity) in staged.entities.iter().enumerate() {
706 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
707 let entity_embedding = &staged.entity_embeddings[idx];
708 entities::upsert_entity_vec(
709 &tx,
710 entity_id,
711 namespace,
712 entity.entity_type,
713 entity_embedding,
714 &entity.name,
715 )?;
716 entities::link_memory_entity(&tx, memory_id, entity_id)?;
717 entities::increment_degree(&tx, entity_id)?;
718 }
719 let entity_types: std::collections::HashMap<&str, EntityType> = staged
720 .entities
721 .iter()
722 .map(|entity| (entity.name.as_str(), entity.entity_type))
723 .collect();
724 for rel in &staged.relationships {
725 let source_entity = NewEntity {
726 name: rel.source.clone(),
727 entity_type: entity_types
728 .get(rel.source.as_str())
729 .copied()
730 .unwrap_or(EntityType::Concept),
731 description: None,
732 };
733 let target_entity = NewEntity {
734 name: rel.target.clone(),
735 entity_type: entity_types
736 .get(rel.target.as_str())
737 .copied()
738 .unwrap_or(EntityType::Concept),
739 description: None,
740 };
741 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
742 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
743 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
744 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
745 }
746 }
747
748 tx.commit()?;
749
750 if !staged.urls.is_empty() {
751 let url_entries: Vec<storage_urls::MemoryUrl> = staged
752 .urls
753 .into_iter()
754 .map(|u| storage_urls::MemoryUrl {
755 url: u.url,
756 offset: Some(u.offset as i64),
757 })
758 .collect();
759 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
760 }
761
762 Ok(FileSuccess {
763 memory_id,
764 action: "created".to_string(),
765 body_length: new_memory.body.len(),
766 })
767}
768
769#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
770pub fn run(args: IngestArgs) -> Result<(), AppError> {
771 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
785 if args.mode == IngestMode::ClaudeCode {
786 return super::ingest_claude::run_claude_ingest(&args);
787 }
788 if args.mode == IngestMode::Codex {
789 return super::ingest_codex::run_codex_ingest(&args);
790 }
791
792 let started = std::time::Instant::now();
793
794 if !args.dir.exists() {
795 return Err(AppError::Validation(format!(
796 "directory not found: {}",
797 args.dir.display()
798 )));
799 }
800 if !args.dir.is_dir() {
801 return Err(AppError::Validation(format!(
802 "path is not a directory: {}",
803 args.dir.display()
804 )));
805 }
806
807 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
808 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
809 files.sort_unstable();
810
811 if files.len() > args.max_files {
812 return Err(AppError::Validation(format!(
813 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
814 files.len(),
815 args.max_files
816 )));
817 }
818
819 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
820 let memory_type_str = args.r#type.as_str().to_string();
821
822 let paths = AppPaths::resolve(args.db.as_deref())?;
823 let mut conn_or_err = match init_storage(&paths) {
824 Ok(c) => Ok(c),
825 Err(e) => Err(format!("{e}")),
826 };
827
828 let mut succeeded: usize = 0;
829 let mut failed: usize = 0;
830 let mut skipped: usize = 0;
831 let total = files.len();
832
833 let mut taken_names: BTreeSet<String> = BTreeSet::new();
836
837 enum SlotMeta {
843 Skip {
844 file_str: String,
845 derived_base: String,
846 name_truncated: bool,
847 original_name: Option<String>,
848 original_filename: Option<String>,
849 reason: String,
850 },
851 Process {
852 file_str: String,
853 derived_name: String,
854 name_truncated: bool,
855 original_name: Option<String>,
856 original_filename: Option<String>,
857 },
858 }
859
860 struct ProcessItem {
861 idx: usize,
862 path: PathBuf,
863 file_str: String,
864 derived_name: String,
865 }
866
867 let files_cap = files.len();
868 let mut slots_meta: Vec<SlotMeta> = Vec::new();
869 slots_meta.try_reserve(files_cap).map_err(|_| {
870 AppError::LimitExceeded(format!(
871 "allocation of {files_cap} slot metadata entries would exceed available memory"
872 ))
873 })?;
874 let mut process_items: Vec<ProcessItem> = Vec::new();
875 process_items.try_reserve(files_cap).map_err(|_| {
876 AppError::LimitExceeded(format!(
877 "allocation of {files_cap} process items would exceed available memory"
878 ))
879 })?;
880 let mut truncations: Vec<(String, String)> = Vec::new();
881 truncations.try_reserve(files_cap).map_err(|_| {
882 AppError::LimitExceeded(format!(
883 "allocation of {files_cap} truncation entries would exceed available memory"
884 ))
885 })?;
886
887 let max_name_length = args.max_name_length;
888 for path in &files {
889 let file_str = path.to_string_lossy().into_owned();
890 let (derived_base, name_truncated, original_name) =
891 derive_kebab_name(path, max_name_length);
892 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
893
894 if name_truncated {
895 if let Some(ref orig) = original_name {
896 truncations.push((orig.clone(), derived_base.clone()));
897 }
898 }
899
900 if derived_base.is_empty() {
901 let orig_filename = if !original_basename.is_empty() {
903 Some(original_basename.to_string())
904 } else {
905 None
906 };
907 slots_meta.push(SlotMeta::Skip {
908 file_str,
909 derived_base: String::new(),
910 name_truncated: false,
911 original_name: None,
912 original_filename: orig_filename,
913 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
914 });
915 continue;
916 }
917
918 match unique_name(&derived_base, &taken_names) {
919 Ok(derived_name) => {
920 taken_names.insert(derived_name.clone());
921 let idx = slots_meta.len();
922 let orig_filename = if original_basename != derived_name {
924 Some(original_basename.to_string())
925 } else {
926 None
927 };
928 process_items.push(ProcessItem {
929 idx,
930 path: path.clone(),
931 file_str: file_str.clone(),
932 derived_name: derived_name.clone(),
933 });
934 slots_meta.push(SlotMeta::Process {
935 file_str,
936 derived_name,
937 name_truncated,
938 original_name,
939 original_filename: orig_filename,
940 });
941 }
942 Err(e) => {
943 let orig_filename = if original_basename != derived_base {
944 Some(original_basename.to_string())
945 } else {
946 None
947 };
948 slots_meta.push(SlotMeta::Skip {
949 file_str,
950 derived_base,
951 name_truncated,
952 original_name,
953 original_filename: orig_filename,
954 reason: e.to_string(),
955 });
956 }
957 }
958 }
959
960 if !truncations.is_empty() {
961 tracing::info!(
962 target: "ingest",
963 count = truncations.len(),
964 max_name_length = max_name_length,
965 max_len = DERIVED_NAME_MAX_LEN,
966 "derived names truncated; pass -vv (debug) for per-file detail"
967 );
968 }
969
970 if args.dry_run {
972 for meta in &slots_meta {
973 match meta {
974 SlotMeta::Skip {
975 file_str,
976 derived_base,
977 name_truncated,
978 original_name,
979 original_filename,
980 reason,
981 } => {
982 output::emit_json_compact(&IngestFileEvent {
983 file: file_str,
984 name: derived_base,
985 status: "skip",
986 truncated: *name_truncated,
987 original_name: original_name.clone(),
988 original_filename: original_filename.as_deref(),
989 error: Some(reason.clone()),
990 memory_id: None,
991 action: None,
992 body_length: 0,
993 })?;
994 }
995 SlotMeta::Process {
996 file_str,
997 derived_name,
998 name_truncated,
999 original_name,
1000 original_filename,
1001 } => {
1002 output::emit_json_compact(&IngestFileEvent {
1003 file: file_str,
1004 name: derived_name,
1005 status: "preview",
1006 truncated: *name_truncated,
1007 original_name: original_name.clone(),
1008 original_filename: original_filename.as_deref(),
1009 error: None,
1010 memory_id: None,
1011 action: None,
1012 body_length: 0,
1013 })?;
1014 }
1015 }
1016 }
1017 output::emit_json_compact(&IngestSummary {
1018 summary: true,
1019 dir: args.dir.to_string_lossy().into_owned(),
1020 pattern: args.pattern.clone(),
1021 recursive: args.recursive,
1022 files_total: total,
1023 files_succeeded: 0,
1024 files_failed: 0,
1025 files_skipped: 0,
1026 elapsed_ms: started.elapsed().as_millis() as u64,
1027 })?;
1028 return Ok(());
1029 }
1030
1031 if args.low_memory {
1033 if let Some(n) = args.ingest_parallelism {
1034 if n > 1 {
1035 return Err(AppError::Validation(
1036 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1037 .to_string(),
1038 ));
1039 }
1040 }
1041 }
1042
1043 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1046
1047 let pool = rayon::ThreadPoolBuilder::new()
1048 .num_threads(parallelism)
1049 .build()
1050 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1051
1052 if args.enable_ner && args.skip_extraction {
1053 return Err(AppError::Validation(
1054 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1055 ));
1056 }
1057 if args.skip_extraction && !args.enable_ner {
1058 return Err(AppError::Validation(
1059 "--skip-extraction is deprecated since v1.0.45 and has no effect; remove this flag"
1060 .to_string(),
1061 ));
1062 }
1063 let enable_ner = args.enable_ner;
1064 let max_rss_mb = args.max_rss_mb;
1065 let gliner_variant: crate::extraction::GlinerVariant =
1066 args.gliner_variant.parse().unwrap_or_else(|e| {
1067 tracing::warn!(target: "ingest", error = %e, "invalid --gliner-variant, defaulting to fp32");
1068 crate::extraction::GlinerVariant::Fp32
1069 });
1070
1071 let total_to_process = process_items.len();
1072 tracing::info!(
1073 target: "ingest",
1074 phase = "pipeline_start",
1075 files = total_to_process,
1076 ingest_parallelism = parallelism,
1077 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1078 );
1079
1080 let channel_bound = (parallelism * 2).max(1);
1084 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1085
1086 let paths_owned = paths.clone();
1091 let producer_handle = std::thread::spawn(move || {
1092 pool.install(|| {
1093 process_items.into_par_iter().for_each(|item| {
1094 if crate::shutdown_requested() {
1095 return;
1096 }
1097 let t0 = std::time::Instant::now();
1098 let result = stage_file(
1099 item.idx,
1100 &item.path,
1101 &item.derived_name,
1102 &paths_owned,
1103 enable_ner,
1104 gliner_variant,
1105 max_rss_mb,
1106 );
1107 let elapsed_ms = t0.elapsed().as_millis() as u64;
1108
1109 let (n_entities, n_relationships) = match &result {
1112 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1113 Err(_) => (0, 0),
1114 };
1115 let progress = StageProgressEvent {
1116 schema_version: 1,
1117 event: "file_extracted",
1118 path: &item.file_str,
1119 ms: elapsed_ms,
1120 entities: n_entities,
1121 relationships: n_relationships,
1122 };
1123 if let Ok(line) = serde_json::to_string(&progress) {
1124 tracing::info!(target: "ingest_progress", "{}", line);
1125 }
1126
1127 let _ = tx.send((item.idx, result));
1131 });
1132 drop(tx);
1134 });
1135 });
1136
1137 let fail_fast = args.fail_fast;
1149
1150 for meta in &slots_meta {
1152 if let SlotMeta::Skip {
1153 file_str,
1154 derived_base,
1155 name_truncated,
1156 original_name,
1157 original_filename,
1158 reason,
1159 } = meta
1160 {
1161 output::emit_json_compact(&IngestFileEvent {
1162 file: file_str,
1163 name: derived_base,
1164 status: "skipped",
1165 truncated: *name_truncated,
1166 original_name: original_name.clone(),
1167 original_filename: original_filename.as_deref(),
1168 error: Some(reason.clone()),
1169 memory_id: None,
1170 action: None,
1171 body_length: 0,
1172 })?;
1173 skipped += 1;
1174 }
1175 }
1176
1177 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1180 .iter()
1181 .enumerate()
1182 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1183 .collect();
1184
1185 tracing::info!(
1186 target: "ingest",
1187 phase = "persist_start",
1188 files = total_to_process,
1189 "phase B starting: persisting files incrementally as Phase A completes each one",
1190 );
1191
1192 for (idx, stage_result) in rx {
1196 if crate::shutdown_requested() {
1197 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1198 break;
1199 }
1200 let meta = meta_index.get(&idx).ok_or_else(|| {
1201 AppError::Internal(anyhow::anyhow!(
1202 "channel idx {idx} has no corresponding Process slot"
1203 ))
1204 })?;
1205 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1206 {
1207 SlotMeta::Process {
1208 file_str,
1209 derived_name,
1210 name_truncated,
1211 original_name,
1212 original_filename,
1213 } => (
1214 file_str,
1215 derived_name,
1216 name_truncated,
1217 original_name,
1218 original_filename,
1219 ),
1220 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1221 };
1222
1223 let conn = match conn_or_err.as_mut() {
1225 Ok(c) => c,
1226 Err(err_msg) => {
1227 let err_clone = err_msg.clone();
1228 output::emit_json_compact(&IngestFileEvent {
1229 file: file_str,
1230 name: derived_name,
1231 status: "failed",
1232 truncated: *name_truncated,
1233 original_name: original_name.clone(),
1234 original_filename: original_filename.as_deref(),
1235 error: Some(err_clone.clone()),
1236 memory_id: None,
1237 action: None,
1238 body_length: 0,
1239 })?;
1240 failed += 1;
1241 if fail_fast {
1242 output::emit_json_compact(&IngestSummary {
1243 summary: true,
1244 dir: args.dir.display().to_string(),
1245 pattern: args.pattern.clone(),
1246 recursive: args.recursive,
1247 files_total: total,
1248 files_succeeded: succeeded,
1249 files_failed: failed,
1250 files_skipped: skipped,
1251 elapsed_ms: started.elapsed().as_millis() as u64,
1252 })?;
1253 return Err(AppError::Validation(format!(
1254 "ingest aborted on first failure: {err_clone}"
1255 )));
1256 }
1257 continue;
1258 }
1259 };
1260
1261 let outcome =
1262 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1263
1264 match outcome {
1265 Ok(FileSuccess {
1266 memory_id,
1267 action,
1268 body_length,
1269 }) => {
1270 output::emit_json_compact(&IngestFileEvent {
1271 file: file_str,
1272 name: derived_name,
1273 status: "indexed",
1274 truncated: *name_truncated,
1275 original_name: original_name.clone(),
1276 original_filename: original_filename.as_deref(),
1277 error: None,
1278 memory_id: Some(memory_id),
1279 action: Some(action),
1280 body_length,
1281 })?;
1282 succeeded += 1;
1283 }
1284 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1285 output::emit_json_compact(&IngestFileEvent {
1286 file: file_str,
1287 name: derived_name,
1288 status: "skipped",
1289 truncated: *name_truncated,
1290 original_name: original_name.clone(),
1291 original_filename: original_filename.as_deref(),
1292 error: Some(format!("{e}")),
1293 memory_id: None,
1294 action: Some("duplicate".to_string()),
1295 body_length: 0,
1296 })?;
1297 skipped += 1;
1298 }
1299 Err(e) => {
1300 let err_msg = format!("{e}");
1301 output::emit_json_compact(&IngestFileEvent {
1302 file: file_str,
1303 name: derived_name,
1304 status: "failed",
1305 truncated: *name_truncated,
1306 original_name: original_name.clone(),
1307 original_filename: original_filename.as_deref(),
1308 error: Some(err_msg.clone()),
1309 memory_id: None,
1310 action: None,
1311 body_length: 0,
1312 })?;
1313 failed += 1;
1314 if fail_fast {
1315 output::emit_json_compact(&IngestSummary {
1316 summary: true,
1317 dir: args.dir.display().to_string(),
1318 pattern: args.pattern.clone(),
1319 recursive: args.recursive,
1320 files_total: total,
1321 files_succeeded: succeeded,
1322 files_failed: failed,
1323 files_skipped: skipped,
1324 elapsed_ms: started.elapsed().as_millis() as u64,
1325 })?;
1326 return Err(AppError::Validation(format!(
1327 "ingest aborted on first failure: {err_msg}"
1328 )));
1329 }
1330 }
1331 }
1332 }
1333
1334 producer_handle
1336 .join()
1337 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1338
1339 if let Ok(ref conn) = conn_or_err {
1340 if succeeded > 0 {
1341 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1342 }
1343 }
1344
1345 output::emit_json_compact(&IngestSummary {
1346 summary: true,
1347 dir: args.dir.display().to_string(),
1348 pattern: args.pattern.clone(),
1349 recursive: args.recursive,
1350 files_total: total,
1351 files_succeeded: succeeded,
1352 files_failed: failed,
1353 files_skipped: skipped,
1354 elapsed_ms: started.elapsed().as_millis() as u64,
1355 })?;
1356
1357 Ok(())
1358}
1359
1360fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1366 ensure_db_ready(paths)?;
1367 let conn = open_rw(&paths.db)?;
1368 Ok(conn)
1369}
1370
1371pub(crate) fn collect_files(
1372 dir: &Path,
1373 pattern: &str,
1374 recursive: bool,
1375 out: &mut Vec<PathBuf>,
1376) -> Result<(), AppError> {
1377 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1378 for entry in entries {
1379 let entry = entry.map_err(AppError::Io)?;
1380 let path = entry.path();
1381 let file_type = entry.file_type().map_err(AppError::Io)?;
1382 if file_type.is_file() {
1383 let name = entry.file_name();
1384 let name_str = name.to_string_lossy();
1385 if matches_pattern(&name_str, pattern) {
1386 out.push(path);
1387 }
1388 } else if file_type.is_dir() && recursive {
1389 collect_files(&path, pattern, recursive, out)?;
1390 }
1391 }
1392 Ok(())
1393}
1394
/// Returns `true` when `name` matches the glob-like `pattern`.
///
/// The only metacharacter is `*`, which matches any (possibly empty) run of
/// characters and may appear anywhere in the pattern, any number of times:
///
/// * `*.md`      - names ending in `.md`
/// * `README*`   - names starting with `README`
/// * `doc-*.md`  - names starting with `doc-` and ending in `.md`
/// * `*.test.*`  - names containing `.test.`
/// * `README.md` - exact match (no wildcard)
///
/// Matching is case-sensitive. Earlier versions honoured only a single
/// leading or trailing `*`; every name accepted by that behavior is still
/// accepted here.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // Literal pieces between the wildcards; `split` always yields at least one.
    let mut segments = pattern.split('*');

    // The first piece is anchored at the start of the name.
    let head = segments.next().unwrap_or("");
    let mut remaining = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };

    // The last piece is anchored at the end. If there is none, the pattern
    // contained no `*` at all and must equal the name exactly.
    let tail = match segments.next_back() {
        Some(tail) => tail,
        None => return remaining.is_empty(),
    };

    // Interior pieces must occur in order; taking the leftmost occurrence of
    // each leaves the most room for the pieces that follow.
    for segment in segments {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }

    remaining.ends_with(tail)
}
1404
1405pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1416 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1417 let lowered: String = stem
1418 .nfd()
1419 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1420 .map(|c| {
1421 if c == '_' || c.is_whitespace() {
1422 '-'
1423 } else {
1424 c
1425 }
1426 })
1427 .map(|c| c.to_ascii_lowercase())
1428 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1429 .collect();
1430 let collapsed = collapse_dashes(&lowered);
1431 let trimmed_raw = collapsed.trim_matches('-').to_string();
1432 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1434 format!("doc-{trimmed_raw}")
1435 } else {
1436 trimmed_raw
1437 };
1438 if trimmed.len() > max_len {
1439 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1440 tracing::debug!(
1441 target: "ingest",
1442 original = %trimmed,
1443 truncated_to = %truncated,
1444 max_len = max_len,
1445 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1446 );
1447 (truncated, true, Some(trimmed))
1448 } else {
1449 (trimmed, false, None)
1450 }
1451}
1452
1453fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1466 if !taken.contains(base) {
1467 return Ok(base.to_string());
1468 }
1469 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1470 let candidate = format!("{base}-{suffix}");
1471 if !taken.contains(&candidate) {
1472 tracing::warn!(
1473 target: "ingest",
1474 base = %base,
1475 resolved = %candidate,
1476 suffix,
1477 "memory name collision resolved with numeric suffix"
1478 );
1479 return Ok(candidate);
1480 }
1481 }
1482 Err(AppError::Validation(format!(
1483 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1484 )))
1485}
1486
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut acc, ch| {
            // A dash is dropped exactly when the previous kept char is a dash.
            if ch != '-' || !acc.ends_with('-') {
                acc.push(ch);
            }
            acc
        })
}
1503
/// Unit tests for the ingest helpers: pattern matching, name derivation,
/// collision resolution, file collection, low-memory/parallelism resolution
/// and CLI flag parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Accented letters are folded to their base ASCII letter, not dropped.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // The limits are expressed through `DERIVED_NAME_MAX_LEN` rather than a
    // literal so the test keeps exercising truncation if the cap changes.
    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(DERIVED_NAME_MAX_LEN + 20)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "got len {}",
            name.len()
        );
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > DERIVED_NAME_MAX_LEN);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // A stem far longer than the cap must still yield a non-empty name that
    // fits within `DERIVED_NAME_MAX_LEN`.
    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // Format validation and canonical-set membership are separate checks:
    // "implements" passes the former but not the latter.
    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    use serial_test::serial;

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or removed
    /// when `value` is `None`) and puts the previous value back afterwards.
    ///
    /// Restoration is done by a drop guard so it also happens when `f` panics
    /// (a failing assertion); otherwise the altered variable would leak into
    /// the tests that run after it. Callers are marked `#[serial]` because the
    /// process environment is shared between test threads.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the value observed before the test touched the variable.
        struct Restore(Option<String>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        let _restore = Restore(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}