use std::collections::HashSet;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
use khive_storage::{EdgeRelation, EntityFilter};
use crate::error::{RuntimeError, RuntimeResult};
use crate::runtime::KhiveRuntime;
/// Serializable snapshot of a knowledge graph: a set of entities plus the
/// edges between them.
///
/// Produced by `KhiveRuntime::export_kg` and consumed by
/// `KhiveRuntime::import_kg`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KgArchive {
/// Archive format tag. Export writes `"khive-kg"`; import rejects any other value.
pub format: String,
/// Archive format version. Export writes `"0.1"`; import rejects any other value.
pub version: String,
/// Namespace the archive was exported from. Import uses it as the
/// destination namespace when no explicit target namespace is given.
pub namespace: String,
/// Wall-clock time (UTC) at which the archive was built.
pub exported_at: DateTime<Utc>,
/// Exported entities.
pub entities: Vec<ExportedEntity>,
/// Exported edges. On import, an edge whose source or target cannot be
/// found in the destination namespace is skipped.
pub edges: Vec<ExportedEdge>,
}
/// Archive representation of a single entity.
///
/// Timestamps are carried as `DateTime<Utc>`; export and import convert them
/// from/to the microsecond integer timestamps used by the storage entity.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExportedEntity {
/// Entity id; reused unchanged when the entity is imported.
pub id: Uuid,
/// Entity kind, as a string.
pub kind: String,
/// Entity name.
pub name: String,
/// Optional description; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Optional JSON properties; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub properties: Option<serde_json::Value>,
/// Tags; deserializes to an empty list when the field is absent.
#[serde(default)]
pub tags: Vec<String>,
/// Creation time of the entity.
pub created_at: DateTime<Utc>,
/// Last-update time of the entity.
pub updated_at: DateTime<Utc>,
}
/// Archive representation of a single directed edge.
///
/// The edge's storage id is not part of the archive; import assigns a fresh
/// one to every edge it writes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExportedEdge {
/// Id of the entity the edge starts at.
pub source: Uuid,
/// Id of the entity the edge points to.
pub target: Uuid,
/// Relation carried by the edge.
pub relation: EdgeRelation,
/// Edge weight.
pub weight: f64,
}
/// Counters reported by `KhiveRuntime::import_kg`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ImportSummary {
/// Number of entities written to the destination namespace.
pub entities_imported: usize,
/// Number of edges written to the destination graph.
pub edges_imported: usize,
/// Number of edges dropped because their source or target entity was not
/// found in the destination namespace.
pub edges_skipped: usize,
}
impl KhiveRuntime {
    /// Exports the knowledge graph of a namespace as a [`KgArchive`].
    ///
    /// Every entity returned by the entity store for the resolved namespace is
    /// exported, together with the edges whose *source* is one of those
    /// entities. Edge targets are not checked against the exported entity set,
    /// so the archive may contain edges pointing at entities it does not hold.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while resolving the entity/graph stores or
    /// while running the entity and edge queries.
    pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
        let ns = self.ns(namespace).to_string();

        // Everything is requested in one page (offset 0, limit `u32::MAX`).
        let entity_page = self
            .entities(Some(&ns))?
            .query_entities(
                &ns,
                EntityFilter::default(),
                PageRequest {
                    offset: 0,
                    limit: u32::MAX,
                },
            )
            .await?;

        let entities: Vec<ExportedEntity> = entity_page
            .items
            .into_iter()
            .map(|e| {
                // Storage keeps microsecond timestamps; a value chrono cannot
                // represent falls back to the current time.
                let created_at =
                    DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
                let updated_at =
                    DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
                ExportedEntity {
                    id: e.id,
                    kind: e.kind.to_string(),
                    name: e.name,
                    description: e.description,
                    properties: e.properties,
                    tags: e.tags,
                    created_at,
                    updated_at,
                }
            })
            .collect();

