1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound related to de-duplicating derived memory names within one ingest run.
///
/// NOTE(review): the consumer is not visible in this section. It is presumably
/// `unique_name`, which caps the numeric suffix tried when a derived name collides
/// with one already taken. Confirm against that function, including whether the
/// bound is inclusive.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70 # LLM-curated extraction via Claude Code CLI\n \
71 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
72 # Resume interrupted claude-code ingest\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
74 # Claude Code with budget cap and custom timeout\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
76AUTHENTICATION:\n \
77 --mode claude-code: Uses existing Claude Code authentication.\n \
78 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
79 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
80 --mode codex: Uses existing Codex CLI authentication.\n \
81 Device auth: run `codex auth login` first\n \
82 API key: set OPENAI_API_KEY (optional)\n\n \
83NOTES:\n \
84 Each file becomes a separate memory. Names derive from file basenames\n \
85 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
86 followed by a final summary line with counts. Per-file errors are reported\n \
87 inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89 #[arg(
91 value_name = "DIR",
92 help = "Directory to ingest recursively (each matching file becomes a memory)"
93 )]
94 pub dir: PathBuf,
95
96 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98 pub r#type: MemoryType,
99
100 #[arg(long, default_value = "*.md")]
103 pub pattern: String,
104
105 #[arg(long, default_value_t = false)]
107 pub recursive: bool,
108
109 #[arg(
110 long,
111 env = "SQLITE_GRAPHRAG_ENABLE_NER",
112 value_parser = crate::parsers::parse_bool_flexible,
113 action = clap::ArgAction::Set,
114 num_args = 0..=1,
115 default_missing_value = "true",
116 default_value = "false",
117 help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118 )]
119 pub enable_ner: bool,
120 #[arg(
121 long,
122 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
123 default_value = "fp32",
124 help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
125 )]
126 pub gliner_variant: String,
127
128 #[arg(long, default_value_t = false, hide = true)]
130 pub skip_extraction: bool,
131
132 #[arg(long, default_value_t = false)]
134 pub fail_fast: bool,
135
136 #[arg(long, default_value_t = false)]
138 pub dry_run: bool,
139
140 #[arg(long, default_value_t = 10_000)]
142 pub max_files: usize,
143
144 #[arg(long)]
146 pub namespace: Option<String>,
147
148 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
150 pub db: Option<String>,
151
152 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
153 pub format: JsonOutputFormat,
154
155 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
156 pub json: bool,
157
158 #[arg(
160 long,
161 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
162 )]
163 pub ingest_parallelism: Option<usize>,
164
165 #[arg(
173 long,
174 default_value_t = false,
175 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
176 Recommended for environments with <4 GB available RAM or container/cgroup \
177 constraints. Trade-off: 3-4x longer wall time. Also honored via \
178 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
179 )]
180 pub low_memory: bool,
181
182 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
184 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
185 pub max_rss_mb: u64,
186
187 #[arg(long, default_value_t = 2, value_name = "N",
192 value_parser = clap::value_parser!(u64).range(1..=32),
193 help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
194 pub llm_parallelism: u64,
195
196 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
201 help = "Maximum length for derived memory names (default: 60)")]
202 pub max_name_length: usize,
203
204 #[arg(long, value_enum, default_value_t = IngestMode::None)]
206 pub mode: IngestMode,
207
208 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
210 pub claude_binary: Option<std::path::PathBuf>,
211
212 #[arg(long)]
214 pub claude_model: Option<String>,
215
216 #[arg(long, default_value_t = false)]
218 pub resume: bool,
219
220 #[arg(long, default_value_t = false)]
222 pub retry_failed: bool,
223
224 #[arg(long, default_value_t = false)]
226 pub keep_queue: bool,
227
228 #[arg(long, default_value = ".ingest-queue.sqlite")]
230 pub queue_db: String,
231
232 #[arg(long, default_value_t = 60)]
234 pub rate_limit_wait: u64,
235
236 #[arg(long)]
238 pub max_cost_usd: Option<f64>,
239
240 #[arg(
242 long,
243 default_value_t = 300,
244 help = "Timeout in seconds for each claude -p invocation (default: 300)"
245 )]
246 pub claude_timeout: u64,
247
248 #[arg(
250 long,
251 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
252 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
253 )]
254 pub codex_binary: Option<PathBuf>,
255
256 #[arg(
258 long,
259 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
260 )]
261 pub codex_model: Option<String>,
262
263 #[arg(
265 long,
266 default_value_t = 300,
267 help = "Timeout in seconds for each codex exec invocation (default: 300)"
268 )]
269 pub codex_timeout: u64,
270
271 #[arg(long, value_name = "SECONDS")]
274 pub wait_job_singleton: Option<u64>,
275
276 #[arg(long, default_value_t = false)]
279 pub force_job_singleton: bool,
280}
281
// Extraction strategy for `ingest`, selected with `--mode`.
//
// Plain `//` comments are used on purpose: clap's `ValueEnum` derive turns `///`
// doc comments into per-value help text.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Local pipeline in this module (embedding plus optional URL-regex extraction).
    None,
    // Deprecated since v1.0.79. `run` logs a warning and then runs the same local
    // pipeline as `None`. Kept so existing invocations still parse.
    Gliner,
    // `run` delegates to `ingest_claude::run_claude_ingest`.
    ClaudeCode,
    // `run` delegates to `ingest_codex::run_codex_ingest`.
    Codex,
}
294
295fn env_low_memory_enabled() -> bool {
300 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
301 Ok(v) if v.is_empty() => false,
302 Ok(v) => match v.to_lowercase().as_str() {
303 "1" | "true" | "yes" | "on" => true,
304 "0" | "false" | "no" | "off" => false,
305 other => {
306 tracing::warn!(
307 target: "ingest",
308 value = %other,
309 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
310 );
311 false
312 }
313 },
314 Err(_) => false,
315 }
316}
317
318fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
330 let env_flag = env_low_memory_enabled();
331 let low_memory = low_memory_flag || env_flag;
332
333 if low_memory {
334 if let Some(n) = ingest_parallelism {
335 if n > 1 {
336 tracing::warn!(
337 target: "ingest",
338 requested = n,
339 "--ingest-parallelism overridden by --low-memory; using 1"
340 );
341 }
342 }
343 if low_memory_flag {
344 tracing::info!(
345 target: "ingest",
346 source = "flag",
347 "low-memory mode enabled: forcing --ingest-parallelism 1"
348 );
349 } else {
350 tracing::info!(
351 target: "ingest",
352 source = "env",
353 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
354 );
355 }
356 return 1;
357 }
358
359 ingest_parallelism
360 .unwrap_or_else(|| {
361 std::thread::available_parallelism()
362 .map(|v| v.get() / 2)
363 .unwrap_or(1)
364 .clamp(1, 4)
365 })
366 .max(1)
367}
368
/// One NDJSON line describing the outcome for a single input file.
///
/// Field declaration order is the serialized key order. `None` options are omitted.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Input path, rendered with `to_string_lossy`.
    file: &'a str,
    /// Memory name derived from the filename (may be empty for a "skip").
    name: &'a str,
    /// Outcome label. "skip" and "preview" are emitted in this section; other labels
    /// are set by code outside this view.
    status: &'a str,
    /// Whether the derived name was truncated to `--max-name-length`.
    truncated: bool,
    /// Pre-truncation name, as returned by `derive_kebab_name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file stem, included when it differs from the derived name.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Human-readable error or skip reason.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Row id of the created memory, when one was persisted.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action (e.g. "created").
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length in bytes; 0 in the dry-run/skip events emitted in this section.
    body_length: usize,
    /// Name of the embedding backend used, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
