Skip to main content

synapse_core/ingest/
mod.rs

1pub mod extractor;
2pub mod ontology;
3pub mod processor;
4use crate::store::{IngestTriple, SynapseStore};
5use anyhow::Result;
6use std::path::Path;
7
8pub struct IngestionEngine {
9    store: std::sync::Arc<SynapseStore>,
10}
11
12impl IngestionEngine {
13    pub fn new(store: std::sync::Arc<SynapseStore>) -> Self {
14        Self { store }
15    }
16
17    pub async fn ingest_file(&self, path: &Path, namespace: &str) -> Result<u32> {
18        let extension = path
19            .extension()
20            .and_then(|e| e.to_str())
21            .unwrap_or("")
22            .to_lowercase();
23
24        match extension.as_str() {
25            "md" | "markdown" => self.ingest_markdown(path, namespace).await,
26            "csv" => self.ingest_csv(path, namespace).await,
27            "owl" | "ttl" | "rdf" | "xml" => {
28                let count = ontology::OntologyLoader::load_file(&self.store, path).await?;
29                Ok(count as u32)
30            }
31            _ => Err(anyhow::anyhow!("Unsupported file type: {}", extension)),
32        }
33    }
34
35    async fn ingest_markdown(&self, path: &Path, namespace: &str) -> Result<u32> {
36        let content = std::fs::read_to_string(path)?;
37        let triples = extractor::extract_metadata(&content, path.to_str().unwrap());
38
39        let ingest_triples: Vec<IngestTriple> = triples
40            .into_iter()
41            .map(|t| IngestTriple {
42                subject: t.subject,
43                predicate: t.predicate,
44                object: t.object,
45                provenance: Some(crate::store::Provenance {
46                    source: path.to_string_lossy().to_string(),
47                    timestamp: chrono::Utc::now().to_rfc3339(),
48                    method: "markdown_extractor".to_string(),
49                }),
50            })
51            .collect();
52
53        let (added, _) = self.store.ingest_triples(ingest_triples).await?;
54
55        // Also ingest content into vector store for RAG
56        if let Some(ref vs) = self.store.vector_store {
57            let processor = super::processor::TextProcessor::new();
58            let chunks = processor.chunk_text(&content, 1000, 150);
59            for (i, chunk) in chunks.iter().enumerate() {
60                let chunk_uri = format!("{}#chunk-{}", path.to_string_lossy(), i);
61                let metadata = serde_json::json!({
62                    "uri": path.to_string_lossy(),
63                    "chunk_uri": chunk_uri,
64                    "type": "markdown_chunk",
65                    "namespace": namespace
66                });
67                if let Err(e) = vs.add(&chunk_uri, chunk, metadata).await {
68                    eprintln!("Failed to index chunk {}: {}", i, e);
69                }
70            }
71        }
72
73        Ok(added)
74    }
75
76    async fn ingest_csv(&self, path: &Path, _namespace: &str) -> Result<u32> {
77        let mut reader = csv::Reader::from_path(path)?;
78        let headers = reader.headers()?.clone();
79
80        let mut triples = Vec::new();
81        let filename = path.file_name().unwrap().to_string_lossy();
82
83        for result in reader.records() {
84            let record = result?;
85            // Assume first column is ID/Subject
86            if let Some(subject) = record.get(0) {
87                let subject_uri = format!("urn:csv:{}:{}", filename, subject); // basic namespacing
88
89                for (j, field) in record.iter().enumerate().skip(1) {
90                    if let Some(header) = headers.get(j) {
91                        if !field.is_empty() {
92                            triples.push(IngestTriple {
93                                subject: subject_uri.clone(),
94                                predicate: format!("urn:csv:prop:{}", header),
95                                object: field.to_string(),
96                                provenance: Some(crate::store::Provenance {
97                                    source: path.to_string_lossy().to_string(),
98                                    timestamp: chrono::Utc::now().to_rfc3339(),
99                                    method: "csv_extractor".to_string(),
100                                }),
101                            });
102                        }
103                    }
104                }
105            }
106        }
107
108        let (added, _) = self.store.ingest_triples(triples).await?;
109        Ok(added)
110    }
111}