Skip to main content

khive_runtime/
portability.rs

1// Copyright 2026 khive contributors. Licensed under Apache-2.0.
2//
3//! KG export / import — portable JSON archive for namespace-scoped knowledge graphs.
4//!
5//! Implements the v1 portability format described in ADR-010. Embeddings are
6//! intentionally excluded: they are regenerable from the embedding model + text
7//! and their inclusion would lock the format to a specific model.
8//!
9//! # Edge namespace enumeration
10//!
11//! `GraphStore::query_edges` has no namespace column — edges are linked to entities,
12//! not namespaces. Export collects all entity IDs in the namespace first, then
13//! queries edges where source_id is in that set. This covers every edge whose
14//! source entity belongs to the namespace, which is the correct definition of
15//! "edges in a namespace" for an export that preserves referential integrity.
16
17use std::collections::HashSet;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
24use khive_storage::{EdgeRelation, EntityFilter};
25
26use crate::error::{RuntimeError, RuntimeResult};
27use crate::runtime::KhiveRuntime;
28
29// ── Archive types ─────────────────────────────────────────────────────────────
30
31/// Portable JSON archive of a namespace-scoped knowledge graph.
32///
33/// The `format` field is always `"khive-kg"`. The `version` field identifies
34/// the serialization schema; parsers should reject unknown versions.
35#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct KgArchive {
37    pub format: String,
38    pub version: String,
39    pub namespace: String,
40    pub exported_at: DateTime<Utc>,
41    pub entities: Vec<ExportedEntity>,
42    pub edges: Vec<ExportedEdge>,
43}
44
45/// An entity record in the portable archive.
46#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct ExportedEntity {
48    pub id: Uuid,
49    /// Pack-owned kind string (e.g. `"concept"`, `"person"`).
50    pub kind: String,
51    pub name: String,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub description: Option<String>,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub properties: Option<serde_json::Value>,
56    #[serde(default)]
57    pub tags: Vec<String>,
58    pub created_at: DateTime<Utc>,
59    pub updated_at: DateTime<Utc>,
60}
61
62/// A directed edge record in the portable archive.
63#[derive(Clone, Debug, Serialize, Deserialize)]
64pub struct ExportedEdge {
65    pub source: Uuid,
66    pub target: Uuid,
67    /// One of the 13 canonical relations defined in ADR-002.
68    pub relation: EdgeRelation,
69    pub weight: f64,
70}
71
72/// Outcome of a successful import operation.
73#[derive(Clone, Debug, Serialize, Deserialize)]
74pub struct ImportSummary {
75    pub entities_imported: usize,
76    pub edges_imported: usize,
77}
78
79// ── KhiveRuntime impl ─────────────────────────────────────────────────────────
80
81impl KhiveRuntime {
82    /// Export all entities and edges in a namespace to a portable JSON archive.
83    ///
84    /// Edge collection: all entity IDs in the namespace are gathered first;
85    /// `query_edges` is then called with those IDs as `source_ids`. This
86    /// captures every edge whose source entity belongs to the namespace.
87    pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
88        let ns = self.ns(namespace).to_string();
89
90        // 1. Collect all entities in the namespace.
91        let entity_page = self
92            .entities(Some(&ns))?
93            .query_entities(
94                &ns,
95                EntityFilter::default(),
96                PageRequest {
97                    offset: 0,
98                    limit: u32::MAX,
99                },
100            )
101            .await?;
102
103        let entities: Vec<ExportedEntity> = entity_page
104            .items
105            .into_iter()
106            .map(|e| {
107                let created_at =
108                    DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
109                let updated_at =
110                    DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
111                ExportedEntity {
112                    id: e.id,
113                    kind: e.kind.to_string(),
114                    name: e.name,
115                    description: e.description,
116                    properties: e.properties,
117                    tags: e.tags,
118                    created_at,
119                    updated_at,
120                }
121            })
122            .collect();
123
124        // 2. Collect edges whose source is any entity in this namespace.
125        let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
126        let edges = if source_ids.is_empty() {
127            Vec::new()
128        } else {
129            let filter = EdgeFilter {
130                source_ids: source_ids.clone(),
131                ..Default::default()
132            };
133            let edge_page = self
134                .graph(Some(&ns))?
135                .query_edges(
136                    filter,
137                    Vec::new(),
138                    PageRequest {
139                        offset: 0,
140                        limit: u32::MAX,
141                    },
142                )
143                .await?;
144
145            let id_set: HashSet<Uuid> = source_ids.into_iter().collect();
146            edge_page
147                .items
148                .into_iter()
149                .filter(|e| id_set.contains(&e.source_id))
150                .map(|e| ExportedEdge {
151                    source: e.source_id,
152                    target: e.target_id,
153                    relation: e.relation,
154                    weight: e.weight,
155                })
156                .collect()
157        };
158
159        Ok(KgArchive {
160            format: "khive-kg".to_string(),
161            version: "0.1".to_string(),
162            namespace: ns,
163            exported_at: Utc::now(),
164            entities,
165            edges,
166        })
167    }
168
169    /// Export to a JSON string (convenience wrapper around `export_kg`).
170    pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
171        let archive = self.export_kg(namespace).await?;
172        serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
173    }
174
175    /// Import an archive into `target_namespace`.
176    ///
177    /// If `target_namespace` is `None`, the archive's own namespace is used.
178    ///
179    /// - Entities: upserted by ID; existing records are overwritten.
180    /// - Edges: upserted; existing records are overwritten.
181    /// - Validation: `format != "khive-kg"` or unsupported version → `InvalidInput`.
182    ///   Invalid edge relations are caught at JSON deserialization time.
183    pub async fn import_kg(
184        &self,
185        archive: &KgArchive,
186        target_namespace: Option<&str>,
187    ) -> RuntimeResult<ImportSummary> {
188        // Format validation.
189        if archive.format != "khive-kg" {
190            return Err(RuntimeError::InvalidInput(format!(
191                "unsupported archive format {:?}; expected \"khive-kg\"",
192                archive.format
193            )));
194        }
195        if archive.version != "0.1" {
196            return Err(RuntimeError::InvalidInput(format!(
197                "unsupported archive version {:?}; supported: \"0.1\"",
198                archive.version
199            )));
200        }
201
202        let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
203
204        // Import entities.
205        let store = self.entities(Some(&ns))?;
206        let mut entities_imported = 0usize;
207        for ee in &archive.entities {
208            let created_micros = ee.created_at.timestamp_micros();
209            let updated_micros = ee.updated_at.timestamp_micros();
210            let entity = khive_storage::entity::Entity {
211                id: ee.id,
212                namespace: ns.clone(),
213                kind: ee.kind.clone(),
214                name: ee.name.clone(),
215                description: ee.description.clone(),
216                properties: ee.properties.clone(),
217                tags: ee.tags.clone(),
218                created_at: created_micros,
219                updated_at: updated_micros,
220                deleted_at: None,
221            };
222            store.upsert_entity(entity.clone()).await?;
223            // Index into FTS5 (and vector store if a model is configured) so that
224            // imported entities are visible to hybrid_search immediately.
225            self.reindex_entity(Some(&ns), &entity).await?;
226            entities_imported += 1;
227        }
228
229        // Import edges.
230        let graph = self.graph(Some(&ns))?;
231        let mut edges_imported = 0usize;
232        for ee in &archive.edges {
233            let edge = khive_storage::types::Edge {
234                id: LinkId::from(Uuid::new_v4()),
235                source_id: ee.source,
236                target_id: ee.target,
237                relation: ee.relation,
238                weight: ee.weight,
239                created_at: Utc::now(),
240                metadata: None,
241            };
242            graph.upsert_edge(edge).await?;
243            edges_imported += 1;
244        }
245
246        Ok(ImportSummary {
247            entities_imported,
248            edges_imported,
249        })
250    }
251
252    /// Import from a JSON string (convenience wrapper around `import_kg`).
253    pub async fn import_kg_json(
254        &self,
255        json: &str,
256        target_namespace: Option<&str>,
257    ) -> RuntimeResult<ImportSummary> {
258        let archive: KgArchive =
259            serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
260        self.import_kg(&archive, target_namespace).await
261    }
262}
263
264// ── Tests ─────────────────────────────────────────────────────────────────────
265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Otherwise well-formed archive JSON whose single edge uses the relation
    /// name `"related_to"`, which the tests below expect serde to reject.
    const BAD_RELATION_JSON: &str = r#"{
        "format":"khive-kg","version":"0.1","namespace":"local",
        "exported_at":"2026-01-01T00:00:00Z",
        "entities":[],
        "edges":[{"source":"00000000-0000-0000-0000-000000000001",
                   "target":"00000000-0000-0000-0000-000000000002",
                   "relation":"related_to","weight":0.5}]
    }"#;

    /// Fresh in-memory runtime for a single test.
    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// 1. Roundtrip: 3 entities + 2 edges survive export → import on a fresh runtime.
    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "FlashAttention-2", None, None, vec![])
            .await
            .unwrap();
        let e3 = src
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        src.link(None, e1.id, e3.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        assert_eq!(archive.entities.len(), 3);
        assert_eq!(archive.edges.len(), 2);
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(summary.edges_imported, 2);

        // Spot-check: the imported entity is retrievable.
        let got = dst.get_entity(None, e1.id).await.unwrap();
        assert!(got.is_some());
        let got = got.unwrap();
        assert_eq!(got.name, "FlashAttention");
        assert_eq!(got.description.as_deref(), Some("fast attention"));
    }

    /// 2. JSON roundtrip: export_kg_json → import_kg_json produces equivalent state.
    #[tokio::test]
    async fn json_roundtrip() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "QLoRA", None, None, vec![])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let json_str = src.export_kg_json(None).await.unwrap();
        assert!(json_str.contains("khive-kg"));

        let dst = make_rt().await;
        let summary = dst.import_kg_json(&json_str, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);

        let got = dst.get_entity(None, e1.id).await.unwrap().unwrap();
        assert_eq!(got.tags, vec!["fine-tuning"]);
    }

    /// 3. Namespace targeting: export from namespace "a", import into namespace "b" on a
    ///    fresh runtime — entities land in "b", and the source runtime's "a" is unaffected.
    ///
    ///    Note: source and destination are separate runtimes (separate in-memory DBs).
    ///    Same-DB cross-namespace copy is not a portability use case — portability is about
    ///    moving graphs between instances, not between namespaces within one instance.
    #[tokio::test]
    async fn namespace_targeting() {
        let src = make_rt().await;
        src.create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();

        let archive = src.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        // Import into a fresh runtime, targeting namespace "b".
        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        // Entity is in "b" on the destination runtime.
        let in_b = dst.list_entities(Some("b"), None, 100).await.unwrap();
        assert_eq!(in_b.len(), 1);
        assert_eq!(in_b[0].name, "Sinkhorn");

        // Namespace "a" on the source runtime is unchanged.
        let in_a = src.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(in_a.len(), 1);

        // Namespace "a" on the destination runtime has nothing (only "b" was written).
        let dst_a = dst.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(dst_a.len(), 0);
    }

    /// 4. Format validation: wrong `format` field → InvalidInput.
    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let rt = make_rt().await;
        let bad = KgArchive {
            format: "wrong".to_string(),
            version: "0.1".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities: vec![],
            edges: vec![],
        };
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// 5. Invalid relation in archive JSON → serde refuses to deserialize it.
    ///
    ///    This checks the raw serde step only; no runtime is involved.
    #[test]
    fn invalid_relation_rejected_at_deserialize() {
        let result: Result<KgArchive, _> = serde_json::from_str(BAD_RELATION_JSON);
        assert!(
            result.is_err(),
            "non-canonical relation should fail to deserialize"
        );
    }

    /// 6. Version validation: correct `format` but unknown `version` → InvalidInput.
    #[tokio::test]
    async fn version_validation_rejects_unknown_version() {
        let rt = make_rt().await;
        let bad = KgArchive {
            format: "khive-kg".to_string(),
            version: "9.9".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities: vec![],
            edges: vec![],
        };
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// 7. JSON import path: a deserialization failure surfaces as InvalidInput.
    #[tokio::test]
    async fn import_json_maps_parse_failure_to_invalid_input() {
        let rt = make_rt().await;
        let err = rt
            .import_kg_json(BAD_RELATION_JSON, None)
            .await
            .unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }
}