use std::io::{BufRead, Write};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
embedded::{EmbeddedCore, IngestEvent},
prime::types::event_types,
};
/// Tally of the records written by [`export_json`].
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ExportStats {
    /// Number of `"node"` lines written.
    pub nodes: usize,
    /// Number of `"edge"` lines written.
    pub edges: usize,
    /// Number of vector records written. `export_json` in this file never
    /// increments it, so it keeps its `Default` value of zero.
    pub vectors: usize,
}
/// Tally of the records accepted by [`import_json`], broken down by the
/// `type` tag of each input line.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ImportStats {
    /// Lines tagged `"node"`.
    pub nodes: usize,
    /// Lines tagged `"edge"`.
    pub edges: usize,
    /// Lines tagged `"vector"`.
    pub vectors: usize,
}
/// One record of the JSON Lines interchange format shared by
/// [`export_json`] and [`import_json`].
///
/// Serialized as `{"type": ..., "entity_id": ..., "data": ...}`.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExportLine {
    /// Record tag; appears as `type` in the JSON (`type` is a Rust keyword,
    /// hence the rename). The importer recognises `"node"`, `"edge"` and
    /// `"vector"`.
    #[serde(rename = "type")]
    kind: String,
    /// Entity id of the event this record was produced from / will be
    /// ingested under.
    entity_id: String,
    /// Event payload, copied as-is on export and passed through unchanged as
    /// the ingest payload on import.
    data: Value,
}
pub async fn export_json(
core: &EmbeddedCore,
mut writer: impl Write,
) -> crate::error::Result<ExportStats> {
use crate::embedded::Query;
let events = core.query(Query::new().event_type_prefix("prime.")).await?;
let mut stats = ExportStats::default();
let mut exported_nodes = std::collections::HashSet::new();
let mut exported_edges = std::collections::HashSet::new();
for event in &events {
let entity_id = &event.entity_id;
match event.event_type.as_str() {
event_types::NODE_CREATED => {
if exported_nodes.insert(entity_id.clone()) {
let line = ExportLine {
kind: "node".to_string(),
entity_id: entity_id.clone(),
data: event.payload.clone(),
};
serde_json::to_writer(&mut writer, &line)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
writeln!(writer)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
stats.nodes += 1;
}
}
event_types::EDGE_CREATED => {
if exported_edges.insert(entity_id.clone()) {
let line = ExportLine {
kind: "edge".to_string(),
entity_id: entity_id.clone(),
data: event.payload.clone(),
};
serde_json::to_writer(&mut writer, &line)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
writeln!(writer)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
stats.edges += 1;
}
}
_ => {} }
}
writer
.flush()
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
Ok(stats)
}
pub async fn import_json(
core: &EmbeddedCore,
reader: impl BufRead,
) -> crate::error::Result<ImportStats> {
let mut stats = ImportStats::default();
for line_result in reader.lines() {
let line =
line_result.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
let line = line.trim();
if line.is_empty() {
continue;
}
let export_line: ExportLine = serde_json::from_str(line)
.map_err(|e| crate::error::AllSourceError::StorageError(e.to_string()))?;
let event_type = match export_line.kind.as_str() {
"node" => {
stats.nodes += 1;
event_types::NODE_CREATED
}
"edge" => {
stats.edges += 1;
event_types::EDGE_CREATED
}
"vector" => {
stats.vectors += 1;
"prime.vector.stored"
}
_ => continue,
};
core.ingest(IngestEvent {
entity_id: &export_line.entity_id,
event_type,
payload: export_line.data,
metadata: None,
tenant_id: None,
})
.await?;
}
Ok(stats)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embedded::{Config, EmbeddedCore, Query};
    use std::io::Cursor;

    /// Opens a core from a default-built configuration.
    async fn test_core() -> EmbeddedCore {
        let config = Config::builder().build().unwrap();
        EmbeddedCore::open(config).await.unwrap()
    }

    /// Ingests a single event without metadata or tenant, panicking on failure.
    async fn seed(
        core: &EmbeddedCore,
        entity_id: &'static str,
        event_type: &'static str,
        payload: serde_json::Value,
    ) {
        core.ingest(IngestEvent {
            entity_id,
            event_type,
            payload,
            metadata: None,
            tenant_id: None,
        })
        .await
        .unwrap();
    }

    /// Two nodes and one edge are exported as three well-formed records.
    #[tokio::test]
    async fn test_export_nodes_and_edges() {
        let core = test_core().await;

        seed(
            &core,
            "node:person:alice",
            event_types::NODE_CREATED,
            serde_json::json!({"node_type": "person", "properties": {"name": "Alice"}}),
        )
        .await;
        seed(
            &core,
            "node:person:bob",
            event_types::NODE_CREATED,
            serde_json::json!({"node_type": "person", "properties": {"name": "Bob"}}),
        )
        .await;
        seed(
            &core,
            "edge:e-1",
            event_types::EDGE_CREATED,
            serde_json::json!({"id": "e-1", "source": "alice", "target": "bob", "relation": "knows"}),
        )
        .await;

        let mut buf = Vec::new();
        let stats = export_json(&core, &mut buf).await.unwrap();
        assert_eq!(stats.nodes, 2);
        assert_eq!(stats.edges, 1);

        let output = String::from_utf8(buf).unwrap();
        let lines: Vec<&str> = output.trim().lines().collect();
        assert_eq!(lines.len(), 3);

        // Every record must be valid JSON carrying the three expected keys.
        for line in lines.iter().copied() {
            let parsed: serde_json::Value = serde_json::from_str(line).unwrap();
            assert!(parsed.get("type").is_some());
            assert!(parsed.get("entity_id").is_some());
            assert!(parsed.get("data").is_some());
        }

        core.shutdown().await.unwrap();
    }

    /// A node line and an edge line are both ingested and counted.
    #[tokio::test]
    async fn test_import_from_json_lines() {
        let core = test_core().await;

        let input = r#"{"type":"node","entity_id":"node:person:alice","data":{"node_type":"person","properties":{"name":"Alice"}}}
{"type":"edge","entity_id":"edge:e-1","data":{"id":"e-1","source":"alice","target":"bob","relation":"knows"}}
"#;

        let stats = import_json(&core, Cursor::new(input)).await.unwrap();
        assert_eq!(stats.nodes, 1);
        assert_eq!(stats.edges, 1);

        let events = core
            .query(Query::new().event_type_prefix("prime."))
            .await
            .unwrap();
        assert_eq!(events.len(), 2);

        core.shutdown().await.unwrap();
    }

    /// Exporting from one core and importing into another carries both nodes over.
    #[tokio::test]
    async fn test_export_import_roundtrip() {
        let core = test_core().await;

        seed(
            &core,
            "node:person:alice",
            event_types::NODE_CREATED,
            serde_json::json!({"node_type": "person", "properties": {"name": "Alice"}}),
        )
        .await;
        seed(
            &core,
            "node:project:prime",
            event_types::NODE_CREATED,
            serde_json::json!({"node_type": "project", "properties": {"name": "Prime"}}),
        )
        .await;

        let mut buf = Vec::new();
        export_json(&core, &mut buf).await.unwrap();
        let exported = String::from_utf8(buf).unwrap();

        let core2 = test_core().await;
        let stats = import_json(&core2, Cursor::new(exported)).await.unwrap();
        assert_eq!(stats.nodes, 2);

        let events = core2
            .query(Query::new().event_type(event_types::NODE_CREATED))
            .await
            .unwrap();
        assert_eq!(events.len(), 2);

        core.shutdown().await.unwrap();
        core2.shutdown().await.unwrap();
    }

    /// Importing the same record twice succeeds and leaves the entity queryable.
    #[tokio::test]
    async fn test_import_idempotent() {
        let core = test_core().await;

        let input = r#"{"type":"node","entity_id":"node:person:alice","data":{"node_type":"person","properties":{"name":"Alice"}}}"#;

        import_json(&core, Cursor::new(input)).await.unwrap();
        import_json(&core, Cursor::new(input)).await.unwrap();

        let events = core
            .query(Query::new().entity_id("node:person:alice"))
            .await
            .unwrap();
        assert!(!events.is_empty());

        core.shutdown().await.unwrap();
    }

    /// Empty input yields zero counts and no error.
    #[tokio::test]
    async fn test_import_empty_input() {
        let core = test_core().await;

        let stats = import_json(&core, Cursor::new("")).await.unwrap();
        assert_eq!(stats.nodes, 0);
        assert_eq!(stats.edges, 0);

        core.shutdown().await.unwrap();
    }

    /// Blank lines around a record are ignored.
    #[tokio::test]
    async fn test_import_skips_blank_lines() {
        let core = test_core().await;

        let input = "\n\n{\"type\":\"node\",\"entity_id\":\"node:person:alice\",\"data\":{\"node_type\":\"person\",\"properties\":{\"name\":\"Alice\"}}}\n\n";

        let stats = import_json(&core, Cursor::new(input)).await.unwrap();
        assert_eq!(stats.nodes, 1);

        core.shutdown().await.unwrap();
    }
}