Skip to main content

engram/intelligence/
document_ingest.rs

1//! Document ingestion for Engram (RML-928)
2//!
3//! Provides document parsing, chunking, and ingestion into the memory store.
4//! Supported formats:
5//! - Markdown (.md): Uses pulldown-cmark for parsing, extracts sections
6//! - PDF (.pdf): Uses pdf-extract for text extraction by page
7//!
8//! # Usage
9//!
10//! ```ignore
11//! use engram::intelligence::document_ingest::{DocumentIngestor, IngestConfig};
12//! use engram::Storage;
13//!
14//! let storage = Storage::open_in_memory()?;
15//! let ingestor = DocumentIngestor::new(&storage);
16//!
17//! let result = ingestor.ingest_file("docs/handbook.pdf", IngestConfig::default())?;
18//! println!("Ingested {} chunks", result.chunks_created);
19//! ```
20
21use std::collections::{HashMap, HashSet};
22use std::fs;
23use std::path::Path;
24use std::time::Instant;
25
26use pulldown_cmark::{Event, HeadingLevel, Parser, Tag, TagEnd};
27use sha2::{Digest, Sha256};
28
29use crate::error::{EngramError, Result};
30use crate::storage::queries::{create_memory, list_memories};
31use crate::storage::Storage;
32use crate::types::{CreateMemoryInput, ListOptions, MemoryType};
33
/// Maximum file size in bytes accepted by `ingest_file` (10 MB default)
pub const DEFAULT_MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;

/// Default chunk size in characters (chunking is char-based, not byte-based)
pub const DEFAULT_CHUNK_SIZE: usize = 1200;

/// Default overlap between consecutive chunks in characters
pub const DEFAULT_OVERLAP: usize = 200;
42
/// Supported document formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DocumentFormat {
    Markdown,
    Pdf,
}

impl DocumentFormat {
    /// Detect the format from a file extension (case-insensitive).
    ///
    /// Returns `None` when the path has no extension or the extension is
    /// not one of the supported formats.
    pub fn from_path(path: &Path) -> Option<Self> {
        let ext = path.extension()?.to_str()?.to_lowercase();
        // Delegates to `from_str`: "md"/"markdown"/"pdf" map to a format,
        // anything else (including "auto") yields None — same mapping as
        // matching the extension directly.
        Self::from_str(&ext)
    }

    /// Parse a format name from a string (case-insensitive).
    #[allow(clippy::should_implement_trait)]
    pub fn from_str(s: &str) -> Option<Self> {
        let normalized = s.to_lowercase();
        if normalized == "md" || normalized == "markdown" {
            Some(Self::Markdown)
        } else if normalized == "pdf" {
            Some(Self::Pdf)
        } else {
            // "auto" and unknown names: caller falls back to path detection
            None
        }
    }
}
72
/// Configuration for document ingestion
///
/// Use [`IngestConfig::default`] for sensible defaults and override
/// individual fields with struct-update syntax.
#[derive(Debug, Clone)]
pub struct IngestConfig {
    /// Force specific format (None = auto-detect from the file extension)
    pub format: Option<DocumentFormat>,
    /// Maximum characters per chunk; must be > 0 (validated by `ingest_file`)
    pub chunk_size: usize,
    /// Overlap between chunks in characters; must be < `chunk_size`
    pub overlap: usize,
    /// Maximum file size in bytes; larger files are rejected
    pub max_file_size: u64,
    /// Additional tags added to every chunk (on top of "document-chunk")
    pub extra_tags: Vec<String>,
}
87
88impl Default for IngestConfig {
89    fn default() -> Self {
90        Self {
91            format: None,
92            chunk_size: DEFAULT_CHUNK_SIZE,
93            overlap: DEFAULT_OVERLAP,
94            max_file_size: DEFAULT_MAX_FILE_SIZE,
95            extra_tags: vec![],
96        }
97    }
98}
99
/// Result of document ingestion
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
    /// Document ID ("sha256:<hex>" hash of the raw file content)
    pub document_id: String,
    /// Number of chunks newly stored during this run
    pub chunks_created: usize,
    /// Number of chunks skipped because their hash was already stored
    pub chunks_skipped: usize,
    /// Total number of chunks processed (`chunks_created + chunks_skipped`)
    pub chunks_total: usize,
    /// Wall-clock duration of the ingest in milliseconds
    pub duration_ms: u64,
    /// Non-fatal warnings encountered during ingestion (e.g. empty document)
    pub warnings: Vec<String>,
}
116
/// A section extracted from a document
#[derive(Debug, Clone)]
pub struct DocumentSection {
    /// Section path: joined heading chain for Markdown
    /// (e.g., "Security > Key Rotation"), "Page N" for PDFs
    pub section_path: String,
    /// Section body text, trimmed of surrounding whitespace
    pub content: String,
    /// 1-based page number (PDF sections only; None for Markdown)
    pub page: Option<usize>,
    /// Heading level (1-6, Markdown only; None for PDFs and preamble text)
    pub level: Option<usize>,
}
129
/// A chunk ready for ingestion
#[derive(Debug, Clone)]
pub struct DocumentChunk {
    /// Chunk text (at most `chunk_size` characters)
    pub content: String,
    /// Source file path as passed to `ingest_file`
    pub source_path: String,
    /// Document ID ("sha256:<hex>" of the whole file)
    pub doc_id: String,
    /// 0-based chunk index within the document, counted across all sections
    pub chunk_index: usize,
    /// Section path the chunk was cut from
    pub section_path: String,
    /// 1-based page number (PDFs only)
    pub page: Option<usize>,
    /// "sha256:<hex>" hash of the chunk content; used for idempotent re-ingest
    pub chunk_hash: String,
}
148
/// Document ingestor: parses, chunks, and stores documents as memories
pub struct DocumentIngestor<'a> {
    /// Borrowed storage handle; all reads and writes go through it
    storage: &'a Storage,
}
153
impl<'a> DocumentIngestor<'a> {
    /// Create a new document ingestor borrowing the given storage handle
    pub fn new(storage: &'a Storage) -> Self {
        Self { storage }
    }

