1use std::collections::{HashMap, HashSet};
22use std::fs;
23use std::path::Path;
24use std::time::Instant;
25
26use pulldown_cmark::{Event, HeadingLevel, Parser, Tag, TagEnd};
27use sha2::{Digest, Sha256};
28
29use crate::error::{EngramError, Result};
30use crate::storage::queries::{create_memory, list_memories};
31use crate::storage::Storage;
32use crate::types::{CreateMemoryInput, ListOptions, MemoryType};
33
/// Default maximum ingestible file size in bytes (10 MiB).
pub const DEFAULT_MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;

/// Default maximum chunk length, measured in characters.
pub const DEFAULT_CHUNK_SIZE: usize = 1200;

/// Default number of characters repeated between consecutive chunks.
pub const DEFAULT_OVERLAP: usize = 200;
42
/// Supported input document formats for ingestion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DocumentFormat {
    /// Markdown text (`.md` / `.markdown`).
    Markdown,
    /// PDF binary (`.pdf`); text extraction requires the `pdf` feature.
    Pdf,
}

impl DocumentFormat {
    /// Detects the format from a file extension, case-insensitively.
    ///
    /// Returns `None` when the path has no extension, the extension is
    /// not valid UTF-8, or it is unrecognized.
    pub fn from_path(path: &Path) -> Option<Self> {
        let ext = path.extension()?.to_str()?.to_lowercase();
        // Single source of truth: reuse the string parser so the two
        // detection paths cannot drift apart.
        Self::from_str(&ext)
    }

    /// Parses a user-supplied format name, case-insensitively.
    ///
    /// `"auto"` — and any other unrecognized value — yields `None`,
    /// which callers treat as "detect from the file path instead".
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "md" | "markdown" => Some(DocumentFormat::Markdown),
            "pdf" => Some(DocumentFormat::Pdf),
            // "auto" and everything else: no fixed format.
            _ => None,
        }
    }
}
72
/// Options controlling how a document is ingested.
#[derive(Debug, Clone)]
pub struct IngestConfig {
    /// Explicit document format; `None` means detect from the file extension.
    pub format: Option<DocumentFormat>,
    /// Maximum chunk length in characters; must be greater than 0.
    pub chunk_size: usize,
    /// Characters repeated between consecutive chunks; must be less than `chunk_size`.
    pub overlap: usize,
    /// Maximum file size in bytes accepted by `ingest_file`.
    pub max_file_size: u64,
    /// Additional tags attached to every created chunk memory.
    pub extra_tags: Vec<String>,
}
87
88impl Default for IngestConfig {
89 fn default() -> Self {
90 Self {
91 format: None,
92 chunk_size: DEFAULT_CHUNK_SIZE,
93 overlap: DEFAULT_OVERLAP,
94 max_file_size: DEFAULT_MAX_FILE_SIZE,
95 extra_tags: vec![],
96 }
97 }
98}
99
/// Summary of a completed ingestion run.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
    /// Content hash identifying the ingested document (`"sha256:…"`).
    pub document_id: String,
    /// Number of chunks newly stored during this run.
    pub chunks_created: usize,
    /// Number of chunks skipped because an identical chunk hash was already stored.
    pub chunks_skipped: usize,
    /// Total number of chunks produced from the document.
    pub chunks_total: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
    /// Non-fatal issues encountered (e.g. no content extracted).
    pub warnings: Vec<String>,
}
116
/// A logical section of a parsed document, prior to chunking.
#[derive(Debug, Clone)]
pub struct DocumentSection {
    /// Heading breadcrumb ("A > B"), "Preamble" for pre-heading text,
    /// or "Page N" for PDF pages.
    pub section_path: String,
    /// Plain-text content of the section.
    pub content: String,
    /// 1-based page number (set for PDF sections only).
    pub page: Option<usize>,
    /// Heading depth (1-6) of the innermost enclosing heading, when known.
    pub level: Option<usize>,
}
129
/// One storable chunk of a document, carrying its provenance.
#[derive(Debug, Clone)]
pub struct DocumentChunk {
    /// Chunk text.
    pub content: String,
    /// Path of the source file as passed to `ingest_file`.
    pub source_path: String,
    /// Content hash of the whole source document.
    pub doc_id: String,
    /// Sequential index of this chunk within the document.
    pub chunk_index: usize,
    /// Section breadcrumb this chunk was taken from.
    pub section_path: String,
    /// 1-based page number (PDF sections only).
    pub page: Option<usize>,
    /// Content hash of this chunk; used to skip re-ingesting identical chunks.
    pub chunk_hash: String,
}
148
/// Ingests documents into storage as "document-chunk" memories.
pub struct DocumentIngestor<'a> {
    // Borrowed storage handle; all reads (dedup lookups) and writes go through it.
    storage: &'a Storage,
}
153
impl<'a> DocumentIngestor<'a> {
    /// Creates an ingestor that works through the given storage handle.
    pub fn new(storage: &'a Storage) -> Self {
        Self { storage }
    }

