1use crate::chunking;
35use crate::cli::MemoryType;
36use crate::entity_type::EntityType;
37use crate::errors::AppError;
38use crate::i18n::errors_msg;
39use crate::output::{self, JsonOutputFormat};
40use crate::paths::AppPaths;
41use crate::storage::chunks as storage_chunks;
42use crate::storage::connection::{ensure_db_ready, open_rw};
43use crate::storage::entities::{NewEntity, NewRelationship};
44use crate::storage::memories::NewMemory;
45use crate::storage::{entities, memories, urls as storage_urls, versions};
46use rayon::prelude::*;
47use rusqlite::Connection;
48use serde::Serialize;
49use std::collections::BTreeSet;
50use std::path::{Path, PathBuf};
51use std::sync::mpsc;
52use unicode_normalization::UnicodeNormalization;
53
54use crate::constants::DERIVED_NAME_MAX_LEN;
55
/// Upper bound on the numeric suffix used when resolving collisions between derived
/// memory names. Not referenced in the visible code — presumably consumed by
/// `unique_name` (which `run` calls with the set of already-taken names); confirm.
const MAX_NAME_COLLISION_SUFFIX: usize = 1000;
59
60#[derive(clap::Args)]
61#[command(after_long_help = "EXAMPLES:\n \
62 # Ingest every Markdown file under ./docs as `document` memories\n \
63 sqlite-graphrag ingest ./docs --type document\n\n \
64 # Ingest .txt files recursively under ./notes\n \
65 sqlite-graphrag ingest ./notes --type note --pattern '*.txt' --recursive\n\n \
66 # Enable GLiNER NER extraction (disabled by default, slower)\n \
67 sqlite-graphrag ingest ./big-corpus --type reference --enable-ner\n\n \
68 # Preview file-to-name mapping without ingesting\n \
69 sqlite-graphrag ingest ./docs --dry-run\n\n \
70 # LLM-curated extraction via Claude Code CLI\n \
71 sqlite-graphrag ingest ./docs --mode claude-code --recursive --json\n\n \
72 # Resume interrupted claude-code ingest\n \
73 sqlite-graphrag ingest ./docs --mode claude-code --resume --json\n\n \
74 # Claude Code with budget cap and custom timeout\n \
75 sqlite-graphrag ingest ./docs --mode claude-code --max-cost-usd 5.00 --claude-timeout 600 --json\n\n \
76AUTHENTICATION:\n \
77 --mode claude-code: Uses existing Claude Code authentication.\n \
78 OAuth (Pro/Max/Team): works automatically from ~/.claude/.credentials.json\n \
79 API key: set ANTHROPIC_API_KEY for faster startup (optional)\n\n \
80 --mode codex: Uses existing Codex CLI authentication.\n \
81 Device auth: run `codex auth login` first\n \
82 API key: set OPENAI_API_KEY (optional)\n\n \
83NOTES:\n \
84 Each file becomes a separate memory. Names derive from file basenames\n \
85 (kebab-case, lowercase, ASCII). Output is NDJSON: one JSON object per file,\n \
86 followed by a final summary line with counts. Per-file errors are reported\n \
87 inline and processing continues unless --fail-fast is set.")]
88pub struct IngestArgs {
89 #[arg(
91 value_name = "DIR",
92 help = "Directory to ingest recursively (each matching file becomes a memory)"
93 )]
94 pub dir: PathBuf,
95
96 #[arg(long, value_enum, default_value_t = MemoryType::Document)]
98 pub r#type: MemoryType,
99
100 #[arg(long, default_value = "*.md")]
103 pub pattern: String,
104
105 #[arg(long, default_value_t = false)]
107 pub recursive: bool,
108
109 #[arg(
110 long,
111 env = "SQLITE_GRAPHRAG_ENABLE_NER",
112 value_parser = crate::parsers::parse_bool_flexible,
113 action = clap::ArgAction::Set,
114 num_args = 0..=1,
115 default_missing_value = "true",
116 default_value = "false",
117 help = "Enable automatic GLiNER NER entity/relationship extraction (disabled by default)"
118 )]
119 pub enable_ner: bool,
120 #[arg(
121 long,
122 env = "SQLITE_GRAPHRAG_GLINER_VARIANT",
123 default_value = "fp32",
124 help = "GLiNER model variant: fp32 (1.1GB, best quality), fp16 (580MB), int8 (349MB, fastest but may miss entities on short texts), q4, q4f16"
125 )]
126 pub gliner_variant: String,
127
128 #[arg(long, default_value_t = false, hide = true)]
130 pub skip_extraction: bool,
131
132 #[arg(long, default_value_t = false)]
134 pub fail_fast: bool,
135
136 #[arg(long, default_value_t = false)]
138 pub dry_run: bool,
139
140 #[arg(long, default_value_t = 10_000)]
142 pub max_files: usize,
143
144 #[arg(long)]
146 pub namespace: Option<String>,
147
148 #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
150 pub db: Option<String>,
151
152 #[arg(long, value_enum, default_value_t = JsonOutputFormat::Json)]
153 pub format: JsonOutputFormat,
154
155 #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
156 pub json: bool,
157
158 #[arg(
160 long,
161 help = "Number of files to extract+embed in parallel; default = max(1, cpus/2).min(4)"
162 )]
163 pub ingest_parallelism: Option<usize>,
164
165 #[arg(
173 long,
174 default_value_t = false,
175 help = "Forces single-threaded ingest (--ingest-parallelism 1) to reduce RSS pressure. \
176 Recommended for environments with <4 GB available RAM or container/cgroup \
177 constraints. Trade-off: 3-4x longer wall time. Also honored via \
178 SQLITE_GRAPHRAG_LOW_MEMORY=1 env var."
179 )]
180 pub low_memory: bool,
181
182 #[arg(long, default_value_t = crate::constants::DEFAULT_MAX_RSS_MB,
184 help = "Maximum process RSS in MiB; abort if exceeded during embedding (default: 8192)")]
185 pub max_rss_mb: u64,
186
187 #[arg(long, default_value_t = crate::constants::DERIVED_NAME_MAX_LEN,
192 help = "Maximum length for derived memory names (default: 60)")]
193 pub max_name_length: usize,
194
195 #[arg(long, value_enum, default_value_t = IngestMode::None)]
197 pub mode: IngestMode,
198
199 #[arg(long, env = "SQLITE_GRAPHRAG_CLAUDE_BINARY")]
201 pub claude_binary: Option<std::path::PathBuf>,
202
203 #[arg(long)]
205 pub claude_model: Option<String>,
206
207 #[arg(long, default_value_t = false)]
209 pub resume: bool,
210
211 #[arg(long, default_value_t = false)]
213 pub retry_failed: bool,
214
215 #[arg(long, default_value_t = false)]
217 pub keep_queue: bool,
218
219 #[arg(long, default_value = ".ingest-queue.sqlite")]
221 pub queue_db: String,
222
223 #[arg(long, default_value_t = 60)]
225 pub rate_limit_wait: u64,
226
227 #[arg(long)]
229 pub max_cost_usd: Option<f64>,
230
231 #[arg(
233 long,
234 default_value_t = 300,
235 help = "Timeout in seconds for each claude -p invocation (default: 300)"
236 )]
237 pub claude_timeout: u64,
238
239 #[arg(
241 long,
242 env = "SQLITE_GRAPHRAG_CODEX_BINARY",
243 help = "Explicit path to the Codex CLI binary (only with --mode codex)"
244 )]
245 pub codex_binary: Option<PathBuf>,
246
247 #[arg(
249 long,
250 help = "Model override for Codex extraction (e.g. o4-mini, gpt-5.1-codex)"
251 )]
252 pub codex_model: Option<String>,
253
254 #[arg(
256 long,
257 default_value_t = 300,
258 help = "Timeout in seconds for each codex exec invocation (default: 300)"
259 )]
260 pub codex_timeout: u64,
261}
262
263#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
265pub enum IngestMode {
266 None,
268 Gliner,
270 ClaudeCode,
272 Codex,
274}
275
276fn env_low_memory_enabled() -> bool {
281 match std::env::var("SQLITE_GRAPHRAG_LOW_MEMORY") {
282 Ok(v) if v.is_empty() => false,
283 Ok(v) => match v.to_lowercase().as_str() {
284 "1" | "true" | "yes" | "on" => true,
285 "0" | "false" | "no" | "off" => false,
286 other => {
287 tracing::warn!(
288 target: "ingest",
289 value = %other,
290 "SQLITE_GRAPHRAG_LOW_MEMORY value not recognized; treating as disabled"
291 );
292 false
293 }
294 },
295 Err(_) => false,
296 }
297}
298
299fn resolve_parallelism(low_memory_flag: bool, ingest_parallelism: Option<usize>) -> usize {
311 let env_flag = env_low_memory_enabled();
312 let low_memory = low_memory_flag || env_flag;
313
314 if low_memory {
315 if let Some(n) = ingest_parallelism {
316 if n > 1 {
317 tracing::warn!(
318 target: "ingest",
319 requested = n,
320 "--ingest-parallelism overridden by --low-memory; using 1"
321 );
322 }
323 }
324 if low_memory_flag {
325 tracing::info!(
326 target: "ingest",
327 source = "flag",
328 "low-memory mode enabled: forcing --ingest-parallelism 1"
329 );
330 } else {
331 tracing::info!(
332 target: "ingest",
333 source = "env",
334 "low-memory mode enabled via SQLITE_GRAPHRAG_LOW_MEMORY: forcing --ingest-parallelism 1"
335 );
336 }
337 return 1;
338 }
339
340 ingest_parallelism
341 .unwrap_or_else(|| {
342 std::thread::available_parallelism()
343 .map(|v| v.get() / 2)
344 .unwrap_or(1)
345 .clamp(1, 4)
346 })
347 .max(1)
348}
349
/// One NDJSON line describing what happened to a single input file.
///
/// Emitted through `output::emit_json_compact`. serde's derive serializes fields in
/// declaration order, so reordering them changes the JSON key order consumers see.
#[derive(Serialize)]
struct IngestFileEvent<'a> {
    /// Source path, rendered with `Path::to_string_lossy`.
    file: &'a str,
    /// Derived memory name. For skipped files, the attempted base name (empty when
    /// no name could be derived).
    name: &'a str,
    /// Outcome label. Values used in the visible code of `run`: `"preview"` and
    /// `"skip"` in dry runs; otherwise `"indexed"`, `"skipped"` or `"failed"`.
    status: &'a str,
    /// Whether `derive_kebab_name` reported truncating the name to `--max-name-length`.
    truncated: bool,
    /// Name before truncation, as returned by `derive_kebab_name`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_name: Option<String>,
    /// Original file stem. Set when it differs from the derived name, or, for
    /// underivable names, whenever the stem is non-empty. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    original_filename: Option<&'a str>,
    /// Failure or skip reason; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Id of the created memory. In the visible code, set only for `"indexed"` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    memory_id: Option<i64>,
    /// Persistence action: `"created"` for indexed files, or `"duplicate"` for files
    /// skipped because the name already exists. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    action: Option<String>,
    /// Byte length of the stored body; `0` for every non-indexed event.
    body_length: usize,
}
372
/// Final NDJSON line `run` emits after the per-file events (also on `--fail-fast` aborts).
///
/// Field declaration order is the serialized JSON key order.
#[derive(Serialize)]
struct IngestSummary {
    /// Always `true` wherever `run` builds it; marks this line as the summary.
    summary: bool,
    /// Input directory, rendered lossily.
    dir: String,
    /// The `--pattern` value.
    pattern: String,
    /// The `--recursive` value.
    recursive: bool,
    /// Number of files matched by the pattern (before name derivation).
    files_total: usize,
    /// Files persisted successfully (`0` for dry runs).
    files_succeeded: usize,
    /// Files that failed to stage or persist (`0` for dry runs).
    files_failed: usize,
    /// Files skipped because of underivable/colliding names or duplicate memory
    /// names (`0` for dry runs).
    files_skipped: usize,
    /// Milliseconds elapsed since `run` recorded its `started` instant.
    elapsed_ms: u64,
}
385
/// Outcome of persisting one staged file, returned by `persist_staged`.
struct FileSuccess {
    /// Row id returned by `memories::insert`.
    memory_id: i64,
    /// Action label copied into the file event; `persist_staged` always sets `"created"`.
    action: String,
    /// Byte length of the persisted body.
    body_length: usize,
}
392
/// Per-file progress record for the extract+embed phase.
///
/// `run` serializes it to a JSON string and logs it with `tracing::info!` under the
/// `ingest_progress` target, rather than emitting it via `output::emit_json_compact`.
#[derive(Serialize)]
struct StageProgressEvent<'a> {
    /// Record format version; `run` currently sets `1`.
    schema_version: u8,
    /// Event name; `run` uses `"file_extracted"`.
    event: &'a str,
    /// Source file path.
    path: &'a str,
    /// Milliseconds spent in `stage_file` for this file.
    ms: u64,
    /// Number of staged entities (`0` when staging failed).
    entities: usize,
    /// Number of staged relationships (`0` when staging failed).
    relationships: usize,
}
404
/// Everything `stage_file` computes for one file, ready for `persist_staged`.
///
/// Produced on rayon workers and sent to the main thread over the `mpsc` channel in `run`.
struct StagedFile {
    /// Full file contents (UTF-8).
    body: String,
    /// BLAKE3 hex digest of `body`.
    body_hash: String,
    /// First 200 characters (not bytes) of `body`.
    snippet: String,
    /// Derived memory name.
    name: String,
    /// `"ingested from <path>"`.
    description: String,
    /// Memory-level embedding. For single-chunk bodies it is the body's own
    /// embedding; otherwise it is `chunking::aggregate_embeddings` over the chunk
    /// embeddings.
    embedding: Vec<f32>,
    /// Per-chunk embeddings in `chunks_info` order; `None` on the single-chunk path.
    chunk_embeddings: Option<Vec<Vec<f32>>>,
    /// Chunk boundaries from `chunking::split_into_chunks_hierarchical`.
    chunks_info: Vec<crate::chunking::Chunk>,
    /// Extracted entities, capped per memory. Empty unless NER ran and succeeded.
    entities: Vec<NewEntity>,
    /// Extracted relationships with normalized and validated relation names.
    relationships: Vec<NewRelationship>,
    /// One embedding per entry of `entities`, in the same order.
    entity_embeddings: Vec<Vec<f32>>,
    /// URLs reported by the extractor.
    urls: Vec<crate::extraction::ExtractedUrl>,
}
421
422fn stage_file(
425 _idx: usize,
426 path: &Path,
427 name: &str,
428 paths: &AppPaths,
429 enable_ner: bool,
430 gliner_variant: crate::extraction::GlinerVariant,
431 max_rss_mb: u64,
432) -> Result<StagedFile, AppError> {
433 use crate::constants::*;
434
435 if name.len() > MAX_MEMORY_NAME_LEN {
436 return Err(AppError::LimitExceeded(
437 crate::i18n::validation::name_length(MAX_MEMORY_NAME_LEN),
438 ));
439 }
440 if name.starts_with("__") {
441 return Err(AppError::Validation(
442 crate::i18n::validation::reserved_name(),
443 ));
444 }
445 {
446 let slug_re = crate::constants::name_slug_regex();
447 if !slug_re.is_match(name) {
448 return Err(AppError::Validation(crate::i18n::validation::name_kebab(
449 name,
450 )));
451 }
452 }
453
454 let file_size = std::fs::metadata(path).map_err(AppError::Io)?.len();
455 if file_size > MAX_MEMORY_BODY_LEN as u64 {
456 return Err(AppError::LimitExceeded(
457 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
458 ));
459 }
460 let raw_body = std::fs::read_to_string(path).map_err(AppError::Io)?;
461 if raw_body.len() > MAX_MEMORY_BODY_LEN {
462 return Err(AppError::LimitExceeded(
463 crate::i18n::validation::body_exceeds(MAX_MEMORY_BODY_LEN),
464 ));
465 }
466 if raw_body.trim().is_empty() {
467 return Err(AppError::Validation(crate::i18n::validation::empty_body()));
468 }
469
470 let description = format!("ingested from {}", path.display());
471 if description.len() > MAX_MEMORY_DESCRIPTION_LEN {
472 return Err(AppError::Validation(
473 crate::i18n::validation::description_exceeds(MAX_MEMORY_DESCRIPTION_LEN),
474 ));
475 }
476
477 let mut extracted_entities: Vec<NewEntity> = Vec::with_capacity(30);
478 let mut extracted_relationships: Vec<NewRelationship> = Vec::with_capacity(50);
479 let mut extracted_urls: Vec<crate::extraction::ExtractedUrl> = Vec::with_capacity(4);
480 if enable_ner {
481 match crate::extraction::extract_graph_auto(&raw_body, paths, gliner_variant) {
482 Ok(extracted) => {
483 extracted_urls = extracted.urls;
484 extracted_entities = extracted.entities;
485 extracted_relationships = extracted.relationships;
486
487 if extracted_entities.len() > max_entities_per_memory() {
488 extracted_entities.truncate(max_entities_per_memory());
489 }
490 if extracted_relationships.len() > max_relationships_per_memory() {
491 extracted_relationships.truncate(max_relationships_per_memory());
492 }
493 }
494 Err(e) => {
495 tracing::warn!(
496 target: "ingest",
497 file = %path.display(),
498 "auto-extraction failed (graceful degradation): {e:#}"
499 );
500 }
501 }
502 }
503
504 for rel in &mut extracted_relationships {
505 rel.relation = crate::parsers::normalize_relation(&rel.relation);
506 if let Err(e) = crate::parsers::validate_relation_format(&rel.relation) {
507 return Err(AppError::Validation(format!(
508 "{e} for relationship '{}' -> '{}'",
509 rel.source, rel.target
510 )));
511 }
512 crate::parsers::warn_if_non_canonical(&rel.relation);
513 if !(0.0..=1.0).contains(&rel.strength) {
514 return Err(AppError::Validation(format!(
515 "invalid strength {} for relationship '{}' -> '{}'; expected value in [0.0, 1.0]",
516 rel.strength, rel.source, rel.target
517 )));
518 }
519 }
520
521 let body_hash = blake3::hash(raw_body.as_bytes()).to_hex().to_string();
522 let snippet: String = raw_body.chars().take(200).collect();
523
524 let tokenizer = crate::tokenizer::get_tokenizer(&paths.models)?;
525 let chunks_info = chunking::split_into_chunks_hierarchical(&raw_body, tokenizer);
526 if chunks_info.len() > REMEMBER_MAX_SAFE_MULTI_CHUNKS {
527 return Err(AppError::LimitExceeded(format!(
528 "document produces {} chunks; current safe operational limit is {} chunks; split the document before using remember",
529 chunks_info.len(),
530 REMEMBER_MAX_SAFE_MULTI_CHUNKS
531 )));
532 }
533
534 let mut chunk_embeddings_opt: Option<Vec<Vec<f32>>> = None;
535 let embedding = if chunks_info.len() == 1 {
536 crate::daemon::embed_passage_or_local(&paths.models, &raw_body)?
537 } else {
538 let chunk_texts: Vec<&str> = chunks_info
539 .iter()
540 .map(|c| chunking::chunk_text(&raw_body, c))
541 .collect();
542 let embed_cap = chunk_texts.len();
543 let mut chunk_embeddings = Vec::new();
544 chunk_embeddings.try_reserve(embed_cap).map_err(|_| {
545 AppError::LimitExceeded(format!(
546 "allocation of {embed_cap} chunk embeddings would exceed available memory"
547 ))
548 })?;
549 for chunk_text in &chunk_texts {
550 if let Some(rss) = crate::memory_guard::current_process_memory_mb() {
551 if rss > max_rss_mb {
552 tracing::error!(
553 target: "ingest",
554 rss_mb = rss,
555 max_rss_mb = max_rss_mb,
556 file = %path.display(),
557 "RSS exceeded --max-rss-mb threshold; aborting to prevent system instability"
558 );
559 return Err(AppError::LowMemory {
560 available_mb: crate::memory_guard::available_memory_mb(),
561 required_mb: max_rss_mb,
562 });
563 }
564 }
565 chunk_embeddings.push(crate::daemon::embed_passage_or_local(
566 &paths.models,
567 chunk_text,
568 )?);
569 }
570 let aggregated = chunking::aggregate_embeddings(&chunk_embeddings);
571 chunk_embeddings_opt = Some(chunk_embeddings);
572 aggregated
573 };
574
575 let entity_embeddings = extracted_entities
576 .iter()
577 .map(|entity| {
578 let entity_text = match &entity.description {
579 Some(desc) => format!("{} {}", entity.name, desc),
580 None => entity.name.clone(),
581 };
582 crate::daemon::embed_passage_or_local(&paths.models, &entity_text)
583 })
584 .collect::<Result<Vec<_>, _>>()?;
585
586 Ok(StagedFile {
587 body: raw_body,
588 body_hash,
589 snippet,
590 name: name.to_string(),
591 description,
592 embedding,
593 chunk_embeddings: chunk_embeddings_opt,
594 chunks_info,
595 entities: extracted_entities,
596 relationships: extracted_relationships,
597 entity_embeddings,
598 urls: extracted_urls,
599 })
600}
601
602fn persist_staged(
604 conn: &mut Connection,
605 namespace: &str,
606 memory_type: &str,
607 staged: StagedFile,
608) -> Result<FileSuccess, AppError> {
609 {
610 let active_count: u32 = conn.query_row(
611 "SELECT COUNT(DISTINCT namespace) FROM memories WHERE deleted_at IS NULL",
612 [],
613 |r| r.get::<_, i64>(0).map(|v| v as u32),
614 )?;
615 let ns_exists: bool = conn.query_row(
616 "SELECT EXISTS(SELECT 1 FROM memories WHERE namespace = ?1 AND deleted_at IS NULL)",
617 rusqlite::params![namespace],
618 |r| r.get::<_, i64>(0).map(|v| v > 0),
619 )?;
620 if !ns_exists && active_count >= crate::constants::MAX_NAMESPACES_ACTIVE {
621 return Err(AppError::NamespaceError(format!(
622 "active namespace limit of {} exceeded while creating '{namespace}'",
623 crate::constants::MAX_NAMESPACES_ACTIVE
624 )));
625 }
626 }
627
628 let existing_memory = memories::find_by_name(conn, namespace, &staged.name)?;
629 if existing_memory.is_some() {
630 return Err(AppError::Duplicate(errors_msg::duplicate_memory(
631 &staged.name,
632 namespace,
633 )));
634 }
635 let duplicate_hash_id = memories::find_by_hash(conn, namespace, &staged.body_hash)?;
636
637 let new_memory = NewMemory {
638 namespace: namespace.to_string(),
639 name: staged.name.clone(),
640 memory_type: memory_type.to_string(),
641 description: staged.description.clone(),
642 body: staged.body,
643 body_hash: staged.body_hash,
644 session_id: None,
645 source: "agent".to_string(),
646 metadata: serde_json::json!({}),
647 };
648
649 if let Some(hash_id) = duplicate_hash_id {
650 tracing::debug!(
651 target: "ingest",
652 duplicate_memory_id = hash_id,
653 "identical body already exists; persisting a new memory anyway"
654 );
655 }
656
657 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
658
659 let memory_id = memories::insert(&tx, &new_memory)?;
660 versions::insert_version(
661 &tx,
662 memory_id,
663 1,
664 &staged.name,
665 memory_type,
666 &staged.description,
667 &new_memory.body,
668 &serde_json::to_string(&new_memory.metadata)?,
669 None,
670 "create",
671 )?;
672 memories::upsert_vec(
673 &tx,
674 memory_id,
675 namespace,
676 memory_type,
677 &staged.embedding,
678 &staged.name,
679 &staged.snippet,
680 )?;
681
682 if staged.chunks_info.len() > 1 {
683 storage_chunks::insert_chunk_slices(&tx, memory_id, &new_memory.body, &staged.chunks_info)?;
684 let chunk_embeddings = staged.chunk_embeddings.ok_or_else(|| {
685 AppError::Internal(anyhow::anyhow!(
686 "missing chunk embeddings cache on multi-chunk ingest path"
687 ))
688 })?;
689 for (i, emb) in chunk_embeddings.iter().enumerate() {
690 storage_chunks::upsert_chunk_vec(&tx, i as i64, memory_id, i as i32, emb)?;
691 }
692 }
693
694 if !staged.entities.is_empty() || !staged.relationships.is_empty() {
695 for (idx, entity) in staged.entities.iter().enumerate() {
696 let entity_id = entities::upsert_entity(&tx, namespace, entity)?;
697 let entity_embedding = &staged.entity_embeddings[idx];
698 entities::upsert_entity_vec(
699 &tx,
700 entity_id,
701 namespace,
702 entity.entity_type,
703 entity_embedding,
704 &entity.name,
705 )?;
706 entities::link_memory_entity(&tx, memory_id, entity_id)?;
707 entities::increment_degree(&tx, entity_id)?;
708 }
709 let entity_types: std::collections::HashMap<&str, EntityType> = staged
710 .entities
711 .iter()
712 .map(|entity| (entity.name.as_str(), entity.entity_type))
713 .collect();
714 for rel in &staged.relationships {
715 let source_entity = NewEntity {
716 name: rel.source.clone(),
717 entity_type: entity_types
718 .get(rel.source.as_str())
719 .copied()
720 .unwrap_or(EntityType::Concept),
721 description: None,
722 };
723 let target_entity = NewEntity {
724 name: rel.target.clone(),
725 entity_type: entity_types
726 .get(rel.target.as_str())
727 .copied()
728 .unwrap_or(EntityType::Concept),
729 description: None,
730 };
731 let source_id = entities::upsert_entity(&tx, namespace, &source_entity)?;
732 let target_id = entities::upsert_entity(&tx, namespace, &target_entity)?;
733 let rel_id = entities::upsert_relationship(&tx, namespace, source_id, target_id, rel)?;
734 entities::link_memory_relationship(&tx, memory_id, rel_id)?;
735 }
736 }
737
738 tx.commit()?;
739
740 if !staged.urls.is_empty() {
741 let url_entries: Vec<storage_urls::MemoryUrl> = staged
742 .urls
743 .into_iter()
744 .map(|u| storage_urls::MemoryUrl {
745 url: u.url,
746 offset: Some(u.offset as i64),
747 })
748 .collect();
749 let _ = storage_urls::insert_urls(conn, memory_id, &url_entries);
750 }
751
752 Ok(FileSuccess {
753 memory_id,
754 action: "created".to_string(),
755 body_length: new_memory.body.len(),
756 })
757}
758
759#[tracing::instrument(skip_all, level = "debug", name = "ingest")]
760pub fn run(args: IngestArgs) -> Result<(), AppError> {
761 tracing::debug!(target: "ingest", dir = %args.dir.display(), mode = ?args.mode, "starting ingest");
775 if args.mode == IngestMode::ClaudeCode {
776 return super::ingest_claude::run_claude_ingest(&args);
777 }
778 if args.mode == IngestMode::Codex {
779 return super::ingest_codex::run_codex_ingest(&args);
780 }
781
782 let started = std::time::Instant::now();
783
784 if !args.dir.exists() {
785 return Err(AppError::Validation(format!(
786 "directory not found: {}",
787 args.dir.display()
788 )));
789 }
790 if !args.dir.is_dir() {
791 return Err(AppError::Validation(format!(
792 "path is not a directory: {}",
793 args.dir.display()
794 )));
795 }
796
797 let mut files: Vec<PathBuf> = Vec::with_capacity(128);
798 collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
799 files.sort_unstable();
800
801 if files.len() > args.max_files {
802 return Err(AppError::Validation(format!(
803 "found {} files matching pattern, exceeds --max-files cap of {} (raise the cap or narrow the pattern)",
804 files.len(),
805 args.max_files
806 )));
807 }
808
809 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
810 let memory_type_str = args.r#type.as_str().to_string();
811
812 let paths = AppPaths::resolve(args.db.as_deref())?;
813 let mut conn_or_err = match init_storage(&paths) {
814 Ok(c) => Ok(c),
815 Err(e) => Err(format!("{e}")),
816 };
817
818 let mut succeeded: usize = 0;
819 let mut failed: usize = 0;
820 let mut skipped: usize = 0;
821 let total = files.len();
822
823 let mut taken_names: BTreeSet<String> = BTreeSet::new();
826
827 enum SlotMeta {
833 Skip {
834 file_str: String,
835 derived_base: String,
836 name_truncated: bool,
837 original_name: Option<String>,
838 original_filename: Option<String>,
839 reason: String,
840 },
841 Process {
842 file_str: String,
843 derived_name: String,
844 name_truncated: bool,
845 original_name: Option<String>,
846 original_filename: Option<String>,
847 },
848 }
849
850 struct ProcessItem {
851 idx: usize,
852 path: PathBuf,
853 file_str: String,
854 derived_name: String,
855 }
856
857 let files_cap = files.len();
858 let mut slots_meta: Vec<SlotMeta> = Vec::new();
859 slots_meta.try_reserve(files_cap).map_err(|_| {
860 AppError::LimitExceeded(format!(
861 "allocation of {files_cap} slot metadata entries would exceed available memory"
862 ))
863 })?;
864 let mut process_items: Vec<ProcessItem> = Vec::new();
865 process_items.try_reserve(files_cap).map_err(|_| {
866 AppError::LimitExceeded(format!(
867 "allocation of {files_cap} process items would exceed available memory"
868 ))
869 })?;
870 let mut truncations: Vec<(String, String)> = Vec::new();
871 truncations.try_reserve(files_cap).map_err(|_| {
872 AppError::LimitExceeded(format!(
873 "allocation of {files_cap} truncation entries would exceed available memory"
874 ))
875 })?;
876
877 let max_name_length = args.max_name_length;
878 for path in &files {
879 let file_str = path.to_string_lossy().into_owned();
880 let (derived_base, name_truncated, original_name) =
881 derive_kebab_name(path, max_name_length);
882 let original_basename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
883
884 if name_truncated {
885 if let Some(ref orig) = original_name {
886 truncations.push((orig.clone(), derived_base.clone()));
887 }
888 }
889
890 if derived_base.is_empty() {
891 let orig_filename = if !original_basename.is_empty() {
893 Some(original_basename.to_string())
894 } else {
895 None
896 };
897 slots_meta.push(SlotMeta::Skip {
898 file_str,
899 derived_base: String::new(),
900 name_truncated: false,
901 original_name: None,
902 original_filename: orig_filename,
903 reason: "could not derive a non-empty kebab-case name from filename".to_string(),
904 });
905 continue;
906 }
907
908 match unique_name(&derived_base, &taken_names) {
909 Ok(derived_name) => {
910 taken_names.insert(derived_name.clone());
911 let idx = slots_meta.len();
912 let orig_filename = if original_basename != derived_name {
914 Some(original_basename.to_string())
915 } else {
916 None
917 };
918 process_items.push(ProcessItem {
919 idx,
920 path: path.clone(),
921 file_str: file_str.clone(),
922 derived_name: derived_name.clone(),
923 });
924 slots_meta.push(SlotMeta::Process {
925 file_str,
926 derived_name,
927 name_truncated,
928 original_name,
929 original_filename: orig_filename,
930 });
931 }
932 Err(e) => {
933 let orig_filename = if original_basename != derived_base {
934 Some(original_basename.to_string())
935 } else {
936 None
937 };
938 slots_meta.push(SlotMeta::Skip {
939 file_str,
940 derived_base,
941 name_truncated,
942 original_name,
943 original_filename: orig_filename,
944 reason: e.to_string(),
945 });
946 }
947 }
948 }
949
950 if !truncations.is_empty() {
951 tracing::info!(
952 target: "ingest",
953 count = truncations.len(),
954 max_name_length = max_name_length,
955 max_len = DERIVED_NAME_MAX_LEN,
956 "derived names truncated; pass -vv (debug) for per-file detail"
957 );
958 }
959
960 if args.dry_run {
962 for meta in &slots_meta {
963 match meta {
964 SlotMeta::Skip {
965 file_str,
966 derived_base,
967 name_truncated,
968 original_name,
969 original_filename,
970 reason,
971 } => {
972 output::emit_json_compact(&IngestFileEvent {
973 file: file_str,
974 name: derived_base,
975 status: "skip",
976 truncated: *name_truncated,
977 original_name: original_name.clone(),
978 original_filename: original_filename.as_deref(),
979 error: Some(reason.clone()),
980 memory_id: None,
981 action: None,
982 body_length: 0,
983 })?;
984 }
985 SlotMeta::Process {
986 file_str,
987 derived_name,
988 name_truncated,
989 original_name,
990 original_filename,
991 } => {
992 output::emit_json_compact(&IngestFileEvent {
993 file: file_str,
994 name: derived_name,
995 status: "preview",
996 truncated: *name_truncated,
997 original_name: original_name.clone(),
998 original_filename: original_filename.as_deref(),
999 error: None,
1000 memory_id: None,
1001 action: None,
1002 body_length: 0,
1003 })?;
1004 }
1005 }
1006 }
1007 output::emit_json_compact(&IngestSummary {
1008 summary: true,
1009 dir: args.dir.to_string_lossy().into_owned(),
1010 pattern: args.pattern.clone(),
1011 recursive: args.recursive,
1012 files_total: total,
1013 files_succeeded: 0,
1014 files_failed: 0,
1015 files_skipped: 0,
1016 elapsed_ms: started.elapsed().as_millis() as u64,
1017 })?;
1018 return Ok(());
1019 }
1020
1021 if args.low_memory {
1023 if let Some(n) = args.ingest_parallelism {
1024 if n > 1 {
1025 return Err(AppError::Validation(
1026 "--ingest-parallelism N>1 conflicts with --low-memory; use one or the other"
1027 .to_string(),
1028 ));
1029 }
1030 }
1031 }
1032
1033 let parallelism = resolve_parallelism(args.low_memory, args.ingest_parallelism);
1036
1037 let pool = rayon::ThreadPoolBuilder::new()
1038 .num_threads(parallelism)
1039 .build()
1040 .map_err(|e| AppError::Internal(anyhow::anyhow!("rayon pool: {e}")))?;
1041
1042 if args.enable_ner && args.skip_extraction {
1043 return Err(AppError::Validation(
1044 "--enable-ner and --skip-extraction are mutually exclusive; remove one".to_string(),
1045 ));
1046 }
1047 if args.skip_extraction && !args.enable_ner {
1048 return Err(AppError::Validation(
1049 "--skip-extraction is deprecated since v1.0.45 and has no effect; remove this flag"
1050 .to_string(),
1051 ));
1052 }
1053 let enable_ner = args.enable_ner;
1054 let max_rss_mb = args.max_rss_mb;
1055 let gliner_variant: crate::extraction::GlinerVariant =
1056 args.gliner_variant.parse().unwrap_or_else(|e| {
1057 tracing::warn!(target: "ingest", error = %e, "invalid --gliner-variant, defaulting to fp32");
1058 crate::extraction::GlinerVariant::Fp32
1059 });
1060
1061 let total_to_process = process_items.len();
1062 tracing::info!(
1063 target: "ingest",
1064 phase = "pipeline_start",
1065 files = total_to_process,
1066 ingest_parallelism = parallelism,
1067 "incremental pipeline starting: Phase A (rayon) → channel → Phase B (main thread)",
1068 );
1069
1070 let channel_bound = (parallelism * 2).max(1);
1074 let (tx, rx) = mpsc::sync_channel::<(usize, Result<StagedFile, AppError>)>(channel_bound);
1075
1076 let paths_owned = paths.clone();
1081 let producer_handle = std::thread::spawn(move || {
1082 pool.install(|| {
1083 process_items.into_par_iter().for_each(|item| {
1084 if crate::shutdown_requested() {
1085 return;
1086 }
1087 let t0 = std::time::Instant::now();
1088 let result = stage_file(
1089 item.idx,
1090 &item.path,
1091 &item.derived_name,
1092 &paths_owned,
1093 enable_ner,
1094 gliner_variant,
1095 max_rss_mb,
1096 );
1097 let elapsed_ms = t0.elapsed().as_millis() as u64;
1098
1099 let (n_entities, n_relationships) = match &result {
1102 Ok(sf) => (sf.entities.len(), sf.relationships.len()),
1103 Err(_) => (0, 0),
1104 };
1105 let progress = StageProgressEvent {
1106 schema_version: 1,
1107 event: "file_extracted",
1108 path: &item.file_str,
1109 ms: elapsed_ms,
1110 entities: n_entities,
1111 relationships: n_relationships,
1112 };
1113 if let Ok(line) = serde_json::to_string(&progress) {
1114 tracing::info!(target: "ingest_progress", "{}", line);
1115 }
1116
1117 let _ = tx.send((item.idx, result));
1121 });
1122 drop(tx);
1124 });
1125 });
1126
1127 let fail_fast = args.fail_fast;
1139
1140 for meta in &slots_meta {
1142 if let SlotMeta::Skip {
1143 file_str,
1144 derived_base,
1145 name_truncated,
1146 original_name,
1147 original_filename,
1148 reason,
1149 } = meta
1150 {
1151 output::emit_json_compact(&IngestFileEvent {
1152 file: file_str,
1153 name: derived_base,
1154 status: "skipped",
1155 truncated: *name_truncated,
1156 original_name: original_name.clone(),
1157 original_filename: original_filename.as_deref(),
1158 error: Some(reason.clone()),
1159 memory_id: None,
1160 action: None,
1161 body_length: 0,
1162 })?;
1163 skipped += 1;
1164 }
1165 }
1166
1167 let meta_index: std::collections::HashMap<usize, &SlotMeta> = slots_meta
1170 .iter()
1171 .enumerate()
1172 .filter(|(_, m)| matches!(m, SlotMeta::Process { .. }))
1173 .collect();
1174
1175 tracing::info!(
1176 target: "ingest",
1177 phase = "persist_start",
1178 files = total_to_process,
1179 "phase B starting: persisting files incrementally as Phase A completes each one",
1180 );
1181
1182 for (idx, stage_result) in rx {
1186 if crate::shutdown_requested() {
1187 tracing::info!(target: "ingest", "shutdown requested, stopping persistence loop");
1188 break;
1189 }
1190 let meta = meta_index.get(&idx).ok_or_else(|| {
1191 AppError::Internal(anyhow::anyhow!(
1192 "channel idx {idx} has no corresponding Process slot"
1193 ))
1194 })?;
1195 let (file_str, derived_name, name_truncated, original_name, original_filename) = match meta
1196 {
1197 SlotMeta::Process {
1198 file_str,
1199 derived_name,
1200 name_truncated,
1201 original_name,
1202 original_filename,
1203 } => (
1204 file_str,
1205 derived_name,
1206 name_truncated,
1207 original_name,
1208 original_filename,
1209 ),
1210 SlotMeta::Skip { .. } => unreachable!("channel only carries Process results"),
1211 };
1212
1213 let conn = match conn_or_err.as_mut() {
1215 Ok(c) => c,
1216 Err(err_msg) => {
1217 let err_clone = err_msg.clone();
1218 output::emit_json_compact(&IngestFileEvent {
1219 file: file_str,
1220 name: derived_name,
1221 status: "failed",
1222 truncated: *name_truncated,
1223 original_name: original_name.clone(),
1224 original_filename: original_filename.as_deref(),
1225 error: Some(err_clone.clone()),
1226 memory_id: None,
1227 action: None,
1228 body_length: 0,
1229 })?;
1230 failed += 1;
1231 if fail_fast {
1232 output::emit_json_compact(&IngestSummary {
1233 summary: true,
1234 dir: args.dir.display().to_string(),
1235 pattern: args.pattern.clone(),
1236 recursive: args.recursive,
1237 files_total: total,
1238 files_succeeded: succeeded,
1239 files_failed: failed,
1240 files_skipped: skipped,
1241 elapsed_ms: started.elapsed().as_millis() as u64,
1242 })?;
1243 return Err(AppError::Validation(format!(
1244 "ingest aborted on first failure: {err_clone}"
1245 )));
1246 }
1247 continue;
1248 }
1249 };
1250
1251 let outcome =
1252 stage_result.and_then(|sf| persist_staged(conn, &namespace, &memory_type_str, sf));
1253
1254 match outcome {
1255 Ok(FileSuccess {
1256 memory_id,
1257 action,
1258 body_length,
1259 }) => {
1260 output::emit_json_compact(&IngestFileEvent {
1261 file: file_str,
1262 name: derived_name,
1263 status: "indexed",
1264 truncated: *name_truncated,
1265 original_name: original_name.clone(),
1266 original_filename: original_filename.as_deref(),
1267 error: None,
1268 memory_id: Some(memory_id),
1269 action: Some(action),
1270 body_length,
1271 })?;
1272 succeeded += 1;
1273 }
1274 Err(ref e) if matches!(e, AppError::Duplicate(_)) => {
1275 output::emit_json_compact(&IngestFileEvent {
1276 file: file_str,
1277 name: derived_name,
1278 status: "skipped",
1279 truncated: *name_truncated,
1280 original_name: original_name.clone(),
1281 original_filename: original_filename.as_deref(),
1282 error: Some(format!("{e}")),
1283 memory_id: None,
1284 action: Some("duplicate".to_string()),
1285 body_length: 0,
1286 })?;
1287 skipped += 1;
1288 }
1289 Err(e) => {
1290 let err_msg = format!("{e}");
1291 output::emit_json_compact(&IngestFileEvent {
1292 file: file_str,
1293 name: derived_name,
1294 status: "failed",
1295 truncated: *name_truncated,
1296 original_name: original_name.clone(),
1297 original_filename: original_filename.as_deref(),
1298 error: Some(err_msg.clone()),
1299 memory_id: None,
1300 action: None,
1301 body_length: 0,
1302 })?;
1303 failed += 1;
1304 if fail_fast {
1305 output::emit_json_compact(&IngestSummary {
1306 summary: true,
1307 dir: args.dir.display().to_string(),
1308 pattern: args.pattern.clone(),
1309 recursive: args.recursive,
1310 files_total: total,
1311 files_succeeded: succeeded,
1312 files_failed: failed,
1313 files_skipped: skipped,
1314 elapsed_ms: started.elapsed().as_millis() as u64,
1315 })?;
1316 return Err(AppError::Validation(format!(
1317 "ingest aborted on first failure: {err_msg}"
1318 )));
1319 }
1320 }
1321 }
1322 }
1323
1324 producer_handle
1326 .join()
1327 .map_err(|_| AppError::Internal(anyhow::anyhow!("ingest producer thread panicked")))?;
1328
1329 if let Ok(ref conn) = conn_or_err {
1330 if succeeded > 0 {
1331 let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
1332 }
1333 }
1334
1335 output::emit_json_compact(&IngestSummary {
1336 summary: true,
1337 dir: args.dir.display().to_string(),
1338 pattern: args.pattern.clone(),
1339 recursive: args.recursive,
1340 files_total: total,
1341 files_succeeded: succeeded,
1342 files_failed: failed,
1343 files_skipped: skipped,
1344 elapsed_ms: started.elapsed().as_millis() as u64,
1345 })?;
1346
1347 Ok(())
1348}
1349
1350fn init_storage(paths: &AppPaths) -> Result<Connection, AppError> {
1356 ensure_db_ready(paths)?;
1357 let conn = open_rw(&paths.db)?;
1358 Ok(conn)
1359}
1360
1361pub(crate) fn collect_files(
1362 dir: &Path,
1363 pattern: &str,
1364 recursive: bool,
1365 out: &mut Vec<PathBuf>,
1366) -> Result<(), AppError> {
1367 let entries = std::fs::read_dir(dir).map_err(AppError::Io)?;
1368 for entry in entries {
1369 let entry = entry.map_err(AppError::Io)?;
1370 let path = entry.path();
1371 let file_type = entry.file_type().map_err(AppError::Io)?;
1372 if file_type.is_file() {
1373 let name = entry.file_name();
1374 let name_str = name.to_string_lossy();
1375 if matches_pattern(&name_str, pattern) {
1376 out.push(path);
1377 }
1378 } else if file_type.is_dir() && recursive {
1379 collect_files(&path, pattern, recursive, out)?;
1380 }
1381 }
1382 Ok(())
1383}
1384
/// Case-sensitive file-name match supporting `*` wildcards anywhere in `pattern`.
///
/// Each `*` matches any (possibly empty) run of characters. A pattern with no `*`
/// must equal `name` exactly. Other glob syntax (`?`, `[...]`) is treated literally.
///
/// Examples: `*.md`, `README*`, `notes-*.md`, `*.test.*`.
fn matches_pattern(name: &str, pattern: &str) -> bool {
    // Split into: literal head (before the first `*`), middle segments, and the
    // literal tail (after the last `*`).
    let (head, after_head) = match pattern.split_once('*') {
        Some(parts) => parts,
        None => return name == pattern,
    };
    let (middle, tail) = after_head.rsplit_once('*').unwrap_or(("", after_head));

    let mut remaining = match name.strip_prefix(head) {
        Some(rest) => rest,
        None => return false,
    };

    // Greedily consume each middle segment at its leftmost occurrence. Taking the
    // earliest match leaves the longest remainder, which is optimal for `*`-only globs.
    for segment in middle.split('*').filter(|s| !s.is_empty()) {
        match remaining.find(segment) {
            Some(pos) => remaining = &remaining[pos + segment.len()..],
            None => return false,
        }
    }

    // The tail must sit at the very end, after everything already consumed.
    remaining.ends_with(tail)
}
1394
1395pub(crate) fn derive_kebab_name(path: &Path, max_len: usize) -> (String, bool, Option<String>) {
1406 let stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
1407 let lowered: String = stem
1408 .nfd()
1409 .filter(|c| !unicode_normalization::char::is_combining_mark(*c))
1410 .map(|c| {
1411 if c == '_' || c.is_whitespace() {
1412 '-'
1413 } else {
1414 c
1415 }
1416 })
1417 .map(|c| c.to_ascii_lowercase())
1418 .filter(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-')
1419 .collect();
1420 let collapsed = collapse_dashes(&lowered);
1421 let trimmed_raw = collapsed.trim_matches('-').to_string();
1422 let trimmed = if trimmed_raw.starts_with(|c: char| c.is_ascii_digit()) {
1424 format!("doc-{trimmed_raw}")
1425 } else {
1426 trimmed_raw
1427 };
1428 if trimmed.len() > max_len {
1429 let truncated = trimmed[..max_len].trim_matches('-').to_string();
1430 tracing::debug!(
1431 target: "ingest",
1432 original = %trimmed,
1433 truncated_to = %truncated,
1434 max_len = max_len,
1435 "derived memory name truncated to fit length cap; collisions will be resolved with numeric suffixes"
1436 );
1437 (truncated, true, Some(trimmed))
1438 } else {
1439 (trimmed, false, None)
1440 }
1441}
1442
1443fn unique_name(base: &str, taken: &BTreeSet<String>) -> Result<String, AppError> {
1456 if !taken.contains(base) {
1457 return Ok(base.to_string());
1458 }
1459 for suffix in 1..=MAX_NAME_COLLISION_SUFFIX {
1460 let candidate = format!("{base}-{suffix}");
1461 if !taken.contains(&candidate) {
1462 tracing::warn!(
1463 target: "ingest",
1464 base = %base,
1465 resolved = %candidate,
1466 suffix,
1467 "memory name collision resolved with numeric suffix"
1468 );
1469 return Ok(candidate);
1470 }
1471 }
1472 Err(AppError::Validation(format!(
1473 "too many name collisions for base '{base}' (>{MAX_NAME_COLLISION_SUFFIX}); rename source files to disambiguate"
1474 )))
1475}
1476
/// Replaces every run of consecutive `-` characters in `s` with a single `-`.
///
/// Leading and trailing dashes are kept (collapsed, not trimmed). All other
/// characters pass through unchanged.
fn collapse_dashes(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut acc, ch| {
        // A dash is dropped only when the output already ends in a dash.
        if !(ch == '-' && acc.ends_with('-')) {
            acc.push(ch);
        }
        acc
    })
}
1493
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::path::PathBuf;

    #[test]
    fn matches_pattern_suffix() {
        assert!(matches_pattern("foo.md", "*.md"));
        assert!(!matches_pattern("foo.txt", "*.md"));
        assert!(matches_pattern("foo.md", "*"));
    }

    #[test]
    fn matches_pattern_prefix() {
        assert!(matches_pattern("README.md", "README*"));
        assert!(!matches_pattern("CHANGELOG.md", "README*"));
    }

    #[test]
    fn matches_pattern_exact() {
        assert!(matches_pattern("README.md", "README.md"));
        assert!(!matches_pattern("readme.md", "README.md"));
    }

    #[test]
    fn derive_kebab_underscore_to_dash() {
        let p = PathBuf::from("/tmp/claude_code_headless.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "claude-code-headless");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_uppercase_lowered() {
        let p = PathBuf::from("/tmp/README.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "readme");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_strips_non_kebab_chars() {
        let p = PathBuf::from("/tmp/some@weird#name!.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "someweirdname");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_folds_accented_letters_to_ascii() {
        let p = PathBuf::from("/tmp/açaí.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_handles_naive_with_diaeresis() {
        let p = PathBuf::from("/tmp/naïve-test.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "naive-test", "got '{name}'");
    }

    #[test]
    fn derive_kebab_drops_emoji_keeps_word() {
        let p = PathBuf::from("/tmp/🚀-rocket.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "rocket", "got '{name}'");
    }

    #[test]
    fn derive_kebab_mixed_unicode_emoji_keeps_letters() {
        let p = PathBuf::from("/tmp/açaí🦜.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "acai", "got '{name}'");
    }

    #[test]
    fn derive_kebab_pure_emoji_yields_empty() {
        let p = PathBuf::from("/tmp/🦜🚀🌟.md");
        let (name, _, _) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(name.is_empty(), "got '{name}'");
    }

    #[test]
    fn derive_kebab_collapses_consecutive_dashes() {
        let p = PathBuf::from("/tmp/a__b___c.md");
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert_eq!(name, "a-b-c");
        assert!(!truncated);
        assert!(original.is_none());
    }

    #[test]
    fn derive_kebab_truncates_to_60_chars() {
        // Pass the cap explicitly so this test checks what its name says,
        // independent of the current value of DERIVED_NAME_MAX_LEN.
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(80)));
        let (name, truncated, original) = derive_kebab_name(&p, 60);
        assert!(name.len() <= 60, "got len {}", name.len());
        assert!(truncated);
        assert!(original.is_some());
        assert!(original.unwrap().len() > 60);
    }

    #[test]
    fn collect_files_finds_md_files() {
        let tmp = tempfile::tempdir().expect("tempdir");
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(tmp.path().join("b.md"), "y").unwrap();
        std::fs::write(tmp.path().join("c.txt"), "z").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 2, "should find 2 .md files, got {out:?}");
    }

    #[test]
    fn collect_files_recursive_descends_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", true, &mut out).expect("collect");
        assert_eq!(out.len(), 2);
    }

    #[test]
    fn collect_files_non_recursive_skips_subdirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let sub = tmp.path().join("sub");
        std::fs::create_dir(&sub).unwrap();
        std::fs::write(tmp.path().join("a.md"), "x").unwrap();
        std::fs::write(sub.join("b.md"), "y").unwrap();
        let mut out = Vec::new();
        collect_files(tmp.path(), "*.md", false, &mut out).expect("collect");
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn derive_kebab_long_basename_truncated_within_cap() {
        let p = PathBuf::from(format!("/tmp/{}.md", "a".repeat(120)));
        let (name, truncated, original) = derive_kebab_name(&p, DERIVED_NAME_MAX_LEN);
        assert!(
            name.len() <= DERIVED_NAME_MAX_LEN,
            "truncated name must respect cap; got {} chars",
            name.len()
        );
        assert!(!name.is_empty());
        assert!(truncated);
        assert!(original.is_some());
    }

    #[test]
    fn unique_name_returns_base_when_free() {
        let taken: BTreeSet<String> = BTreeSet::new();
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note");
    }

    #[test]
    fn unique_name_appends_first_free_suffix_on_collision() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        taken.insert("note-1".to_string());
        let resolved = unique_name("note", &taken).expect("must resolve");
        assert_eq!(resolved, "note-2");
    }

    #[test]
    fn unique_name_errors_after_collision_cap() {
        let mut taken: BTreeSet<String> = BTreeSet::new();
        taken.insert("note".to_string());
        for i in 1..=MAX_NAME_COLLISION_SUFFIX {
            taken.insert(format!("note-{i}"));
        }
        let err = unique_name("note", &taken).expect_err("must surface error");
        assert!(matches!(err, AppError::Validation(_)));
    }

    #[test]
    fn validate_relation_format_accepts_valid_relations() {
        use crate::parsers::{is_canonical_relation, validate_relation_format};
        assert!(validate_relation_format("applies_to").is_ok());
        assert!(validate_relation_format("depends_on").is_ok());
        assert!(validate_relation_format("implements").is_ok());
        assert!(validate_relation_format("").is_err());
        assert!(is_canonical_relation("applies_to"));
        assert!(!is_canonical_relation("implements"));
    }

    /// Restores an environment variable to its previously captured value on drop.
    ///
    /// The restore runs from `Drop`, so it also happens while a failing assertion
    /// unwinds. One failed `#[serial]` test therefore cannot leak its value into
    /// the tests that run after it. `var_os` is used so that non-UTF-8 values
    /// are preserved too.
    struct EnvVarRestore {
        key: &'static str,
        prev: Option<std::ffi::OsString>,
    }

    impl Drop for EnvVarRestore {
        fn drop(&mut self) {
            match self.prev.take() {
                Some(p) => std::env::set_var(self.key, p),
                None => std::env::remove_var(self.key),
            }
        }
    }

    /// Runs `f` with `SQLITE_GRAPHRAG_LOW_MEMORY` set to `value` (or unset for `None`).
    ///
    /// The previous value is restored afterwards, even if `f` panics. Callers
    /// must be `#[serial]` because the process environment is global.
    fn with_env_var<F: FnOnce()>(value: Option<&str>, f: F) {
        let key = "SQLITE_GRAPHRAG_LOW_MEMORY";
        let _restore = EnvVarRestore {
            key,
            prev: std::env::var_os(key),
        };
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
        f();
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unset_returns_false() {
        with_env_var(None, || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_empty_returns_false() {
        with_env_var(Some(""), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_truthy_values_return_true() {
        for v in ["1", "true", "TRUE", "yes", "YES", "on", "On"] {
            with_env_var(Some(v), || {
                assert!(env_low_memory_enabled(), "value {v:?} should be truthy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_falsy_values_return_false() {
        for v in ["0", "false", "FALSE", "no", "off"] {
            with_env_var(Some(v), || {
                assert!(!env_low_memory_enabled(), "value {v:?} should be falsy")
            });
        }
    }

    #[test]
    #[serial]
    fn env_low_memory_enabled_unrecognized_value_returns_false() {
        with_env_var(Some("maybe"), || assert!(!env_low_memory_enabled()));
    }

    #[test]
    #[serial]
    fn resolve_parallelism_flag_forces_one_overriding_explicit_value() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(true, Some(4)), 1);
            assert_eq!(resolve_parallelism(true, Some(8)), 1);
            assert_eq!(resolve_parallelism(true, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_env_forces_one_when_flag_off() {
        with_env_var(Some("1"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 1);
            assert_eq!(resolve_parallelism(false, None), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_falsy_env_does_not_override() {
        with_env_var(Some("0"), || {
            assert_eq!(resolve_parallelism(false, Some(4)), 4);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_explicit_value_when_low_memory_off() {
        with_env_var(None, || {
            assert_eq!(resolve_parallelism(false, Some(3)), 3);
            assert_eq!(resolve_parallelism(false, Some(1)), 1);
        });
    }

    #[test]
    #[serial]
    fn resolve_parallelism_default_when_unset() {
        with_env_var(None, || {
            let p = resolve_parallelism(false, None);
            assert!((1..=4).contains(&p), "default must be in [1, 4]; got {p}");
        });
    }

    #[test]
    fn ingest_args_parses_low_memory_flag_via_clap() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
            "--low-memory",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(args.low_memory, "--low-memory must set field to true");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }

    #[test]
    fn ingest_args_low_memory_defaults_false() {
        use clap::Parser;
        let cli = crate::cli::Cli::try_parse_from([
            "sqlite-graphrag",
            "ingest",
            "/tmp/dummy",
            "--type",
            "document",
        ])
        .expect("parse must succeed");
        match cli.command {
            crate::cli::Commands::Ingest(args) => {
                assert!(!args.low_memory, "default must be false");
            }
            _ => panic!("expected Ingest subcommand"),
        }
    }
}