Skip to main content

khive_runtime/
portability.rs

1// Copyright 2026 khive contributors. Licensed under Apache-2.0.
2//
3//! KG export / import — portable JSON archive for namespace-scoped knowledge graphs.
4//!
5//! Implements the v1 portability format described in ADR-010. Embeddings are
6//! intentionally excluded: they are regenerable from the embedding model + text
7//! and their inclusion would lock the format to a specific model.
8//!
9//! # Edge namespace enumeration
10//!
11//! `GraphStore::query_edges` has no namespace column — edges are linked to entities,
12//! not namespaces. Export collects all entity IDs in the namespace first, then
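//! queries edges where source_id is in that set. This covers every edge whose
//! source entity belongs to the namespace, which is the definition of
//! "edges in a namespace" used by export: every exported edge is anchored to an
//! exported source entity. An edge whose target lies in another namespace is
//! still exported, so importing the archive elsewhere can create an edge to an
//! entity the archive does not contain.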
16
17use std::collections::HashSet;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use std::str::FromStr;
24
25use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
26use khive_storage::{EdgeRelation, EntityFilter};
27use khive_types::EntityKind;
28
29use crate::error::{RuntimeError, RuntimeResult};
30use crate::runtime::KhiveRuntime;
31
32// ── Archive types ─────────────────────────────────────────────────────────────
33
34/// Portable JSON archive of a namespace-scoped knowledge graph.
35///
36/// The `format` field is always `"khive-kg"`. The `version` field identifies
37/// the serialization schema; parsers should reject unknown versions.
38#[derive(Clone, Debug, Serialize, Deserialize)]
39pub struct KgArchive {
40    pub format: String,
41    pub version: String,
42    pub namespace: String,
43    pub exported_at: DateTime<Utc>,
44    pub entities: Vec<ExportedEntity>,
45    pub edges: Vec<ExportedEdge>,
46}
47
48/// An entity record in the portable archive.
49#[derive(Clone, Debug, Serialize, Deserialize)]
50pub struct ExportedEntity {
51    pub id: Uuid,
52    /// EntityKind serialized as snake_case string (e.g. `"concept"`, `"person"`).
53    pub kind: String,
54    pub name: String,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub description: Option<String>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub properties: Option<serde_json::Value>,
59    #[serde(default)]
60    pub tags: Vec<String>,
61    pub created_at: DateTime<Utc>,
62    pub updated_at: DateTime<Utc>,
63}
64
65/// A directed edge record in the portable archive.
66#[derive(Clone, Debug, Serialize, Deserialize)]
67pub struct ExportedEdge {
68    pub source: Uuid,
69    pub target: Uuid,
70    /// One of the 13 canonical relations defined in ADR-002.
71    pub relation: EdgeRelation,
72    pub weight: f64,
73}
74
75/// Outcome of a successful import operation.
76#[derive(Clone, Debug, Serialize, Deserialize)]
77pub struct ImportSummary {
78    pub entities_imported: usize,
79    pub edges_imported: usize,
80}
81
82// ── KhiveRuntime impl ─────────────────────────────────────────────────────────
83
84impl KhiveRuntime {
85    /// Export all entities and edges in a namespace to a portable JSON archive.
86    ///
87    /// Edge collection: all entity IDs in the namespace are gathered first;
88    /// `query_edges` is then called with those IDs as `source_ids`. This
89    /// captures every edge whose source entity belongs to the namespace.
90    pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
91        let ns = self.ns(namespace).to_string();
92
93        // 1. Collect all entities in the namespace.
94        let entity_page = self
95            .entities(Some(&ns))?
96            .query_entities(
97                &ns,
98                EntityFilter::default(),
99                PageRequest {
100                    offset: 0,
101                    limit: u32::MAX,
102                },
103            )
104            .await?;
105
106        let entities: Vec<ExportedEntity> = entity_page
107            .items
108            .into_iter()
109            .map(|e| {
110                let created_at =
111                    DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
112                let updated_at =
113                    DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
114                ExportedEntity {
115                    id: e.id,
116                    kind: e.kind.to_string(),
117                    name: e.name,
118                    description: e.description,
119                    properties: e.properties,
120                    tags: e.tags,
121                    created_at,
122                    updated_at,
123                }
124            })
125            .collect();
126
127        // 2. Collect edges whose source is any entity in this namespace.
128        let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
129        let edges = if source_ids.is_empty() {
130            Vec::new()
131        } else {
132            let filter = EdgeFilter {
133                source_ids: source_ids.clone(),
134                ..Default::default()
135            };
136            let edge_page = self
137                .graph(Some(&ns))?
138                .query_edges(
139                    filter,
140                    Vec::new(),
141                    PageRequest {
142                        offset: 0,
143                        limit: u32::MAX,
144                    },
145                )
146                .await?;
147
148            let id_set: HashSet<Uuid> = source_ids.into_iter().collect();
149            edge_page
150                .items
151                .into_iter()
152                .filter(|e| id_set.contains(&e.source_id))
153                .map(|e| ExportedEdge {
154                    source: e.source_id,
155                    target: e.target_id,
156                    relation: e.relation,
157                    weight: e.weight,
158                })
159                .collect()
160        };
161
162        Ok(KgArchive {
163            format: "khive-kg".to_string(),
164            version: "0.1".to_string(),
165            namespace: ns,
166            exported_at: Utc::now(),
167            entities,
168            edges,
169        })
170    }
171
172    /// Export to a JSON string (convenience wrapper around `export_kg`).
173    pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
174        let archive = self.export_kg(namespace).await?;
175        serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
176    }
177
178    /// Import an archive into `target_namespace`.
179    ///
180    /// If `target_namespace` is `None`, the archive's own namespace is used.
181    ///
182    /// - Entities: upserted by ID; existing records are overwritten.
183    /// - Edges: upserted; existing records are overwritten.
184    /// - Validation: `format != "khive-kg"` or unsupported version → `InvalidInput`.
185    ///   Invalid edge relations are caught at JSON deserialization time.
186    pub async fn import_kg(
187        &self,
188        archive: &KgArchive,
189        target_namespace: Option<&str>,
190    ) -> RuntimeResult<ImportSummary> {
191        // Format validation.
192        if archive.format != "khive-kg" {
193            return Err(RuntimeError::InvalidInput(format!(
194                "unsupported archive format {:?}; expected \"khive-kg\"",
195                archive.format
196            )));
197        }
198        if archive.version != "0.1" {
199            return Err(RuntimeError::InvalidInput(format!(
200                "unsupported archive version {:?}; supported: \"0.1\"",
201                archive.version
202            )));
203        }
204
205        let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
206
207        // Import entities.
208        let store = self.entities(Some(&ns))?;
209        let mut entities_imported = 0usize;
210        for ee in &archive.entities {
211            let created_micros = ee.created_at.timestamp_micros();
212            let updated_micros = ee.updated_at.timestamp_micros();
213            let entity = khive_storage::entity::Entity {
214                id: ee.id,
215                namespace: ns.clone(),
216                kind: EntityKind::from_str(&ee.kind).map_err(|e| {
217                    RuntimeError::InvalidInput(format!(
218                        "invalid entity kind {:?} in archive: {}",
219                        ee.kind, e
220                    ))
221                })?,
222                name: ee.name.clone(),
223                description: ee.description.clone(),
224                properties: ee.properties.clone(),
225                tags: ee.tags.clone(),
226                created_at: created_micros,
227                updated_at: updated_micros,
228                deleted_at: None,
229            };
230            store.upsert_entity(entity.clone()).await?;
231            // Index into FTS5 (and vector store if a model is configured) so that
232            // imported entities are visible to hybrid_search immediately.
233            self.reindex_entity(Some(&ns), &entity).await?;
234            entities_imported += 1;
235        }
236
237        // Import edges.
238        let graph = self.graph(Some(&ns))?;
239        let mut edges_imported = 0usize;
240        for ee in &archive.edges {
241            let edge = khive_storage::types::Edge {
242                id: LinkId::from(Uuid::new_v4()),
243                source_id: ee.source,
244                target_id: ee.target,
245                relation: ee.relation,
246                weight: ee.weight,
247                created_at: Utc::now(),
248                metadata: None,
249            };
250            graph.upsert_edge(edge).await?;
251            edges_imported += 1;
252        }
253
254        Ok(ImportSummary {
255            entities_imported,
256            edges_imported,
257        })
258    }
259
260    /// Import from a JSON string (convenience wrapper around `import_kg`).
261    pub async fn import_kg_json(
262        &self,
263        json: &str,
264        target_namespace: Option<&str>,
265    ) -> RuntimeResult<ImportSummary> {
266        let archive: KgArchive =
267            serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
268        self.import_kg(&archive, target_namespace).await
269    }
270}
271
272// ── Tests ─────────────────────────────────────────────────────────────────────
273
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Archive JSON whose single edge uses a non-canonical relation.
    const INVALID_RELATION_JSON: &str = r#"{
        "format":"khive-kg","version":"0.1","namespace":"local",
        "exported_at":"2026-01-01T00:00:00Z",
        "entities":[],
        "edges":[{"source":"00000000-0000-0000-0000-000000000001",
                   "target":"00000000-0000-0000-0000-000000000002",
                   "relation":"related_to","weight":0.5}]
    }"#;

    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// 1. Roundtrip: 3 entities + 2 edges survive export → import on a fresh runtime.
    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "FlashAttention-2", None, None, vec![])
            .await
            .unwrap();
        let e3 = src
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        src.link(None, e1.id, e3.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        assert_eq!(archive.entities.len(), 3);
        assert_eq!(archive.edges.len(), 2);
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(summary.edges_imported, 2);

        // Spot-check: the imported entity is retrievable.
        let got = dst.get_entity(None, e1.id).await.unwrap();
        assert!(got.is_some());
        let got = got.unwrap();
        assert_eq!(got.name, "FlashAttention");
        assert_eq!(got.description.as_deref(), Some("fast attention"));
    }

    /// 2. JSON roundtrip: export_kg_json → import_kg_json produces equivalent state.
    #[tokio::test]
    async fn json_roundtrip() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "QLoRA", None, None, vec![])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let json_str = src.export_kg_json(None).await.unwrap();
        assert!(json_str.contains("khive-kg"));

        let dst = make_rt().await;
        let summary = dst.import_kg_json(&json_str, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);

        let got = dst.get_entity(None, e1.id).await.unwrap().unwrap();
        assert_eq!(got.tags, vec!["fine-tuning"]);
    }

    /// 3. Namespace targeting: export from namespace "a", import into namespace "b" on a
    ///    fresh runtime — entities land in "b", and the source runtime's "a" is unaffected.
    ///
    ///    Note: source and destination are separate runtimes (separate in-memory DBs).
    ///    Same-DB cross-namespace copy is not a portability use case — portability is about
    ///    moving graphs between instances, not between namespaces within one instance.
    #[tokio::test]
    async fn namespace_targeting() {
        let src = make_rt().await;
        src.create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();

        let archive = src.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        // Import into a fresh runtime, targeting namespace "b".
        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        // Entity is in "b" on the destination runtime.
        let in_b = dst.list_entities(Some("b"), None, 100).await.unwrap();
        assert_eq!(in_b.len(), 1);
        assert_eq!(in_b[0].name, "Sinkhorn");

        // Namespace "a" on the source runtime is unchanged.
        let in_a = src.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(in_a.len(), 1);

        // Namespace "a" on the destination runtime has nothing (only "b" was written).
        let dst_a = dst.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(dst_a.len(), 0);
    }

    /// 4. Format validation: wrong `format` field → InvalidInput.
    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let rt = make_rt().await;
        let bad = KgArchive {
            format: "wrong".to_string(),
            version: "0.1".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities: vec![],
            edges: vec![],
        };
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// 5. Invalid relation in archive → rejected at JSON deserialization.
    #[test]
    fn invalid_relation_rejected_at_deserialize() {
        let result: Result<KgArchive, _> = serde_json::from_str(INVALID_RELATION_JSON);
        assert!(
            result.is_err(),
            "non-canonical relation should fail to deserialize"
        );
    }

    /// 6. The same invalid relation surfaces as InvalidInput through import_kg_json.
    #[tokio::test]
    async fn invalid_relation_rejected_by_import_json() {
        let rt = make_rt().await;
        let err = rt
            .import_kg_json(INVALID_RELATION_JSON, None)
            .await
            .unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// 7. Version validation: unsupported `version` field → InvalidInput.
    #[tokio::test]
    async fn format_validation_rejects_wrong_version() {
        let rt = make_rt().await;
        let bad = KgArchive {
            format: "khive-kg".to_string(),
            version: "9.9".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities: vec![],
            edges: vec![],
        };
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }
}