1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// Numeric cap related to name-collision suffixes.
// NOTE(review): the code that reads this constant is not in the visible part of the file;
// presumably it bounds the numeric suffix tried when two files derive the same memory
// name — confirm against `unique_name`.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Command-line arguments of the `ingest` subcommand.
//
// Comments in this struct deliberately use `//` instead of `///`: clap's derive macro
// turns `///` doc comments into help text, so doc comments here would alter the CLI output.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
    # Preview file-to-name mapping without ingesting\n \
    sqlite-graphrag ingest ./docs --dry-run\n\n \
    # LLM-curated extraction via Claude Code CLI\n \
    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
    # Resume interrupted claude-code ingest\n \
    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
    # Claude Code with budget cap and custom timeout\n \
    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
AUTHENTICATION:\n \
    --mode claude-code: Uses existing Claude Code authentication.\n \
    OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
    API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
    --mode codex: Uses existing Codex CLI authentication.\n \
    Device auth: run `codex auth login` first\n \
    API key: set OPENAI_API_KEY (optional)\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Positional argument (no `long`): the directory to scan. `run` rejects paths that do
    // not exist or are not directories.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type assigned to every ingested file; defaults to `MemoryType::Document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // File-name pattern handed to `collect_files`; defaults to Markdown files.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Forwarded to `collect_files`; off by default.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Accepts `--enable-ner` with or without a value (bare flag means "true") and can also
    // be set through SQLITE_GRAPHRAG_ENABLE_NER. Gates the extraction step in `stage_file`.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
    )]
    pub enable_ner: bool,

    // On by default. `run` combines it with `no_auto_describe` as
    // `auto_describe && !no_auto_describe`.
    #[arg(
        long,
        default_value_t = true,
        overrides_with = "no_auto_describe",
        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
    )]
    pub auto_describe: bool,
    // Negative switch for `auto_describe`.
    #[arg(
        long = "no-auto-describe",
        default_value_t = false,
        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
    )]
    pub no_auto_describe: bool,
    // Kept only for compatibility, per its help text.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
    )]
    pub gliner_variant: String,

    // Hidden flag. `run` rejects it together with `--enable-ner` and otherwise only logs a
    // deprecation warning.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // Per the NOTES section of the long help: without it, per-file errors are reported
    // inline and processing continues.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // When set, `run` only emits the file-to-name preview plus a summary and returns.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    // `run` aborts with a validation error when more files than this match the pattern.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Optional namespace override; resolved through `crate::namespace::resolve_namespace`.
    #[arg(long)]
    pub namespace: Option<String>,

    // Optional database path (also read from SQLITE_GRAPHRAG_DB_PATH); passed to
    // `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // Output format selector; its consumer is not in the visible part of the file.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Hidden no-op, per its help text.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // Explicit worker count; `resolve_parallelism` supplies the default and the
    // low-memory override.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // Forces a parallelism of 1 in `resolve_parallelism`. `run` rejects the combination
    // with `--ingest-parallelism N` for N > 1.
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
            Recommended for environments with <4 GB available RAM or container/cgroup \
            constraints. Trade-off: 3-4x longer wall time. Also honored via \
            SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // RSS ceiling checked by `stage_file` before embedding a multi-chunk document.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
        help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // Restricted to 1..=32 by the value parser; `run` casts it to `usize`.
    #[arg(long, default_value_t = 2, value_name = "N",
        value_parser = clap::value_parser!(u64).range(1..=32),
        help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
    pub llm_parallelism: u64,

    // Passed to `derive_kebab_name` as the length limit for derived names.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
        help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // Selects the ingest pipeline; `run` dispatches `ClaudeCode` and `Codex` to their own
    // modules and handles the remaining modes itself.
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    // The Claude/Codex/queue flags below are cross-checked against `mode` by
    // `validate_mode_conditional_flags_ingest`.
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    #[arg(long)]
    pub claude_model: Option<String>,

    #[arg(long, default_value_t = false)]
    pub resume: bool,

    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    // Queue database file name; its consumer is not in the visible part of the file.
    #[arg(long, default_value = ".ingest-queue.sqlite")]
    pub queue_db: String,

    // Default 60; the validator treats any other value as explicitly set.
    // NOTE(review): presumably seconds — confirm against the LLM-backed ingest modules.
    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,

    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
    )]
    pub codex_binary: Option<PathBuf>,

    #[arg(
        long,
        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
    )]
    pub codex_model: Option<String>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each codex exec invocation (default: 300)"
    )]
    pub codex_timeout: u64,

    // NOTE(review): not referenced in the visible part of the file; presumably how long to
    // wait for another running job before giving up — confirm.
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    // NOTE(review): not referenced in the visible part of the file; presumably overrides
    // the single-job guard — confirm.
    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,
}
298
// Pipeline selected with `--mode`.
//
// `//` comments are used on purpose: with `clap::ValueEnum`, `///` doc comments on variants
// become help text for the possible values.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default mode; handled by the local pipeline in `run`.
    None,
    // Treated exactly like `None` by the flag validation and by the dispatch in `run`.
    Gliner,
    // `run` delegates to `super::ingest_claude::run_claude_ingest`.
    ClaudeCode,
    // `run` delegates to `super::ingest_codex::run_codex_ingest`.
    Codex,
}
311
312fn env_low_memory_enabled() -> bool {
317 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
318 Ok(v) if v.is_empty() => false,
319 Ok(v) => match v.to_lowercase().as_str() {
320 "1" | "true" | "yes" | "on" => true,
321 "0" | "false" | "no" | "off" => false,
322 other => {
323 tracing::warn!(
324 target: "ingest",
325 value = %other,
326 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
327 );
328 false
329 }
330 },
331 Err(_) => false,
332 }
333}
334
335fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
347 let env_flag = env_low_memory_enabled();
348 let low_memory = low_memory_flag || env_flag;
349
350 if low_memory {
351 if let Some(n) = ingest_parallelism {
352 if n > 1 {
353 tracing::warn!(
354 target: "ingest",
355 requested = n,
356 "--ingest-parallelism overridden by --low-memory; using 1"
357 );
358 }
359 }
360 if low_memory_flag {
361 tracing::info!(
362 target: "ingest",
363 source = "flag",
364 "low-memory mode enabled: forcing --ingest-parallelism 1"
365 );
366 } else {
367 tracing::info!(
368 target: "ingest",
369 source = "env",
370 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
371 );
372 }
373 return 1;
374 }
375
376 ingest_parallelism
377 .unwrap_or_else(|| {
378 std::thread::available_parallelism()
379 .map(|v| v.get() / 2)
380 .unwrap_or(1)
381 .clamp(1, 4)
382 })
383 .max(1)
384}
385
/// One per-file JSON record written through `output::emit_json_compact`.
///
/// Every `Option` field is left out of the serialized object when it is `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the source file as a string.
    file: &'a str,
    /// Derived memory name (empty when no name could be derived).
    name: &'a str,
    /// Outcome label; the dry-run path emits `"skip"` and `"preview"`.
    status: &'a str,
    /// Whether the derived name was truncated.
    truncated: bool,
    /// Name before truncation, as reported by `derive_kebab_name`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// File stem, set when it differs from the derived name.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error or skip reason for this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length; the dry-run path always reports `0`.
    body_length: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