    /// Ingest a document file
    ///
    /// Pipeline: validate `config` → check the file exists and is within
    /// `max_file_size` → resolve the format (explicit or by extension) →
    /// read and hash the bytes (the hash becomes the document ID) →
    /// extract sections → cut overlapping chunks → store each chunk whose
    /// hash is not already recorded for this document.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` for bad config values, a missing or
    /// oversized file, an unknown format, failed PDF extraction, or a PDF
    /// with no extractable text; `Storage` for filesystem read failures.
    pub fn ingest_file(
        &self,
        path: impl AsRef<Path>,
        config: IngestConfig,
    ) -> Result<IngestResult> {
        let path = path.as_ref();
        let start = Instant::now();
        let mut warnings = Vec::new();

        // Config validation: a zero chunk size could never terminate
        // chunking, and overlap >= chunk_size would make chunking loop.
        if config.chunk_size == 0 {
            return Err(EngramError::InvalidInput(
                "chunk_size must be greater than 0".to_string(),
            ));
        }

        if config.overlap >= config.chunk_size {
            return Err(EngramError::InvalidInput(
                "overlap must be less than chunk_size".to_string(),
            ));
        }

        // Check file exists
        if !path.exists() {
            return Err(EngramError::InvalidInput(format!(
                "File not found: {}",
                path.display()
            )));
        }

        // Check file size against the configured cap before reading it
        let metadata = fs::metadata(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file metadata: {}", e)))?;

        if metadata.len() > config.max_file_size {
            return Err(EngramError::InvalidInput(format!(
                "File too large: {} bytes (max: {} bytes)",
                metadata.len(),
                config.max_file_size
            )));
        }

        // Determine format: explicit config wins, else detect by extension
        let format = config
            .format
            .or_else(|| DocumentFormat::from_path(path))
            .ok_or_else(|| {
                EngramError::InvalidInput(format!("Unknown file format for: {}", path.display()))
            })?;

        // Read file content as raw bytes (PDFs are binary)
        let content = fs::read(path)
            .map_err(|e| EngramError::Storage(format!("Failed to read file: {}", e)))?;

        // Compute document ID from the raw bytes, so any content change
        // yields a new document ID
        let doc_id = compute_hash(&content);

        // Extract sections based on format
        let sections = match format {
            DocumentFormat::Markdown => {
                // Lossy UTF-8: invalid sequences become U+FFFD rather than failing
                let text = String::from_utf8_lossy(&content);
                extract_markdown_sections(&text)
            }
            DocumentFormat::Pdf => extract_pdf_sections(&content)
                .map_err(|e| EngramError::InvalidInput(format!("PDF extraction failed: {}", e)))?,
        };

        // An empty PDF is treated as an error; an empty Markdown file is
        // merely warned about (the ingest still succeeds with 0 chunks).
        if sections.is_empty() {
            if matches!(format, DocumentFormat::Pdf) {
                return Err(EngramError::InvalidInput(
                    "No text extracted from PDF".to_string(),
                ));
            }
            warnings.push("No content extracted from document".to_string());
        }

        // Create chunks
        let source_path = path.to_string_lossy().to_string();
        let chunks = create_chunks(sections, &source_path, &doc_id, &config);

        // Ingest chunks, skipping any whose hash is already stored for
        // this document (makes re-ingesting the same file idempotent).
        let existing_hashes = self.existing_chunk_hashes(&doc_id)?;
        let mut chunks_created = 0;
        let mut chunks_skipped = 0;

        // NOTE(review): hashes created in this loop are not added to
        // `existing_hashes`, so identical chunks within one document are
        // each stored on the first ingest — confirm this is intended.
        for chunk in &chunks {
            if existing_hashes.contains(&chunk.chunk_hash) {
                chunks_skipped += 1;
                continue;
            }

            // Create memory for chunk
            self.create_chunk_memory(chunk, &config.extra_tags)?;
            chunks_created += 1;
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        Ok(IngestResult {
            document_id: doc_id,
            chunks_created,
            chunks_skipped,
            chunks_total: chunks.len(),
            duration_ms,
            warnings,
        })
    }

    /// Fetch existing chunk hashes for a document in a single pass
    ///
    /// Pages through memories tagged "document-chunk" whose metadata
    /// `doc_id` matches, `PAGE_SIZE` rows at a time, and collects their
    /// `chunk_hash` metadata values.
    // NOTE(review): assumes `list_memories` honors limit/offset and that
    // results are stable across pages — verify against the queries module.
    fn existing_chunk_hashes(&self, doc_id: &str) -> Result<HashSet<String>> {
        const PAGE_SIZE: i64 = 500;
        self.storage.with_connection(|conn| {
            let mut hashes = HashSet::new();
            let mut offset = 0;

            loop {
                let mut filter = HashMap::new();
                filter.insert("doc_id".to_string(), serde_json::json!(doc_id));

                let options = ListOptions {
                    limit: Some(PAGE_SIZE),
                    offset: Some(offset),
                    tags: Some(vec!["document-chunk".to_string()]),
                    memory_type: None,
                    sort_by: None,
                    sort_order: None,
                    scope: None,
                    workspace: None,
                    workspaces: None,
                    tier: None,
                    metadata_filter: Some(filter),
                    filter: None,
                    include_archived: false,
                };

                let results = list_memories(conn, &options)?;
                for memory in &results {
                    // Entries without a string `chunk_hash` are ignored
                    if let Some(hash) = memory.metadata.get("chunk_hash").and_then(|v| v.as_str()) {
                        hashes.insert(hash.to_string());
                    }
                }

                // A short page means we have seen the last row
                if results.len() < PAGE_SIZE as usize {
                    break;
                }

                offset += PAGE_SIZE;
            }

            Ok(hashes)
        })
    }

    /// Create a memory entry for a chunk
    ///
    /// Stores the chunk content as a `Context` memory tagged
    /// "document-chunk" (plus `extra_tags`) with metadata keys:
    /// `source_file`, `source_path`, `doc_id`, `chunk_index`,
    /// `section_path`, `chunk_hash`, and `page` (PDF chunks only).
    fn create_chunk_memory(&self, chunk: &DocumentChunk, extra_tags: &[String]) -> Result<()> {
        let mut tags = vec!["document-chunk".to_string()];
        tags.extend(extra_tags.iter().cloned());

        let mut metadata = HashMap::new();
        // Bare file name for display; empty string if the path has none
        metadata.insert(
            "source_file".to_string(),
            serde_json::Value::String(
                Path::new(&chunk.source_path)
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default(),
            ),
        );
        metadata.insert(
            "source_path".to_string(),
            serde_json::Value::String(chunk.source_path.clone()),
        );
        metadata.insert(
            "doc_id".to_string(),
            serde_json::Value::String(chunk.doc_id.clone()),
        );
        metadata.insert(
            "chunk_index".to_string(),
            serde_json::Value::Number(chunk.chunk_index.into()),
        );
        metadata.insert(
            "section_path".to_string(),
            serde_json::Value::String(chunk.section_path.clone()),
        );
        metadata.insert(
            "chunk_hash".to_string(),
            serde_json::Value::String(chunk.chunk_hash.clone()),
        );

        if let Some(page) = chunk.page {
            metadata.insert("page".to_string(), serde_json::Value::Number(page.into()));
        }

        let input = CreateMemoryInput {
            content: chunk.content.clone(),
            memory_type: MemoryType::Context,
            tags,
            metadata,
            importance: Some(0.5),
            scope: crate::types::MemoryScope::Global,
            workspace: None,
            tier: crate::types::MemoryTier::Permanent,
            defer_embedding: false,
            ttl_seconds: None,
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
            media_url: None,
        };

        self.storage.with_connection(|conn| {
            create_memory(conn, &input)?;
            Ok(())
        })
    }
}
379
380/// Compute SHA-256 hash of content
381fn compute_hash(content: &[u8]) -> String {
382    let mut hasher = Sha256::new();
383    hasher.update(content);
384    format!("sha256:{}", hex::encode(hasher.finalize()))
385}
386
387/// Extract sections from Markdown content
388fn extract_markdown_sections(content: &str) -> Vec<DocumentSection> {
389    let parser = Parser::new(content);
390    let mut sections = Vec::new();
391    let mut heading_stack: Vec<(usize, String)> = Vec::new();
392    let mut current_content = String::new();
393    let mut current_section_path = String::new();
394    let mut in_heading = false;
395    let mut current_heading_text = String::new();
396    let mut current_heading_level = 0usize;
397
398    for event in parser {
399        match event {
400            Event::Start(Tag::Heading { level, .. }) => {
401                // Save previous section if it has content
402                if !current_content.trim().is_empty() {
403                    sections.push(DocumentSection {
404                        section_path: if current_section_path.is_empty() {
405                            "Preamble".to_string()
406                        } else {
407                            current_section_path.clone()
408                        },
409                        content: current_content.trim().to_string(),
410                        page: None,
411                        level: if heading_stack.is_empty() {
412                            None
413                        } else {
414                            Some(heading_stack.last().map(|(l, _)| *l).unwrap_or(1))
415                        },
416                    });
417                    current_content.clear();
418                }
419
420                in_heading = true;
421                current_heading_text.clear();
422                current_heading_level = heading_level_to_usize(level);
423            }
424            Event::End(TagEnd::Heading(_)) => {
425                in_heading = false;
426
427                // Update heading stack
428                while !heading_stack.is_empty()
429                    && heading_stack.last().map(|(l, _)| *l).unwrap_or(0) >= current_heading_level
430                {
431                    heading_stack.pop();
432                }
433                heading_stack.push((current_heading_level, current_heading_text.clone()));
434
435                // Build section path
436                current_section_path = heading_stack
437                    .iter()
438                    .map(|(_, t)| t.as_str())
439                    .collect::<Vec<_>>()
440                    .join(" > ");
441            }
442            Event::Text(text) => {
443                if in_heading {
444                    current_heading_text.push_str(&text);
445                } else {
446                    current_content.push_str(&text);
447                }
448            }
449            Event::Code(code) => {
450                if in_heading {
451                    current_heading_text.push_str(&code);
452                } else {
453                    current_content.push('`');
454                    current_content.push_str(&code);
455                    current_content.push('`');
456                }
457            }
458            Event::SoftBreak | Event::HardBreak => {
459                if !in_heading {
460                    current_content.push('\n');
461                }
462            }
463            _ => {}
464        }
465    }
466
467    // Save final section
468    if !current_content.trim().is_empty() {
469        sections.push(DocumentSection {
470            section_path: if current_section_path.is_empty() {
471                "Preamble".to_string()
472            } else {
473                current_section_path
474            },
475            content: current_content.trim().to_string(),
476            page: None,
477            level: heading_stack.last().map(|(l, _)| *l),
478        });
479    }
480
481    sections
482}
483
484/// Convert pulldown_cmark HeadingLevel to usize
485fn heading_level_to_usize(level: HeadingLevel) -> usize {
486    match level {
487        HeadingLevel::H1 => 1,
488        HeadingLevel::H2 => 2,
489        HeadingLevel::H3 => 3,
490        HeadingLevel::H4 => 4,
491        HeadingLevel::H5 => 5,
492        HeadingLevel::H6 => 6,
493    }
494}
495
/// Extract per-page sections from raw PDF bytes.
///
/// Requires the `pdf` feature to be enabled.
///
/// # Errors
///
/// Returns the extractor's error message when text extraction fails.
#[cfg(feature = "pdf")]
fn extract_pdf_sections(content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
    // FIX: return the bare extractor error. The caller (`ingest_file`)
    // already wraps it in a "PDF extraction failed: …" context, so adding
    // the same prefix here produced the message twice
    // ("PDF extraction failed: PDF extraction failed: …").
    let text = pdf_extract::extract_text_from_mem(content).map_err(|e| e.to_string())?;

    // pdf-extract separates pages with form feed characters (\x0C) when it
    // can; otherwise treat the whole document as a single page.
    let pages: Vec<&str> = if text.contains('\x0C') {
        text.split('\x0C').collect()
    } else {
        vec![&text]
    };

    // One section per non-empty page, with 1-based page numbers.
    let sections: Vec<DocumentSection> = pages
        .iter()
        .enumerate()
        .filter(|(_, page_text)| !page_text.trim().is_empty())
        .map(|(i, page_text)| DocumentSection {
            section_path: format!("Page {}", i + 1),
            content: page_text.trim().to_string(),
            page: Some(i + 1),
            level: None,
        })
        .collect();

    Ok(sections)
}
526
527/// Stub for PDF extraction when the `pdf` feature is disabled
528#[cfg(not(feature = "pdf"))]
529fn extract_pdf_sections(_content: &[u8]) -> std::result::Result<Vec<DocumentSection>, String> {
530    Err("PDF extraction requires the 'pdf' feature to be enabled".to_string())
531}
532
533/// Create chunks from sections with overlap
534fn create_chunks(
535    sections: Vec<DocumentSection>,
536    source_path: &str,
537    doc_id: &str,
538    config: &IngestConfig,
539) -> Vec<DocumentChunk> {
540    let mut chunks = Vec::new();
541    let mut chunk_index = 0;
542
543    for section in sections {
544        let section_chunks = chunk_text(&section.content, config.chunk_size, config.overlap);
545
546        for chunk_content in section_chunks {
547            let chunk_hash = compute_hash(chunk_content.as_bytes());
548
549            chunks.push(DocumentChunk {
550                content: chunk_content,
551                source_path: source_path.to_string(),
552                doc_id: doc_id.to_string(),
553                chunk_index,
554                section_path: section.section_path.clone(),
555                page: section.page,
556                chunk_hash,
557            });
558
559            chunk_index += 1;
560        }
561    }
562
563    chunks
564}
565
/// Split `text` into chunks of at most `chunk_size` characters, with
/// `overlap` characters of carry-over between consecutive chunks.
///
/// Mid-text chunks are trimmed back to the last whitespace when that
/// break point lands in the second half of the window, so words are not
/// cut in two unless unavoidable.
fn chunk_text(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
    if text.is_empty() {
        return vec![];
    }

    let symbols: Vec<char> = text.chars().collect();

    // Short input: a single chunk, untouched.
    if symbols.len() <= chunk_size {
        return vec![text.to_string()];
    }

    let mut pieces = Vec::new();
    let mut cursor = 0;

    while cursor < symbols.len() {
        let window_end = symbols.len().min(cursor + chunk_size);
        let mut piece: String = symbols[cursor..window_end].iter().collect();

        // Word-boundary trim: only for non-final chunks, and only when the
        // last whitespace falls past the midpoint (avoids tiny chunks).
        // rfind yields a byte index, which is a valid char boundary.
        if window_end < symbols.len() {
            if let Some(ws) = piece.rfind(char::is_whitespace) {
                if ws > chunk_size / 2 {
                    piece.truncate(ws);
                }
            }
        }

        let piece_chars = piece.chars().count();
        pieces.push(piece);

        // Stop once this piece reaches the end of the input.
        if cursor + piece_chars >= symbols.len() {
            break;
        }

        // Advance by piece length minus overlap; a full step if the
        // overlap would otherwise stall the cursor.
        let advance = piece_chars.saturating_sub(overlap);
        cursor += if advance == 0 { piece_chars } else { advance };
    }

    pieces
}
615
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// Write `body` to `name` inside a fresh temp dir. The returned guard
    /// must stay alive or the directory (and file) is removed.
    fn temp_file(name: &str, body: &[u8]) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempdir().unwrap();
        let path = dir.path().join(name);
        fs::write(&path, body).unwrap();
        (dir, path)
    }

