Skip to main content

khive_runtime/
portability.rs

1// Copyright 2026 khive contributors. Licensed under Apache-2.0.
2//
3//! KG export / import — portable JSON archive for namespace-scoped knowledge graphs.
4//!
5//! Implements the v1 portability format described in ADR-010. Embeddings are
6//! intentionally excluded: they are regenerable from the embedding model + text
7//! and their inclusion would lock the format to a specific model.
8//!
//! # Edge namespace enumeration
//!
//! `GraphStore::query_edges` has no namespace column — edges are linked to entities,
//! not namespaces. Export therefore collects all entity IDs in the namespace first,
//! then queries edges whose `source_id` is in that set, capturing every edge whose
//! source entity belongs to the namespace. Edge targets are not filtered: an edge
//! whose target is outside the exported entity set is still written to the archive,
//! and on import it is skipped (counted in `ImportSummary::edges_skipped`) unless
//! that target already exists in the destination namespace.
16
17use std::collections::HashSet;
18
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23use khive_storage::types::{EdgeFilter, LinkId, PageRequest};
24use khive_storage::{EdgeRelation, EntityFilter};
25
26use crate::error::{RuntimeError, RuntimeResult};
27use crate::runtime::KhiveRuntime;
28
29// ── Archive types ─────────────────────────────────────────────────────────────
30
31/// Portable JSON archive of a namespace-scoped knowledge graph.
32///
33/// The `format` field is always `"khive-kg"`. The `version` field identifies
34/// the serialization schema; parsers should reject unknown versions.
35#[derive(Clone, Debug, Serialize, Deserialize)]
36pub struct KgArchive {
37    pub format: String,
38    pub version: String,
39    pub namespace: String,
40    pub exported_at: DateTime<Utc>,
41    pub entities: Vec<ExportedEntity>,
42    pub edges: Vec<ExportedEdge>,
43}
44
45/// An entity record in the portable archive.
46#[derive(Clone, Debug, Serialize, Deserialize)]
47pub struct ExportedEntity {
48    pub id: Uuid,
49    /// Pack-owned kind string (e.g. `"concept"`, `"person"`).
50    pub kind: String,
51    pub name: String,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub description: Option<String>,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub properties: Option<serde_json::Value>,
56    #[serde(default)]
57    pub tags: Vec<String>,
58    pub created_at: DateTime<Utc>,
59    pub updated_at: DateTime<Utc>,
60}
61
62/// A directed edge record in the portable archive.
63#[derive(Clone, Debug, Serialize, Deserialize)]
64pub struct ExportedEdge {
65    pub source: Uuid,
66    pub target: Uuid,
67    /// One of the 13 canonical relations defined in ADR-002.
68    pub relation: EdgeRelation,
69    pub weight: f64,
70}
71
72/// Outcome of a successful import operation.
73#[derive(Clone, Debug, Serialize, Deserialize)]
74pub struct ImportSummary {
75    pub entities_imported: usize,
76    pub edges_imported: usize,
77    /// Number of edges that were skipped because one or both endpoint UUIDs
78    /// were not found in the target namespace after entity import.
79    ///
80    /// A non-zero value indicates the archive contained dangling edges (edges
81    /// referencing entities not present in the archive or the existing graph).
82    pub edges_skipped: usize,
83}
84
85// ── KhiveRuntime impl ─────────────────────────────────────────────────────────
86
87impl KhiveRuntime {
88    /// Export all entities and edges in a namespace to a portable JSON archive.
89    ///
90    /// Edge collection: all entity IDs in the namespace are gathered first;
91    /// `query_edges` is then called with those IDs as `source_ids`. This
92    /// captures every edge whose source entity belongs to the namespace.
93    pub async fn export_kg(&self, namespace: Option<&str>) -> RuntimeResult<KgArchive> {
94        let ns = self.ns(namespace).to_string();
95
96        // 1. Collect all entities in the namespace.
97        let entity_page = self
98            .entities(Some(&ns))?
99            .query_entities(
100                &ns,
101                EntityFilter::default(),
102                PageRequest {
103                    offset: 0,
104                    limit: u32::MAX,
105                },
106            )
107            .await?;
108
109        let entities: Vec<ExportedEntity> = entity_page
110            .items
111            .into_iter()
112            .map(|e| {
113                let created_at =
114                    DateTime::from_timestamp_micros(e.created_at).unwrap_or_else(Utc::now);
115                let updated_at =
116                    DateTime::from_timestamp_micros(e.updated_at).unwrap_or_else(Utc::now);
117                ExportedEntity {
118                    id: e.id,
119                    kind: e.kind.to_string(),
120                    name: e.name,
121                    description: e.description,
122                    properties: e.properties,
123                    tags: e.tags,
124                    created_at,
125                    updated_at,
126                }
127            })
128            .collect();
129
130        // 2. Collect edges whose source is any entity in this namespace.
131        let source_ids: Vec<Uuid> = entities.iter().map(|e| e.id).collect();
132        let edges = if source_ids.is_empty() {
133            Vec::new()
134        } else {
135            let filter = EdgeFilter {
136                source_ids: source_ids.clone(),
137                ..Default::default()
138            };
139            let edge_page = self
140                .graph(Some(&ns))?
141                .query_edges(
142                    filter,
143                    Vec::new(),
144                    PageRequest {
145                        offset: 0,
146                        limit: u32::MAX,
147                    },
148                )
149                .await?;
150
151            let id_set: HashSet<Uuid> = source_ids.into_iter().collect();
152            edge_page
153                .items
154                .into_iter()
155                .filter(|e| id_set.contains(&e.source_id))
156                .map(|e| ExportedEdge {
157                    source: e.source_id,
158                    target: e.target_id,
159                    relation: e.relation,
160                    weight: e.weight,
161                })
162                .collect()
163        };
164
165        Ok(KgArchive {
166            format: "khive-kg".to_string(),
167            version: "0.1".to_string(),
168            namespace: ns,
169            exported_at: Utc::now(),
170            entities,
171            edges,
172        })
173    }
174
175    /// Export to a JSON string (convenience wrapper around `export_kg`).
176    pub async fn export_kg_json(&self, namespace: Option<&str>) -> RuntimeResult<String> {
177        let archive = self.export_kg(namespace).await?;
178        serde_json::to_string(&archive).map_err(|e| RuntimeError::InvalidInput(e.to_string()))
179    }
180
181    /// Import an archive into `target_namespace`.
182    ///
183    /// If `target_namespace` is `None`, the archive's own namespace is used.
184    ///
185    /// - Entities: upserted by ID; existing records are overwritten.
186    /// - Edges: upserted; existing records are overwritten.
187    /// - Validation: `format != "khive-kg"` or unsupported version → `InvalidInput`.
188    ///   Invalid edge relations are caught at JSON deserialization time.
189    pub async fn import_kg(
190        &self,
191        archive: &KgArchive,
192        target_namespace: Option<&str>,
193    ) -> RuntimeResult<ImportSummary> {
194        // Format validation.
195        if archive.format != "khive-kg" {
196            return Err(RuntimeError::InvalidInput(format!(
197                "unsupported archive format {:?}; expected \"khive-kg\"",
198                archive.format
199            )));
200        }
201        if archive.version != "0.1" {
202            return Err(RuntimeError::InvalidInput(format!(
203                "unsupported archive version {:?}; supported: \"0.1\"",
204                archive.version
205            )));
206        }
207
208        let ns = target_namespace.unwrap_or(&archive.namespace).to_string();
209
210        // Import entities.
211        let store = self.entities(Some(&ns))?;
212        let mut entities_imported = 0usize;
213        for ee in &archive.entities {
214            let created_micros = ee.created_at.timestamp_micros();
215            let updated_micros = ee.updated_at.timestamp_micros();
216            let entity = khive_storage::entity::Entity {
217                id: ee.id,
218                namespace: ns.clone(),
219                kind: ee.kind.clone(),
220                name: ee.name.clone(),
221                description: ee.description.clone(),
222                properties: ee.properties.clone(),
223                tags: ee.tags.clone(),
224                created_at: created_micros,
225                updated_at: updated_micros,
226                deleted_at: None,
227            };
228            store.upsert_entity(entity.clone()).await?;
229            // Index into FTS5 (and vector store if a model is configured) so that
230            // imported entities are visible to hybrid_search immediately.
231            self.reindex_entity(Some(&ns), &entity).await?;
232            entities_imported += 1;
233        }
234
235        // Import edges — validate both endpoints before inserting.
236        //
237        // An untrusted archive may contain edges whose source or target UUIDs
238        // do not correspond to any entity in the target namespace. Inserting
239        // such edges would leave dangling references in the graph store. We
240        // therefore check each endpoint with `get_entity` (namespace-scoped,
241        // fail-closed) and skip any edge whose source or target is absent.
242        let graph = self.graph(Some(&ns))?;
243        let mut edges_imported = 0usize;
244        let mut edges_skipped = 0usize;
245        for ee in &archive.edges {
246            let source_ok = self.get_entity(Some(&ns), ee.source).await?.is_some();
247            if !source_ok {
248                tracing::warn!(
249                    source = %ee.source,
250                    target = %ee.target,
251                    relation = ?ee.relation,
252                    "import_kg: skipping edge — source entity not found in namespace {ns:?}"
253                );
254                edges_skipped += 1;
255                continue;
256            }
257            let target_ok = self.get_entity(Some(&ns), ee.target).await?.is_some();
258            if !target_ok {
259                tracing::warn!(
260                    source = %ee.source,
261                    target = %ee.target,
262                    relation = ?ee.relation,
263                    "import_kg: skipping edge — target entity not found in namespace {ns:?}"
264                );
265                edges_skipped += 1;
266                continue;
267            }
268            let edge = khive_storage::types::Edge {
269                id: LinkId::from(Uuid::new_v4()),
270                source_id: ee.source,
271                target_id: ee.target,
272                relation: ee.relation,
273                weight: ee.weight,
274                created_at: Utc::now(),
275                metadata: None,
276            };
277            graph.upsert_edge(edge).await?;
278            edges_imported += 1;
279        }
280
281        Ok(ImportSummary {
282            entities_imported,
283            edges_imported,
284            edges_skipped,
285        })
286    }
287
288    /// Import from a JSON string (convenience wrapper around `import_kg`).
289    pub async fn import_kg_json(
290        &self,
291        json: &str,
292        target_namespace: Option<&str>,
293    ) -> RuntimeResult<ImportSummary> {
294        let archive: KgArchive =
295            serde_json::from_str(json).map_err(|e| RuntimeError::InvalidInput(e.to_string()))?;
296        self.import_kg(&archive, target_namespace).await
297    }
298}
299
300// ── Tests ─────────────────────────────────────────────────────────────────────
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KhiveRuntime;
    use khive_storage::EdgeRelation;

    /// Builds a fresh in-memory runtime; each call gets an isolated database.
    async fn make_rt() -> KhiveRuntime {
        KhiveRuntime::memory().expect("in-memory runtime")
    }

    /// Builds a minimal archived entity with no description, properties, or tags.
    fn entity_record(id: Uuid, kind: &str, name: &str) -> ExportedEntity {
        let now = Utc::now();
        ExportedEntity {
            id,
            kind: kind.to_string(),
            name: name.to_string(),
            description: None,
            properties: None,
            tags: vec![],
            created_at: now,
            updated_at: now,
        }
    }

    /// Builds an archived edge.
    fn edge_record(
        source: Uuid,
        target: Uuid,
        relation: EdgeRelation,
        weight: f64,
    ) -> ExportedEdge {
        ExportedEdge {
            source,
            target,
            relation,
            weight,
        }
    }

    /// Wraps entities and edges in a valid `"khive-kg"` / `"0.1"` archive for
    /// namespace `"local"`.
    fn local_archive(entities: Vec<ExportedEntity>, edges: Vec<ExportedEdge>) -> KgArchive {
        KgArchive {
            format: "khive-kg".to_string(),
            version: "0.1".to_string(),
            namespace: "local".to_string(),
            exported_at: Utc::now(),
            entities,
            edges,
        }
    }

    /// 1. Roundtrip: 3 entities + 2 edges survive export → import on a fresh runtime.
    #[tokio::test]
    async fn roundtrip_entities_and_edges() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "FlashAttention",
                Some("fast attention"),
                None,
                vec![],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "FlashAttention-2", None, None, vec![])
            .await
            .unwrap();
        let e3 = src
            .create_entity(None, "person", "Tri Dao", None, None, vec!["author".into()])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::Extends, 1.0)
            .await
            .unwrap();
        src.link(None, e1.id, e3.id, EdgeRelation::IntroducedBy, 0.9)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        assert_eq!(archive.entities.len(), 3);
        assert_eq!(archive.edges.len(), 2);
        assert_eq!(archive.format, "khive-kg");
        assert_eq!(archive.version, "0.1");

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(summary.edges_imported, 2);

        // Spot-check: the imported entity is retrievable.
        let got = dst
            .get_entity(None, e1.id)
            .await
            .unwrap()
            .expect("imported entity must be retrievable");
        assert_eq!(got.name, "FlashAttention");
        assert_eq!(got.description.as_deref(), Some("fast attention"));
    }

    /// 2. JSON roundtrip: export_kg_json → import_kg_json produces equivalent state.
    #[tokio::test]
    async fn json_roundtrip() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(
                None,
                "concept",
                "LoRA",
                Some("low-rank adaptation"),
                Some(serde_json::json!({"year": "2021"})),
                vec!["fine-tuning".into()],
            )
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "QLoRA", None, None, vec![])
            .await
            .unwrap();
        src.link(None, e2.id, e1.id, EdgeRelation::VariantOf, 0.9)
            .await
            .unwrap();

        let json_str = src.export_kg_json(None).await.unwrap();
        assert!(json_str.contains("khive-kg"));

        let dst = make_rt().await;
        let summary = dst.import_kg_json(&json_str, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);

        let got = dst.get_entity(None, e1.id).await.unwrap().unwrap();
        assert_eq!(got.tags, vec!["fine-tuning"]);
    }

    /// 3. Namespace targeting: export from namespace "a", import into namespace "b" on a
    ///    fresh runtime — entities land in "b", and the source runtime's "a" is unaffected.
    ///
    ///    Note: source and destination are separate runtimes (separate in-memory DBs).
    ///    Same-DB cross-namespace copy is not a portability use case — portability is about
    ///    moving graphs between instances, not between namespaces within one instance.
    #[tokio::test]
    async fn namespace_targeting() {
        let src = make_rt().await;
        src.create_entity(Some("a"), "concept", "Sinkhorn", None, None, vec![])
            .await
            .unwrap();

        let archive = src.export_kg(Some("a")).await.unwrap();
        assert_eq!(archive.namespace, "a");

        // Import into a fresh runtime, targeting namespace "b".
        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, Some("b")).await.unwrap();
        assert_eq!(summary.entities_imported, 1);

        // Entity is in "b" on the destination runtime.
        let in_b = dst.list_entities(Some("b"), None, 100).await.unwrap();
        assert_eq!(in_b.len(), 1);
        assert_eq!(in_b[0].name, "Sinkhorn");

        // Namespace "a" on the source runtime is unchanged.
        let in_a = src.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(in_a.len(), 1);

        // Namespace "a" on the destination runtime has nothing (only "b" was written).
        let dst_a = dst.list_entities(Some("a"), None, 100).await.unwrap();
        assert_eq!(dst_a.len(), 0);
    }

    /// 4. Format validation: wrong `format` field → InvalidInput.
    #[tokio::test]
    async fn format_validation_rejects_wrong_format() {
        let rt = make_rt().await;
        let mut bad = local_archive(vec![], vec![]);
        bad.format = "wrong".to_string();
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// 5. Version validation: unknown `version` field → InvalidInput.
    #[tokio::test]
    async fn version_validation_rejects_unknown_version() {
        let rt = make_rt().await;
        let mut bad = local_archive(vec![], vec![]);
        bad.version = "9.9".to_string();
        let err = rt.import_kg(&bad, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    /// 6. Invalid relation in archive: serde rejects it, and `import_kg_json`
    ///    surfaces that as InvalidInput.
    #[tokio::test]
    async fn invalid_relation_rejected_at_deserialize() {
        let json = r#"{
            "format":"khive-kg","version":"0.1","namespace":"local",
            "exported_at":"2026-01-01T00:00:00Z",
            "entities":[],
            "edges":[{"source":"00000000-0000-0000-0000-000000000001",
                       "target":"00000000-0000-0000-0000-000000000002",
                       "relation":"related_to","weight":0.5}]
        }"#;
        let result: Result<KgArchive, _> = serde_json::from_str(json);
        assert!(
            result.is_err(),
            "non-canonical relation should fail to deserialize"
        );

        let rt = make_rt().await;
        let err = rt.import_kg_json(json, None).await.unwrap_err();
        assert!(matches!(err, RuntimeError::InvalidInput(_)));
    }

    // ── Dangling-edge validation tests (issue #28) ────────────────────────────

    /// 7. Edge with dangling source (source UUID not in entity table) is skipped.
    ///
    /// The archive has one entity + one edge whose source is a phantom UUID.
    /// Import succeeds, entities_imported=1, edges_imported=0, edges_skipped=1.
    #[tokio::test]
    async fn import_edge_with_dangling_source_is_skipped() {
        let phantom_source = Uuid::parse_str("deadbeef-dead-4ead-dead-deadbeefcafe").unwrap();
        let real = Uuid::new_v4();

        let archive = local_archive(
            vec![entity_record(real, "concept", "Real")],
            vec![edge_record(phantom_source, real, EdgeRelation::Extends, 1.0)],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(
            summary.edges_imported, 0,
            "dangling source must not be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "dangling source must be counted as skipped"
        );
    }

    /// 8. Edge with dangling target (target UUID not in entity table) is skipped.
    ///
    /// The archive has one entity + one edge whose target is a phantom UUID.
    /// Import succeeds, entities_imported=1, edges_imported=0, edges_skipped=1.
    #[tokio::test]
    async fn import_edge_with_dangling_target_is_skipped() {
        let phantom_target = Uuid::parse_str("cafebabe-cafe-4abe-cafe-cafebabecafe").unwrap();
        let real = Uuid::new_v4();

        let archive = local_archive(
            vec![entity_record(real, "concept", "Source")],
            vec![edge_record(real, phantom_target, EdgeRelation::DependsOn, 0.8)],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(
            summary.edges_imported, 0,
            "dangling target must not be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "dangling target must be counted as skipped"
        );
    }

    /// 9. Mixed batch: some valid edges and some dangling edges — correct counts reported.
    ///
    /// Archive has 3 entities, 2 valid edges, and 1 dangling edge (phantom target).
    /// Import succeeds with edges_imported=2, edges_skipped=1.
    #[tokio::test]
    async fn import_mixed_edges_reports_correct_counts() {
        let phantom = Uuid::parse_str("11111111-1111-4111-8111-111111111111").unwrap();
        let (a, b, c) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());

        let archive = local_archive(
            vec![
                entity_record(a, "concept", "A"),
                entity_record(b, "concept", "B"),
                entity_record(c, "concept", "C"),
            ],
            vec![
                // Valid: A → B
                edge_record(a, b, EdgeRelation::Extends, 1.0),
                // Valid: B → C
                edge_record(b, c, EdgeRelation::DependsOn, 0.9),
                // Dangling: A → phantom
                edge_record(a, phantom, EdgeRelation::Enables, 0.5),
            ],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 3);
        assert_eq!(
            summary.edges_imported, 2,
            "only valid edges must be imported"
        );
        assert_eq!(
            summary.edges_skipped, 1,
            "one dangling edge must be reported"
        );
    }

    /// 10. A phantom endpoint shared by several edges is skipped once per edge.
    #[tokio::test]
    async fn repeated_dangling_endpoint_is_skipped_per_edge() {
        let phantom = Uuid::parse_str("22222222-2222-4222-8222-222222222222").unwrap();
        let (a, b) = (Uuid::new_v4(), Uuid::new_v4());

        let archive = local_archive(
            vec![
                entity_record(a, "concept", "A"),
                entity_record(b, "concept", "B"),
            ],
            vec![
                edge_record(a, phantom, EdgeRelation::Extends, 1.0),
                edge_record(b, phantom, EdgeRelation::DependsOn, 0.5),
                edge_record(a, b, EdgeRelation::Enables, 0.7),
            ],
        );

        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 2);
        assert_eq!(summary.edges_imported, 1);
        assert_eq!(
            summary.edges_skipped, 2,
            "each edge to the phantom endpoint must be counted"
        );
    }

    /// 11. An edge whose endpoint already exists in the destination namespace
    ///     (but is absent from the archive) is imported, not skipped.
    #[tokio::test]
    async fn import_edge_to_preexisting_destination_entity() {
        let dst = make_rt().await;
        let existing = dst
            .create_entity(Some("local"), "concept", "Existing", None, None, vec![])
            .await
            .unwrap();
        let fresh = Uuid::new_v4();

        let archive = local_archive(
            vec![entity_record(fresh, "concept", "Fresh")],
            vec![edge_record(fresh, existing.id, EdgeRelation::Extends, 1.0)],
        );

        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.entities_imported, 1);
        assert_eq!(summary.edges_imported, 1);
        assert_eq!(summary.edges_skipped, 0);
    }

    /// 12. All-valid edges produce edges_skipped=0 (no regression on the happy path).
    #[tokio::test]
    async fn import_all_valid_edges_reports_zero_skipped() {
        let src = make_rt().await;
        let e1 = src
            .create_entity(None, "concept", "E1", None, None, vec![])
            .await
            .unwrap();
        let e2 = src
            .create_entity(None, "concept", "E2", None, None, vec![])
            .await
            .unwrap();
        src.link(None, e1.id, e2.id, EdgeRelation::VariantOf, 0.7)
            .await
            .unwrap();

        let archive = src.export_kg(None).await.unwrap();
        let dst = make_rt().await;
        let summary = dst.import_kg(&archive, None).await.unwrap();
        assert_eq!(summary.edges_imported, 1);
        assert_eq!(
            summary.edges_skipped, 0,
            "no edges should be skipped when all endpoints exist"
        );
    }
}