1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
// Upper bound used when disambiguating derived memory names.
// NOTE(review): the consumer of this constant is not visible in this part of
// the file; presumably it caps the numeric suffix tried on a name collision —
// confirm against the name-derivation code.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
// Command-line arguments accepted by the `ingest` subcommand.
//
// Plain `//` comments are used throughout this struct on purpose: clap's derive
// macro turns `///` doc comments into help text, so adding doc comments here
// would change what `--help` prints.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Ingest every Markdown file under ./docs as `document` memories\n \
    sqlite-graphrag ingest ./docs --type document\n\n \
    # Ingest .txt files recursively under ./notes\n \
    sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
    # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
    sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
    # Preview file-to-name mapping without ingesting\n \
    sqlite-graphrag ingest ./docs --dry-run\n\n \
    # LLM-curated extraction via Claude Code CLI\n \
    sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
    # Resume interrupted claude-code ingest\n \
    sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
    # Claude Code with budget cap and custom timeout\n \
    sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
AUTHENTICATION:\n \
    --mode claude-code: Uses existing Claude Code authentication.\n \
    OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
    API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
    --mode codex: Uses existing Codex CLI authentication.\n \
    Device auth: run `codex auth login` first\n \
    API key: set OPENAI_API_KEY (optional)\n\n \
