// engram/intelligence/document_ingest.rs

1//! Document ingestion for Engram (RML-928)
2//!
3//! Provides document parsing, chunking, and ingestion into the memory store.
4//! Supported formats:
5//! - Markdown (.md): Uses pulldown-cmark for parsing, extracts sections
6//! - PDF (.pdf): Uses pdf-extract for text extraction by page
7//!
8//! # Usage
9//!
10//! ```ignore
11//! use engram::intelligence::document_ingest::{DocumentIngestor, IngestConfig};
12//! use engram::Storage;
13//!
14//! let storage = Storage::open_in_memory()?;
15//! let ingestor = DocumentIngestor::new(&storage);
16//!
17//! let result = ingestor.ingest_file("docs/handbook.pdf", IngestConfig::default())?;
18//! println!("Ingested {} chunks", result.chunks_created);
19//! ```
20
21use std::collections::{HashMap, HashSet};
22use std::fs;
23use std::path::Path;
24use std::time::Instant;
25
26use pulldown_cmark::{Event, HeadingLevel, Parser, Tag, TagEnd};
27use sha2::{Digest, Sha256};
28
29use crate::error::{EngramError, Result};
30use crate::storage::queries::{create_memory, list_memories};
31use crate::storage::Storage;
32use crate::types::{CreateMemoryInput, ListOptions, MemoryType};
33
/// Maximum file size in bytes (10 MB default)
///
/// Used as `IngestConfig::default().max_file_size`.
pub const DEFAULT_MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;

/// Default chunk size in characters (not bytes; chunking counts `char`s)
pub const DEFAULT_CHUNK_SIZE: usize = 1200;

/// Default overlap between consecutive chunks, in characters
pub const DEFAULT_OVERLAP: usize = 200;
42
/// Document format
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DocumentFormat {
    /// Markdown (`.md`/`.markdown`), parsed with pulldown-cmark.
    Markdown,
    /// PDF (`.pdf`), text extracted via pdf-extract (requires the `pdf` feature).
    Pdf,
}
49
50impl DocumentFormat {
51    /// Detect format from file extension
52    pub fn from_path(path: &Path) -> Option<Self> {
53        let ext = path.extension()?.to_str()?.to_lowercase();
54        match ext.as_str() {
55            "md" | "markdown" => Some(DocumentFormat::Markdown),
56            "pdf" => Some(DocumentFormat::Pdf),
57            _ => None,
58        }
59    }
60
61    /// Parse format from string
62    #[allow(clippy::should_implement_trait)]
63    pub fn from_str(s: &str) -> Option<Self> {
64        match s.to_lowercase().as_str() {
65            "md" | "markdown" => Some(DocumentFormat::Markdown),
66            "pdf" => Some(DocumentFormat::Pdf),
67            "auto" => None, // Will be detected from path
68            _ => None,
69        }
70    }
71}
72
/// Configuration for document ingestion
///
/// `Default` uses the module-level `DEFAULT_*` constants.
#[derive(Debug, Clone)]
pub struct IngestConfig {
    /// Force specific format (None = auto-detect from file extension)
    pub format: Option<DocumentFormat>,
    /// Maximum characters per chunk (validated: must be > 0)
    pub chunk_size: usize,
    /// Overlap between chunks in characters (validated: must be < `chunk_size`)
    pub overlap: usize,
    /// Maximum file size in bytes
    pub max_file_size: u64,
    /// Additional tags to add to all chunks (alongside "document-chunk")
    pub extra_tags: Vec<String>,
}
87
88impl Default for IngestConfig {
89    fn default() -> Self {
90        Self {
91            format: None,
92            chunk_size: DEFAULT_CHUNK_SIZE,
93            overlap: DEFAULT_OVERLAP,
94            max_file_size: DEFAULT_MAX_FILE_SIZE,
95            extra_tags: vec![],
96        }
97    }
98}
99
/// Result of document ingestion
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
    /// Document ID (SHA-256 hash of file content), formatted `sha256:<hex>`
    pub document_id: String,
    /// Number of chunks created
    pub chunks_created: usize,
    /// Number of chunks skipped (already existed for this document)
    pub chunks_skipped: usize,
    /// Total number of chunks processed (equals created + skipped)
    pub chunks_total: usize,
    /// Duration in milliseconds
    pub duration_ms: u64,
    /// Warnings encountered during ingestion (e.g. empty Markdown document)
    pub warnings: Vec<String>,
}
116
/// A section extracted from a document
///
/// For Markdown this is one heading-delimited body of text; for PDFs,
/// one non-empty page.
#[derive(Debug, Clone)]
pub struct DocumentSection {
    /// Section path (e.g., "Security > Key Rotation"); "Preamble" for text
    /// before the first heading, "Page N" for PDF pages
    pub section_path: String,
    /// Section content (whitespace-trimmed)
    pub content: String,
    /// Page number (for PDFs; 1-based)
    pub page: Option<usize>,
    /// Heading level (1-6 for Markdown; None for PDFs and preamble text)
    pub level: Option<usize>,
}
129
/// A chunk ready for ingestion
#[derive(Debug, Clone)]
pub struct DocumentChunk {
    /// Chunk content
    pub content: String,
    /// Source file path (as given to `ingest_file`)
    pub source_path: String,
    /// Document ID (`sha256:` hash of the whole file)
    pub doc_id: String,
    /// Chunk index within document (runs continuously across sections)
    pub chunk_index: usize,
    /// Section path the chunk was cut from
    pub section_path: String,
    /// Page number (for PDFs)
    pub page: Option<usize>,
    /// SHA-256 hash of chunk content; used to skip duplicates on re-ingest
    pub chunk_hash: String,
}
148
/// Document ingestor
///
/// Borrows a `Storage` handle and writes one memory per document chunk.
pub struct DocumentIngestor<'a> {
    // Storage backing the ingested chunks; all reads/writes go through it.
    storage: &'a Storage,
}
153
impl<'a> DocumentIngestor<'a> {
    /// Create a new document ingestor borrowing the given storage handle.
    pub fn new(storage: &'a Storage) -> Self {
        Self { storage }
    }

