// synapse_core/ingest/mod.rs
pub mod extractor;
pub mod ontology;
pub mod processor;

use crate::store::{IngestTriple, SynapseStore};
use anyhow::Result;
use std::path::Path;
8pub struct IngestionEngine {
9 store: std::sync::Arc<SynapseStore>,
10}
12impl IngestionEngine {
13 pub fn new(store: std::sync::Arc<SynapseStore>) -> Self {
14 Self { store }
15 }
16
17 pub async fn ingest_file(&self, path: &Path, namespace: &str) -> Result<u32> {
18 let extension = path
19 .extension()
20 .and_then(|e| e.to_str())
21 .unwrap_or("")
22 .to_lowercase();
23
24 match extension.as_str() {
25 "md" | "markdown" => self.ingest_markdown(path, namespace).await,
26 "csv" => self.ingest_csv(path, namespace).await,
27 "owl" | "ttl" | "rdf" | "xml" => {
28 let count = ontology::OntologyLoader::load_file(&self.store, path).await?;
29 Ok(count as u32)
30 }
31 _ => Err(anyhow::anyhow!("Unsupported file type: {}", extension)),
32 }
33 }
34
35 async fn ingest_markdown(&self, path: &Path, namespace: &str) -> Result<u32> {
36 let content = std::fs::read_to_string(path)?;
37 let triples = extractor::extract_metadata(&content, path.to_str().unwrap());
38
39 let ingest_triples: Vec<IngestTriple> = triples
40 .into_iter()
41 .map(|t| IngestTriple {
42 subject: t.subject,
43 predicate: t.predicate,
44 object: t.object,
45 provenance: Some(crate::store::Provenance {
46 source: path.to_string_lossy().to_string(),
47 timestamp: chrono::Utc::now().to_rfc3339(),
48 method: "markdown_extractor".to_string(),
49 }),
50 })
51 .collect();
52
53 let (added, _) = self.store.ingest_triples(ingest_triples).await?;
54
55 if let Some(ref vs) = self.store.vector_store {
57 let processor = super::processor::TextProcessor::new();
58 let chunks = processor.chunk_text(&content, 1000, 150);
59 for (i, chunk) in chunks.iter().enumerate() {
60 let chunk_uri = format!("{}#chunk-{}", path.to_string_lossy(), i);
61 let metadata = serde_json::json!({
62 "uri": path.to_string_lossy(),
63 "chunk_uri": chunk_uri,
64 "type": "markdown_chunk",
65 "namespace": namespace
66 });
67 if let Err(e) = vs.add(&chunk_uri, chunk, metadata).await {
68 eprintln!("Failed to index chunk {}: {}", i, e);
69 }
70 }
71 }
72
73 Ok(added)
74 }
75
76 async fn ingest_csv(&self, path: &Path, _namespace: &str) -> Result<u32> {
77 let mut reader = csv::Reader::from_path(path)?;
78 let headers = reader.headers()?.clone();
79
80 let mut triples = Vec::new();
81 let filename = path.file_name().unwrap().to_string_lossy();
82
83 for result in reader.records() {
84 let record = result?;
85 if let Some(subject) = record.get(0) {
87 let subject_uri = format!("urn:csv:{}:{}", filename, subject); for (j, field) in record.iter().enumerate().skip(1) {
90 if let Some(header) = headers.get(j) {
91 if !field.is_empty() {
92 triples.push(IngestTriple {
93 subject: subject_uri.clone(),
94 predicate: format!("urn:csv:prop:{}", header),
95 object: field.to_string(),
96 provenance: Some(crate::store::Provenance {
97 source: path.to_string_lossy().to_string(),
98 timestamp: chrono::Utc::now().to_rfc3339(),
99 method: "csv_extractor".to_string(),
100 }),
101 });
102 }
103 }
104 }
105 }
106 }
107
108 let (added, _) = self.store.ingest_triples(triples).await?;
109 Ok(added)
110 }
111}