    /// Ingests a Markdown or PDF file as a set of "document-chunk" memories.
    ///
    /// Validates `config` (`chunk_size > 0`, `overlap < chunk_size`) and the
    /// file (exists, within `max_file_size`), resolves the format from
    /// `config.format` or the file extension, extracts sections, chunks
    /// them, and stores every chunk whose hash is not already present for
    /// this document — so re-ingesting an unchanged file creates nothing.
    ///
    /// # Errors
    ///
    /// Returns `EngramError::InvalidInput` for bad config values, a
    /// missing or oversized file, an unknown format, or failed/empty PDF
    /// extraction; `EngramError::Storage` for filesystem read failures.
    pub fn ingest_file(
        &self,
        path: impl AsRef<Path>,
        config: IngestConfig,
    ) -> Result<IngestResult> {
        let path = path.as_ref();
        let start = Instant::now();
        let mut warnings = Vec::new();

        // Reject configurations that would make chunking loop forever
        // or produce nonsensical chunks.
        if config.chunk_size == 0 {
            return Err(EngramError::InvalidInput(
                "chunk_size must be greater than 0".to_string(),
            ));
        }

        if config.overlap >= config.chunk_size {
            return Err(EngramError::InvalidInput(
                "overlap must be less than chunk_size".to_string(),
            ));
        }

        if !path.exists() {
            return Err(EngramError::InvalidInput(format!(
                "File not found: {}",
                path.display()
            )));
        }

        // Check the size via metadata before reading the file into memory.
        let metadata = fs::metadata(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file metadata: {}", e)))?;

        if metadata.len() > config.max_file_size {
            return Err(EngramError::InvalidInput(format!(
                "File too large: {} bytes (max: {} bytes)",
                metadata.len(),
                config.max_file_size
            )));
        }

        // An explicit format in the config wins; otherwise detect from
        // the file extension.
        let format = config
            .format
            .or_else(|| DocumentFormat::from_path(path))
            .ok_or_else(|| {
                EngramError::InvalidInput(format!("Unknown file format for: {}", path.display()))
            })?;

        let content = fs::read(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file: {}", e)))?;

        // The hash of the raw bytes identifies this document across runs.
        let doc_id = compute_hash(&content);

        let sections = match format {
            DocumentFormat::Markdown => {
                // Lossy decode: invalid UTF-8 becomes U+FFFD instead of failing.
                let text = String::from_utf8_lossy(&content);
                extract_markdown_sections(&text)
            }
            DocumentFormat::Pdf => extract_pdf_sections(&content)
                .map_err(|e| EngramError::InvalidInput(format!("PDF extraction failed: {}", e)))?,
        };

        if sections.is_empty() {
            // An empty PDF is an error; an empty Markdown file only warns.
            if matches!(format, DocumentFormat::Pdf) {
                return Err(EngramError::InvalidInput(
                    "No text extracted from PDF".to_string(),
                ));
            }
            warnings.push("No content extracted from document".to_string());
        }

        let source_path = path.to_string_lossy().to_string();
        let chunks = create_chunks(sections, &source_path, &doc_id, &config);

        // Skip chunks whose hash is already stored for this doc_id,
        // making repeated ingestion idempotent.
        let existing_hashes = self.existing_chunk_hashes(&doc_id)?;
        let mut chunks_created = 0;
        let mut chunks_skipped = 0;

        for chunk in &chunks {
            if existing_hashes.contains(&chunk.chunk_hash) {
                chunks_skipped += 1;
                continue;
            }

            self.create_chunk_memory(chunk, &config.extra_tags)?;
            chunks_created += 1;
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        Ok(IngestResult {
            document_id: doc_id,
            chunks_created,
            chunks_skipped,
            chunks_total: chunks.len(),
            duration_ms,
            warnings,
        })
    }

    /// Collects the `chunk_hash` metadata values of every stored
    /// "document-chunk" memory whose `doc_id` metadata matches, paging
    /// through results 500 at a time.
    fn existing_chunk_hashes(&self, doc_id: &str) -> Result<HashSet<String>> {
        const PAGE_SIZE: i64 = 500;
        self.storage.with_connection(|conn| {
            let mut hashes = HashSet::new();
            let mut offset = 0;

            loop {
                let mut filter = HashMap::new();
                filter.insert("doc_id".to_string(), serde_json::json!(doc_id));

                let options = ListOptions {
                    limit: Some(PAGE_SIZE),
                    offset: Some(offset),
                    tags: Some(vec!["document-chunk".to_string()]),
                    memory_type: None,
                    sort_by: None,
                    sort_order: None,
                    scope: None,
                    workspace: None,
                    workspaces: None,
                    tier: None,
                    metadata_filter: Some(filter),
                    filter: None,
                    include_archived: false,
                };

                let results = list_memories(conn, &options)?;
                for memory in &results {
                    // Memories without a string chunk_hash are ignored.
                    if let Some(hash) = memory.metadata.get("chunk_hash").and_then(|v| v.as_str()) {
                        hashes.insert(hash.to_string());
                    }
                }

                // A short page means every matching memory has been seen.
                if results.len() < PAGE_SIZE as usize {
                    break;
                }

                offset += PAGE_SIZE;
            }

            Ok(hashes)
        })
    }

    /// Stores one chunk as a `Context` memory tagged "document-chunk"
    /// (plus `extra_tags`), recording provenance in metadata: source file
    /// name and path, doc_id, chunk index, section path, chunk hash, and
    /// the page number when known.
    fn create_chunk_memory(&self, chunk: &DocumentChunk, extra_tags: &[String]) -> Result<()> {
        let mut tags = vec!["document-chunk".to_string()];
        tags.extend(extra_tags.iter().cloned());

        let mut metadata = HashMap::new();
        metadata.insert(
            "source_file".to_string(),
            serde_json::Value::String(
                // Just the final path component; empty string if there is none.
                Path::new(&chunk.source_path)
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default(),
            ),
        );
        metadata.insert(
            "source_path".to_string(),
            serde_json::Value::String(chunk.source_path.clone()),
        );
        metadata.insert(
            "doc_id".to_string(),
            serde_json::Value::String(chunk.doc_id.clone()),
        );
        metadata.insert(
            "chunk_index".to_string(),
            serde_json::Value::Number(chunk.chunk_index.into()),
        );
        metadata.insert(
            "section_path".to_string(),
            serde_json::Value::String(chunk.section_path.clone()),
        );
        metadata.insert(
            "chunk_hash".to_string(),
            serde_json::Value::String(chunk.chunk_hash.clone()),
        );

        if let Some(page) = chunk.page {
            metadata.insert("page".to_string(), serde_json::Value::Number(page.into()));
        }

        let input = CreateMemoryInput {
            content: chunk.content.clone(),
            memory_type: MemoryType::Context,
            tags,
            metadata,
            importance: Some(0.5),
            scope: crate::types::MemoryScope::Global,
            workspace: None,
            tier: crate::types::MemoryTier::Permanent,
            defer_embedding: false,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        };

        self.storage.with_connection(|conn| {
            create_memory(conn, &input)?;
            Ok(())
        })
    }
}
378
379fn compute_hash(content: &[u8]) -> String {
381 let mut hasher = Sha256::new();
382 hasher.update(content);
383 format!("sha256:{}", hex::encode(hasher.finalize()))
384}
385
/// Parses Markdown and groups its text into sections keyed by heading
/// "breadcrumbs".
///
/// Walks the pulldown-cmark event stream while maintaining a stack of
/// currently-open headings. Text accumulated before any heading lands in
/// a section named "Preamble"; afterwards each section's path is the
/// stack's heading texts joined with " > " (e.g. "Title > Section 1").
/// Inline code is preserved, re-wrapped in backticks; soft and hard
/// breaks become newlines; all other events (lists, links, block
/// structure, …) are ignored.
fn extract_markdown_sections(content: &str) -> Vec<DocumentSection> {
    let parser = Parser::new(content);
    let mut sections = Vec::new();
    // Stack of (level, heading text) for the open ancestor headings.
    let mut heading_stack: Vec<(usize, String)> = Vec::new();
    let mut current_content = String::new();
    let mut current_section_path = String::new();
    let mut in_heading = false;
    let mut current_heading_text = String::new();
    let mut current_heading_level = 0usize;

    for event in parser {
        match event {
            Event::Start(Tag::Heading { level, .. }) => {
                // A new heading closes whatever section text has
                // accumulated so far (whitespace-only content is dropped).
                if !current_content.trim().is_empty() {
                    sections.push(DocumentSection {
                        section_path: if current_section_path.is_empty() {
                            "Preamble".to_string()
                        } else {
                            current_section_path.clone()
                        },
                        content: current_content.trim().to_string(),
                        page: None,
                        level: if heading_stack.is_empty() {
                            None
                        } else {
                            Some(heading_stack.last().map(|(l, _)| *l).unwrap_or(1))
                        },
                    });
                    current_content.clear();
                }

                in_heading = true;
                current_heading_text.clear();
                current_heading_level = heading_level_to_usize(level);
            }
            Event::End(TagEnd::Heading(_)) => {
                in_heading = false;

                // Pop headings at the same or deeper level, then push this
                // one, so the stack always holds the ancestor chain.
                while !heading_stack.is_empty()
                    && heading_stack.last().map(|(l, _)| *l).unwrap_or(0) >= current_heading_level
                {
                    heading_stack.pop();
                }
                heading_stack.push((current_heading_level, current_heading_text.clone()));

                // Rebuild the breadcrumb path from the updated stack.
                current_section_path = heading_stack
                    .iter()
                    .map(|(_, t)| t.as_str())
                    .collect::<Vec<_>>()
                    .join(" > ");
            }
            Event::Text(text) => {
                if in_heading {
                    current_heading_text.push_str(&text);
                } else {
                    current_content.push_str(&text);
                }
            }
            Event::Code(code) => {
                // Keep inline code spans, restoring their backticks in
                // body text (headings get the bare code text).
                if in_heading {
                    current_heading_text.push_str(&code);
                } else {
                    current_content.push('`');
                    current_content.push_str(&code);
                    current_content.push('`');
                }
            }
            Event::SoftBreak | Event::HardBreak => {
                if !in_heading {
                    current_content.push('\n');
                }
            }
            _ => {}
        }
    }

    // Flush whatever text trailed after the last heading.
    if !current_content.trim().is_empty() {
        sections.push(DocumentSection {
            section_path: if current_section_path.is_empty() {
                "Preamble".to_string()
            } else {
                current_section_path
            },
            content: current_content.trim().to_string(),
            page: None,
            level: heading_stack.last().map(|(l, _)| *l),
        });
    }

    sections
}
482
483fn heading_level_to_usize(level: HeadingLevel) -> usize {
485 match level {
486 HeadingLevel::H1 => 1,
487 HeadingLevel::H2 => 2,
488 HeadingLevel::H3 => 3,
489 HeadingLevel::H4 => 4,
490 HeadingLevel::H5 => 5,
491 HeadingLevel::H6 => 6,
492 }
493}
494
/// Extracts per-page sections from a PDF byte buffer (requires the
/// `pdf` feature).
///
/// Pages are split on the form-feed character (`\x0C`) emitted between
/// pages by `pdf_extract`; blank pages are dropped and the remaining
/// pages are numbered from 1.
///
/// # Errors
///
/// Returns a message string when `pdf_extract` fails to extract text.
#[cfg(feature = "pdf")]
fn extract_pdf_sections(content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
    let text = pdf_extract::extract_text_from_mem(content)
        .map_err(|e| format!("PDF extraction failed: {}", e))?;

    // `str::split` yields the whole string when the separator is absent,
    // so the single-page case needs no special branch.
    let sections: Vec<DocumentSection> = text
        .split('\x0C')
        .enumerate()
        .filter(|(_, page_text)| !page_text.trim().is_empty())
        .map(|(i, page_text)| DocumentSection {
            section_path: format!("Page {}", i + 1),
            content: page_text.trim().to_string(),
            page: Some(i + 1),
            level: None,
        })
        .collect();

    Ok(sections)
}
525
526#[cfg(not(feature = "pdf"))]
528fn extract_pdf_sections(_content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
529 Err("PDF extraction requires the 'pdf' feature to be enabled".to_string())
530}
531
532fn create_chunks(
534 sections: Vec<DocumentSection>,
535 source_path: &str,
536 doc_id: &str,
537 config: &IngestConfig,
538) -> Vec<DocumentChunk> {
539 let mut chunks = Vec::new();
540 let mut chunk_index = 0;
541
542 for section in sections {
543 let section_chunks = chunk_text(§ion.content, config.chunk_size, config.overlap);
544
545 for chunk_content in section_chunks {
546 let chunk_hash = compute_hash(chunk_content.as_bytes());
547
548 chunks.push(DocumentChunk {
549 content: chunk_content,
550 source_path: source_path.to_string(),
551 doc_id: doc_id.to_string(),
552 chunk_index,
553 section_path: section.section_path.clone(),
554 page: section.page,
555 chunk_hash,
556 });
557
558 chunk_index += 1;
559 }
560 }
561
562 chunks
563}
564
/// Splits `text` into chunks of at most `chunk_size` characters, with
/// roughly `overlap` characters repeated between consecutive chunks.
///
/// Chunk boundaries prefer the last whitespace in the window, but only
/// when it falls past the window's midpoint — otherwise the text is cut
/// hard at `chunk_size` to avoid degenerate, tiny chunks. All positions
/// are measured in `char`s, never bytes, so multi-byte UTF-8 is handled
/// consistently. (The previous version compared a byte index from
/// `rfind` against the char-count threshold `chunk_size / 2`, which
/// over-trimmed text containing multi-byte characters.)
///
/// Returns an empty vector for empty input. Callers must guarantee
/// `chunk_size > 0` and `overlap < chunk_size` (validated upstream in
/// `ingest_file`).
fn chunk_text(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
    if text.is_empty() {
        return vec![];
    }

    let chars: Vec<char> = text.chars().collect();

    // Fast path: everything fits in a single chunk.
    if chars.len() <= chunk_size {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < chars.len() {
        let end = (start + chunk_size).min(chars.len());
        let window = &chars[start..end];

        // Break at the last whitespace in the window when it lies past
        // the midpoint; the final window is always taken whole.
        let piece_len = if end < chars.len() {
            match window.iter().rposition(|c| c.is_whitespace()) {
                Some(pos) if pos > chunk_size / 2 => pos,
                _ => window.len(),
            }
        } else {
            window.len()
        };

        let chunk: String = window[..piece_len].iter().collect();
        chunks.push(chunk);

        // The emitted piece reached the end of the text.
        if start + piece_len >= chars.len() {
            break;
        }

        // Advance by piece length minus overlap; guard against a zero
        // step (piece no longer than the overlap) to guarantee progress.
        let step = piece_len.saturating_sub(overlap);
        start += if step == 0 { piece_len } else { step };
    }

    chunks
}
614
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    // Text shorter than chunk_size is returned as one untouched chunk.
    #[test]
    fn test_chunk_text_small() {
        let text = "Hello world";
        let chunks = chunk_text(text, 1200, 200);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], "Hello world");
    }

    // Long input yields multiple chunks, each within the size limit.
    #[test]
    fn test_chunk_text_with_overlap() {
        let text = "A".repeat(2500);
        let chunks = chunk_text(&text, 1200, 200);
        assert!(chunks.len() >= 2);
        assert!(chunks[0].len() <= 1200);
    }

    // Headings produce sections whose paths reflect nesting.
    #[test]
    fn test_markdown_sections() {
        let md = r#"# Title

Introduction text.

## Section 1

Content for section 1.

### Subsection 1.1

Nested content.

## Section 2

Content for section 2.
"#;
        let sections = extract_markdown_sections(md);
        assert!(sections.len() >= 3);

        // Text directly under "# Title" gets the bare heading as its path.
        let title_section = sections.iter().find(|s| s.section_path == "Title");
        assert!(title_section.is_some());

        // Nested headings show up in the breadcrumb path.
        let nested = sections
            .iter()
            .find(|s| s.section_path.contains("Subsection"));
        assert!(nested.is_some());
    }

    // Hash format: "sha256:" prefix (7 chars) followed by 64 hex digits.
    #[test]
    fn test_compute_hash() {
        let hash = compute_hash(b"test content");
        assert!(hash.starts_with("sha256:"));
        assert_eq!(hash.len(), 7 + 64);
    }

    // Extension-based detection: md/pdf recognized, others rejected.
    #[test]
    fn test_document_format_detection() {
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.md")),
            Some(DocumentFormat::Markdown)
        );
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.pdf")),
            Some(DocumentFormat::Pdf)
        );
        assert_eq!(DocumentFormat::from_path(Path::new("doc.txt")), None);
    }

    // Default config uses the module-level constants.
    #[test]
    fn test_ingest_config_default() {
        let config = IngestConfig::default();
        assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
        assert_eq!(config.overlap, DEFAULT_OVERLAP);
        assert_eq!(config.max_file_size, DEFAULT_MAX_FILE_SIZE);
    }

    // Re-ingesting an unchanged file skips every chunk.
    #[test]
    fn test_ingest_idempotent() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "# Title\n\nHello world.\n").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let first = ingestor
            .ingest_file(&file_path, IngestConfig::default())
            .unwrap();
        assert!(first.chunks_created > 0);
        assert_eq!(first.chunks_skipped, 0);

        let second = ingestor
            .ingest_file(&file_path, IngestConfig::default())
            .unwrap();
        assert_eq!(second.chunks_created, 0);
        assert_eq!(second.chunks_skipped, first.chunks_total);
    }

    // chunk_size == 0 is rejected before any file I/O on the content.
    #[test]
    fn test_invalid_chunk_size() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "Hello").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            chunk_size: 0,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("chunk_size"));
    }

    // overlap >= chunk_size is rejected.
    #[test]
    fn test_invalid_overlap() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "Hello").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            chunk_size: 200,
            overlap: 200,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("overlap"));
    }

    // A PDF that yields no text is an error, not a silent warning.
    #[test]
    fn test_pdf_empty_is_error() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("empty.pdf");
        fs::write(&file_path, b"").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            format: Some(DocumentFormat::Pdf),
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("PDF"));
    }
}