    #[test]
    fn test_chunk_text_small() {
        // Text shorter than the chunk size comes back as a single chunk.
        let pieces = chunk_text("Hello world", 1200, 200);
        assert_eq!(pieces, vec!["Hello world"]);
    }

    #[test]
    fn test_chunk_text_with_overlap() {
        // 2500 chars with 1200-char chunks must produce several pieces.
        let long_text = "A".repeat(2500);
        let pieces = chunk_text(&long_text, 1200, 200);
        assert!(pieces.len() >= 2);
        // No piece may exceed the requested chunk size.
        assert!(pieces[0].len() <= 1200);
    }

    #[test]
    fn test_markdown_sections() {
        let md = r#"# Title

Introduction text.

## Section 1

Content for section 1.

### Subsection 1.1

Nested content.

## Section 2

Content for section 2.
"#;
        let extracted = extract_markdown_sections(md);
        assert!(extracted.len() >= 3);

        // The intro paragraph lands under the top-level "Title" path.
        assert!(extracted.iter().any(|s| s.section_path == "Title"));

        // Nested headings produce a compound section path.
        assert!(extracted
            .iter()
            .any(|s| s.section_path.contains("Subsection")));
    }

    #[test]
    fn test_compute_hash() {
        let digest = compute_hash(b"test content");
        assert!(digest.starts_with("sha256:"));
        // "sha256:" prefix plus 64 hex characters.
        assert_eq!(digest.len(), 7 + 64);
    }

