Skip to main content

kyma_memory/
schema.rs

1//! Arrow schemas for the columnar memory tables.
2//!
3//! `memory_nodes` / `memory_edges` are ordinary Kyma columnar tables, written
4//! through the ingest `WritePath` and registered as the `memory` graph. They
5//! must be created with an EXPLICIT schema (auto-provisioning can't declare the
6//! `embedding` vector column). Timestamps are stored as RFC3339 strings so
7//! lexicographic ordering = chronological ordering (and to avoid Arrow
8//! timestamp-coercion pitfalls); `updated_at` is the latest-wins version key.
9
10use std::sync::Arc;
11
12use arrow_schema::{DataType, Field, Schema, SchemaRef};
13
14/// Schema for `memory_nodes`. `dim` is the embedding dimension (e.g. 384).
15///
16/// Graph columns: `id` (node id), `labels` (node label), `realm` (namespace).
17/// Everything else surfaces as graph node properties.
18///
19/// Bi-temporal validity (Zep/Graphiti style, invalidate-don't-delete):
20/// `valid_at` is when the fact became true (defaults to `created_at`),
21/// `invalid_at` is when it was superseded/contradicted (NULL = currently valid),
22/// `superseded_by` points at the memory id that replaced it, and `provenance`
23/// is a JSON blob describing how the memory was formed.
24pub fn memory_nodes_schema(dim: i32) -> SchemaRef {
25    let item = Arc::new(Field::new("item", DataType::Float32, false));
26    Arc::new(Schema::new(vec![
27        Field::new("id", DataType::Utf8, false),
28        Field::new("labels", DataType::Utf8, true),
29        Field::new("realm", DataType::Utf8, true),
30        Field::new("memory_type", DataType::Utf8, true),
31        Field::new("title", DataType::Utf8, true),
32        Field::new("content", DataType::Utf8, true),
33        Field::new("content_preview", DataType::Utf8, true),
34        Field::new("tags", DataType::Utf8, true),
35        Field::new("importance", DataType::Float64, true),
36        Field::new("status", DataType::Utf8, true),
37        Field::new("source_session_id", DataType::Utf8, true),
38        Field::new("source_run_id", DataType::Utf8, true),
39        Field::new("embedding", DataType::FixedSizeList(item, dim), false),
40        Field::new("created_at", DataType::Utf8, true),
41        Field::new("updated_at", DataType::Utf8, true),
42        Field::new("valid_at", DataType::Utf8, true),
43        Field::new("invalid_at", DataType::Utf8, true),
44        Field::new("superseded_by", DataType::Utf8, true),
45        Field::new("provenance", DataType::Utf8, true),
46        // Deterministic upsert key: (realm, topic_key) updates in place.
47        Field::new("topic_key", DataType::Utf8, true),
48    ]))
49}
50
/// Names of the `memory_nodes` columns that were introduced after the table
/// was first provisioned: the bi-temporal validity columns plus the
/// topic-key upsert key. An older table may lack any of them; the writer
/// detects that drift and backfills the columns via `alter_table_add_column`
/// (old extents null-fill on read).
pub const BITEMPORAL_COLUMNS: &[&str] = &[
    "valid_at",
    "invalid_at",
    "superseded_by",
    "provenance",
    "topic_key",
];
57
58/// Schema for `memory_edges`. Graph columns: `src`, `dst`, `type`, `realm`.
59/// `target_namespace` carries the foreign endpoint's `database/graph` for
60/// cross-graph `REFERENCES` edges so the unified canvas can stitch them.
61pub fn memory_edges_schema() -> SchemaRef {
62    Arc::new(Schema::new(vec![
63        Field::new("id", DataType::Utf8, false),
64        Field::new("src", DataType::Utf8, true),
65        Field::new("dst", DataType::Utf8, true),
66        Field::new("type", DataType::Utf8, true),
67        Field::new("realm", DataType::Utf8, true),
68        Field::new("target_namespace", DataType::Utf8, true),
69        Field::new("props", DataType::Utf8, true),
70        Field::new("created_at", DataType::Utf8, true),
71    ]))
72}
73
#[cfg(test)]
mod tests {
    use super::*;

    /// `embedding` must be a non-nullable fixed-size list of Float32 whose
    /// length equals the requested dimension.
    #[test]
    fn node_schema_has_vector_embedding() {
        let schema = memory_nodes_schema(384);
        let embedding = schema.field_with_name("embedding").unwrap();

        match embedding.data_type() {
            DataType::FixedSizeList(element, width) => {
                assert_eq!(*width, 384);
                assert_eq!(element.data_type(), &DataType::Float32);
            }
            other => panic!("embedding should be FixedSizeList<Float32>, got {other:?}"),
        }

        assert!(!embedding.is_nullable(), "embedding must be non-nullable");
    }

    /// The edge table must expose its id, the graph columns and the
    /// cross-graph namespace column.
    #[test]
    fn edge_schema_has_graph_columns() {
        let schema = memory_edges_schema();
        let required = ["id", "src", "dst", "type", "realm", "target_namespace"];

        for c in required {
            let lookup = schema.field_with_name(c);
            assert!(lookup.is_ok(), "missing column {c}");
        }
    }

    /// Every late-added column must exist in the node schema as a nullable
    /// string.
    #[test]
    fn node_schema_has_bitemporal_columns() {
        let schema = memory_nodes_schema(384);

        for c in BITEMPORAL_COLUMNS.iter().copied() {
            let field = match schema.field_with_name(c) {
                Ok(found) => found,
                Err(_) => panic!("missing column {c}"),
            };
            assert!(field.is_nullable(), "{c} must be nullable for back-compat");
            assert_eq!(field.data_type(), &DataType::Utf8);
        }
    }
}