        let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
        let edges = if source_ids.is_empty() {
            // No entities means no possible edge sources: skip the graph query.
            Vec::new()
        } else {
            // Build the lookup set from a borrow so the id vector itself can be
            // moved into the filter instead of being cloned.
            let id_set: HashSet<Uuid> = source_ids.iter().copied().collect();
            let filter = EdgeFilter {
                source_ids,
                ..Default::default()
            };
            let edge_page = self
                .graph(Some(&ns))?
                .query_edges(
                    filter,
                    Vec::new(),
                    PageRequest {
                        offset: 0,
                        limit: u32::MAX,
                    },
                )
                .await?;
            edge_page
                .items
                .into_iter()
                // Defensive re-check: keep only edges that really start at an
                // exported entity, whatever the store returned.
                .filter(|e| id_set.contains(&e.source_id))
                .map(|e| ExportedEdge {
                    source: e.source_id,
                    target: e.target_id,
                    relation: e.relation,
                    weight: e.weight,
                })
                .collect()
        };

        Ok(KgArchive {
            format: "khive-kg".to_string(),
            version: "0.1".to_string(),
            namespace: ns,
            exported_at: Utc::now(),
            entities,
            edges,
        })
    }

    /// Exports a namespace with [`export_kg`](Self::export_kg) and serializes
    /// the archive to a compact JSON string.
    ///
    /// # Errors
    ///
    /// Propagates export errors; a serialization failure is reported as
    /// [`RuntimeError::InvalidInput`] carrying the serde error text.
    pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
        let archive = self.export_kg(namespace).await?;
        serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
    }

    /// Imports an archive into `target_namespace`, or into the archive's own
    /// namespace when `target_namespace` is `None`.
    ///
    /// Entities are upserted under their archived ids and re-indexed one by
    /// one. Edges are then written with a freshly generated id each; an edge
    /// whose source or target entity cannot be found in the destination
    /// namespace is logged at `warn` level, counted in
    /// [`ImportSummary::edges_skipped`] and not written.
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeError::InvalidInput`] when `archive.format` is not
    /// `"khive-kg"` or `archive.version` is not `"0.1"`. Store errors are
    /// propagated as soon as they occur; this function performs no rollback of
    /// what was written before the failure.
    pub async fn import_kg(
        &self,
        archive: &KgArchive,
        target_namespace: Option<&str>,
    ) -> RuntimeResult<ImportSummary> {
        if archive.format != "khive-kg" {
            return Err(RuntimeError::InvalidInput(format!(
                "unsupported archive format {:?}; expected \"khive-kg\"",
                archive.format
            )));
        }
        if archive.version != "0.1" {
            return Err(RuntimeError::InvalidInput(format!(
                "unsupported archive version {:?}; supported: \"0.1\"",
                archive.version
            )));
        }

        let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
        let store = self.entities(Some(&ns))?;

        let mut entities_imported = 0usize;
        for ee in &archive.entities {
            let entity = khive_storage::entity::Entity {
                id: ee.id,
                namespace: ns.clone(),
                kind: ee.kind.clone(),
                name: ee.name.clone(),
                description: ee.description.clone(),
                properties: ee.properties.clone(),
                tags: ee.tags.clone(),
                created_at: ee.created_at.timestamp_micros(),
                updated_at: ee.updated_at.timestamp_micros(),
                deleted_at: None,
            };
            // The store takes ownership, but the entity is still needed for
            // re-indexing right after, hence the clone.
            store.upsert_entity(entity.clone()).await?;
            self.reindex_entity(Some(&ns), &entity).await?;
            entities_imported += 1;
        }

        let graph = self.graph(Some(&ns))?;
        // Existence of each endpoint id, resolved at most once. The edge loop
        // below never writes entities, so an answer obtained for one edge is
        // reused for every other edge touching the same entity instead of
        // issuing up to two store lookups per edge.
        let mut endpoint_exists: std::collections::HashMap<Uuid, bool> =
            std::collections::HashMap::new();
        let mut edges_imported = 0usize;
        let mut edges_skipped = 0usize;

        for ee in &archive.edges {
            // Source is checked first; the target is only looked at when the
            // source exists.
            let mut missing: Option<&str> = None;
            for (role, endpoint) in [("source", ee.source), ("target", ee.target)] {
                let exists = match endpoint_exists.get(&endpoint).copied() {
                    Some(known) => known,
                    None => {
                        let found = self.get_entity(Some(&ns), endpoint).await?.is_some();
                        endpoint_exists.insert(endpoint, found);
                        found
                    }
                };
                if !exists {
                    missing = Some(role);
                    break;
                }
            }

            if let Some(role) = missing {
                tracing::warn!(
                    source = %ee.source,
                    target = %ee.target,
                    relation = ?ee.relation,
                    "import_kg: skipping edge — {role} entity not found in namespace {ns:?}"
                );
                edges_skipped += 1;
                continue;
            }

            // The archive carries no edge id, so every imported edge gets a
            // new random one and the import time as its creation time.
            let edge = khive_storage::types::Edge {
                id: LinkId::from(Uuid::new_v4()),
                source_id: ee.source,
                target_id: ee.target,
                relation: ee.relation,
                weight: ee.weight,
                created_at: Utc::now(),
                metadata: None,
            };
            graph.upsert_edge(edge).await?;
            edges_imported += 1;
        }

        Ok(ImportSummary {
            entities_imported,
            edges_imported,
            edges_skipped,
        })
    }

    /// Parses `json` as a [`KgArchive`] and imports it with
    /// [`import_kg`](Self::import_kg).
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeError::InvalidInput`] with the serde error text when
    /// `json` does not deserialize into an archive; otherwise behaves like
    /// [`import_kg`](Self::import_kg).
    pub async fn import_kg_json(
        &self,
        json: &str,
        target_namespace: Option<&str>,
    ) -> RuntimeResult<ImportSummary> {
        let archive: KgArchive =
            serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
        self.import_kg(&archive, target_namespace).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Builds the in-memory runtime used by every test.
    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// Creates a bare "concept" entity (no description, properties or tags)
    /// in the default namespace and returns its id.
    async fn new_concept(rt: &KhiveRuntime, name: &str) -> Uuid {
        rt.create_entity(None, "concept", name, None, None, vec![])
            .await
            .unwrap()
            .id
    }

    /// Archive entry for a bare "concept" entity stamped with the current time.
    fn bare_entity(id: Uuid, name: &str) -> ExportedEntity {
        ExportedEntity {
            id,
            kind: "concept".to_string(),
            name: name.to_string(),
            description: None,
            properties: None,
            tags: vec![],
            created_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }

    /// Archive entry for one edge.
    fn edge(source: Uuid, target: Uuid, relation: EdgeRelation, weight: f64) -> ExportedEdge {
        ExportedEdge {
            source,
            target,
            relation,
            weight,
        }
    }

    /// Well-formed archive for namespace "local" holding the given content.
    fn local_archive(entities: Vec<ExportedEntity>, edges: Vec<ExportedEdge>) -> KgArchive {
        KgArchive {
            format: "khive-kg".to_string(),
            version: "0.1".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities,
            edges,
        }
    }

    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let origin = make_rt().await;
        let attention = origin
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let attention2_id = new_concept(&origin, "FlashAttention-2").await;
        let author = origin
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();
        origin
            .link(None, attention2_id, attention.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        origin
            .link(None, attention.id, author.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = origin.export_kg(None).await.unwrap();
        assert_eq!(archive.entities.len(), 3);
        assert_eq!(archive.edges.len(), 2);
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let destination = make_rt().await;
        let summary = destination.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(summary.edges_imported, 2);

        let fetched = destination.get_entity(None, attention.id).await.unwrap();
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.name, "FlashAttention");
        assert_eq!(fetched.description.as_deref(), Some("fast attention"));
    }

    #[tokio::test]
    async fn json_roundtrip() {
        let origin = make_rt().await;
        let lora = origin
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let qlora_id = new_concept(&origin, "QLoRA").await;
        origin
            .link(None, qlora_id, lora.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let payload = origin.export_kg_json(None).await.unwrap();
        assert!(payload.contains("khive-kg"));

        let destination = make_rt().await;
        let summary = destination.import_kg_json(&payload, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);

        let fetched = destination
            .get_entity(None, lora.id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(fetched.tags, vec!["fine-tuning"]);
    }

    #[tokio::test]
    async fn namespace_targeting() {
        let origin = make_rt().await;
        origin
            .create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();
        let archive = origin.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        // Import into "b" even though the archive was taken from "a".
        let destination = make_rt().await;
        let summary = destination.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        let landed_in_b = destination
            .list_entities(Some("b"), None, 100)
            .await
            .unwrap();
        assert_eq!(landed_in_b.len(), 1);
        assert_eq!(landed_in_b[0].name, "Sinkhorn");

        let still_in_origin_a = origin.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(still_in_origin_a.len(), 1);

        let destination_a = destination
            .list_entities(Some("a"), None, 100)
            .await
            .unwrap();
        assert_eq!(destination_a.len(), 0);
    }

    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let runtime = make_rt().await;
        let malformed = KgArchive {
            format: "wrong".to_string(),
            ..local_archive(vec![], vec![])
        };
        let failure = runtime.import_kg(&malformed, None).await.unwrap_err();
        assert!(matches!(failure, RuntimeError::InvalidInput(_)));
    }

    #[test]
    fn invalid_relation_rejected_at_deserialize() {
        let json = r#"{
"format":"khive-kg","version":"0.1","namespace":"local",
"exported_at":"2026-01-01T00:00:00Z",
"entities":[],
"edges":[{"source":"00000000-0000-0000-0000-000000000001",
"target":"00000000-0000-0000-0000-000000000002",
"relation":"related_to","weight":0.5}]
}"#;
        let parsed: Result<KgArchive, _> = serde_json::from_str(json);
        assert!(
            parsed.is_err(),
            "non-canonical relation should fail to deserialize"
        );
    }

    #[tokio::test]
    async fn import_edge_with_dangling_source_is_skipped() {
        let phantom_source = Uuid::parse_str("deadbeef-dead-4ead-dead-deadbeefcafe").unwrap();
        // This runtime only mints a real entity id; the import goes elsewhere.
        let minting_rt = make_rt().await;
        let real_id = new_concept(&minting_rt, "Real").await;

        let archive = local_archive(
            vec![bare_entity(real_id, "Real")],
            vec![edge(phantom_source, real_id, EdgeRelation::Extends, 1.0)],
        );

        let destination = make_rt().await;
        let summary = destination.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(
            summary.edges_imported, 0,
            "dangling source must not be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "dangling source must be counted as skipped"
        );
    }

    #[tokio::test]
    async fn import_edge_with_dangling_target_is_skipped() {
        let phantom_target = Uuid::parse_str("cafebabe-cafe-4abe-cafe-cafebabecafe").unwrap();
        // This runtime only mints a real entity id; the import goes elsewhere.
        let minting_rt = make_rt().await;
        let real_id = new_concept(&minting_rt, "Source").await;

        let archive = local_archive(
            vec![bare_entity(real_id, "Source")],
            vec![edge(real_id, phantom_target, EdgeRelation::DependsOn, 0.8)],
        );

        let destination = make_rt().await;
        let summary = destination.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(
            summary.edges_imported, 0,
            "dangling target must not be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "dangling target must be counted as skipped"
        );
    }

    #[tokio::test]
    async fn import_mixed_edges_reports_correct_counts() {
        let phantom = Uuid::parse_str("11111111-1111-4111-8111-111111111111").unwrap();
        let origin = make_rt().await;
        let a = new_concept(&origin, "A").await;
        let b = new_concept(&origin, "B").await;
        let c = new_concept(&origin, "C").await;

        // Two edges between archived entities, one pointing at an unknown id.
        let archive = local_archive(
            vec![bare_entity(a, "A"), bare_entity(b, "B"), bare_entity(c, "C")],
            vec![
                edge(a, b, EdgeRelation::Extends, 1.0),
                edge(b, c, EdgeRelation::DependsOn, 0.9),
                edge(a, phantom, EdgeRelation::Enables, 0.5),
            ],
        );

        let destination = make_rt().await;
        let summary = destination.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(
            summary.edges_imported, 2,
            "only valid edges must be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "one dangling edge must be reported"
        );
    }

    #[tokio::test]
    async fn import_all_valid_edges_reports_zero_skipped() {
        let origin = make_rt().await;
        let first = new_concept(&origin, "E1").await;
        let second = new_concept(&origin, "E2").await;
        origin
            .link(None, first, second, EdgeRelation::VariantOf, 0.7)
            .await
            .unwrap();

        let archive = origin.export_kg(None).await.unwrap();
        let destination = make_rt().await;
        let summary = destination.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.edges_imported, 1);
        assert_eq!(
            summary.edges_skipped, 0,
            "no edges should be skipped when all endpoints exist"
        );
    }
}