1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound used for name-collision suffixes.
///
/// NOTE(review): the consumer of this constant is outside the visible part of
/// the file; presumably it caps the numeric suffix tried when a derived memory
/// name is already taken — confirm against the call site.
const MAX_NAME_COLLISION_SUFFIX: usize = 1_000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Namespace derived names with a kebab-case prefix (projx-<derived>)\n \
67 sqlite-graphrag ingest ./docs --name-prefix projx- --dry-run\n\n \
68 # Enable automatic URL extraction (URL-regex only since v1.0.79)\n \
69 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
70 # Preview file-to-name mapping without ingesting\n \
71 sqlite-graphrag ingest ./docs --dry-run\n\n \
72 # LLM-curated extraction via Claude Code CLI\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
74 # Resume interrupted claude-code ingest\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
76 # Claude Code with budget cap and custom timeout\n \
77 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
78AUTHENTICATION:\n \
79 --mode claude-code: Uses existing Claude Code authentication.\n \
80 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
81 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
82 --mode codex: Uses existing Codex CLI authentication.\n \
83 Device auth: run `codex auth login` first\n \
84 API key: set OPENAI_API_KEY (optional)\n\n \
85NOTES:\n \
86 Each file becomes a separate memory. Names derive from file basenames\n \
87 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
88 followed by a final summary line with counts. Per-file errors are reported\n \
89 inline and processing continues unless --fail-fast is set.")]
90pub struct IngestArgs {
91 #[arg(
93 value_name = "DIR",
94 help = "Directory to ingest recursively (each matching file becomes a memory)"
95 )]
96 pub dir: PathBuf,
97
98 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
100 pub r#type: MemoryType,
101
102 #[arg(long, default_value = "*.md")]
105 pub pattern: String,
106
107 #[arg(long, default_value_t = false)]
109 pub recursive: bool,
110
111 #[arg(
112 long,
113 env = "SQLITE_GRAPHRAG_ENABLE_NER",
114 value_parser = crate::parsers::parse_bool_flexible,
115 action = clap::ArgAction::Set,
116 num_args = 0..=1,
117 default_missing_value = "true",
118 default_value = "false",
119 help = "Enable automatic URL-regex extraction (the GLiNER NER pipeline was removed in v1.0.79)"
120 )]
121 pub enable_ner: bool,
122
123 #[arg(
127 long,
128 default_value_t = true,
129 overrides_with = "no_auto_describe",
130 help = "Derive memory description from the first meaningful body line instead of the legacy `ingested from <path>` placeholder."
131 )]
132 pub auto_describe: bool,
133 #[arg(
134 long = "no-auto-describe",
135 default_value_t = false,
136 help = "Disable `--auto-describe` and fall back to the legacy `ingested from <path>` description placeholder."
137 )]
138 pub no_auto_describe: bool,
139 #[arg(
140 long,
141 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
142 default_value = "fp32",
143 help = "DEPRECATED: no effect since v1.0.79 (the GLiNER pipeline was removed); accepted for compatibility only"
144 )]
145 pub gliner_variant: String,
146
147 #[arg(long, default_value_t = false, hide = true)]
149 pub skip_extraction: bool,
150
151 #[arg(long, default_value_t = false)]
153 pub fail_fast: bool,
154
155 #[arg(long, default_value_t = false)]
157 pub dry_run: bool,
158
159 #[arg(long, default_value_t = 10_000)]
161 pub max_files: usize,
162
163 #[arg(long)]
165 pub namespace: Option<String>,
166
167 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
169 pub db: Option<String>,
170
171 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
172 pub format: JsonOutputFormat,
173
174 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
175 pub json: bool,
176
177 #[arg(
179 long,
180 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
181 )]
182 pub ingest_parallelism: Option<usize>,
183
184 #[arg(
192 long,
193 default_value_t = false,
194 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
195 Recommended for environments with <4 GB available RAM or container/cgroup \
196 constraints. Trade-off: 3-4x longer wall time. Also honored via \
197 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
198 )]
199 pub low_memory: bool,
200
201 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
203 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
204 pub max_rss_mb: u64,
205
206 #[arg(long, default_value_t = 2, value_name = "N",
211 value_parser = clap::value_parser!(u64).range(1..=32),
212 help = "Maximum simultaneous LLM embedding subprocesses per file (default: 2, clamp [1,32])")]
213 pub llm_parallelism: u64,
214
215 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
220 help = "Maximum length for derived memory names (default: 60)")]
221 pub max_name_length: usize,
222
223 #[arg(
229 long,
230 value_name = "PREFIX",
231 help = "Kebab-case prefix applied to every derived memory name (e.g. 'projx-')"
232 )]
233 pub name_prefix: Option<String>,
234
235 #[arg(long, value_enum, default_value_t = IngestMode::None)]
237 pub mode: IngestMode,
238
239 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
241 pub claude_binary: Option<std::path::PathBuf>,
242
243 #[arg(long)]
245 pub claude_model: Option<String>,
246
247 #[arg(long, default_value_t = false)]
249 pub resume: bool,
250
251 #[arg(long, default_value_t = false)]
253 pub retry_failed: bool,
254
255 #[arg(long, default_value_t = false)]
257 pub keep_queue: bool,
258
259 #[arg(long)]
261 pub queue_db: Option<String>,
262
263 #[arg(long, default_value_t = 60)]
265 pub rate_limit_wait: u64,
266
267 #[arg(long)]
269 pub max_cost_usd: Option<f64>,
270
271 #[arg(
273 long,
274 default_value_t = 300,
275 help = "Timeout in seconds for each claude -p invocation (default: 300)"
276 )]
277 pub claude_timeout: u64,
278
279 #[arg(
281 long,
282 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
283 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
284 )]
285 pub codex_binary: Option<PathBuf>,
286
287 #[arg(
289 long,
290 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
291 )]
292 pub codex_model: Option<String>,
293
294 #[arg(
296 long,
297 default_value_t = 300,
298 help = "Timeout in seconds for each codex exec invocation (default: 300)"
299 )]
300 pub codex_timeout: u64,
301
302 #[arg(long, value_name = "PATH", env = "SQLITE_GRAPHRAG_OPENCODE_BINARY")]
304 pub opencode_binary: Option<PathBuf>,
305
306 #[arg(
308 long,
309 value_name = "MODEL",
310 env = "SQLITE_GRAPHRAG_OPENCODE_MODEL",
311 help = "Model override for OpenCode extraction"
312 )]
313 pub opencode_model: Option<String>,
314
315 #[arg(
317 long,
318 value_name = "SECONDS",
319 env = "SQLITE_GRAPHRAG_OPENCODE_TIMEOUT",
320 default_value_t = 300,
321 help = "Timeout in seconds for each opencode run invocation (default: 300)"
322 )]
323 pub opencode_timeout: u64,
324
325 #[arg(long, value_name = "SECONDS")]
328 pub wait_job_singleton: Option<u64>,
329
330 #[arg(long, default_value_t = false)]
333 pub force_job_singleton: bool,
334
335 #[arg(
338 long,
339 default_value_t = false,
340 help = "Run enrich --operation memory-bindings after all files are ingested"
341 )]
342 pub enrich_after: bool,
343
344 #[arg(
349 long,
350 default_value_t = false,
351 help = "Update existing memories on name collision instead of skipping (idempotent re-ingest)"
352 )]
353 pub force_merge: bool,
354}
355
356#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
358pub enum IngestMode {
359 None,
361 Gliner,
363 ClaudeCode,
365 Codex,
367 #[value(name = "opencode")]
369 Opencode,
370}
371
372fn env_low_memory_enabled() -> bool {
377 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
378 Ok(v) if v.is_empty() => false,
379 Ok(v) => match v.to_lowercase().as_str() {
380 "1" | "true" | "yes" | "on" => true,
381 "0" | "false" | "no" | "off" => false,
382 other => {
383 tracing::warn!(
384 target: "ingest",
385 value = %other,
386 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
387 );
388 false
389 }
390 },
391 Err(_) => false,
392 }
393}
394
395fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
407 let env_flag = env_low_memory_enabled();
408 let low_memory = low_memory_flag || env_flag;
409
410 if low_memory {
411 if let Some(n) = ingest_parallelism {
412 if n > 1 {
413 tracing::warn!(
414 target: "ingest",
415 requested = n,
416 "--ingest-parallelism overridden by --low-memory; using 1"
417 );
418 }
419 }
420 if low_memory_flag {
421 tracing::info!(
422 target: "ingest",
423 source = "flag",
424 "low-memory mode enabled: forcing --ingest-parallelism 1"
425 );
426 } else {
427 tracing::info!(
428 target: "ingest",
429 source = "env",
430 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
431 );
432 }
433 return 1;
434 }
435
436 ingest_parallelism
437 .unwrap_or_else(|| {
438 std::thread::available_parallelism()
439 .map(|v| v.get() / 2)
440 .unwrap_or(1)
441 .clamp(1, 4)
442 })
443 .max(1)
444}
445
446#[derive(Serialize)]
447struct IngestFileEvent<'a> {
448 file: &'a str,
449 name: &'a str,
450 status: &'a str,
451 truncated: bool,
453 #[serde(skip_serializing_if = "Option::is_none")]
455 original_name: Option<String>,
456 #[serde(skip_serializing_if = "Option::is_none")]
458 original_filename: Option<&'a str>,
459 #[serde(skip_serializing_if = "Option::is_none")]
460 error: Option<String>,
461 #[serde(skip_serializing_if = "Option::is_none")]
462 memory_id: Option<i64>,
463 #[serde(skip_serializing_if = "Option::is_none")]
464 action: Option<String>,
465 body_length: usize,
467 #[serde(skip_serializing_if = "Option::is_none")]
472 backend_invoked: Option<&'a str>,
473}
474
475#[derive(Serialize)]
479struct IngestDryRunBudget<'a> {
480 budget: bool,
481 file: &'a str,
482 name: &'a str,
483 bytes: usize,
484 chunk_count: usize,
485 token_count: usize,
486 partition_count: usize,
487 exceeds_limits: bool,
488}
489
490#[derive(Serialize)]
491struct IngestSummary {
492 summary: bool,
493 dir: String,
494 pattern: String,
495 recursive: bool,
496 files_total: usize,
497 files_succeeded: usize,
498 files_failed: usize,
499 files_skipped: usize,
500 elapsed_ms: u64,
501}
502
/// Result of persisting one staged memory (returned by `persist_staged`).
#[derive(Debug)]
struct FileSuccess {
    /// Row id of the memory that was created or updated.
    memory_id: i64,
    /// `"created"` or `"updated"`, depending on the path taken.
    action: String,
    /// Byte length of the stored body.
    body_length: usize,
    /// Label of the embedding backend reported by staging, when available.
    backend_invoked: Option<&'static str>,
}
511
512#[derive(Serialize)]
515struct StageProgressEvent<'a> {
516 schema_version: u8,
517 event: &'a str,
518 path: &'a str,
519 ms: u64,
520 entities: usize,
521 relationships: usize,
522}
523
524struct StagedFile {
527 body: String,
528 body_hash: String,
529 snippet: String,
530 name: String,
531 description: String,
532 embedding: Option<Vec<f32>>,
533 chunk_embeddings: Option<Vec<Vec<f32>>>,
534 chunks_info: Vec<crate::chunking::Chunk>,
535 entities: Vec<NewEntity>,
536 relationships: Vec<NewRelationship>,
537 entity_embeddings: Option<Vec<Vec<f32>>>,
538 urls: Vec<crate::extraction::ExtractedUrl>,
539 backend_invoked: Option<&'static str>,
544}
545
546#[allow(clippy::too_many_arguments)]
552fn stage_file(
553 _idx: usize,
554 path: &Path,
555 name: &str,
556 paths: &AppPaths,
557 enable_ner: bool,
558 gliner_variant: crate::extraction::GlinerVariant,
559 max_rss_mb: u64,
560 llm_parallelism: usize,
561 llm_backend: crate::cli::LlmBackendChoice,
562 embedding_backend: crate::cli::EmbeddingBackendChoice,
563 auto_describe: bool,
564) -> Result<Vec<StagedFile>, AppError> {
565 use crate::constants::*;
566
567 if name.len() > MAX_MEMORY_NAME_LEN {
568 return Err(AppError::LimitExceeded(
569 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
570 ));
571 }
572 if name.starts_with("__") {
573 return Err(AppError::Validation(
574 crate::i18n::validation::reserved_name(),
575 ));
576 }
577 {
578 let slug_re = crate::constants::name_slug_regex();
579 if !slug_re.is_match(name) {
580 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
581 name,
582 )));
583 }
584 }
585
586 let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
587 if file_size > MAX_MEMORY_BODY_LEN as u64 {
588 return Err(AppError::BodyTooLarge {
589 bytes: file_size,
590 limit: MAX_MEMORY_BODY_LEN as u64,
591 });
592 }
593 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
594 if raw_body.len() > MAX_MEMORY_BODY_LEN {
595 return Err(AppError::BodyTooLarge {
596 bytes: raw_body.len() as u64,
597 limit: MAX_MEMORY_BODY_LEN as u64,
598 });
599 }
600 if raw_body.trim().is_empty() {
601 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
602 }
603
604 let description = if auto_describe {
605 crate::commands::ingest_heuristics::extract_heuristic_description(
606 &raw_body,
607 Some(&path.display().to_string()),
608 )
609 } else {
610 format!("ingested from {}", path.display())
611 };
612 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
613 return Err(AppError::Validation(
614 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
615 ));
616 }
617
618 let partitions = chunking::split_body_by_sections(&raw_body);
623 let total_parts = partitions.len();
624 let mut staged = Vec::with_capacity(total_parts);
625 for (part_idx, part_body) in partitions.into_iter().enumerate() {
626 let part_name = if total_parts == 1 {
627 name.to_string()
628 } else {
629 format!("{name}-part-{}", part_idx + 1)
630 };
631 if part_name.len() > MAX_MEMORY_NAME_LEN {
632 return Err(AppError::LimitExceeded(
633 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
634 ));
635 }
636 let part_description = if total_parts == 1 {
637 description.clone()
638 } else {
639 partition_description(&description, part_idx + 1, total_parts)
640 };
641 staged.push(stage_one_body(
642 part_body,
643 part_name,
644 part_description,
645 paths,
646 enable_ner,
647 gliner_variant,
648 max_rss_mb,
649 llm_parallelism,
650 llm_backend,
651 embedding_backend,
652 )?);
653 }
654 Ok(staged)
655}
656
657fn partition_description(base: &str, part: usize, total: usize) -> String {
661 let suffix = format!(" (part {part}/{total})");
662 let max = crate::constants::MAX_MEMORY_DESCRIPTION_LEN;
663 if base.len() + suffix.len() <= max {
664 return format!("{base}{suffix}");
665 }
666 let mut cut = max.saturating_sub(suffix.len()).min(base.len());
667 while cut > 0 && !base.is_char_boundary(cut) {
668 cut -= 1;
669 }
670 format!("{}{}", &base[..cut], suffix)
671}
672
673#[allow(clippy::too_many_arguments)]
677fn stage_one_body(
678 raw_body: String,
679 name: String,
680 description: String,
681 paths: &AppPaths,
682 enable_ner: bool,
683 gliner_variant: crate::extraction::GlinerVariant,
684 max_rss_mb: u64,
685 llm_parallelism: usize,
686 llm_backend: crate::cli::LlmBackendChoice,
687 embedding_backend: crate::cli::EmbeddingBackendChoice,
688) -> Result<StagedFile, AppError> {
689 use crate::constants::*;
690
691 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
692 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
693 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
694 if enable_ner {
695 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
696 Ok(extracted) => {
697 extracted_urls = extracted.urls;
698 extracted_entities = extracted
703 .entities
704 .into_iter()
705 .map(|e| NewEntity {
706 name: e.name,
707 entity_type: crate::entity_type::EntityType::Concept,
708 description: None,
709 })
710 .collect();
711 extracted_relationships.clear();
716
717 if extracted_entities.len() > max_entities_per_memory() {
718 extracted_entities.truncate(max_entities_per_memory());
719 }
720 if extracted_relationships.len() > max_relationships_per_memory() {
721 extracted_relationships.truncate(max_relationships_per_memory());
722 }
723 }
724 Err(e) => {
725 tracing::warn!(
726 target: "ingest",
727 file = %name,
728 "auto-extraction failed (graceful degradation): {e:#}"
729 );
730 }
731 }
732 }
733
734 for rel in &mut extracted_relationships {
735 rel.relation = crate::parsers::normalize_relation(&rel.relation);
736 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
737 return Err(AppError::Validation(format!(
738 "{e} for relationship '{}' -> '{}'",
739 rel.source, rel.target
740 )));
741 }
742 crate::parsers::warn_if_non_canonical(&rel.relation);
743 if !(0.0..=1.0).contains(&rel.strength) {
744 return Err(AppError::Validation(format!(
745 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
746 rel.strength, rel.source, rel.target
747 )));
748 }
749 }
750
751 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
752 let snippet: String = raw_body.chars().take(200).collect();
753
754 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body);
755 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
756 return Err(AppError::TooManyChunks {
757 chunks: chunks_info.len(),
758 limit: REMEMBER_MAX_SAFE_MULTI_CHUNKS,
759 });
760 }
761
762 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
763 let skip_embed = crate::embedder::should_skip_embedding_on_failure();
764 let (embedding, backend_invoked): (Option<Vec<f32>>, Option<&'static str>) = if chunks_info
768 .len()
769 == 1
770 {
771 match crate::embedder::embed_passage_with_embedding_choice(
772 &paths.models,
773 &raw_body,
774 embedding_backend,
775 llm_backend,
776 ) {
777 Ok((v, k)) => (Some(v), Some(k.as_str())),
778 Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
779 Err(e) if skip_embed => {
780 tracing::warn!(error = %e, file = %name, "ingest: embedding failed; --skip-embedding-on-failure active, persisting without embedding");
781 (None, None)
782 }
783 Err(e) => return Err(e),
784 }
785 } else {
786 let chunk_texts: Vec<String> = chunks_info
789 .iter()
790 .map(|c| chunking::chunk_text(&raw_body, c).to_string())
791 .collect();
792 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
793 if rss > max_rss_mb {
794 tracing::error!(
795 target: "ingest",
796 rss_mb = rss,
797 max_rss_mb = max_rss_mb,
798 file = %name,
799 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
800 );
801 return Err(AppError::LowMemory {
802 available_mb: crate::memory_guard::available_memory_mb(),
803 required_mb: max_rss_mb,
804 });
805 }
806 }
807 match crate::embedder::embed_passages_parallel_with_embedding_choice(
808 &paths.models,
809 &chunk_texts,
810 llm_parallelism,
811 crate::embedder::chunk_embed_batch_size(),
812 embedding_backend,
813 llm_backend,
814 ) {
815 Ok(chunk_embeddings) => {
816 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
817 chunk_embeddings_opt = Some(chunk_embeddings);
818 (Some(aggregated), None)
821 }
822 Err(AppError::Validation(msg)) => return Err(AppError::Validation(msg)),
823 Err(e) if skip_embed => {
824 tracing::warn!(error = %e, file = %name, "ingest: chunk embedding failed; --skip-embedding-on-failure active, persisting without embedding");
825 (None, None)
826 }
827 Err(e) => return Err(e),
828 }
829 };
830
831 let entity_texts: Vec<String> = extracted_entities
833 .iter()
834 .map(|entity| match &entity.description {
835 Some(desc) => format!("{} {}", entity.name, desc),
836 None => entity.name.clone(),
837 })
838 .collect();
839 let entity_embeddings_opt = match crate::embedder::embed_entity_texts_cached(
843 &paths.models,
844 &entity_texts,
845 llm_parallelism,
846 embedding_backend,
847 llm_backend,
848 ) {
849 Ok((entity_embeddings, embed_cache_stats)) => {
850 if embed_cache_stats.hits > 0 {
851 tracing::debug!(
852 hits = embed_cache_stats.hits,
853 misses = embed_cache_stats.misses,
854 requested = embed_cache_stats.requested,
855 "G56: entity embed cache hit (ingest)"
856 );
857 }
858 Some(entity_embeddings)
859 }
860 Err(e) if skip_embed => {
861 tracing::warn!(error = %e, file = %name, "ingest: entity embedding failed; --skip-embedding-on-failure active");
862 None
863 }
864 Err(e) => return Err(e),
865 };
866
867 Ok(StagedFile {
868 body: raw_body,
869 body_hash,
870 snippet,
871 name,
872 description,
873 embedding,
874 chunk_embeddings: chunk_embeddings_opt,
875 chunks_info,
876 entities: extracted_entities,
877 relationships: extracted_relationships,
878 entity_embeddings: entity_embeddings_opt,
879 urls: extracted_urls,
880 backend_invoked,
881 })
882}
883
884fn link_staged_graph(
888 tx: &Connection,
889 namespace: &str,
890 memory_id: i64,
891 staged: &StagedFile,
892) -> Result<(), AppError> {
893 if staged.entities.is_empty() && staged.relationships.is_empty() {
894 return Ok(());
895 }
896 for (idx, entity) in staged.entities.iter().enumerate() {
897 let entity_id = entities::upsert_entity(tx, namespace, entity)?;
898 if let Some(ref entity_embeddings) = staged.entity_embeddings {
899 if let Some(entity_embedding) = entity_embeddings.get(idx) {
900 entities::upsert_entity_vec(
901 tx,
902 entity_id,
903 namespace,
904 entity.entity_type,
905 entity_embedding,
906 &entity.name,
907 )?;
908 }
909 }
910 entities::link_memory_entity(tx, memory_id, entity_id)?;
911 }
912 let entity_types: std::collections::HashMap<&str, EntityType> = staged
913 .entities
914 .iter()
915 .map(|entity| (entity.name.as_str(), entity.entity_type))
916 .collect();
917
918 let mut affected_entity_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
919 for entity in &staged.entities {
920 if let Some(eid) = entities::find_entity_id(tx, namespace, &entity.name)? {
921 affected_entity_ids.insert(eid);
922 }
923 }
924
925 for rel in &staged.relationships {
926 let source_entity = NewEntity {
927 name: rel.source.clone(),
928 entity_type: entity_types
929 .get(rel.source.as_str())
930 .copied()
931 .unwrap_or(EntityType::Concept),
932 description: None,
933 };
934 let target_entity = NewEntity {
935 name: rel.target.clone(),
936 entity_type: entity_types
937 .get(rel.target.as_str())
938 .copied()
939 .unwrap_or(EntityType::Concept),
940 description: None,
941 };
942 let source_id = entities::upsert_entity(tx, namespace, &source_entity)?;
943 let target_id = entities::upsert_entity(tx, namespace, &target_entity)?;
944 let rel_id = entities::upsert_relationship(tx, namespace, source_id, target_id, rel)?;
945 entities::link_memory_relationship(tx, memory_id, rel_id)?;
946 affected_entity_ids.insert(source_id);
947 affected_entity_ids.insert(target_id);
948 }
949
950 for &eid in &affected_entity_ids {
951 entities::recalculate_degree(tx, eid)?;
952 }
953 Ok(())
954}
955
956fn persist_staged(
964 conn: &mut Connection,
965 namespace: &str,
966 memory_type: &str,
967 staged: StagedFile,
968 force_merge: bool,
969) -> Result<FileSuccess, AppError> {
970 {
971 let active_count: u32 = conn.query_row(
972 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
973 [],
974 |r| r.get::<_, i64>(0).map(|v| v as u32),
975 )?;
976 let ns_exists: bool = conn.query_row(
977 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
978 rusqlite::params![namespace],
979 |r| r.get::<_, i64>(0).map(|v| v > 0),
980 )?;
981 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
982 return Err(AppError::NamespaceError(format!(
983 "active namespace limit of {} exceeded while creating '{namespace}'",
984 crate::constants::MAX_NAMESPACES_ACTIVE
985 )));
986 }
987 }
988
989 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
990 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
991
992 let new_memory = NewMemory {
993 namespace: namespace.to_string(),
994 name: staged.name.clone(),
995 memory_type: memory_type.to_string(),
996 description: staged.description.clone(),
997 body: staged.body.clone(),
998 body_hash: staged.body_hash.clone(),
999 session_id: None,
1000 source: "agent".to_string(),
1001 metadata: serde_json::json!({}),
1002 };
1003 let body_length = new_memory.body.len();
1004 let metadata_json = serde_json::to_string(&new_memory.metadata)?;
1005
1006 match existing_memory {
1007 Some((existing_id, _updated_at, _version)) => {
1008 if !force_merge {
1009 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
1010 &staged.name,
1011 namespace,
1012 )));
1013 }
1014
1015 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
1018
1019 let (old_name, old_desc, old_body): (String, String, String) = tx.query_row(
1020 "SELECT name, description, body FROM memories WHERE id = ?1",
1021 rusqlite::params![existing_id],
1022 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1023 )?;
1024
1025 let next_v = versions::next_version(&tx, existing_id)?;
1026 memories::update(&tx, existing_id, &new_memory, None)?;
1027 memories::sync_fts_after_update(
1028 &tx,
1029 existing_id,
1030 &old_name,
1031 &old_desc,
1032 &old_body,
1033 &staged.name,
1034 &staged.description,
1035 &new_memory.body,
1036 )?;
1037 versions::insert_version(
1038 &tx,
1039 existing_id,
1040 next_v,
1041 &staged.name,
1042 memory_type,
1043 &staged.description,
1044 &new_memory.body,
1045 &metadata_json,
1046 None,
1047 "edit",
1048 )?;
1049
1050 storage_chunks::delete_chunks(&tx, existing_id)?;
1052 if let Some(ref emb) = staged.embedding {
1053 memories::upsert_vec(
1054 &tx,
1055 existing_id,
1056 namespace,
1057 memory_type,
1058 emb,
1059 &staged.name,
1060 &staged.snippet,
1061 )?;
1062 }
1063 if staged.chunks_info.len() > 1 {
1064 storage_chunks::insert_chunk_slices(
1065 &tx,
1066 existing_id,
1067 &new_memory.body,
1068 &staged.chunks_info,
1069 )?;
1070 if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
1071 for (i, emb) in chunk_embeddings.iter().enumerate() {
1072 storage_chunks::upsert_chunk_vec(
1073 &tx,
1074 i as i64,
1075 existing_id,
1076 i as i32,
1077 emb,
1078 )?;
1079 }
1080 }
1081 }
1082
1083 link_staged_graph(&tx, namespace, existing_id, &staged)?;
1084 tx.commit()?;
1085
1086 Ok(FileSuccess {
1087 memory_id: existing_id,
1088 action: "updated".to_string(),
1089 body_length,
1090 backend_invoked: staged.backend_invoked,
1091 })
1092 }
1093 None => {
1094 if let Some(hash_id) = duplicate_hash_id {
1097 return Err(AppError::Duplicate(format!(
1098 "identical body already stored as memory id {hash_id} (dedup by body_hash); skipping '{}'",
1099 staged.name
1100 )));
1101 }
1102
1103 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
1104 let memory_id = memories::insert(&tx, &new_memory)?;
1105 versions::insert_version(
1106 &tx,
1107 memory_id,
1108 1,
1109 &staged.name,
1110 memory_type,
1111 &staged.description,
1112 &new_memory.body,
1113 &metadata_json,
1114 None,
1115 "create",
1116 )?;
1117 if let Some(ref emb) = staged.embedding {
1118 memories::upsert_vec(
1119 &tx,
1120 memory_id,
1121 namespace,
1122 memory_type,
1123 emb,
1124 &staged.name,
1125 &staged.snippet,
1126 )?;
1127 }
1128 if staged.chunks_info.len() > 1 {
1129 storage_chunks::insert_chunk_slices(
1130 &tx,
1131 memory_id,
1132 &new_memory.body,
1133 &staged.chunks_info,
1134 )?;
1135 if let Some(ref chunk_embeddings) = staged.chunk_embeddings {
1136 for (i, emb) in chunk_embeddings.iter().enumerate() {
1137 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
1138 }
1139 }
1140 }
1141 link_staged_graph(&tx, namespace, memory_id, &staged)?;
1142 tx.commit()?;
1143
1144 if !staged.urls.is_empty() {
1145 let url_entries: Vec<storage_urls::MemoryUrl> = staged
1146 .urls
1147 .into_iter()
1148 .map(|u| storage_urls::MemoryUrl {
1149 url: u.url,
1150 offset: Some(u.start as i64),
1151 })
1152 .collect();
1153 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
1154 }
1155
1156 Ok(FileSuccess {
1157 memory_id,
1158 action: "created".to_string(),
1159 body_length,
1160 backend_invoked: staged.backend_invoked,
1161 })
1162 }
1163 }
1164}
1165
/// Returns `true` when `value` compares equal to `default`.
///
/// Used by the flag validation to tell whether an option that always carries a
/// value (because of its clap default) still holds that default.
fn is_at_default<T: PartialEq>(value: T, default: T) -> bool {
    value.eq(&default)
}
1176
1177fn validate_mode_conditional_flags_ingest(args: &IngestArgs) -> Result<(), AppError> {
1192 const DEFAULT_TIMEOUT: u64 = 300;
1193 const DEFAULT_RATE_LIMIT_WAIT: u64 = 60;
1194
1195 let mut conflicts: Vec<String> = Vec::new();
1196
1197 let is_local_mode = args.mode == IngestMode::None || args.mode == IngestMode::Gliner;
1198
1199 if args.name_prefix.is_some() && !is_local_mode {
1202 return Err(AppError::Validation(
1203 "--name-prefix is not supported with --mode claude-code/codex/opencode; \
1204 use --mode none (default) or gliner"
1205 .to_string(),
1206 ));
1207 }
1208
1209 if is_local_mode {
1210 if args.claude_binary.is_some() {
1211 conflicts.push("--claude-binary is ignored when --mode is none or gliner".to_string());
1212 }
1213 if args.claude_model.is_some() {
1214 conflicts.push("--claude-model is ignored when --mode is none or gliner".to_string());
1215 }
1216 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1217 conflicts.push(format!(
1218 "--claude-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1219 args.claude_timeout
1220 ));
1221 }
1222 if args.codex_binary.is_some() {
1223 conflicts.push("--codex-binary is ignored when --mode is none or gliner".to_string());
1224 }
1225 if args.codex_model.is_some() {
1226 conflicts.push("--codex-model is ignored when --mode is none or gliner".to_string());
1227 }
1228 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1229 conflicts.push(format!(
1230 "--codex-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1231 args.codex_timeout
1232 ));
1233 }
1234 if args.opencode_binary.is_some() {
1235 conflicts
1236 .push("--opencode-binary is ignored when --mode is none or gliner".to_string());
1237 }
1238 if args.opencode_model.is_some() {
1239 conflicts.push("--opencode-model is ignored when --mode is none or gliner".to_string());
1240 }
1241 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1242 conflicts.push(format!(
1243 "--opencode-timeout={} is ignored when --mode is none or gliner (remove the flag to use the default 300s)",
1244 args.opencode_timeout
1245 ));
1246 }
1247 if args.max_cost_usd.is_some() {
1248 conflicts.push("--max-cost-usd is ignored when --mode is none or gliner (cost is only tracked for LLM-backed modes)".to_string());
1249 }
1250 if args.resume {
1251 conflicts.push("--resume is ignored when --mode is none or gliner (the queue DB is only used by LLM-backed modes)".to_string());
1252 }
1253 if args.retry_failed {
1254 conflicts.push("--retry-failed is ignored when --mode is none or gliner".to_string());
1255 }
1256 if args.keep_queue {
1257 conflicts.push("--keep-queue is ignored when --mode is none or gliner".to_string());
1258 }
1259 if !is_at_default(args.rate_limit_wait, DEFAULT_RATE_LIMIT_WAIT) {
1260 conflicts.push(format!(
1261 "--rate-limit-wait={} is ignored when --mode is none or gliner",
1262 args.rate_limit_wait
1263 ));
1264 }
1265 }
1266
1267 match args.mode {
1268 IngestMode::ClaudeCode => {
1269 if args.codex_binary.is_some() {
1270 conflicts.push("--codex-binary is ignored when --mode=claude-code".to_string());
1271 }
1272 if args.codex_model.is_some() {
1273 conflicts.push("--codex-model is ignored when --mode=claude-code".to_string());
1274 }
1275 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1276 conflicts.push(format!(
1277 "--codex-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1278 args.codex_timeout
1279 ));
1280 }
1281 if args.opencode_binary.is_some() {
1282 conflicts.push("--opencode-binary is ignored when --mode=claude-code".to_string());
1283 }
1284 if args.opencode_model.is_some() {
1285 conflicts.push("--opencode-model is ignored when --mode=claude-code".to_string());
1286 }
1287 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1288 conflicts.push(format!(
1289 "--opencode-timeout={} is ignored when --mode=claude-code (remove the flag to use the default 300s)",
1290 args.opencode_timeout
1291 ));
1292 }
1293 }
1294 IngestMode::Codex => {
1295 if args.claude_binary.is_some() {
1296 conflicts.push("--claude-binary is ignored when --mode=codex".to_string());
1297 }
1298 if args.claude_model.is_some() {
1299 conflicts.push("--claude-model is ignored when --mode=codex".to_string());
1300 }
1301 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1302 conflicts.push(format!(
1303 "--claude-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1304 args.claude_timeout
1305 ));
1306 }
1307 if args.max_cost_usd.is_some() {
1308 conflicts.push(
1309 "--max-cost-usd is ignored when --mode=codex (OAuth-first; cost is metered by your subscription)"
1310 .to_string(),
1311 );
1312 }
1313 if args.resume {
1314 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1315 }
1316 if args.retry_failed {
1317 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1318 }
1319 if args.keep_queue {
1320 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1321 }
1322 if args.opencode_binary.is_some() {
1323 conflicts.push("--opencode-binary is ignored when --mode=codex".to_string());
1324 }
1325 if args.opencode_model.is_some() {
1326 conflicts.push("--opencode-model is ignored when --mode=codex".to_string());
1327 }
1328 if !is_at_default(args.opencode_timeout, DEFAULT_TIMEOUT) {
1329 conflicts.push(format!(
1330 "--opencode-timeout={} is ignored when --mode=codex (remove the flag to use the default 300s)",
1331 args.opencode_timeout
1332 ));
1333 }
1334 }
1335 IngestMode::Opencode => {
1336 if args.claude_binary.is_some() {
1337 conflicts.push("--claude-binary is ignored when --mode=opencode".to_string());
1338 }
1339 if args.claude_model.is_some() {
1340 conflicts.push("--claude-model is ignored when --mode=opencode".to_string());
1341 }
1342 if !is_at_default(args.claude_timeout, DEFAULT_TIMEOUT) {
1343 conflicts.push(format!(
1344 "--claude-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1345 args.claude_timeout
1346 ));
1347 }
1348 if args.codex_binary.is_some() {
1349 conflicts.push("--codex-binary is ignored when --mode=opencode".to_string());
1350 }
1351 if args.codex_model.is_some() {
1352 conflicts.push("--codex-model is ignored when --mode=opencode".to_string());
1353 }
1354 if !is_at_default(args.codex_timeout, DEFAULT_TIMEOUT) {
1355 conflicts.push(format!(
1356 "--codex-timeout={} is ignored when --mode=opencode (remove the flag to use the default 300s)",
1357 args.codex_timeout
1358 ));
1359 }
1360 if args.max_cost_usd.is_some() {
1361 conflicts.push(
1362 "--max-cost-usd is ignored when --mode=opencode (OAuth-first; cost is metered by your subscription)"
1363 .to_string(),
1364 );
1365 }
1366 if args.resume {
1367 conflicts.push("--resume is only valid for --mode=claude-code".to_string());
1368 }
1369 if args.retry_failed {
1370 conflicts.push("--retry-failed is only valid for --mode=claude-code".to_string());
1371 }
1372 if args.keep_queue {
1373 conflicts.push("--keep-queue is only valid for --mode=claude-code".to_string());
1374 }
1375 }
1376 IngestMode::None | IngestMode::Gliner => {}
1377 }
1378
1379 if !conflicts.is_empty() {
1380 return Err(AppError::Validation(format!(
1381 "G20: mode-conditional flag conflicts detected for --mode={:?}:\n - {}",
1382 args.mode,
1383 conflicts.join("\n - ")
1384 )));
1385 }
1386
1387 Ok(())
1388}
1389
/// Entry point of the `ingest` command.
///
/// After validating mode-conditional flags, the Claude Code, Codex and OpenCode modes are
/// delegated to their dedicated runners. Every other mode runs the pipeline implemented
/// here:
///
/// 1. collect and sort the files under `args.dir` that match `args.pattern`;
/// 2. derive a unique kebab-case name per file, classifying each file as `Skip` or
///    `Process`;
/// 3. with `args.dry_run`, emit preview/budget events and a summary, then return;
/// 4. otherwise stage files on a rayon pool driven by a producer thread and persist the
///    staged results on the calling thread as they arrive over a bounded channel;
/// 5. emit the final summary and, when `args.enrich_after` is set and at least one part
///    was indexed, run the `memory-bindings` enrich operation.
///
/// # Errors
///
/// Returns `AppError::Validation` when the directory is missing or not a directory, when
/// more files match than `args.max_files`, when conflicting flags are combined, or when
/// `args.fail_fast` aborts on the first failure. Returns `AppError::LimitExceeded` when
/// reserving the bookkeeping vectors fails, and `AppError::Internal` when the rayon pool
/// cannot be built, the producer thread panics, or a channel index has no `Process` slot.
/// Errors from the helpers called with `?` are propagated unchanged.
#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
pub fn run(
    args: IngestArgs,
    llm_backend: crate::cli::LlmBackendChoice,
    embedding_backend: crate::cli::EmbeddingBackendChoice,
) -> Result<(), AppError> {
    validate_mode_conditional_flags_ingest(&args)?;
    tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
    // LLM-backed modes have their own runners; nothing below applies to them.
    if args.mode == IngestMode::ClaudeCode {
        return super::ingest_claude::run_claude_ingest(&args, embedding_backend, llm_backend);
    }
    if args.mode == IngestMode::Codex {
        return super::ingest_codex::run_codex_ingest(&args);
    }
    if args.mode == IngestMode::Opencode {
        return super::ingest_opencode::run_opencode_ingest(&args);
    }

    let started = std::time::Instant::now();

    if !args.dir.exists() {
        return Err(AppError::Validation(format!(
            "directory not found: {}",
            args.dir.display()
        )));
    }
    if !args.dir.is_dir() {
        return Err(AppError::Validation(format!(
            "path is not a directory: {}",
            args.dir.display()
        )));
    }

    let mut files: Vec<PathBuf> = Vec::with_capacity(128);
    collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
    // Sorting makes name derivation (and collision suffixes) independent of the order in
    // which the directory listing returned the entries.
    files.sort_unstable();

    if files.len() > args.max_files {
        return Err(AppError::Validation(format!(
            "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
            files.len(),
            args.max_files
        )));
    }

    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
    let memory_type_str = args.r#type.as_str().to_string();

    let paths = AppPaths::resolve(args.db.as_deref())?;
    // A storage initialisation failure is not fatal here: the message is kept and reported
    // once per file inside the persistence loop below.
    let mut conn_or_err = match init_storage(&paths) {
        Ok(c) => Ok(c),
        Err(e) => Err(format!("{e}")),
    };

    let mut succeeded: usize = 0;
    let mut failed: usize = 0;
    let mut skipped: usize = 0;
    let total = files.len();

    // Names already assigned in this run; consulted by `unique_name`.
    let mut taken_names: BTreeSet<String> = BTreeSet::new();

    // Per-file reporting metadata, one entry per collected file, in `files` order.
    enum SlotMeta {
        // The file will not be staged; `reason` is reported in its event.
        Skip {
            file_str: String,
            derived_base: String,
            name_truncated: bool,
            original_name: Option<String>,
            original_filename: Option<String>,
            reason: String,
        },
        // The file will be staged under `derived_name`.
        Process {
            file_str: String,
            derived_name: String,
            name_truncated: bool,
            original_name: Option<String>,
            original_filename: Option<String>,
        },
    }

    // Work item handed to the producer; `idx` is the position of the matching
    // `SlotMeta::Process` entry in `slots_meta`.
    struct ProcessItem {
        idx: usize,
        path: PathBuf,
        file_str: String,
        derived_name: String,
    }

    // Reserve fallibly so that an oversized file list surfaces as an error, not an abort.
    let files_cap = files.len();
    let mut slots_meta: Vec<SlotMeta> = Vec::new();
    slots_meta.try_reserve(files_cap).map_err(|_| {
        AppError::LimitExceeded(format!(
            "allocation of {files_cap} slot metadata entries would exceed available memory"
        ))
    })?;
    let mut process_items: Vec<ProcessItem> = Vec::new();
    process_items.try_reserve(files_cap).map_err(|_| {
        AppError::LimitExceeded(format!(
            "allocation of {files_cap} process items would exceed available memory"
        ))
    })?;
    let mut truncations: Vec<(String, String)> = Vec::new();
    truncations.try_reserve(files_cap).map_err(|_| {
        AppError::LimitExceeded(format!(
            "allocation of {files_cap} truncation entries would exceed available memory"
        ))
    })?;

    // With a prefix, `validate_name_prefix` returns the length budget left for the derived
    // part so that prefix + derived name still fits the name cap.
    let max_name_length = match args.name_prefix.as_deref() {
        Some(prefix) => validate_name_prefix(prefix, args.max_name_length)?,
        None => args.max_name_length,
    };
    // Naming pass: classify every file as Skip or Process.
    for path in &files {
        let file_str = path.to_string_lossy().into_owned();
        let (derived_base, name_truncated, original_name) =
            derive_kebab_name(path, max_name_length);
        let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");

        if name_truncated {
            if let Some(ref orig) = original_name {
                truncations.push((orig.clone(), derived_base.clone()));
            }
        }

        // Nothing usable survived normalisation: record a Skip slot and move on.
        if derived_base.is_empty() {
            let orig_filename = if !original_basename.is_empty() {
                Some(original_basename.to_string())
            } else {
                None
            };
            slots_meta.push(SlotMeta::Skip {
                file_str,
                derived_base: String::new(),
                name_truncated: false,
                original_name: None,
                original_filename: orig_filename,
                reason: "could not derive a non-empty kebab-case name from filename".to_string(),
            });
            continue;
        }

        // The prefix is applied after normalisation/truncation of the derived part.
        let derived_base = match args.name_prefix.as_deref() {
            Some(prefix) => format!("{prefix}{derived_base}"),
            None => derived_base,
        };

        match unique_name(&derived_base, &taken_names) {
            Ok(derived_name) => {
                taken_names.insert(derived_name.clone());
                // Index the slot pushed below will occupy; ties the work item to its metadata.
                let idx = slots_meta.len();
                // The original stem is only reported when it differs from the final name.
                let orig_filename = if original_basename != derived_name {
                    Some(original_basename.to_string())
                } else {
                    None
                };
                process_items.push(ProcessItem {
                    idx,
                    path: path.clone(),
                    file_str: file_str.clone(),
                    derived_name: derived_name.clone(),
                });
                slots_meta.push(SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                    original_filename: orig_filename,
                });
            }
            Err(e) => {
                // No free collision suffix: the file is skipped with the error as reason.
                let orig_filename = if original_basename != derived_base {
                    Some(original_basename.to_string())
                } else {
                    None
                };
                slots_meta.push(SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    original_filename: orig_filename,
                    reason: e.to_string(),
                });
            }
        }
    }

    if !truncations.is_empty() {
        tracing::info!(
            target: "ingest",
            count = truncations.len(),
            max_name_length = max_name_length,
            max_len = DERIVED_NAME_MAX_LEN,
            "derived names truncated; pass -vv (debug) for per-file detail"
        );
    }

    // Dry run: report the file-to-name mapping (plus a size budget per readable file) and
    // return without staging or persisting anything.
    if args.dry_run {
        for meta in &slots_meta {
            match meta {
                SlotMeta::Skip {
                    file_str,
                    derived_base,
                    name_truncated,
                    original_name,
                    original_filename,
                    reason,
                } => {
                    output::emit_json_compact(&IngestFileEvent {
                        file: file_str,
                        name: derived_base,
                        status: "skip",
                        truncated: *name_truncated,
                        original_name: original_name.clone(),
                        original_filename: original_filename.as_deref(),
                        error: Some(reason.clone()),
                        memory_id: None,
                        action: None,
                        body_length: 0,
                        backend_invoked: None,
                    })?;
                }
                SlotMeta::Process {
                    file_str,
                    derived_name,
                    name_truncated,
                    original_name,
                    original_filename,
                } => {
                    output::emit_json_compact(&IngestFileEvent {
                        file: file_str,
                        name: derived_name,
                        status: "preview",
                        truncated: *name_truncated,
                        original_name: original_name.clone(),
                        original_filename: original_filename.as_deref(),
                        error: None,
                        memory_id: None,
                        action: None,
                        body_length: 0,
                        backend_invoked: None,
                    })?;

                    // An unreadable file only produces a warning; the preview event above
                    // has already been emitted.
                    match std::fs::read_to_string(file_str) {
                        Ok(body) => {
                            let budget = chunking::assess_body_budget(&body);
                            output::emit_json_compact(&IngestDryRunBudget {
                                budget: true,
                                file: file_str,
                                name: derived_name,
                                bytes: budget.bytes,
                                chunk_count: budget.chunk_count,
                                token_count: budget.approx_tokens,
                                partition_count: budget.partition_count,
                                exceeds_limits: budget.exceeds_limits,
                            })?;
                        }
                        Err(e) => {
                            tracing::warn!(
                                target: "ingest",
                                file = %file_str,
                                "dry-run: could not read file for budget assessment: {e}"
                            );
                        }
                    }
                }
            }
        }
        // The dry-run summary reports zero for every counter except `files_total`.
        output::emit_json_compact(&IngestSummary {
            summary: true,
            dir: args.dir.to_string_lossy().into_owned(),
            pattern: args.pattern.clone(),
            recursive: args.recursive,
            files_total: total,
            files_succeeded: 0,
            files_failed: 0,
            files_skipped: 0,
            elapsed_ms: started.elapsed().as_millis() as u64,
        })?;
        return Ok(());
    }

    // `--low-memory` and an explicit parallelism above 1 contradict each other.
    if args.low_memory {
        if let Some(n) = args.ingest_parallelism {
            if n > 1 {
                return Err(AppError::Validation(
                    "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
                        .to_string(),
                ));
            }
        }
    }

    let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);

    // Dedicated pool so the staging phase uses exactly `parallelism` worker threads.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(parallelism)
        .build()
        .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;

    if args.enable_ner && args.skip_extraction {
        return Err(AppError::Validation(
            "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
        ));
    }
    if args.skip_extraction && !args.enable_ner {
        tracing::warn!(
            "--skip-extraction is deprecated since v1.0.45 and has no effect (NER is disabled by default); remove this flag to silence the warning"
        );
    }
    // Plain copies of the settings the producer closure needs, so that closure does not
    // have to capture `args`.
    let enable_ner = args.enable_ner;
    let auto_describe = args.auto_describe && !args.no_auto_describe;
    let max_rss_mb = args.max_rss_mb;
    let llm_parallelism = args.llm_parallelism as usize;
    if args.mode == IngestMode::Gliner {
        tracing::warn!(
            "--mode gliner is deprecated since v1.0.79 (the GLiNER pipeline was removed); it now performs URL-regex extraction only — use --mode claude-code or --mode codex for LLM-curated extraction"
        );
    }
    if args.gliner_variant != "fp32" {
        tracing::warn!(
            "--gliner-variant is deprecated and has no effect since v1.0.79 (the GLiNER pipeline was removed)"
        );
    }
    // Any value other than "int8" maps to Fp32.
    let gliner_variant: crate::extraction::GlinerVariant = match args.gliner_variant.as_str() {
        "int8" => crate::extraction::GlinerVariant::Int8,
        _ => crate::extraction::GlinerVariant::Fp32,
    };

    let total_to_process = process_items.len();
    tracing::info!(
        target: "ingest",
        phase = "pipeline_start",
        files = total_to_process,
        ingest_parallelism = parallelism,
        "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
    );

    // Bounded channel: a `sync_channel` sender blocks once `channel_bound` results are
    // waiting, so staging cannot run arbitrarily far ahead of persistence.
    let channel_bound = (parallelism * 2).max(1);
    let (tx, rx) = mpsc::sync_channel::<(usize, Result<Vec<StagedFile>, AppError>)>(channel_bound);

    let paths_owned = paths.clone();
    let llm_backend_owned = llm_backend;
    let embedding_backend_owned = embedding_backend;
    // Phase A: a producer thread drives the rayon pool; each worker stages one file and
    // sends `(slot index, result)` to the receiver loop below.
    let producer_handle = std::thread::spawn(move || {
        pool.install(|| {
            process_items.into_par_iter().for_each(|item| {
                // Files not yet started are dropped once shutdown has been requested.
                if crate::shutdown_requested() {
                    return;
                }
                let t0 = std::time::Instant::now();
                let result = stage_file(
                    item.idx,
                    &item.path,
                    &item.derived_name,
                    &paths_owned,
                    enable_ner,
                    gliner_variant,
                    max_rss_mb,
                    llm_parallelism,
                    llm_backend_owned,
                    embedding_backend_owned,
                    auto_describe,
                );
                let elapsed_ms = t0.elapsed().as_millis() as u64;

                // Totals across all staged parts of this file; zero when staging failed.
                let (n_entities, n_relationships) = match &result {
                    Ok(parts) => (
                        parts.iter().map(|sf| sf.entities.len()).sum::<usize>(),
                        parts.iter().map(|sf| sf.relationships.len()).sum::<usize>(),
                    ),
                    Err(_) => (0, 0),
                };
                let progress = StageProgressEvent {
                    schema_version: 1,
                    event: "file_extracted",
                    path: &item.file_str,
                    ms: elapsed_ms,
                    entities: n_entities,
                    relationships: n_relationships,
                };
                if let Ok(line) = serde_json::to_string(&progress) {
                    tracing::info!(target: "ingest_progress", "{}", line);
                }

                // A send error means the receiver is gone (loop exited early); ignore it.
                let _ = tx.send((item.idx, result));
            });
            // Dropping the sender closes the channel, which ends the receiver loop.
            drop(tx);
        });
    });

    let fail_fast = args.fail_fast;

    // Skip slots need no staging, so they are reported before any staged result.
    for meta in &slots_meta {
        if let SlotMeta::Skip {
            file_str,
            derived_base,
            name_truncated,
            original_name,
            original_filename,
            reason,
        } = meta
        {
            output::emit_json_compact(&IngestFileEvent {
                file: file_str,
                name: derived_base,
                status: "skipped",
                truncated: *name_truncated,
                original_name: original_name.clone(),
                original_filename: original_filename.as_deref(),
                error: Some(reason.clone()),
                memory_id: None,
                action: None,
                body_length: 0,
                backend_invoked: None,
            })?;
            skipped += 1;
        }
    }

    // Slot index -> metadata, restricted to Process slots, for lookups by channel `idx`.
    let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
        .iter()
        .enumerate()
        .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
        .collect();

    tracing::info!(
        target: "ingest",
        phase = "persist_start",
        files = total_to_process,
        "phase B starting: persisting files incrementally as Phase A completes each one",
    );

    // Phase B: persist results in the order the workers send them; `idx` maps each result
    // back to its metadata.
    for (idx, stage_result) in rx {
        if crate::shutdown_requested() {
            tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
            break;
        }
        let meta = meta_index.get(&idx).ok_or_else(|| {
            AppError::Internal(anyhow::anyhow!(
                "channel idx {idx} has no corresponding Process slot"
            ))
        })?;
        let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
        {
            SlotMeta::Process {
                file_str,
                derived_name,
                name_truncated,
                original_name,
                original_filename,
            } => (
                file_str,
                derived_name,
                name_truncated,
                original_name,
                original_filename,
            ),
            // `meta_index` was filtered to Process slots above.
            SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
        };

        // When storage initialisation failed, every file is reported as failed with the
        // stored message instead of being persisted.
        let conn = match conn_or_err.as_mut() {
            Ok(c) => c,
            Err(err_msg) => {
                let err_clone = err_msg.clone();
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(err_clone.clone()),
                    memory_id: None,
                    action: None,
                    body_length: 0,
                    backend_invoked: None,
                })?;
                failed += 1;
                if fail_fast {
                    // Emit the summary before aborting so the counters are still reported.
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_clone}"
                    )));
                }
                continue;
            }
        };

        match stage_result {
            Ok(parts) => {
                // One file can yield several staged parts; each part is persisted and
                // counted on its own.
                for staged in parts {
                    // `staged` is moved into `persist_staged`, so keep its name for the event.
                    let part_name = staged.name.clone();
                    match persist_staged(
                        conn,
                        &namespace,
                        &memory_type_str,
                        staged,
                        args.force_merge,
                    ) {
                        Ok(FileSuccess {
                            memory_id,
                            action,
                            body_length,
                            backend_invoked: file_backend_invoked,
                        }) => {
                            output::emit_json_compact(&IngestFileEvent {
                                file: file_str,
                                name: &part_name,
                                status: "indexed",
                                truncated: *name_truncated,
                                original_name: original_name.clone(),
                                original_filename: original_filename.as_deref(),
                                error: None,
                                memory_id: Some(memory_id),
                                action: Some(action),
                                body_length,
                                backend_invoked: file_backend_invoked,
                            })?;
                            succeeded += 1;
                        }
                        // Duplicates count as skipped, not failed, and never trigger fail-fast.
                        Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
                            output::emit_json_compact(&IngestFileEvent {
                                file: file_str,
                                name: &part_name,
                                status: "skipped",
                                truncated: *name_truncated,
                                original_name: original_name.clone(),
                                original_filename: original_filename.as_deref(),
                                error: Some(format!("{e}")),
                                memory_id: None,
                                action: Some("duplicate".to_string()),
                                body_length: 0,
                                backend_invoked: None,
                            })?;
                            skipped += 1;
                        }
                        Err(e) => {
                            let err_msg = format!("{e}");
                            output::emit_json_compact(&IngestFileEvent {
                                file: file_str,
                                name: &part_name,
                                status: "failed",
                                truncated: *name_truncated,
                                original_name: original_name.clone(),
                                original_filename: original_filename.as_deref(),
                                error: Some(err_msg.clone()),
                                memory_id: None,
                                action: None,
                                body_length: 0,
                                backend_invoked: None,
                            })?;
                            failed += 1;
                            if fail_fast {
                                output::emit_json_compact(&IngestSummary {
                                    summary: true,
                                    dir: args.dir.display().to_string(),
                                    pattern: args.pattern.clone(),
                                    recursive: args.recursive,
                                    files_total: total,
                                    files_succeeded: succeeded,
                                    files_failed: failed,
                                    files_skipped: skipped,
                                    elapsed_ms: started.elapsed().as_millis() as u64,
                                })?;
                                return Err(AppError::Validation(format!(
                                    "ingest aborted on first failure: {err_msg}"
                                )));
                            }
                        }
                    }
                }
            }
            // Staging itself failed: report the file under its derived name.
            Err(e) => {
                let err_msg = format!("{e}");
                output::emit_json_compact(&IngestFileEvent {
                    file: file_str,
                    name: derived_name,
                    status: "failed",
                    truncated: *name_truncated,
                    original_name: original_name.clone(),
                    original_filename: original_filename.as_deref(),
                    error: Some(err_msg.clone()),
                    memory_id: None,
                    action: None,
                    body_length: 0,
                    backend_invoked: None,
                })?;
                failed += 1;
                if fail_fast {
                    output::emit_json_compact(&IngestSummary {
                        summary: true,
                        dir: args.dir.display().to_string(),
                        pattern: args.pattern.clone(),
                        recursive: args.recursive,
                        files_total: total,
                        files_succeeded: succeeded,
                        files_failed: failed,
                        files_skipped: skipped,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    })?;
                    return Err(AppError::Validation(format!(
                        "ingest aborted on first failure: {err_msg}"
                    )));
                }
            }
        }
    }

    // The receiver was consumed by the loop above, so pending sends fail instead of
    // blocking and the producer can finish.
    producer_handle
        .join()
        .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;

    // Best-effort WAL checkpoint after writes; its result is deliberately ignored.
    if let Ok(ref conn) = conn_or_err {
        if succeeded > 0 {
            let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
        }
    }

    output::emit_json_compact(&IngestSummary {
        summary: true,
        dir: args.dir.display().to_string(),
        pattern: args.pattern.clone(),
        recursive: args.recursive,
        files_total: total,
        files_succeeded: succeeded,
        files_failed: failed,
        files_skipped: skipped,
        elapsed_ms: started.elapsed().as_millis() as u64,
    })?;

    // Optional follow-up: run the `memory-bindings` enrich operation over the namespace.
    // It runs after the summary, and its failure does not fail the ingest.
    if args.enrich_after && succeeded > 0 {
        output::emit_json_compact(&serde_json::json!({
            "event": "enrich_phase_started",
            "operation": "memory-bindings"
        }))?;
        // Backend binaries, models, timeouts and the cost cap are forwarded from the
        // ingest arguments; every other field is fixed here.
        let enrich_args = super::enrich::EnrichArgs {
            operation: Some(super::enrich::EnrichOperation::MemoryBindings),
            mode: Some(super::enrich::EnrichMode::ClaudeCode),
            limit: None,
            target: super::enrich::ReEmbedTarget::Memories,
            dry_run: false,
            namespace: args.namespace.clone(),
            claude_binary: args.claude_binary.clone(),
            claude_model: args.claude_model.clone(),
            claude_timeout: args.claude_timeout,
            codex_binary: args.codex_binary.clone(),
            codex_model: args.codex_model.clone(),
            codex_timeout: args.codex_timeout,
            opencode_binary: args.opencode_binary.clone(),
            opencode_model: args.opencode_model.clone(),
            opencode_timeout: args.opencode_timeout,
            openrouter_model: None,
            openrouter_api_key: None,
            openrouter_timeout: 300,
            openrouter_base_url: None,
            db: args.db.clone(),
            json: false,
            resume: false,
            retry_failed: false,
            max_cost_usd: args.max_cost_usd,
            llm_parallelism: args.llm_parallelism as u32,
            wait_job_singleton: args.wait_job_singleton,
            force_job_singleton: args.force_job_singleton,
            names: Vec::new(),
            names_file: None,
            preflight_check: false,
            fallback_mode: None,
            rate_limit_buffer: 300,
            max_load_check: true,
            circuit_breaker_threshold: 5,
            preserve_threshold: 0.7,
            codex_model_validate: true,
            codex_model_fallback: None,
            min_output_chars: 500,
            max_output_chars: 2000,
            preserve_check: true,
            prompt_template: None,
            until_empty: false,
            max_runtime: None,
            max_attempts: 5,
            status: false,
            rest_concurrency: None,
            list_dead: false,
            requeue_dead: false,
            prune_dead_orphans: false,
            ignore_backoff: false,
            body_extract_graph_only: false,
        };
        match super::enrich::run(&enrich_args, llm_backend, embedding_backend) {
            Ok(()) => {
                output::emit_json_compact(&serde_json::json!({
                    "event": "enrich_phase_completed"
                }))?;
            }
            Err(e) => {
                // Logged and reported as an event only; the function still returns Ok.
                tracing::warn!(error = %e, "enrich --operation memory-bindings failed after ingest");
                output::emit_json_compact(&serde_json::json!({
                    "event": "enrich_phase_failed",
                    "error": e.to_string()
                }))?;
            }
        }
    }

    Ok(())
}
2175
2176fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
2182 ensure_db_ready(paths)?;
2183 let conn = open_rw(&paths.db)?;
2184 Ok(conn)
2185}
2186
2187pub(crate) fn collect_files(
2188 dir: &Path,
2189 pattern: &str,
2190 recursive: bool,
2191 out: &mut Vec<PathBuf>,
2192) -> Result<(), AppError> {
2193 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
2194 for entry in entries {
2195 let entry = entry.map_err(AppError::Io)?;
2196 let path = entry.path();
2197 let file_type = entry.file_type().map_err(AppError::Io)?;
2198 if file_type.is_file() {
2199 let name = entry.file_name();
2200 let name_str = name.to_string_lossy();
2201 if matches_pattern(&name_str, pattern) {
2202 out.push(path);
2203 }
2204 } else if file_type.is_dir() && recursive {
2205 collect_files(&path, pattern, recursive, out)?;
2206 }
2207 }
2208 Ok(())
2209}
2210
/// Returns `true` when the file name `name` matches the wildcard `pattern`.
///
/// `*` matches any run of characters (including none) and may appear anywhere in the
/// pattern, any number of times. Every other character is compared literally, and a
/// pattern without `*` must equal `name` exactly.
///
/// Previously only a single leading (`*.md`) or trailing (`README*`) wildcard was
/// understood; patterns such as `notes-*.md` or `*draft*` were compared literally and
/// silently matched nothing. Results for exact, `*suffix` and `prefix*` patterns are
/// unchanged.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // No wildcard at all: plain equality.
    let Some((head, rest)) = pattern.split_once('*') else {
        return name == pattern;
    };

    // Everything before the first `*` is an anchored prefix.
    let Some(mut remaining) = name.strip_prefix(head) else {
        return false;
    };

    // Everything after the last `*` is an anchored suffix; the segments in between only
    // have to appear in order. `split` always yields at least one item, so the
    // `unwrap_or` fallback is never taken.
    let mut segments = rest.split('*');
    let tail = segments.next_back().unwrap_or("");
    for segment in segments {
        // Taking the leftmost occurrence leaves the most text for later segments.
        match remaining.find(segment) {
            Some(pos) => remaining = &remaining[pos + segment.len()..],
            None => return false,
        }
    }
    remaining.ends_with(tail)
}
2220
2221pub(crate) fn validate_name_prefix(
2238 prefix: &str,
2239 max_name_length: usize,
2240) -> Result<usize, AppError> {
2241 if prefix.is_empty() {
2242 return Err(AppError::Validation(
2243 "--name-prefix cannot be empty".to_string(),
2244 ));
2245 }
2246 let starts_lower = prefix
2247 .chars()
2248 .next()
2249 .is_some_and(|c| c.is_ascii_lowercase());
2250 let all_slug_chars = prefix
2251 .chars()
2252 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-');
2253 if !starts_lower || !all_slug_chars {
2254 return Err(AppError::Validation(format!(
2255 "--name-prefix '{prefix}' must start with a lowercase letter and contain \
2256 only lowercase letters, digits and hyphens (kebab-case)"
2257 )));
2258 }
2259 let cap = crate::constants::MAX_MEMORY_NAME_LEN;
2260 if prefix.len() >= cap {
2261 return Err(AppError::LimitExceeded(format!(
2262 "--name-prefix is {} chars; prefixed names would exceed the {cap}-char \
2263 name cap (MAX_MEMORY_NAME_LEN)",
2264 prefix.len()
2265 )));
2266 }
2267 Ok(max_name_length.min(cap - prefix.len()))
2268}
2269
2270pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
2271 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
2272 let lowered: String = stem
2273 .nfd()
2274 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
2275 .map(|c| {
2276 if c == '_' || c.is_whitespace() {
2277 '-'
2278 } else {
2279 c
2280 }
2281 })
2282 .map(|c| c.to_ascii_lowercase())
2283 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
2284 .collect();
2285 let collapsed = collapse_dashes(&lowered);
2286 let trimmed_raw = collapsed.trim_matches('-').to_string();
2287 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
2289 format!("doc-{trimmed_raw}")
2290 } else {
2291 trimmed_raw
2292 };
2293 if trimmed.len() > max_len {
2294 let truncated = trimmed[..max_len].trim_matches('-').to_string();
2295 tracing::warn!(
2300 target: "ingest",
2301 original = %trimmed,
2302 truncated_to = %truncated,
2303 max_len = max_len,
2304 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
2305 );
2306 (truncated, true, Some(trimmed))
2307 } else {
2308 (trimmed, false, None)
2309 }
2310}
2311
2312fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
2325 if !taken.contains(base) {
2326 return Ok(base.to_string());
2327 }
2328 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
2329 let candidate = format!("{base}-{suffix}");
2330 if !taken.contains(&candidate) {
2331 tracing::warn!(
2332 target: "ingest",
2333 base = %base,
2334 resolved = %candidate,
2335 suffix,
2336 "memory name collision resolved with numeric suffix"
2337 );
2338 return Ok(candidate);
2339 }
2340 }
2341 Err(AppError::Validation(format!(
2342 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
2343 )))
2344}
2345
/// Collapses every run of consecutive `-` characters in `s` into a single `-`.
///
/// All other characters are copied through unchanged; leading and trailing dashes are
/// kept (as one dash each).
fn collapse_dashes(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        // A dash is dropped only when the output already ends in one.
        if ch != '-' || !acc.ends_with('-') {
            acc.push(ch);
        }
        acc
    })
}
2362
2363#[cfg(test)]
2364mod tests {
2365 use super::*;
2366 use std::path::PathBuf;
2367
2368 #[test]
2370 fn validate_name_prefix_shrinks_budget_to_fit_name_cap() {
2371 let budget = validate_name_prefix("projx-team", 60).unwrap();
2374 assert_eq!(budget, 60);
2375 let long_prefix = "p".repeat(75);
2377 let budget = validate_name_prefix(&long_prefix, 60).unwrap();
2378 assert_eq!(budget, 5, "80-char cap minus 75-char prefix leaves 5");
2379 }
2380
2381 #[test]
2382 fn validate_name_prefix_rejects_invalid_slugs() {
2383 for bad in ["", "-lead", "Upper", "has_underscore", "acentuação", "1x"] {
2384 let err = validate_name_prefix(bad, 60).unwrap_err();
2385 assert_eq!(err.exit_code(), 1, "prefix '{bad}' must be Validation");
2386 }
2387 }
2388
2389 #[test]
2390 fn validate_name_prefix_too_long_is_limit_exceeded() {
2391 let huge = "p".repeat(crate::constants::MAX_MEMORY_NAME_LEN);
2392 let err = validate_name_prefix(&huge, 60).unwrap_err();
2393 assert_eq!(err.exit_code(), 6, "prefix >= name cap must be exit 6");
2394 assert!(
2395 err.to_string().contains("MAX_MEMORY_NAME_LEN"),
2396 "obtido: {err}"
2397 );
2398 }
2399
2400 #[test]
2401 fn name_prefix_applies_after_kebab_normalization_and_fits_cap() {
2402 let prefix = "projx-";
2403 let budget = validate_name_prefix(prefix, 60).unwrap();
2404 let (base, _, _) = derive_kebab_name(&PathBuf::from("My File Name.md"), budget);
2405 let final_name = format!("{prefix}{base}");
2406 assert_eq!(final_name, "projx-my-file-name");
2407 assert!(final_name.len() <= crate::constants::MAX_MEMORY_NAME_LEN);
2408 assert!(crate::constants::name_slug_regex().is_match(&final_name));
2409 }
2410
2411 #[test]
2418 fn ingest_mode_none_with_resume_is_rejected() {
2419 use crate::cli::{Cli, Commands};
2420 use clap::Parser;
2421
2422 let none_resume = Cli::try_parse_from([
2423 "sqlite-graphrag",
2424 "ingest",
2425 "./docs",
2426 "--mode",
2427 "none",
2428 "--resume",
2429 ])
2430 .expect("parse succeeds; the conflict is value-conditional");
2431 let args = match none_resume.command {
2432 Some(Commands::Ingest(a)) => a,
2433 other => panic!("expected ingest, got {other:?}"),
2434 };
2435 assert!(
2436 validate_mode_conditional_flags_ingest(&args).is_err(),
2437 "--mode none + --resume must be rejected fail-fast"
2438 );
2439
2440 let claude_resume = Cli::try_parse_from([
2442 "sqlite-graphrag",
2443 "ingest",
2444 "./docs",
2445 "--mode",
2446 "claude-code",
2447 "--resume",
2448 ])
2449 .expect("parse");
2450 let args = match claude_resume.command {
2451 Some(Commands::Ingest(a)) => a,
2452 other => panic!("expected ingest, got {other:?}"),
2453 };
2454 assert!(
2455 validate_mode_conditional_flags_ingest(&args).is_ok(),
2456 "--mode claude-code + --resume is valid and must pass"
2457 );
2458 }
2459
2460 fn setup_ingest_conn() -> Connection {
2461 crate::storage::connection::register_vec_extension();
2462 let mut conn = Connection::open_in_memory().unwrap();
2463 crate::migrations::runner().run(&mut conn).unwrap();
2464 conn
2465 }
2466
2467 fn make_staged(name: &str, body: &str) -> StagedFile {
2468 StagedFile {
2469 body: body.to_string(),
2470 body_hash: blake3::hash(body.as_bytes()).to_hex().to_string(),
2471 snippet: body.chars().take(200).collect(),
2472 name: name.to_string(),
2473 description: "desc".to_string(),
2474 embedding: None,
2475 chunk_embeddings: None,
2476 chunks_info: Vec::new(),
2477 entities: Vec::new(),
2478 relationships: Vec::new(),
2479 entity_embeddings: None,
2480 urls: Vec::new(),
2481 backend_invoked: None,
2482 }
2483 }
2484
2485 #[test]
2488 fn persist_staged_force_merge_updates_existing() {
2489 let mut conn = setup_ingest_conn();
2490
2491 let first = persist_staged(
2492 &mut conn,
2493 "global",
2494 "document",
2495 make_staged("doc-a", "v1"),
2496 false,
2497 )
2498 .expect("create");
2499 assert_eq!(first.action, "created");
2500
2501 let dup = persist_staged(
2503 &mut conn,
2504 "global",
2505 "document",
2506 make_staged("doc-a", "v2-changed"),
2507 false,
2508 );
2509 assert!(matches!(dup, Err(AppError::Duplicate(_))));
2510
2511 let upd = persist_staged(
2513 &mut conn,
2514 "global",
2515 "document",
2516 make_staged("doc-a", "v2-changed"),
2517 true,
2518 )
2519 .expect("update");
2520 assert_eq!(upd.action, "updated");
2521 assert_eq!(upd.memory_id, first.memory_id);
2522 let body: String = conn
2523 .query_row(
2524 "SELECT body FROM memories WHERE id = ?1",
2525 rusqlite::params![first.memory_id],
2526 |r| r.get(0),
2527 )
2528 .unwrap();
2529 assert_eq!(body, "v2-changed");
2530 }
2531
2532 #[test]
2534 fn persist_staged_dedupes_by_body_hash() {
2535 let mut conn = setup_ingest_conn();
2536 persist_staged(
2537 &mut conn,
2538 "global",
2539 "document",
2540 make_staged("parte-1", "identical content"),
2541 false,
2542 )
2543 .expect("create");
2544
2545 let res = persist_staged(
2547 &mut conn,
2548 "global",
2549 "document",
2550 make_staged("part-01", "identical content"),
2551 false,
2552 );
2553 match res {
2554 Err(AppError::Duplicate(msg)) => assert!(msg.contains("body_hash")),
2555 other => panic!("expected body_hash dedup duplicate, got {other:?}"),
2556 }
2557 let n: i64 = conn
2559 .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2560 .unwrap();
2561 assert_eq!(n, 1);
2562 }
2563
2564 #[test]
2566 fn ingest_force_merge_flag_parses() {
2567 use crate::cli::{Cli, Commands};
2568 use clap::Parser;
2569 let cli = Cli::try_parse_from(["sqlite-graphrag", "ingest", "./docs", "--force-merge"])
2570 .expect("parse");
2571 match cli.command {
2572 Some(Commands::Ingest(a)) => assert!(a.force_merge),
2573 other => panic!("expected ingest, got {other:?}"),
2574 }
2575 let cli2 = Cli::try_parse_from(["sqlite-graphrag", "ingest", "./docs"]).expect("parse");
2577 match cli2.command {
2578 Some(Commands::Ingest(a)) => assert!(!a.force_merge),
2579 other => panic!("expected ingest, got {other:?}"),
2580 }
2581 }
2582
2583 #[test]
2584 fn matches_pattern_suffix() {
2585 assert!(matches_pattern("foo.md", "*.md"));
2586 assert!(!matches_pattern("foo.txt", "*.md"));
2587 assert!(matches_pattern("foo.md", "*"));
2588 }
2589
2590 #[test]
2591 fn matches_pattern_prefix() {
2592 assert!(matches_pattern("README.md", "README*"));
2593 assert!(!matches_pattern("CHANGELOG.md", "README*"));
2594 }
2595
2596 #[test]
2597 fn matches_pattern_exact() {
2598 assert!(matches_pattern("README.md", "README.md"));
2599 assert!(!matches_pattern("readme.md", "README.md"));
2600 }
2601
2602 #[test]
2603 fn derive_kebab_underscore_to_dash() {
2604 let p = PathBuf::from("/tmp/claude_code_headless.md");
2605 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2606 assert_eq!(name, "claude-code-headless");
2607 assert!(!truncated);
2608 assert!(original.is_none());
2609 }
2610
2611 #[test]
2612 fn derive_kebab_uppercase_lowered() {
2613 let p = PathBuf::from("/tmp/README.md");
2614 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2615 assert_eq!(name, "readme");
2616 assert!(!truncated);
2617 assert!(original.is_none());
2618 }
2619
2620 #[test]
2621 fn derive_kebab_strips_non_kebab_chars() {
2622 let p = PathBuf::from("/tmp/some@weird#name!.md");
2623 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2624 assert_eq!(name, "someweirdname");
2625 assert!(!truncated);
2626 assert!(original.is_none());
2627 }
2628
2629 #[test]
2632 fn derive_kebab_folds_accented_letters_to_ascii() {
2633 let p = PathBuf::from("/tmp/açaí.md");
2634 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2635 assert_eq!(name, "acai", "got '{name}'");
2636 }
2637
2638 #[test]
2639 fn derive_kebab_handles_naive_with_diaeresis() {
2640 let p = PathBuf::from("/tmp/naïve-test.md");
2641 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2642 assert_eq!(name, "naive-test", "got '{name}'");
2643 }
2644
2645 #[test]
2646 fn derive_kebab_drops_emoji_keeps_word() {
2647 let p = PathBuf::from("/tmp/🚀-rocket.md");
2648 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2649 assert_eq!(name, "rocket", "got '{name}'");
2650 }
2651
2652 #[test]
2653 fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
2654 let p = PathBuf::from("/tmp/açaí🦜.md");
2655 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2656 assert_eq!(name, "acai", "got '{name}'");
2657 }
2658
2659 #[test]
2660 fn derive_kebab_pure_emoji_yields_empty() {
2661 let p = PathBuf::from("/tmp/🦜🚀🌟.md");
2662 let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2663 assert!(name.is_empty(), "got '{name}'");
2664 }
2665
2666 #[test]
2667 fn derive_kebab_collapses_consecutive_dashes() {
2668 let p = PathBuf::from("/tmp/a__b___c.md");
2669 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2670 assert_eq!(name, "a-b-c");
2671 assert!(!truncated);
2672 assert!(original.is_none());
2673 }
2674
2675 #[test]
2676 fn derive_kebab_truncates_to_60_chars() {
2677 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
2678 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2679 assert!(name.len() <= 60, "got len {}", name.len());
2680 assert!(truncated);
2681 assert!(original.is_some());
2682 assert!(original.unwrap().len() > 60);
2683 }
2684
2685 #[test]
2686 fn collect_files_finds_md_files() {
2687 let tmp = tempfile::tempdir().expect("tempdir");
2688 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2689 std::fs::write(tmp.path().join("b.md"), "y").unwrap();
2690 std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
2691 let mut out = Vec::new();
2692 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
2693 assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
2694 }
2695
2696 #[test]
2697 fn collect_files_recursive_descends_subdirs() {
2698 let tmp = tempfile::tempdir().expect("tempdir");
2699 let sub = tmp.path().join("sub");
2700 std::fs::create_dir(&sub).unwrap();
2701 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2702 std::fs::write(sub.join("b.md"), "y").unwrap();
2703 let mut out = Vec::new();
2704 collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
2705 assert_eq!(out.len(), 2);
2706 }
2707
2708 #[test]
2709 fn collect_files_non_recursive_skips_subdirs() {
2710 let tmp = tempfile::tempdir().expect("tempdir");
2711 let sub = tmp.path().join("sub");
2712 std::fs::create_dir(&sub).unwrap();
2713 std::fs::write(tmp.path().join("a.md"), "x").unwrap();
2714 std::fs::write(sub.join("b.md"), "y").unwrap();
2715 let mut out = Vec::new();
2716 collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
2717 assert_eq!(out.len(), 1);
2718 }
2719
2720 #[test]
2723 fn derive_kebab_long_basename_truncated_within_cap() {
2724 let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
2725 let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
2726 assert!(
2727 name.len() <= DERIVED_NAME_MAX_LEN,
2728 "truncated name must respect cap; got {} chars",
2729 name.len()
2730 );
2731 assert!(!name.is_empty());
2732 assert!(truncated);
2733 assert!(original.is_some());
2734 }
2735
2736 #[test]
2737 fn unique_name_returns_base_when_free() {
2738 let taken: BTreeSet<String> = BTreeSet::new();
2739 let resolved = unique_name("note", &taken).expect("must resolve");
2740 assert_eq!(resolved, "note");
2741 }
2742
2743 #[test]
2744 fn unique_name_appends_first_free_suffix_on_collision() {
2745 let mut taken: BTreeSet<String> = BTreeSet::new();
2746 taken.insert("note".to_string());
2747 taken.insert("note-1".to_string());
2748 let resolved = unique_name("note", &taken).expect("must resolve");
2749 assert_eq!(resolved, "note-2");
2750 }
2751
2752 #[test]
2753 fn unique_name_errors_after_collision_cap() {
2754 let mut taken: BTreeSet<String> = BTreeSet::new();
2755 taken.insert("note".to_string());
2756 for i in 1..=MAX_NAME_COLLISION_SUFFIX {
2757 taken.insert(format!("note-{i}"));
2758 }
2759 let err = unique_name("note", &taken).expect_err("must surface error");
2760 assert!(matches!(err, AppError::Validation(_)));
2761 }
2762
2763 #[test]
2766 fn validate_relation_format_accepts_valid_relations() {
2767 use crate::parsers::{is_canonical_relation, validate_relation_format};
2768 assert!(validate_relation_format("applies_to").is_ok());
2769 assert!(validate_relation_format("depends_on").is_ok());
2770 assert!(validate_relation_format("implements").is_ok());
2771 assert!(validate_relation_format("").is_err());
2772 assert!(is_canonical_relation("applies_to"));
2773 assert!(!is_canonical_relation("implements"));
2774 }
2775
2776 use serial_test::serial;
2779
2780 fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
2782 let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
2783 let prev = std::env::var(key).ok();
2784 match value {
2785 Some(v) => std::env::set_var(key, v),
2786 None => std::env::remove_var(key),
2787 }
2788 f();
2789 match prev {
2790 Some(p) => std::env::set_var(key, p),
2791 None => std::env::remove_var(key),
2792 }
2793 }
2794
2795 #[test]
2796 #[serial]
2797 fn env_low_memory_enabled_unset_returns_false() {
2798 with_env_var(None, || assert!(!env_low_memory_enabled()));
2799 }
2800
2801 #[test]
2802 #[serial]
2803 fn env_low_memory_enabled_empty_returns_false() {
2804 with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
2805 }
2806
2807 #[test]
2808 #[serial]
2809 fn env_low_memory_enabled_truthy_values_return_true() {
2810 for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
2811 with_env_var(Some(v), || {
2812 assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
2813 });
2814 }
2815 }
2816
2817 #[test]
2818 #[serial]
2819 fn env_low_memory_enabled_falsy_values_return_false() {
2820 for v in ["0", "false", "FALSE", "no", "off"] {
2821 with_env_var(Some(v), || {
2822 assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
2823 });
2824 }
2825 }
2826
2827 #[test]
2828 #[serial]
2829 fn env_low_memory_enabled_unrecognized_value_returns_false() {
2830 with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
2831 }
2832
2833 #[test]
2834 #[serial]
2835 fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
2836 with_env_var(None, || {
2837 assert_eq!(resolve_parallelism(true, Some(4)), 1);
2838 assert_eq!(resolve_parallelism(true, Some(8)), 1);
2839 assert_eq!(resolve_parallelism(true, None), 1);
2840 });
2841 }
2842
2843 #[test]
2844 #[serial]
2845 fn resolve_parallelism_env_forces_one_when_flag_off() {
2846 with_env_var(Some("1"), || {
2847 assert_eq!(resolve_parallelism(false, Some(4)), 1);
2848 assert_eq!(resolve_parallelism(false, None), 1);
2849 });
2850 }
2851
2852 #[test]
2853 #[serial]
2854 fn resolve_parallelism_falsy_env_does_not_override() {
2855 with_env_var(Some("0"), || {
2856 assert_eq!(resolve_parallelism(false, Some(4)), 4);
2857 });
2858 }
2859
2860 #[test]
2861 #[serial]
2862 fn resolve_parallelism_explicit_value_when_low_memory_off() {
2863 with_env_var(None, || {
2864 assert_eq!(resolve_parallelism(false, Some(3)), 3);
2865 assert_eq!(resolve_parallelism(false, Some(1)), 1);
2866 });
2867 }
2868
2869 #[test]
2870 #[serial]
2871 fn resolve_parallelism_default_when_unset() {
2872 with_env_var(None, || {
2873 let p = resolve_parallelism(false, None);
2874 assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
2875 });
2876 }
2877
2878 #[test]
2879 fn ingest_args_parses_low_memory_flag_via_clap() {
2880 use clap::Parser;
2881 let cli = crate::cli::Cli::try_parse_from([
2884 "sqlite-graphrag",
2885 "ingest",
2886 "/tmp/dummy",
2887 "--type",
2888 "document",
2889 "--low-memory",
2890 ])
2891 .expect("parse must succeed");
2892 match cli.command {
2893 Some(crate::cli::Commands::Ingest(args)) => {
2894 assert!(args.low_memory, "--low-memory must set field to true");
2895 }
2896 _ => panic!("expected Ingest subcommand"),
2897 }
2898 }
2899
2900 #[test]
2901 fn ingest_args_low_memory_defaults_false() {
2902 use clap::Parser;
2903 let cli = crate::cli::Cli::try_parse_from([
2904 "sqlite-graphrag",
2905 "ingest",
2906 "/tmp/dummy",
2907 "--type",
2908 "document",
2909 ])
2910 .expect("parse must succeed");
2911 match cli.command {
2912 Some(crate::cli::Commands::Ingest(args)) => {
2913 assert!(!args.low_memory, "default must be false");
2914 }
2915 _ => panic!("expected Ingest subcommand"),
2916 }
2917 }
2918
2919 #[test]
2922 fn dry_run_budget_event_serializes_chunk_and_token_counts() {
2923 let ev = IngestDryRunBudget {
2924 budget: true,
2925 file: "/tmp/doc.md",
2926 name: "doc",
2927 bytes: 1234,
2928 chunk_count: 3,
2929 token_count: 567,
2930 partition_count: 1,
2931 exceeds_limits: false,
2932 };
2933 let json = serde_json::to_string(&ev).expect("serialize budget event");
2934 assert!(json.contains("\"chunk_count\":3"), "got: {json}");
2935 assert!(json.contains("\"token_count\":567"), "got: {json}");
2936 assert!(json.contains("\"partition_count\":1"), "got: {json}");
2937 assert!(json.contains("\"exceeds_limits\":false"), "got: {json}");
2938 }
2939
2940 #[test]
2941 fn assess_body_budget_feeds_dry_run_with_positive_counts() {
2942 let body = "# Title\n\nsome representative body text for the budget.";
2945 let budget = chunking::assess_body_budget(body);
2946 assert!(budget.chunk_count >= 1);
2947 assert!(budget.approx_tokens >= 1);
2948 assert_eq!(budget.partition_count, 1);
2949 }
2950}