    #[test]
    fn test_document_format_detection() {
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.md")),
            Some(DocumentFormat::Markdown)
        );
        assert_eq!(
            DocumentFormat::from_path(Path::new("doc.pdf")),
            Some(DocumentFormat::Pdf)
        );
        assert_eq!(DocumentFormat::from_path(Path::new("doc.txt")), None);
    }

    #[test]
    fn test_ingest_config_default() {
        let defaults = IngestConfig::default();
        assert_eq!(defaults.chunk_size, DEFAULT_CHUNK_SIZE);
        assert_eq!(defaults.overlap, DEFAULT_OVERLAP);
        assert_eq!(defaults.max_file_size, DEFAULT_MAX_FILE_SIZE);
    }

    #[test]
    fn test_ingest_idempotent() {
        let (_guard, path) = temp_file("doc.md", b"# Title\n\nHello world.\n");

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        // First pass stores every chunk.
        let first = ingestor.ingest_file(&path, IngestConfig::default()).unwrap();
        assert!(first.chunks_created > 0);
        assert_eq!(first.chunks_skipped, 0);

        // Re-ingesting the identical file skips everything.
        let second = ingestor.ingest_file(&path, IngestConfig::default()).unwrap();
        assert_eq!(second.chunks_created, 0);
        assert_eq!(second.chunks_skipped, first.chunks_total);
    }

    #[test]
    fn test_invalid_chunk_size() {
        let (_guard, path) = temp_file("doc.md", b"Hello");

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let bad = IngestConfig {
            chunk_size: 0,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&path, bad).unwrap_err();
        assert!(err.to_string().contains("chunk_size"));
    }

    #[test]
    fn test_invalid_overlap() {
        let (_guard, path) = temp_file("doc.md", b"Hello");

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        // overlap == chunk_size is rejected (must be strictly smaller).
        let bad = IngestConfig {
            chunk_size: 200,
            overlap: 200,
            ..Default::default()
        };

        let err = ingestor.ingest_file(&path, bad).unwrap_err();
        assert!(err.to_string().contains("overlap"));
    }

    #[test]
    fn test_pdf_empty_is_error() {
        let (_guard, path) = temp_file("empty.pdf", b"");

        let storage = Storage::open_in_memory().unwrap();
        let ingestor = DocumentIngestor::new(&storage);

        let forced = IngestConfig {
            format: Some(DocumentFormat::Pdf),
            ..Default::default()
        };

        let err = ingestor.ingest_file(&path, forced).unwrap_err();
        assert!(err.to_string().contains("PDF"));
    }
}