use std::collections::{HashMap, HashSet};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
use khive_storage::{EdgeRelation, EntityFilter};

use crate::error::{RuntimeError, RuntimeResult};
use crate::runtime::KhiveRuntime;
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct KgArchive {
37 pub format: String,
38 pub version: String,
39 pub namespace: String,
40 pub exported_at: DateTime<Utc>,
41 pub entities: Vec<ExportedEntity>,
42 pub edges: Vec<ExportedEdge>,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct ExportedEntity {
48 pub id: Uuid,
49 pub kind: String,
51 pub name: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub description: Option<String>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub properties: Option<serde_json::Value>,
56 #[serde(default)]
57 pub tags: Vec<String>,
58 pub created_at: DateTime<Utc>,
59 pub updated_at: DateTime<Utc>,
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize)]
64pub struct ExportedEdge {
65 pub source: Uuid,
66 pub target: Uuid,
67 pub relation: EdgeRelation,
69 pub weight: f64,
70}
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
74pub struct ImportSummary {
75 pub entities_imported: usize,
76 pub edges_imported: usize,
77}
78
79impl KhiveRuntime {
82 pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
88 let ns = self.ns(namespace).to_string();
89
90 let entity_page = self
92 .entities(Some(&ns))?
93 .query_entities(
94 &ns,
95 EntityFilter::default(),
96 PageRequest {
97 offset: 0,
98 limit: u32::MAX,
99 },
100 )
101 .await?;
102
103 let entities: Vec<ExportedEntity> = entity_page
104 .items
105 .into_iter()
106 .map(|e| {
107 let created_at =
108 DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
109 let updated_at =
110 DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
111 ExportedEntity {
112 id: e.id,
113 kind: e.kind.to_string(),
114 name: e.name,
115 description: e.description,
116 properties: e.properties,
117 tags: e.tags,
118 created_at,
119 updated_at,
120 }
121 })
122 .collect();
123
124 let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
126 let edges = if source_ids.is_empty() {
127 Vec::new()
128 } else {
129 let filter = EdgeFilter {
130 source_ids: source_ids.clone(),
131 ..Default::default()
132 };
133 let edge_page = self
134 .graph(Some(&ns))?
135 .query_edges(
136 filter,
137 Vec::new(),
138 PageRequest {
139 offset: 0,
140 limit: u32::MAX,
141 },
142 )
143 .await?;
144
145 let id_set: HashSet<Uuid> = source_ids.into_iter().collect();
146 edge_page
147 .items
148 .into_iter()
149 .filter(|e| id_set.contains(&e.source_id))
150 .map(|e| ExportedEdge {
151 source: e.source_id,
152 target: e.target_id,
153 relation: e.relation,
154 weight: e.weight,
155 })
156 .collect()
157 };
158
159 Ok(KgArchive {
160 format: "khive-kg".to_string(),
161 version: "0.1".to_string(),
162 namespace: ns,
163 exported_at: Utc::now(),
164 entities,
165 edges,
166 })
167 }
168
169 pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
171 let archive = self.export_kg(namespace).await?;
172 serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
173 }
174
175 pub async fn import_kg(
184 &self,
185 archive: &KgArchive,
186 target_namespace: Option<&str>,
187 ) -> RuntimeResult<ImportSummary> {
188 if archive.format != "khive-kg" {
190 return Err(RuntimeError::InvalidInput(format!(
191 "unsupported archive format {:?}; expected \"khive-kg\"",
192 archive.format
193 )));
194 }
195 if archive.version != "0.1" {
196 return Err(RuntimeError::InvalidInput(format!(
197 "unsupported archive version {:?}; supported: \"0.1\"",
198 archive.version
199 )));
200 }
201
202 let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
203
204 let store = self.entities(Some(&ns))?;
206 let mut entities_imported = 0usize;
207 for ee in &archive.entities {
208 let created_micros = ee.created_at.timestamp_micros();
209 let updated_micros = ee.updated_at.timestamp_micros();
210 let entity = khive_storage::entity::Entity {
211 id: ee.id,
212 namespace: ns.clone(),
213 kind: ee.kind.clone(),
214 name: ee.name.clone(),
215 description: ee.description.clone(),
216 properties: ee.properties.clone(),
217 tags: ee.tags.clone(),
218 created_at: created_micros,
219 updated_at: updated_micros,
220 deleted_at: None,
221 };
222 store.upsert_entity(entity.clone()).await?;
223 self.reindex_entity(Some(&ns), &entity).await?;
226 entities_imported += 1;
227 }
228
229 let graph = self.graph(Some(&ns))?;
231 let mut edges_imported = 0usize;
232 for ee in &archive.edges {
233 let edge = khive_storage::types::Edge {
234 id: LinkId::from(Uuid::new_v4()),
235 source_id: ee.source,
236 target_id: ee.target,
237 relation: ee.relation,
238 weight: ee.weight,
239 created_at: Utc::now(),
240 metadata: None,
241 };
242 graph.upsert_edge(edge).await?;
243 edges_imported += 1;
244 }
245
246 Ok(ImportSummary {
247 entities_imported,
248 edges_imported,
249 })
250 }
251
252 pub async fn import_kg_json(
254 &self,
255 json: &str,
256 target_namespace: Option<&str>,
257 ) -> RuntimeResult<ImportSummary> {
258 let archive: KgArchive =
259 serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
260 self.import_kg(&archive, target_namespace).await
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Builds an isolated in-memory runtime for a single test.
    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// Entities and edges exported from one runtime re-appear in a fresh one.
    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let src = make_rt().await;

        let flash = src
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let flash2 = src
            .create_entity(None, "concept", "FlashAttention-2", None, None, vec![])
            .await
            .unwrap();
        let author = src
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();

        src.link(None, flash2.id, flash.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        src.link(None, flash.id, author.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        assert_eq!((archive.entities.len(), archive.edges.len()), (3, 2));
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(
            (summary.entities_imported, summary.edges_imported),
            (3, 2)
        );

        let imported = dst
            .get_entity(None, flash.id)
            .await
            .unwrap()
            .expect("FlashAttention should exist after import");
        assert_eq!(imported.name, "FlashAttention");
        assert_eq!(imported.description.as_deref(), Some("fast attention"));
    }

    /// The JSON wrappers round-trip entities (including tags) and edges.
    #[tokio::test]
    async fn json_roundtrip() {
        let src = make_rt().await;

        let lora = src
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let qlora = src
            .create_entity(None, "concept", "QLoRA", None, None, vec![])
            .await
            .unwrap();
        src.link(None, qlora.id, lora.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let payload = src.export_kg_json(None).await.unwrap();
        assert!(payload.contains("khive-kg"));

        let dst = make_rt().await;
        let summary = dst.import_kg_json(&payload, None).await.unwrap();
        assert_eq!(
            (summary.entities_imported, summary.edges_imported),
            (2, 1)
        );

        let imported = dst
            .get_entity(None, lora.id)
            .await
            .unwrap()
            .expect("LoRA should exist after import");
        assert_eq!(imported.tags, vec!["fine-tuning"]);
    }

    /// An explicit target namespace overrides the archive's own namespace.
    #[tokio::test]
    async fn namespace_targeting() {
        let src = make_rt().await;
        src.create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();

        let archive = src.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        // Import into "b" rather than the archive's own "a".
        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        let dst_b = dst.list_entities(Some("b"), None, 100).await.unwrap();
        assert_eq!(dst_b.len(), 1);
        assert_eq!(dst_b[0].name, "Sinkhorn");

        // The source namespace is untouched by the export.
        let src_a = src.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(src_a.len(), 1);

        // Nothing leaked into "a" on the destination side.
        let dst_a = dst.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(dst_a.len(), 0);
    }

    /// A foreign format tag is rejected as invalid input.
    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let rt = make_rt().await;
        let bogus = KgArchive {
            format: "wrong".to_string(),
            version: "0.1".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities: vec![],
            edges: vec![],
        };

        let outcome = rt.import_kg(&bogus, None).await;
        assert!(matches!(outcome, Err(RuntimeError::InvalidInput(_))));
    }

    /// A relation name outside the canonical set fails at deserialization time.
    #[test]
    fn invalid_relation_rejected_at_deserialize() {
        let json = r#"{
            "format":"khive-kg","version":"0.1","namespace":"local",
            "exported_at":"2026-01-01T00:00:00Z",
            "entities":[],
            "edges":[{"source":"00000000-0000-0000-0000-000000000001",
                      "target":"00000000-0000-0000-0000-000000000002",
                      "relation":"related_to","weight":0.5}]
        }"#;

        assert!(
            serde_json::from_str::<KgArchive>(json).is_err(),
            "non-canonical relation should fail to deserialize"
        );
    }
}