1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound on the numeric suffix tried when a derived memory name collides
/// with one already taken (see `taken_names` in `run`).
/// NOTE(review): the code that consumes this constant is outside this chunk — confirm.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Command-line arguments for `sqlite-graphrag ingest`.
//
// Plain `//` comments are used throughout this struct on purpose: `///` doc
// comments on clap-derived items are turned into `--help` text and would change
// the CLI output.
//
// Mode-specific flags (`--claude-*`, `--codex-*`, `--opencode-*`, the queue and
// cost flags) are cross-checked against `--mode` by
// `validate_mode_conditional_flags_ingest` before any work starts.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
    # Preview file-to-name mapping without ingesting\n \
    sqlite-graphrag ingest ./docs --dry-run\n\n \
    # LLM-curated extraction via Claude Code CLI\n \
    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
    # Resume interrupted claude-code ingest\n \
    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
    # Claude Code with budget cap and custom timeout\n \
    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
AUTHENTICATION:\n \
    --mode claude-code: Uses existing Claude Code authentication.\n \
    OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
    API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
    --mode codex: Uses existing Codex CLI authentication.\n \
    Device auth: run `codex auth login` first\n \
    API key: set OPENAI_API_KEY (optional)\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Input directory; `run` rejects it if it does not exist or is not a directory.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // Memory type assigned to every ingested file.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // File-name pattern passed to `collect_files` (defined outside this chunk).
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // Whether `collect_files` descends into subdirectories.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // Enables `extract_graph_auto` in `stage_file`; accepts a bare flag or an
    // explicit boolean value, and can be set from the environment.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
    )]
    pub enable_ner: bool,

    // Heuristic description vs. the legacy placeholder; paired with
    // `--no-auto-describe` below (last one given wins via `overrides_with`).
    #[arg(
        long,
        default_value_t = true,
        overrides_with = "no_auto_describe",
        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
    )]
    pub auto_describe: bool,
    #[arg(
        long = "no-auto-describe",
        default_value_t = false,
        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
    )]
    pub no_auto_describe: bool,
    // Accepted only for backward compatibility (see help text).
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
    )]
    pub gliner_variant: String,

    // Hidden flag; consumed outside this chunk.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // Stop at the first per-file error instead of reporting it and continuing.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // Preview the file-to-name mapping without ingesting.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    // `run` errors out when more files than this match the pattern.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // Target namespace; resolved via `crate::namespace::resolve_namespace`.
    #[arg(long)]
    pub namespace: Option<String>,

    // Database path override; passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // Hidden compatibility flag; has no effect.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // Explicit worker count; the default is computed in `resolve_parallelism`.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // Forces a single worker; see `resolve_parallelism` (also honors the env var).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // RSS ceiling checked in `stage_file` before multi-chunk embedding.
    // NOTE(review): the "(default: 8192)" in the help text duplicates
    // DEFAULT_MAX_RSS_MB and must be kept in sync by hand.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
        help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // Concurrency passed to the embedder for chunk and entity embedding; range-checked by clap.
    #[arg(long, default_value_t = 2, value_name = "N",
        value_parser = clap::value_parser!(u64).range(1..=32),
        help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
    pub llm_parallelism: u64,

    // Cap for names derived from file basenames.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
        help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // Extraction backend; non-local modes are dispatched early in `run`.
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    // --- claude-code mode ---
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    #[arg(long)]
    pub claude_model: Option<String>,

    // Queue-DB flags; only accepted with --mode claude-code (see validation).
    #[arg(long, default_value_t = false)]
    pub resume: bool,

    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    #[arg(long, default_value = ".ingest-queue.sqlite")]
    pub queue_db: String,

    // Rejected in local modes when changed from its default of 60.
    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    // Budget cap; only accepted with --mode claude-code (see validation).
    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,

    // --- codex mode ---
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
    )]
    pub codex_binary: Option<PathBuf>,

    #[arg(
        long,
        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
    )]
    pub codex_model: Option<String>,

    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each codex exec invocation (default: 300)"
    )]
    pub codex_timeout: u64,

    // --- opencode mode ---
    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
    pub opencode_binary: Option<PathBuf>,

    #[arg(
        long,
        value_name = "MODEL",
        env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
        help = "Model override for OpenCode extraction"
    )]
    pub opencode_model: Option<String>,

    #[arg(
        long,
        value_name = "SECONDS",
        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
        default_value_t = 300,
        help = "Timeout in seconds for each opencode run invocation (default: 300)"
    )]
    pub opencode_timeout: u64,

    // Job-singleton controls; their semantics are implemented outside this chunk.
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,
}
321
// Extraction backend selected with `--mode`.
//
// Plain `//` comments are used on purpose: `///` doc comments on a
// `clap::ValueEnum` become possible-value help text and would change `--help`.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default mode: the in-process pipeline in `run` (file read, optional URL
    // extraction, local embedding).
    None,
    // Accepted for compatibility and handled exactly like `None` (same local
    // pipeline, same flag validation). The GLiNER NER pipeline was removed in v1.0.79.
    Gliner,
    // LLM-curated extraction, dispatched to `ingest_claude::run_claude_ingest`.
    ClaudeCode,
    // Dispatched to `ingest_codex::run_codex_ingest`.
    Codex,
    // Dispatched to `ingest_opencode::run_opencode_ingest`.
    #[value(name = "opencode")]
    Opencode,
}
337
338fn env_low_memory_enabled() -> bool {
343 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
344 Ok(v) if v.is_empty() => false,
345 Ok(v) => match v.to_lowercase().as_str() {
346 "1" | "true" | "yes" | "on" => true,
347 "0" | "false" | "no" | "off" => false,
348 other => {
349 tracing::warn!(
350 target: "ingest",
351 value = %other,
352 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
353 );
354 false
355 }
356 },
357 Err(_) => false,
358 }
359}
360
361fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
373 let env_flag = env_low_memory_enabled();
374 let low_memory = low_memory_flag || env_flag;
375
376 if low_memory {
377 if let Some(n) = ingest_parallelism {
378 if n > 1 {
379 tracing::warn!(
380 target: "ingest",
381 requested = n,
382 "--ingest-parallelism overridden by --low-memory; using 1"
383 );
384 }
385 }
386 if low_memory_flag {
387 tracing::info!(
388 target: "ingest",
389 source = "flag",
390 "low-memory mode enabled: forcing --ingest-parallelism 1"
391 );
392 } else {
393 tracing::info!(
394 target: "ingest",
395 source = "env",
396 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
397 );
398 }
399 return 1;
400 }
401
402 ingest_parallelism
403 .unwrap_or_else(|| {
404 std::thread::available_parallelism()
405 .map(|v| v.get() / 2)
406 .unwrap_or(1)
407 .clamp(1, 4)
408 })
409 .max(1)
410}
411
/// One per-file NDJSON record on stdout.
///
/// Field declaration order is the JSON key order. `Option` fields are omitted
/// from the output entirely when `None`.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Path of the source file as displayed to the user.
    file: &'a str,
    /// Memory name derived for this file.
    name: &'a str,
    /// Outcome label for this file (exact values are set by the caller, not visible here).
    status: &'a str,
    /// Presumably whether the derived name was truncated (cf. `name_truncated` in `run`).
    truncated: bool,
    /// Presumably the derived name before truncation/disambiguation, when it changed.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Presumably the original file name, when it differs from the derived name.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error message for a failed file.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the persisted memory (see `FileSuccess::memory_id`).
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action (see `FileSuccess::action`, e.g. `"created"`).
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length in bytes.
    body_length: usize,
    /// Name of the embedding backend used for single-chunk bodies, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
