graphos_core/execution/spill/
serializer.rs1use graphos_common::types::Value;
10use std::collections::BTreeMap;
11use std::io::{Read, Write};
12use std::sync::Arc;
13
// One-byte tags that open every value in the spill wire format.
// `serialize_value` writes them and `deserialize_value` dispatches on them, so
// renumbering any tag breaks decoding of bytes produced with the old numbering.

/// `Value::Null`; no payload follows.
const TAG_NULL: u8 = 0x00;
/// `Value::Bool`; one payload byte, written as 0/1 (any non-zero byte decodes as true).
const TAG_BOOL: u8 = 0x01;
/// `Value::Int64`; 8-byte little-endian payload.
const TAG_INT64: u8 = 0x02;
/// `Value::Float64`; 8-byte little-endian payload (IEEE 754 bit pattern).
const TAG_FLOAT64: u8 = 0x03;
/// `Value::String`; little-endian `u64` byte length, then UTF-8 bytes.
const TAG_STRING: u8 = 0x04;
/// `Value::Bytes`; little-endian `u64` byte length, then the raw bytes.
const TAG_BYTES: u8 = 0x05;
/// `Value::Timestamp`; 8-byte little-endian `i64` microsecond count.
const TAG_TIMESTAMP: u8 = 0x06;
/// `Value::List`; little-endian `u64` item count, then each item as a tagged value.
const TAG_LIST: u8 = 0x07;
/// `Value::Map`; little-endian `u64` entry count, then per entry an untagged
/// length-prefixed UTF-8 key followed by a tagged value.
const TAG_MAP: u8 = 0x08;
24
25pub fn serialize_value<W: Write + ?Sized>(value: &Value, w: &mut W) -> std::io::Result<usize> {
33 match value {
34 Value::Null => {
35 w.write_all(&[TAG_NULL])?;
36 Ok(1)
37 }
38 Value::Bool(b) => {
39 w.write_all(&[TAG_BOOL, u8::from(*b)])?;
40 Ok(2)
41 }
42 Value::Int64(i) => {
43 w.write_all(&[TAG_INT64])?;
44 w.write_all(&i.to_le_bytes())?;
45 Ok(9)
46 }
47 Value::Float64(f) => {
48 w.write_all(&[TAG_FLOAT64])?;
49 w.write_all(&f.to_le_bytes())?;
50 Ok(9)
51 }
52 Value::String(s) => {
53 w.write_all(&[TAG_STRING])?;
54 let bytes = s.as_bytes();
55 w.write_all(&(bytes.len() as u64).to_le_bytes())?;
56 w.write_all(bytes)?;
57 Ok(1 + 8 + bytes.len())
58 }
59 Value::Bytes(b) => {
60 w.write_all(&[TAG_BYTES])?;
61 w.write_all(&(b.len() as u64).to_le_bytes())?;
62 w.write_all(b)?;
63 Ok(1 + 8 + b.len())
64 }
65 Value::Timestamp(t) => {
66 w.write_all(&[TAG_TIMESTAMP])?;
67 let micros = t.as_micros();
69 w.write_all(µs.to_le_bytes())?;
70 Ok(9)
71 }
72 Value::List(items) => {
73 w.write_all(&[TAG_LIST])?;
74 w.write_all(&(items.len() as u64).to_le_bytes())?;
75 let mut total = 1 + 8;
76 for item in items.iter() {
77 total += serialize_value(item, w)?;
78 }
79 Ok(total)
80 }
81 Value::Map(map) => {
82 w.write_all(&[TAG_MAP])?;
83 w.write_all(&(map.len() as u64).to_le_bytes())?;
84 let mut total = 1 + 8;
85 for (key, val) in map.iter() {
86 let key_bytes = key.as_str().as_bytes();
88 w.write_all(&(key_bytes.len() as u64).to_le_bytes())?;
89 w.write_all(key_bytes)?;
90 total += 8 + key_bytes.len();
91 total += serialize_value(val, w)?;
93 }
94 Ok(total)
95 }
96 }
97}
98
99pub fn deserialize_value<R: Read + ?Sized>(r: &mut R) -> std::io::Result<Value> {
105 let mut tag = [0u8; 1];
106 r.read_exact(&mut tag)?;
107
108 match tag[0] {
109 TAG_NULL => Ok(Value::Null),
110 TAG_BOOL => {
111 let mut buf = [0u8; 1];
112 r.read_exact(&mut buf)?;
113 Ok(Value::Bool(buf[0] != 0))
114 }
115 TAG_INT64 => {
116 let mut buf = [0u8; 8];
117 r.read_exact(&mut buf)?;
118 Ok(Value::Int64(i64::from_le_bytes(buf)))
119 }
120 TAG_FLOAT64 => {
121 let mut buf = [0u8; 8];
122 r.read_exact(&mut buf)?;
123 Ok(Value::Float64(f64::from_le_bytes(buf)))
124 }
125 TAG_STRING => {
126 let mut len_buf = [0u8; 8];
127 r.read_exact(&mut len_buf)?;
128 let len = u64::from_le_bytes(len_buf) as usize;
129 let mut str_buf = vec![0u8; len];
130 r.read_exact(&mut str_buf)?;
131 let s = String::from_utf8(str_buf)
132 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
133 Ok(Value::String(Arc::from(s)))
134 }
135 TAG_BYTES => {
136 let mut len_buf = [0u8; 8];
137 r.read_exact(&mut len_buf)?;
138 let len = u64::from_le_bytes(len_buf) as usize;
139 let mut bytes_buf = vec![0u8; len];
140 r.read_exact(&mut bytes_buf)?;
141 Ok(Value::Bytes(Arc::from(bytes_buf)))
142 }
143 TAG_TIMESTAMP => {
144 let mut buf = [0u8; 8];
145 r.read_exact(&mut buf)?;
146 let micros = i64::from_le_bytes(buf);
147 Ok(Value::Timestamp(
148 graphos_common::types::Timestamp::from_micros(micros),
149 ))
150 }
151 TAG_LIST => {
152 let mut len_buf = [0u8; 8];
153 r.read_exact(&mut len_buf)?;
154 let len = u64::from_le_bytes(len_buf) as usize;
155 let mut items = Vec::with_capacity(len);
156 for _ in 0..len {
157 items.push(deserialize_value(r)?);
158 }
159 Ok(Value::List(Arc::from(items)))
160 }
161 TAG_MAP => {
162 let mut len_buf = [0u8; 8];
163 r.read_exact(&mut len_buf)?;
164 let len = u64::from_le_bytes(len_buf) as usize;
165 let mut map = BTreeMap::new();
166 for _ in 0..len {
167 let mut key_len_buf = [0u8; 8];
169 r.read_exact(&mut key_len_buf)?;
170 let key_len = u64::from_le_bytes(key_len_buf) as usize;
171 let mut key_buf = vec![0u8; key_len];
172 r.read_exact(&mut key_buf)?;
173 let key_str = String::from_utf8(key_buf).map_err(|e| {
174 std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
175 })?;
176 let val = deserialize_value(r)?;
178 map.insert(graphos_common::types::PropertyKey::new(key_str), val);
179 }
180 Ok(Value::Map(Arc::new(map)))
181 }
182 _ => Err(std::io::Error::new(
183 std::io::ErrorKind::InvalidData,
184 format!("Unknown value tag: {}", tag[0]),
185 )),
186 }
187}
188
189pub fn serialize_row<W: Write + ?Sized>(row: &[Value], w: &mut W) -> std::io::Result<usize> {
199 w.write_all(&(row.len() as u64).to_le_bytes())?;
200 let mut total = 8;
201 for value in row {
202 total += serialize_value(value, w)?;
203 }
204 Ok(total)
205}
206
207pub fn deserialize_row<R: Read + ?Sized>(
218 r: &mut R,
219 expected_columns: usize,
220) -> std::io::Result<Vec<Value>> {
221 let mut len_buf = [0u8; 8];
222 r.read_exact(&mut len_buf)?;
223 let num_columns = u64::from_le_bytes(len_buf) as usize;
224
225 if expected_columns > 0 && num_columns != expected_columns {
226 return Err(std::io::Error::new(
227 std::io::ErrorKind::InvalidData,
228 format!(
229 "Column count mismatch: expected {}, got {}",
230 expected_columns, num_columns
231 ),
232 ));
233 }
234
235 let mut row = Vec::with_capacity(num_columns);
236 for _ in 0..num_columns {
237 row.push(deserialize_value(r)?);
238 }
239 Ok(row)
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Encodes `value` and decodes it back. Along the way it asserts two
    /// invariants: the size reported by `serialize_value` equals the bytes
    /// written, and the decoder consumes exactly that many bytes.
    fn roundtrip_value(value: Value) -> Value {
        let mut buf = Vec::new();
        let written = serialize_value(&value, &mut buf).unwrap();
        assert_eq!(written, buf.len(), "reported size must match bytes written");
        let mut cursor = Cursor::new(buf);
        let decoded = deserialize_value(&mut cursor).unwrap();
        assert_eq!(
            cursor.position(),
            written as u64,
            "decoder must consume exactly the encoded bytes"
        );
        decoded
    }

    #[test]
    fn test_serialize_null() {
        let result = roundtrip_value(Value::Null);
        assert_eq!(result, Value::Null);
    }

    #[test]
    fn test_serialize_bool() {
        assert_eq!(roundtrip_value(Value::Bool(true)), Value::Bool(true));
        assert_eq!(roundtrip_value(Value::Bool(false)), Value::Bool(false));
    }

    #[test]
    fn test_serialize_int64() {
        assert_eq!(roundtrip_value(Value::Int64(0)), Value::Int64(0));
        assert_eq!(
            roundtrip_value(Value::Int64(i64::MAX)),
            Value::Int64(i64::MAX)
        );
        assert_eq!(
            roundtrip_value(Value::Int64(i64::MIN)),
            Value::Int64(i64::MIN)
        );
        assert_eq!(roundtrip_value(Value::Int64(-42)), Value::Int64(-42));
    }

    #[test]
    fn test_serialize_float64() {
        assert_eq!(roundtrip_value(Value::Float64(0.0)), Value::Float64(0.0));
        assert_eq!(
            roundtrip_value(Value::Float64(std::f64::consts::PI)),
            Value::Float64(std::f64::consts::PI)
        );
        // NaN != NaN, so check the decoded value is still NaN instead of comparing.
        let nan_result = roundtrip_value(Value::Float64(f64::NAN));
        assert!(matches!(nan_result, Value::Float64(f) if f.is_nan()));
    }

    #[test]
    fn test_serialize_string() {
        let result = roundtrip_value(Value::String(Arc::from("hello world")));
        assert_eq!(result.as_str(), Some("hello world"));

        let result = roundtrip_value(Value::String(Arc::from("")));
        assert_eq!(result.as_str(), Some(""));

        // Multi-byte UTF-8: the length prefix counts bytes, not chars.
        let result = roundtrip_value(Value::String(Arc::from("héllo 世界 🌍")));
        assert_eq!(result.as_str(), Some("héllo 世界 🌍"));
    }

    #[test]
    fn test_serialize_bytes() {
        let data = vec![0u8, 1, 2, 255, 128];
        let result = roundtrip_value(Value::Bytes(Arc::from(data.clone())));
        assert_eq!(result.as_bytes(), Some(&data[..]));

        let result = roundtrip_value(Value::Bytes(Arc::from(vec![])));
        assert_eq!(result.as_bytes(), Some(&[][..]));
    }

    #[test]
    fn test_serialize_timestamp() {
        let ts = graphos_common::types::Timestamp::from_micros(1234567890);
        let result = roundtrip_value(Value::Timestamp(ts));
        assert_eq!(result.as_timestamp(), Some(ts));
    }

    #[test]
    fn test_serialize_list() {
        let list = Value::List(Arc::from(vec![
            Value::Int64(1),
            Value::String(Arc::from("two")),
            Value::Bool(true),
        ]));
        let result = roundtrip_value(list.clone());
        assert_eq!(result, list);

        let nested = Value::List(Arc::from(vec![
            Value::List(Arc::from(vec![Value::Int64(1), Value::Int64(2)])),
            Value::List(Arc::from(vec![Value::Int64(3)])),
        ]));
        let result = roundtrip_value(nested.clone());
        assert_eq!(result, nested);

        let empty = Value::List(Arc::from(vec![]));
        let result = roundtrip_value(empty.clone());
        assert_eq!(result, empty);
    }

    #[test]
    fn test_serialize_map() {
        let mut map = BTreeMap::new();
        map.insert(
            graphos_common::types::PropertyKey::new("name"),
            Value::String(Arc::from("Alice")),
        );
        map.insert(
            graphos_common::types::PropertyKey::new("age"),
            Value::Int64(30),
        );

        let value = Value::Map(Arc::new(map));
        let result = roundtrip_value(value.clone());
        assert_eq!(result, value);
    }

    #[test]
    fn test_serialize_map_empty_and_nested() {
        let empty = Value::Map(Arc::new(BTreeMap::new()));
        assert_eq!(roundtrip_value(empty.clone()), empty);

        let mut inner = BTreeMap::new();
        inner.insert(
            graphos_common::types::PropertyKey::new("tags"),
            Value::List(Arc::from(vec![Value::String(Arc::from("a")), Value::Null])),
        );
        let mut outer = BTreeMap::new();
        outer.insert(
            graphos_common::types::PropertyKey::new("inner"),
            Value::Map(Arc::new(inner)),
        );
        let value = Value::Map(Arc::new(outer));
        assert_eq!(roundtrip_value(value.clone()), value);
    }

    #[test]
    fn test_serialize_row() {
        let row = vec![
            Value::Int64(1),
            Value::String(Arc::from("test")),
            Value::Bool(true),
            Value::Null,
        ];

        let mut buf = Vec::new();
        let written = serialize_row(&row, &mut buf).unwrap();
        assert_eq!(written, buf.len());

        let mut cursor = Cursor::new(buf);
        let result = deserialize_row(&mut cursor, 4).unwrap();
        assert_eq!(result, row);
    }

    #[test]
    fn test_serialize_row_column_count_check() {
        let row = vec![Value::Int64(1), Value::Int64(2)];

        let mut buf = Vec::new();
        serialize_row(&row, &mut buf).unwrap();

        // A non-zero expectation that disagrees with the stored count is rejected.
        let mut cursor = Cursor::new(buf.clone());
        let result = deserialize_row(&mut cursor, 3);
        assert!(result.is_err());

        // `0` disables the check and accepts whatever width was stored.
        let mut cursor = Cursor::new(buf);
        let result = deserialize_row(&mut cursor, 0).unwrap();
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_deserialize_row_truncated() {
        let mut buf = Vec::new();
        serialize_row(&[Value::Int64(1), Value::Int64(2)], &mut buf).unwrap();
        // Drop the final byte so the second column's payload is incomplete.
        buf.pop();
        let err = deserialize_row(&mut Cursor::new(buf), 2).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_serialize_multiple_rows() {
        let rows = vec![
            vec![Value::Int64(1), Value::String(Arc::from("a"))],
            vec![Value::Int64(2), Value::String(Arc::from("b"))],
            vec![Value::Int64(3), Value::String(Arc::from("c"))],
        ];

        let mut buf = Vec::new();
        for row in &rows {
            serialize_row(row, &mut buf).unwrap();
        }

        let mut cursor = Cursor::new(buf);
        for expected in &rows {
            let result = deserialize_row(&mut cursor, 2).unwrap();
            assert_eq!(&result, expected);
        }
    }

    #[test]
    fn test_deserialize_unknown_tag() {
        let err = deserialize_value(&mut Cursor::new(vec![0xFFu8])).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_deserialize_truncated_input() {
        // No tag byte at all.
        let err = deserialize_value(&mut Cursor::new(Vec::<u8>::new())).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);

        // Int64 tag followed by only 3 of its 8 payload bytes.
        let err = deserialize_value(&mut Cursor::new(vec![TAG_INT64, 1, 2, 3])).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);

        // String whose declared length exceeds the bytes that follow.
        let mut buf = vec![TAG_STRING];
        buf.extend_from_slice(&10u64.to_le_bytes());
        buf.extend_from_slice(b"abc");
        let err = deserialize_value(&mut Cursor::new(buf)).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_deserialize_invalid_utf8() {
        let mut buf = vec![TAG_STRING];
        buf.extend_from_slice(&2u64.to_le_bytes());
        // 0xC3 starts a two-byte sequence but 0x28 is not a continuation byte.
        buf.extend_from_slice(&[0xC3, 0x28]);
        let err = deserialize_value(&mut Cursor::new(buf)).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_serialization_size() {
        // (value, expected encoded length): tag byte + payload (+ u64 prefix).
        let cases: Vec<(Value, usize)> = vec![
            (Value::Null, 1),
            (Value::Bool(true), 2),
            (Value::Int64(42), 9),
            (Value::Float64(1.5), 9),
            (Value::String(Arc::from("hi")), 11),
            (Value::Bytes(Arc::from(vec![1u8, 2, 3])), 12),
            (Value::List(Arc::from(vec![])), 9),
        ];
        for (value, expected) in cases {
            let mut buf = Vec::new();
            let written = serialize_value(&value, &mut buf).unwrap();
            assert_eq!(buf.len(), expected, "encoded length of {:?}", value);
            assert_eq!(written, expected, "reported length of {:?}", value);
        }
    }
}