NOTES:\n \
    Each file becomes a separate memory. Names derive from file basenames\n \
    (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
    followed by a final summary line with counts. Per-file errors are reported\n \
    inline and processing continues unless --fail-fast is set.")]
pub struct IngestArgs {
    // Positional argument (no `long`/`short`): the directory to ingest.
    #[arg(
        value_name = "DIR",
        help = "Directory to ingest recursively (each matching file becomes a memory)"
    )]
    pub dir: PathBuf,

    // `--type`: memory type assigned to ingested files; defaults to `Document`.
    #[arg(long, value_enum, default_value_t = MemoryType::Document)]
    pub r#type: MemoryType,

    // `--pattern`: file-name pattern to match; defaults to `*.md`.
    #[arg(long, default_value = "*.md")]
    pub pattern: String,

    // `--recursive`: off by default.
    #[arg(long, default_value_t = false)]
    pub recursive: bool,

    // `--enable-ner[=BOOL]`: takes an optional value. A bare `--enable-ner`
    // means "true", omitting the flag means "false"; the value can also come
    // from `SQLITE_GRAPHRAG_ENABLE_NER` and is parsed by `parse_bool_flexible`.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_ENABLE_NER",
        value_parser = crate::parsers::parse_bool_flexible,
        action = clap::ArgAction::Set,
        num_args = 0..=1,
        default_missing_value = "true",
        default_value = "false",
        help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
    )]
    pub enable_ner: bool,

    // `--auto-describe`: on by default; declared as overriding
    // `--no-auto-describe` when both appear.
    #[arg(
        long,
        default_value_t = true,
        overrides_with = "no_auto_describe",
        help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
    )]
    pub auto_describe: bool,
    // `--no-auto-describe`: opt-out companion of `--auto-describe`.
    #[arg(
        long = "no-auto-describe",
        default_value_t = false,
        help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
    )]
    pub no_auto_describe: bool,
    // `--gliner-variant`: deprecated and retained for compatibility only
    // (see the help text); also readable from `SQLITE_GRAPHRAG_GLINER_VARIANT`.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
        default_value = "fp32",
        help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
    )]
    pub gliner_variant: String,

    // `--skip-extraction`: hidden from help output.
    // NOTE(review): its effect is not visible in this part of the file.
    #[arg(long, default_value_t = false, hide = true)]
    pub skip_extraction: bool,

    // `--fail-fast`: per the NOTES section of the long help, processing
    // continues after per-file errors unless this flag is set.
    #[arg(long, default_value_t = false)]
    pub fail_fast: bool,

    // `--dry-run`: per the EXAMPLES section, previews the file-to-name mapping
    // without ingesting.
    #[arg(long, default_value_t = false)]
    pub dry_run: bool,

    // `--max-files`: defaults to 10_000.
    // NOTE(review): presumably a cap on the number of matched files; the
    // enforcement is not visible here.
    #[arg(long, default_value_t = 10_000)]
    pub max_files: usize,

    // `--namespace`: optional namespace override.
    #[arg(long)]
    pub namespace: Option<String>,

    // `--db`: optional database path, also readable from `SQLITE_GRAPHRAG_DB_PATH`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,

    // `--format`: output format selector; defaults to `Json`.
    #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
    pub format: JsonOutputFormat,

    // `--json`: hidden no-op accepted for compatibility (see help text).
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,

    // `--ingest-parallelism`: optional explicit worker count; when absent the
    // default is computed in `resolve_parallelism`.
    #[arg(
        long,
        help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
    )]
    pub ingest_parallelism: Option<usize>,

    // `--low-memory`: forces a parallelism of 1 (see `resolve_parallelism`).
    #[arg(
        long,
        default_value_t = false,
        help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
                Recommended for environments with <4 GB available RAM or container/cgroup \
                constraints. Trade-off: 3-4x longer wall time. Also honored via \
                SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
    )]
    pub low_memory: bool,

    // `--max-rss-mb`: RSS ceiling in MiB, checked before multi-chunk embedding
    // in `stage_one_body`.
    #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
        help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
    pub max_rss_mb: u64,

    // `--llm-parallelism`: clap rejects values outside 1..=32.
    #[arg(long, default_value_t = 2, value_name = "N",
        value_parser = clap::value_parser!(u64).range(1..=32),
        help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
    pub llm_parallelism: u64,

    // `--max-name-length`: defaults to `DERIVED_NAME_MAX_LEN`.
    #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
        help = "Maximum length for derived memory names (default: 60)")]
    pub max_name_length: usize,

    // `--mode`: extraction mode; defaults to `IngestMode::None`.
    #[arg(long, value_enum, default_value_t = IngestMode::None)]
    pub mode: IngestMode,

    // `--claude-binary`: optional path, also readable from
    // `SQLITE_GRAPHRAG_CLAUDE_BINARY`.
    #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
    pub claude_binary: Option<std::path::PathBuf>,

    // `--claude-model`: optional model override for the Claude mode.
    #[arg(long)]
    pub claude_model: Option<String>,

    // `--resume`: per the EXAMPLES section, resumes an interrupted
    // claude-code ingest.
    #[arg(long, default_value_t = false)]
    pub resume: bool,

    // `--retry-failed`: queue-related flag.
    // NOTE(review): exact semantics are not visible in this part of the file.
    #[arg(long, default_value_t = false)]
    pub retry_failed: bool,

    // `--keep-queue`: queue-related flag.
    // NOTE(review): exact semantics are not visible in this part of the file.
    #[arg(long, default_value_t = false)]
    pub keep_queue: bool,

    // `--queue-db`: optional queue database location.
    #[arg(long)]
    pub queue_db: Option<String>,

    // `--rate-limit-wait`: defaults to 60.
    // NOTE(review): unit is presumably seconds — confirm.
    #[arg(long, default_value_t = 60)]
    pub rate_limit_wait: u64,

    // `--max-cost-usd`: optional budget cap (see the EXAMPLES section).
    #[arg(long)]
    pub max_cost_usd: Option<f64>,

    // `--claude-timeout`: per-invocation timeout in seconds; defaults to 300.
    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each claude -p invocation (default: 300)"
    )]
    pub claude_timeout: u64,

    // `--codex-binary`: optional path, also readable from
    // `SQLITE_GRAPHRAG_CODEX_BINARY`.
    #[arg(
        long,
        env = "SQLITE_GRAPHRAG_CODEX_BINARY",
        help = "Explicit path to the Codex CLI binary (only with --mode codex)"
    )]
    pub codex_binary: Option<PathBuf>,

    // `--codex-model`: optional model override for the Codex mode.
    #[arg(
        long,
        help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
    )]
    pub codex_model: Option<String>,

    // `--codex-timeout`: per-invocation timeout in seconds; defaults to 300.
    #[arg(
        long,
        default_value_t = 300,
        help = "Timeout in seconds for each codex exec invocation (default: 300)"
    )]
    pub codex_timeout: u64,

    // `--opencode-binary`: optional path, also readable from
    // `SQLITE_GRAPHRAG_OPENCODE_BINARY`.
    #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
    pub opencode_binary: Option<PathBuf>,

    // `--opencode-model`: optional model override, also readable from
    // `SQLITE_GRAPHRAG_OPENCODE_MODEL`.
    #[arg(
        long,
        value_name = "MODEL",
        env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
        help = "Model override for OpenCode extraction"
    )]
    pub opencode_model: Option<String>,

    // `--opencode-timeout`: per-invocation timeout in seconds; defaults to 300
    // and is also readable from `SQLITE_GRAPHRAG_OPENCODE_TIMEOUT`.
    #[arg(
        long,
        value_name = "SECONDS",
        env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
        default_value_t = 300,
        help = "Timeout in seconds for each opencode run invocation (default: 300)"
    )]
    pub opencode_timeout: u64,

    // `--wait-job-singleton SECONDS`: optional wait duration.
    // NOTE(review): the job-singleton mechanism is not visible here.
    #[arg(long, value_name = "SECONDS")]
    pub wait_job_singleton: Option<u64>,

    // `--force-job-singleton`: off by default.
    // NOTE(review): the job-singleton mechanism is not visible here.
    #[arg(long, default_value_t = false)]
    pub force_job_singleton: bool,

    // `--enrich-after`: off by default; see the help text.
    #[arg(
        long,
        default_value_t = false,
        help = "Run enrich --operation memory-bindings after all files are ingested"
    )]
    pub enrich_after: bool,

    // `--force-merge`: forwarded to `persist_staged`, where it switches a name
    // collision from a `Duplicate` error to an in-place update.
    #[arg(
        long,
        default_value_t = false,
        help = "Update existing memories on name collision instead of skipping (idempotent re-ingest)"
    )]
    pub force_merge: bool,
}
341
// Extraction mode selected with `--mode`.
//
// Plain `//` comments are used because clap's `ValueEnum` derive turns `///`
// doc comments on variants into help text for the possible values.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum IngestMode {
    // Default value of `IngestArgs::mode`; treated as a local mode by the
    // flag validation in this file.
    None,
    // Treated together with `None` as a local mode by the flag validation.
    Gliner,
    // Used as `--mode claude-code` in the command's long help.
    ClaudeCode,
    // Used as `--mode codex` in the command's long help.
    Codex,
    // Explicit value name so the CLI spelling is `opencode`.
    #[value(name = "opencode")]
    Opencode,
}
357
358fn env_low_memory_enabled() -> bool {
363 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
364 Ok(v) if v.is_empty() => false,
365 Ok(v) => match v.to_lowercase().as_str() {
366 "1" | "true" | "yes" | "on" => true,
367 "0" | "false" | "no" | "off" => false,
368 other => {
369 tracing::warn!(
370 target: "ingest",
371 value = %other,
372 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
373 );
374 false
375 }
376 },
377 Err(_) => false,
378 }
379}
380
381fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
393 let env_flag = env_low_memory_enabled();
394 let low_memory = low_memory_flag || env_flag;
395
396 if low_memory {
397 if let Some(n) = ingest_parallelism {
398 if n > 1 {
399 tracing::warn!(
400 target: "ingest",
401 requested = n,
402 "--ingest-parallelism overridden by --low-memory; using 1"
403 );
404 }
405 }
406 if low_memory_flag {
407 tracing::info!(
408 target: "ingest",
409 source = "flag",
410 "low-memory mode enabled: forcing --ingest-parallelism 1"
411 );
412 } else {
413 tracing::info!(
414 target: "ingest",
415 source = "env",
416 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
417 );
418 }
419 return 1;
420 }
421
422 ingest_parallelism
423 .unwrap_or_else(|| {
424 std::thread::available_parallelism()
425 .map(|v| v.get() / 2)
426 .unwrap_or(1)
427 .clamp(1, 4)
428 })
429 .max(1)
430}
431
/// Serializable per-file event record.
///
/// Every `Option` field is omitted from the serialized output when it is
/// `None`; the remaining fields are always emitted.
// NOTE(review): presumably this is the "one JSON object per file" NDJSON line
// described in the command's long help — the emit site is not visible here.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// File this event refers to.
    file: &'a str,
    /// Memory name associated with the file.
    name: &'a str,
    /// Status label for the file.
    status: &'a str,
    /// Truncation marker.
    // NOTE(review): presumably set when the derived name was shortened — confirm.
    truncated: bool,
    /// Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Error description; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Identifier of the stored memory; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Action label; omitted when `None`. `persist_staged` produces
    /// `"created"` and `"updated"` in `FileSuccess::action`.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Body length; `persist_staged` reports this as a byte length.
    body_length: usize,
    /// Embedding backend label; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    backend_invoked: Option<&'a str>,
}
460
/// Serializable per-file size estimate record.
// NOTE(review): judging by the name this is emitted during `--dry-run`; the
// construction site and the limits behind `exceeds_limits` are not visible in
// this part of the file.
#[derive(Serialize)]
struct IngestDryRunBudget<'a> {
    /// Record-type marker field.
    budget: bool,
    /// File the estimate refers to.
    file: &'a str,
    /// Memory name associated with the file.
    name: &'a str,
    /// Size in bytes.
    bytes: usize,
    /// Number of chunks.
    chunk_count: usize,
    /// Number of tokens.
    token_count: usize,
    /// Number of partitions.
    partition_count: usize,
    /// Whether the file exceeds limits.
    exceeds_limits: bool,
}
475
/// Serializable run summary with per-outcome file counts.
// NOTE(review): presumably the "final summary line with counts" described in
// the command's long help — the emit site is not visible here.
#[derive(Serialize)]
struct IngestSummary {
    /// Record-type marker field.
    summary: bool,
    /// Directory of the run.
    dir: String,
    /// File-name pattern of the run.
    pattern: String,
    /// Whether the run was recursive.
    recursive: bool,
    /// Total number of files.
    files_total: usize,
    /// Number of files that succeeded.
    files_succeeded: usize,
    /// Number of files that failed.
    files_failed: usize,
    /// Number of files that were skipped.
    files_skipped: usize,
    /// Elapsed time in milliseconds.
    elapsed_ms: u64,
}
488
/// Result returned by `persist_staged` for one successfully stored memory.
#[derive(Debug)]
struct FileSuccess {
    /// Row id of the inserted or updated memory.
    memory_id: i64,
    /// `"created"` for a new memory, `"updated"` for a force-merged one.
    action: String,
    /// Byte length of the stored body.
    body_length: usize,
    /// Backend label copied from `StagedFile::backend_invoked`.
    backend_invoked: Option<&'static str>,
}
497
/// Serializable progress record for one processing stage.
// NOTE(review): the emit site and the set of `event` values are not visible
// in this part of the file.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Schema version of this record.
    schema_version: u8,
    /// Event label.
    event: &'a str,
    /// Path the event refers to.
    path: &'a str,
    /// Duration in milliseconds.
    ms: u64,
    /// Entity count.
    entities: usize,
    /// Relationship count.
    relationships: usize,
}
509
/// Everything computed for one memory body before it is written to the
/// database. Built by `stage_one_body` and consumed by `persist_staged`.
struct StagedFile {
    /// Full body text.
    body: String,
    /// Hex-encoded BLAKE3 hash of `body`.
    body_hash: String,
    /// First 200 characters of `body`.
    snippet: String,
    /// Memory name.
    name: String,
    /// Memory description.
    description: String,
    /// Memory-level embedding; `None` when embedding failed and the
    /// skip-on-failure setting allowed staging to continue.
    embedding: Option<Vec<f32>>,
    /// Per-chunk embeddings; only set on the multi-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries produced by the hierarchical chunker.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities (empty when extraction is disabled or failed).
    entities: Vec<NewEntity>,
    /// Extracted relationships.
    relationships: Vec<NewRelationship>,
    /// Entity embeddings, looked up by the entity's position in `entities`.
    entity_embeddings: Option<Vec<Vec<f32>>>,
    /// URLs found in the body.
    urls: Vec<crate::extraction::ExtractedUrl>,
    /// Backend label reported by the single-chunk embedding call; `None` on
    /// the multi-chunk path or when embedding was skipped.
    backend_invoked: Option<&'static str>,
}
531
532#[allow(clippy::too_many_arguments)]
538fn stage_file(
539 _idx: usize,
540 path: &Path,
541 name: &str,
542 paths: &AppPaths,
543 enable_ner: bool,
544 gliner_variant: crate::extraction::GlinerVariant,
545 max_rss_mb: u64,
546 llm_parallelism: usize,
547 llm_backend: crate::cli::LlmBackendChoice,
548 embedding_backend: crate::cli::EmbeddingBackendChoice,
549 auto_describe: bool,
550) -> Result<Vec<StagedFile>, AppError> {
551 use crate::constants::*;
552
553 if name.len() > MAX_MEMORY_NAME_LEN {
554 return Err(AppError::LimitExceeded(
555 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
556 ));
557 }
558 if name.starts_with("__") {
559 return Err(AppError::Validation(
560 crate::i18n::validation::reserved_name(),
561 ));
562 }
563 {
564 let slug_re = crate::constants::name_slug_regex();
565 if !slug_re.is_match(name) {
566 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
567 name,
568 )));
569 }
570 }
571
572 let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
573 if file_size > MAX_MEMORY_BODY_LEN as u64 {
574 return Err(AppError::LimitExceeded(
575 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
576 ));
577 }
578 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
579 if raw_body.len() > MAX_MEMORY_BODY_LEN {
580 return Err(AppError::LimitExceeded(
581 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
582 ));
583 }
584 if raw_body.trim().is_empty() {
585 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
586 }
587
588 let description = if auto_describe {
589 crate::commands::ingest_heuristics::extract_heuristic_description(
590 &raw_body,
591 Some(&path.display().to_string()),
592 )
593 } else {
594 format!("ingested from {}", path.display())
595 };
596 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
597 return Err(AppError::Validation(
598 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
599 ));
600 }
601
602 let partitions = chunking::split_body_by_sections(&raw_body);
607 let total_parts = partitions.len();
608 let mut staged = Vec::with_capacity(total_parts);
609 for (part_idx, part_body) in partitions.into_iter().enumerate() {
610 let part_name = if total_parts == 1 {
611 name.to_string()
612 } else {
613 format!("{name}-part-{}", part_idx + 1)
614 };
615 if part_name.len() > MAX_MEMORY_NAME_LEN {
616 return Err(AppError::LimitExceeded(
617 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
618 ));
619 }
620 let part_description = if total_parts == 1 {
621 description.clone()
622 } else {
623 partition_description(&description, part_idx + 1, total_parts)
624 };
625 staged.push(stage_one_body(
626 part_body,
627 part_name,
628 part_description,
629 paths,
630 enable_ner,
631 gliner_variant,
632 max_rss_mb,
633 llm_parallelism,
634 llm_backend,
635 embedding_backend,
636 )?);
637 }
638 Ok(staged)
639}
640
641fn partition_description(base: &str, part: usize, total: usize) -> String {
645 let suffix = format!(" (part {part}/{total})");
646 let max = crate::constants::MAX_MEMORY_DESCRIPTION_LEN;
647 if base.len() + suffix.len() <= max {
648 return format!("{base}{suffix}");
649 }
650 let mut cut = max.saturating_sub(suffix.len()).min(base.len());
651 while cut > 0 && !base.is_char_boundary(cut) {
652 cut -= 1;
653 }
654 format!("{}{}", &base[..cut], suffix)
655}
656
/// Computes everything needed to persist one memory body: optional graph/URL
/// extraction, body hash, snippet, chunking, memory and chunk embeddings, and
/// entity embeddings.
///
/// No database access happens here; the result is handed to `persist_staged`.
///
/// # Errors
/// - `AppError::Validation` for an invalid relationship or when an embedding
///   call reports a validation error (these are never downgraded).
/// - `AppError::LimitExceeded` when the body yields more than
///   `REMEMBER_MAX_SAFE_MULTI_CHUNKS` chunks.
/// - `AppError::LowMemory` when the process RSS is above `max_rss_mb` before
///   multi-chunk embedding.
/// - Any other embedding error, unless
///   `should_skip_embedding_on_failure()` is true, in which case the failure
///   is logged and the corresponding embedding is left as `None`.
#[allow(clippy::too_many_arguments)]
fn stage_one_body(
    raw_body: String,
    name: String,
    description: String,
    paths: &AppPaths,
    enable_ner: bool,
    gliner_variant: crate::extraction::GlinerVariant,
    max_rss_mb: u64,
    llm_parallelism: usize,
    llm_backend: crate::cli::LlmBackendChoice,
    embedding_backend: crate::cli::EmbeddingBackendChoice,
) -> Result<StagedFile, AppError> {
    use crate::constants::*;

    let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
    let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
    let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
    if enable_ner {
        match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
            Ok(extracted) => {
                extracted_urls = extracted.urls;
                // Every extracted entity is stored as a `Concept` without a description.
                extracted_entities = extracted
                    .entities
                    .into_iter()
                    .map(|e| NewEntity {
                        name: e.name,
                        entity_type: crate::entity_type::EntityType::Concept,
                        description: None,
                    })
                    .collect();
                // Relationships are never populated in this function, so this
                // vector stays empty on every path.
                extracted_relationships.clear();

                // Enforce the per-memory caps.
                if extracted_entities.len() > max_entities_per_memory() {
                    extracted_entities.truncate(max_entities_per_memory());
                }
                if extracted_relationships.len() > max_relationships_per_memory() {
                    extracted_relationships.truncate(max_relationships_per_memory());
                }
            }
            Err(e) => {
                // Extraction is best-effort: log and continue with no entities/URLs.
                tracing::warn!(
                    target: "ingest",
                    file = %name,
                    "auto-extraction failed (graceful degradation): {e:#}"
                );
            }
        }
    }

    // Normalize and validate relationships. Because the vector is always empty
    // at this point (see above), this loop currently has nothing to iterate.
    for rel in &mut extracted_relationships {
        rel.relation = crate::parsers::normalize_relation(&rel.relation);
        if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
            return Err(AppError::Validation(format!(
                "{e} for relationship '{}' -> '{}'",
                rel.source, rel.target
            )));
        }
        crate::parsers::warn_if_non_canonical(&rel.relation);
        if !(0.0..=1.0).contains(&rel.strength) {
            return Err(AppError::Validation(format!(
                "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
                rel.strength, rel.source, rel.target
            )));
        }
    }

    let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
    // Snippet is the first 200 characters (not bytes) of the body.
    let snippet: String = raw_body.chars().take(200).collect();

    let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
    if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
        return Err(AppError::LimitExceeded(format!(
            "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
            chunks_info.len(),
            REMEMBER_MAX_SAFE_MULTI_CHUNKS
        )));
    }

    let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
    let skip_embed = crate::embedder::should_skip_embedding_on_failure();
    // Single-chunk bodies are embedded as a whole; multi-chunk bodies are
    // embedded per chunk and then aggregated into one memory-level vector.
    let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
        .len()
        == 1
    {
        match crate::embedder::embed_passage_with_embedding_choice(
            &paths.models,
            &raw_body,
            embedding_backend,
            llm_backend,
        ) {
            Ok((v, k)) => (Some(v), Some(k.as_str())),
            // Validation errors always propagate, even with skip-on-failure.
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %name, "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    } else {
        let chunk_texts: Vec<String> = chunks_info
            .iter()
            .map(|c| chunking::chunk_text(&raw_body, c).to_string())
            .collect();
        // Memory guard: refuse to start chunk embedding when RSS is already
        // above the configured ceiling. Skipped when RSS cannot be measured.
        if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
            if rss > max_rss_mb {
                tracing::error!(
                    target: "ingest",
                    rss_mb = rss,
                    max_rss_mb = max_rss_mb,
                    file = %name,
                    "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
                );
                return Err(AppError::LowMemory {
                    available_mb: crate::memory_guard::available_memory_mb(),
                    required_mb: max_rss_mb,
                });
            }
        }
        match crate::embedder::embed_passages_parallel_with_embedding_choice(
            &paths.models,
            &chunk_texts,
            llm_parallelism,
            crate::embedder::chunk_embed_batch_size(),
            embedding_backend,
            llm_backend,
        ) {
            Ok(chunk_embeddings) => {
                let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
                chunk_embeddings_opt = Some(chunk_embeddings);
                // No backend label is reported on the multi-chunk path.
                (Some(aggregated), None)
            }
            // Validation errors always propagate, even with skip-on-failure.
            Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
            Err(e) if skip_embed => {
                tracing::warn!(error = %e, file = %name, "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
                (None, None)
            }
            Err(e) => return Err(e),
        }
    };

    // Text embedded for each entity: "<name> <description>" or just the name.
    let entity_texts: Vec<String> = extracted_entities
        .iter()
        .map(|entity| match &entity.description {
            Some(desc) => format!("{} {}", entity.name, desc),
            None => entity.name.clone(),
        })
        .collect();
    // NOTE(review): this call is made even when `entity_texts` is empty;
    // presumably the embedder handles an empty slice cheaply — confirm.
    let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
        &paths.models,
        &entity_texts,
        llm_parallelism,
        embedding_backend,
        llm_backend,
    ) {
        Ok((entity_embeddings, embed_cache_stats)) => {
            if embed_cache_stats.hits > 0 {
                tracing::debug!(
                    hits = embed_cache_stats.hits,
                    misses = embed_cache_stats.misses,
                    requested = embed_cache_stats.requested,
                    "G56: entity embed cache hit (ingest)"
                );
            }
            Some(entity_embeddings)
        }
        Err(e) if skip_embed => {
            tracing::warn!(error = %e, file = %name, "ingest: entity embedding failed; --skip-embedding-on-failure active");
            None
        }
        Err(e) => return Err(e),
    };

    Ok(StagedFile {
        body: raw_body,
        body_hash,
        snippet,
        name,
        description,
        embedding,
        chunk_embeddings: chunk_embeddings_opt,
        chunks_info,
        entities: extracted_entities,
        relationships: extracted_relationships,
        entity_embeddings: entity_embeddings_opt,
        urls: extracted_urls,
        backend_invoked,
    })
}
868
869fn link_staged_graph(
873 tx: &Connection,
874 namespace: &str,
875 memory_id: i64,
876 staged: &StagedFile,
877) -> Result<(), AppError> {
878 if staged.entities.is_empty() && staged.relationships.is_empty() {
879 return Ok(());
880 }
881 for (idx, entity) in staged.entities.iter().enumerate() {
882 let entity_id = entities::upsert_entity(tx, namespace, entity)?;
883 if let Some(ref entity_embeddings) = staged.entity_embeddings {
884 if let Some(entity_embedding) = entity_embeddings.get(idx) {
885 entities::upsert_entity_vec(
886 tx,
887 entity_id,
888 namespace,
889 entity.entity_type,
890 entity_embedding,
891 &entity.name,
892 )?;
893 }
894 }
895 entities::link_memory_entity(tx, memory_id, entity_id)?;
896 }
897 let entity_types: std::collections::HashMap<&str, EntityType> = staged
898 .entities
899 .iter()
900 .map(|entity| (entity.name.as_str(), entity.entity_type))
901 .collect();
902
903 let mut affected_entity_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
904 for entity in &staged.entities {
905 if let Some(eid) = entities::find_entity_id(tx, namespace, &entity.name)? {
906 affected_entity_ids.insert(eid);
907 }
908 }
909
910 for rel in &staged.relationships {
911 let source_entity = NewEntity {
912 name: rel.source.clone(),
913 entity_type: entity_types
914 .get(rel.source.as_str())
915 .copied()
916 .unwrap_or(EntityType::Concept),
917 description: None,
918 };
919 let target_entity = NewEntity {
920 name: rel.target.clone(),
921 entity_type: entity_types
922 .get(rel.target.as_str())
923 .copied()
924 .unwrap_or(EntityType::Concept),
925 description: None,
926 };
927 let source_id = entities::upsert_entity(tx, namespace, &source_entity)?;
928 let target_id = entities::upsert_entity(tx, namespace, &target_entity)?;
929 let rel_id = entities::upsert_relationship(tx, namespace, source_id, target_id, rel)?;
930 entities::link_memory_relationship(tx, memory_id, rel_id)?;
931 affected_entity_ids.insert(source_id);
932 affected_entity_ids.insert(target_id);
933 }
934
935 for &eid in &affected_entity_ids {
936 entities::recalculate_degree(tx, eid)?;
937 }
938 Ok(())
939}
940
/// Writes one staged memory to the database, creating it or (with
/// `force_merge`) updating the existing memory that has the same name.
///
/// Flow:
/// 1. Enforce the active-namespace limit when `namespace` has no live memories yet.
/// 2. Look up an existing memory by name and by body hash.
/// 3. Name match: fail with `AppError::Duplicate` unless `force_merge`;
///    otherwise update the row, FTS index, version history, vectors, chunks
///    and graph links inside one immediate transaction.
/// 4. No name match: fail with `AppError::Duplicate` when an identical body
///    hash already exists; otherwise insert everything inside one immediate
///    transaction, then store extracted URLs outside of it.
///
/// # Errors
/// `AppError::NamespaceError`, `AppError::Duplicate`, or any error raised by
/// the storage layer / JSON serialization.
// NOTE(review): the namespace and duplicate lookups run before the
// transaction is opened, so they are not covered by its isolation — confirm
// that concurrent writers are excluded elsewhere.
fn persist_staged(
    conn: &mut Connection,
    namespace: &str,
    memory_type: &str,
    staged: StagedFile,
    force_merge: bool,
) -> Result<FileSuccess, AppError> {
    {
        // Number of distinct namespaces that still have non-deleted memories.
        let active_count: u32 = conn.query_row(
            "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get::<_, i64>(0).map(|v| v as u32),
        )?;
        let ns_exists: bool = conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
            rusqlite::params![namespace],
            |r| r.get::<_, i64>(0).map(|v| v > 0),
        )?;
        // Only a namespace that would be newly activated counts against the limit.
        if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
            return Err(AppError::NamespaceError(format!(
                "active namespace limit of {} exceeded while creating '{namespace}'",
                crate::constants::MAX_NAMESPACES_ACTIVE
            )));
        }
    }

    let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
    let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;

    let new_memory = NewMemory {
        namespace: namespace.to_string(),
        name: staged.name.clone(),
        memory_type: memory_type.to_string(),
        description: staged.description.clone(),
        body: staged.body.clone(),
        body_hash: staged.body_hash.clone(),
        session_id: None,
        source: "agent".to_string(),
        metadata: serde_json::json!({}),
    };
    let body_length = new_memory.body.len();
    let metadata_json = serde_json::to_string(&new_memory.metadata)?;

    match existing_memory {
        Some((existing_id, _updated_at, _version)) => {
            // Name collision: only proceed when the caller asked for a merge.
            if !force_merge {
                return Err(AppError::Duplicate(errors_msg::duplicate_memory(
                    &staged.name,
                    namespace,
                )));
            }

            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;

            // The previous text is needed to update the FTS index below.
            let (old_name, old_desc, old_body): (String, String, String) = tx.query_row(
                "SELECT name, description, body FROM memories WHERE id = ?1",
                rusqlite::params![existing_id],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            )?;

            let next_v = versions::next_version(&tx, existing_id)?;
            memories::update(&tx, existing_id, &new_memory, None)?;
            memories::sync_fts_after_update(
                &tx,
                existing_id,
                &old_name,
                &old_desc,
                &old_body,
                &staged.name,
                &staged.description,
                &new_memory.body,
            )?;
            versions::insert_version(
                &tx,
                existing_id,
                next_v,
                &staged.name,
                memory_type,
                &staged.description,
                &new_memory.body,
                &metadata_json,
                None,
                "edit",
            )?;

            // Replace the previous chunk set with the newly staged one.
            storage_chunks::delete_chunks(&tx, existing_id)?;
            if let Some(ref emb) = staged.embedding {
                memories::upsert_vec(
                    &tx,
                    existing_id,
                    namespace,
                    memory_type,
                    emb,
                    &staged.name,
                    &staged.snippet,
                )?;
            }
            // Chunk rows are only stored for multi-chunk bodies.
            if staged.chunks_info.len() > 1 {
                storage_chunks::insert_chunk_slices(
                    &tx,
                    existing_id,
                    &new_memory.body,
                    &staged.chunks_info,
                )?;
                if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
                    // NOTE(review): the chunk index is passed as both the
                    // second and fourth argument; verify against
                    // `upsert_chunk_vec`'s signature that the second one is
                    // not expected to be a globally unique row id.
                    for (i, emb) in chunk_embeddings.iter().enumerate() {
                        storage_chunks::upsert_chunk_vec(
                            &tx,
                            i as i64,
                            existing_id,
                            i as i32,
                            emb,
                        )?;
                    }
                }
            }

            link_staged_graph(&tx, namespace, existing_id, &staged)?;
            tx.commit()?;

            // NOTE(review): unlike the create path, `staged.urls` is not
            // persisted here and the body-hash duplicate check is not
            // applied — confirm this is intentional.
            Ok(FileSuccess {
                memory_id: existing_id,
                action: "updated".to_string(),
                body_length,
                backend_invoked: staged.backend_invoked,
            })
        }
        None => {
            // New name, but the same body is already stored: refuse to duplicate it.
            if let Some(hash_id) = duplicate_hash_id {
                return Err(AppError::Duplicate(format!(
                    "identical body already stored as memory id {hash_id} (dedup by body_hash); skipping '{}'",
                    staged.name
                )));
            }

            let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
            let memory_id = memories::insert(&tx, &new_memory)?;
            versions::insert_version(
                &tx,
                memory_id,
                1,
                &staged.name,
                memory_type,
                &staged.description,
                &new_memory.body,
                &metadata_json,
                None,
                "create",
            )?;
            if let Some(ref emb) = staged.embedding {
                memories::upsert_vec(
                    &tx,
                    memory_id,
                    namespace,
                    memory_type,
                    emb,
                    &staged.name,
                    &staged.snippet,
                )?;
            }
            // Chunk rows are only stored for multi-chunk bodies.
            if staged.chunks_info.len() > 1 {
                storage_chunks::insert_chunk_slices(
                    &tx,
                    memory_id,
                    &new_memory.body,
                    &staged.chunks_info,
                )?;
                if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
                    for (i, emb) in chunk_embeddings.iter().enumerate() {
                        storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
                    }
                }
            }
            link_staged_graph(&tx, namespace, memory_id, &staged)?;
            tx.commit()?;

            // URLs are written after the commit, outside the transaction.
            if !staged.urls.is_empty() {
                let url_entries: Vec<storage_urls::MemoryUrl> = staged
                    .urls
                    .into_iter()
                    .map(|u| storage_urls::MemoryUrl {
                        url: u.url,
                        offset: Some(u.start as i64),
                    })
                    .collect();
                // NOTE(review): the result is deliberately discarded, so a
                // failed URL insert is silent; consider at least logging it.
                let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
            }

            Ok(FileSuccess {
                memory_id,
                action: "created".to_string(),
                body_length,
                backend_invoked: staged.backend_invoked,
            })
        }
    }
}
1150
/// Returns `true` when `value` compares equal to `default`.
///
/// Used by the flag validation to detect options that were left untouched.
/// Equality follows the type's `PartialEq` implementation, so values that are
/// never equal to themselves (such as floating-point NaN) are never "at default".
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    PartialEq::eq(&value, &default)
}
1161
1162fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
1177 const DEFAULT_TIMEOUT: u64 = 300;
1178 const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
1179
1180 let mut conflicts: Vec<String> = Vec::new();
1181
1182 let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
1183
1184 if is_local_mode {
1185 if args.claude_binary.is_some() {
1186 conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
1187 }
1188 if args.claude_model.is_some() {
1189 conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
1190 }
1191 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1192 conflicts.push(format!(
1193 "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1194 args.claude_timeout
1195 ));
1196 }
1197 if args.codex_binary.is_some() {
1198 conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
1199 }
1200 if args.codex_model.is_some() {
1201 conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
1202 }
1203 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1204 conflicts.push(format!(
1205 "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1206 args.codex_timeout
1207 ));
1208 }
1209 if args.opencode_binary.is_some() {
1210 conflicts
1211 .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
1212 }
1213 if args.opencode_model.is_some() {
1214 conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
1215 }
1216 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1217 conflicts.push(format!(
1218 "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1219 args.opencode_timeout
1220 ));
1221 }
1222 if args.max_cost_usd.is_some() {
1223 conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
1224 }
1225 if args.resume {
1226 conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
1227 }
1228 if args.retry_failed {
1229 conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
1230 }
1231 if args.keep_queue {
1232 conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
1233 }
1234 if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
1235 conflicts.push(format!(
1236 "--rate-limit-wait={} is ignored when --mode is none or gliner",
1237 args.rate_limit_wait
1238 ));
1239 }
1240 }
1241
1242 match args.mode {
1243 IngestMode::ClaudeCode => {
1244 if args.codex_binary.is_some() {
1245 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1246 }
1247 if args.codex_model.is_some() {
1248 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1249 }
1250 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1251 conflicts.push(format!(
1252 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1253 args.codex_timeout
1254 ));
1255 }
1256 if args.opencode_binary.is_some() {
1257 conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1258 }
1259 if args.opencode_model.is_some() {
1260 conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1261 }
1262 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1263 conflicts.push(format!(
1264 "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1265 args.opencode_timeout
1266 ));
1267 }
1268 }
1269 IngestMode::Codex => {
1270 if args.claude_binary.is_some() {
1271 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1272 }
1273 if args.claude_model.is_some() {
1274 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1275 }
1276 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1277 conflicts.push(format!(
1278 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1279 args.claude_timeout
1280 ));
1281 }
1282 if args.max_cost_usd.is_some() {
1283 conflicts.push(
1284 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1285 .to_string(),
1286 );
1287 }
1288 if args.resume {
1289 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1290 }
1291 if args.retry_failed {
1292 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1293 }
1294 if args.keep_queue {
1295 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1296 }
1297 if args.opencode_binary.is_some() {
1298 conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1299 }
1300 if args.opencode_model.is_some() {
1301 conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1302 }
1303 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1304 conflicts.push(format!(
1305 "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1306 args.opencode_timeout
1307 ));
1308 }
1309 }
1310 IngestMode::Opencode => {
1311 if args.claude_binary.is_some() {
1312 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1313 }
1314 if args.claude_model.is_some() {
1315 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1316 }
1317 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1318 conflicts.push(format!(
1319 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1320 args.claude_timeout
1321 ));
1322 }
1323 if args.codex_binary.is_some() {
1324 conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1325 }
1326 if args.codex_model.is_some() {
1327 conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1328 }
1329 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1330 conflicts.push(format!(
1331 "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1332 args.codex_timeout
1333 ));
1334 }
1335 if args.max_cost_usd.is_some() {
1336 conflicts.push(
1337 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1338 .to_string(),
1339 );
1340 }
1341 if args.resume {
1342 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1343 }
1344 if args.retry_failed {
1345 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1346 }
1347 if args.keep_queue {
1348 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1349 }
1350 }
1351 IngestMode::None | IngestMode::Gliner => {}
1352 }
1353
1354 if !conflicts.is_empty() {
1355 return Err(AppError::Validation(format!(
1356 "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
1357 args.mode,
1358 conflicts.join("\n - ")
1359 )));
1360 }
1361
1362 Ok(())
1363}
1364
/// Runs the `ingest` command: discovers files under `args.dir`, derives a unique
/// kebab-case memory name for each one, stages the files in parallel and persists
/// the staged results, emitting one compact JSON event per file plus a summary.
///
/// Flow, as implemented below:
/// 1. Mode-conditional flags are validated. The `ClaudeCode`, `Codex` and
///    `Opencode` modes are delegated to their dedicated drivers and return
///    immediately; everything after that only runs for the remaining modes.
/// 2. Matching files are collected, sorted and checked against `args.max_files`.
/// 3. Every file is turned into a `SlotMeta`: `Process` when a unique name could
///    be derived, `Skip` otherwise.
/// 4. With `args.dry_run`, only preview/skip events, a budget event per readable
///    file and a zeroed summary are emitted.
/// 5. Otherwise a producer thread runs `stage_file` on a rayon pool and sends the
///    results through a bounded channel; this thread consumes the channel and
///    calls `persist_staged` for every staged part.
/// 6. When `args.enrich_after` is set and at least one part was persisted, the
///    enrich `MemoryBindings` operation is run afterwards.
///
/// # Errors
///
/// * `AppError::Validation` when the directory is missing or not a directory,
///   when more than `args.max_files` files match, when conflicting flags are
///   given, or when `args.fail_fast` is set and a file fails.
/// * `AppError::LimitExceeded` when the bookkeeping vectors cannot be reserved.
/// * `AppError::Internal` when the rayon pool cannot be built, when a channel
///   index has no `Process` slot, or when the producer thread panicked.
/// * Any error propagated with `?` from flag validation, file collection,
///   namespace/path resolution or JSON emission.
#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
pub fn run(
    args: IngestArgs,
    llm_backend: crate::cli::LlmBackendChoice,
    embedding_backend: crate::cli::EmbeddingBackendChoice,
) -> Result<(), AppError> {
    // Reject flags that do not apply to the selected mode before doing any work.
    validate_mode_conditional_flags_ingest(&args)?;
    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
    // The three CLI-backed modes have their own drivers; nothing below runs for them.
    if args.mode == IngestMode::ClaudeCode {
        return super::ingest_claude::run_claude_ingest(&args, embedding_backend, llm_backend);
    }
    if args.mode == IngestMode::Codex {
        return super::ingest_codex::run_codex_ingest(&args);
    }
    if args.mode == IngestMode::Opencode {
        return super::ingest_opencode::run_opencode_ingest(&args);
    }

    // Reference point for the `elapsed_ms` field of every summary emitted below.
    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::Validation(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    // Sort so the processing order (and therefore the collision suffixes assigned
    // below) does not depend on the order in which the directory was listed.
    files.sort_unstable();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    // A storage failure is not returned here: it is kept as a message and reported
    // once per file by the persistence loop further down.
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    // Counters reported in the summary events.
    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names already assigned in this run; consulted by `unique_name`.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // Per-file bookkeeping, one entry per collected file in sorted order.
    enum SlotMeta {
        // The file is not staged; `reason` is reported in its event.
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            original_filename: Option<String>,
            reason: String,
        },
        // The file is staged and persisted under `derived_name`.
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
            original_filename: Option<String>,
        },
    }

    // Work item handed to the producer thread; `idx` is the position of the
    // matching `SlotMeta::Process` entry in `slots_meta`.
    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    // Reserve fallibly so an allocation failure becomes an error instead of an abort.
    let files_cap = files.len();
    let mut slots_meta: Vec<SlotMeta> = Vec::new();
    slots_meta.try_reserve(files_cap).map_err(|_| {
        AppError::LimitExceeded(format!(
            "allocation of {files_cap} slot metadata entries would exceed available memory"
        ))
    })?;
    let mut process_items: Vec<ProcessItem> = Vec::new();
    process_items.try_reserve(files_cap).map_err(|_| {
        AppError::LimitExceeded(format!(
            "allocation of {files_cap} process items would exceed available memory"
        ))
    })?;
    let mut truncations: Vec<(String, String)> = Vec::new();
    truncations.try_reserve(files_cap).map_err(|_| {
        AppError::LimitExceeded(format!(
            "allocation of {files_cap} truncation entries would exceed available memory"
        ))
    })?;

    // Name derivation pass: classify every file as `Process` or `Skip`.
    let max_name_length = args.max_name_length;
    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) =
            derive_kebab_name(path, max_name_length);
        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");

        // Remember (untruncated, truncated) pairs; only the count is logged below.
        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        // Nothing usable survived the kebab-case conversion: skip the file.
        if derived_base.is_empty() {
            let orig_filename = if !original_basename.is_empty() {
                Some(original_basename.to_string())
            } else {
                None
            };
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                original_filename: orig_filename,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                // Index the slot is about to occupy; it travels with the work item
                // so the consumer can find the metadata again.
                let idx = slots_meta.len();
                // The original stem is only recorded when it differs from the final name.
                let orig_filename = if original_basename != derived_name {
                    Some(original_basename.to_string())
                } else {
                    None
                };
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                    original_filename: orig_filename,
                });
            }
            // No free suffix could be found: the file is skipped with the error text.
            Err(e) => {
                let orig_filename = if original_basename != derived_base {
                    Some(original_basename.to_string())
                } else {
                    None
                };
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    original_filename: orig_filename,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_name_length = max_name_length,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    // Dry run: report what would happen for each slot, then return without
    // staging or persisting anything.
    if args.dry_run {
        for meta in &slots_meta {
            match meta {
                SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    original_filename,
                    reason,
                } => {
                    output::emit_json_compact(&IngestFileEvent {
                        file: file_str,
                        name: derived_base,
                        status: "skip",
                        truncated: *name_truncated,
                        original_name: original_name.clone(),
                        original_filename: original_filename.as_deref(),
                        error: Some(reason.clone()),
                        memory_id: None,
                        action: None,
                        body_length: 0,
                        backend_invoked: None,
                    })?;
                }
                SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                    original_filename,
                } => {
                    output::emit_json_compact(&IngestFileEvent {
                        file: file_str,
                        name: derived_name,
                        status: "preview",
                        truncated: *name_truncated,
                        original_name: original_name.clone(),
                        original_filename: original_filename.as_deref(),
                        error: None,
                        memory_id: None,
                        action: None,
                        body_length: 0,
                        backend_invoked: None,
                    })?;

                    // Second event with the size budget of the body. The file is
                    // read through the lossy string form of its path; a read
                    // failure only produces a warning.
                    match std::fs::read_to_string(file_str) {
                        Ok(body) => {
                            let budget = chunking::assess_body_budget(&body);
                            output::emit_json_compact(&IngestDryRunBudget {
                                budget: true,
                                file: file_str,
                                name: derived_name,
                                bytes: budget.bytes,
                                chunk_count: budget.chunk_count,
                                token_count: budget.approx_tokens,
                                partition_count: budget.partition_count,
                                exceeds_limits: budget.exceeds_limits,
                            })?;
                        }
                        Err(e) => {
                            tracing::warn!(
                                target: "ingest",
                                file = %file_str,
                                "dry-run: could not read file for budget assessment: {e}"
                            );
                        }
                    }
                }
            }
        }
        // The dry-run summary reports the file total with all outcome counters at zero.
        output::emit_json_compact(&IngestSummary {
            summary: true,
            dir: args.dir.to_string_lossy().into_owned(),
            pattern: args.pattern.clone(),
            recursive: args.recursive,
            files_total: total,
            files_succeeded: 0,
            files_failed: 0,
            files_skipped: 0,
            elapsed_ms: started.elapsed().as_millis() as u64,
        })?;
        return Ok(());
    }

    // `--low-memory` cannot be combined with an explicit parallelism above 1.
    if args.low_memory {
        if let Some(n) = args.ingest_parallelism {
            if n > 1 {
                return Err(AppError::Validation(
                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
                        .to_string(),
                ));
            }
        }
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    // Dedicated pool so staging uses exactly `parallelism` worker threads.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        return Err(AppError::Validation(
            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
        ));
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!(
            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
        );
    }
    // Copy the settings the producer closure needs into locals so the closure
    // does not have to capture `args`.
    let enable_ner = args.enable_ner;
    // `--no-auto-describe` overrides `--auto-describe`.
    let auto_describe = args.auto_describe && !args.no_auto_describe;
    let max_rss_mb = args.max_rss_mb;
    let llm_parallelism = args.llm_parallelism as usize;
    if args.mode == IngestMode::Gliner {
        tracing::warn!(
            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
        );
    }
    if args.gliner_variant != "fp32" {
        tracing::warn!(
            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
        );
    }
    // Any value other than "int8" maps to `Fp32`; the value is still passed to `stage_file`.
    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
        "int8" => crate::extraction::GlinerVariant::Int8,
        _ => crate::extraction::GlinerVariant::Fp32,
    };

    let total_to_process = process_items.len();
    tracing::info!(
        target: "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    // Bounded channel: producers block once `parallelism * 2` (at least 1)
    // results are waiting to be persisted.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<Vec<StagedFile>, AppError>)>(channel_bound);

    // Owned copies moved into the producer thread.
    let paths_owned = paths.clone();
    let llm_backend_owned = llm_backend;
    let embedding_backend_owned = embedding_backend;
    // Phase A: stage every work item on the rayon pool from a separate thread so
    // this thread stays free to persist results as they arrive.
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                // After a shutdown request the remaining items are dropped without
                // being staged or reported.
                if crate::shutdown_requested() {
                    return;
                }
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                    max_rss_mb,
                    llm_parallelism,
                    llm_backend_owned,
                    embedding_backend_owned,
                    auto_describe,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                // Totals across all staged parts of this file; zero on failure.
                let (n_entities, n_relationships) = match &result {
                    Ok(parts) => (
                        parts.iter().map(|sf| sf.entities.len()).sum::<usize>(),
                        parts.iter().map(|sf| sf.relationships.len()).sum::<usize>(),
                    ),
                    Err(_) => (0, 0),
                };
                // Progress record logged as a JSON line on the `ingest_progress` target.
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    tracing::info!(target: "ingest_progress", "{}", line);
                }

                // A send error means the receiver is gone (for example after an
                // early return of the consumer); it is deliberately ignored.
                let _ = tx.send((item.idx, result));
            });
            // Dropping the sender ends the consumer's `for ... in rx` loop once
            // the buffered results have been drained.
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    // Report every skipped slot up front, before any persistence result.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            original_filename,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                original_filename: original_filename.as_deref(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
                body_length: 0,
                backend_invoked: None,
            })?;
            skipped += 1;
        }
    }

    // Lookup from slot index to its `Process` metadata, keyed by the same `idx`
    // that was stored in each `ProcessItem`.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target: "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    // Phase B: persist results in arrival order, which is not necessarily file order.
    for (idx, stage_result) in rx {
        if crate::shutdown_requested() {
            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
            break;
        }
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
        {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
                original_filename,
            } => (
                file_str,
                derived_name,
                name_truncated,
                original_name,
                original_filename,
            ),
            // `meta_index` was filtered to `Process` entries only.
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // Without a usable connection every file is reported as failed with the
        // storage error captured at start-up.
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                    body_length: 0,
                    backend_invoked: None,
                })?;
                failed += 1;
                if fail_fast {
                    // Emit the summary first so the counters are reported even
                    // though the run ends with an error.
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        match stage_result {
            Ok(parts) => {
                // One file can yield several staged parts; each one is persisted
                // and counted on its own.
                for staged in parts {
                    // `staged` is moved into `persist_staged`, so keep the name for the event.
                    let part_name = staged.name.clone();
                    match persist_staged(
                        conn,
                        &namespace,
                        &memory_type_str,
                        staged,
                        args.force_merge,
                    ) {
                        Ok(FileSuccess {
                            memory_id,
                            action,
                            body_length,
                            backend_invoked: file_backend_invoked,
                        }) => {
                            output::emit_json_compact(&IngestFileEvent {
                                file: file_str,
                                name: &part_name,
                                status: "indexed",
                                truncated: *name_truncated,
                                original_name: original_name.clone(),
                                original_filename: original_filename.as_deref(),
                                error: None,
                                memory_id: Some(memory_id),
                                action: Some(action),
                                body_length,
                                backend_invoked: file_backend_invoked,
                            })?;
                            succeeded += 1;
                        }
                        // Duplicates count as skipped, not failed, so they never
                        // trigger fail-fast.
                        Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                            output::emit_json_compact(&IngestFileEvent {
                                file: file_str,
                                name: &part_name,
                                status: "skipped",
                                truncated: *name_truncated,
                                original_name: original_name.clone(),
                                original_filename: original_filename.as_deref(),
                                error: Some(format!("{e}")),
                                memory_id: None,
                                action: Some("duplicate".to_string()),
                                body_length: 0,
                                backend_invoked: None,
                            })?;
                            skipped += 1;
                        }
                        Err(e) => {
                            let err_msg = format!("{e}");
                            output::emit_json_compact(&IngestFileEvent {
                                file: file_str,
                                name: &part_name,
                                status: "failed",
                                truncated: *name_truncated,
                                original_name: original_name.clone(),
                                original_filename: original_filename.as_deref(),
                                error: Some(err_msg.clone()),
                                memory_id: None,
                                action: None,
                                body_length: 0,
                                backend_invoked: None,
                            })?;
                            failed += 1;
                            if fail_fast {
                                output::emit_json_compact(&IngestSummary {
                                    summary: true,
                                    dir: args.dir.display().to_string(),
                                    pattern: args.pattern.clone(),
                                    recursive: args.recursive,
                                    files_total: total,
                                    files_succeeded: succeeded,
                                    files_failed: failed,
                                    files_skipped: skipped,
                                    elapsed_ms: started.elapsed().as_millis() as u64,
                                })?;
                                return Err(AppError::Validation(format!(
                                    "ingest aborted on first failure: {err_msg}"
                                )));
                            }
                        }
                    }
                }
            }
            // Staging itself failed: the whole file is reported under its derived name.
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                    body_length: 0,
                    backend_invoked: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // Wait for the producer to finish; a panic inside it becomes an internal error.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    // Best-effort WAL checkpoint after successful writes; its result is ignored.
    if let Ok(ref conn) = conn_or_err {
        if succeeded > 0 {
            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
        }
    }

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    // Optional follow-up: run the enrich `MemoryBindings` operation. The enrich
    // mode is fixed to `ClaudeCode` here regardless of the ingest mode.
    if args.enrich_after && succeeded > 0 {
        output::emit_json_compact(&serde_json::json!({
            "event": "enrich_phase_started",
            "operation": "memory-bindings"
        }))?;
        // Binary/model/timeout settings, `db`, `namespace`, `max_cost_usd`,
        // `llm_parallelism` and the job-singleton flags are forwarded from the
        // ingest arguments; every other field is a fixed value.
        let enrich_args = super::enrich::EnrichArgs {
            operation: Some(super::enrich::EnrichOperation::MemoryBindings),
            mode: Some(super::enrich::EnrichMode::ClaudeCode),
            limit: None,
            dry_run: false,
            namespace: args.namespace.clone(),
            claude_binary: args.claude_binary.clone(),
            claude_model: args.claude_model.clone(),
            claude_timeout: args.claude_timeout,
            codex_binary: args.codex_binary.clone(),
            codex_model: args.codex_model.clone(),
            codex_timeout: args.codex_timeout,
            opencode_binary: args.opencode_binary.clone(),
            opencode_model: args.opencode_model.clone(),
            opencode_timeout: args.opencode_timeout,
            openrouter_model: None,
            openrouter_api_key: None,
            openrouter_timeout: 300,
            openrouter_base_url: None,
            db: args.db.clone(),
            json: false,
            resume: false,
            retry_failed: false,
            max_cost_usd: args.max_cost_usd,
            llm_parallelism: args.llm_parallelism as u32,
            wait_job_singleton: args.wait_job_singleton,
            force_job_singleton: args.force_job_singleton,
            names: Vec::new(),
            names_file: None,
            preflight_check: false,
            fallback_mode: None,
            rate_limit_buffer: 300,
            max_load_check: true,
            circuit_breaker_threshold: 5,
            preserve_threshold: 0.7,
            codex_model_validate: true,
            codex_model_fallback: None,
            min_output_chars: 500,
            max_output_chars: 2000,
            preserve_check: true,
            prompt_template: None,
            until_empty: false,
            max_runtime: None,
            max_attempts: 5,
            status: false,
            rest_concurrency: None,
            list_dead: false,
            requeue_dead: false,
            prune_dead_orphans: false,
            ignore_backoff: false,
            body_extract_graph_only: false,
        };
        // An enrich failure is logged and reported as an event; it does not turn
        // the already completed ingest into an error.
        match super::enrich::run(&enrich_args, llm_backend, embedding_backend) {
            Ok(()) => {
                output::emit_json_compact(&serde_json::json!({
                    "event": "enrich_phase_completed"
                }))?;
            }
            Err(e) => {
                tracing::warn!(error = %e, "enrich --operation memory-bindings failed after ingest");
                output::emit_json_compact(&serde_json::json!({
                    "event": "enrich_phase_failed",
                    "error": e.to_string()
                }))?;
            }
        }
    }

    Ok(())
}
2136
2137fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
2143 ensure_db_ready(paths)?;
2144 let conn = open_rw(&paths.db)?;
2145 Ok(conn)
2146}
2147
2148pub(crate) fn collect_files(
2149 dir: &Path,
2150 pattern: &str,
2151 recursive: bool,
2152 out: &mut Vec<PathBuf>,
2153) -> Result<(), AppError> {
2154 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
2155 for entry in entries {
2156 let entry = entry.map_err(AppError::Io)?;
2157 let path = entry.path();
2158 let file_type = entry.file_type().map_err(AppError::Io)?;
2159 if file_type.is_file() {
2160 let name = entry.file_name();
2161 let name_str = name.to_string_lossy();
2162 if matches_pattern(&name_str, pattern) {
2163 out.push(path);
2164 }
2165 } else if file_type.is_dir() && recursive {
2166 collect_files(&path, pattern, recursive, out)?;
2167 }
2168 }
2169 Ok(())
2170}
2171
/// Returns `true` when the file name `name` matches the glob-like `pattern`.
///
/// `*` is the only wildcard; it matches any run of characters, including an
/// empty one, and may appear any number of times (`*.md`, `README*`,
/// `README*.md`, `*draft*`). Every other character must match literally and
/// case-sensitively. A pattern without `*` must equal `name` exactly.
///
/// Patterns with a single leading or trailing `*`, and patterns without any
/// `*`, behave exactly as they did before wildcards in other positions were
/// supported.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let (head, rest) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    // `head` precedes the first `*` and `tail` follows the last one; `middle`
    // holds whatever literal segments sit between further wildcards.
    let (middle, tail) = rest.rsplit_once('*').unwrap_or(("", rest));

    // Anchor both ends first. Stripping the prefix before the suffix ensures
    // the two anchors cannot overlap inside a name that is too short.
    let mut remaining = match name
        .strip_prefix(head)
        .and_then(|after_head| after_head.strip_suffix(tail))
    {
        Some(inner) => inner,
        None => return false,
    };

    // Each inner segment must occur, in order, in what is left. Taking the
    // leftmost occurrence is sufficient because `*` is the only wildcard.
    for segment in middle.split('*').filter(|segment| !segment.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }
    true
}
2181
2182pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
2193 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
2194 let lowered: String = stem
2195 .nfd()
2196 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
2197 .map(|c| {
2198 if c == '_' || c.is_whitespace() {
2199 '-'
2200 } else {
2201 c
2202 }
2203 })
2204 .map(|c| c.to_ascii_lowercase())
2205 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
2206 .collect();
2207 let collapsed = collapse_dashes(&lowered);
2208 let trimmed_raw = collapsed.trim_matches('-').to_string();
2209 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
2211 format!("doc-{trimmed_raw}")
2212 } else {
2213 trimmed_raw
2214 };
2215 if trimmed.len() > max_len {
2216 let truncated = trimmed[..max_len].trim_matches('-').to_string();
2217 tracing::warn!(
2222 target: "ingest",
2223 original = %trimmed,
2224 truncated_to = %truncated,
2225 max_len = max_len,
2226 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
2227 );
2228 (truncated, true, Some(trimmed))
2229 } else {
2230 (trimmed, false, None)
2231 }
2232}
2233
2234fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
2247 if !taken.contains(base) {
2248 return Ok(base.to_string());
2249 }
2250 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
2251 let candidate = format!("{base}-{suffix}");
2252 if !taken.contains(&candidate) {
2253 tracing::warn!(
2254 target: "ingest",
2255 base = %base,
2256 resolved = %candidate,
2257 suffix,
2258 "memory name collision resolved with numeric suffix"
2259 );
2260 return Ok(candidate);
2261 }
2262 }
2263 Err(AppError::Validation(format!(
2264 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
2265 )))
2266}
2267
/// Returns a copy of `s` in which every run of consecutive `-` characters is
/// reduced to a single `-`. All other characters are copied unchanged.
fn collapse_dashes(s: &str) -> String {
    let mut collapsed = String::with_capacity(s.len());
    for ch in s.chars() {
        // A dash directly after an already emitted dash is redundant.
        if ch == '-' && collapsed.ends_with('-') {
            continue;
        }
        collapsed.push(ch);
    }
    collapsed
}
2284
2285#[cfg(test)]
2286mod tests {
2287 use super::*;
2288 use std::path::PathBuf;
2289
2290 #[test]
2297 fn ingest_mode_none_with_resume_is_rejected() {
2298 use crate::cli::{Cli, Commands};
2299 use clap::Parser;
2300
2301 let none_resume = Cli::try_parse_from([
2302 "sqlite-graphrag",
2303 "ingest",
2304 "./docs",
2305 "--mode",
2306 "none",
2307 "--resume",
2308 ])
2309 .expect("parse succeeds; the conflict is value-conditional");
2310 let args = match none_resume.command {
2311 Some(Commands::Ingest(a)) => a,
2312 other => panic!("expected ingest, got {other:?}"),
2313 };
2314 assert!(
2315 validate_mode_conditional_flags_ingest(&args).is_err(),
2316 "--mode none + --resume must be rejected fail-fast"
2317 );
2318
2319 let claude_resume = Cli::try_parse_from([
2321 "sqlite-graphrag",
2322 "ingest",
2323 "./docs",
2324 "--mode",
2325 "claude-code",
2326 "--resume",
2327 ])
2328 .expect("parse");
2329 let args = match claude_resume.command {
2330 Some(Commands::Ingest(a)) => a,
2331 other => panic!("expected ingest, got {other:?}"),
2332 };
2333 assert!(
2334 validate_mode_conditional_flags_ingest(&args).is_ok(),
2335 "--mode claude-code + --resume is valid and must pass"
2336 );
2337 }
2338
2339 fn setup_ingest_conn() -> Connection {
2340 crate::storage::connection::register_vec_extension();
2341 let mut conn = Connection::open_in_memory().unwrap();
2342 crate::migrations::runner().run(&mut conn).unwrap();
2343 conn
2344 }
2345
2346 fn make_staged(name: &str, body: &str) -> StagedFile {
2347 StagedFile {
2348 body: body.to_string(),
2349 body_hash: blake3::hash(body.as_bytes()).to_hex().to_string(),
2350 snippet: body.chars().take(200).collect(),
2351 name: name.to_string(),
2352 description: "desc".to_string(),
2353 embedding: None,
2354 chunk_embeddings: None,
2355 chunks_info: Vec::new(),
2356 entities: Vec::new(),
2357 relationships: Vec::new(),
2358 entity_embeddings: None,
2359 urls: Vec::new(),
2360 backend_invoked: None,
2361 }
2362 }
2363
2364 #[test]
2367 fn persist_staged_force_merge_updates_existing() {
2368 let mut conn = setup_ingest_conn();
2369
2370 let first = persist_staged(
2371 &mut conn,
2372 "global",
2373 "document",
2374 make_staged("doc-a", "v1"),
2375 false,
2376 )
2377 .expect("create");
2378 assert_eq!(first.action, "created");
2379
2380 let dup = persist_staged(
2382 &mut conn,
2383 "global",
2384 "document",
2385 make_staged("doc-a", "v2-changed"),
2386 false,
2387 );
2388 assert!(matches!(dup, Err(AppError::Duplicate(_))));
2389
2390 let upd = persist_staged(
2392 &mut conn,
2393 "global",
2394 "document",
2395 make_staged("doc-a", "v2-changed"),
2396 true,
2397 )
2398 .expect("update");
2399 assert_eq!(upd.action, "updated");
2400 assert_eq!(upd.memory_id, first.memory_id);
2401 let body: String = conn
2402 .query_row(
2403 "SELECT body FROM memories WHERE id = ?1",
2404 rusqlite::params![first.memory_id],
2405 |r| r.get(0),
2406 )
2407 .unwrap();
2408 assert_eq!(body, "v2-changed");
2409 }
2410
2411 #[test]
2413 fn persist_staged_dedupes_by_body_hash() {
2414 let mut conn = setup_ingest_conn();
2415 persist_staged(
2416 &mut conn,
2417 "global",
2418 "document",
2419 make_staged("parte-1", "identical content"),
2420 false,
2421 )
2422 .expect("create");
2423
2424 let res = persist_staged(
2426 &mut conn,
2427 "global",
2428 "document",
2429 make_staged("part-01", "identical content"),
2430 false,
2431 );
2432 match res {
2433 Err(AppError::Duplicate(msg)) => assert!(msg.contains("body_hash")),
2434 other => panic!("expected body_hash dedup duplicate, got {other:?}"),
2435 }
2436 let n: i64 = conn
2438 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2439 .unwrap();
2440 assert_eq!(n, 1);
2441 }
2442
/// `--force-merge` on the `ingest` subcommand sets `force_merge` to `true`;
/// leaving the flag out leaves it `false`.
#[test]
fn ingest_force_merge_flag_parses() {
    use crate::cli::{Cli, Commands};
    use clap::Parser;

    // Parses one command line and returns the resulting `force_merge` field.
    let force_merge_of = |argv: &[&str]| -> bool {
        let cli = Cli::try_parse_from(argv.iter().copied()).expect("parse");
        match cli.command {
            Some(Commands::Ingest(a)) => a.force_merge,
            other => panic!("expected ingest, got {other:?}"),
        }
    };

    assert!(force_merge_of(&[
        "sqlite-graphrag",
        "ingest",
        "./docs",
        "--force-merge"
    ]));
    assert!(!force_merge_of(&["sqlite-graphrag", "ingest", "./docs"]));
}
2461
/// `*.md` matches `foo.md` but not `foo.txt`; a bare `*` matches `foo.md`.
#[test]
fn matches_pattern_suffix() {
    let cases = [
        ("foo.md", "*.md", true),
        ("foo.txt", "*.md", false),
        ("foo.md", "*", true),
    ];
    for (file_name, pattern, expected) in cases {
        assert_eq!(matches_pattern(file_name, pattern), expected);
    }
}
2468
/// `README*` matches `README.md` and rejects `CHANGELOG.md`.
#[test]
fn matches_pattern_prefix() {
    let accepts = |file_name: &str| matches_pattern(file_name, "README*");
    assert!(accepts("README.md"));
    assert!(!accepts("CHANGELOG.md"));
}
2474
/// A pattern without wildcards matches only the identical name; the
/// comparison is case-sensitive (`readme.md` does not match `README.md`).
#[test]
fn matches_pattern_exact() {
    let pattern = "README.md";
    assert!(matches_pattern("README.md", pattern));
    assert!(!matches_pattern("readme.md", pattern));
}
2480
/// Underscores in the file stem become dashes, and a short name is reported
/// as not truncated with no preserved original.
#[test]
fn derive_kebab_underscore_to_dash() {
    let source = PathBuf::from("/tmp/claude_code_headless.md");
    let derived = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(derived.0, "claude-code-headless");
    assert!(!derived.1);
    assert!(derived.2.is_none());
}
2489
/// An upper-case stem (`README`) is lower-cased; nothing is truncated.
#[test]
fn derive_kebab_uppercase_lowered() {
    let source = PathBuf::from("/tmp/README.md");
    let derived = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(derived.0, "readme");
    assert!(!derived.1);
    assert!(derived.2.is_none());
}
2498
/// Punctuation such as `@`, `#` and `!` is dropped from the derived name
/// rather than replaced with a separator.
#[test]
fn derive_kebab_strips_non_kebab_chars() {
    let source = PathBuf::from("/tmp/some@weird#name!.md");
    let derived = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(derived.0, "someweirdname");
    assert!(!derived.1);
    assert!(derived.2.is_none());
}
2507
/// Accented letters are folded to their ASCII base letters: `açaí` → `acai`.
#[test]
fn derive_kebab_folds_accented_letters_to_ascii() {
    let source = PathBuf::from("/tmp/açaí.md");
    let (name, ..) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(name, "acai", "got '{name}'");
}
2516
/// A diaeresis is folded away while existing dashes are kept:
/// `naïve-test` → `naive-test`.
#[test]
fn derive_kebab_handles_naive_with_diaeresis() {
    let source = PathBuf::from("/tmp/naïve-test.md");
    let (name, ..) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(name, "naive-test", "got '{name}'");
}
2523
/// A leading emoji and the dash after it disappear, leaving only the word:
/// `🚀-rocket` → `rocket`.
#[test]
fn derive_kebab_drops_emoji_keeps_word() {
    let source = PathBuf::from("/tmp/🚀-rocket.md");
    let (name, ..) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(name, "rocket", "got '{name}'");
}
2530
/// Accent folding and emoji removal combine: `açaí🦜` → `acai`.
#[test]
fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
    let source = PathBuf::from("/tmp/açaí🦜.md");
    let (name, ..) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(name, "acai", "got '{name}'");
}
2537
/// A stem made only of emoji derives to the empty string.
#[test]
fn derive_kebab_pure_emoji_yields_empty() {
    let source = PathBuf::from("/tmp/🦜🚀🌟.md");
    let (name, ..) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert!(name.is_empty(), "got '{name}'");
}
2544
/// Runs of underscores collapse to a single dash: `a__b___c` → `a-b-c`.
#[test]
fn derive_kebab_collapses_consecutive_dashes() {
    let source = PathBuf::from("/tmp/a__b___c.md");
    let derived = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert_eq!(derived.0, "a-b-c");
    assert!(!derived.1);
    assert!(derived.2.is_none());
}
2553
/// An 80-character stem is cut to at most 60 bytes, flagged as truncated,
/// and the returned original is present and longer than 60.
#[test]
fn derive_kebab_truncates_to_60_chars() {
    let long_stem = "a".repeat(80);
    let source = PathBuf::from(format!("/tmp/{long_stem}.md"));
    let (name, truncated, original) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);
    assert!(name.len() <= 60, "got len {}", name.len());
    assert!(truncated);
    // Passes only when an original is present AND it exceeds 60.
    assert!(original.map_or(false, |full| full.len() > 60));
}
2563
/// In a flat directory holding two `.md` files and one `.txt` file, a
/// non-recursive `*.md` scan returns exactly the two Markdown files.
#[test]
fn collect_files_finds_md_files() {
    let dir = tempfile::tempdir().expect("tempdir");
    for (file_name, contents) in [("a.md", "x"), ("b.md", "y"), ("c.txt", "z")] {
        std::fs::write(dir.path().join(file_name), contents).unwrap();
    }

    let mut out = Vec::new();
    collect_files(dir.path(), "*.md", false, &mut out).expect("collect");
    assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
}
2574
/// With the recursive flag on, a `.md` file inside a subdirectory is found
/// in addition to the one at the top level.
#[test]
fn collect_files_recursive_descends_subdirs() {
    let root = tempfile::tempdir().expect("tempdir");
    let nested = root.path().join("sub");
    std::fs::create_dir(&nested).unwrap();
    std::fs::write(root.path().join("a.md"), "x").unwrap();
    std::fs::write(nested.join("b.md"), "y").unwrap();

    let mut found = Vec::new();
    collect_files(root.path(), "*.md", true, &mut found).expect("collect");
    assert_eq!(found.len(), 2);
}
2586
/// With the recursive flag off, only the top-level `.md` file is returned;
/// the one inside `sub/` is ignored.
#[test]
fn collect_files_non_recursive_skips_subdirs() {
    let root = tempfile::tempdir().expect("tempdir");
    let nested = root.path().join("sub");
    std::fs::create_dir(&nested).unwrap();
    std::fs::write(root.path().join("a.md"), "x").unwrap();
    std::fs::write(nested.join("b.md"), "y").unwrap();

    let mut found = Vec::new();
    collect_files(root.path(), "*.md", false, &mut found).expect("collect");
    assert_eq!(found.len(), 1);
}
2598
/// A 120-character stem is truncated to a non-empty name no longer than
/// `DERIVED_NAME_MAX_LEN`, and both the truncation flag and the original
/// name are reported.
#[test]
fn derive_kebab_long_basename_truncated_within_cap() {
    let long_stem = "a".repeat(120);
    let source = PathBuf::from(format!("/tmp/{long_stem}.md"));
    let (name, truncated, original) = derive_kebab_name(&source, DERIVED_NAME_MAX_LEN);

    let derived_len = name.len();
    assert!(
        derived_len <= DERIVED_NAME_MAX_LEN,
        "truncated name must respect cap; got {} chars",
        derived_len
    );
    assert!(!name.is_empty());
    assert!(truncated);
    assert!(original.is_some());
}
2614
/// With no names taken, `unique_name` returns the base name unchanged.
#[test]
fn unique_name_returns_base_when_free() {
    let taken = BTreeSet::<String>::new();
    assert_eq!(unique_name("note", &taken).expect("must resolve"), "note");
}
2621
/// When `note` and `note-1` are taken, the first free candidate is `note-2`.
#[test]
fn unique_name_appends_first_free_suffix_on_collision() {
    let taken: BTreeSet<String> = BTreeSet::from(["note".to_string(), "note-1".to_string()]);
    let resolved = unique_name("note", &taken).expect("must resolve");
    assert_eq!(resolved, "note-2");
}
2630
/// When the base name and every suffix from `-1` through
/// `-MAX_NAME_COLLISION_SUFFIX` are taken, `unique_name` returns
/// `AppError::Validation`.
#[test]
fn unique_name_errors_after_collision_cap() {
    // "note" plus "note-1" ..= "note-<cap>".
    let taken: BTreeSet<String> = std::iter::once("note".to_string())
        .chain((1..=MAX_NAME_COLLISION_SUFFIX).map(|i| format!("note-{i}")))
        .collect();

    let err = unique_name("note", &taken).expect_err("must surface error");
    assert!(matches!(err, AppError::Validation(_)));
}
2641
/// `validate_relation_format` accepts `applies_to`, `depends_on` and
/// `implements` and rejects the empty string. `is_canonical_relation` is
/// true for `applies_to` and false for `implements`, so a relation can be
/// well-formed without being canonical.
#[test]
fn validate_relation_format_accepts_valid_relations() {
    use crate::parsers::{is_canonical_relation, validate_relation_format};

    for relation in ["applies_to", "depends_on", "implements"] {
        assert!(validate_relation_format(relation).is_ok());
    }
    assert!(validate_relation_format("").is_err());

    assert!(is_canonical_relation("applies_to"));
    assert!(!is_canonical_relation("implements"));
}
2654
2655 use serial_test::serial;
2658
/// Runs `f` with the `SQLITE_GRAPHRAG_LOW_MEMORY` environment variable set to
/// `value` (`Some`) or removed (`None`), then puts the previous state back.
///
/// The restore step lives in a drop guard, so it also runs when `f` panics
/// (for example on a failed assertion). A failing test therefore cannot leak
/// its override into the tests that run after it. The previous value is
/// captured with `var_os`, so a non-UTF-8 value is restored instead of being
/// treated as unset.
///
/// The process environment is global state, so callers should not run
/// concurrently; the tests in this module are marked `#[serial]`.
fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
    const KEY: &str = "SQLITE_GRAPHRAG_LOW_MEMORY";

    /// Holds the state of `KEY` captured before the override and writes it
    /// back when dropped.
    struct Restore(Option<std::ffi::OsString>);

    impl Drop for Restore {
        fn drop(&mut self) {
            match self.0.take() {
                Some(previous) => std::env::set_var(KEY, previous),
                None => std::env::remove_var(KEY),
            }
        }
    }

    // Capture before mutating; the guard is dropped on return AND on unwind.
    let _restore = Restore(std::env::var_os(KEY));

    match value {
        Some(v) => std::env::set_var(KEY, v),
        None => std::env::remove_var(KEY),
    }
    f();
}
2673
/// With `SQLITE_GRAPHRAG_LOW_MEMORY` removed, `env_low_memory_enabled`
/// reports `false`. Serialized because it mutates the process environment.
#[test]
#[serial]
fn env_low_memory_enabled_unset_returns_false() {
    with_env_var(None, || {
        let enabled = env_low_memory_enabled();
        assert!(!enabled);
    });
}
2679
/// An empty `SQLITE_GRAPHRAG_LOW_MEMORY` value counts as disabled.
/// Serialized because it mutates the process environment.
#[test]
#[serial]
fn env_low_memory_enabled_empty_returns_false() {
    with_env_var(Some(""), || {
        let enabled = env_low_memory_enabled();
        assert!(!enabled);
    });
}
2685
/// Each of `1`, `true`, `yes` and `on`, in the letter-case variants listed
/// below, enables low-memory mode. Serialized because it mutates the
/// process environment.
#[test]
#[serial]
fn env_low_memory_enabled_truthy_values_return_true() {
    const TRUTHY: [&str; 7] = ["1", "true", "TRUE", "yes", "YES", "on", "On"];
    TRUTHY.iter().copied().for_each(|v| {
        with_env_var(Some(v), || {
            assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
        })
    });
}
2695
/// Each of `0`, `false`, `FALSE`, `no` and `off` leaves low-memory mode
/// disabled. Serialized because it mutates the process environment.
#[test]
#[serial]
fn env_low_memory_enabled_falsy_values_return_false() {
    const FALSY: [&str; 5] = ["0", "false", "FALSE", "no", "off"];
    FALSY.iter().copied().for_each(|v| {
        with_env_var(Some(v), || {
            assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
        })
    });
}
2705
/// A value that is neither truthy nor falsy (`maybe`) is treated as
/// disabled. Serialized because it mutates the process environment.
#[test]
#[serial]
fn env_low_memory_enabled_unrecognized_value_returns_false() {
    with_env_var(Some("maybe"), || {
        let enabled = env_low_memory_enabled();
        assert!(!enabled);
    });
}
2711
/// With the first argument `true` (the low-memory flag, per the test name),
/// `resolve_parallelism` returns 1 whether the explicit value is 4, 8 or
/// absent. The environment variable is unset for the duration.
#[test]
#[serial]
fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
    with_env_var(None, || {
        for explicit in [Some(4), Some(8), None] {
            assert_eq!(resolve_parallelism(true, explicit), 1);
        }
    });
}
2721
/// With `SQLITE_GRAPHRAG_LOW_MEMORY=1` and the flag argument `false`,
/// `resolve_parallelism` still returns 1, with or without an explicit value.
#[test]
#[serial]
fn resolve_parallelism_env_forces_one_when_flag_off() {
    with_env_var(Some("1"), || {
        for explicit in [Some(4), None] {
            assert_eq!(resolve_parallelism(false, explicit), 1);
        }
    });
}
2730
/// `SQLITE_GRAPHRAG_LOW_MEMORY=0` does not force single-threaded mode: the
/// explicit value 4 is returned as-is.
#[test]
#[serial]
fn resolve_parallelism_falsy_env_does_not_override() {
    with_env_var(Some("0"), || {
        let workers = resolve_parallelism(false, Some(4));
        assert_eq!(workers, 4);
    });
}
2738
/// With the flag off and the environment variable unset, explicit values
/// (3 and 1 here) are returned unchanged.
#[test]
#[serial]
fn resolve_parallelism_explicit_value_when_low_memory_off() {
    with_env_var(None, || {
        for (requested, expected) in [(3, 3), (1, 1)] {
            assert_eq!(resolve_parallelism(false, Some(requested)), expected);
        }
    });
}
2747
/// With no flag, no explicit value and no environment override, the
/// resolved default must fall in the inclusive range 1 to 4.
#[test]
#[serial]
fn resolve_parallelism_default_when_unset() {
    with_env_var(None, || {
        let allowed = 1..=4;
        let p = resolve_parallelism(false, None);
        assert!(allowed.contains(&p), "default must be in [1, 4]; got {p}");
    });
}
2756
/// Passing `--low-memory` to the `ingest` subcommand sets the parsed
/// `low_memory` field to `true`.
#[test]
fn ingest_args_parses_low_memory_flag_via_clap() {
    use crate::cli::{Cli, Commands};
    use clap::Parser;

    let argv = [
        "sqlite-graphrag",
        "ingest",
        "/tmp/dummy",
        "--type",
        "document",
        "--low-memory",
    ];
    let parsed = Cli::try_parse_from(argv).expect("parse must succeed");

    if let Some(Commands::Ingest(args)) = parsed.command {
        assert!(args.low_memory, "--low-memory must set field to true");
    } else {
        panic!("expected Ingest subcommand");
    }
}
2778
/// Without `--low-memory` on the command line, the parsed `low_memory`
/// field is `false`.
#[test]
fn ingest_args_low_memory_defaults_false() {
    use crate::cli::{Cli, Commands};
    use clap::Parser;

    let argv = [
        "sqlite-graphrag",
        "ingest",
        "/tmp/dummy",
        "--type",
        "document",
    ];
    let parsed = Cli::try_parse_from(argv).expect("parse must succeed");

    if let Some(Commands::Ingest(args)) = parsed.command {
        assert!(!args.low_memory, "default must be false");
    } else {
        panic!("expected Ingest subcommand");
    }
}
2797
/// Serializing an `IngestDryRunBudget` produces JSON containing the
/// `chunk_count`, `token_count`, `partition_count` and `exceeds_limits`
/// keys with the values set on the struct.
#[test]
fn dry_run_budget_event_serializes_chunk_and_token_counts() {
    let event = IngestDryRunBudget {
        budget: true,
        file: "/tmp/doc.md",
        name: "doc",
        bytes: 1234,
        chunk_count: 3,
        token_count: 567,
        partition_count: 1,
        exceeds_limits: false,
    };
    let json = serde_json::to_string(&event).expect("serialize budget event");

    // Each expected key/value pair must appear verbatim in the compact output.
    for fragment in [
        "\"chunk_count\":3",
        "\"token_count\":567",
        "\"partition_count\":1",
        "\"exceeds_limits\":false",
    ] {
        assert!(json.contains(fragment), "got: {json}");
    }
}
2818
/// A short Markdown body yields at least one chunk, at least one
/// approximate token, and exactly one partition from
/// `chunking::assess_body_budget`.
#[test]
fn assess_body_budget_feeds_dry_run_with_positive_counts() {
    let assessed =
        chunking::assess_body_budget("# Title\n\nsome representative body text for the budget.");
    assert!(assessed.chunk_count > 0);
    assert!(assessed.approx_tokens > 0);
    assert_eq!(assessed.partition_count, 1);
}
2829}