1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// Upper bound for the collision suffix used when a derived memory name is
// already taken. NOTE(review): the consumer lives outside this view
// (presumably the name de-duplication in `run`) — confirm the exact semantics.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70 # LLM-curated extraction via Claude Code CLI\n \
71 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
72 # Resume interrupted claude-code ingest\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
74 # Claude Code with budget cap and custom timeout\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
76AUTHENTICATION:\n \
77 --mode claude-code: Uses existing Claude Code authentication.\n \
78 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
79 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
80 --mode codex: Uses existing Codex CLI authentication.\n \
81 Device auth: run `codex auth login` first\n \
82 API key: set OPENAI_API_KEY (optional)\n\n \
83NOTES:\n \
84 Each file becomes a separate memory. Names derive from file basenames\n \
85 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
86 followed by a final summary line with counts. Per-file errors are reported\n \
87 inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89 #[arg(
91 value_name = "DIR",
92 help = "Directory to ingest recursively (each matching file becomes a memory)"
93 )]
94 pub dir: PathBuf,
95
96 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98 pub r#type: MemoryType,
99
100 #[arg(long, default_value = "*.md")]
103 pub pattern: String,
104
105 #[arg(long, default_value_t = false)]
107 pub recursive: bool,
108
109 #[arg(
110 long,
111 env = "SQLITE_GRAPHRAG_ENABLE_NER",
112 value_parser = crate::parsers::parse_bool_flexible,
113 action = clap::ArgAction::Set,
114 num_args = 0..=1,
115 default_missing_value = "true",
116 default_value = "false",
117 help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
118 )]
119 pub enable_ner: bool,
120
121 #[arg(
125 long,
126 default_value_t = true,
127 overrides_with = "no_auto_describe",
128 help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
129 )]
130 pub auto_describe: bool,
131 #[arg(
132 long = "no-auto-describe",
133 default_value_t = false,
134 help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
135 )]
136 pub no_auto_describe: bool,
137 #[arg(
138 long,
139 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
140 default_value = "fp32",
141 help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
142 )]
143 pub gliner_variant: String,
144
145 #[arg(long, default_value_t = false, hide = true)]
147 pub skip_extraction: bool,
148
149 #[arg(long, default_value_t = false)]
151 pub fail_fast: bool,
152
153 #[arg(long, default_value_t = false)]
155 pub dry_run: bool,
156
157 #[arg(long, default_value_t = 10_000)]
159 pub max_files: usize,
160
161 #[arg(long)]
163 pub namespace: Option<String>,
164
165 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
167 pub db: Option<String>,
168
169 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
170 pub format: JsonOutputFormat,
171
172 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
173 pub json: bool,
174
175 #[arg(
177 long,
178 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
179 )]
180 pub ingest_parallelism: Option<usize>,
181
182 #[arg(
190 long,
191 default_value_t = false,
192 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
193 Recommended for environments with <4 GB available RAM or container/cgroup \
194 constraints. Trade-off: 3-4x longer wall time. Also honored via \
195 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
196 )]
197 pub low_memory: bool,
198
199 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
201 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
202 pub max_rss_mb: u64,
203
204 #[arg(long, default_value_t = 2, value_name = "N",
209 value_parser = clap::value_parser!(u64).range(1..=32),
210 help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
211 pub llm_parallelism: u64,
212
213 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
218 help = "Maximum length for derived memory names (default: 60)")]
219 pub max_name_length: usize,
220
221 #[arg(long, value_enum, default_value_t = IngestMode::None)]
223 pub mode: IngestMode,
224
225 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
227 pub claude_binary: Option<std::path::PathBuf>,
228
229 #[arg(long)]
231 pub claude_model: Option<String>,
232
233 #[arg(long, default_value_t = false)]
235 pub resume: bool,
236
237 #[arg(long, default_value_t = false)]
239 pub retry_failed: bool,
240
241 #[arg(long, default_value_t = false)]
243 pub keep_queue: bool,
244
245 #[arg(long, default_value = ".ingest-queue.sqlite")]
247 pub queue_db: String,
248
249 #[arg(long, default_value_t = 60)]
251 pub rate_limit_wait: u64,
252
253 #[arg(long)]
255 pub max_cost_usd: Option<f64>,
256
257 #[arg(
259 long,
260 default_value_t = 300,
261 help = "Timeout in seconds for each claude -p invocation (default: 300)"
262 )]
263 pub claude_timeout: u64,
264
265 #[arg(
267 long,
268 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
269 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
270 )]
271 pub codex_binary: Option<PathBuf>,
272
273 #[arg(
275 long,
276 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
277 )]
278 pub codex_model: Option<String>,
279
280 #[arg(
282 long,
283 default_value_t = 300,
284 help = "Timeout in seconds for each codex exec invocation (default: 300)"
285 )]
286 pub codex_timeout: u64,
287
288 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
290 pub opencode_binary: Option<PathBuf>,
291
292 #[arg(
294 long,
295 value_name = "MODEL",
296 env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
297 help = "Model override for OpenCode extraction"
298 )]
299 pub opencode_model: Option<String>,
300
301 #[arg(
303 long,
304 value_name = "SECONDS",
305 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
306 default_value_t = 300,
307 help = "Timeout in seconds for each opencode run invocation (default: 300)"
308 )]
309 pub opencode_timeout: u64,
310
311 #[arg(long, value_name = "SECONDS")]
314 pub wait_job_singleton: Option<u64>,
315
316 #[arg(long, default_value_t = false)]
319 pub force_job_singleton: bool,
320}
321
322#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
324pub enum IngestMode {
325 None,
327 Gliner,
329 ClaudeCode,
331 Codex,
333 #[value(name = "opencode")]
335 Opencode,
336}
337
338fn env_low_memory_enabled() -> bool {
343 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
344 Ok(v) if v.is_empty() => false,
345 Ok(v) => match v.to_lowercase().as_str() {
346 "1" | "true" | "yes" | "on" => true,
347 "0" | "false" | "no" | "off" => false,
348 other => {
349 tracing::warn!(
350 target: "ingest",
351 value = %other,
352 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
353 );
354 false
355 }
356 },
357 Err(_) => false,
358 }
359}
360
361fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
373 let env_flag = env_low_memory_enabled();
374 let low_memory = low_memory_flag || env_flag;
375
376 if low_memory {
377 if let Some(n) = ingest_parallelism {
378 if n > 1 {
379 tracing::warn!(
380 target: "ingest",
381 requested = n,
382 "--ingest-parallelism overridden by --low-memory; using 1"
383 );
384 }
385 }
386 if low_memory_flag {
387 tracing::info!(
388 target: "ingest",
389 source = "flag",
390 "low-memory mode enabled: forcing --ingest-parallelism 1"
391 );
392 } else {
393 tracing::info!(
394 target: "ingest",
395 source = "env",
396 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
397 );
398 }
399 return 1;
400 }
401
402 ingest_parallelism
403 .unwrap_or_else(|| {
404 std::thread::available_parallelism()
405 .map(|v| v.get() / 2)
406 .unwrap_or(1)
407 .clamp(1, 4)
408 })
409 .max(1)
410}
411
/// One NDJSON record describing the outcome for a single input file.
///
/// Field order is the serialized key order, so keep it stable for consumers.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Input file path as rendered for output.
    file: &'a str,
    /// Memory name derived for the file.
    name: &'a str,
    /// Outcome label for this file. NOTE(review): exact vocabulary is set by `run` — confirm.
    status: &'a str,
    /// Presumably whether the derived name was truncated (cf. `--max-name-length`) — confirm.
    truncated: bool,
    /// Name before truncation, only present when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file name, only present when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Per-file error message, only present on failure.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the persisted memory, only present on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action (e.g. `"created"` from `persist_staged`).
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length in bytes.
    body_length: usize,
    /// Embedding backend reported by staging, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
