1use std::collections::HashSet;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
24use khive_storage::{EdgeRelation, EntityFilter};
25
26use crate::error::{RuntimeError, RuntimeResult};
27use crate::runtime::KhiveRuntime;
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct KgArchive {
37 pub format: String,
38 pub version: String,
39 pub namespace: String,
40 pub exported_at: DateTime<Utc>,
41 pub entities: Vec<ExportedEntity>,
42 pub edges: Vec<ExportedEdge>,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct ExportedEntity {
48 pub id: Uuid,
49 pub kind: String,
51 pub name: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub description: Option<String>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub properties: Option<serde_json::Value>,
56 #[serde(default)]
57 pub tags: Vec<String>,
58 pub created_at: DateTime<Utc>,
59 pub updated_at: DateTime<Utc>,
60}
61
62#[derive(Clone, Debug, Serialize, Deserialize)]
64pub struct ExportedEdge {
65 pub source: Uuid,
66 pub target: Uuid,
67 pub relation: EdgeRelation,
69 pub weight: f64,
70}
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
74pub struct ImportSummary {
75 pub entities_imported: usize,
76 pub edges_imported: usize,
77 pub edges_skipped: usize,
83}
84
85impl KhiveRuntime {
88 pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
94 let ns = self.ns(namespace).to_string();
95
96 let entity_page = self
98 .entities(Some(&ns))?
99 .query_entities(
100 &ns,
101 EntityFilter::default(),
102 PageRequest {
103 offset: 0,
104 limit: u32::MAX,
105 },
106 )
107 .await?;
108
109 let entities: Vec<ExportedEntity> = entity_page
110 .items
111 .into_iter()
112 .map(|e| {
113 let created_at =
114 DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
115 let updated_at =
116 DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
117 ExportedEntity {
118 id: e.id,
119 kind: e.kind.to_string(),
120 name: e.name,
121 description: e.description,
122 properties: e.properties,
123 tags: e.tags,
124 created_at,
125 updated_at,
126 }
127 })
128 .collect();
129
130 let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
132 let edges = if source_ids.is_empty() {
133 Vec::new()
134 } else {
135 let filter = EdgeFilter {
136 source_ids: source_ids.clone(),
137 ..Default::default()
138 };
139 let edge_page = self
140 .graph(Some(&ns))?
141 .query_edges(
142 filter,
143 Vec::new(),
144 PageRequest {
145 offset: 0,
146 limit: u32::MAX,
147 },
148 )
149 .await?;
150
151 let id_set: HashSet<Uuid> = source_ids.into_iter().collect();
152 edge_page
153 .items
154 .into_iter()
155 .filter(|e| id_set.contains(&e.source_id))
156 .map(|e| ExportedEdge {
157 source: e.source_id,
158 target: e.target_id,
159 relation: e.relation,
160 weight: e.weight,
161 })
162 .collect()
163 };
164
165 Ok(KgArchive {
166 format: "khive-kg".to_string(),
167 version: "0.1".to_string(),
168 namespace: ns,
169 exported_at: Utc::now(),
170 entities,
171 edges,
172 })
173 }
174
175 pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
177 let archive = self.export_kg(namespace).await?;
178 serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
179 }
180
181 pub async fn import_kg(
190 &self,
191 archive: &KgArchive,
192 target_namespace: Option<&str>,
193 ) -> RuntimeResult<ImportSummary> {
194 if archive.format != "khive-kg" {
196 return Err(RuntimeError::InvalidInput(format!(
197 "unsupported archive format {:?}; expected \"khive-kg\"",
198 archive.format
199 )));
200 }
201 if archive.version != "0.1" {
202 return Err(RuntimeError::InvalidInput(format!(
203 "unsupported archive version {:?}; supported: \"0.1\"",
204 archive.version
205 )));
206 }
207
208 let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
209
210 let store = self.entities(Some(&ns))?;
212 let mut entities_imported = 0usize;
213 for ee in &archive.entities {
214 let created_micros = ee.created_at.timestamp_micros();
215 let updated_micros = ee.updated_at.timestamp_micros();
216 let entity = khive_storage::entity::Entity {
217 id: ee.id,
218 namespace: ns.clone(),
219 kind: ee.kind.clone(),
220 name: ee.name.clone(),
221 description: ee.description.clone(),
222 properties: ee.properties.clone(),
223 tags: ee.tags.clone(),
224 created_at: created_micros,
225 updated_at: updated_micros,
226 deleted_at: None,
227 };
228 store.upsert_entity(entity.clone()).await?;
229 self.reindex_entity(Some(&ns), &entity).await?;
232 entities_imported += 1;
233 }
234
235 let graph = self.graph(Some(&ns))?;
243 let mut edges_imported = 0usize;
244 let mut edges_skipped = 0usize;
245 for ee in &archive.edges {
246 let source_ok = self.get_entity(Some(&ns), ee.source).await?.is_some();
247 if !source_ok {
248 tracing::warn!(
249 source = %ee.source,
250 target = %ee.target,
251 relation = ?ee.relation,
252 "import_kg: skipping edge — source entity not found in namespace {ns:?}"
253 );
254 edges_skipped += 1;
255 continue;
256 }
257 let target_ok = self.get_entity(Some(&ns), ee.target).await?.is_some();
258 if !target_ok {
259 tracing::warn!(
260 source = %ee.source,
261 target = %ee.target,
262 relation = ?ee.relation,
263 "import_kg: skipping edge — target entity not found in namespace {ns:?}"
264 );
265 edges_skipped += 1;
266 continue;
267 }
268 let edge = khive_storage::types::Edge {
269 id: LinkId::from(Uuid::new_v4()),
270 source_id: ee.source,
271 target_id: ee.target,
272 relation: ee.relation,
273 weight: ee.weight,
274 created_at: Utc::now(),
275 metadata: None,
276 };
277 graph.upsert_edge(edge).await?;
278 edges_imported += 1;
279 }
280
281 Ok(ImportSummary {
282 entities_imported,
283 edges_imported,
284 edges_skipped,
285 })
286 }
287
288 pub async fn import_kg_json(
290 &self,
291 json: &str,
292 target_namespace: Option<&str>,
293 ) -> RuntimeResult<ImportSummary> {
294 let archive: KgArchive =
295 serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
296 self.import_kg(&archive, target_namespace).await
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Builds a fresh in-memory runtime for a single test.
    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// Creates a bare `"concept"` entity (namespace `None`, no description,
    /// properties or tags) and returns its id.
    async fn seed_concept(rt: &KhiveRuntime, name: &str) -> Uuid {
        rt.create_entity(None, "concept", name, None, None, vec![])
            .await
            .unwrap()
            .id
    }

    /// Hand-built archive entry for a bare `"concept"` entity.
    fn concept(id: Uuid, name: &str) -> ExportedEntity {
        ExportedEntity {
            id,
            kind: "concept".to_string(),
            name: name.to_string(),
            description: None,
            properties: None,
            tags: vec![],
            created_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }

    /// Hand-built archive entry for an edge.
    fn edge(source: Uuid, target: Uuid, relation: EdgeRelation, weight: f64) -> ExportedEdge {
        ExportedEdge {
            source,
            target,
            relation,
            weight,
        }
    }

    /// Hand-built archive with a valid header and namespace `"local"`.
    fn local_archive(entities: Vec<ExportedEntity>, edges: Vec<ExportedEdge>) -> KgArchive {
        KgArchive {
            format: "khive-kg".to_string(),
            version: "0.1".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities,
            edges,
        }
    }

    /// Export from one runtime, import into another, and spot-check an entity.
    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let src = make_rt().await;
        let flash = src
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let flash2_id = seed_concept(&src, "FlashAttention-2").await;
        let author = src
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();
        src.link(None, flash2_id, flash.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        src.link(None, flash.id, author.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        assert_eq!(archive.entities.len(), 3);
        assert_eq!(archive.edges.len(), 2);
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(summary.edges_imported, 2);

        let fetched = dst.get_entity(None, flash.id).await.unwrap();
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.name, "FlashAttention");
        assert_eq!(fetched.description.as_deref(), Some("fast attention"));
    }

    /// Same round trip, but through the JSON string helpers.
    #[tokio::test]
    async fn json_roundtrip() {
        let src = make_rt().await;
        let lora = src
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let qlora_id = seed_concept(&src, "QLoRA").await;
        src.link(None, qlora_id, lora.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let json_str = src.export_kg_json(None).await.unwrap();
        assert!(json_str.contains("khive-kg"));

        let dst = make_rt().await;
        let summary = dst.import_kg_json(&json_str, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);

        let fetched = dst.get_entity(None, lora.id).await.unwrap().unwrap();
        assert_eq!(fetched.tags, vec!["fine-tuning"]);
    }

    /// An explicit target namespace overrides the one recorded in the archive.
    #[tokio::test]
    async fn namespace_targeting() {
        let src = make_rt().await;
        src.create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();

        let archive = src.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        // The entity lands in "b" on the destination...
        let in_b = dst.list_entities(Some("b"), None, 100).await.unwrap();
        assert_eq!(in_b.len(), 1);
        assert_eq!(in_b[0].name, "Sinkhorn");

        // ...the source runtime still holds its copy in "a"...
        let in_a = src.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(in_a.len(), 1);

        // ...and nothing leaked into "a" on the destination.
        let dst_a = dst.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(dst_a.len(), 0);
    }

    /// A wrong `format` tag is rejected as invalid input.
    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let rt = make_rt().await;
        let bad = KgArchive {
            format: "wrong".to_string(),
            ..local_archive(vec![], vec![])
        };
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// An unknown relation string must already fail while parsing the archive.
    #[test]
    fn invalid_relation_rejected_at_deserialize() {
        let json = r#"{
            "format":"khive-kg","version":"0.1","namespace":"local",
            "exported_at":"2026-01-01T00:00:00Z",
            "entities":[],
            "edges":[{"source":"00000000-0000-0000-0000-000000000001",
                      "target":"00000000-0000-0000-0000-000000000002",
                      "relation":"related_to","weight":0.5}]
        }"#;
        let result: Result<KgArchive, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "non-canonical relation should fail to deserialize"
        );
    }

    /// An edge whose source id matches no entity is skipped and counted.
    #[tokio::test]
    async fn import_edge_with_dangling_source_is_skipped() {
        let phantom_source = Uuid::parse_str("deadbeef-dead-4ead-dead-deadbeefcafe").unwrap();

        let rt = make_rt().await;
        let real_id = seed_concept(&rt, "Real").await;

        let archive = local_archive(
            vec![concept(real_id, "Real")],
            vec![edge(phantom_source, real_id, EdgeRelation::Extends, 1.0)],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(
            summary.edges_imported, 0,
            "dangling source must not be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "dangling source must be counted as skipped"
        );
    }

    /// An edge whose target id matches no entity is skipped and counted.
    #[tokio::test]
    async fn import_edge_with_dangling_target_is_skipped() {
        let phantom_target = Uuid::parse_str("cafebabe-cafe-4abe-cafe-cafebabecafe").unwrap();

        let rt = make_rt().await;
        let real_id = seed_concept(&rt, "Source").await;

        let archive = local_archive(
            vec![concept(real_id, "Source")],
            vec![edge(real_id, phantom_target, EdgeRelation::DependsOn, 0.8)],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(
            summary.edges_imported, 0,
            "dangling target must not be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "dangling target must be counted as skipped"
        );
    }

    /// Two valid edges plus one dangling edge yield 2 imported / 1 skipped.
    #[tokio::test]
    async fn import_mixed_edges_reports_correct_counts() {
        let phantom = Uuid::parse_str("11111111-1111-4111-8111-111111111111").unwrap();

        let src = make_rt().await;
        let a = seed_concept(&src, "A").await;
        let b = seed_concept(&src, "B").await;
        let c = seed_concept(&src, "C").await;

        let archive = local_archive(
            vec![concept(a, "A"), concept(b, "B"), concept(c, "C")],
            vec![
                edge(a, b, EdgeRelation::Extends, 1.0),
                edge(b, c, EdgeRelation::DependsOn, 0.9),
                edge(a, phantom, EdgeRelation::Enables, 0.5),
            ],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(
            summary.edges_imported, 2,
            "only valid edges must be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "one dangling edge must be reported"
        );
    }

    /// A fully consistent export imports with nothing skipped.
    #[tokio::test]
    async fn import_all_valid_edges_reports_zero_skipped() {
        let src = make_rt().await;
        let first = seed_concept(&src, "E1").await;
        let second = seed_concept(&src, "E2").await;
        src.link(None, first, second, EdgeRelation::VariantOf, 0.7)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.edges_imported, 1);
        assert_eq!(
            summary.edges_skipped, 0,
            "no edges should be skipped when all endpoints exist"
        );
    }
}