1use std::collections::{HashMap, HashSet};
22use std::fs;
23use std::path::Path;
24use std::time::Instant;
25
26use pulldown_cmark::{Event, HeadingLevel, Parser, Tag, TagEnd};
27use sha2::{Digest, Sha256};
28
29use crate::error::{EngramError, Result};
30use crate::storage::queries::{create_memory, list_memories};
31use crate::storage::Storage;
32use crate::types::{CreateMemoryInput, ListOptions, MemoryType};
33
/// Largest file accepted for ingestion by default: 10 MiB.
pub const DEFAULT_MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;

/// Default maximum chunk length, in characters.
pub const DEFAULT_CHUNK_SIZE: usize = 1200;

/// Default overlap between consecutive chunks, in characters.
pub const DEFAULT_OVERLAP: usize = 200;
42
/// Supported input document formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DocumentFormat {
    /// Markdown text (`.md`, `.markdown`).
    Markdown,
    /// PDF (text extraction requires the `pdf` cargo feature).
    Pdf,
}
49
50impl DocumentFormat {
51 pub fn from_path(path: &Path) -> Option<Self> {
53 let ext = path.extension()?.to_str()?.to_lowercase();
54 match ext.as_str() {
55 "md" | "markdown" => Some(DocumentFormat::Markdown),
56 "pdf" => Some(DocumentFormat::Pdf),
57 _ => None,
58 }
59 }
60
61 #[allow(clippy::should_implement_trait)]
63 pub fn from_str(s: &str) -> Option<Self> {
64 match s.to_lowercase().as_str() {
65 "md" | "markdown" => Some(DocumentFormat::Markdown),
66 "pdf" => Some(DocumentFormat::Pdf),
67 "auto" => None, _ => None,
69 }
70 }
71}
72
/// Options controlling a single document ingestion.
#[derive(Debug, Clone)]
pub struct IngestConfig {
    /// Explicit document format; `None` means detect from the file extension.
    pub format: Option<DocumentFormat>,
    /// Maximum chunk length, in characters. Must be greater than 0.
    pub chunk_size: usize,
    /// Characters of overlap between consecutive chunks; must be less than
    /// `chunk_size`.
    pub overlap: usize,
    /// Largest file size (in bytes) accepted for ingestion.
    pub max_file_size: u64,
    /// Extra tags applied to every created chunk memory, in addition to the
    /// `document-chunk` tag.
    pub extra_tags: Vec<String>,
}
87
88impl Default for IngestConfig {
89 fn default() -> Self {
90 Self {
91 format: None,
92 chunk_size: DEFAULT_CHUNK_SIZE,
93 overlap: DEFAULT_OVERLAP,
94 max_file_size: DEFAULT_MAX_FILE_SIZE,
95 extra_tags: vec![],
96 }
97 }
98}
99
/// Summary of one `ingest_file` run.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
    /// Content hash of the whole file (`"sha256:..."`), used as the document id.
    pub document_id: String,
    /// Number of chunk memories newly created by this run.
    pub chunks_created: usize,
    /// Number of chunks skipped because an identical chunk hash already
    /// existed for this document (idempotent re-ingestion).
    pub chunks_skipped: usize,
    /// Total chunks produced from the document (created + skipped).
    pub chunks_total: usize,
    /// Wall-clock duration of the ingestion, in milliseconds.
    pub duration_ms: u64,
    /// Non-fatal issues encountered (e.g. no content extracted).
    pub warnings: Vec<String>,
}
116
/// A logical section of a document, prior to chunking.
#[derive(Debug, Clone)]
pub struct DocumentSection {
    /// Human-readable location: a heading breadcrumb for Markdown
    /// (e.g. `"Title > Section"`) or `"Page N"` for PDFs.
    pub section_path: String,
    /// The section's extracted text.
    pub content: String,
    /// 1-based page number (PDF sections only).
    pub page: Option<usize>,
    /// Heading depth 1-6 (Markdown sections only; `None` for preamble/PDF).
    pub level: Option<usize>,
}
129
/// One chunk of document text, ready to be stored as a memory.
#[derive(Debug, Clone)]
pub struct DocumentChunk {
    /// The chunk's text.
    pub content: String,
    /// Path of the source file this chunk came from.
    pub source_path: String,
    /// Content hash of the whole source document (`"sha256:..."`).
    pub doc_id: String,
    /// 0-based index of this chunk, running across the whole document.
    pub chunk_index: usize,
    /// Section breadcrumb (or page label) this chunk belongs to.
    pub section_path: String,
    /// 1-based page number, when known (PDF sources).
    pub page: Option<usize>,
    /// Content hash of this chunk (`"sha256:..."`), used for dedup on
    /// re-ingestion.
    pub chunk_hash: String,
}
148
/// Ingests documents into storage as `document-chunk` memories.
pub struct DocumentIngestor<'a> {
    // Borrowed storage handle; all reads/writes go through it.
    storage: &'a Storage,
}
153
impl<'a> DocumentIngestor<'a> {
    /// Create an ingestor that writes chunk memories into `storage`.
    pub fn new(storage: &'a Storage) -> Self {
        Self { storage }
    }