440
/// Final NDJSON record with aggregate counts for the whole run.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker distinguishing this record from per-file events (presumably always `true`).
    summary: bool,
    /// Input directory as displayed.
    dir: String,
    /// File-name pattern that was matched.
    pattern: String,
    /// Whether subdirectories were scanned.
    recursive: bool,
    /// Number of files matched.
    files_total: usize,
    /// Files persisted successfully.
    files_succeeded: usize,
    /// Files that produced an error.
    files_failed: usize,
    /// Files skipped without processing.
    files_skipped: usize,
    /// Wall-clock duration of the run in milliseconds.
    elapsed_ms: u64,
}
453
/// Result of persisting one staged file (returned by `persist_staged`).
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Persistence action; `persist_staged` always reports `"created"`.
    action: String,
    /// Length of the stored body in bytes.
    body_length: usize,
    /// Embedding backend reported during staging, if any.
    backend_invoked: Option<&'static str>,
}
461
/// Progress record for a pipeline stage of a single file.
/// NOTE(review): its emitter is outside this view — field meanings below are
/// inferred from names and should be confirmed.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's schema.
    schema_version: u8,
    /// Event/stage name.
    event: &'a str,
    /// File the event refers to.
    path: &'a str,
    /// Stage duration in milliseconds.
    ms: u64,
    /// Entities produced by the stage.
    entities: usize,
    /// Relationships produced by the stage.
    relationships: usize,
}
473
/// Everything `stage_file` computes for one input file. It is built without a
/// database connection and later handed to `persist_staged`.
struct StagedFile {
    /// Full file contents (validated non-empty and within `MAX_MEMORY_BODY_LEN`).
    body: String,
    /// BLAKE3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters (not bytes) of `body`.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// Heuristic or legacy `ingested from <path>` description.
    description: String,
    /// Memory-level embedding. It is the chunk aggregate for multi-chunk bodies
    /// and `None` when embedding failed under skip-on-failure.
    embedding: Option<Vec<f32>>,
    /// Per-chunk embeddings; only `Some` on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by `chunking::split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (capped at `max_entities_per_memory()`).
    entities: Vec<NewEntity>,
    /// Extracted relationships (currently always empty; see `stage_file`).
    relationships: Vec<NewRelationship>,
    /// Embeddings aligned by index with `entities`, when available.
    entity_embeddings: Option<Vec<Vec<f32>>>,
    /// URLs found during extraction, persisted after the main transaction.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// Backend used for the single-chunk embedding; `None` otherwise.
    backend_invoked: Option<&'static str>,
}
495
/// Validates, reads, extracts and embeds one file, producing a [`StagedFile`]
/// for [`persist_staged`]. It receives no database connection, which makes it
/// suitable for running several files in parallel.
///
/// Parameters:
/// - `_idx`: position of the file in the batch (currently unused).
/// - `path` / `name`: input file and its already-derived memory name.
/// - `paths`: resolved application paths (model directory for embedders).
/// - `enable_ner` / `gliner_variant`: gate and (legacy) variant for
///   `extraction::extract_graph_auto`.
/// - `max_rss_mb`: RSS ceiling checked before multi-chunk embedding.
/// - `llm_parallelism`: concurrency for chunk and entity embedding.
/// - `llm_backend`: backend choice for single-chunk embedding.
/// - `auto_describe`: heuristic description vs legacy `ingested from <path>`.
///
/// # Errors
/// - `LimitExceeded` for an over-long name, an over-large body, too many
///   chunks, or RSS above `max_rss_mb` (reported as `LowMemory`).
/// - `Validation` for a reserved/non-kebab name, an empty body, an over-long
///   description, or an invalid relationship.
/// - `Io` when the file cannot be stat'ed or read. `read_to_string` also fails
///   on non-UTF-8 content.
/// - Embedding errors, unless skip-on-failure is active. Validation errors
///   from the embedder are never skipped.
#[allow(clippy::too_many_arguments)]
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
    llm_parallelism: usize,
    llm_backend: crate::cli::LlmBackendChoice,
    auto_describe: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    // Name validation: length cap, reserved `__` prefix, kebab-case slug.
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = crate::constants::name_slug_regex();
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // Cheap size check from metadata before reading the whole file into memory.
    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
    if file_size > MAX_MEMORY_BODY_LEN as u64 {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    // Re-check the actual body; presumably guards against the file changing
    // between `metadata` and the read.
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    let description = if auto_describe {
        crate::commands::ingest_heuristics::extract_heuristic_description(
            &raw_body,
            Some(&path.display().to_string()),
        )
    } else {
        format!("ingested from {}", path.display())
    };
    // An over-long description fails the file; it is not truncated.
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                // Every auto-extracted entity is stored as a `Concept` with no description.
                extracted_entities = extracted
                    .entities
                    .into_iter()
                    .map(|e| NewEntity {
                        name: e.name,
                        entity_type: crate::entity_type::EntityType::Concept,
                        description: None,
                    })
                    .collect();
                // Auto-extraction contributes no relationships.
                extracted_relationships.clear();

                // Enforce per-memory graph caps.
                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > max_relationships_per_memory() {
                    extracted_relationships.truncate(max_relationships_per_memory());
                }
            }
            Err(e) => {
                // Extraction is optional: log and keep ingesting the file without a graph.
                tracing::warn!(
                    target: "ingest",
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // Normalize and validate relationships. With the code above
    // `extracted_relationships` is always empty here, so this is currently a
    // no-op guard for future relationship sources.
    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    // Snippet is the first 200 characters (char-based, so never splits a code point).
    let snippet: String = raw_body.chars().take(200).collect();

    // NOTE(review): the error text below mentions `remember`, although this is
    // the ingest path — the message appears to be shared with that command.
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
    // Single chunk: embed the whole body with the selected backend.
    // Multiple chunks: embed each chunk locally and aggregate into one vector.
    // Embedder `Validation` errors always propagate; other errors are
    // downgraded to "no embedding" when skip-on-failure is active.
    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
        .len()
        == 1
    {
        match crate::embedder::embed_passage_with_choice(
            &paths.models,
            &raw_body,
            Some(llm_backend),
        ) {
            Ok((v, k)) => (Some(v), Some(k.as_str())),
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %path.display(), "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    } else {
        let chunk_texts: Vec<String> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
            .collect();
        // Abort before the memory-heavy batch embedding if RSS is already over budget.
        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
            if rss > max_rss_mb {
                tracing::error!(
                    target: "ingest",
                    rss_mb = rss,
                    max_rss_mb = max_rss_mb,
                    file = %path.display(),
                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                );
                return Err(AppError::LowMemory {
                    available_mb: crate::memory_guard::available_memory_mb(),
                    required_mb: max_rss_mb,
                });
            }
        }
        match crate::embedder::embed_passages_parallel_local(
            &paths.models,
            &chunk_texts,
            llm_parallelism,
            crate::embedder::chunk_embed_batch_size(),
        ) {
            Ok(chunk_embeddings) => {
                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
                chunk_embeddings_opt = Some(chunk_embeddings);
                // NOTE(review): no backend is reported on this path even though
                // embeddings were computed — confirm this is intended.
                (Some(aggregated), None)
            }
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %path.display(), "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    };

    // Entity vectors are embedded from "name description" (or just the name).
    // NOTE(review): this call is made even when there are no entities — confirm
    // it is cheap for an empty input.
    let entity_texts: Vec<String> = extracted_entities
        .iter()
        .map(|entity| match &entity.description {
            Some(desc) => format!("{} {}", entity.name, desc),
            None => entity.name.clone(),
        })
        .collect();
    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
        &paths.models,
        &entity_texts,
        llm_parallelism,
    ) {
        Ok((entity_embeddings, embed_cache_stats)) => {
            if embed_cache_stats.hits > 0 {
                tracing::debug!(
                    hits = embed_cache_stats.hits,
                    misses = embed_cache_stats.misses,
                    requested = embed_cache_stats.requested,
                    "G56: entity embed cache hit (ingest)"
                );
            }
            Some(entity_embeddings)
        }
        Err(e) if skip_embed => {
            tracing::warn!(error = %e, file = %path.display(), "ingest: entity embedding failed; --skip-embedding-on-failure active");
            None
        }
        Err(e) => return Err(e),
    };

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings: entity_embeddings_opt,
        urls: extracted_urls,
        backend_invoked,
    })
}
754
755fn persist_staged(
757 conn: &mut Connection,
758 namespace: &str,
759 memory_type: &str,
760 staged: StagedFile,
761) -> Result<FileSuccess, AppError> {
762 {
763 let active_count: u32 = conn.query_row(
764 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
765 [],
766 |r| r.get::<_, i64>(0).map(|v| v as u32),
767 )?;
768 let ns_exists: bool = conn.query_row(
769 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
770 rusqlite::params![namespace],
771 |r| r.get::<_, i64>(0).map(|v| v > 0),
772 )?;
773 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
774 return Err(AppError::NamespaceError(format!(
775 "active namespace limit of {} exceeded while creating '{namespace}'",
776 crate::constants::MAX_NAMESPACES_ACTIVE
777 )));
778 }
779 }
780
781 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
782 if existing_memory.is_some() {
783 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
784 &staged.name,
785 namespace,
786 )));
787 }
788 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
789
790 let new_memory = NewMemory {
791 namespace: namespace.to_string(),
792 name: staged.name.clone(),
793 memory_type: memory_type.to_string(),
794 description: staged.description.clone(),
795 body: staged.body,
796 body_hash: staged.body_hash,
797 session_id: None,
798 source: "agent".to_string(),
799 metadata: serde_json::json!({}),
800 };
801
802 if let Some(hash_id) = duplicate_hash_id {
803 tracing::debug!(
804 target: "ingest",
805 duplicate_memory_id = hash_id,
806 "identical body already exists; persisting a new memory anyway"
807 );
808 }
809
810 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
811
812 let memory_id = memories::insert(&tx, &new_memory)?;
813 versions::insert_version(
814 &tx,
815 memory_id,
816 1,
817 &staged.name,
818 memory_type,
819 &staged.description,
820 &new_memory.body,
821 &serde_json::to_string(&new_memory.metadata)?,
822 None,
823 "create",
824 )?;
825 if let Some(ref emb) = staged.embedding {
826 memories::upsert_vec(
827 &tx,
828 memory_id,
829 namespace,
830 memory_type,
831 emb,
832 &staged.name,
833 &staged.snippet,
834 )?;
835 }
836
837 if staged.chunks_info.len() > 1 {
838 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
839 if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
840 for (i, emb) in chunk_embeddings.iter().enumerate() {
841 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
842 }
843 }
844 }
845
846 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
847 for (idx, entity) in staged.entities.iter().enumerate() {
848 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
849 if let Some(ref entity_embeddings) = staged.entity_embeddings {
850 if let Some(entity_embedding) = entity_embeddings.get(idx) {
851 entities::upsert_entity_vec(
852 &tx,
853 entity_id,
854 namespace,
855 entity.entity_type,
856 entity_embedding,
857 &entity.name,
858 )?;
859 }
860 }
861 entities::link_memory_entity(&tx, memory_id, entity_id)?;
862 entities::increment_degree(&tx, entity_id)?;
863 }
864 let entity_types: std::collections::HashMap<&str, EntityType> = staged
865 .entities
866 .iter()
867 .map(|entity| (entity.name.as_str(), entity.entity_type))
868 .collect();
869 for rel in &staged.relationships {
870 let source_entity = NewEntity {
871 name: rel.source.clone(),
872 entity_type: entity_types
873 .get(rel.source.as_str())
874 .copied()
875 .unwrap_or(EntityType::Concept),
876 description: None,
877 };
878 let target_entity = NewEntity {
879 name: rel.target.clone(),
880 entity_type: entity_types
881 .get(rel.target.as_str())
882 .copied()
883 .unwrap_or(EntityType::Concept),
884 description: None,
885 };
886 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
887 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
888 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
889 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
890 }
891 }
892
893 tx.commit()?;
894
895 if !staged.urls.is_empty() {
896 let url_entries: Vec<storage_urls::MemoryUrl> = staged
897 .urls
898 .into_iter()
899 .map(|u| storage_urls::MemoryUrl {
900 url: u.url,
901 offset: Some(u.start as i64),
902 })
903 .collect();
904 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
905 }
906
907 Ok(FileSuccess {
908 memory_id,
909 action: "created".to_string(),
910 body_length: new_memory.body.len(),
911 backend_invoked: staged.backend_invoked,
912 })
913}
914
/// Returns `true` when `value` equals the argument's clap default.
///
/// This lets mode-conditional validation tell "left alone" apart from
/// "explicitly changed". An explicit flag whose value equals the default is
/// indistinguishable from an omitted one.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
925
926fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
941 const DEFAULT_TIMEOUT: u64 = 300;
942 const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
943
944 let mut conflicts: Vec<String> = Vec::new();
945
946 let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
947
948 if is_local_mode {
949 if args.claude_binary.is_some() {
950 conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
951 }
952 if args.claude_model.is_some() {
953 conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
954 }
955 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
956 conflicts.push(format!(
957 "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
958 args.claude_timeout
959 ));
960 }
961 if args.codex_binary.is_some() {
962 conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
963 }
964 if args.codex_model.is_some() {
965 conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
966 }
967 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
968 conflicts.push(format!(
969 "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
970 args.codex_timeout
971 ));
972 }
973 if args.opencode_binary.is_some() {
974 conflicts
975 .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
976 }
977 if args.opencode_model.is_some() {
978 conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
979 }
980 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
981 conflicts.push(format!(
982 "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
983 args.opencode_timeout
984 ));
985 }
986 if args.max_cost_usd.is_some() {
987 conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
988 }
989 if args.resume {
990 conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
991 }
992 if args.retry_failed {
993 conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
994 }
995 if args.keep_queue {
996 conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
997 }
998 if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
999 conflicts.push(format!(
1000 "--rate-limit-wait={} is ignored when --mode is none or gliner",
1001 args.rate_limit_wait
1002 ));
1003 }
1004 }
1005
1006 match args.mode {
1007 IngestMode::ClaudeCode => {
1008 if args.codex_binary.is_some() {
1009 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1010 }
1011 if args.codex_model.is_some() {
1012 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1013 }
1014 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1015 conflicts.push(format!(
1016 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1017 args.codex_timeout
1018 ));
1019 }
1020 if args.opencode_binary.is_some() {
1021 conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1022 }
1023 if args.opencode_model.is_some() {
1024 conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1025 }
1026 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1027 conflicts.push(format!(
1028 "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1029 args.opencode_timeout
1030 ));
1031 }
1032 }
1033 IngestMode::Codex => {
1034 if args.claude_binary.is_some() {
1035 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1036 }
1037 if args.claude_model.is_some() {
1038 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1039 }
1040 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1041 conflicts.push(format!(
1042 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1043 args.claude_timeout
1044 ));
1045 }
1046 if args.max_cost_usd.is_some() {
1047 conflicts.push(
1048 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1049 .to_string(),
1050 );
1051 }
1052 if args.resume {
1053 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1054 }
1055 if args.retry_failed {
1056 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1057 }
1058 if args.keep_queue {
1059 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1060 }
1061 if args.opencode_binary.is_some() {
1062 conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1063 }
1064 if args.opencode_model.is_some() {
1065 conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1066 }
1067 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1068 conflicts.push(format!(
1069 "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1070 args.opencode_timeout
1071 ));
1072 }
1073 }
1074 IngestMode::Opencode => {
1075 if args.claude_binary.is_some() {
1076 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1077 }
1078 if args.claude_model.is_some() {
1079 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1080 }
1081 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1082 conflicts.push(format!(
1083 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1084 args.claude_timeout
1085 ));
1086 }
1087 if args.codex_binary.is_some() {
1088 conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1089 }
1090 if args.codex_model.is_some() {
1091 conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1092 }
1093 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1094 conflicts.push(format!(
1095 "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1096 args.codex_timeout
1097 ));
1098 }
1099 if args.max_cost_usd.is_some() {
1100 conflicts.push(
1101 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1102 .to_string(),
1103 );
1104 }
1105 if args.resume {
1106 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1107 }
1108 if args.retry_failed {
1109 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1110 }
1111 if args.keep_queue {
1112 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1113 }
1114 }
1115 IngestMode::None | IngestMode::Gliner => {}
1116 }
1117
1118 if !conflicts.is_empty() {
1119 return Err(AppError::Validation(format!(
1120 "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
1121 args.mode,
1122 conflicts.join("\n - ")
1123 )));
1124 }
1125
1126 Ok(())
1127}
1128
1129#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
1132pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1133 validate_mode_conditional_flags_ingest(&args)?;
1136 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
1137 if args.mode == IngestMode::ClaudeCode {
1138 return super::ingest_claude::run_claude_ingest(&args);
1139 }
1140 if args.mode == IngestMode::Codex {
1141 return super::ingest_codex::run_codex_ingest(&args);
1142 }
1143 if args.mode == IngestMode::Opencode {
1144 return super::ingest_opencode::run_opencode_ingest(&args);
1145 }
1146
1147 let started = std::time::Instant::now();
1148
1149 if !args.dir.exists() {
1150 return Err(AppError::Validation(format!(
1151 "directory not found: {}",
1152 args.dir.display()
1153 )));
1154 }
1155 if !args.dir.is_dir() {
1156 return Err(AppError::Validation(format!(
1157 "path is not a directory: {}",
1158 args.dir.display()
1159 )));
1160 }
1161
1162 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1163 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1164 files.sort_unstable();
1165
1166 if files.len() > args.max_files {
1167 return Err(AppError::Validation(format!(
1168 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1169 files.len(),
1170 args.max_files
1171 )));
1172 }
1173
1174 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1175 let memory_type_str = args.r#type.as_str().to_string();
1176
1177 let paths = AppPaths::resolve(args.db.as_deref())?;
1178 let mut conn_or_err = match init_storage(&paths) {
1179 Ok(c) => Ok(c),
1180 Err(e) => Err(format!("{e}")),
1181 };
1182
1183 let mut succeeded: usize = 0;
1184 let mut failed: usize = 0;
1185 let mut skipped: usize = 0;
1186 let total = files.len();
1187
1188 let mut taken_names: BTreeSet<String> = BTreeSet::new();
1191
1192 enum SlotMeta {
1198 Skip {
1199 file_str: String,
1200 derived_base: String,
1201 name_truncated: bool,
1202 original_name: Option<String>,
1203 original_filename: Option<String>,
1204 reason: String,
1205 },
1206 Process {
1207 file_str: String,
1208 derived_name: String,
1209 name_truncated: bool,
1210 original_name: Option<String>,
1211 original_filename: Option<String>,
1212 },
1213 }
1214
1215 struct ProcessItem {
1216 idx: usize,
1217 path: PathBuf,
1218 file_str: String,
1219 derived_name: String,
1220 }
1221
1222 let files_cap = files.len();
1223 let mut slots_meta: Vec<SlotMeta> = Vec::new();
1224 slots_meta.try_reserve(files_cap).map_err(|_| {
1225 AppError::LimitExceeded(format!(
1226 "allocation of {files_cap} slot metadata entries would exceed available memory"
1227 ))
1228 })?;
1229 let mut process_items: Vec<ProcessItem> = Vec::new();
1230 process_items.try_reserve(files_cap).map_err(|_| {
1231 AppError::LimitExceeded(format!(
1232 "allocation of {files_cap} process items would exceed available memory"
1233 ))
1234 })?;
1235 let mut truncations: Vec<(String, String)> = Vec::new();
1236 truncations.try_reserve(files_cap).map_err(|_| {
1237 AppError::LimitExceeded(format!(
1238 "allocation of {files_cap} truncation entries would exceed available memory"
1239 ))
1240 })?;
1241
1242 let max_name_length = args.max_name_length;
1243 for path in &files {
1244 let file_str = path.to_string_lossy().into_owned();
1245 let (derived_base, name_truncated, original_name) =
1246 derive_kebab_name(path, max_name_length);
1247 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1248
1249 if name_truncated {
1250 if let Some(ref orig) = original_name {
1251 truncations.push((orig.clone(), derived_base.clone()));
1252 }
1253 }
1254
1255 if derived_base.is_empty() {
1256 let orig_filename = if !original_basename.is_empty() {
1258 Some(original_basename.to_string())
1259 } else {
1260 None
1261 };
1262 slots_meta.push(SlotMeta::Skip {
1263 file_str,
1264 derived_base: String::new(),
1265 name_truncated: false,
1266 original_name: None,
1267 original_filename: orig_filename,
1268 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1269 });
1270 continue;
1271 }
1272
1273 match unique_name(&derived_base, &taken_names) {
1274 Ok(derived_name) => {
1275 taken_names.insert(derived_name.clone());
1276 let idx = slots_meta.len();
1277 let orig_filename = if original_basename != derived_name {
1279 Some(original_basename.to_string())
1280 } else {
1281 None
1282 };
1283 process_items.push(ProcessItem {
1284 idx,
1285 path: path.clone(),
1286 file_str: file_str.clone(),
1287 derived_name: derived_name.clone(),
1288 });
1289 slots_meta.push(SlotMeta::Process {
1290 file_str,
1291 derived_name,
1292 name_truncated,
1293 original_name,
1294 original_filename: orig_filename,
1295 });
1296 }
1297 Err(e) => {
1298 let orig_filename = if original_basename != derived_base {
1299 Some(original_basename.to_string())
1300 } else {
1301 None
1302 };
1303 slots_meta.push(SlotMeta::Skip {
1304 file_str,
1305 derived_base,
1306 name_truncated,
1307 original_name,
1308 original_filename: orig_filename,
1309 reason: e.to_string(),
1310 });
1311 }
1312 }
1313 }
1314
1315 if !truncations.is_empty() {
1316 tracing::info!(
1317 target: "ingest",
1318 count = truncations.len(),
1319 max_name_length = max_name_length,
1320 max_len = DERIVED_NAME_MAX_LEN,
1321 "derived names truncated; pass -vv (debug) for per-file detail"
1322 );
1323 }
1324
1325 if args.dry_run {
1327 for meta in &slots_meta {
1328 match meta {
1329 SlotMeta::Skip {
1330 file_str,
1331 derived_base,
1332 name_truncated,
1333 original_name,
1334 original_filename,
1335 reason,
1336 } => {
1337 output::emit_json_compact(&IngestFileEvent {
1338 file: file_str,
1339 name: derived_base,
1340 status: "skip",
1341 truncated: *name_truncated,
1342 original_name: original_name.clone(),
1343 original_filename: original_filename.as_deref(),
1344 error: Some(reason.clone()),
1345 memory_id: None,
1346 action: None,
1347 body_length: 0,
1348 backend_invoked: None,
1349 })?;
1350 }
1351 SlotMeta::Process {
1352 file_str,
1353 derived_name,
1354 name_truncated,
1355 original_name,
1356 original_filename,
1357 } => {
1358 output::emit_json_compact(&IngestFileEvent {
1359 file: file_str,
1360 name: derived_name,
1361 status: "preview",
1362 truncated: *name_truncated,
1363 original_name: original_name.clone(),
1364 original_filename: original_filename.as_deref(),
1365 error: None,
1366 memory_id: None,
1367 action: None,
1368 body_length: 0,
1369 backend_invoked: None,
1370 })?;
1371 }
1372 }
1373 }
1374 output::emit_json_compact(&IngestSummary {
1375 summary: true,
1376 dir: args.dir.to_string_lossy().into_owned(),
1377 pattern: args.pattern.clone(),
1378 recursive: args.recursive,
1379 files_total: total,
1380 files_succeeded: 0,
1381 files_failed: 0,
1382 files_skipped: 0,
1383 elapsed_ms: started.elapsed().as_millis() as u64,
1384 })?;
1385 return Ok(());
1386 }
1387
1388 if args.low_memory {
1390 if let Some(n) = args.ingest_parallelism {
1391 if n > 1 {
1392 return Err(AppError::Validation(
1393 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1394 .to_string(),
1395 ));
1396 }
1397 }
1398 }
1399
1400 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1403
1404 let pool = rayon::ThreadPoolBuilder::new()
1405 .num_threads(parallelism)
1406 .build()
1407 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1408
1409 if args.enable_ner && args.skip_extraction {
1410 return Err(AppError::Validation(
1411 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1412 ));
1413 }
1414 if args.skip_extraction && !args.enable_ner {
1415 tracing::warn!(
1422 "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1423 );
1424 }
1425 let enable_ner = args.enable_ner;
1426 let auto_describe = args.auto_describe && !args.no_auto_describe;
1427 let max_rss_mb = args.max_rss_mb;
1428 let llm_parallelism = args.llm_parallelism as usize;
1429 if args.mode == IngestMode::Gliner {
1433 tracing::warn!(
1434 "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1435 );
1436 }
1437 if args.gliner_variant != "fp32" {
1438 tracing::warn!(
1439 "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1440 );
1441 }
1442 let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1443 "int8" => crate::extraction::GlinerVariant::Int8,
1444 _ => crate::extraction::GlinerVariant::Fp32,
1445 };
1446
1447 let total_to_process = process_items.len();
1448 tracing::info!(
1449 target: "ingest",
1450 phase = "pipeline_start",
1451 files = total_to_process,
1452 ingest_parallelism = parallelism,
1453 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1454 );
1455
1456 let channel_bound = (parallelism * 2).max(1);
1460 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1461
1462 let paths_owned = paths.clone();
1467 let llm_backend_owned = llm_backend;
1468 let producer_handle = std::thread::spawn(move || {
1469 pool.install(|| {
1470 process_items.into_par_iter().for_each(|item| {
1471 if crate::shutdown_requested() {
1472 return;
1473 }
1474 let t0 = std::time::Instant::now();
1475 let result = stage_file(
1476 item.idx,
1477 &item.path,
1478 &item.derived_name,
1479 &paths_owned,
1480 enable_ner,
1481 gliner_variant,
1482 max_rss_mb,
1483 llm_parallelism,
1484 llm_backend_owned,
1485 auto_describe,
1486 );
1487 let elapsed_ms = t0.elapsed().as_millis() as u64;
1488
1489 let (n_entities, n_relationships) = match &result {
1492 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1493 Err(_) => (0, 0),
1494 };
1495 let progress = StageProgressEvent {
1496 schema_version: 1,
1497 event: "file_extracted",
1498 path: &item.file_str,
1499 ms: elapsed_ms,
1500 entities: n_entities,
1501 relationships: n_relationships,
1502 };
1503 if let Ok(line) = serde_json::to_string(&progress) {
1504 tracing::info!(target: "ingest_progress", "{}", line);
1505 }
1506
1507 let _ = tx.send((item.idx, result));
1511 });
1512 drop(tx);
1514 });
1515 });
1516
1517 let fail_fast = args.fail_fast;
1529
1530 for meta in &slots_meta {
1532 if let SlotMeta::Skip {
1533 file_str,
1534 derived_base,
1535 name_truncated,
1536 original_name,
1537 original_filename,
1538 reason,
1539 } = meta
1540 {
1541 output::emit_json_compact(&IngestFileEvent {
1542 file: file_str,
1543 name: derived_base,
1544 status: "skipped",
1545 truncated: *name_truncated,
1546 original_name: original_name.clone(),
1547 original_filename: original_filename.as_deref(),
1548 error: Some(reason.clone()),
1549 memory_id: None,
1550 action: None,
1551 body_length: 0,
1552 backend_invoked: None,
1553 })?;
1554 skipped += 1;
1555 }
1556 }
1557
1558 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1561 .iter()
1562 .enumerate()
1563 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1564 .collect();
1565
1566 tracing::info!(
1567 target: "ingest",
1568 phase = "persist_start",
1569 files = total_to_process,
1570 "phase B starting: persisting files incrementally as Phase A completes each one",
1571 );
1572
1573 for (idx, stage_result) in rx {
1577 if crate::shutdown_requested() {
1578 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1579 break;
1580 }
1581 let meta = meta_index.get(&idx).ok_or_else(|| {
1582 AppError::Internal(anyhow::anyhow!(
1583 "channel idx {idx} has no corresponding Process slot"
1584 ))
1585 })?;
1586 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1587 {
1588 SlotMeta::Process {
1589 file_str,
1590 derived_name,
1591 name_truncated,
1592 original_name,
1593 original_filename,
1594 } => (
1595 file_str,
1596 derived_name,
1597 name_truncated,
1598 original_name,
1599 original_filename,
1600 ),
1601 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1602 };
1603
1604 let conn = match conn_or_err.as_mut() {
1606 Ok(c) => c,
1607 Err(err_msg) => {
1608 let err_clone = err_msg.clone();
1609 output::emit_json_compact(&IngestFileEvent {
1610 file: file_str,
1611 name: derived_name,
1612 status: "failed",
1613 truncated: *name_truncated,
1614 original_name: original_name.clone(),
1615 original_filename: original_filename.as_deref(),
1616 error: Some(err_clone.clone()),
1617 memory_id: None,
1618 action: None,
1619 body_length: 0,
1620 backend_invoked: None,
1621 })?;
1622 failed += 1;
1623 if fail_fast {
1624 output::emit_json_compact(&IngestSummary {
1625 summary: true,
1626 dir: args.dir.display().to_string(),
1627 pattern: args.pattern.clone(),
1628 recursive: args.recursive,
1629 files_total: total,
1630 files_succeeded: succeeded,
1631 files_failed: failed,
1632 files_skipped: skipped,
1633 elapsed_ms: started.elapsed().as_millis() as u64,
1634 })?;
1635 return Err(AppError::Validation(format!(
1636 "ingest aborted on first failure: {err_clone}"
1637 )));
1638 }
1639 continue;
1640 }
1641 };
1642
1643 let outcome =
1644 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1645
1646 match outcome {
1647 Ok(FileSuccess {
1648 memory_id,
1649 action,
1650 body_length,
1651 backend_invoked: file_backend_invoked,
1652 }) => {
1653 output::emit_json_compact(&IngestFileEvent {
1654 file: file_str,
1655 name: derived_name,
1656 status: "indexed",
1657 truncated: *name_truncated,
1658 original_name: original_name.clone(),
1659 original_filename: original_filename.as_deref(),
1660 error: None,
1661 memory_id: Some(memory_id),
1662 action: Some(action),
1663 body_length,
1664 backend_invoked: file_backend_invoked,
1665 })?;
1666 succeeded += 1;
1667 }
1668 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1669 output::emit_json_compact(&IngestFileEvent {
1670 file: file_str,
1671 name: derived_name,
1672 status: "skipped",
1673 truncated: *name_truncated,
1674 original_name: original_name.clone(),
1675 original_filename: original_filename.as_deref(),
1676 error: Some(format!("{e}")),
1677 memory_id: None,
1678 action: Some("duplicate".to_string()),
1679 body_length: 0,
1680 backend_invoked: None,
1681 })?;
1682 skipped += 1;
1683 }
1684 Err(e) => {
1685 let err_msg = format!("{e}");
1686 output::emit_json_compact(&IngestFileEvent {
1687 file: file_str,
1688 name: derived_name,
1689 status: "failed",
1690 truncated: *name_truncated,
1691 original_name: original_name.clone(),
1692 original_filename: original_filename.as_deref(),
1693 error: Some(err_msg.clone()),
1694 memory_id: None,
1695 action: None,
1696 body_length: 0,
1697 backend_invoked: None,
1698 })?;
1699 failed += 1;
1700 if fail_fast {
1701 output::emit_json_compact(&IngestSummary {
1702 summary: true,
1703 dir: args.dir.display().to_string(),
1704 pattern: args.pattern.clone(),
1705 recursive: args.recursive,
1706 files_total: total,
1707 files_succeeded: succeeded,
1708 files_failed: failed,
1709 files_skipped: skipped,
1710 elapsed_ms: started.elapsed().as_millis() as u64,
1711 })?;
1712 return Err(AppError::Validation(format!(
1713 "ingest aborted on first failure: {err_msg}"
1714 )));
1715 }
1716 }
1717 }
1718 }
1719
1720 producer_handle
1722 .join()
1723 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1724
1725 if let Ok(ref conn) = conn_or_err {
1726 if succeeded > 0 {
1727 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1728 }
1729 }
1730
1731 output::emit_json_compact(&IngestSummary {
1732 summary: true,
1733 dir: args.dir.display().to_string(),
1734 pattern: args.pattern.clone(),
1735 recursive: args.recursive,
1736 files_total: total,
1737 files_succeeded: succeeded,
1738 files_failed: failed,
1739 files_skipped: skipped,
1740 elapsed_ms: started.elapsed().as_millis() as u64,
1741 })?;
1742
1743 Ok(())
1744}
1745
1746fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1752 ensure_db_ready(paths)?;
1753 let conn = open_rw(&paths.db)?;
1754 Ok(conn)
1755}
1756
1757pub(crate) fn collect_files(
1758 dir: &Path,
1759 pattern: &str,
1760 recursive: bool,
1761 out: &mut Vec<PathBuf>,
1762) -> Result<(), AppError> {
1763 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1764 for entry in entries {
1765 let entry = entry.map_err(AppError::Io)?;
1766 let path = entry.path();
1767 let file_type = entry.file_type().map_err(AppError::Io)?;
1768 if file_type.is_file() {
1769 let name = entry.file_name();
1770 let name_str = name.to_string_lossy();
1771 if matches_pattern(&name_str, pattern) {
1772 out.push(path);
1773 }
1774 } else if file_type.is_dir() && recursive {
1775 collect_files(&path, pattern, recursive, out)?;
1776 }
1777 }
1778 Ok(())
1779}
1780
/// Minimal glob matcher for file names.
///
/// - `*suffix` matches names ending with `suffix` (a lone `*` matches everything).
/// - `prefix*` matches names starting with `prefix`.
/// - Anything else must equal the name exactly; a leading `*` takes precedence,
///   and any other `*` is compared literally.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    match (pattern.strip_prefix('*'), pattern.strip_suffix('*')) {
        (Some(suffix), _) => name.ends_with(suffix),
        (None, Some(prefix)) => name.starts_with(prefix),
        (None, None) => name == pattern,
    }
}
1790
1791pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1802 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1803 let lowered: String = stem
1804 .nfd()
1805 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1806 .map(|c| {
1807 if c == '_' || c.is_whitespace() {
1808 '-'
1809 } else {
1810 c
1811 }
1812 })
1813 .map(|c| c.to_ascii_lowercase())
1814 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1815 .collect();
1816 let collapsed = collapse_dashes(&lowered);
1817 let trimmed_raw = collapsed.trim_matches('-').to_string();
1818 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1820 format!("doc-{trimmed_raw}")
1821 } else {
1822 trimmed_raw
1823 };
1824 if trimmed.len() > max_len {
1825 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1826 tracing::debug!(
1827 target: "ingest",
1828 original = %trimmed,
1829 truncated_to = %truncated,
1830 max_len = max_len,
1831 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1832 );
1833 (truncated, true, Some(trimmed))
1834 } else {
1835 (trimmed, false, None)
1836 }
1837}
1838
1839fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1852 if !taken.contains(base) {
1853 return Ok(base.to_string());
1854 }
1855 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1856 let candidate = format!("{base}-{suffix}");
1857 if !taken.contains(&candidate) {
1858 tracing::warn!(
1859 target: "ingest",
1860 base = %base,
1861 resolved = %candidate,
1862 suffix,
1863 "memory name collision resolved with numeric suffix"
1864 );
1865 return Ok(candidate);
1866 }
1867 }
1868 Err(AppError::Validation(format!(
1869 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1870 )))
1871}
1872
/// Collapses every run of consecutive `-` in `s` into a single `-`.
///
/// Other characters, including leading and trailing dashes, are left as they are.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // The output ends with '-' exactly when the previous input char was '-'.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1889
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        // The cap is passed explicitly so this checks what its name says,
        // independent of the current value of DERIVED_NAME_MAX_LEN.
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, 60);
        assert_eq!(name.len(), 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    /// Puts an environment variable back to its prior value (or unsets it) when
    /// dropped. `with_env_var` holds one of these, so a panicking assertion inside
    /// the closure cannot leak its override into later `#[serial]` tests.
    struct EnvRestore {
        key: &'static str,
        prev: Option<std::ffi::OsString>,
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(p) => std::env::set_var(self.key, p),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset for
    /// `None`), restoring the previous value afterwards even if `f` panics.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        // `var_os` also preserves a previous value that is not valid UTF-8.
        let _restore = EnvRestore {
            key,
            prev: std::env::var_os(key),
        };
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}