    /// Ingest a document file
    ///
    /// Pipeline: validate config and file -> detect format -> extract
    /// sections (Markdown headings or PDF pages) -> chunk with overlap ->
    /// store each new chunk as a memory. Re-ingesting the same file is
    /// idempotent: chunks whose content hash already exists for this
    /// document are counted as skipped rather than re-created.
    ///
    /// # Errors
    ///
    /// - `InvalidInput`: zero `chunk_size`, `overlap >= chunk_size`,
    ///   missing or oversized file, unknown format, or failed/empty PDF
    ///   extraction.
    /// - `Storage`: filesystem metadata/read failures.
    pub fn ingest_file(
        &self,
        path: impl AsRef<Path>,
        config: IngestConfig,
    ) -> Result<IngestResult> {
        let path = path.as_ref();
        let start = Instant::now();
        let mut warnings = Vec::new();

        // Config validation runs before any filesystem access.
        if config.chunk_size == 0 {
            return Err(EngramError::InvalidInput(
                "chunk_size must be greater than 0".to_string(),
            ));
        }

        if config.overlap >= config.chunk_size {
            return Err(EngramError::InvalidInput(
                "overlap must be less than chunk_size".to_string(),
            ));
        }

        // Check file exists
        if !path.exists() {
            return Err(EngramError::InvalidInput(format!(
                "File not found: {}",
                path.display()
            )));
        }

        // Check file size
        let metadata = fs::metadata(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file metadata: {}", e)))?;

        if metadata.len() > config.max_file_size {
            return Err(EngramError::InvalidInput(format!(
                "File too large: {} bytes (max: {} bytes)",
                metadata.len(),
                config.max_file_size
            )));
        }

        // Determine format: an explicit config format wins over detection.
        let format = config
            .format
            .or_else(|| DocumentFormat::from_path(path))
            .ok_or_else(|| {
                EngramError::InvalidInput(format!("Unknown file format for: {}", path.display()))
            })?;

        // Read file content
        let content = fs::read(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file: {}", e)))?;

        // Compute document ID (SHA-256 of the raw bytes, so identical files
        // always map to the same doc_id).
        let doc_id = compute_hash(&content);

        // Extract sections based on format
        let sections = match format {
            DocumentFormat::Markdown => {
                // Lossy UTF-8 decode: invalid bytes become replacement
                // characters instead of failing the whole ingest.
                let text = String::from_utf8_lossy(&content);
                extract_markdown_sections(&text)
            }
            DocumentFormat::Pdf => extract_pdf_sections(&content)
                .map_err(|e| EngramError::InvalidInput(format!("PDF extraction failed: {}", e)))?,
        };

        if sections.is_empty() {
            // An empty PDF is a hard error; empty Markdown only warns.
            if matches!(format, DocumentFormat::Pdf) {
                return Err(EngramError::InvalidInput(
                    "No text extracted from PDF".to_string(),
                ));
            }
            warnings.push("No content extracted from document".to_string());
        }

        // Create chunks
        let source_path = path.to_string_lossy().to_string();
        let chunks = create_chunks(sections, &source_path, &doc_id, &config);

        // Ingest chunks, skipping those whose hash is already stored for
        // this document (idempotent re-ingestion).
        let existing_hashes = self.existing_chunk_hashes(&doc_id)?;
        let mut chunks_created = 0;
        let mut chunks_skipped = 0;

        for chunk in &chunks {
            if existing_hashes.contains(&chunk.chunk_hash) {
                chunks_skipped += 1;
                continue;
            }

            // Create memory for chunk
            self.create_chunk_memory(chunk, &config.extra_tags)?;
            chunks_created += 1;
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        Ok(IngestResult {
            document_id: doc_id,
            chunks_created,
            chunks_skipped,
            chunks_total: chunks.len(),
            duration_ms,
            warnings,
        })
    }

    /// Fetch existing chunk hashes for a document in a single pass
    ///
    /// Pages through memories tagged `document-chunk` whose `doc_id`
    /// metadata matches, `PAGE_SIZE` rows at a time, and collects each
    /// `chunk_hash` metadata value into a set.
    fn existing_chunk_hashes(&self, doc_id: &str) -> Result<HashSet<String>> {
        const PAGE_SIZE: i64 = 500;
        self.storage.with_connection(|conn| {
            let mut hashes = HashSet::new();
            let mut offset = 0;

            loop {
                let mut filter = HashMap::new();
                filter.insert("doc_id".to_string(), serde_json::json!(doc_id));

                let options = ListOptions {
                    limit: Some(PAGE_SIZE),
                    offset: Some(offset),
                    tags: Some(vec!["document-chunk".to_string()]),
                    memory_type: None,
                    sort_by: None,
                    sort_order: None,
                    scope: None,
                    workspace: None,
                    workspaces: None,
                    tier: None,
                    metadata_filter: Some(filter),
                    filter: None,
                    include_archived: false,
                };

                let results = list_memories(conn, &options)?;
                for memory in &results {
                    if let Some(hash) = memory.metadata.get("chunk_hash").and_then(|v| v.as_str()) {
                        hashes.insert(hash.to_string());
                    }
                }

                // A short page means this was the last page.
                if results.len() < PAGE_SIZE as usize {
                    break;
                }

                offset += PAGE_SIZE;
            }

            Ok(hashes)
        })
    }

    /// Create a memory entry for a chunk
    ///
    /// Stores the chunk content as a global, permanent `Context` memory
    /// tagged `document-chunk` (plus `extra_tags`), with provenance
    /// metadata: `source_file`, `source_path`, `doc_id`, `chunk_index`,
    /// `section_path`, `chunk_hash`, and `page` for PDF chunks.
    fn create_chunk_memory(&self, chunk: &DocumentChunk, extra_tags: &[String]) -> Result<()> {
        let mut tags = vec!["document-chunk".to_string()];
        tags.extend(extra_tags.iter().cloned());

        let mut metadata = HashMap::new();
        metadata.insert(
            "source_file".to_string(),
            serde_json::Value::String(
                // Bare file name only; falls back to "" if the path has
                // no final component.
                Path::new(&chunk.source_path)
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default(),
            ),
        );
        metadata.insert(
            "source_path".to_string(),
            serde_json::Value::String(chunk.source_path.clone()),
        );
        metadata.insert(
            "doc_id".to_string(),
            serde_json::Value::String(chunk.doc_id.clone()),
        );
        metadata.insert(
            "chunk_index".to_string(),
            serde_json::Value::Number(chunk.chunk_index.into()),
        );
        metadata.insert(
            "section_path".to_string(),
            serde_json::Value::String(chunk.section_path.clone()),
        );
        metadata.insert(
            "chunk_hash".to_string(),
            serde_json::Value::String(chunk.chunk_hash.clone()),
        );

        if let Some(page) = chunk.page {
            metadata.insert("page".to_string(), serde_json::Value::Number(page.into()));
        }

        let input = CreateMemoryInput {
            content: chunk.content.clone(),
            memory_type: MemoryType::Context,
            tags,
            metadata,
            importance: Some(0.5),
            scope: crate::types::MemoryScope::Global,
            workspace: None,
            tier: crate::types::MemoryTier::Permanent,
            defer_embedding: false,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
        };

        self.storage.with_connection(|conn| {
            create_memory(conn, &input)?;
            Ok(())
        })
    }
}
378
379/// Compute SHA-256 hash of content
380fn compute_hash(content: &[u8]) -> String {
381    let mut hasher = Sha256::new();
382    hasher.update(content);
383    format!("sha256:{}", hex::encode(hasher.finalize()))
384}
385
/// Extract sections from Markdown content
///
/// Streams pulldown-cmark events, cutting a new section at every heading.
/// Each section's `section_path` is the " > "-joined chain of headings on
/// the stack (e.g. "Security > Key Rotation"); body text appearing before
/// any heading is emitted under the path "Preamble". Inline code in body
/// text is re-wrapped in backticks; soft/hard breaks become newlines.
fn extract_markdown_sections(content: &str) -> Vec<DocumentSection> {
    let parser = Parser::new(content);
    let mut sections = Vec::new();
    // Stack of (level, heading text) for the currently open heading chain.
    let mut heading_stack: Vec<(usize, String)> = Vec::new();
    // Body text accumulated for the section currently being built.
    let mut current_content = String::new();
    let mut current_section_path = String::new();
    // While `in_heading` is set, text events feed the heading title
    // instead of the section body.
    let mut in_heading = false;
    let mut current_heading_text = String::new();
    let mut current_heading_level = 0usize;

    for event in parser {
        match event {
            Event::Start(Tag::Heading { level, .. }) => {
                // Save previous section if it has content
                if !current_content.trim().is_empty() {
                    sections.push(DocumentSection {
                        section_path: if current_section_path.is_empty() {
                            "Preamble".to_string()
                        } else {
                            current_section_path.clone()
                        },
                        content: current_content.trim().to_string(),
                        page: None,
                        // None only for preamble text (no heading seen yet).
                        level: if heading_stack.is_empty() {
                            None
                        } else {
                            Some(heading_stack.last().map(|(l, _)| *l).unwrap_or(1))
                        },
                    });
                    current_content.clear();
                }

                in_heading = true;
                current_heading_text.clear();
                current_heading_level = heading_level_to_usize(level);
            }
            Event::End(TagEnd::Heading(_)) => {
                in_heading = false;

                // Update heading stack: pop entries at the same or deeper
                // level, then push this heading, so the stack always holds
                // the ancestor chain of the current heading.
                while !heading_stack.is_empty()
                    && heading_stack.last().map(|(l, _)| *l).unwrap_or(0) >= current_heading_level
                {
                    heading_stack.pop();
                }
                heading_stack.push((current_heading_level, current_heading_text.clone()));

                // Build section path
                current_section_path = heading_stack
                    .iter()
                    .map(|(_, t)| t.as_str())
                    .collect::<Vec<_>>()
                    .join(" > ");
            }
            Event::Text(text) => {
                if in_heading {
                    current_heading_text.push_str(&text);
                } else {
                    current_content.push_str(&text);
                }
            }
            Event::Code(code) => {
                if in_heading {
                    // Heading titles keep inline code without backticks.
                    current_heading_text.push_str(&code);
                } else {
                    current_content.push('`');
                    current_content.push_str(&code);
                    current_content.push('`');
                }
            }
            Event::SoftBreak | Event::HardBreak => {
                if !in_heading {
                    current_content.push('\n');
                }
            }
            // Other events are ignored; plain text nested inside them
            // still arrives as `Event::Text` and is captured above.
            _ => {}
        }
    }

    // Save final section
    if !current_content.trim().is_empty() {
        sections.push(DocumentSection {
            section_path: if current_section_path.is_empty() {
                "Preamble".to_string()
            } else {
                current_section_path
            },
            content: current_content.trim().to_string(),
            page: None,
            level: heading_stack.last().map(|(l, _)| *l),
        });
    }

    sections
}
482
483/// Convert pulldown_cmark HeadingLevel to usize
484fn heading_level_to_usize(level: HeadingLevel) -> usize {
485    match level {
486        HeadingLevel::H1 => 1,
487        HeadingLevel::H2 => 2,
488        HeadingLevel::H3 => 3,
489        HeadingLevel::H4 => 4,
490        HeadingLevel::H5 => 5,
491        HeadingLevel::H6 => 6,
492    }
493}
494
/// Extract sections from PDF content, one section per non-empty page.
///
/// Requires the `pdf` feature to be enabled.
#[cfg(feature = "pdf")]
fn extract_pdf_sections(content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
    let text = pdf_extract::extract_text_from_mem(content)
        .map_err(|e| format!("PDF extraction failed: {}", e))?;

    // pdf-extract may emit form-feed characters between pages; when any
    // are present, each form-feed-delimited run is a page, otherwise the
    // whole text counts as a single page.
    let page_texts: Vec<&str> = if text.contains('\x0C') {
        text.split('\x0C').collect()
    } else {
        vec![text.as_str()]
    };

    let mut sections = Vec::new();
    for (idx, page_text) in page_texts.into_iter().enumerate() {
        let trimmed = page_text.trim();
        // Blank pages are dropped, but page numbering stays positional.
        if trimmed.is_empty() {
            continue;
        }
        sections.push(DocumentSection {
            section_path: format!("Page {}", idx + 1),
            content: trimmed.to_string(),
            page: Some(idx + 1),
            level: None,
        });
    }

    Ok(sections)
}
525
526/// Stub for PDF extraction when the `pdf` feature is disabled
527#[cfg(not(feature = "pdf"))]
528fn extract_pdf_sections(_content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
529    Err("PDF extraction requires the 'pdf' feature to be enabled".to_string())
530}
531
532/// Create chunks from sections with overlap
533fn create_chunks(
534    sections: Vec<DocumentSection>,
535    source_path: &str,
536    doc_id: &str,
537    config: &IngestConfig,
538) -> Vec<DocumentChunk> {
539    let mut chunks = Vec::new();
540    let mut chunk_index = 0;
541
542    for section in sections {
543        let section_chunks = chunk_text(&section.content, config.chunk_size, config.overlap);
544
545        for chunk_content in section_chunks {
546            let chunk_hash = compute_hash(chunk_content.as_bytes());
547
548            chunks.push(DocumentChunk {
549                content: chunk_content,
550                source_path: source_path.to_string(),
551                doc_id: doc_id.to_string(),
552                chunk_index,
553                section_path: section.section_path.clone(),
554                page: section.page,
555                chunk_hash,
556            });
557
558            chunk_index += 1;
559        }
560    }
561
562    chunks
563}
564
/// Chunk text with overlap.
///
/// Splits `text` into chunks of at most `chunk_size` characters, with
/// roughly `overlap` characters repeated between consecutive chunks.
/// Chunks prefer to break at a whitespace boundary when one falls in the
/// second half of the window, so words are not split mid-token.
///
/// All sizing is done in characters (not bytes), so multi-byte UTF-8 text
/// is handled consistently. (The previous version compared a byte index
/// from `str::rfind` against a character count, making the word-boundary
/// heuristic inconsistent for non-ASCII text.)
fn chunk_text(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
    if text.is_empty() {
        return vec![];
    }

    let chars: Vec<char> = text.chars().collect();

    // If text is smaller than chunk size, return as single chunk.
    if chars.len() <= chunk_size {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < chars.len() {
        let end = (start + chunk_size).min(chars.len());
        let window = &chars[start..end];

        // Prefer a whitespace break, but only when it lands in the second
        // half of the window (otherwise the chunk would be needlessly
        // short). `rposition` is a char-space index, so the comparison
        // against `chunk_size / 2` is consistent for multi-byte text.
        let take = if end < chars.len() {
            match window.iter().rposition(|c| c.is_whitespace()) {
                Some(pos) if pos > chunk_size / 2 => pos,
                _ => window.len(),
            }
        } else {
            window.len()
        };

        chunks.push(window[..take].iter().collect());

        // Stop once this chunk reaches the end of the text.
        if start + take >= chars.len() {
            break;
        }

        // Advance keeping `overlap` chars of context; guarantee forward
        // progress even when `take <= overlap`.
        let step = take.saturating_sub(overlap);
        start += if step == 0 { take } else { step };
    }

    chunks
}
614
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    // Text shorter than chunk_size is returned unchanged as one chunk.
    #[test]
    fn test_chunk_text_small() {
        let text = "Hello world";
        let chunks = chunk_text(text, 1200, 200);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], "Hello world");
    }

    // Long input splits into several chunks, each within the size limit.
    #[test]
    fn test_chunk_text_with_overlap() {
        let text = "A".repeat(2500);
        let chunks = chunk_text(&text, 1200, 200);
        assert!(chunks.len() >= 2);
        // First chunk should be 1200 chars
        assert!(chunks[0].len() <= 1200);
    }

    // Nested headings produce " > "-joined section paths.
    #[test]
    fn test_markdown_sections() {
        let md = r#"# Title

Introduction text.

## Section 1

Content for section 1.

### Subsection 1.1

Nested content.

## Section 2

Content for section 2.
"#;
        let sections = extract_markdown_sections(md);
        assert!(sections.len() >= 3);

        // Check preamble/title section
        let title_section = sections.iter().find(|s| s.section_path == "Title");
        assert!(title_section.is_some());

        // Check nested section
        let nested = sections
            .iter()
            .find(|s| s.section_path.contains("Subsection"));
        assert!(nested.is_some());
    }

    // Hashes carry the algorithm prefix followed by 64 hex characters.
    #[test]
    fn test_compute_hash() {
        let hash = compute_hash(b"test content");
        assert!(hash.starts_with("sha256:"));
        assert_eq!(hash.len(), 7 + 64); // "sha256:" + 64 hex chars
    }

    // Extension-based detection; unknown extensions yield None.
    #[test]
    fn test_document_format_detection() {
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.md")),
            Some(DocumentFormat::Markdown)
        );
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.pdf")),
            Some(DocumentFormat::Pdf)
        );
        assert_eq!(DocumentFormat::from_path(Path::new("doc.txt")), None);
    }

    // Default config mirrors the module-level constants.
    #[test]
    fn test_ingest_config_default() {
        let config = IngestConfig::default();
        assert_eq!(config.chunk_size, DEFAULT_CHUNK_SIZE);
        assert_eq!(config.overlap, DEFAULT_OVERLAP);
        assert_eq!(config.max_file_size, DEFAULT_MAX_FILE_SIZE);
    }

    // Re-ingesting the same file creates nothing: every chunk is skipped.
    #[test]
    fn test_ingest_idempotent() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "# Title\n\nHello world.\n").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let first = ingestor
            .ingest_file(&file_path, IngestConfig::default())
            .unwrap();
        assert!(first.chunks_created > 0);
        assert_eq!(first.chunks_skipped, 0);

        let second = ingestor
            .ingest_file(&file_path, IngestConfig::default())
            .unwrap();
        assert_eq!(second.chunks_created, 0);
        assert_eq!(second.chunks_skipped, first.chunks_total);
    }

    // chunk_size == 0 is rejected during config validation.
    #[test]
    fn test_invalid_chunk_size() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "Hello").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            chunk_size: 0,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("chunk_size"));
    }

    // overlap must be strictly less than chunk_size.
    #[test]
    fn test_invalid_overlap() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("doc.md");
        fs::write(&file_path, "Hello").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            chunk_size: 200,
            overlap: 200,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("overlap"));
    }

    // A PDF that yields no text (or fails extraction) is a hard error.
    #[test]
    fn test_pdf_empty_is_error() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("empty.pdf");
        fs::write(&file_path, b"").unwrap();

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let config = IngestConfig {
            format: Some(DocumentFormat::Pdf),
            ..Default::default()
        };

        let err = ingestor.ingest_file(&file_path, config).unwrap_err();
        assert!(err.to_string().contains("PDF"));
    }
}