varpulis_runtime/
codec.rs1use serde::de::DeserializeOwned;
12use serde::Serialize;
13
14use crate::persistence::StoreError;
15
/// Byte encoding used for checkpoint payloads.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CheckpointFormat {
    /// Text encoding produced with `serde_json`; present in every build.
    Json,
    /// Binary encoding produced with `rmp_serde`; only compiled in when the
    /// `binary-codec` feature is enabled.
    #[cfg(feature = "binary-codec")]
    MessagePack,
}

impl CheckpointFormat {
    /// The format this build prefers: MessagePack, because the
    /// `binary-codec` feature is enabled.
    #[cfg(feature = "binary-codec")]
    pub const fn active() -> Self {
        Self::MessagePack
    }

    /// The format this build prefers: JSON, because the `binary-codec`
    /// feature is disabled.
    #[cfg(not(feature = "binary-codec"))]
    pub const fn active() -> Self {
        Self::Json
    }
}
42
43pub fn serialize<T: Serialize>(value: &T, format: CheckpointFormat) -> Result<Vec<u8>, StoreError> {
45 match format {
46 CheckpointFormat::Json => {
47 serde_json::to_vec(value).map_err(|e| StoreError::SerializationError(e.to_string()))
48 }
49 #[cfg(feature = "binary-codec")]
50 CheckpointFormat::MessagePack => {
51 rmp_serde::to_vec(value).map_err(|e| StoreError::SerializationError(e.to_string()))
52 }
53 }
54}
55
56pub fn deserialize<T: DeserializeOwned>(data: &[u8]) -> Result<T, StoreError> {
62 if data.is_empty() {
63 return Err(StoreError::SerializationError(
64 "empty checkpoint data".to_string(),
65 ));
66 }
67
68 if is_json(data) {
69 serde_json::from_slice(data).map_err(|e| StoreError::SerializationError(e.to_string()))
70 } else {
71 #[cfg(feature = "binary-codec")]
72 {
73 rmp_serde::from_slice(data).map_err(|e| StoreError::SerializationError(e.to_string()))
74 }
75 #[cfg(not(feature = "binary-codec"))]
76 {
77 serde_json::from_slice(data).map_err(|e| StoreError::SerializationError(e.to_string()))
79 }
80 }
81}
82
/// Heuristic format sniffer.
///
/// Returns `true` when the first byte that is not ASCII whitespace is `{` or
/// `[`, i.e. the payload looks like a JSON object or array. Empty or
/// all-whitespace input yields `false`.
fn is_json(data: &[u8]) -> bool {
    let leading = data.iter().copied().find(|byte| !byte.is_ascii_whitespace());
    matches!(leading, Some(b'{' | b'['))
}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::persistence::{EngineCheckpoint, CHECKPOINT_VERSION};

    /// Builds an `EngineCheckpoint` at the current version with empty state and
    /// zeroed counters. Tests overwrite the counters they care about, so a new
    /// field only needs to be added here.
    fn empty_engine_checkpoint() -> EngineCheckpoint {
        EngineCheckpoint {
            version: CHECKPOINT_VERSION,
            window_states: Default::default(),
            sase_states: Default::default(),
            join_states: Default::default(),
            variables: Default::default(),
            events_processed: 0,
            output_events_emitted: 0,
            watermark_state: None,
            distinct_states: Default::default(),
            limit_states: Default::default(),
        }
    }

    #[test]
    fn test_json_roundtrip() {
        let mut cp = empty_engine_checkpoint();
        cp.events_processed = 42;
        cp.output_events_emitted = 7;

        let data = serialize(&cp, CheckpointFormat::Json).unwrap();
        assert!(is_json(&data), "JSON serialization should produce JSON");

        let restored: EngineCheckpoint = deserialize(&data).unwrap();
        assert_eq!(restored.events_processed, 42);
        assert_eq!(restored.output_events_emitted, 7);
        assert_eq!(restored.version, CHECKPOINT_VERSION);
    }

    #[cfg(feature = "binary-codec")]
    #[test]
    fn test_msgpack_roundtrip() {
        let mut cp = empty_engine_checkpoint();
        cp.events_processed = 1000;
        cp.output_events_emitted = 500;

        let data = serialize(&cp, CheckpointFormat::MessagePack).unwrap();
        assert!(!is_json(&data), "MessagePack should not look like JSON");

        let restored: EngineCheckpoint = deserialize(&data).unwrap();
        assert_eq!(restored.events_processed, 1000);
        assert_eq!(restored.output_events_emitted, 500);
        assert_eq!(restored.version, CHECKPOINT_VERSION);
    }

    #[cfg(feature = "binary-codec")]
    #[test]
    fn test_msgpack_smaller_than_json() {
        use std::collections::HashMap;

        use crate::persistence::Checkpoint;

        let cp = Checkpoint {
            id: 1,
            timestamp_ms: 1700000000000,
            events_processed: 100_000,
            window_states: HashMap::new(),
            pattern_states: HashMap::new(),
            metadata: {
                let mut m = HashMap::new();
                m.insert("stream".to_string(), "TestStream".to_string());
                m.insert("tenant".to_string(), "test-tenant-id".to_string());
                m
            },
            context_states: HashMap::new(),
        };

        let json_data = serialize(&cp, CheckpointFormat::Json).unwrap();
        let msgpack_data = serialize(&cp, CheckpointFormat::MessagePack).unwrap();

        assert!(
            msgpack_data.len() < json_data.len(),
            "MessagePack ({} bytes) should be smaller than JSON ({} bytes)",
            msgpack_data.len(),
            json_data.len()
        );
    }

    #[cfg(feature = "binary-codec")]
    #[test]
    fn test_cross_format_deserialize() {
        let mut cp = empty_engine_checkpoint();
        cp.events_processed = 99;
        cp.output_events_emitted = 33;

        let json_data = serialize(&cp, CheckpointFormat::Json).unwrap();
        let msgpack_data = serialize(&cp, CheckpointFormat::MessagePack).unwrap();

        // The same `deserialize` entry point must accept either encoding.
        let from_json: EngineCheckpoint = deserialize(&json_data).unwrap();
        let from_msgpack: EngineCheckpoint = deserialize(&msgpack_data).unwrap();

        assert_eq!(from_json.events_processed, 99);
        assert_eq!(from_msgpack.events_processed, 99);
        assert_eq!(from_json.output_events_emitted, 33);
        assert_eq!(from_msgpack.output_events_emitted, 33);
        assert_eq!(from_json.version, CHECKPOINT_VERSION);
        assert_eq!(from_msgpack.version, CHECKPOINT_VERSION);
    }

    #[test]
    fn test_empty_data_error() {
        let result = deserialize::<EngineCheckpoint>(&[]);
        assert!(result.is_err());
    }

    #[test]
    fn test_whitespace_only_data_error() {
        let result = deserialize::<EngineCheckpoint>(b" \n\t ");
        assert!(result.is_err());
    }

    #[test]
    fn test_active_format() {
        let format = CheckpointFormat::active();
        #[cfg(feature = "binary-codec")]
        assert_eq!(format, CheckpointFormat::MessagePack);
        #[cfg(not(feature = "binary-codec"))]
        assert_eq!(format, CheckpointFormat::Json);
    }

    #[test]
    fn test_active_format_roundtrip() {
        let mut cp = empty_engine_checkpoint();
        cp.events_processed = 5;
        cp.output_events_emitted = 3;

        let data = serialize(&cp, CheckpointFormat::active()).unwrap();
        let restored: EngineCheckpoint = deserialize(&data).unwrap();

        assert_eq!(restored.events_processed, 5);
        assert_eq!(restored.output_events_emitted, 3);
        assert_eq!(restored.version, CHECKPOINT_VERSION);
    }
}