1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound related to name-collision suffixes.
// NOTE(review): presumably the highest numeric suffix tried when two files
// derive the same memory name; the consumer lives outside the reviewed
// range — confirm against its use site.
const MAX_NAME_COLLISION_SUFFIX: usize = 1_000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70 # LLM-curated extraction via Claude Code CLI\n \
71 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
72 # Resume interrupted claude-code ingest\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
74 # Claude Code with budget cap and custom timeout\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
76AUTHENTICATION:\n \
77 --mode claude-code: Uses existing Claude Code authentication.\n \
78 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
79 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
80 --mode codex: Uses existing Codex CLI authentication.\n \
81 Device auth: run `codex auth login` first\n \
82 API key: set OPENAI_API_KEY (optional)\n\n \
83NOTES:\n \
84 Each file becomes a separate memory. Names derive from file basenames\n \
85 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
86 followed by a final summary line with counts. Per-file errors are reported\n \
87 inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89 #[arg(
91 value_name = "DIR",
92 help = "Directory to ingest recursively (each matching file becomes a memory)"
93 )]
94 pub dir: PathBuf,
95
96 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98 pub r#type: MemoryType,
99
100 #[arg(long, default_value = "*.md")]
103 pub pattern: String,
104
105 #[arg(long, default_value_t = false)]
107 pub recursive: bool,
108
109 #[arg(
110 long,
111 env = "SQLITE_GRAPHRAG_ENABLE_NER",
112 value_parser = crate::parsers::parse_bool_flexible,
113 action = clap::ArgAction::Set,
114 num_args = 0..=1,
115 default_missing_value = "true",
116 default_value = "false",
117 help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118 )]
119 pub enable_ner: bool,
120 #[arg(
121 long,
122 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
123 default_value = "fp32",
124 help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
125 )]
126 pub gliner_variant: String,
127
128 #[arg(long, default_value_t = false, hide = true)]
130 pub skip_extraction: bool,
131
132 #[arg(long, default_value_t = false)]
134 pub fail_fast: bool,
135
136 #[arg(long, default_value_t = false)]
138 pub dry_run: bool,
139
140 #[arg(long, default_value_t = 10_000)]
142 pub max_files: usize,
143
144 #[arg(long)]
146 pub namespace: Option<String>,
147
148 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
150 pub db: Option<String>,
151
152 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
153 pub format: JsonOutputFormat,
154
155 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
156 pub json: bool,
157
158 #[arg(
160 long,
161 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
162 )]
163 pub ingest_parallelism: Option<usize>,
164
165 #[arg(
173 long,
174 default_value_t = false,
175 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
176 Recommended for environments with <4 GB available RAM or container/cgroup \
177 constraints. Trade-off: 3-4x longer wall time. Also honored via \
178 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
179 )]
180 pub low_memory: bool,
181
182 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
184 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
185 pub max_rss_mb: u64,
186
187 #[arg(long, default_value_t = 2, value_name = "N",
192 value_parser = clap::value_parser!(u64).range(1..=32),
193 help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
194 pub llm_parallelism: u64,
195
196 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
201 help = "Maximum length for derived memory names (default: 60)")]
202 pub max_name_length: usize,
203
204 #[arg(long, value_enum, default_value_t = IngestMode::None)]
206 pub mode: IngestMode,
207
208 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
210 pub claude_binary: Option<std::path::PathBuf>,
211
212 #[arg(long)]
214 pub claude_model: Option<String>,
215
216 #[arg(long, default_value_t = false)]
218 pub resume: bool,
219
220 #[arg(long, default_value_t = false)]
222 pub retry_failed: bool,
223
224 #[arg(long, default_value_t = false)]
226 pub keep_queue: bool,
227
228 #[arg(long, default_value = ".ingest-queue.sqlite")]
230 pub queue_db: String,
231
232 #[arg(long, default_value_t = 60)]
234 pub rate_limit_wait: u64,
235
236 #[arg(long)]
238 pub max_cost_usd: Option<f64>,
239
240 #[arg(
242 long,
243 default_value_t = 300,
244 help = "Timeout in seconds for each claude -p invocation (default: 300)"
245 )]
246 pub claude_timeout: u64,
247
248 #[arg(
250 long,
251 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
252 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
253 )]
254 pub codex_binary: Option<PathBuf>,
255
256 #[arg(
258 long,
259 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
260 )]
261 pub codex_model: Option<String>,
262
263 #[arg(
265 long,
266 default_value_t = 300,
267 help = "Timeout in seconds for each codex exec invocation (default: 300)"
268 )]
269 pub codex_timeout: u64,
270
271 #[arg(long, value_name = "SECONDS")]
274 pub wait_job_singleton: Option<u64>,
275
276 #[arg(long, default_value_t = false)]
279 pub force_job_singleton: bool,
280}
281
282#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
284pub enum IngestMode {
285 None,
287 Gliner,
289 ClaudeCode,
291 Codex,
293}
294
295fn env_low_memory_enabled() -> bool {
300 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
301 Ok(v) if v.is_empty() => false,
302 Ok(v) => match v.to_lowercase().as_str() {
303 "1" | "true" | "yes" | "on" => true,
304 "0" | "false" | "no" | "off" => false,
305 other => {
306 tracing::warn!(
307 target: "ingest",
308 value = %other,
309 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
310 );
311 false
312 }
313 },
314 Err(_) => false,
315 }
316}
317
318fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
330 let env_flag = env_low_memory_enabled();
331 let low_memory = low_memory_flag || env_flag;
332
333 if low_memory {
334 if let Some(n) = ingest_parallelism {
335 if n > 1 {
336 tracing::warn!(
337 target: "ingest",
338 requested = n,
339 "--ingest-parallelism overridden by --low-memory; using 1"
340 );
341 }
342 }
343 if low_memory_flag {
344 tracing::info!(
345 target: "ingest",
346 source = "flag",
347 "low-memory mode enabled: forcing --ingest-parallelism 1"
348 );
349 } else {
350 tracing::info!(
351 target: "ingest",
352 source = "env",
353 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
354 );
355 }
356 return 1;
357 }
358
359 ingest_parallelism
360 .unwrap_or_else(|| {
361 std::thread::available_parallelism()
362 .map(|v| v.get() / 2)
363 .unwrap_or(1)
364 .clamp(1, 4)
365 })
366 .max(1)
367}
368
369#[derive(Serialize)]
370struct IngestFileEvent<'a> {
371 file: &'a str,
372 name: &'a str,
373 status: &'a str,
374 truncated: bool,
376 #[serde(skip_serializing_if = "Option::is_none")]
378 original_name: Option<String>,
379 #[serde(skip_serializing_if = "Option::is_none")]
381 original_filename: Option<&'a str>,
382 #[serde(skip_serializing_if = "Option::is_none")]
383 error: Option<String>,
384 #[serde(skip_serializing_if = "Option::is_none")]
385 memory_id: Option<i64>,
386 #[serde(skip_serializing_if = "Option::is_none")]
387 action: Option<String>,
388 body_length: usize,
390}
391
392#[derive(Serialize)]
393struct IngestSummary {
394 summary: bool,
395 dir: String,
396 pattern: String,
397 recursive: bool,
398 files_total: usize,
399 files_succeeded: usize,
400 files_failed: usize,
401 files_skipped: usize,
402 elapsed_ms: u64,
403}
404
/// Result of persisting one staged file.
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Action label reported to the user (e.g. `"created"`).
    action: String,
    /// Length of the stored body in bytes.
    body_length: usize,
}
411
412#[derive(Serialize)]
415struct StageProgressEvent<'a> {
416 schema_version: u8,
417 event: &'a str,
418 path: &'a str,
419 ms: u64,
420 entities: usize,
421 relationships: usize,
422}
423
424struct StagedFile {
427 body: String,
428 body_hash: String,
429 snippet: String,
430 name: String,
431 description: String,
432 embedding: Vec<f32>,
433 chunk_embeddings: Option<Vec<Vec<f32>>>,
434 chunks_info: Vec<crate::chunking::Chunk>,
435 entities: Vec<NewEntity>,
436 relationships: Vec<NewRelationship>,
437 entity_embeddings: Vec<Vec<f32>>,
438 urls: Vec<crate::extraction::ExtractedUrl>,
439}
440
441#[allow(clippy::too_many_arguments)]
447fn stage_file(
448 _idx: usize,
449 path: &Path,
450 name: &str,
451 paths: &AppPaths,
452 enable_ner: bool,
453 gliner_variant: crate::extraction::GlinerVariant,
454 max_rss_mb: u64,
455 llm_parallelism: usize,
456 llm_backend: crate::cli::LlmBackendChoice,
457) -> Result<StagedFile, AppError> {
458 use crate::constants::*;
459
460 if name.len() > MAX_MEMORY_NAME_LEN {
461 return Err(AppError::LimitExceeded(
462 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
463 ));
464 }
465 if name.starts_with("__") {
466 return Err(AppError::Validation(
467 crate::i18n::validation::reserved_name(),
468 ));
469 }
470 {
471 let slug_re = crate::constants::name_slug_regex();
472 if !slug_re.is_match(name) {
473 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
474 name,
475 )));
476 }
477 }
478
479 let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
480 if file_size > MAX_MEMORY_BODY_LEN as u64 {
481 return Err(AppError::LimitExceeded(
482 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
483 ));
484 }
485 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
486 if raw_body.len() > MAX_MEMORY_BODY_LEN {
487 return Err(AppError::LimitExceeded(
488 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
489 ));
490 }
491 if raw_body.trim().is_empty() {
492 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
493 }
494
495 let description = format!("ingested from {}", path.display());
496 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
497 return Err(AppError::Validation(
498 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
499 ));
500 }
501
502 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
503 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
504 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
505 if enable_ner {
506 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
507 Ok(extracted) => {
508 extracted_urls = extracted.urls;
509 extracted_entities = extracted
514 .entities
515 .into_iter()
516 .map(|e| NewEntity {
517 name: e.name,
518 entity_type: crate::entity_type::EntityType::Concept,
519 description: None,
520 })
521 .collect();
522 extracted_relationships.clear();
527
528 if extracted_entities.len() > max_entities_per_memory() {
529 extracted_entities.truncate(max_entities_per_memory());
530 }
531 if extracted_relationships.len() > max_relationships_per_memory() {
532 extracted_relationships.truncate(max_relationships_per_memory());
533 }
534 }
535 Err(e) => {
536 tracing::warn!(
537 target: "ingest",
538 file = %path.display(),
539 "auto-extraction failed (graceful degradation): {e:#}"
540 );
541 }
542 }
543 }
544
545 for rel in &mut extracted_relationships {
546 rel.relation = crate::parsers::normalize_relation(&rel.relation);
547 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
548 return Err(AppError::Validation(format!(
549 "{e} for relationship '{}' -> '{}'",
550 rel.source, rel.target
551 )));
552 }
553 crate::parsers::warn_if_non_canonical(&rel.relation);
554 if !(0.0..=1.0).contains(&rel.strength) {
555 return Err(AppError::Validation(format!(
556 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
557 rel.strength, rel.source, rel.target
558 )));
559 }
560 }
561
562 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
563 let snippet: String = raw_body.chars().take(200).collect();
564
565 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
566 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
567 return Err(AppError::LimitExceeded(format!(
568 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
569 chunks_info.len(),
570 REMEMBER_MAX_SAFE_MULTI_CHUNKS
571 )));
572 }
573
574 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
575 let embedding = if chunks_info.len() == 1 {
576 crate::embedder::embed_passage_with_choice(&paths.models, &raw_body, Some(llm_backend))?
578 } else {
579 let chunk_texts: Vec<String> = chunks_info
582 .iter()
583 .map(|c| chunking::chunk_text(&raw_body, c).to_string())
584 .collect();
585 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
586 if rss > max_rss_mb {
587 tracing::error!(
588 target: "ingest",
589 rss_mb = rss,
590 max_rss_mb = max_rss_mb,
591 file = %path.display(),
592 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
593 );
594 return Err(AppError::LowMemory {
595 available_mb: crate::memory_guard::available_memory_mb(),
596 required_mb: max_rss_mb,
597 });
598 }
599 }
600 let chunk_embeddings = crate::embedder::embed_passages_parallel_local(
601 &paths.models,
602 &chunk_texts,
603 llm_parallelism,
604 crate::embedder::chunk_embed_batch_size(),
605 )?;
606 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
607 chunk_embeddings_opt = Some(chunk_embeddings);
608 aggregated
609 };
610
611 let entity_texts: Vec<String> = extracted_entities
613 .iter()
614 .map(|entity| match &entity.description {
615 Some(desc) => format!("{} {}", entity.name, desc),
616 None => entity.name.clone(),
617 })
618 .collect();
619 let (entity_embeddings, embed_cache_stats) =
623 crate::embedder::embed_entity_texts_cached(&paths.models, &entity_texts, llm_parallelism)?;
624 if embed_cache_stats.hits > 0 {
625 tracing::debug!(
626 hits = embed_cache_stats.hits,
627 misses = embed_cache_stats.misses,
628 requested = embed_cache_stats.requested,
629 "G56: entity embed cache hit (ingest)"
630 );
631 }
632
633 Ok(StagedFile {
634 body: raw_body,
635 body_hash,
636 snippet,
637 name: name.to_string(),
638 description,
639 embedding,
640 chunk_embeddings: chunk_embeddings_opt,
641 chunks_info,
642 entities: extracted_entities,
643 relationships: extracted_relationships,
644 entity_embeddings,
645 urls: extracted_urls,
646 })
647}
648
649fn persist_staged(
651 conn: &mut Connection,
652 namespace: &str,
653 memory_type: &str,
654 staged: StagedFile,
655) -> Result<FileSuccess, AppError> {
656 {
657 let active_count: u32 = conn.query_row(
658 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
659 [],
660 |r| r.get::<_, i64>(0).map(|v| v as u32),
661 )?;
662 let ns_exists: bool = conn.query_row(
663 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
664 rusqlite::params![namespace],
665 |r| r.get::<_, i64>(0).map(|v| v > 0),
666 )?;
667 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
668 return Err(AppError::NamespaceError(format!(
669 "active namespace limit of {} exceeded while creating '{namespace}'",
670 crate::constants::MAX_NAMESPACES_ACTIVE
671 )));
672 }
673 }
674
675 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
676 if existing_memory.is_some() {
677 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
678 &staged.name,
679 namespace,
680 )));
681 }
682 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
683
684 let new_memory = NewMemory {
685 namespace: namespace.to_string(),
686 name: staged.name.clone(),
687 memory_type: memory_type.to_string(),
688 description: staged.description.clone(),
689 body: staged.body,
690 body_hash: staged.body_hash,
691 session_id: None,
692 source: "agent".to_string(),
693 metadata: serde_json::json!({}),
694 };
695
696 if let Some(hash_id) = duplicate_hash_id {
697 tracing::debug!(
698 target: "ingest",
699 duplicate_memory_id = hash_id,
700 "identical body already exists; persisting a new memory anyway"
701 );
702 }
703
704 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
705
706 let memory_id = memories::insert(&tx, &new_memory)?;
707 versions::insert_version(
708 &tx,
709 memory_id,
710 1,
711 &staged.name,
712 memory_type,
713 &staged.description,
714 &new_memory.body,
715 &serde_json::to_string(&new_memory.metadata)?,
716 None,
717 "create",
718 )?;
719 memories::upsert_vec(
720 &tx,
721 memory_id,
722 namespace,
723 memory_type,
724 &staged.embedding,
725 &staged.name,
726 &staged.snippet,
727 )?;
728
729 if staged.chunks_info.len() > 1 {
730 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
731 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
732 AppError::Internal(anyhow::anyhow!(
733 "missing chunk embeddings cache on multi-chunk ingest path"
734 ))
735 })?;
736 for (i, emb) in chunk_embeddings.iter().enumerate() {
737 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
738 }
739 }
740
741 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
742 for (idx, entity) in staged.entities.iter().enumerate() {
743 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
744 let entity_embedding = &staged.entity_embeddings[idx];
745 entities::upsert_entity_vec(
746 &tx,
747 entity_id,
748 namespace,
749 entity.entity_type,
750 entity_embedding,
751 &entity.name,
752 )?;
753 entities::link_memory_entity(&tx, memory_id, entity_id)?;
754 entities::increment_degree(&tx, entity_id)?;
755 }
756 let entity_types: std::collections::HashMap<&str, EntityType> = staged
757 .entities
758 .iter()
759 .map(|entity| (entity.name.as_str(), entity.entity_type))
760 .collect();
761 for rel in &staged.relationships {
762 let source_entity = NewEntity {
763 name: rel.source.clone(),
764 entity_type: entity_types
765 .get(rel.source.as_str())
766 .copied()
767 .unwrap_or(EntityType::Concept),
768 description: None,
769 };
770 let target_entity = NewEntity {
771 name: rel.target.clone(),
772 entity_type: entity_types
773 .get(rel.target.as_str())
774 .copied()
775 .unwrap_or(EntityType::Concept),
776 description: None,
777 };
778 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
779 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
780 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
781 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
782 }
783 }
784
785 tx.commit()?;
786
787 if !staged.urls.is_empty() {
788 let url_entries: Vec<storage_urls::MemoryUrl> = staged
789 .urls
790 .into_iter()
791 .map(|u| storage_urls::MemoryUrl {
792 url: u.url,
793 offset: Some(u.start as i64),
794 })
795 .collect();
796 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
797 }
798
799 Ok(FileSuccess {
800 memory_id,
801 action: "created".to_string(),
802 body_length: new_memory.body.len(),
803 })
804}
805
/// Returns `true` when `value` equals `default`.
///
/// Used to tell "flag left at its default" apart from "flag explicitly
/// changed" for options that always carry a value.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
816
817fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
832 const DEFAULT_TIMEOUT: u64 = 300;
833 const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
834
835 let mut conflicts: Vec<String> = Vec::new();
836
837 let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
838
839 if is_local_mode {
840 if args.claude_binary.is_some() {
841 conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
842 }
843 if args.claude_model.is_some() {
844 conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
845 }
846 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
847 conflicts.push(format!(
848 "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
849 args.claude_timeout
850 ));
851 }
852 if args.codex_binary.is_some() {
853 conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
854 }
855 if args.codex_model.is_some() {
856 conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
857 }
858 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
859 conflicts.push(format!(
860 "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
861 args.codex_timeout
862 ));
863 }
864 if args.max_cost_usd.is_some() {
865 conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
866 }
867 if args.resume {
868 conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
869 }
870 if args.retry_failed {
871 conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
872 }
873 if args.keep_queue {
874 conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
875 }
876 if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
877 conflicts.push(format!(
878 "--rate-limit-wait={} is ignored when --mode is none or gliner",
879 args.rate_limit_wait
880 ));
881 }
882 }
883
884 match args.mode {
885 IngestMode::ClaudeCode => {
886 if args.codex_binary.is_some() {
887 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
888 }
889 if args.codex_model.is_some() {
890 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
891 }
892 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
893 conflicts.push(format!(
894 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
895 args.codex_timeout
896 ));
897 }
898 }
899 IngestMode::Codex => {
900 if args.claude_binary.is_some() {
901 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
902 }
903 if args.claude_model.is_some() {
904 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
905 }
906 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
907 conflicts.push(format!(
908 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
909 args.claude_timeout
910 ));
911 }
912 if args.max_cost_usd.is_some() {
913 conflicts.push(
914 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
915 .to_string(),
916 );
917 }
918 if args.resume {
919 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
920 }
921 if args.retry_failed {
922 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
923 }
924 if args.keep_queue {
925 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
926 }
927 }
928 IngestMode::None | IngestMode::Gliner => {}
929 }
930
931 if !conflicts.is_empty() {
932 return Err(AppError::Validation(format!(
933 "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
934 args.mode,
935 conflicts.join("\n - ")
936 )));
937 }
938
939 Ok(())
940}
941
942#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
945pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
946 validate_mode_conditional_flags_ingest(&args)?;
949 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
950 if args.mode == IngestMode::ClaudeCode {
951 return super::ingest_claude::run_claude_ingest(&args);
952 }
953 if args.mode == IngestMode::Codex {
954 return super::ingest_codex::run_codex_ingest(&args);
955 }
956
957 let started = std::time::Instant::now();
958
959 if !args.dir.exists() {
960 return Err(AppError::Validation(format!(
961 "directory not found: {}",
962 args.dir.display()
963 )));
964 }
965 if !args.dir.is_dir() {
966 return Err(AppError::Validation(format!(
967 "path is not a directory: {}",
968 args.dir.display()
969 )));
970 }
971
972 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
973 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
974 files.sort_unstable();
975
976 if files.len() > args.max_files {
977 return Err(AppError::Validation(format!(
978 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
979 files.len(),
980 args.max_files
981 )));
982 }
983
984 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
985 let memory_type_str = args.r#type.as_str().to_string();
986
987 let paths = AppPaths::resolve(args.db.as_deref())?;
988 let mut conn_or_err = match init_storage(&paths) {
989 Ok(c) => Ok(c),
990 Err(e) => Err(format!("{e}")),
991 };
992
993 let mut succeeded: usize = 0;
994 let mut failed: usize = 0;
995 let mut skipped: usize = 0;
996 let total = files.len();
997
998 let mut taken_names: BTreeSet<String> = BTreeSet::new();
1001
1002 enum SlotMeta {
1008 Skip {
1009 file_str: String,
1010 derived_base: String,
1011 name_truncated: bool,
1012 original_name: Option<String>,
1013 original_filename: Option<String>,
1014 reason: String,
1015 },
1016 Process {
1017 file_str: String,
1018 derived_name: String,
1019 name_truncated: bool,
1020 original_name: Option<String>,
1021 original_filename: Option<String>,
1022 },
1023 }
1024
1025 struct ProcessItem {
1026 idx: usize,
1027 path: PathBuf,
1028 file_str: String,
1029 derived_name: String,
1030 }
1031
1032 let files_cap = files.len();
1033 let mut slots_meta: Vec<SlotMeta> = Vec::new();
1034 slots_meta.try_reserve(files_cap).map_err(|_| {
1035 AppError::LimitExceeded(format!(
1036 "allocation of {files_cap} slot metadata entries would exceed available memory"
1037 ))
1038 })?;
1039 let mut process_items: Vec<ProcessItem> = Vec::new();
1040 process_items.try_reserve(files_cap).map_err(|_| {
1041 AppError::LimitExceeded(format!(
1042 "allocation of {files_cap} process items would exceed available memory"
1043 ))
1044 })?;
1045 let mut truncations: Vec<(String, String)> = Vec::new();
1046 truncations.try_reserve(files_cap).map_err(|_| {
1047 AppError::LimitExceeded(format!(
1048 "allocation of {files_cap} truncation entries would exceed available memory"
1049 ))
1050 })?;
1051
1052 let max_name_length = args.max_name_length;
1053 for path in &files {
1054 let file_str = path.to_string_lossy().into_owned();
1055 let (derived_base, name_truncated, original_name) =
1056 derive_kebab_name(path, max_name_length);
1057 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1058
1059 if name_truncated {
1060 if let Some(ref orig) = original_name {
1061 truncations.push((orig.clone(), derived_base.clone()));
1062 }
1063 }
1064
1065 if derived_base.is_empty() {
1066 let orig_filename = if !original_basename.is_empty() {
1068 Some(original_basename.to_string())
1069 } else {
1070 None
1071 };
1072 slots_meta.push(SlotMeta::Skip {
1073 file_str,
1074 derived_base: String::new(),
1075 name_truncated: false,
1076 original_name: None,
1077 original_filename: orig_filename,
1078 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1079 });
1080 continue;
1081 }
1082
1083 match unique_name(&derived_base, &taken_names) {
1084 Ok(derived_name) => {
1085 taken_names.insert(derived_name.clone());
1086 let idx = slots_meta.len();
1087 let orig_filename = if original_basename != derived_name {
1089 Some(original_basename.to_string())
1090 } else {
1091 None
1092 };
1093 process_items.push(ProcessItem {
1094 idx,
1095 path: path.clone(),
1096 file_str: file_str.clone(),
1097 derived_name: derived_name.clone(),
1098 });
1099 slots_meta.push(SlotMeta::Process {
1100 file_str,
1101 derived_name,
1102 name_truncated,
1103 original_name,
1104 original_filename: orig_filename,
1105 });
1106 }
1107 Err(e) => {
1108 let orig_filename = if original_basename != derived_base {
1109 Some(original_basename.to_string())
1110 } else {
1111 None
1112 };
1113 slots_meta.push(SlotMeta::Skip {
1114 file_str,
1115 derived_base,
1116 name_truncated,
1117 original_name,
1118 original_filename: orig_filename,
1119 reason: e.to_string(),
1120 });
1121 }
1122 }
1123 }
1124
1125 if !truncations.is_empty() {
1126 tracing::info!(
1127 target: "ingest",
1128 count = truncations.len(),
1129 max_name_length = max_name_length,
1130 max_len = DERIVED_NAME_MAX_LEN,
1131 "derived names truncated; pass -vv (debug) for per-file detail"
1132 );
1133 }
1134
1135 if args.dry_run {
1137 for meta in &slots_meta {
1138 match meta {
1139 SlotMeta::Skip {
1140 file_str,
1141 derived_base,
1142 name_truncated,
1143 original_name,
1144 original_filename,
1145 reason,
1146 } => {
1147 output::emit_json_compact(&IngestFileEvent {
1148 file: file_str,
1149 name: derived_base,
1150 status: "skip",
1151 truncated: *name_truncated,
1152 original_name: original_name.clone(),
1153 original_filename: original_filename.as_deref(),
1154 error: Some(reason.clone()),
1155 memory_id: None,
1156 action: None,
1157 body_length: 0,
1158 })?;
1159 }
1160 SlotMeta::Process {
1161 file_str,
1162 derived_name,
1163 name_truncated,
1164 original_name,
1165 original_filename,
1166 } => {
1167 output::emit_json_compact(&IngestFileEvent {
1168 file: file_str,
1169 name: derived_name,
1170 status: "preview",
1171 truncated: *name_truncated,
1172 original_name: original_name.clone(),
1173 original_filename: original_filename.as_deref(),
1174 error: None,
1175 memory_id: None,
1176 action: None,
1177 body_length: 0,
1178 })?;
1179 }
1180 }
1181 }
1182 output::emit_json_compact(&IngestSummary {
1183 summary: true,
1184 dir: args.dir.to_string_lossy().into_owned(),
1185 pattern: args.pattern.clone(),
1186 recursive: args.recursive,
1187 files_total: total,
1188 files_succeeded: 0,
1189 files_failed: 0,
1190 files_skipped: 0,
1191 elapsed_ms: started.elapsed().as_millis() as u64,
1192 })?;
1193 return Ok(());
1194 }
1195
1196 if args.low_memory {
1198 if let Some(n) = args.ingest_parallelism {
1199 if n > 1 {
1200 return Err(AppError::Validation(
1201 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1202 .to_string(),
1203 ));
1204 }
1205 }
1206 }
1207
1208 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1211
1212 let pool = rayon::ThreadPoolBuilder::new()
1213 .num_threads(parallelism)
1214 .build()
1215 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1216
1217 if args.enable_ner && args.skip_extraction {
1218 return Err(AppError::Validation(
1219 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1220 ));
1221 }
1222 if args.skip_extraction && !args.enable_ner {
1223 tracing::warn!(
1230 "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1231 );
1232 }
1233 let enable_ner = args.enable_ner;
1234 let max_rss_mb = args.max_rss_mb;
1235 let llm_parallelism = args.llm_parallelism as usize;
1236 if args.mode == IngestMode::Gliner {
1240 tracing::warn!(
1241 "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1242 );
1243 }
1244 if args.gliner_variant != "fp32" {
1245 tracing::warn!(
1246 "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1247 );
1248 }
1249 let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1250 "int8" => crate::extraction::GlinerVariant::Int8,
1251 _ => crate::extraction::GlinerVariant::Fp32,
1252 };
1253
1254 let total_to_process = process_items.len();
1255 tracing::info!(
1256 target: "ingest",
1257 phase = "pipeline_start",
1258 files = total_to_process,
1259 ingest_parallelism = parallelism,
1260 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1261 );
1262
1263 let channel_bound = (parallelism * 2).max(1);
1267 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1268
1269 let paths_owned = paths.clone();
1274 let llm_backend_owned = llm_backend;
1275 let producer_handle = std::thread::spawn(move || {
1276 pool.install(|| {
1277 process_items.into_par_iter().for_each(|item| {
1278 if crate::shutdown_requested() {
1279 return;
1280 }
1281 let t0 = std::time::Instant::now();
1282 let result = stage_file(
1283 item.idx,
1284 &item.path,
1285 &item.derived_name,
1286 &paths_owned,
1287 enable_ner,
1288 gliner_variant,
1289 max_rss_mb,
1290 llm_parallelism,
1291 llm_backend_owned,
1292 );
1293 let elapsed_ms = t0.elapsed().as_millis() as u64;
1294
1295 let (n_entities, n_relationships) = match &result {
1298 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1299 Err(_) => (0, 0),
1300 };
1301 let progress = StageProgressEvent {
1302 schema_version: 1,
1303 event: "file_extracted",
1304 path: &item.file_str,
1305 ms: elapsed_ms,
1306 entities: n_entities,
1307 relationships: n_relationships,
1308 };
1309 if let Ok(line) = serde_json::to_string(&progress) {
1310 tracing::info!(target: "ingest_progress", "{}", line);
1311 }
1312
1313 let _ = tx.send((item.idx, result));
1317 });
1318 drop(tx);
1320 });
1321 });
1322
1323 let fail_fast = args.fail_fast;
1335
1336 for meta in &slots_meta {
1338 if let SlotMeta::Skip {
1339 file_str,
1340 derived_base,
1341 name_truncated,
1342 original_name,
1343 original_filename,
1344 reason,
1345 } = meta
1346 {
1347 output::emit_json_compact(&IngestFileEvent {
1348 file: file_str,
1349 name: derived_base,
1350 status: "skipped",
1351 truncated: *name_truncated,
1352 original_name: original_name.clone(),
1353 original_filename: original_filename.as_deref(),
1354 error: Some(reason.clone()),
1355 memory_id: None,
1356 action: None,
1357 body_length: 0,
1358 })?;
1359 skipped += 1;
1360 }
1361 }
1362
1363 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1366 .iter()
1367 .enumerate()
1368 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1369 .collect();
1370
1371 tracing::info!(
1372 target: "ingest",
1373 phase = "persist_start",
1374 files = total_to_process,
1375 "phase B starting: persisting files incrementally as Phase A completes each one",
1376 );
1377
1378 for (idx, stage_result) in rx {
1382 if crate::shutdown_requested() {
1383 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1384 break;
1385 }
1386 let meta = meta_index.get(&idx).ok_or_else(|| {
1387 AppError::Internal(anyhow::anyhow!(
1388 "channel idx {idx} has no corresponding Process slot"
1389 ))
1390 })?;
1391 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1392 {
1393 SlotMeta::Process {
1394 file_str,
1395 derived_name,
1396 name_truncated,
1397 original_name,
1398 original_filename,
1399 } => (
1400 file_str,
1401 derived_name,
1402 name_truncated,
1403 original_name,
1404 original_filename,
1405 ),
1406 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1407 };
1408
1409 let conn = match conn_or_err.as_mut() {
1411 Ok(c) => c,
1412 Err(err_msg) => {
1413 let err_clone = err_msg.clone();
1414 output::emit_json_compact(&IngestFileEvent {
1415 file: file_str,
1416 name: derived_name,
1417 status: "failed",
1418 truncated: *name_truncated,
1419 original_name: original_name.clone(),
1420 original_filename: original_filename.as_deref(),
1421 error: Some(err_clone.clone()),
1422 memory_id: None,
1423 action: None,
1424 body_length: 0,
1425 })?;
1426 failed += 1;
1427 if fail_fast {
1428 output::emit_json_compact(&IngestSummary {
1429 summary: true,
1430 dir: args.dir.display().to_string(),
1431 pattern: args.pattern.clone(),
1432 recursive: args.recursive,
1433 files_total: total,
1434 files_succeeded: succeeded,
1435 files_failed: failed,
1436 files_skipped: skipped,
1437 elapsed_ms: started.elapsed().as_millis() as u64,
1438 })?;
1439 return Err(AppError::Validation(format!(
1440 "ingest aborted on first failure: {err_clone}"
1441 )));
1442 }
1443 continue;
1444 }
1445 };
1446
1447 let outcome =
1448 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1449
1450 match outcome {
1451 Ok(FileSuccess {
1452 memory_id,
1453 action,
1454 body_length,
1455 }) => {
1456 output::emit_json_compact(&IngestFileEvent {
1457 file: file_str,
1458 name: derived_name,
1459 status: "indexed",
1460 truncated: *name_truncated,
1461 original_name: original_name.clone(),
1462 original_filename: original_filename.as_deref(),
1463 error: None,
1464 memory_id: Some(memory_id),
1465 action: Some(action),
1466 body_length,
1467 })?;
1468 succeeded += 1;
1469 }
1470 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1471 output::emit_json_compact(&IngestFileEvent {
1472 file: file_str,
1473 name: derived_name,
1474 status: "skipped",
1475 truncated: *name_truncated,
1476 original_name: original_name.clone(),
1477 original_filename: original_filename.as_deref(),
1478 error: Some(format!("{e}")),
1479 memory_id: None,
1480 action: Some("duplicate".to_string()),
1481 body_length: 0,
1482 })?;
1483 skipped += 1;
1484 }
1485 Err(e) => {
1486 let err_msg = format!("{e}");
1487 output::emit_json_compact(&IngestFileEvent {
1488 file: file_str,
1489 name: derived_name,
1490 status: "failed",
1491 truncated: *name_truncated,
1492 original_name: original_name.clone(),
1493 original_filename: original_filename.as_deref(),
1494 error: Some(err_msg.clone()),
1495 memory_id: None,
1496 action: None,
1497 body_length: 0,
1498 })?;
1499 failed += 1;
1500 if fail_fast {
1501 output::emit_json_compact(&IngestSummary {
1502 summary: true,
1503 dir: args.dir.display().to_string(),
1504 pattern: args.pattern.clone(),
1505 recursive: args.recursive,
1506 files_total: total,
1507 files_succeeded: succeeded,
1508 files_failed: failed,
1509 files_skipped: skipped,
1510 elapsed_ms: started.elapsed().as_millis() as u64,
1511 })?;
1512 return Err(AppError::Validation(format!(
1513 "ingest aborted on first failure: {err_msg}"
1514 )));
1515 }
1516 }
1517 }
1518 }
1519
1520 producer_handle
1522 .join()
1523 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1524
1525 if let Ok(ref conn) = conn_or_err {
1526 if succeeded > 0 {
1527 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1528 }
1529 }
1530
1531 output::emit_json_compact(&IngestSummary {
1532 summary: true,
1533 dir: args.dir.display().to_string(),
1534 pattern: args.pattern.clone(),
1535 recursive: args.recursive,
1536 files_total: total,
1537 files_succeeded: succeeded,
1538 files_failed: failed,
1539 files_skipped: skipped,
1540 elapsed_ms: started.elapsed().as_millis() as u64,
1541 })?;
1542
1543 Ok(())
1544}
1545
1546fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1552 ensure_db_ready(paths)?;
1553 let conn = open_rw(&paths.db)?;
1554 Ok(conn)
1555}
1556
1557pub(crate) fn collect_files(
1558 dir: &Path,
1559 pattern: &str,
1560 recursive: bool,
1561 out: &mut Vec<PathBuf>,
1562) -> Result<(), AppError> {
1563 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1564 for entry in entries {
1565 let entry = entry.map_err(AppError::Io)?;
1566 let path = entry.path();
1567 let file_type = entry.file_type().map_err(AppError::Io)?;
1568 if file_type.is_file() {
1569 let name = entry.file_name();
1570 let name_str = name.to_string_lossy();
1571 if matches_pattern(&name_str, pattern) {
1572 out.push(path);
1573 }
1574 } else if file_type.is_dir() && recursive {
1575 collect_files(&path, pattern, recursive, out)?;
1576 }
1577 }
1578 Ok(())
1579}
1580
/// Tests a file name against a simple glob `pattern`.
///
/// The only metacharacter is `*`, which matches any run of characters (including an
/// empty one). Any number of `*` may appear anywhere in the pattern, so `*.md`,
/// `README*`, `doc*.md` and `*draft*` are all supported. A pattern without `*` must
/// equal `name` exactly. Matching is case-sensitive.
///
/// Returns `true` when `name` matches `pattern`.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: the pattern is a literal file name.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // `head` is the text before the first `*`, `tail` the text after the last `*`;
    // `middle` holds whatever sits between them (possibly containing more `*`).
    let (middle, tail) = rest.rsplit_once('*').unwrap_or(("", rest));

    // Anchor both ends first. Stripping the suffix from what remains after the prefix
    // guarantees the two anchors never overlap inside `name`.
    let after_head = match name.strip_prefix(head) {
        Some(remaining) => remaining,
        None => return false,
    };
    let mut window = match after_head.strip_suffix(tail) {
        Some(remaining) => remaining,
        None => return false,
    };

    // Each inner segment must occur, in order, in the unanchored window. Taking the
    // leftmost occurrence leaves the most room for the segments that follow.
    for segment in middle.split('*').filter(|seg| !seg.is_empty()) {
        match window.find(segment) {
            Some(at) => window = &window[at + segment.len()..],
            None => return false,
        }
    }
    true
}
1590
1591pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1602 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1603 let lowered: String = stem
1604 .nfd()
1605 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1606 .map(|c| {
1607 if c == '_' || c.is_whitespace() {
1608 '-'
1609 } else {
1610 c
1611 }
1612 })
1613 .map(|c| c.to_ascii_lowercase())
1614 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1615 .collect();
1616 let collapsed = collapse_dashes(&lowered);
1617 let trimmed_raw = collapsed.trim_matches('-').to_string();
1618 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1620 format!("doc-{trimmed_raw}")
1621 } else {
1622 trimmed_raw
1623 };
1624 if trimmed.len() > max_len {
1625 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1626 tracing::debug!(
1627 target: "ingest",
1628 original = %trimmed,
1629 truncated_to = %truncated,
1630 max_len = max_len,
1631 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1632 );
1633 (truncated, true, Some(trimmed))
1634 } else {
1635 (trimmed, false, None)
1636 }
1637}
1638
1639fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1652 if !taken.contains(base) {
1653 return Ok(base.to_string());
1654 }
1655 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1656 let candidate = format!("{base}-{suffix}");
1657 if !taken.contains(&candidate) {
1658 tracing::warn!(
1659 target: "ingest",
1660 base = %base,
1661 resolved = %candidate,
1662 suffix,
1663 "memory name collision resolved with numeric suffix"
1664 );
1665 return Ok(candidate);
1666 }
1667 }
1668 Err(AppError::Validation(format!(
1669 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1670 )))
1671}
1672
/// Returns a copy of `s` in which every run of consecutive `-` is reduced to a single `-`.
///
/// All other characters are copied through untouched; leading and trailing dashes are
/// kept (as one dash each).
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A dash is redundant exactly when the output already ends with one.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1689
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    // ---- matches_pattern ------------------------------------------------------------

    // A leading `*` matches by suffix; a bare `*` matches any name.
    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    // A trailing `*` matches by prefix.
    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    // Without a wildcard the comparison is exact and case-sensitive.
    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ---- derive_kebab_name ----------------------------------------------------------

    // Underscores in the stem become dashes.
    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Uppercase ASCII letters are lowercased.
    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Punctuation that is not part of the kebab alphabet is removed outright.
    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Accented letters fold to their unaccented ASCII base letter.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    // A diaeresis is folded away while existing dashes are preserved.
    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    // An emoji is dropped and the dash it leaves at the edge is trimmed.
    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    // Accent folding and emoji removal compose.
    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    // A stem made only of emoji derives to an empty name.
    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    // Runs of separators collapse into a single dash.
    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // An 80-character stem is cut down and the untruncated name is reported back.
    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ---- collect_files --------------------------------------------------------------

    // Only files matching the pattern are collected.
    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    // With `recursive = true` files in sub-directories are included.
    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    // With `recursive = false` sub-directories are not visited.
    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // A very long stem still yields a non-empty name within the configured cap.
    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    // ---- unique_name ----------------------------------------------------------------

    // A free base name is returned unchanged.
    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    // On collision the lowest free numeric suffix is chosen.
    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    // Once every suffix up to the cap is taken a validation error is returned.
    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ---- relation parsing helpers (crate::parsers) ----------------------------------

    // Format validation accepts well-formed relations and rejects the empty string;
    // canonical membership is checked separately from format validity.
    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ---- low-memory environment handling --------------------------------------------

    use serial_test::serial;

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or removed when `value`
    /// is `None`) and puts the variable back to its previous state afterwards.
    ///
    /// Restoration happens in a drop guard, so it also runs when `f` panics (for
    /// example on a failed assertion) and the modified variable does not leak into
    /// later tests. Callers are marked `#[serial]` because the process environment is
    /// shared between tests.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Holds the value the variable had before the test touched it.
        struct Restore(Option<String>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // Capture the previous state before mutating the environment.
        let _restore = Restore(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    // Variable absent: low-memory mode is off.
    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    // Variable present but empty: low-memory mode is off.
    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    // Truthy spellings are accepted regardless of letter case.
    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    // Falsy spellings leave low-memory mode off.
    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    // Values that are not recognised count as "off".
    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    // ---- resolve_parallelism --------------------------------------------------------

    // The low-memory flag forces parallelism 1 even when an explicit value is given.
    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    // A truthy environment variable forces parallelism 1 even with the flag off.
    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    // A falsy environment variable does not override the explicit value.
    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    // With low-memory off the explicit value is used as-is.
    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    // With nothing configured the default lies between 1 and 4 inclusive.
    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    // ---- CLI parsing ----------------------------------------------------------------

    // `--low-memory` on the command line sets the corresponding field.
    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    // Without the flag the field defaults to `false`.
    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}