Skip to main content

varpulis_runtime/
codec.rs

1//! Checkpoint serialization codec.
2//!
3//! Provides format-agnostic serialization for checkpoint data.
4//! - **Default (JSON)**: Human-readable, backward compatible with all existing checkpoints.
5//! - **Binary (MessagePack)**: Compact and fast, enabled via the `binary-codec` feature flag.
6//!
//! The codec auto-detects the format on deserialization by inspecting the first
//! non-whitespace byte, so JSON checkpoints can always be read back regardless of
//! which feature is currently enabled. Reading MessagePack checkpoints requires the
//! `binary-codec` feature to be enabled.
10
11use serde::de::DeserializeOwned;
12use serde::Serialize;
13
14use crate::persistence::StoreError;
15
/// On-disk encoding used for checkpoint payloads.
///
/// The `MessagePack` variant only exists when the crate is built with the
/// `binary-codec` feature.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CheckpointFormat {
    /// Textual JSON encoding (the default): human-readable and universally supported.
    Json,
    /// MessagePack binary encoding: compact, ~2-4x smaller and faster.
    #[cfg(feature = "binary-codec")]
    MessagePack,
}
25
26impl CheckpointFormat {
27    /// Returns the active checkpoint format based on enabled features.
28    ///
29    /// With the `binary-codec` feature: returns `MessagePack` (compact, fast).
30    /// Without: returns `Json` (human-readable, universal).
31    pub const fn active() -> Self {
32        #[cfg(feature = "binary-codec")]
33        {
34            CheckpointFormat::MessagePack
35        }
36        #[cfg(not(feature = "binary-codec"))]
37        {
38            Self::Json
39        }
40    }
41}
42
43/// Serialize a value using the specified format.
44pub fn serialize<T: Serialize>(value: &T, format: CheckpointFormat) -> Result<Vec<u8>, StoreError> {
45    match format {
46        CheckpointFormat::Json => {
47            serde_json::to_vec(value).map_err(|e| StoreError::SerializationError(e.to_string()))
48        }
49        #[cfg(feature = "binary-codec")]
50        CheckpointFormat::MessagePack => {
51            rmp_serde::to_vec(value).map_err(|e| StoreError::SerializationError(e.to_string()))
52        }
53    }
54}
55
56/// Deserialize a value, auto-detecting the format from the data.
57///
58/// Detection heuristic:
59/// - Bytes starting with `{` (0x7B) or `[` (0x5B) → JSON
60/// - Everything else → MessagePack (if feature enabled), otherwise try JSON
61pub fn deserialize<T: DeserializeOwned>(data: &[u8]) -> Result<T, StoreError> {
62    if data.is_empty() {
63        return Err(StoreError::SerializationError(
64            "empty checkpoint data".to_string(),
65        ));
66    }
67
68    if is_json(data) {
69        serde_json::from_slice(data).map_err(|e| StoreError::SerializationError(e.to_string()))
70    } else {
71        #[cfg(feature = "binary-codec")]
72        {
73            rmp_serde::from_slice(data).map_err(|e| StoreError::SerializationError(e.to_string()))
74        }
75        #[cfg(not(feature = "binary-codec"))]
76        {
77            // No binary codec available — try JSON anyway as a fallback
78            serde_json::from_slice(data).map_err(|e| StoreError::SerializationError(e.to_string()))
79        }
80    }
81}
82
/// Report whether `data` looks like a JSON document.
///
/// Leading ASCII whitespace is skipped; the first remaining byte must be `{` or `[`.
/// Empty or whitespace-only input is not considered JSON.
fn is_json(data: &[u8]) -> bool {
    for &byte in data {
        if byte.is_ascii_whitespace() {
            continue;
        }
        // The first significant byte decides the outcome.
        return matches!(byte, b'{' | b'[');
    }
    false
}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::persistence::{EngineCheckpoint, CHECKPOINT_VERSION};

    /// Build an engine checkpoint with empty state and zeroed counters.
    ///
    /// Tests overwrite the counters they assert on.
    fn blank_checkpoint() -> EngineCheckpoint {
        EngineCheckpoint {
            version: CHECKPOINT_VERSION,
            window_states: Default::default(),
            sase_states: Default::default(),
            join_states: Default::default(),
            variables: Default::default(),
            events_processed: 0,
            output_events_emitted: 0,
            watermark_state: None,
            distinct_states: Default::default(),
            limit_states: Default::default(),
        }
    }

    /// JSON output is detected as JSON and round-trips the counters and version.
    #[test]
    fn test_json_roundtrip() {
        let mut cp = blank_checkpoint();
        cp.events_processed = 42;
        cp.output_events_emitted = 7;

        let bytes = serialize(&cp, CheckpointFormat::Json).unwrap();
        assert!(is_json(&bytes), "JSON serialization should produce JSON");

        let decoded = deserialize::<EngineCheckpoint>(&bytes).unwrap();
        assert_eq!(decoded.events_processed, 42);
        assert_eq!(decoded.output_events_emitted, 7);
        assert_eq!(decoded.version, CHECKPOINT_VERSION);
    }

    /// MessagePack output is not mistaken for JSON and round-trips the counters.
    #[cfg(feature = "binary-codec")]
    #[test]
    fn test_msgpack_roundtrip() {
        let mut cp = blank_checkpoint();
        cp.events_processed = 1000;
        cp.output_events_emitted = 500;

        let bytes = serialize(&cp, CheckpointFormat::MessagePack).unwrap();
        assert!(!is_json(&bytes), "MessagePack should not look like JSON");

        let decoded = deserialize::<EngineCheckpoint>(&bytes).unwrap();
        assert_eq!(decoded.events_processed, 1000);
        assert_eq!(decoded.output_events_emitted, 500);
    }

    /// The MessagePack encoding of a small checkpoint is shorter than its JSON encoding.
    #[cfg(feature = "binary-codec")]
    #[test]
    fn test_msgpack_smaller_than_json() {
        use std::collections::HashMap;

        use crate::persistence::Checkpoint;

        let metadata: HashMap<String, String> =
            [("stream", "TestStream"), ("tenant", "test-tenant-id")]
                .iter()
                .map(|(key, value)| (key.to_string(), value.to_string()))
                .collect();

        let cp = Checkpoint {
            id: 1,
            timestamp_ms: 1700000000000,
            events_processed: 100_000,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata,
            context_states: HashMap::new(),
        };

        let json_len = serialize(&cp, CheckpointFormat::Json).unwrap().len();
        let msgpack_len = serialize(&cp, CheckpointFormat::MessagePack)
            .unwrap()
            .len();

        assert!(
            msgpack_len < json_len,
            "MessagePack ({} bytes) should be smaller than JSON ({} bytes)",
            msgpack_len,
            json_len
        );
    }

    /// Auto-detection decodes both encodings of the same checkpoint.
    #[cfg(feature = "binary-codec")]
    #[test]
    fn test_cross_format_deserialize() {
        let mut cp = blank_checkpoint();
        cp.events_processed = 99;
        cp.output_events_emitted = 33;

        let json_bytes = serialize(&cp, CheckpointFormat::Json).unwrap();
        let msgpack_bytes = serialize(&cp, CheckpointFormat::MessagePack).unwrap();

        // Both should deserialize correctly via auto-detect
        let via_json = deserialize::<EngineCheckpoint>(&json_bytes).unwrap();
        let via_msgpack = deserialize::<EngineCheckpoint>(&msgpack_bytes).unwrap();

        assert_eq!(via_json.events_processed, 99);
        assert_eq!(via_msgpack.events_processed, 99);
    }

    /// Empty input is rejected rather than handed to a decoder.
    #[test]
    fn test_empty_data_error() {
        let outcome = deserialize::<EngineCheckpoint>(&[]);
        assert!(outcome.is_err());
    }

    /// `active()` follows the `binary-codec` feature flag.
    #[test]
    fn test_active_format() {
        #[cfg(feature = "binary-codec")]
        let expected = CheckpointFormat::MessagePack;
        #[cfg(not(feature = "binary-codec"))]
        let expected = CheckpointFormat::Json;

        assert_eq!(CheckpointFormat::active(), expected);
    }
}