397
/// Final NDJSON line summarizing an ingest run.
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true`. Lets consumers tell this line apart from per-file events.
    summary: bool,
    /// Scanned directory, rendered with `to_string_lossy`.
    dir: String,
    /// The `--pattern` used for matching.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    /// Files persisted successfully.
    files_succeeded: usize,
    /// Files that failed staging or persistence.
    files_failed: usize,
    /// Files skipped (e.g. no derivable name).
    files_skipped: usize,
    /// Wall-clock duration of the run in milliseconds.
    elapsed_ms: u64,
}
410
/// Result of persisting one staged file, returned by `persist_staged`.
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Persistence action; `persist_staged` always sets "created".
    action: String,
    /// Length of the stored body in bytes.
    body_length: usize,
    /// Embedding backend name carried over from `StagedFile::backend_invoked`.
    backend_invoked: Option<&'static str>,
}
418
/// Serializable progress record for a pipeline stage.
///
/// NOTE(review): this struct is not constructed in this section. The notes below
/// are inferred from the field names — confirm them against the emitting code.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Schema version of this event shape.
    schema_version: u8,
    /// Event/stage label.
    event: &'a str,
    /// File the event refers to.
    path: &'a str,
    /// Presumably the stage duration in milliseconds — confirm.
    ms: u64,
    /// Presumably the entity count produced for `path` — confirm.
    entities: usize,
    /// Presumably the relationship count produced for `path` — confirm.
    relationships: usize,
}
430
/// Everything `stage_file` computes for one file, ready for `persist_staged`.
struct StagedFile {
    /// Full file contents, used as the memory body.
    body: String,
    /// BLAKE3 hex digest of `body`. Used to detect an identical body already present.
    body_hash: String,
    /// First 200 characters of `body`, written alongside the memory vector.
    snippet: String,
    /// Validated kebab-case memory name.
    name: String,
    /// `"ingested from <path>"`.
    description: String,
    /// Memory-level embedding. For multi-chunk bodies this is the aggregate of the
    /// chunk embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries from hierarchical splitting. A single entry means the
    /// single-chunk path was taken.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Auto-extracted entities (empty unless `--enable-ner`).
    entities: Vec<NewEntity>,
    /// Relationships to persist. Auto-extraction currently contributes none.
    relationships: Vec<NewRelationship>,
    /// Entity embeddings; `persist_staged` indexes these in parallel with `entities`.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs found by auto-extraction. Stored after the main transaction commits.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// Embedding backend name on the single-chunk path; `None` on the multi-chunk path.
    backend_invoked: Option<&'static str>,
}
452
/// Reads, validates, extracts from, and embeds a single file, returning a
/// [`StagedFile`] for `persist_staged` to write.
///
/// This function takes no database connection; all storage writes happen later in
/// `persist_staged`.
///
/// # Parameters
/// - `_idx`: unused.
/// - `path`: file to read. Its display form is embedded in the description.
/// - `name`: memory name derived from the filename; validated here.
/// - `paths`: resolved application paths. `paths.models` is passed to the embedder.
/// - `enable_ner`: when true, runs `extract_graph_auto` (URL-regex extraction).
/// - `gliner_variant`: forwarded to `extract_graph_auto`.
/// - `max_rss_mb`: RSS ceiling checked before embedding a multi-chunk body.
/// - `llm_parallelism`: concurrency for chunk and entity embedding.
/// - `llm_backend`: embedding backend choice, used on the single-chunk path only.
///
/// # Errors
/// - `LimitExceeded`: the name, on-disk size, read body, or chunk count is over its limit.
/// - `Validation`: the name is reserved (`__` prefix) or not kebab-case, the body is
///   empty/whitespace-only, the description is too long, or a relationship is invalid.
/// - `Io`: the file cannot be stat'ed or read as UTF-8 text.
/// - `LowMemory`: process RSS already exceeds `max_rss_mb` on the multi-chunk path.
/// - Any error propagated from the embedding calls.
///
/// Auto-extraction failures are not errors. They are logged, and the file is staged
/// without entities or URLs.
#[allow(clippy::too_many_arguments)]
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
    llm_parallelism: usize,
    llm_backend: crate::cli::LlmBackendChoice,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    // --- Name validation: cheap checks first, before touching the filesystem. ---
    // Length is measured in bytes.
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    // Names with a "__" prefix are rejected as reserved.
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = crate::constants::name_slug_regex();
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // --- Body validation. ---
    // Check the on-disk size first so oversized files are rejected without being read.
    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
    if file_size > MAX_MEMORY_BODY_LEN as u64 {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    // Re-checked on the bytes actually read. Presumably this covers the file changing
    // between the metadata call and the read.
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = format!("ingested from {}", path.display());
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    // --- Optional auto-extraction (graceful degradation on failure). ---
    // The capacities are preallocation hints only.
    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                // Every auto-extracted entity is typed as `Concept`, with no description.
                extracted_entities = extracted
                    .entities
                    .into_iter()
                    .map(|e| NewEntity {
                        name: e.name,
                        entity_type: crate::entity_type::EntityType::Concept,
                        description: None,
                    })
                    .collect();
                // Auto-extraction contributes no relationships on this path.
                extracted_relationships.clear();

                // Cap counts at the per-memory limits.
                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > max_relationships_per_memory() {
                    extracted_relationships.truncate(max_relationships_per_memory());
                }
            }
            Err(e) => {
                tracing::warn!(
                    target: "ingest",
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // Normalize and validate relationships. As written, `extracted_relationships`
    // starts empty and is cleared in the extraction branch, so this loop currently
    // never iterates. It is kept as a safeguard if relationships are populated later.
    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    // First 200 chars (not bytes), so the cut never splits a UTF-8 sequence.
    let snippet: String = raw_body.chars().take(200).collect();

    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    // --- Embedding. ---
    // Single chunk: embed the whole body with the requested backend and record its name.
    // Multi chunk: check RSS, embed the chunks locally, and aggregate them into the
    // memory embedding.
    // NOTE(review): `llm_backend` is not passed on the multi-chunk path, and
    // `backend_invoked` is `None` there. Confirm this is intended.
    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let (embedding, backend_invoked) = if chunks_info.len() == 1 {
        crate::embedder::embed_passage_with_choice(&paths.models, &raw_body, Some(llm_backend))
            .map(|(v, k)| (v, Some(k.as_str())))?
    } else {
        let chunk_texts: Vec<String> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
            .collect();
        // Abort before the memory-heavy embedding step if RSS is already over the cap.
        // When RSS cannot be measured, the check is skipped.
        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
            if rss > max_rss_mb {
                tracing::error!(
                    target: "ingest",
                    rss_mb = rss,
                    max_rss_mb = max_rss_mb,
                    file = %path.display(),
                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                );
                return Err(AppError::LowMemory {
                    available_mb: crate::memory_guard::available_memory_mb(),
                    required_mb: max_rss_mb,
                });
            }
        }
        let chunk_embeddings = crate::embedder::embed_passages_parallel_local(
            &paths.models,
            &chunk_texts,
            llm_parallelism,
            crate::embedder::chunk_embed_batch_size(),
        )?;
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        (aggregated, None)
    };

    // Entity embedding text is "<name> <description>", or just the name.
    let entity_texts: Vec<String> = extracted_entities
        .iter()
        .map(|entity| match &entity.description {
            Some(desc) => format!("{} {}", entity.name, desc),
            None => entity.name.clone(),
        })
        .collect();
    let (entity_embeddings, embed_cache_stats) =
        crate::embedder::embed_entity_texts_cached(&paths.models, &entity_texts, llm_parallelism)?;
    if embed_cache_stats.hits > 0 {
        tracing::debug!(
            hits = embed_cache_stats.hits,
            misses = embed_cache_stats.misses,
            requested = embed_cache_stats.requested,
            "G56: entity embed cache hit (ingest)"
        );
    }

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
        backend_invoked,
    })
}
667
668fn persist_staged(
670 conn: &mut Connection,
671 namespace: &str,
672 memory_type: &str,
673 staged: StagedFile,
674) -> Result<FileSuccess, AppError> {
675 {
676 let active_count: u32 = conn.query_row(
677 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
678 [],
679 |r| r.get::<_, i64>(0).map(|v| v as u32),
680 )?;
681 let ns_exists: bool = conn.query_row(
682 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
683 rusqlite::params![namespace],
684 |r| r.get::<_, i64>(0).map(|v| v > 0),
685 )?;
686 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
687 return Err(AppError::NamespaceError(format!(
688 "active namespace limit of {} exceeded while creating '{namespace}'",
689 crate::constants::MAX_NAMESPACES_ACTIVE
690 )));
691 }
692 }
693
694 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
695 if existing_memory.is_some() {
696 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
697 &staged.name,
698 namespace,
699 )));
700 }
701 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
702
703 let new_memory = NewMemory {
704 namespace: namespace.to_string(),
705 name: staged.name.clone(),
706 memory_type: memory_type.to_string(),
707 description: staged.description.clone(),
708 body: staged.body,
709 body_hash: staged.body_hash,
710 session_id: None,
711 source: "agent".to_string(),
712 metadata: serde_json::json!({}),
713 };
714
715 if let Some(hash_id) = duplicate_hash_id {
716 tracing::debug!(
717 target: "ingest",
718 duplicate_memory_id = hash_id,
719 "identical body already exists; persisting a new memory anyway"
720 );
721 }
722
723 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
724
725 let memory_id = memories::insert(&tx, &new_memory)?;
726 versions::insert_version(
727 &tx,
728 memory_id,
729 1,
730 &staged.name,
731 memory_type,
732 &staged.description,
733 &new_memory.body,
734 &serde_json::to_string(&new_memory.metadata)?,
735 None,
736 "create",
737 )?;
738 memories::upsert_vec(
739 &tx,
740 memory_id,
741 namespace,
742 memory_type,
743 &staged.embedding,
744 &staged.name,
745 &staged.snippet,
746 )?;
747
748 if staged.chunks_info.len() > 1 {
749 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
750 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
751 AppError::Internal(anyhow::anyhow!(
752 "missing chunk embeddings cache on multi-chunk ingest path"
753 ))
754 })?;
755 for (i, emb) in chunk_embeddings.iter().enumerate() {
756 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
757 }
758 }
759
760 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
761 for (idx, entity) in staged.entities.iter().enumerate() {
762 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
763 let entity_embedding = &staged.entity_embeddings[idx];
764 entities::upsert_entity_vec(
765 &tx,
766 entity_id,
767 namespace,
768 entity.entity_type,
769 entity_embedding,
770 &entity.name,
771 )?;
772 entities::link_memory_entity(&tx, memory_id, entity_id)?;
773 entities::increment_degree(&tx, entity_id)?;
774 }
775 let entity_types: std::collections::HashMap<&str, EntityType> = staged
776 .entities
777 .iter()
778 .map(|entity| (entity.name.as_str(), entity.entity_type))
779 .collect();
780 for rel in &staged.relationships {
781 let source_entity = NewEntity {
782 name: rel.source.clone(),
783 entity_type: entity_types
784 .get(rel.source.as_str())
785 .copied()
786 .unwrap_or(EntityType::Concept),
787 description: None,
788 };
789 let target_entity = NewEntity {
790 name: rel.target.clone(),
791 entity_type: entity_types
792 .get(rel.target.as_str())
793 .copied()
794 .unwrap_or(EntityType::Concept),
795 description: None,
796 };
797 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
798 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
799 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
800 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
801 }
802 }
803
804 tx.commit()?;
805
806 if !staged.urls.is_empty() {
807 let url_entries: Vec<storage_urls::MemoryUrl> = staged
808 .urls
809 .into_iter()
810 .map(|u| storage_urls::MemoryUrl {
811 url: u.url,
812 offset: Some(u.start as i64),
813 })
814 .collect();
815 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
816 }
817
818 Ok(FileSuccess {
819 memory_id,
820 action: "created".to_string(),
821 body_length: new_memory.body.len(),
822 backend_invoked: staged.backend_invoked,
823 })
824}
825
/// Returns `true` when `value` equals `default`.
///
/// Used to detect flags left at their clap default. A value passed explicitly on the
/// command line but equal to the default is indistinguishable from an omitted flag.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
836
/// Rejects flags that have no effect under the selected `--mode` (check "G20").
///
/// All conflicts are collected first. They are reported together in one
/// `AppError::Validation` whose message lists one conflict per line, in the order
/// the checks appear below.
///
/// - `none`/`gliner`: every Claude Code, Codex, cost, and queue flag is rejected.
/// - `claude-code`: the Codex flags are rejected.
/// - `codex`: the Claude Code flags, `--max-cost-usd`, and the queue flags
///   (`--resume`, `--retry-failed`, `--keep-queue`) are rejected.
///
/// Timeouts and `--rate-limit-wait` count as "set" only when they differ from the
/// defaults hard-coded below. Those constants must stay in sync with the clap
/// defaults on `IngestArgs`. An explicitly passed default value cannot be detected.
fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
    // Mirrors `default_value_t` for `--claude-timeout`/`--codex-timeout` and
    // `--rate-limit-wait` on `IngestArgs`.
    const DEFAULT_TIMEOUT: u64 = 300;
    const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;

    let mut conflicts: Vec<String> = Vec::new();

    // `gliner` is treated as local: it runs the same pipeline as `none`.
    let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;

    if is_local_mode {
        if args.claude_binary.is_some() {
            conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
        }
        if args.claude_model.is_some() {
            conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
        }
        if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
            conflicts.push(format!(
                "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
                args.claude_timeout
            ));
        }
        if args.codex_binary.is_some() {
            conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
        }
        if args.codex_model.is_some() {
            conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
        }
        if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
            conflicts.push(format!(
                "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
                args.codex_timeout
            ));
        }
        if args.max_cost_usd.is_some() {
            conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
        }
        if args.resume {
            conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
        }
        if args.retry_failed {
            conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
        }
        if args.keep_queue {
            conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
        }
        if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
            conflicts.push(format!(
                "--rate-limit-wait={} is ignored when --mode is none or gliner",
                args.rate_limit_wait
            ));
        }
    }

    // Cross-backend checks: each LLM mode rejects the other backend's flags.
    match args.mode {
        IngestMode::ClaudeCode => {
            if args.codex_binary.is_some() {
                conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
            }
            if args.codex_model.is_some() {
                conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
            }
            if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
                conflicts.push(format!(
                    "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
                    args.codex_timeout
                ));
            }
        }
        IngestMode::Codex => {
            if args.claude_binary.is_some() {
                conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
            }
            if args.claude_model.is_some() {
                conflicts.push("--claude-model is ignored when --mode=codex".to_string());
            }
            if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
                conflicts.push(format!(
                    "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
                    args.claude_timeout
                ));
            }
            if args.max_cost_usd.is_some() {
                conflicts.push(
                    "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
                        .to_string(),
                );
            }
            if args.resume {
                conflicts.push("--resume is only valid for --mode=claude-code".to_string());
            }
            if args.retry_failed {
                conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
            }
            if args.keep_queue {
                conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
            }
        }
        // Local modes were fully handled above.
        IngestMode::None | IngestMode::Gliner => {}
    }

    if !conflicts.is_empty() {
        // `{:?}` renders the Debug variant name (e.g. `ClaudeCode`), not the CLI
        // spelling (`claude-code`).
        return Err(AppError::Validation(format!(
            "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
            args.mode,
            conflicts.join("\n - ")
        )));
    }

    Ok(())
}
961
962#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
965pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
966 validate_mode_conditional_flags_ingest(&args)?;
969 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
970 if args.mode == IngestMode::ClaudeCode {
971 return super::ingest_claude::run_claude_ingest(&args);
972 }
973 if args.mode == IngestMode::Codex {
974 return super::ingest_codex::run_codex_ingest(&args);
975 }
976
977 let started = std::time::Instant::now();
978
979 if !args.dir.exists() {
980 return Err(AppError::Validation(format!(
981 "directory not found: {}",
982 args.dir.display()
983 )));
984 }
985 if !args.dir.is_dir() {
986 return Err(AppError::Validation(format!(
987 "path is not a directory: {}",
988 args.dir.display()
989 )));
990 }
991
992 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
993 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
994 files.sort_unstable();
995
996 if files.len() > args.max_files {
997 return Err(AppError::Validation(format!(
998 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
999 files.len(),
1000 args.max_files
1001 )));
1002 }
1003
1004 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1005 let memory_type_str = args.r#type.as_str().to_string();
1006
1007 let paths = AppPaths::resolve(args.db.as_deref())?;
1008 let mut conn_or_err = match init_storage(&paths) {
1009 Ok(c) => Ok(c),
1010 Err(e) => Err(format!("{e}")),
1011 };
1012
1013 let mut succeeded: usize = 0;
1014 let mut failed: usize = 0;
1015 let mut skipped: usize = 0;
1016 let total = files.len();
1017
1018 let mut taken_names: BTreeSet<String> = BTreeSet::new();
1021
1022 enum SlotMeta {
1028 Skip {
1029 file_str: String,
1030 derived_base: String,
1031 name_truncated: bool,
1032 original_name: Option<String>,
1033 original_filename: Option<String>,
1034 reason: String,
1035 },
1036 Process {
1037 file_str: String,
1038 derived_name: String,
1039 name_truncated: bool,
1040 original_name: Option<String>,
1041 original_filename: Option<String>,
1042 },
1043 }
1044
1045 struct ProcessItem {
1046 idx: usize,
1047 path: PathBuf,
1048 file_str: String,
1049 derived_name: String,
1050 }
1051
1052 let files_cap = files.len();
1053 let mut slots_meta: Vec<SlotMeta> = Vec::new();
1054 slots_meta.try_reserve(files_cap).map_err(|_| {
1055 AppError::LimitExceeded(format!(
1056 "allocation of {files_cap} slot metadata entries would exceed available memory"
1057 ))
1058 })?;
1059 let mut process_items: Vec<ProcessItem> = Vec::new();
1060 process_items.try_reserve(files_cap).map_err(|_| {
1061 AppError::LimitExceeded(format!(
1062 "allocation of {files_cap} process items would exceed available memory"
1063 ))
1064 })?;
1065 let mut truncations: Vec<(String, String)> = Vec::new();
1066 truncations.try_reserve(files_cap).map_err(|_| {
1067 AppError::LimitExceeded(format!(
1068 "allocation of {files_cap} truncation entries would exceed available memory"
1069 ))
1070 })?;
1071
1072 let max_name_length = args.max_name_length;
1073 for path in &files {
1074 let file_str = path.to_string_lossy().into_owned();
1075 let (derived_base, name_truncated, original_name) =
1076 derive_kebab_name(path, max_name_length);
1077 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1078
1079 if name_truncated {
1080 if let Some(ref orig) = original_name {
1081 truncations.push((orig.clone(), derived_base.clone()));
1082 }
1083 }
1084
1085 if derived_base.is_empty() {
1086 let orig_filename = if !original_basename.is_empty() {
1088 Some(original_basename.to_string())
1089 } else {
1090 None
1091 };
1092 slots_meta.push(SlotMeta::Skip {
1093 file_str,
1094 derived_base: String::new(),
1095 name_truncated: false,
1096 original_name: None,
1097 original_filename: orig_filename,
1098 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1099 });
1100 continue;
1101 }
1102
1103 match unique_name(&derived_base, &taken_names) {
1104 Ok(derived_name) => {
1105 taken_names.insert(derived_name.clone());
1106 let idx = slots_meta.len();
1107 let orig_filename = if original_basename != derived_name {
1109 Some(original_basename.to_string())
1110 } else {
1111 None
1112 };
1113 process_items.push(ProcessItem {
1114 idx,
1115 path: path.clone(),
1116 file_str: file_str.clone(),
1117 derived_name: derived_name.clone(),
1118 });
1119 slots_meta.push(SlotMeta::Process {
1120 file_str,
1121 derived_name,
1122 name_truncated,
1123 original_name,
1124 original_filename: orig_filename,
1125 });
1126 }
1127 Err(e) => {
1128 let orig_filename = if original_basename != derived_base {
1129 Some(original_basename.to_string())
1130 } else {
1131 None
1132 };
1133 slots_meta.push(SlotMeta::Skip {
1134 file_str,
1135 derived_base,
1136 name_truncated,
1137 original_name,
1138 original_filename: orig_filename,
1139 reason: e.to_string(),
1140 });
1141 }
1142 }
1143 }
1144
1145 if !truncations.is_empty() {
1146 tracing::info!(
1147 target: "ingest",
1148 count = truncations.len(),
1149 max_name_length = max_name_length,
1150 max_len = DERIVED_NAME_MAX_LEN,
1151 "derived names truncated; pass -vv (debug) for per-file detail"
1152 );
1153 }
1154
1155 if args.dry_run {
1157 for meta in &slots_meta {
1158 match meta {
1159 SlotMeta::Skip {
1160 file_str,
1161 derived_base,
1162 name_truncated,
1163 original_name,
1164 original_filename,
1165 reason,
1166 } => {
1167 output::emit_json_compact(&IngestFileEvent {
1168 file: file_str,
1169 name: derived_base,
1170 status: "skip",
1171 truncated: *name_truncated,
1172 original_name: original_name.clone(),
1173 original_filename: original_filename.as_deref(),
1174 error: Some(reason.clone()),
1175 memory_id: None,
1176 action: None,
1177 body_length: 0,
1178 backend_invoked: None,
1179 })?;
1180 }
1181 SlotMeta::Process {
1182 file_str,
1183 derived_name,
1184 name_truncated,
1185 original_name,
1186 original_filename,
1187 } => {
1188 output::emit_json_compact(&IngestFileEvent {
1189 file: file_str,
1190 name: derived_name,
1191 status: "preview",
1192 truncated: *name_truncated,
1193 original_name: original_name.clone(),
1194 original_filename: original_filename.as_deref(),
1195 error: None,
1196 memory_id: None,
1197 action: None,
1198 body_length: 0,
1199 backend_invoked: None,
1200 })?;
1201 }
1202 }
1203 }
1204 output::emit_json_compact(&IngestSummary {
1205 summary: true,
1206 dir: args.dir.to_string_lossy().into_owned(),
1207 pattern: args.pattern.clone(),
1208 recursive: args.recursive,
1209 files_total: total,
1210 files_succeeded: 0,
1211 files_failed: 0,
1212 files_skipped: 0,
1213 elapsed_ms: started.elapsed().as_millis() as u64,
1214 })?;
1215 return Ok(());
1216 }
1217
1218 if args.low_memory {
1220 if let Some(n) = args.ingest_parallelism {
1221 if n > 1 {
1222 return Err(AppError::Validation(
1223 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1224 .to_string(),
1225 ));
1226 }
1227 }
1228 }
1229
1230 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1233
1234 let pool = rayon::ThreadPoolBuilder::new()
1235 .num_threads(parallelism)
1236 .build()
1237 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1238
1239 if args.enable_ner && args.skip_extraction {
1240 return Err(AppError::Validation(
1241 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1242 ));
1243 }
1244 if args.skip_extraction && !args.enable_ner {
1245 tracing::warn!(
1252 "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1253 );
1254 }
1255 let enable_ner = args.enable_ner;
1256 let max_rss_mb = args.max_rss_mb;
1257 let llm_parallelism = args.llm_parallelism as usize;
1258 if args.mode == IngestMode::Gliner {
1262 tracing::warn!(
1263 "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1264 );
1265 }
1266 if args.gliner_variant != "fp32" {
1267 tracing::warn!(
1268 "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1269 );
1270 }
1271 let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1272 "int8" => crate::extraction::GlinerVariant::Int8,
1273 _ => crate::extraction::GlinerVariant::Fp32,
1274 };
1275
1276 let total_to_process = process_items.len();
1277 tracing::info!(
1278 target: "ingest",
1279 phase = "pipeline_start",
1280 files = total_to_process,
1281 ingest_parallelism = parallelism,
1282 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1283 );
1284
1285 let channel_bound = (parallelism * 2).max(1);
1289 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1290
1291 let paths_owned = paths.clone();
1296 let llm_backend_owned = llm_backend;
1297 let producer_handle = std::thread::spawn(move || {
1298 pool.install(|| {
1299 process_items.into_par_iter().for_each(|item| {
1300 if crate::shutdown_requested() {
1301 return;
1302 }
1303 let t0 = std::time::Instant::now();
1304 let result = stage_file(
1305 item.idx,
1306 &item.path,
1307 &item.derived_name,
1308 &paths_owned,
1309 enable_ner,
1310 gliner_variant,
1311 max_rss_mb,
1312 llm_parallelism,
1313 llm_backend_owned,
1314 );
1315 let elapsed_ms = t0.elapsed().as_millis() as u64;
1316
1317 let (n_entities, n_relationships) = match &result {
1320 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1321 Err(_) => (0, 0),
1322 };
1323 let progress = StageProgressEvent {
1324 schema_version: 1,
1325 event: "file_extracted",
1326 path: &item.file_str,
1327 ms: elapsed_ms,
1328 entities: n_entities,
1329 relationships: n_relationships,
1330 };
1331 if let Ok(line) = serde_json::to_string(&progress) {
1332 tracing::info!(target: "ingest_progress", "{}", line);
1333 }
1334
1335 let _ = tx.send((item.idx, result));
1339 });
1340 drop(tx);
1342 });
1343 });
1344
1345 let fail_fast = args.fail_fast;
1357
1358 for meta in &slots_meta {
1360 if let SlotMeta::Skip {
1361 file_str,
1362 derived_base,
1363 name_truncated,
1364 original_name,
1365 original_filename,
1366 reason,
1367 } = meta
1368 {
1369 output::emit_json_compact(&IngestFileEvent {
1370 file: file_str,
1371 name: derived_base,
1372 status: "skipped",
1373 truncated: *name_truncated,
1374 original_name: original_name.clone(),
1375 original_filename: original_filename.as_deref(),
1376 error: Some(reason.clone()),
1377 memory_id: None,
1378 action: None,
1379 body_length: 0,
1380 backend_invoked: None,
1381 })?;
1382 skipped += 1;
1383 }
1384 }
1385
1386 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1389 .iter()
1390 .enumerate()
1391 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1392 .collect();
1393
1394 tracing::info!(
1395 target: "ingest",
1396 phase = "persist_start",
1397 files = total_to_process,
1398 "phase B starting: persisting files incrementally as Phase A completes each one",
1399 );
1400
1401 for (idx, stage_result) in rx {
1405 if crate::shutdown_requested() {
1406 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1407 break;
1408 }
1409 let meta = meta_index.get(&idx).ok_or_else(|| {
1410 AppError::Internal(anyhow::anyhow!(
1411 "channel idx {idx} has no corresponding Process slot"
1412 ))
1413 })?;
1414 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1415 {
1416 SlotMeta::Process {
1417 file_str,
1418 derived_name,
1419 name_truncated,
1420 original_name,
1421 original_filename,
1422 } => (
1423 file_str,
1424 derived_name,
1425 name_truncated,
1426 original_name,
1427 original_filename,
1428 ),
1429 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1430 };
1431
1432 let conn = match conn_or_err.as_mut() {
1434 Ok(c) => c,
1435 Err(err_msg) => {
1436 let err_clone = err_msg.clone();
1437 output::emit_json_compact(&IngestFileEvent {
1438 file: file_str,
1439 name: derived_name,
1440 status: "failed",
1441 truncated: *name_truncated,
1442 original_name: original_name.clone(),
1443 original_filename: original_filename.as_deref(),
1444 error: Some(err_clone.clone()),
1445 memory_id: None,
1446 action: None,
1447 body_length: 0,
1448 backend_invoked: None,
1449 })?;
1450 failed += 1;
1451 if fail_fast {
1452 output::emit_json_compact(&IngestSummary {
1453 summary: true,
1454 dir: args.dir.display().to_string(),
1455 pattern: args.pattern.clone(),
1456 recursive: args.recursive,
1457 files_total: total,
1458 files_succeeded: succeeded,
1459 files_failed: failed,
1460 files_skipped: skipped,
1461 elapsed_ms: started.elapsed().as_millis() as u64,
1462 })?;
1463 return Err(AppError::Validation(format!(
1464 "ingest aborted on first failure: {err_clone}"
1465 )));
1466 }
1467 continue;
1468 }
1469 };
1470
1471 let outcome =
1472 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1473
1474 match outcome {
1475 Ok(FileSuccess {
1476 memory_id,
1477 action,
1478 body_length,
1479 backend_invoked: file_backend_invoked,
1480 }) => {
1481 output::emit_json_compact(&IngestFileEvent {
1482 file: file_str,
1483 name: derived_name,
1484 status: "indexed",
1485 truncated: *name_truncated,
1486 original_name: original_name.clone(),
1487 original_filename: original_filename.as_deref(),
1488 error: None,
1489 memory_id: Some(memory_id),
1490 action: Some(action),
1491 body_length,
1492 backend_invoked: file_backend_invoked,
1493 })?;
1494 succeeded += 1;
1495 }
1496 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1497 output::emit_json_compact(&IngestFileEvent {
1498 file: file_str,
1499 name: derived_name,
1500 status: "skipped",
1501 truncated: *name_truncated,
1502 original_name: original_name.clone(),
1503 original_filename: original_filename.as_deref(),
1504 error: Some(format!("{e}")),
1505 memory_id: None,
1506 action: Some("duplicate".to_string()),
1507 body_length: 0,
1508 backend_invoked: None,
1509 })?;
1510 skipped += 1;
1511 }
1512 Err(e) => {
1513 let err_msg = format!("{e}");
1514 output::emit_json_compact(&IngestFileEvent {
1515 file: file_str,
1516 name: derived_name,
1517 status: "failed",
1518 truncated: *name_truncated,
1519 original_name: original_name.clone(),
1520 original_filename: original_filename.as_deref(),
1521 error: Some(err_msg.clone()),
1522 memory_id: None,
1523 action: None,
1524 body_length: 0,
1525 backend_invoked: None,
1526 })?;
1527 failed += 1;
1528 if fail_fast {
1529 output::emit_json_compact(&IngestSummary {
1530 summary: true,
1531 dir: args.dir.display().to_string(),
1532 pattern: args.pattern.clone(),
1533 recursive: args.recursive,
1534 files_total: total,
1535 files_succeeded: succeeded,
1536 files_failed: failed,
1537 files_skipped: skipped,
1538 elapsed_ms: started.elapsed().as_millis() as u64,
1539 })?;
1540 return Err(AppError::Validation(format!(
1541 "ingest aborted on first failure: {err_msg}"
1542 )));
1543 }
1544 }
1545 }
1546 }
1547
1548 producer_handle
1550 .join()
1551 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1552
1553 if let Ok(ref conn) = conn_or_err {
1554 if succeeded > 0 {
1555 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1556 }
1557 }
1558
1559 output::emit_json_compact(&IngestSummary {
1560 summary: true,
1561 dir: args.dir.display().to_string(),
1562 pattern: args.pattern.clone(),
1563 recursive: args.recursive,
1564 files_total: total,
1565 files_succeeded: succeeded,
1566 files_failed: failed,
1567 files_skipped: skipped,
1568 elapsed_ms: started.elapsed().as_millis() as u64,
1569 })?;
1570
1571 Ok(())
1572}
1573
1574fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1580 ensure_db_ready(paths)?;
1581 let conn = open_rw(&paths.db)?;
1582 Ok(conn)
1583}
1584
1585pub(crate) fn collect_files(
1586 dir: &Path,
1587 pattern: &str,
1588 recursive: bool,
1589 out: &mut Vec<PathBuf>,
1590) -> Result<(), AppError> {
1591 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1592 for entry in entries {
1593 let entry = entry.map_err(AppError::Io)?;
1594 let path = entry.path();
1595 let file_type = entry.file_type().map_err(AppError::Io)?;
1596 if file_type.is_file() {
1597 let name = entry.file_name();
1598 let name_str = name.to_string_lossy();
1599 if matches_pattern(&name_str, pattern) {
1600 out.push(path);
1601 }
1602 } else if file_type.is_dir() && recursive {
1603 collect_files(&path, pattern, recursive, out)?;
1604 }
1605 }
1606 Ok(())
1607}
1608
/// Returns `true` when `name` matches the glob-style `pattern`.
///
/// `*` matches any (possibly empty) run of characters and may appear any
/// number of times anywhere in the pattern (`*.md`, `README*`, `*test*`,
/// `2024*.md`). Every other character, including `?`, is matched literally
/// and case-sensitively. A pattern without `*` must equal `name` exactly.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('*');
    // `split` always yields at least one item (possibly empty).
    let head = segments.next().unwrap_or("");
    let tail: Vec<&str> = segments.collect();
    let (last, middle) = match tail.split_last() {
        Some(parts) => parts,
        // No `*` at all: exact comparison.
        None => return name == pattern,
    };

    // The head must be a prefix and the last segment a suffix, and they must
    // not overlap inside `name`.
    if name.len() < head.len() + last.len() || !name.starts_with(head) || !name.ends_with(last) {
        return false;
    }

    // Both slice bounds are char boundaries: `head` is a verified prefix and
    // `last` a verified suffix, and the length check keeps start <= end.
    let mut window = &name[head.len()..name.len() - last.len()];
    // Each inner segment must appear, in order, in what remains. Taking the
    // leftmost occurrence is always safe for `*`-only globs.
    for segment in middle {
        match window.find(segment) {
            Some(pos) => window = &window[pos + segment.len()..],
            None => return false,
        }
    }
    true
}
1618
1619pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1630 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1631 let lowered: String = stem
1632 .nfd()
1633 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1634 .map(|c| {
1635 if c == '_' || c.is_whitespace() {
1636 '-'
1637 } else {
1638 c
1639 }
1640 })
1641 .map(|c| c.to_ascii_lowercase())
1642 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1643 .collect();
1644 let collapsed = collapse_dashes(&lowered);
1645 let trimmed_raw = collapsed.trim_matches('-').to_string();
1646 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1648 format!("doc-{trimmed_raw}")
1649 } else {
1650 trimmed_raw
1651 };
1652 if trimmed.len() > max_len {
1653 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1654 tracing::debug!(
1655 target: "ingest",
1656 original = %trimmed,
1657 truncated_to = %truncated,
1658 max_len = max_len,
1659 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1660 );
1661 (truncated, true, Some(trimmed))
1662 } else {
1663 (trimmed, false, None)
1664 }
1665}
1666
1667fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1680 if !taken.contains(base) {
1681 return Ok(base.to_string());
1682 }
1683 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1684 let candidate = format!("{base}-{suffix}");
1685 if !taken.contains(&candidate) {
1686 tracing::warn!(
1687 target: "ingest",
1688 base = %base,
1689 resolved = %candidate,
1690 suffix,
1691 "memory name collision resolved with numeric suffix"
1692 );
1693 return Ok(candidate);
1694 }
1695 }
1696 Err(AppError::Validation(format!(
1697 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1698 )))
1699}
1700
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// replaced by a single `-`. All other characters are copied unchanged, and
/// leading and trailing dashes are kept (one of each at most).
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // `collapsed` ends with '-' exactly when the previous input char was a
        // dash, because skipped dashes leave that trailing '-' in place.
        let repeat_dash = ch == '-' && collapsed.ends_with('-');
        if !repeat_dash {
            collapsed.push(ch);
        }
    }
    collapsed
}
1717
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        // Pass the cap explicitly so this test pins the 60-char behaviour it
        // is named after, independently of `DERIVED_NAME_MAX_LEN`.
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, 60);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collapse_dashes_merges_runs_and_keeps_edges() {
        assert_eq!(collapse_dashes("a--b---c"), "a-b-c");
        assert_eq!(collapse_dashes("--x--"), "-x-");
        assert_eq!(collapse_dashes(""), "");
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    use serial_test::serial;

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or removed
    /// when `None`), then restores the variable's previous state.
    ///
    /// Restoration happens in a `Drop` guard. A failing assertion inside `f`
    /// panics, and without the guard the override would leak into the next
    /// `#[serial]` test and make it fail spuriously.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        struct RestoreEnv {
            key: &'static str,
            prev: Option<String>,
        }
        impl Drop for RestoreEnv {
            fn drop(&mut self) {
                match self.prev.take() {
                    Some(p) => std::env::set_var(self.key, p),
                    None => std::env::remove_var(self.key),
                }
            }
        }

        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        // Capture the previous value before overriding it.
        let _restore = RestoreEnv {
            key,
            prev: std::env::var(key).ok(),
        };
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}