    /// Ingest a single Markdown or PDF file as `document-chunk` memories.
    ///
    /// Validates the config (non-zero `chunk_size`, `overlap < chunk_size`)
    /// and the file (exists, within `max_file_size`), resolves the format
    /// (explicit `config.format` first, then the file extension), extracts
    /// sections, chunks them, and stores each chunk. Chunks whose hash is
    /// already recorded for this document are skipped, making re-ingestion
    /// of an unchanged file idempotent.
    ///
    /// # Errors
    /// Returns `InvalidInput` for bad config values, a missing or oversized
    /// file, an unknown format, or failed/empty PDF extraction; `Storage`
    /// for filesystem or database failures.
    pub fn ingest_file(
        &self,
        path: impl AsRef<Path>,
        config: IngestConfig,
    ) -> Result<IngestResult> {
        let path = path.as_ref();
        let start = Instant::now();
        let mut warnings = Vec::new();

        // Config validation up front: these invariants also keep chunk_text
        // from looping forever (overlap >= chunk_size would make step 0).
        if config.chunk_size == 0 {
            return Err(EngramError::InvalidInput(
                "chunk_size must be greater than 0".to_string(),
            ));
        }

        if config.overlap >= config.chunk_size {
            return Err(EngramError::InvalidInput(
                "overlap must be less than chunk_size".to_string(),
            ));
        }

        if !path.exists() {
            return Err(EngramError::InvalidInput(format!(
                "File not found: {}",
                path.display()
            )));
        }

        let metadata = fs::metadata(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file metadata: {}", e)))?;

        // Size check happens before reading so we never load an oversized file.
        if metadata.len() > config.max_file_size {
            return Err(EngramError::InvalidInput(format!(
                "File too large: {} bytes (max: {} bytes)",
                metadata.len(),
                config.max_file_size
            )));
        }

        // Explicit format wins; otherwise fall back to extension sniffing.
        let format = config
            .format
            .or_else(|| DocumentFormat::from_path(path))
            .ok_or_else(|| {
                EngramError::InvalidInput(format!("Unknown file format for: {}", path.display()))
            })?;

        let content = fs::read(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file: {}", e)))?;

        // Document id is the hash of the raw bytes, so an unchanged file
        // always maps to the same id (and its chunks dedup below).
        let doc_id = compute_hash(&content);

        let sections = match format {
            DocumentFormat::Markdown => {
                // Lossy decoding: invalid UTF-8 becomes U+FFFD rather than
                // failing the whole ingestion.
                let text = String::from_utf8_lossy(&content);
                extract_markdown_sections(&text)
            }
            DocumentFormat::Pdf => extract_pdf_sections(&content)
                .map_err(|e| EngramError::InvalidInput(format!("PDF extraction failed: {}", e)))?,
        };

        if sections.is_empty() {
            // An empty PDF extraction is treated as an error (likely a scanned
            // or image-only PDF); an empty Markdown file is only a warning.
            if matches!(format, DocumentFormat::Pdf) {
                return Err(EngramError::InvalidInput(
                    "No text extracted from PDF".to_string(),
                ));
            }
            warnings.push("No content extracted from document".to_string());
        }

        let source_path = path.to_string_lossy().to_string();
        let chunks = create_chunks(sections, &source_path, &doc_id, &config);

        // Fetch hashes of chunks already stored for this document so a
        // re-run only writes what is new.
        let existing_hashes = self.existing_chunk_hashes(&doc_id)?;
        let mut chunks_created = 0;
        let mut chunks_skipped = 0;

        for chunk in &chunks {
            if existing_hashes.contains(&chunk.chunk_hash) {
                chunks_skipped += 1;
                continue;
            }

            self.create_chunk_memory(chunk, &config.extra_tags)?;
            chunks_created += 1;
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        Ok(IngestResult {
            document_id: doc_id,
            chunks_created,
            chunks_skipped,
            chunks_total: chunks.len(),
            duration_ms,
            warnings,
        })
    }

    /// Collect the `chunk_hash` metadata values of every `document-chunk`
    /// memory already stored for `doc_id`, paging through results 500 at a
    /// time so large documents don't load everything in one query.
    fn existing_chunk_hashes(&self, doc_id: &str) -> Result<HashSet<String>> {
        const PAGE_SIZE: i64 = 500;
        self.storage.with_connection(|conn| {
            let mut hashes = HashSet::new();
            let mut offset = 0;

            loop {
                // Restrict to this document via metadata, and to chunk
                // memories via the document-chunk tag.
                let mut filter = HashMap::new();
                filter.insert("doc_id".to_string(), serde_json::json!(doc_id));

                let options = ListOptions {
                    limit: Some(PAGE_SIZE),
                    offset: Some(offset),
                    tags: Some(vec!["document-chunk".to_string()]),
                    memory_type: None,
                    sort_by: None,
                    sort_order: None,
                    scope: None,
                    workspace: None,
                    workspaces: None,
                    tier: None,
                    metadata_filter: Some(filter),
                    filter: None,
                    include_archived: false,
                };

                let results = list_memories(conn, &options)?;
                for memory in &results {
                    // Memories without a string chunk_hash are ignored.
                    if let Some(hash) = memory.metadata.get("chunk_hash").and_then(|v| v.as_str()) {
                        hashes.insert(hash.to_string());
                    }
                }

                // A short page means we've seen the last of the results.
                if results.len() < PAGE_SIZE as usize {
                    break;
                }

                offset += PAGE_SIZE;
            }

            Ok(hashes)
        })
    }

    /// Persist one chunk as a `Context` memory tagged `document-chunk`
    /// (plus `extra_tags`), with provenance recorded in metadata:
    /// source file/path, doc id, chunk index, section path, chunk hash,
    /// and page number when known.
    fn create_chunk_memory(&self, chunk: &DocumentChunk, extra_tags: &[String]) -> Result<()> {
        let mut tags = vec!["document-chunk".to_string()];
        tags.extend(extra_tags.iter().cloned());

        let mut metadata = HashMap::new();
        // File name only, for display; the full path is stored separately.
        metadata.insert(
            "source_file".to_string(),
            serde_json::Value::String(
                Path::new(&chunk.source_path)
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default(),
            ),
        );
        metadata.insert(
            "source_path".to_string(),
            serde_json::Value::String(chunk.source_path.clone()),
        );
        metadata.insert(
            "doc_id".to_string(),
            serde_json::Value::String(chunk.doc_id.clone()),
        );
        metadata.insert(
            "chunk_index".to_string(),
            serde_json::Value::Number(chunk.chunk_index.into()),
        );
        metadata.insert(
            "section_path".to_string(),
            serde_json::Value::String(chunk.section_path.clone()),
        );
        // chunk_hash is what existing_chunk_hashes() reads for dedup.
        metadata.insert(
            "chunk_hash".to_string(),
            serde_json::Value::String(chunk.chunk_hash.clone()),
        );

        if let Some(page) = chunk.page {
            metadata.insert("page".to_string(), serde_json::Value::Number(page.into()));
        }

        let input = CreateMemoryInput {
            content: chunk.content.clone(),
            memory_type: MemoryType::Context,
            tags,
            metadata,
            // Neutral default importance for ingested chunks.
            importance: Some(0.5),
            scope: crate::types::MemoryScope::Global,
            workspace: None,
            tier: crate::types::MemoryTier::Permanent,
            defer_embedding: false,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
            media_url: None,
        };

        self.storage.with_connection(|conn| {
            create_memory(conn, &input)?;
            Ok(())
        })
    }
}
379
380fn compute_hash(content: &[u8]) -> String {
382 let mut hasher = Sha256::new();
383 hasher.update(content);
384 format!("sha256:{}", hex::encode(hasher.finalize()))
385}
386
/// Split Markdown into sections keyed by their heading breadcrumb
/// (e.g. `"Title > Section 1 > Subsection 1.1"`).
///
/// Text before the first heading is collected under `"Preamble"`. Inline
/// code spans are re-wrapped in backticks; soft and hard breaks become
/// `\n`. Sections whose accumulated text is only whitespace are dropped.
fn extract_markdown_sections(content: &str) -> Vec<DocumentSection> {
    let parser = Parser::new(content);
    let mut sections = Vec::new();
    // Stack of (level, heading text) for the currently open heading chain;
    // joined with " > " it forms the section breadcrumb.
    let mut heading_stack: Vec<(usize, String)> = Vec::new();
    let mut current_content = String::new();
    let mut current_section_path = String::new();
    let mut in_heading = false;
    let mut current_heading_text = String::new();
    let mut current_heading_level = 0usize;

    for event in parser {
        match event {
            Event::Start(Tag::Heading { level, .. }) => {
                // A new heading closes the previous section: flush any
                // accumulated body text under the *old* breadcrumb.
                if !current_content.trim().is_empty() {
                    sections.push(DocumentSection {
                        section_path: if current_section_path.is_empty() {
                            "Preamble".to_string()
                        } else {
                            current_section_path.clone()
                        },
                        content: current_content.trim().to_string(),
                        page: None,
                        level: if heading_stack.is_empty() {
                            None
                        } else {
                            Some(heading_stack.last().map(|(l, _)| *l).unwrap_or(1))
                        },
                    });
                    current_content.clear();
                }

                // Start collecting the heading's own text.
                in_heading = true;
                current_heading_text.clear();
                current_heading_level = heading_level_to_usize(level);
            }
            Event::End(TagEnd::Heading(_)) => {
                in_heading = false;

                // Pop headings at the same or deeper level, then push this
                // one — the stack then holds exactly the ancestor chain of
                // the new heading.
                while !heading_stack.is_empty()
                    && heading_stack.last().map(|(l, _)| *l).unwrap_or(0) >= current_heading_level
                {
                    heading_stack.pop();
                }
                heading_stack.push((current_heading_level, current_heading_text.clone()));

                current_section_path = heading_stack
                    .iter()
                    .map(|(_, t)| t.as_str())
                    .collect::<Vec<_>>()
                    .join(" > ");
            }
            Event::Text(text) => {
                // Text inside a heading builds the breadcrumb; otherwise it
                // accumulates in the section body.
                if in_heading {
                    current_heading_text.push_str(&text);
                } else {
                    current_content.push_str(&text);
                }
            }
            Event::Code(code) => {
                // Keep inline-code markers in body text; headings drop them.
                if in_heading {
                    current_heading_text.push_str(&code);
                } else {
                    current_content.push('`');
                    current_content.push_str(&code);
                    current_content.push('`');
                }
            }
            Event::SoftBreak | Event::HardBreak => {
                if !in_heading {
                    current_content.push('\n');
                }
            }
            // Other block structure (lists, tables, links, emphasis, …) is
            // flattened away; only the text events it contains are kept.
            _ => {}
        }
    }

    // Flush whatever body text follows the last heading.
    if !current_content.trim().is_empty() {
        sections.push(DocumentSection {
            section_path: if current_section_path.is_empty() {
                "Preamble".to_string()
            } else {
                current_section_path
            },
            content: current_content.trim().to_string(),
            page: None,
            level: heading_stack.last().map(|(l, _)| *l),
        });
    }

    sections
}
483
484fn heading_level_to_usize(level: HeadingLevel) -> usize {
486 match level {
487 HeadingLevel::H1 => 1,
488 HeadingLevel::H2 => 2,
489 HeadingLevel::H3 => 3,
490 HeadingLevel::H4 => 4,
491 HeadingLevel::H5 => 5,
492 HeadingLevel::H6 => 6,
493 }
494}
495
496#[cfg(feature = "pdf")]
500fn extract_pdf_sections(content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
501 let text = pdf_extract::extract_text_from_mem(content)
502 .map_err(|e| format!("PDF extraction failed: {}", e))?;
503
504 let pages: Vec<&str> = if text.contains('\x0C') {
507 text.split('\x0C').collect()
508 } else {
509 vec![&text]
510 };
511
512 let sections: Vec<DocumentSection> = pages
513 .iter()
514 .enumerate()
515 .filter(|(_, page_text)| !page_text.trim().is_empty())
516 .map(|(i, page_text)| DocumentSection {
517 section_path: format!("Page {}", i + 1),
518 content: page_text.trim().to_string(),
519 page: Some(i + 1),
520 level: None,
521 })
522 .collect();
523
524 Ok(sections)
525}
526
527#[cfg(not(feature = "pdf"))]
529fn extract_pdf_sections(_content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
530 Err("PDF extraction requires the 'pdf' feature to be enabled".to_string())
531}
532
533fn create_chunks(
535 sections: Vec<DocumentSection>,
536 source_path: &str,
537 doc_id: &str,
538 config: &IngestConfig,
539) -> Vec<DocumentChunk> {
540 let mut chunks = Vec::new();
541 let mut chunk_index = 0;
542
543 for section in sections {
544 let section_chunks = chunk_text(§ion.content, config.chunk_size, config.overlap);
545
546 for chunk_content in section_chunks {
547 let chunk_hash = compute_hash(chunk_content.as_bytes());
548
549 chunks.push(DocumentChunk {
550 content: chunk_content,
551 source_path: source_path.to_string(),
552 doc_id: doc_id.to_string(),
553 chunk_index,
554 section_path: section.section_path.clone(),
555 page: section.page,
556 chunk_hash,
557 });
558
559 chunk_index += 1;
560 }
561 }
562
563 chunks
564}
565
/// Split `text` into chunks of at most `chunk_size` characters, with
/// `overlap` characters of context shared between consecutive chunks.
///
/// When a chunk would end mid-text, the split moves back to the last
/// whitespace in the window, provided that break point lies past the window
/// midpoint (avoids degenerate tiny chunks). All indexing is done in
/// `char`s: the original compared a BYTE index from `rfind` against the
/// char-count threshold `chunk_size / 2`, which skewed the break heuristic
/// on multi-byte UTF-8 text.
///
/// Callers must ensure `chunk_size > 0` and `overlap < chunk_size`
/// (validated in `ingest_file`); the step guard below additionally prevents
/// an infinite loop even if that invariant is violated.
fn chunk_text(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
    if text.is_empty() {
        return vec![];
    }

    let chars: Vec<char> = text.chars().collect();
    // Short input: a single chunk, no copying through the char buffer.
    if chars.len() <= chunk_size {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < chars.len() {
        let end = (start + chunk_size).min(chars.len());
        let window = &chars[start..end];

        // Chars taken by this chunk: the full window, or up to (excluding)
        // the last whitespace when that break lies past the midpoint. The
        // final window is always taken whole.
        let take = if end < chars.len() {
            match window.iter().rposition(|c| c.is_whitespace()) {
                Some(pos) if pos > chunk_size / 2 => pos,
                _ => window.len(),
            }
        } else {
            window.len()
        };

        chunks.push(window[..take].iter().collect());

        if start + take >= chars.len() {
            break;
        }

        // Advance leaving `overlap` chars of context; guard against a zero
        // step (overlap >= take) which would otherwise loop forever.
        let step = take.saturating_sub(overlap);
        start += if step == 0 { take } else { step };
    }

    chunks
}
615
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    // Text shorter than chunk_size comes back as a single chunk, verbatim.
    #[test]
    fn test_chunk_text_small() {
        let text = "Hello world";
        let chunks = chunk_text(text, 1200, 200);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], "Hello world");
    }

    // Long input produces multiple chunks, each within the size limit.
    #[test]
    fn test_chunk_text_with_overlap() {
        let text = "A".repeat(2500);
        let chunks = chunk_text(&text, 1200, 200);
        assert!(chunks.len() >= 2);
        assert!(chunks[0].len() <= 1200);
    }

    // Heading hierarchy is turned into breadcrumb section paths.
    #[test]
    fn test_markdown_sections() {
        let md = r#"# Title

Introduction text.

## Section 1

Content for section 1.

### Subsection 1.1

Nested content.

## Section 2

Content for section 2.
"#;
        let sections = extract_markdown_sections(md);
        assert!(sections.len() >= 3);

        // Intro text lands under the top-level "Title" breadcrumb.
        let title_section = sections.iter().find(|s| s.section_path == "Title");
        assert!(title_section.is_some());

        // Nested headings appear somewhere in a breadcrumb.
        let nested = sections
            .iter()
            .find(|s| s.section_path.contains("Subsection"));
        assert!(nested.is_some());
    }

    // Hash format: "sha256:" prefix followed by 64 hex characters.
    #[test]
    fn test_compute_hash() {
        let hash = compute_hash(b"test content");
        assert!(hash.starts_with("sha256:"));
        assert_eq!(hash.len(), 7 + 64); // prefix + 64 hex chars
    }

    // Extension-based detection, including the unknown-extension case.
    #[test]
    fn test_document_format_detection() {
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.md")),
            Some(DocumentFormat::Markdown)
        );
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.pdf")),
            Some(DocumentFormat::Pdf)
        );
        assert_eq!(DocumentFormat::from_path(Path::new("doc.txt")), None);
    }

    // Default config mirrors the module-level DEFAULT_* constants.
    #[test]
    fn test_ingest_config_default() {
        let config = IngestConfig::default();
        assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
        assert_eq!(config.overlap, DEFAULT_OVERLAP);
        assert_eq!(config.max_file_size, DEFAULT_MAX_FILE_SIZE);
    }

    // Re-ingesting the same file creates nothing new: every chunk is
    // skipped via its recorded chunk_hash.
    #[test]
    fn test_ingest_idempotent() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "# Title\n\nHello world.\n").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let first = ingestor
            .ingest_file(&file_path, IngestConfig::default())
            .unwrap();
        assert!(first.chunks_created > 0);
        assert_eq!(first.chunks_skipped, 0);

        let second = ingestor
            .ingest_file(&file_path, IngestConfig::default())
            .unwrap();
        assert_eq!(second.chunks_created, 0);
        assert_eq!(second.chunks_skipped, first.chunks_total);
    }

    // chunk_size == 0 is rejected before any file I/O matters.
    #[test]
    fn test_invalid_chunk_size() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "Hello").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            chunk_size: 0,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("chunk_size"));
    }

    // overlap == chunk_size is rejected (must be strictly smaller).
    #[test]
    fn test_invalid_overlap() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "Hello").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            chunk_size: 200,
            overlap: 200,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("overlap"));
    }

    // A PDF yielding no text is an error, not a silent no-op.
    #[test]
    fn test_pdf_empty_is_error() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("empty.pdf");
        fs::write(&file_path, b"").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            format: Some(DocumentFormat::Pdf),
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("PDF"));
    }
}