414
/// Final summary record emitted after the per-file events.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker field; the dry-run path sets it to `true`.
    summary: bool,
    /// The ingested directory, lossily converted to a string.
    dir: String,
    /// The `--pattern` value used for the run.
    pattern: String,
    /// The `--recursive` value used for the run.
    recursive: bool,
    /// Number of files that matched the pattern.
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Wall-clock time since the run started, in milliseconds.
    elapsed_ms: u64,
}
427
/// Result of successfully persisting one staged file (see `persist_staged`).
struct FileSuccess {
    /// Row id returned by `memories::insert`.
    memory_id: i64,
    /// Action label; `persist_staged` always reports `"created"`.
    action: String,
    /// Length in bytes of the stored body.
    body_length: usize,
    /// Copied from `StagedFile::backend_invoked`.
    backend_invoked: Option<&'static str>,
}
435
/// Serializable progress record carrying a path, a duration and entity/relationship counts.
///
/// NOTE(review): this type is not constructed in the visible part of the file; it is
/// presumably emitted once per staged file — confirm at the use site.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    schema_version: u8,
    event: &'a str,
    path: &'a str,
    /// NOTE(review): presumably elapsed milliseconds — confirm at the use site.
    ms: u64,
    entities: usize,
    relationships: usize,
}
447
/// Everything `stage_file` computes for one file, ready to be written by `persist_staged`.
struct StagedFile {
    /// Full file contents.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name for the file.
    name: String,
    /// Heuristic description or the `ingested from <path>` placeholder.
    description: String,
    /// Embedding of the whole document (aggregated from the chunks when there are several).
    embedding: Vec<f32>,
    /// Per-chunk embeddings; `Some` only on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by `chunking::split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Entities extracted from the body (empty unless extraction is enabled).
    entities: Vec<NewEntity>,
    /// Relationships to persist; `stage_file` currently never populates this.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs extracted from the body.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// Embedding backend label; `Some` only on the single-chunk path.
    backend_invoked: Option<&'static str>,
}
469
/// Reads one file and prepares everything needed to store it, without touching the database.
///
/// Steps, in order (the order determines which error is reported first):
/// 1. validate `name` (length, reserved `__` prefix, slug regex);
/// 2. check the file size, read the file, re-check the length and reject blank bodies;
/// 3. build the description (heuristic when `auto_describe`, placeholder otherwise);
/// 4. when `enable_ner` is set, run `extract_graph_auto`; a failure is only logged;
/// 5. hash the body, take the snippet and split the body into chunks;
/// 6. embed the body (single chunk) or every chunk plus an aggregate (several chunks);
/// 7. embed the entity texts.
///
/// `_idx` is unused. `llm_backend` is only consulted on the single-chunk path.
///
/// # Errors
/// Returns `AppError::LimitExceeded` for oversized names, bodies or chunk counts,
/// `AppError::Validation` for invalid names, blank bodies or oversized descriptions,
/// `AppError::Io` for file-system failures, `AppError::LowMemory` when the process RSS
/// exceeds `max_rss_mb` before multi-chunk embedding, and whatever the embedder returns.
#[allow(clippy::too_many_arguments)]
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
    llm_parallelism: usize,
    llm_backend: crate::cli::LlmBackendChoice,
    auto_describe: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    // --- Name validation -------------------------------------------------------------
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    // Names starting with a double underscore are rejected as reserved.
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = crate::constants::name_slug_regex();
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // --- Body loading ----------------------------------------------------------------
    // The size is checked from metadata first so an oversized file is rejected before it
    // is read into memory.
    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
    if file_size > MAX_MEMORY_BODY_LEN as u64 {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    // Second check on the bytes actually read (the file may have changed since `metadata`).
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    // --- Description -----------------------------------------------------------------
    let description = if auto_describe {
        crate::commands::ingest_heuristics::extract_heuristic_description(
            &raw_body,
            Some(&path.display().to_string()),
        )
    } else {
        format!("ingested from {}", path.display())
    };
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    // --- Optional extraction ---------------------------------------------------------
    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                // Every extracted entity is stored as a `Concept` without a description.
                extracted_entities = extracted
                    .entities
                    .into_iter()
                    .map(|e| NewEntity {
                        name: e.name,
                        entity_type: crate::entity_type::EntityType::Concept,
                        description: None,
                    })
                    .collect();
                // No relationships are taken from the extraction result.
                extracted_relationships.clear();

                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > max_relationships_per_memory() {
                    extracted_relationships.truncate(max_relationships_per_memory());
                }
            }
            Err(e) => {
                // Extraction is best-effort: the file is still ingested without entities/URLs.
                tracing::warn!(
                    target: "ingest",
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // NOTE(review): `extracted_relationships` is never populated above, so this
    // normalization/validation loop currently has nothing to iterate over.
    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    // --- Hash, snippet, chunking -----------------------------------------------------
    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    // The snippet is cut on character boundaries, not bytes.
    let snippet: String = raw_body.chars().take(200).collect();

    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
    // NOTE(review): the message below mentions `remember` although this is the ingest path.
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    // --- Body embedding --------------------------------------------------------------
    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let (embedding, backend_invoked) = if chunks_info.len() == 1 {
        // Single chunk: embed the whole body and record which backend produced the vector.
        crate::embedder::embed_passage_with_choice(&paths.models, &raw_body, Some(llm_backend))
            .map(|(v, k)| (v, Some(k.as_str())))?
    } else {
        let chunk_texts: Vec<String> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
            .collect();
        // Abort before embedding when the process RSS is already above the configured cap.
        // The check is skipped when the RSS cannot be determined.
        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
            if rss > max_rss_mb {
                tracing::error!(
                    target: "ingest",
                    rss_mb = rss,
                    max_rss_mb = max_rss_mb,
                    file = %path.display(),
                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                );
                return Err(AppError::LowMemory {
                    available_mb: crate::memory_guard::available_memory_mb(),
                    required_mb: max_rss_mb,
                });
            }
        }
        let chunk_embeddings = crate::embedder::embed_passages_parallel_local(
            &paths.models,
            &chunk_texts,
            llm_parallelism,
            crate::embedder::chunk_embed_batch_size(),
        )?;
        // The document-level vector is derived from the chunk vectors; the chunk vectors
        // are kept so `persist_staged` can store them without re-embedding.
        let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
        chunk_embeddings_opt = Some(chunk_embeddings);
        (aggregated, None)
    };

    // --- Entity embeddings -----------------------------------------------------------
    // Each entity is embedded as "name description", or just the name without a description.
    let entity_texts: Vec<String> = extracted_entities
        .iter()
        .map(|entity| match &entity.description {
            Some(desc) => format!("{} {}", entity.name, desc),
            None => entity.name.clone(),
        })
        .collect();
    let (entity_embeddings, embed_cache_stats) =
        crate::embedder::embed_entity_texts_cached(&paths.models, &entity_texts, llm_parallelism)?;
    if embed_cache_stats.hits > 0 {
        tracing::debug!(
            hits = embed_cache_stats.hits,
            misses = embed_cache_stats.misses,
            requested = embed_cache_stats.requested,
            "G56: entity embed cache hit (ingest)"
        );
    }

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings,
        urls: extracted_urls,
        backend_invoked,
    })
}
692
693fn persist_staged(
695 conn: &mut Connection,
696 namespace: &str,
697 memory_type: &str,
698 staged: StagedFile,
699) -> Result<FileSuccess, AppError> {
700 {
701 let active_count: u32 = conn.query_row(
702 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
703 [],
704 |r| r.get::<_, i64>(0).map(|v| v as u32),
705 )?;
706 let ns_exists: bool = conn.query_row(
707 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
708 rusqlite::params![namespace],
709 |r| r.get::<_, i64>(0).map(|v| v > 0),
710 )?;
711 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
712 return Err(AppError::NamespaceError(format!(
713 "active namespace limit of {} exceeded while creating '{namespace}'",
714 crate::constants::MAX_NAMESPACES_ACTIVE
715 )));
716 }
717 }
718
719 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
720 if existing_memory.is_some() {
721 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
722 &staged.name,
723 namespace,
724 )));
725 }
726 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
727
728 let new_memory = NewMemory {
729 namespace: namespace.to_string(),
730 name: staged.name.clone(),
731 memory_type: memory_type.to_string(),
732 description: staged.description.clone(),
733 body: staged.body,
734 body_hash: staged.body_hash,
735 session_id: None,
736 source: "agent".to_string(),
737 metadata: serde_json::json!({}),
738 };
739
740 if let Some(hash_id) = duplicate_hash_id {
741 tracing::debug!(
742 target: "ingest",
743 duplicate_memory_id = hash_id,
744 "identical body already exists; persisting a new memory anyway"
745 );
746 }
747
748 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
749
750 let memory_id = memories::insert(&tx, &new_memory)?;
751 versions::insert_version(
752 &tx,
753 memory_id,
754 1,
755 &staged.name,
756 memory_type,
757 &staged.description,
758 &new_memory.body,
759 &serde_json::to_string(&new_memory.metadata)?,
760 None,
761 "create",
762 )?;
763 memories::upsert_vec(
764 &tx,
765 memory_id,
766 namespace,
767 memory_type,
768 &staged.embedding,
769 &staged.name,
770 &staged.snippet,
771 )?;
772
773 if staged.chunks_info.len() > 1 {
774 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
775 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
776 AppError::Internal(anyhow::anyhow!(
777 "missing chunk embeddings cache on multi-chunk ingest path"
778 ))
779 })?;
780 for (i, emb) in chunk_embeddings.iter().enumerate() {
781 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
782 }
783 }
784
785 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
786 for (idx, entity) in staged.entities.iter().enumerate() {
787 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
788 let entity_embedding = &staged.entity_embeddings[idx];
789 entities::upsert_entity_vec(
790 &tx,
791 entity_id,
792 namespace,
793 entity.entity_type,
794 entity_embedding,
795 &entity.name,
796 )?;
797 entities::link_memory_entity(&tx, memory_id, entity_id)?;
798 entities::increment_degree(&tx, entity_id)?;
799 }
800 let entity_types: std::collections::HashMap<&str, EntityType> = staged
801 .entities
802 .iter()
803 .map(|entity| (entity.name.as_str(), entity.entity_type))
804 .collect();
805 for rel in &staged.relationships {
806 let source_entity = NewEntity {
807 name: rel.source.clone(),
808 entity_type: entity_types
809 .get(rel.source.as_str())
810 .copied()
811 .unwrap_or(EntityType::Concept),
812 description: None,
813 };
814 let target_entity = NewEntity {
815 name: rel.target.clone(),
816 entity_type: entity_types
817 .get(rel.target.as_str())
818 .copied()
819 .unwrap_or(EntityType::Concept),
820 description: None,
821 };
822 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
823 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
824 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
825 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
826 }
827 }
828
829 tx.commit()?;
830
831 if !staged.urls.is_empty() {
832 let url_entries: Vec<storage_urls::MemoryUrl> = staged
833 .urls
834 .into_iter()
835 .map(|u| storage_urls::MemoryUrl {
836 url: u.url,
837 offset: Some(u.start as i64),
838 })
839 .collect();
840 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
841 }
842
843 Ok(FileSuccess {
844 memory_id,
845 action: "created".to_string(),
846 body_length: new_memory.body.len(),
847 backend_invoked: staged.backend_invoked,
848 })
849}
850
/// Returns `true` when `value` compares equal to `default`.
///
/// Used by the flag validation to tell an explicitly changed option from one left at its
/// declared default.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
861
862fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
877 const DEFAULT_TIMEOUT: u64 = 300;
878 const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
879
880 let mut conflicts: Vec<String> = Vec::new();
881
882 let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
883
884 if is_local_mode {
885 if args.claude_binary.is_some() {
886 conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
887 }
888 if args.claude_model.is_some() {
889 conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
890 }
891 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
892 conflicts.push(format!(
893 "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
894 args.claude_timeout
895 ));
896 }
897 if args.codex_binary.is_some() {
898 conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
899 }
900 if args.codex_model.is_some() {
901 conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
902 }
903 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
904 conflicts.push(format!(
905 "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
906 args.codex_timeout
907 ));
908 }
909 if args.max_cost_usd.is_some() {
910 conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
911 }
912 if args.resume {
913 conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
914 }
915 if args.retry_failed {
916 conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
917 }
918 if args.keep_queue {
919 conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
920 }
921 if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
922 conflicts.push(format!(
923 "--rate-limit-wait={} is ignored when --mode is none or gliner",
924 args.rate_limit_wait
925 ));
926 }
927 }
928
929 match args.mode {
930 IngestMode::ClaudeCode => {
931 if args.codex_binary.is_some() {
932 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
933 }
934 if args.codex_model.is_some() {
935 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
936 }
937 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
938 conflicts.push(format!(
939 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
940 args.codex_timeout
941 ));
942 }
943 }
944 IngestMode::Codex => {
945 if args.claude_binary.is_some() {
946 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
947 }
948 if args.claude_model.is_some() {
949 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
950 }
951 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
952 conflicts.push(format!(
953 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
954 args.claude_timeout
955 ));
956 }
957 if args.max_cost_usd.is_some() {
958 conflicts.push(
959 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
960 .to_string(),
961 );
962 }
963 if args.resume {
964 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
965 }
966 if args.retry_failed {
967 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
968 }
969 if args.keep_queue {
970 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
971 }
972 }
973 IngestMode::None | IngestMode::Gliner => {}
974 }
975
976 if !conflicts.is_empty() {
977 return Err(AppError::Validation(format!(
978 "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
979 args.mode,
980 conflicts.join("\n - ")
981 )));
982 }
983
984 Ok(())
985}
986
987#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
990pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
991 validate_mode_conditional_flags_ingest(&args)?;
994 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
995 if args.mode == IngestMode::ClaudeCode {
996 return super::ingest_claude::run_claude_ingest(&args);
997 }
998 if args.mode == IngestMode::Codex {
999 return super::ingest_codex::run_codex_ingest(&args);
1000 }
1001
1002 let started = std::time::Instant::now();
1003
1004 if !args.dir.exists() {
1005 return Err(AppError::Validation(format!(
1006 "directory not found: {}",
1007 args.dir.display()
1008 )));
1009 }
1010 if !args.dir.is_dir() {
1011 return Err(AppError::Validation(format!(
1012 "path is not a directory: {}",
1013 args.dir.display()
1014 )));
1015 }
1016
1017 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1018 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1019 files.sort_unstable();
1020
1021 if files.len() > args.max_files {
1022 return Err(AppError::Validation(format!(
1023 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1024 files.len(),
1025 args.max_files
1026 )));
1027 }
1028
1029 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1030 let memory_type_str = args.r#type.as_str().to_string();
1031
1032 let paths = AppPaths::resolve(args.db.as_deref())?;
1033 let mut conn_or_err = match init_storage(&paths) {
1034 Ok(c) => Ok(c),
1035 Err(e) => Err(format!("{e}")),
1036 };
1037
1038 let mut succeeded: usize = 0;
1039 let mut failed: usize = 0;
1040 let mut skipped: usize = 0;
1041 let total = files.len();
1042
1043 let mut taken_names: BTreeSet<String> = BTreeSet::new();
1046
1047 enum SlotMeta {
1053 Skip {
1054 file_str: String,
1055 derived_base: String,
1056 name_truncated: bool,
1057 original_name: Option<String>,
1058 original_filename: Option<String>,
1059 reason: String,
1060 },
1061 Process {
1062 file_str: String,
1063 derived_name: String,
1064 name_truncated: bool,
1065 original_name: Option<String>,
1066 original_filename: Option<String>,
1067 },
1068 }
1069
1070 struct ProcessItem {
1071 idx: usize,
1072 path: PathBuf,
1073 file_str: String,
1074 derived_name: String,
1075 }
1076
1077 let files_cap = files.len();
1078 let mut slots_meta: Vec<SlotMeta> = Vec::new();
1079 slots_meta.try_reserve(files_cap).map_err(|_| {
1080 AppError::LimitExceeded(format!(
1081 "allocation of {files_cap} slot metadata entries would exceed available memory"
1082 ))
1083 })?;
1084 let mut process_items: Vec<ProcessItem> = Vec::new();
1085 process_items.try_reserve(files_cap).map_err(|_| {
1086 AppError::LimitExceeded(format!(
1087 "allocation of {files_cap} process items would exceed available memory"
1088 ))
1089 })?;
1090 let mut truncations: Vec<(String, String)> = Vec::new();
1091 truncations.try_reserve(files_cap).map_err(|_| {
1092 AppError::LimitExceeded(format!(
1093 "allocation of {files_cap} truncation entries would exceed available memory"
1094 ))
1095 })?;
1096
1097 let max_name_length = args.max_name_length;
1098 for path in &files {
1099 let file_str = path.to_string_lossy().into_owned();
1100 let (derived_base, name_truncated, original_name) =
1101 derive_kebab_name(path, max_name_length);
1102 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1103
1104 if name_truncated {
1105 if let Some(ref orig) = original_name {
1106 truncations.push((orig.clone(), derived_base.clone()));
1107 }
1108 }
1109
1110 if derived_base.is_empty() {
1111 let orig_filename = if !original_basename.is_empty() {
1113 Some(original_basename.to_string())
1114 } else {
1115 None
1116 };
1117 slots_meta.push(SlotMeta::Skip {
1118 file_str,
1119 derived_base: String::new(),
1120 name_truncated: false,
1121 original_name: None,
1122 original_filename: orig_filename,
1123 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1124 });
1125 continue;
1126 }
1127
1128 match unique_name(&derived_base, &taken_names) {
1129 Ok(derived_name) => {
1130 taken_names.insert(derived_name.clone());
1131 let idx = slots_meta.len();
1132 let orig_filename = if original_basename != derived_name {
1134 Some(original_basename.to_string())
1135 } else {
1136 None
1137 };
1138 process_items.push(ProcessItem {
1139 idx,
1140 path: path.clone(),
1141 file_str: file_str.clone(),
1142 derived_name: derived_name.clone(),
1143 });
1144 slots_meta.push(SlotMeta::Process {
1145 file_str,
1146 derived_name,
1147 name_truncated,
1148 original_name,
1149 original_filename: orig_filename,
1150 });
1151 }
1152 Err(e) => {
1153 let orig_filename = if original_basename != derived_base {
1154 Some(original_basename.to_string())
1155 } else {
1156 None
1157 };
1158 slots_meta.push(SlotMeta::Skip {
1159 file_str,
1160 derived_base,
1161 name_truncated,
1162 original_name,
1163 original_filename: orig_filename,
1164 reason: e.to_string(),
1165 });
1166 }
1167 }
1168 }
1169
1170 if !truncations.is_empty() {
1171 tracing::info!(
1172 target: "ingest",
1173 count = truncations.len(),
1174 max_name_length = max_name_length,
1175 max_len = DERIVED_NAME_MAX_LEN,
1176 "derived names truncated; pass -vv (debug) for per-file detail"
1177 );
1178 }
1179
1180 if args.dry_run {
1182 for meta in &slots_meta {
1183 match meta {
1184 SlotMeta::Skip {
1185 file_str,
1186 derived_base,
1187 name_truncated,
1188 original_name,
1189 original_filename,
1190 reason,
1191 } => {
1192 output::emit_json_compact(&IngestFileEvent {
1193 file: file_str,
1194 name: derived_base,
1195 status: "skip",
1196 truncated: *name_truncated,
1197 original_name: original_name.clone(),
1198 original_filename: original_filename.as_deref(),
1199 error: Some(reason.clone()),
1200 memory_id: None,
1201 action: None,
1202 body_length: 0,
1203 backend_invoked: None,
1204 })?;
1205 }
1206 SlotMeta::Process {
1207 file_str,
1208 derived_name,
1209 name_truncated,
1210 original_name,
1211 original_filename,
1212 } => {
1213 output::emit_json_compact(&IngestFileEvent {
1214 file: file_str,
1215 name: derived_name,
1216 status: "preview",
1217 truncated: *name_truncated,
1218 original_name: original_name.clone(),
1219 original_filename: original_filename.as_deref(),
1220 error: None,
1221 memory_id: None,
1222 action: None,
1223 body_length: 0,
1224 backend_invoked: None,
1225 })?;
1226 }
1227 }
1228 }
1229 output::emit_json_compact(&IngestSummary {
1230 summary: true,
1231 dir: args.dir.to_string_lossy().into_owned(),
1232 pattern: args.pattern.clone(),
1233 recursive: args.recursive,
1234 files_total: total,
1235 files_succeeded: 0,
1236 files_failed: 0,
1237 files_skipped: 0,
1238 elapsed_ms: started.elapsed().as_millis() as u64,
1239 })?;
1240 return Ok(());
1241 }
1242
1243 if args.low_memory {
1245 if let Some(n) = args.ingest_parallelism {
1246 if n > 1 {
1247 return Err(AppError::Validation(
1248 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1249 .to_string(),
1250 ));
1251 }
1252 }
1253 }
1254
1255 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1258
1259 let pool = rayon::ThreadPoolBuilder::new()
1260 .num_threads(parallelism)
1261 .build()
1262 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1263
1264 if args.enable_ner && args.skip_extraction {
1265 return Err(AppError::Validation(
1266 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1267 ));
1268 }
1269 if args.skip_extraction && !args.enable_ner {
1270 tracing::warn!(
1277 "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1278 );
1279 }
1280 let enable_ner = args.enable_ner;
1281 let auto_describe = args.auto_describe && !args.no_auto_describe;
1282 let max_rss_mb = args.max_rss_mb;
1283 let llm_parallelism = args.llm_parallelism as usize;
1284 if args.mode == IngestMode::Gliner {
1288 tracing::warn!(
1289 "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1290 );
1291 }
1292 if args.gliner_variant != "fp32" {
1293 tracing::warn!(
1294 "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1295 );
1296 }
1297 let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1298 "int8" => crate::extraction::GlinerVariant::Int8,
1299 _ => crate::extraction::GlinerVariant::Fp32,
1300 };
1301
1302 let total_to_process = process_items.len();
1303 tracing::info!(
1304 target: "ingest",
1305 phase = "pipeline_start",
1306 files = total_to_process,
1307 ingest_parallelism = parallelism,
1308 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1309 );
1310
1311 let channel_bound = (parallelism * 2).max(1);
1315 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1316
1317 let paths_owned = paths.clone();
1322 let llm_backend_owned = llm_backend;
1323 let producer_handle = std::thread::spawn(move || {
1324 pool.install(|| {
1325 process_items.into_par_iter().for_each(|item| {
1326 if crate::shutdown_requested() {
1327 return;
1328 }
1329 let t0 = std::time::Instant::now();
1330 let result = stage_file(
1331 item.idx,
1332 &item.path,
1333 &item.derived_name,
1334 &paths_owned,
1335 enable_ner,
1336 gliner_variant,
1337 max_rss_mb,
1338 llm_parallelism,
1339 llm_backend_owned,
1340 auto_describe,
1341 );
1342 let elapsed_ms = t0.elapsed().as_millis() as u64;
1343
1344 let (n_entities, n_relationships) = match &result {
1347 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1348 Err(_) => (0, 0),
1349 };
1350 let progress = StageProgressEvent {
1351 schema_version: 1,
1352 event: "file_extracted",
1353 path: &item.file_str,
1354 ms: elapsed_ms,
1355 entities: n_entities,
1356 relationships: n_relationships,
1357 };
1358 if let Ok(line) = serde_json::to_string(&progress) {
1359 tracing::info!(target: "ingest_progress", "{}", line);
1360 }
1361
1362 let _ = tx.send((item.idx, result));
1366 });
1367 drop(tx);
1369 });
1370 });
1371
1372 let fail_fast = args.fail_fast;
1384
1385 for meta in &slots_meta {
1387 if let SlotMeta::Skip {
1388 file_str,
1389 derived_base,
1390 name_truncated,
1391 original_name,
1392 original_filename,
1393 reason,
1394 } = meta
1395 {
1396 output::emit_json_compact(&IngestFileEvent {
1397 file: file_str,
1398 name: derived_base,
1399 status: "skipped",
1400 truncated: *name_truncated,
1401 original_name: original_name.clone(),
1402 original_filename: original_filename.as_deref(),
1403 error: Some(reason.clone()),
1404 memory_id: None,
1405 action: None,
1406 body_length: 0,
1407 backend_invoked: None,
1408 })?;
1409 skipped += 1;
1410 }
1411 }
1412
1413 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1416 .iter()
1417 .enumerate()
1418 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1419 .collect();
1420
1421 tracing::info!(
1422 target: "ingest",
1423 phase = "persist_start",
1424 files = total_to_process,
1425 "phase B starting: persisting files incrementally as Phase A completes each one",
1426 );
1427
1428 for (idx, stage_result) in rx {
1432 if crate::shutdown_requested() {
1433 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1434 break;
1435 }
1436 let meta = meta_index.get(&idx).ok_or_else(|| {
1437 AppError::Internal(anyhow::anyhow!(
1438 "channel idx {idx} has no corresponding Process slot"
1439 ))
1440 })?;
1441 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1442 {
1443 SlotMeta::Process {
1444 file_str,
1445 derived_name,
1446 name_truncated,
1447 original_name,
1448 original_filename,
1449 } => (
1450 file_str,
1451 derived_name,
1452 name_truncated,
1453 original_name,
1454 original_filename,
1455 ),
1456 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1457 };
1458
1459 let conn = match conn_or_err.as_mut() {
1461 Ok(c) => c,
1462 Err(err_msg) => {
1463 let err_clone = err_msg.clone();
1464 output::emit_json_compact(&IngestFileEvent {
1465 file: file_str,
1466 name: derived_name,
1467 status: "failed",
1468 truncated: *name_truncated,
1469 original_name: original_name.clone(),
1470 original_filename: original_filename.as_deref(),
1471 error: Some(err_clone.clone()),
1472 memory_id: None,
1473 action: None,
1474 body_length: 0,
1475 backend_invoked: None,
1476 })?;
1477 failed += 1;
1478 if fail_fast {
1479 output::emit_json_compact(&IngestSummary {
1480 summary: true,
1481 dir: args.dir.display().to_string(),
1482 pattern: args.pattern.clone(),
1483 recursive: args.recursive,
1484 files_total: total,
1485 files_succeeded: succeeded,
1486 files_failed: failed,
1487 files_skipped: skipped,
1488 elapsed_ms: started.elapsed().as_millis() as u64,
1489 })?;
1490 return Err(AppError::Validation(format!(
1491 "ingest aborted on first failure: {err_clone}"
1492 )));
1493 }
1494 continue;
1495 }
1496 };
1497
1498 let outcome =
1499 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1500
1501 match outcome {
1502 Ok(FileSuccess {
1503 memory_id,
1504 action,
1505 body_length,
1506 backend_invoked: file_backend_invoked,
1507 }) => {
1508 output::emit_json_compact(&IngestFileEvent {
1509 file: file_str,
1510 name: derived_name,
1511 status: "indexed",
1512 truncated: *name_truncated,
1513 original_name: original_name.clone(),
1514 original_filename: original_filename.as_deref(),
1515 error: None,
1516 memory_id: Some(memory_id),
1517 action: Some(action),
1518 body_length,
1519 backend_invoked: file_backend_invoked,
1520 })?;
1521 succeeded += 1;
1522 }
1523 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1524 output::emit_json_compact(&IngestFileEvent {
1525 file: file_str,
1526 name: derived_name,
1527 status: "skipped",
1528 truncated: *name_truncated,
1529 original_name: original_name.clone(),
1530 original_filename: original_filename.as_deref(),
1531 error: Some(format!("{e}")),
1532 memory_id: None,
1533 action: Some("duplicate".to_string()),
1534 body_length: 0,
1535 backend_invoked: None,
1536 })?;
1537 skipped += 1;
1538 }
1539 Err(e) => {
1540 let err_msg = format!("{e}");
1541 output::emit_json_compact(&IngestFileEvent {
1542 file: file_str,
1543 name: derived_name,
1544 status: "failed",
1545 truncated: *name_truncated,
1546 original_name: original_name.clone(),
1547 original_filename: original_filename.as_deref(),
1548 error: Some(err_msg.clone()),
1549 memory_id: None,
1550 action: None,
1551 body_length: 0,
1552 backend_invoked: None,
1553 })?;
1554 failed += 1;
1555 if fail_fast {
1556 output::emit_json_compact(&IngestSummary {
1557 summary: true,
1558 dir: args.dir.display().to_string(),
1559 pattern: args.pattern.clone(),
1560 recursive: args.recursive,
1561 files_total: total,
1562 files_succeeded: succeeded,
1563 files_failed: failed,
1564 files_skipped: skipped,
1565 elapsed_ms: started.elapsed().as_millis() as u64,
1566 })?;
1567 return Err(AppError::Validation(format!(
1568 "ingest aborted on first failure: {err_msg}"
1569 )));
1570 }
1571 }
1572 }
1573 }
1574
1575 producer_handle
1577 .join()
1578 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1579
1580 if let Ok(ref conn) = conn_or_err {
1581 if succeeded > 0 {
1582 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1583 }
1584 }
1585
1586 output::emit_json_compact(&IngestSummary {
1587 summary: true,
1588 dir: args.dir.display().to_string(),
1589 pattern: args.pattern.clone(),
1590 recursive: args.recursive,
1591 files_total: total,
1592 files_succeeded: succeeded,
1593 files_failed: failed,
1594 files_skipped: skipped,
1595 elapsed_ms: started.elapsed().as_millis() as u64,
1596 })?;
1597
1598 Ok(())
1599}
1600
1601fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1607 ensure_db_ready(paths)?;
1608 let conn = open_rw(&paths.db)?;
1609 Ok(conn)
1610}
1611
1612pub(crate) fn collect_files(
1613 dir: &Path,
1614 pattern: &str,
1615 recursive: bool,
1616 out: &mut Vec<PathBuf>,
1617) -> Result<(), AppError> {
1618 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1619 for entry in entries {
1620 let entry = entry.map_err(AppError::Io)?;
1621 let path = entry.path();
1622 let file_type = entry.file_type().map_err(AppError::Io)?;
1623 if file_type.is_file() {
1624 let name = entry.file_name();
1625 let name_str = name.to_string_lossy();
1626 if matches_pattern(&name_str, pattern) {
1627 out.push(path);
1628 }
1629 } else if file_type.is_dir() && recursive {
1630 collect_files(&path, pattern, recursive, out)?;
1631 }
1632 }
1633 Ok(())
1634}
1635
/// Reports whether `name` matches the glob-like `pattern`.
///
/// `*` matches any run of characters (including none) and may appear anywhere
/// in the pattern, any number of times. Every other character must match
/// literally and case-sensitively. A pattern without `*` must equal `name`
/// exactly.
///
/// Earlier revisions honoured only a single leading or trailing `*`, so
/// patterns such as `*draft*` or `chapter-*.md` silently matched nothing.
/// Everything those revisions accepted is still accepted.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // Split off the literal that must terminate the name; no `*` at all means
    // the pattern is a plain file name.
    let (leading, last) = match pattern.rsplit_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };

    // The first literal is anchored at the start of the name.
    let mut literals = leading.split('*');
    let head = literals.next().unwrap_or("");
    let mut rest = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };

    // Interior literals are consumed left-to-right at their earliest
    // occurrence, which leaves the longest possible remainder for the suffix.
    for literal in literals {
        match rest.find(literal) {
            Some(at) => rest = &rest[at + literal.len()..],
            None => return false,
        }
    }

    // Checking the remainder (not the full name) prevents the anchored prefix
    // and suffix from overlapping, e.g. "a" must not match "a*a".
    rest.ends_with(last)
}
1645
1646pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1657 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1658 let lowered: String = stem
1659 .nfd()
1660 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1661 .map(|c| {
1662 if c == '_' || c.is_whitespace() {
1663 '-'
1664 } else {
1665 c
1666 }
1667 })
1668 .map(|c| c.to_ascii_lowercase())
1669 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1670 .collect();
1671 let collapsed = collapse_dashes(&lowered);
1672 let trimmed_raw = collapsed.trim_matches('-').to_string();
1673 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1675 format!("doc-{trimmed_raw}")
1676 } else {
1677 trimmed_raw
1678 };
1679 if trimmed.len() > max_len {
1680 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1681 tracing::debug!(
1682 target: "ingest",
1683 original = %trimmed,
1684 truncated_to = %truncated,
1685 max_len = max_len,
1686 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1687 );
1688 (truncated, true, Some(trimmed))
1689 } else {
1690 (trimmed, false, None)
1691 }
1692}
1693
1694fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1707 if !taken.contains(base) {
1708 return Ok(base.to_string());
1709 }
1710 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1711 let candidate = format!("{base}-{suffix}");
1712 if !taken.contains(&candidate) {
1713 tracing::warn!(
1714 target: "ingest",
1715 base = %base,
1716 resolved = %candidate,
1717 suffix,
1718 "memory name collision resolved with numeric suffix"
1719 );
1720 return Ok(candidate);
1721 }
1722 }
1723 Err(AppError::Validation(format!(
1724 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1725 )))
1726}
1727
/// Returns `s` with every run of consecutive `-` characters reduced to one.
///
/// All other characters are copied through unchanged; leading and trailing
/// dashes are kept (as a single dash each).
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // The output ends in '-' exactly when the previous input char was '-'.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1744
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    // ---- matches_pattern ---------------------------------------------------

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    // ---- derive_kebab_name -------------------------------------------------

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    // Accented letters must fold to their ASCII base letter, not vanish.
    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    // ---- collect_files -----------------------------------------------------

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    // A very long basename must still yield a non-empty name within the cap.
    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    // ---- unique_name -------------------------------------------------------

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    // ---- relation parsing --------------------------------------------------

    // A well-formed relation passes format validation even when it is not one
    // of the canonical relations.
    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    // ---- low-memory environment handling ------------------------------------

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or removed
    /// when `value` is `None`) and restores the previous state afterwards.
    ///
    /// Restoration happens in a drop guard, so it also runs when `f` panics
    /// (for example on a failed assertion). Without that, a failing test would
    /// leak the modified variable into every test that runs after it. Callers
    /// are marked `#[serial]` because the process environment is global.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

        /// Puts the captured value back (or removes the variable) on drop.
        struct Restore(Option<String>);

        impl Drop for Restore {
            fn drop(&mut self) {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }

        // Capture the current state before touching the variable.
        let _restore = Restore(std::env::var(KEY).ok());
        match value {
            Some(v) => std::env::set_var(KEY, v),
            None => std::env::remove_var(KEY),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    // ---- resolve_parallelism -------------------------------------------------

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    // ---- CLI parsing -------------------------------------------------------

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}