440
/// Final NDJSON line emitted after all per-file events, carrying the totals.
#[derive(Serialize)]
struct IngestSummary {
    /// Marker that distinguishes this line from per-file events
    /// (presumably always `true`; set by the caller).
    summary: bool,
    /// Ingested directory, as displayed.
    dir: String,
    /// File pattern used (`--pattern`).
    pattern: String,
    /// Whether subdirectories were traversed (`--recursive`).
    recursive: bool,
    /// Number of files matching the pattern.
    files_total: usize,
    files_succeeded: usize,
    files_failed: usize,
    files_skipped: usize,
    /// Wall-clock duration of the run in milliseconds.
    elapsed_ms: u64,
}
453
/// Result of persisting one staged file, produced by `persist_staged`.
struct FileSuccess {
    /// Row id of the newly inserted memory.
    memory_id: i64,
    /// Persistence action; `persist_staged` always reports `"created"`.
    action: String,
    /// Length in bytes of the stored body.
    body_length: usize,
    /// Embedding backend recorded by `stage_file` (only set for single-chunk bodies).
    backend_invoked: Option<&'static str>,
}
461
/// Serializable progress event for a staging step.
/// NOTE(review): not constructed in the visible part of this file; the field
/// meanings below are inferred from names — confirm against the emitter.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Version of this event's JSON shape.
    schema_version: u8,
    /// Event label.
    event: &'a str,
    /// File path the event refers to.
    path: &'a str,
    /// Presumably elapsed milliseconds for the step.
    ms: u64,
    /// Presumably number of entities extracted for the file.
    entities: usize,
    /// Presumably number of relationships extracted for the file.
    relationships: usize,
}
473
/// Everything `stage_file` computes for one file, ready for `persist_staged`.
struct StagedFile {
    /// Full file contents (validated non-empty and within `MAX_MEMORY_BODY_LEN`).
    body: String,
    /// blake3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters of `body`, stored alongside the memory vector.
    snippet: String,
    /// Validated memory name.
    name: String,
    /// Heuristic or placeholder description.
    description: String,
    /// Memory-level vector: the whole-body embedding for single-chunk bodies, or
    /// the aggregate of chunk embeddings. `None` when embedding failed and
    /// skip-embedding-on-failure is active.
    embedding: Option<Vec<f32>>,
    /// Per-chunk vectors (multi-chunk bodies only).
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by `split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty unless `--enable-ner` succeeded).
    entities: Vec<NewEntity>,
    /// Extracted relationships (currently always empty; see `stage_file`).
    relationships: Vec<NewRelationship>,
    /// Embeddings aligned by index with `entities`, when available.
    entity_embeddings: Option<Vec<Vec<f32>>>,
    /// URLs found by extraction, written after the memory transaction commits.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// Embedding backend used for a single-chunk body, if any.
    backend_invoked: Option<&'static str>,
}
495
/// Reads, validates, optionally extracts from, and embeds one file, returning a
/// [`StagedFile`] to be written later by `persist_staged`. Takes no database
/// connection.
///
/// Steps, in order:
/// 1. Name checks: length, reserved `__` prefix, kebab-case slug.
/// 2. Body checks: size from metadata, then decoded length, then non-blank.
/// 3. Description: heuristic or placeholder, checked against its length cap.
/// 4. Optional extraction (`enable_ner`).
/// 5. Relationship normalization and validation.
/// 6. Chunking and chunk-count limit.
/// 7. Body/chunk embedding.
/// 8. Entity embedding.
///
/// `_idx` is unused. `gliner_variant` is only forwarded to `extract_graph_auto`.
///
/// # Errors
/// - `AppError::LimitExceeded`: name too long, body too large, or too many chunks.
/// - `AppError::Validation`: reserved or non-slug name, empty body, description
///   too long, invalid relationship, or a validation error from the embedder.
/// - `AppError::Io`: metadata/read failures, including non-UTF-8 contents
///   (`read_to_string` rejects them).
/// - `AppError::LowMemory`: process RSS exceeds `max_rss_mb` before
///   multi-chunk embedding.
/// - Embedding errors, unless skip-embedding-on-failure is active.
// Each argument is an independent per-run setting threaded through from `run`.
#[allow(clippy::too_many_arguments)]
fn stage_file(
    _idx: usize,
    path: &Path,
    name: &str,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
    llm_parallelism: usize,
    llm_backend: crate::cli::LlmBackendChoice,
    auto_describe: bool,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    // --- Name validation ---
    if name.len() > MAX_MEMORY_NAME_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
        ));
    }
    if name.starts_with("__") {
        return Err(AppError::Validation(
            crate::i18n::validation::reserved_name(),
        ));
    }
    {
        let slug_re = crate::constants::name_slug_regex();
        if !slug_re.is_match(name) {
            return Err(AppError::Validation(crate::i18n::validation::name_kebab(
                name,
            )));
        }
    }

    // --- Body validation ---
    // Reject oversized files from metadata before reading them into memory;
    // the decoded length is re-checked after the read.
    let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
    if file_size > MAX_MEMORY_BODY_LEN as u64 {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
    if raw_body.len() > MAX_MEMORY_BODY_LEN {
        return Err(AppError::LimitExceeded(
            crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
        ));
    }
    if raw_body.trim().is_empty() {
        return Err(AppError::Validation(crate::i18n::validation::empty_body()));
    }

    // --- Description: heuristic from the body, or the legacy placeholder ---
    let description = if auto_describe {
        crate::commands::ingest_heuristics::extract_heuristic_description(
            &raw_body,
            Some(&path.display().to_string()),
        )
    } else {
        format!("ingested from {}", path.display())
    };
    if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
        return Err(AppError::Validation(
            crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
        ));
    }

    // --- Optional extraction ---
    // Every extracted entity is typed `Concept` with no description, and
    // relationships are cleared, so `extracted_relationships` is always empty
    // after this block. An extraction failure is only logged; the file is still
    // ingested, just without graph data.
    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                extracted_entities = extracted
                    .entities
                    .into_iter()
                    .map(|e| NewEntity {
                        name: e.name,
                        entity_type: crate::entity_type::EntityType::Concept,
                        description: None,
                    })
                    .collect();
                extracted_relationships.clear();

                // Enforce per-memory caps.
                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > max_relationships_per_memory() {
                    extracted_relationships.truncate(max_relationships_per_memory());
                }
            }
            Err(e) => {
                tracing::warn!(
                    target: "ingest",
                    file = %path.display(),
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // --- Relationship normalization and validation ---
    // NOTE(review): nothing above populates `extracted_relationships`, so this
    // loop is currently a no-op; it only matters if extraction starts returning
    // relationships again.
    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        // Also rejects NaN, since NaN is not contained in any range.
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    // blake3 hex digest of the body, plus the first 200 characters as a snippet.
    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    let snippet: String = raw_body.chars().take(200).collect();

    // --- Chunking ---
    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    // --- Body / chunk embedding ---
    // Single chunk: embed the whole body with the selected backend and record
    // which backend ran. Multiple chunks: check RSS, embed the chunks with the
    // local parallel embedder, and aggregate them into one memory-level vector;
    // `backend_invoked` stays `None` in that case. Validation errors always
    // propagate. Other failures become a warning plus a missing embedding when
    // skip-embedding-on-failure is active.
    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
        .len()
        == 1
    {
        match crate::embedder::embed_passage_with_choice(
            &paths.models,
            &raw_body,
            Some(llm_backend),
        ) {
            Ok((v, k)) => (Some(v), Some(k.as_str())),
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %path.display(), "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    } else {
        let chunk_texts: Vec<String> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
            .collect();
        // Abort before the memory-heavy step if the process is already over budget.
        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
            if rss > max_rss_mb {
                tracing::error!(
                    target: "ingest",
                    rss_mb = rss,
                    max_rss_mb = max_rss_mb,
                    file = %path.display(),
                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                );
                return Err(AppError::LowMemory {
                    available_mb: crate::memory_guard::available_memory_mb(),
                    required_mb: max_rss_mb,
                });
            }
        }
        match crate::embedder::embed_passages_parallel_local(
            &paths.models,
            &chunk_texts,
            llm_parallelism,
            crate::embedder::chunk_embed_batch_size(),
        ) {
            Ok(chunk_embeddings) => {
                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
                chunk_embeddings_opt = Some(chunk_embeddings);
                (Some(aggregated), None)
            }
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %path.display(), "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    };

    // --- Entity embedding ---
    // Each entity is embedded as its name, plus its description when present.
    // NOTE(review): unlike the body embedding above, there is no dedicated
    // `AppError::Validation` arm here, so a validation error is swallowed when
    // skip-embedding-on-failure is active — confirm the asymmetry is intended.
    let entity_texts: Vec<String> = extracted_entities
        .iter()
        .map(|entity| match &entity.description {
            Some(desc) => format!("{} {}", entity.name, desc),
            None => entity.name.clone(),
        })
        .collect();
    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
        &paths.models,
        &entity_texts,
        llm_parallelism,
    ) {
        Ok((entity_embeddings, embed_cache_stats)) => {
            if embed_cache_stats.hits > 0 {
                tracing::debug!(
                    hits = embed_cache_stats.hits,
                    misses = embed_cache_stats.misses,
                    requested = embed_cache_stats.requested,
                    "G56: entity embed cache hit (ingest)"
                );
            }
            Some(entity_embeddings)
        }
        Err(e) if skip_embed => {
            tracing::warn!(error = %e, file = %path.display(), "ingest: entity embedding failed; --skip-embedding-on-failure active");
            None
        }
        Err(e) => return Err(e),
    };

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name: name.to_string(),
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings: entity_embeddings_opt,
        urls: extracted_urls,
        backend_invoked,
    })
}
754
755fn persist_staged(
757 conn: &mut Connection,
758 namespace: &str,
759 memory_type: &str,
760 staged: StagedFile,
761) -> Result<FileSuccess, AppError> {
762 {
763 let active_count: u32 = conn.query_row(
764 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
765 [],
766 |r| r.get::<_, i64>(0).map(|v| v as u32),
767 )?;
768 let ns_exists: bool = conn.query_row(
769 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
770 rusqlite::params![namespace],
771 |r| r.get::<_, i64>(0).map(|v| v > 0),
772 )?;
773 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
774 return Err(AppError::NamespaceError(format!(
775 "active namespace limit of {} exceeded while creating '{namespace}'",
776 crate::constants::MAX_NAMESPACES_ACTIVE
777 )));
778 }
779 }
780
781 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
782 if existing_memory.is_some() {
783 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
784 &staged.name,
785 namespace,
786 )));
787 }
788 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
789
790 let new_memory = NewMemory {
791 namespace: namespace.to_string(),
792 name: staged.name.clone(),
793 memory_type: memory_type.to_string(),
794 description: staged.description.clone(),
795 body: staged.body,
796 body_hash: staged.body_hash,
797 session_id: None,
798 source: "agent".to_string(),
799 metadata: serde_json::json!({}),
800 };
801
802 if let Some(hash_id) = duplicate_hash_id {
803 tracing::debug!(
804 target: "ingest",
805 duplicate_memory_id = hash_id,
806 "identical body already exists; persisting a new memory anyway"
807 );
808 }
809
810 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
811
812 let memory_id = memories::insert(&tx, &new_memory)?;
813 versions::insert_version(
814 &tx,
815 memory_id,
816 1,
817 &staged.name,
818 memory_type,
819 &staged.description,
820 &new_memory.body,
821 &serde_json::to_string(&new_memory.metadata)?,
822 None,
823 "create",
824 )?;
825 if let Some(ref emb) = staged.embedding {
826 memories::upsert_vec(
827 &tx,
828 memory_id,
829 namespace,
830 memory_type,
831 emb,
832 &staged.name,
833 &staged.snippet,
834 )?;
835 }
836
837 if staged.chunks_info.len() > 1 {
838 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
839 if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
840 for (i, emb) in chunk_embeddings.iter().enumerate() {
841 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
842 }
843 }
844 }
845
846 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
847 for (idx, entity) in staged.entities.iter().enumerate() {
848 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
849 if let Some(ref entity_embeddings) = staged.entity_embeddings {
850 if let Some(entity_embedding) = entity_embeddings.get(idx) {
851 entities::upsert_entity_vec(
852 &tx,
853 entity_id,
854 namespace,
855 entity.entity_type,
856 entity_embedding,
857 &entity.name,
858 )?;
859 }
860 }
861 entities::link_memory_entity(&tx, memory_id, entity_id)?;
862 }
863 let entity_types: std::collections::HashMap<&str, EntityType> = staged
864 .entities
865 .iter()
866 .map(|entity| (entity.name.as_str(), entity.entity_type))
867 .collect();
868
869 let mut affected_entity_ids: std::collections::HashSet<i64> =
870 std::collections::HashSet::new();
871 for entity in &staged.entities {
872 if let Some(eid) = entities::find_entity_id(&tx, namespace, &entity.name)? {
873 affected_entity_ids.insert(eid);
874 }
875 }
876
877 for rel in &staged.relationships {
878 let source_entity = NewEntity {
879 name: rel.source.clone(),
880 entity_type: entity_types
881 .get(rel.source.as_str())
882 .copied()
883 .unwrap_or(EntityType::Concept),
884 description: None,
885 };
886 let target_entity = NewEntity {
887 name: rel.target.clone(),
888 entity_type: entity_types
889 .get(rel.target.as_str())
890 .copied()
891 .unwrap_or(EntityType::Concept),
892 description: None,
893 };
894 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
895 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
896 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
897 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
898 affected_entity_ids.insert(source_id);
899 affected_entity_ids.insert(target_id);
900 }
901
902 for &eid in &affected_entity_ids {
903 entities::recalculate_degree(&tx, eid)?;
904 }
905 }
906
907 tx.commit()?;
908
909 if !staged.urls.is_empty() {
910 let url_entries: Vec<storage_urls::MemoryUrl> = staged
911 .urls
912 .into_iter()
913 .map(|u| storage_urls::MemoryUrl {
914 url: u.url,
915 offset: Some(u.start as i64),
916 })
917 .collect();
918 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
919 }
920
921 Ok(FileSuccess {
922 memory_id,
923 action: "created".to_string(),
924 body_length: new_memory.body.len(),
925 backend_invoked: staged.backend_invoked,
926 })
927}
928
/// Reports whether `value` compares equal to `default`.
///
/// Used by the mode-conditional flag validation to tell whether a defaulted
/// numeric flag was changed. A value passed explicitly on the command line that
/// happens to equal the default cannot be told apart from an omitted flag.
fn is_at_default<T>(value: T, default: T) -> bool
where
    T: PartialEq,
{
    PartialEq::eq(&value, &default)
}
939
940fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
955 const DEFAULT_TIMEOUT: u64 = 300;
956 const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
957
958 let mut conflicts: Vec<String> = Vec::new();
959
960 let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
961
962 if is_local_mode {
963 if args.claude_binary.is_some() {
964 conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
965 }
966 if args.claude_model.is_some() {
967 conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
968 }
969 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
970 conflicts.push(format!(
971 "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
972 args.claude_timeout
973 ));
974 }
975 if args.codex_binary.is_some() {
976 conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
977 }
978 if args.codex_model.is_some() {
979 conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
980 }
981 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
982 conflicts.push(format!(
983 "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
984 args.codex_timeout
985 ));
986 }
987 if args.opencode_binary.is_some() {
988 conflicts
989 .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
990 }
991 if args.opencode_model.is_some() {
992 conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
993 }
994 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
995 conflicts.push(format!(
996 "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
997 args.opencode_timeout
998 ));
999 }
1000 if args.max_cost_usd.is_some() {
1001 conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
1002 }
1003 if args.resume {
1004 conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
1005 }
1006 if args.retry_failed {
1007 conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
1008 }
1009 if args.keep_queue {
1010 conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
1011 }
1012 if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
1013 conflicts.push(format!(
1014 "--rate-limit-wait={} is ignored when --mode is none or gliner",
1015 args.rate_limit_wait
1016 ));
1017 }
1018 }
1019
1020 match args.mode {
1021 IngestMode::ClaudeCode => {
1022 if args.codex_binary.is_some() {
1023 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1024 }
1025 if args.codex_model.is_some() {
1026 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1027 }
1028 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1029 conflicts.push(format!(
1030 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1031 args.codex_timeout
1032 ));
1033 }
1034 if args.opencode_binary.is_some() {
1035 conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1036 }
1037 if args.opencode_model.is_some() {
1038 conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1039 }
1040 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1041 conflicts.push(format!(
1042 "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1043 args.opencode_timeout
1044 ));
1045 }
1046 }
1047 IngestMode::Codex => {
1048 if args.claude_binary.is_some() {
1049 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1050 }
1051 if args.claude_model.is_some() {
1052 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1053 }
1054 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1055 conflicts.push(format!(
1056 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1057 args.claude_timeout
1058 ));
1059 }
1060 if args.max_cost_usd.is_some() {
1061 conflicts.push(
1062 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1063 .to_string(),
1064 );
1065 }
1066 if args.resume {
1067 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1068 }
1069 if args.retry_failed {
1070 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1071 }
1072 if args.keep_queue {
1073 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1074 }
1075 if args.opencode_binary.is_some() {
1076 conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1077 }
1078 if args.opencode_model.is_some() {
1079 conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1080 }
1081 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1082 conflicts.push(format!(
1083 "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1084 args.opencode_timeout
1085 ));
1086 }
1087 }
1088 IngestMode::Opencode => {
1089 if args.claude_binary.is_some() {
1090 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1091 }
1092 if args.claude_model.is_some() {
1093 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1094 }
1095 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1096 conflicts.push(format!(
1097 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1098 args.claude_timeout
1099 ));
1100 }
1101 if args.codex_binary.is_some() {
1102 conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1103 }
1104 if args.codex_model.is_some() {
1105 conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1106 }
1107 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1108 conflicts.push(format!(
1109 "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1110 args.codex_timeout
1111 ));
1112 }
1113 if args.max_cost_usd.is_some() {
1114 conflicts.push(
1115 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1116 .to_string(),
1117 );
1118 }
1119 if args.resume {
1120 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1121 }
1122 if args.retry_failed {
1123 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1124 }
1125 if args.keep_queue {
1126 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1127 }
1128 }
1129 IngestMode::None | IngestMode::Gliner => {}
1130 }
1131
1132 if !conflicts.is_empty() {
1133 return Err(AppError::Validation(format!(
1134 "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
1135 args.mode,
1136 conflicts.join("\n - ")
1137 )));
1138 }
1139
1140 Ok(())
1141}
1142
1143#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
1146pub fn run(args: IngestArgs, llm_backend: crate::cli::LlmBackendChoice) -> Result<(), AppError> {
1147 validate_mode_conditional_flags_ingest(&args)?;
1150 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
1151 if args.mode == IngestMode::ClaudeCode {
1152 return super::ingest_claude::run_claude_ingest(&args);
1153 }
1154 if args.mode == IngestMode::Codex {
1155 return super::ingest_codex::run_codex_ingest(&args);
1156 }
1157 if args.mode == IngestMode::Opencode {
1158 return super::ingest_opencode::run_opencode_ingest(&args);
1159 }
1160
1161 let started = std::time::Instant::now();
1162
1163 if !args.dir.exists() {
1164 return Err(AppError::Validation(format!(
1165 "directory not found: {}",
1166 args.dir.display()
1167 )));
1168 }
1169 if !args.dir.is_dir() {
1170 return Err(AppError::Validation(format!(
1171 "path is not a directory: {}",
1172 args.dir.display()
1173 )));
1174 }
1175
1176 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
1177 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
1178 files.sort_unstable();
1179
1180 if files.len() > args.max_files {
1181 return Err(AppError::Validation(format!(
1182 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
1183 files.len(),
1184 args.max_files
1185 )));
1186 }
1187
1188 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
1189 let memory_type_str = args.r#type.as_str().to_string();
1190
1191 let paths = AppPaths::resolve(args.db.as_deref())?;
1192 let mut conn_or_err = match init_storage(&paths) {
1193 Ok(c) => Ok(c),
1194 Err(e) => Err(format!("{e}")),
1195 };
1196
1197 let mut succeeded: usize = 0;
1198 let mut failed: usize = 0;
1199 let mut skipped: usize = 0;
1200 let total = files.len();
1201
1202 let mut taken_names: BTreeSet<String> = BTreeSet::new();
1205
1206 enum SlotMeta {
1212 Skip {
1213 file_str: String,
1214 derived_base: String,
1215 name_truncated: bool,
1216 original_name: Option<String>,
1217 original_filename: Option<String>,
1218 reason: String,
1219 },
1220 Process {
1221 file_str: String,
1222 derived_name: String,
1223 name_truncated: bool,
1224 original_name: Option<String>,
1225 original_filename: Option<String>,
1226 },
1227 }
1228
1229 struct ProcessItem {
1230 idx: usize,
1231 path: PathBuf,
1232 file_str: String,
1233 derived_name: String,
1234 }
1235
1236 let files_cap = files.len();
1237 let mut slots_meta: Vec<SlotMeta> = Vec::new();
1238 slots_meta.try_reserve(files_cap).map_err(|_| {
1239 AppError::LimitExceeded(format!(
1240 "allocation of {files_cap} slot metadata entries would exceed available memory"
1241 ))
1242 })?;
1243 let mut process_items: Vec<ProcessItem> = Vec::new();
1244 process_items.try_reserve(files_cap).map_err(|_| {
1245 AppError::LimitExceeded(format!(
1246 "allocation of {files_cap} process items would exceed available memory"
1247 ))
1248 })?;
1249 let mut truncations: Vec<(String, String)> = Vec::new();
1250 truncations.try_reserve(files_cap).map_err(|_| {
1251 AppError::LimitExceeded(format!(
1252 "allocation of {files_cap} truncation entries would exceed available memory"
1253 ))
1254 })?;
1255
1256 let max_name_length = args.max_name_length;
1257 for path in &files {
1258 let file_str = path.to_string_lossy().into_owned();
1259 let (derived_base, name_truncated, original_name) =
1260 derive_kebab_name(path, max_name_length);
1261 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1262
1263 if name_truncated {
1264 if let Some(ref orig) = original_name {
1265 truncations.push((orig.clone(), derived_base.clone()));
1266 }
1267 }
1268
1269 if derived_base.is_empty() {
1270 let orig_filename = if !original_basename.is_empty() {
1272 Some(original_basename.to_string())
1273 } else {
1274 None
1275 };
1276 slots_meta.push(SlotMeta::Skip {
1277 file_str,
1278 derived_base: String::new(),
1279 name_truncated: false,
1280 original_name: None,
1281 original_filename: orig_filename,
1282 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
1283 });
1284 continue;
1285 }
1286
1287 match unique_name(&derived_base, &taken_names) {
1288 Ok(derived_name) => {
1289 taken_names.insert(derived_name.clone());
1290 let idx = slots_meta.len();
1291 let orig_filename = if original_basename != derived_name {
1293 Some(original_basename.to_string())
1294 } else {
1295 None
1296 };
1297 process_items.push(ProcessItem {
1298 idx,
1299 path: path.clone(),
1300 file_str: file_str.clone(),
1301 derived_name: derived_name.clone(),
1302 });
1303 slots_meta.push(SlotMeta::Process {
1304 file_str,
1305 derived_name,
1306 name_truncated,
1307 original_name,
1308 original_filename: orig_filename,
1309 });
1310 }
1311 Err(e) => {
1312 let orig_filename = if original_basename != derived_base {
1313 Some(original_basename.to_string())
1314 } else {
1315 None
1316 };
1317 slots_meta.push(SlotMeta::Skip {
1318 file_str,
1319 derived_base,
1320 name_truncated,
1321 original_name,
1322 original_filename: orig_filename,
1323 reason: e.to_string(),
1324 });
1325 }
1326 }
1327 }
1328
1329 if !truncations.is_empty() {
1330 tracing::info!(
1331 target: "ingest",
1332 count = truncations.len(),
1333 max_name_length = max_name_length,
1334 max_len = DERIVED_NAME_MAX_LEN,
1335 "derived names truncated; pass -vv (debug) for per-file detail"
1336 );
1337 }
1338
1339 if args.dry_run {
1341 for meta in &slots_meta {
1342 match meta {
1343 SlotMeta::Skip {
1344 file_str,
1345 derived_base,
1346 name_truncated,
1347 original_name,
1348 original_filename,
1349 reason,
1350 } => {
1351 output::emit_json_compact(&IngestFileEvent {
1352 file: file_str,
1353 name: derived_base,
1354 status: "skip",
1355 truncated: *name_truncated,
1356 original_name: original_name.clone(),
1357 original_filename: original_filename.as_deref(),
1358 error: Some(reason.clone()),
1359 memory_id: None,
1360 action: None,
1361 body_length: 0,
1362 backend_invoked: None,
1363 })?;
1364 }
1365 SlotMeta::Process {
1366 file_str,
1367 derived_name,
1368 name_truncated,
1369 original_name,
1370 original_filename,
1371 } => {
1372 output::emit_json_compact(&IngestFileEvent {
1373 file: file_str,
1374 name: derived_name,
1375 status: "preview",
1376 truncated: *name_truncated,
1377 original_name: original_name.clone(),
1378 original_filename: original_filename.as_deref(),
1379 error: None,
1380 memory_id: None,
1381 action: None,
1382 body_length: 0,
1383 backend_invoked: None,
1384 })?;
1385 }
1386 }
1387 }
1388 output::emit_json_compact(&IngestSummary {
1389 summary: true,
1390 dir: args.dir.to_string_lossy().into_owned(),
1391 pattern: args.pattern.clone(),
1392 recursive: args.recursive,
1393 files_total: total,
1394 files_succeeded: 0,
1395 files_failed: 0,
1396 files_skipped: 0,
1397 elapsed_ms: started.elapsed().as_millis() as u64,
1398 })?;
1399 return Ok(());
1400 }
1401
1402 if args.low_memory {
1404 if let Some(n) = args.ingest_parallelism {
1405 if n > 1 {
1406 return Err(AppError::Validation(
1407 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1408 .to_string(),
1409 ));
1410 }
1411 }
1412 }
1413
1414 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1417
1418 let pool = rayon::ThreadPoolBuilder::new()
1419 .num_threads(parallelism)
1420 .build()
1421 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1422
1423 if args.enable_ner && args.skip_extraction {
1424 return Err(AppError::Validation(
1425 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1426 ));
1427 }
1428 if args.skip_extraction && !args.enable_ner {
1429 tracing::warn!(
1436 "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
1437 );
1438 }
1439 let enable_ner = args.enable_ner;
1440 let auto_describe = args.auto_describe && !args.no_auto_describe;
1441 let max_rss_mb = args.max_rss_mb;
1442 let llm_parallelism = args.llm_parallelism as usize;
1443 if args.mode == IngestMode::Gliner {
1447 tracing::warn!(
1448 "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
1449 );
1450 }
1451 if args.gliner_variant != "fp32" {
1452 tracing::warn!(
1453 "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
1454 );
1455 }
1456 let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
1457 "int8" => crate::extraction::GlinerVariant::Int8,
1458 _ => crate::extraction::GlinerVariant::Fp32,
1459 };
1460
1461 let total_to_process = process_items.len();
1462 tracing::info!(
1463 target: "ingest",
1464 phase = "pipeline_start",
1465 files = total_to_process,
1466 ingest_parallelism = parallelism,
1467 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1468 );
1469
1470 let channel_bound = (parallelism * 2).max(1);
1474 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1475
1476 let paths_owned = paths.clone();
1481 let llm_backend_owned = llm_backend;
1482 let producer_handle = std::thread::spawn(move || {
1483 pool.install(|| {
1484 process_items.into_par_iter().for_each(|item| {
1485 if crate::shutdown_requested() {
1486 return;
1487 }
1488 let t0 = std::time::Instant::now();
1489 let result = stage_file(
1490 item.idx,
1491 &item.path,
1492 &item.derived_name,
1493 &paths_owned,
1494 enable_ner,
1495 gliner_variant,
1496 max_rss_mb,
1497 llm_parallelism,
1498 llm_backend_owned,
1499 auto_describe,
1500 );
1501 let elapsed_ms = t0.elapsed().as_millis() as u64;
1502
1503 let (n_entities, n_relationships) = match &result {
1506 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1507 Err(_) => (0, 0),
1508 };
1509 let progress = StageProgressEvent {
1510 schema_version: 1,
1511 event: "file_extracted",
1512 path: &item.file_str,
1513 ms: elapsed_ms,
1514 entities: n_entities,
1515 relationships: n_relationships,
1516 };
1517 if let Ok(line) = serde_json::to_string(&progress) {
1518 tracing::info!(target: "ingest_progress", "{}", line);
1519 }
1520
1521 let _ = tx.send((item.idx, result));
1525 });
1526 drop(tx);
1528 });
1529 });
1530
1531 let fail_fast = args.fail_fast;
1543
1544 for meta in &slots_meta {
1546 if let SlotMeta::Skip {
1547 file_str,
1548 derived_base,
1549 name_truncated,
1550 original_name,
1551 original_filename,
1552 reason,
1553 } = meta
1554 {
1555 output::emit_json_compact(&IngestFileEvent {
1556 file: file_str,
1557 name: derived_base,
1558 status: "skipped",
1559 truncated: *name_truncated,
1560 original_name: original_name.clone(),
1561 original_filename: original_filename.as_deref(),
1562 error: Some(reason.clone()),
1563 memory_id: None,
1564 action: None,
1565 body_length: 0,
1566 backend_invoked: None,
1567 })?;
1568 skipped += 1;
1569 }
1570 }
1571
1572 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1575 .iter()
1576 .enumerate()
1577 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1578 .collect();
1579
1580 tracing::info!(
1581 target: "ingest",
1582 phase = "persist_start",
1583 files = total_to_process,
1584 "phase B starting: persisting files incrementally as Phase A completes each one",
1585 );
1586
1587 for (idx, stage_result) in rx {
1591 if crate::shutdown_requested() {
1592 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1593 break;
1594 }
1595 let meta = meta_index.get(&idx).ok_or_else(|| {
1596 AppError::Internal(anyhow::anyhow!(
1597 "channel idx {idx} has no corresponding Process slot"
1598 ))
1599 })?;
1600 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1601 {
1602 SlotMeta::Process {
1603 file_str,
1604 derived_name,
1605 name_truncated,
1606 original_name,
1607 original_filename,
1608 } => (
1609 file_str,
1610 derived_name,
1611 name_truncated,
1612 original_name,
1613 original_filename,
1614 ),
1615 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1616 };
1617
1618 let conn = match conn_or_err.as_mut() {
1620 Ok(c) => c,
1621 Err(err_msg) => {
1622 let err_clone = err_msg.clone();
1623 output::emit_json_compact(&IngestFileEvent {
1624 file: file_str,
1625 name: derived_name,
1626 status: "failed",
1627 truncated: *name_truncated,
1628 original_name: original_name.clone(),
1629 original_filename: original_filename.as_deref(),
1630 error: Some(err_clone.clone()),
1631 memory_id: None,
1632 action: None,
1633 body_length: 0,
1634 backend_invoked: None,
1635 })?;
1636 failed += 1;
1637 if fail_fast {
1638 output::emit_json_compact(&IngestSummary {
1639 summary: true,
1640 dir: args.dir.display().to_string(),
1641 pattern: args.pattern.clone(),
1642 recursive: args.recursive,
1643 files_total: total,
1644 files_succeeded: succeeded,
1645 files_failed: failed,
1646 files_skipped: skipped,
1647 elapsed_ms: started.elapsed().as_millis() as u64,
1648 })?;
1649 return Err(AppError::Validation(format!(
1650 "ingest aborted on first failure: {err_clone}"
1651 )));
1652 }
1653 continue;
1654 }
1655 };
1656
1657 let outcome =
1658 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1659
1660 match outcome {
1661 Ok(FileSuccess {
1662 memory_id,
1663 action,
1664 body_length,
1665 backend_invoked: file_backend_invoked,
1666 }) => {
1667 output::emit_json_compact(&IngestFileEvent {
1668 file: file_str,
1669 name: derived_name,
1670 status: "indexed",
1671 truncated: *name_truncated,
1672 original_name: original_name.clone(),
1673 original_filename: original_filename.as_deref(),
1674 error: None,
1675 memory_id: Some(memory_id),
1676 action: Some(action),
1677 body_length,
1678 backend_invoked: file_backend_invoked,
1679 })?;
1680 succeeded += 1;
1681 }
1682 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1683 output::emit_json_compact(&IngestFileEvent {
1684 file: file_str,
1685 name: derived_name,
1686 status: "skipped",
1687 truncated: *name_truncated,
1688 original_name: original_name.clone(),
1689 original_filename: original_filename.as_deref(),
1690 error: Some(format!("{e}")),
1691 memory_id: None,
1692 action: Some("duplicate".to_string()),
1693 body_length: 0,
1694 backend_invoked: None,
1695 })?;
1696 skipped += 1;
1697 }
1698 Err(e) => {
1699 let err_msg = format!("{e}");
1700 output::emit_json_compact(&IngestFileEvent {
1701 file: file_str,
1702 name: derived_name,
1703 status: "failed",
1704 truncated: *name_truncated,
1705 original_name: original_name.clone(),
1706 original_filename: original_filename.as_deref(),
1707 error: Some(err_msg.clone()),
1708 memory_id: None,
1709 action: None,
1710 body_length: 0,
1711 backend_invoked: None,
1712 })?;
1713 failed += 1;
1714 if fail_fast {
1715 output::emit_json_compact(&IngestSummary {
1716 summary: true,
1717 dir: args.dir.display().to_string(),
1718 pattern: args.pattern.clone(),
1719 recursive: args.recursive,
1720 files_total: total,
1721 files_succeeded: succeeded,
1722 files_failed: failed,
1723 files_skipped: skipped,
1724 elapsed_ms: started.elapsed().as_millis() as u64,
1725 })?;
1726 return Err(AppError::Validation(format!(
1727 "ingest aborted on first failure: {err_msg}"
1728 )));
1729 }
1730 }
1731 }
1732 }
1733
1734 producer_handle
1736 .join()
1737 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1738
1739 if let Ok(ref conn) = conn_or_err {
1740 if succeeded > 0 {
1741 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1742 }
1743 }
1744
1745 output::emit_json_compact(&IngestSummary {
1746 summary: true,
1747 dir: args.dir.display().to_string(),
1748 pattern: args.pattern.clone(),
1749 recursive: args.recursive,
1750 files_total: total,
1751 files_succeeded: succeeded,
1752 files_failed: failed,
1753 files_skipped: skipped,
1754 elapsed_ms: started.elapsed().as_millis() as u64,
1755 })?;
1756
1757 Ok(())
1758}
1759
1760fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1766 ensure_db_ready(paths)?;
1767 let conn = open_rw(&paths.db)?;
1768 Ok(conn)
1769}
1770
1771pub(crate) fn collect_files(
1772 dir: &Path,
1773 pattern: &str,
1774 recursive: bool,
1775 out: &mut Vec<PathBuf>,
1776) -> Result<(), AppError> {
1777 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1778 for entry in entries {
1779 let entry = entry.map_err(AppError::Io)?;
1780 let path = entry.path();
1781 let file_type = entry.file_type().map_err(AppError::Io)?;
1782 if file_type.is_file() {
1783 let name = entry.file_name();
1784 let name_str = name.to_string_lossy();
1785 if matches_pattern(&name_str, pattern) {
1786 out.push(path);
1787 }
1788 } else if file_type.is_dir() && recursive {
1789 collect_files(&path, pattern, recursive, out)?;
1790 }
1791 }
1792 Ok(())
1793}
1794
/// Returns `true` when the file name `name` matches the glob `pattern`.
///
/// `*` matches any (possibly empty) run of characters. It may appear any number
/// of times and anywhere in the pattern, for example `*.md`, `README*`,
/// `2024-*.md` or `*draft*`.
///
/// Every other character, including `?` and `[`, is matched literally. A pattern
/// without `*` must equal `name` exactly. Matching is case-sensitive.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    let mut segments = pattern.split('*');
    // `split` always yields at least one item: the text before the first `*`.
    let head = segments.next().unwrap_or_default();
    let tail: Vec<&str> = segments.collect();

    let (last, middle) = match tail.split_last() {
        Some((last, middle)) => (*last, middle),
        // No `*` at all: the pattern is a literal file name.
        None => return name == pattern,
    };

    // The fixed head and tail must both fit without overlapping.
    if name.len() < head.len() + last.len() || !name.starts_with(head) || !name.ends_with(last)
    {
        return false;
    }

    // Both bounds are char boundaries: the end of the matched head and the
    // start of the matched tail. Middle segments must then appear in order;
    // taking the leftmost occurrence of each is optimal for `*`-only globs.
    let mut rest = &name[head.len()..name.len() - last.len()];
    for &segment in middle {
        match rest.find(segment) {
            Some(pos) => rest = &rest[pos + segment.len()..],
            None => return false,
        }
    }
    true
}
1804
1805pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1816 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1817 let lowered: String = stem
1818 .nfd()
1819 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1820 .map(|c| {
1821 if c == '_' || c.is_whitespace() {
1822 '-'
1823 } else {
1824 c
1825 }
1826 })
1827 .map(|c| c.to_ascii_lowercase())
1828 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1829 .collect();
1830 let collapsed = collapse_dashes(&lowered);
1831 let trimmed_raw = collapsed.trim_matches('-').to_string();
1832 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1834 format!("doc-{trimmed_raw}")
1835 } else {
1836 trimmed_raw
1837 };
1838 if trimmed.len() > max_len {
1839 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1840 tracing::debug!(
1841 target: "ingest",
1842 original = %trimmed,
1843 truncated_to = %truncated,
1844 max_len = max_len,
1845 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1846 );
1847 (truncated, true, Some(trimmed))
1848 } else {
1849 (trimmed, false, None)
1850 }
1851}
1852
1853fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1866 if !taken.contains(base) {
1867 return Ok(base.to_string());
1868 }
1869 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1870 let candidate = format!("{base}-{suffix}");
1871 if !taken.contains(&candidate) {
1872 tracing::warn!(
1873 target: "ingest",
1874 base = %base,
1875 resolved = %candidate,
1876 suffix,
1877 "memory name collision resolved with numeric suffix"
1878 );
1879 return Ok(candidate);
1880 }
1881 }
1882 Err(AppError::Validation(format!(
1883 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1884 )))
1885}
1886
/// Collapses every run of consecutive `-` characters in `s` into a single `-`.
///
/// Leading and trailing dashes are preserved (as one dash each); all other
/// characters are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A dash directly after an emitted dash belongs to the same run.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
1903
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    use serial_test::serial;

    /// Restores an environment variable to the value captured at construction
    /// when dropped. A failing assertion inside `with_env_var` therefore cannot
    /// leak its override into later `#[serial]` tests.
    struct EnvVarRestore {
        key: &'static str,
        // `OsString` so that non-UTF-8 previous values are restored verbatim.
        previous: Option<std::ffi::OsString>,
    }

    impl Drop for EnvVarRestore {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset for
    /// `None`). The previous value is restored afterwards, even if `f` panics.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let _restore = EnvVarRestore {
            key,
            previous: std::env::var_os(key),
        };
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            Some(crate::cli::Commands::Ingest(args)) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}