1use std::collections::HashSet;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use std::str::FromStr;
24
25use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
26use khive_storage::{EdgeRelation, EntityFilter};
27use khive_types::EntityKind;
28
29use crate::error::{RuntimeError, RuntimeResult};
30use crate::runtime::KhiveRuntime;
31
32#[derive(Clone, Debug, Serialize, Deserialize)]
39pub struct KgArchive {
40 pub format: String,
41 pub version: String,
42 pub namespace: String,
43 pub exported_at: DateTime<Utc>,
44 pub entities: Vec<ExportedEntity>,
45 pub edges: Vec<ExportedEdge>,
46}
47
48#[derive(Clone, Debug, Serialize, Deserialize)]
50pub struct ExportedEntity {
51 pub id: Uuid,
52 pub kind: String,
54 pub name: String,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub description: Option<String>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub properties: Option<serde_json::Value>,
59 #[serde(default)]
60 pub tags: Vec<String>,
61 pub created_at: DateTime<Utc>,
62 pub updated_at: DateTime<Utc>,
63}
64
65#[derive(Clone, Debug, Serialize, Deserialize)]
67pub struct ExportedEdge {
68 pub source: Uuid,
69 pub target: Uuid,
70 pub relation: EdgeRelation,
72 pub weight: f64,
73}
74
75#[derive(Clone, Debug, Serialize, Deserialize)]
77pub struct ImportSummary {
78 pub entities_imported: usize,
79 pub edges_imported: usize,
80}
81
82impl KhiveRuntime {
85 pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
91 let ns = self.ns(namespace).to_string();
92
93 let entity_page = self
95 .entities(Some(&ns))?
96 .query_entities(
97 &ns,
98 EntityFilter::default(),
99 PageRequest {
100 offset: 0,
101 limit: u32::MAX,
102 },
103 )
104 .await?;
105
106 let entities: Vec<ExportedEntity> = entity_page
107 .items
108 .into_iter()
109 .map(|e| {
110 let created_at =
111 DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
112 let updated_at =
113 DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
114 ExportedEntity {
115 id: e.id,
116 kind: e.kind.to_string(),
117 name: e.name,
118 description: e.description,
119 properties: e.properties,
120 tags: e.tags,
121 created_at,
122 updated_at,
123 }
124 })
125 .collect();
126
127 let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
129 let edges = if source_ids.is_empty() {
130 Vec::new()
131 } else {
132 let filter = EdgeFilter {
133 source_ids: source_ids.clone(),
134 ..Default::default()
135 };
136 let edge_page = self
137 .graph(Some(&ns))?
138 .query_edges(
139 filter,
140 Vec::new(),
141 PageRequest {
142 offset: 0,
143 limit: u32::MAX,
144 },
145 )
146 .await?;
147
148 let id_set: HashSet<Uuid> = source_ids.into_iter().collect();
149 edge_page
150 .items
151 .into_iter()
152 .filter(|e| id_set.contains(&e.source_id))
153 .map(|e| ExportedEdge {
154 source: e.source_id,
155 target: e.target_id,
156 relation: e.relation,
157 weight: e.weight,
158 })
159 .collect()
160 };
161
162 Ok(KgArchive {
163 format: "khive-kg".to_string(),
164 version: "0.1".to_string(),
165 namespace: ns,
166 exported_at: Utc::now(),
167 entities,
168 edges,
169 })
170 }
171
172 pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
174 let archive = self.export_kg(namespace).await?;
175 serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
176 }
177
178 pub async fn import_kg(
187 &self,
188 archive: &KgArchive,
189 target_namespace: Option<&str>,
190 ) -> RuntimeResult<ImportSummary> {
191 if archive.format != "khive-kg" {
193 return Err(RuntimeError::InvalidInput(format!(
194 "unsupported archive format {:?}; expected \"khive-kg\"",
195 archive.format
196 )));
197 }
198 if archive.version != "0.1" {
199 return Err(RuntimeError::InvalidInput(format!(
200 "unsupported archive version {:?}; supported: \"0.1\"",
201 archive.version
202 )));
203 }
204
205 let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
206
207 let store = self.entities(Some(&ns))?;
209 let mut entities_imported = 0usize;
210 for ee in &archive.entities {
211 let created_micros = ee.created_at.timestamp_micros();
212 let updated_micros = ee.updated_at.timestamp_micros();
213 let entity = khive_storage::entity::Entity {
214 id: ee.id,
215 namespace: ns.clone(),
216 kind: EntityKind::from_str(&ee.kind).map_err(|e| {
217 RuntimeError::InvalidInput(format!(
218 "invalid entity kind {:?} in archive: {}",
219 ee.kind, e
220 ))
221 })?,
222 name: ee.name.clone(),
223 description: ee.description.clone(),
224 properties: ee.properties.clone(),
225 tags: ee.tags.clone(),
226 created_at: created_micros,
227 updated_at: updated_micros,
228 deleted_at: None,
229 };
230 store.upsert_entity(entity.clone()).await?;
231 self.reindex_entity(Some(&ns), &entity).await?;
234 entities_imported += 1;
235 }
236
237 let graph = self.graph(Some(&ns))?;
239 let mut edges_imported = 0usize;
240 for ee in &archive.edges {
241 let edge = khive_storage::types::Edge {
242 id: LinkId::from(Uuid::new_v4()),
243 source_id: ee.source,
244 target_id: ee.target,
245 relation: ee.relation,
246 weight: ee.weight,
247 created_at: Utc::now(),
248 metadata: None,
249 };
250 graph.upsert_edge(edge).await?;
251 edges_imported += 1;
252 }
253
254 Ok(ImportSummary {
255 entities_imported,
256 edges_imported,
257 })
258 }
259
260 pub async fn import_kg_json(
262 &self,
263 json: &str,
264 target_namespace: Option<&str>,
265 ) -> RuntimeResult<ImportSummary> {
266 let archive: KgArchive =
267 serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
268 self.import_kg(&archive, target_namespace).await
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Fresh in-memory runtime, so each test starts from an empty store.
    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// Archive with the given header fields and no entities or edges.
    fn empty_archive(format: &str, version: &str) -> KgArchive {
        KgArchive {
            format: format.to_string(),
            version: version.to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities: vec![],
            edges: vec![],
        }
    }

    /// Entities and edges exported from one runtime are fully restored in another.
    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "FlashAttention-2", None, None, vec![])
            .await
            .unwrap();
        let e3 = src
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        src.link(None, e1.id, e3.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        assert_eq!(archive.entities.len(), 3);
        assert_eq!(archive.edges.len(), 2);
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(summary.edges_imported, 2);

        // Imported entities keep their original ids and fields.
        let got = dst.get_entity(None, e1.id).await.unwrap();
        assert!(got.is_some());
        let got = got.unwrap();
        assert_eq!(got.name, "FlashAttention");
        assert_eq!(got.description.as_deref(), Some("fast attention"));
    }

    /// The JSON helpers round-trip properties and tags.
    #[tokio::test]
    async fn json_roundtrip() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "QLoRA", None, None, vec![])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let json_str = src.export_kg_json(None).await.unwrap();
        assert!(json_str.contains("khive-kg"));

        let dst = make_rt().await;
        let summary = dst.import_kg_json(&json_str, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);

        let got = dst.get_entity(None, e1.id).await.unwrap().unwrap();
        assert_eq!(got.tags, vec!["fine-tuning"]);
    }

    /// An explicit target namespace overrides the archive's own namespace, and the
    /// source namespace is neither modified nor created in the destination.
    #[tokio::test]
    async fn namespace_targeting() {
        let src = make_rt().await;
        src.create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();

        let archive = src.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        let in_b = dst.list_entities(Some("b"), None, 100).await.unwrap();
        assert_eq!(in_b.len(), 1);
        assert_eq!(in_b[0].name, "Sinkhorn");

        let in_a = src.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(in_a.len(), 1);

        let dst_a = dst.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(dst_a.len(), 0);
    }

    /// An archive with an unknown `format` tag is rejected as invalid input.
    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let rt = make_rt().await;
        let bad = empty_archive("wrong", "0.1");
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// An archive with the right format tag but an unsupported version is rejected.
    #[tokio::test]
    async fn format_validation_rejects_wrong_version() {
        let rt = make_rt().await;
        let bad = empty_archive("khive-kg", "9.9");
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// Non-canonical edge relations are rejected while parsing the archive.
    #[test]
    fn invalid_relation_rejected_at_deserialize() {
        let json = r#"{
            "format":"khive-kg","version":"0.1","namespace":"local",
            "exported_at":"2026-01-01T00:00:00Z",
            "entities":[],
            "edges":[{"source":"00000000-0000-0000-0000-000000000001",
                      "target":"00000000-0000-0000-0000-000000000002",
                      "relation":"related_to","weight":0.5}]
        }"#;
        let result: Result<KgArchive, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "non-canonical relation should fail to deserialize"
        );
    }
}