Skip to main content

cognis_graph/checkpoint/
serializer.rs

1//! Pluggable serialization format for checkpointers.
2//!
3//! V2's `SqliteCheckpointer` and `PostgresCheckpointer` historically
4//! hardcoded `serde_json`. Production users sometimes want CBOR (smaller),
5//! MessagePack (faster), or postcard (deterministic, embedded-friendly).
6//! [`CheckpointSerializer`] is the integration point.
7//!
8//! The shipped impl is [`JsonSerializer`] (the default; matches existing
9//! behaviour). Other formats live behind feature flags so the dep tree
10//! stays minimal: `serializer-cbor`, `serializer-msgpack`,
11//! `serializer-postcard`.
12
13use std::sync::Arc;
14
15use serde::de::DeserializeOwned;
16use serde::Serialize;
17
18use cognis_core::{CognisError, Result};
19
20/// Pluggable serialize/deserialize for checkpoint payloads.
21///
22/// The format `name` is stored alongside the payload in the backend so
23/// stale checkpoints can be deserialized with the matching format on
24/// restart. Mismatches surface as a clear error rather than corrupted
25/// state.
26pub trait CheckpointSerializer: Send + Sync {
27    /// Stable identifier (e.g. `"json"`, `"cbor"`). Stored next to the
28    /// payload so deserialization can pick the matching format.
29    fn name(&self) -> &str;
30
31    /// Serialize an arbitrary `Serialize` value into bytes.
32    fn serialize_bytes(&self, value: &serde_json::Value) -> Result<Vec<u8>>;
33
34    /// Deserialize bytes back into a `serde_json::Value`. The two-step
35    /// indirection (Value → final type) lets the trait stay non-generic
36    /// while still supporting `S: Serialize + DeserializeOwned`.
37    fn deserialize_bytes(&self, bytes: &[u8]) -> Result<serde_json::Value>;
38}
39
40/// JSON serializer (default). Matches the historical behaviour of the
41/// SQLite / Postgres checkpointers.
42#[derive(Debug, Default, Clone, Copy)]
43pub struct JsonSerializer;
44
45impl CheckpointSerializer for JsonSerializer {
46    fn name(&self) -> &str {
47        "json"
48    }
49    fn serialize_bytes(&self, value: &serde_json::Value) -> Result<Vec<u8>> {
50        serde_json::to_vec(value)
51            .map_err(|e| CognisError::Serialization(format!("json serialize: {e}")))
52    }
53    fn deserialize_bytes(&self, bytes: &[u8]) -> Result<serde_json::Value> {
54        serde_json::from_slice(bytes)
55            .map_err(|e| CognisError::Serialization(format!("json deserialize: {e}")))
56    }
57}
58
59/// CBOR serializer. Compact binary; well-supported in the embedded /
60/// IoT space. Behind feature `serializer-cbor`.
61#[cfg(feature = "serializer-cbor")]
62#[derive(Debug, Default, Clone, Copy)]
63pub struct CborSerializer;
64
65#[cfg(feature = "serializer-cbor")]
66impl CheckpointSerializer for CborSerializer {
67    fn name(&self) -> &str {
68        "cbor"
69    }
70    fn serialize_bytes(&self, value: &serde_json::Value) -> Result<Vec<u8>> {
71        let mut out = Vec::new();
72        ciborium::ser::into_writer(value, &mut out)
73            .map_err(|e| CognisError::Serialization(format!("cbor serialize: {e}")))?;
74        Ok(out)
75    }
76    fn deserialize_bytes(&self, bytes: &[u8]) -> Result<serde_json::Value> {
77        ciborium::de::from_reader(bytes)
78            .map_err(|e| CognisError::Serialization(format!("cbor deserialize: {e}")))
79    }
80}
81
82// ---------------------------------------------------------------------------
83// Helpers used by the SQLite / Postgres checkpointer impls.
84// ---------------------------------------------------------------------------
85
86/// Serialize a typed checkpoint value via the given format.
87pub fn encode<S: Serialize>(
88    serializer: &Arc<dyn CheckpointSerializer>,
89    value: &S,
90) -> Result<Vec<u8>> {
91    let v = serde_json::to_value(value)
92        .map_err(|e| CognisError::Serialization(format!("checkpoint to_value: {e}")))?;
93    serializer.serialize_bytes(&v)
94}
95
96/// Deserialize a typed checkpoint value via the given format.
97pub fn decode<S: DeserializeOwned>(
98    serializer: &Arc<dyn CheckpointSerializer>,
99    bytes: &[u8],
100) -> Result<S> {
101    let v = serializer.deserialize_bytes(bytes)?;
102    serde_json::from_value(v)
103        .map_err(|e| CognisError::Serialization(format!("checkpoint from_value: {e}")))
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109
110    #[test]
111    fn json_roundtrip() {
112        let s: Arc<dyn CheckpointSerializer> = Arc::new(JsonSerializer);
113        let v = serde_json::json!({"a": 1, "b": "hi"});
114        let bytes = s.serialize_bytes(&v).unwrap();
115        let back = s.deserialize_bytes(&bytes).unwrap();
116        assert_eq!(v, back);
117    }
118
119    #[test]
120    fn json_serializer_name() {
121        assert_eq!(JsonSerializer.name(), "json");
122    }
123
124    #[test]
125    fn typed_encode_decode_via_helpers() {
126        #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)]
127        struct Foo {
128            n: u32,
129            s: String,
130        }
131        let s: Arc<dyn CheckpointSerializer> = Arc::new(JsonSerializer);
132        let foo = Foo {
133            n: 7,
134            s: "hi".into(),
135        };
136        let bytes = encode(&s, &foo).unwrap();
137        let back: Foo = decode(&s, &bytes).unwrap();
138        assert_eq!(back, foo);
139    }
140}