Skip to main content

graphos_core/execution/spill/
serializer.rs

1//! Binary serialization for Values without serde overhead.
2//!
3//! This module provides efficient serialization for spilling operator state
4//! to disk. The format is designed for:
5//! - Minimal overhead (no schema, direct binary encoding)
6//! - Fast serialization/deserialization
7//! - Compact representation
8
9use graphos_common::types::Value;
10use std::collections::BTreeMap;
11use std::io::{Read, Write};
12use std::sync::Arc;
13
// Type tags for Value variants.
//
// Every serialized value starts with exactly one of these bytes, followed by
// the payload described next to each tag. `deserialize_value` rejects any
// other tag byte with an `InvalidData` error.

// No payload.
const TAG_NULL: u8 = 0;
// One byte: 0 for false, 1 for true (any non-zero byte decodes as true).
const TAG_BOOL: u8 = 1;
// Eight bytes: little-endian i64.
const TAG_INT64: u8 = 2;
// Eight bytes: little-endian f64.
const TAG_FLOAT64: u8 = 3;
// u64 little-endian byte length, then that many UTF-8 bytes.
const TAG_STRING: u8 = 4;
// u64 little-endian byte length, then that many raw bytes.
const TAG_BYTES: u8 = 5;
// Eight bytes: little-endian i64 microsecond count.
const TAG_TIMESTAMP: u8 = 6;
// u64 little-endian element count, then each element as a tagged value.
const TAG_LIST: u8 = 7;
// u64 little-endian entry count, then per entry: length-prefixed UTF-8 key
// (no tag byte) followed by a tagged value.
const TAG_MAP: u8 = 8;
24
25/// Serializes a Value to bytes.
26///
27/// Returns the number of bytes written.
28///
29/// # Errors
30///
31/// Returns an error if writing fails.
32pub fn serialize_value<W: Write + ?Sized>(value: &Value, w: &mut W) -> std::io::Result<usize> {
33    match value {
34        Value::Null => {
35            w.write_all(&[TAG_NULL])?;
36            Ok(1)
37        }
38        Value::Bool(b) => {
39            w.write_all(&[TAG_BOOL, u8::from(*b)])?;
40            Ok(2)
41        }
42        Value::Int64(i) => {
43            w.write_all(&[TAG_INT64])?;
44            w.write_all(&i.to_le_bytes())?;
45            Ok(9)
46        }
47        Value::Float64(f) => {
48            w.write_all(&[TAG_FLOAT64])?;
49            w.write_all(&f.to_le_bytes())?;
50            Ok(9)
51        }
52        Value::String(s) => {
53            w.write_all(&[TAG_STRING])?;
54            let bytes = s.as_bytes();
55            w.write_all(&(bytes.len() as u64).to_le_bytes())?;
56            w.write_all(bytes)?;
57            Ok(1 + 8 + bytes.len())
58        }
59        Value::Bytes(b) => {
60            w.write_all(&[TAG_BYTES])?;
61            w.write_all(&(b.len() as u64).to_le_bytes())?;
62            w.write_all(b)?;
63            Ok(1 + 8 + b.len())
64        }
65        Value::Timestamp(t) => {
66            w.write_all(&[TAG_TIMESTAMP])?;
67            // Timestamp is internally an i64 (microseconds since epoch)
68            let micros = t.as_micros();
69            w.write_all(&micros.to_le_bytes())?;
70            Ok(9)
71        }
72        Value::List(items) => {
73            w.write_all(&[TAG_LIST])?;
74            w.write_all(&(items.len() as u64).to_le_bytes())?;
75            let mut total = 1 + 8;
76            for item in items.iter() {
77                total += serialize_value(item, w)?;
78            }
79            Ok(total)
80        }
81        Value::Map(map) => {
82            w.write_all(&[TAG_MAP])?;
83            w.write_all(&(map.len() as u64).to_le_bytes())?;
84            let mut total = 1 + 8;
85            for (key, val) in map.iter() {
86                // Serialize key as string
87                let key_bytes = key.as_str().as_bytes();
88                w.write_all(&(key_bytes.len() as u64).to_le_bytes())?;
89                w.write_all(key_bytes)?;
90                total += 8 + key_bytes.len();
91                // Serialize value
92                total += serialize_value(val, w)?;
93            }
94            Ok(total)
95        }
96    }
97}
98
99/// Deserializes a Value from bytes.
100///
101/// # Errors
102///
103/// Returns an error if reading fails or the format is invalid.
104pub fn deserialize_value<R: Read + ?Sized>(r: &mut R) -> std::io::Result<Value> {
105    let mut tag = [0u8; 1];
106    r.read_exact(&mut tag)?;
107
108    match tag[0] {
109        TAG_NULL => Ok(Value::Null),
110        TAG_BOOL => {
111            let mut buf = [0u8; 1];
112            r.read_exact(&mut buf)?;
113            Ok(Value::Bool(buf[0] != 0))
114        }
115        TAG_INT64 => {
116            let mut buf = [0u8; 8];
117            r.read_exact(&mut buf)?;
118            Ok(Value::Int64(i64::from_le_bytes(buf)))
119        }
120        TAG_FLOAT64 => {
121            let mut buf = [0u8; 8];
122            r.read_exact(&mut buf)?;
123            Ok(Value::Float64(f64::from_le_bytes(buf)))
124        }
125        TAG_STRING => {
126            let mut len_buf = [0u8; 8];
127            r.read_exact(&mut len_buf)?;
128            let len = u64::from_le_bytes(len_buf) as usize;
129            let mut str_buf = vec![0u8; len];
130            r.read_exact(&mut str_buf)?;
131            let s = String::from_utf8(str_buf)
132                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
133            Ok(Value::String(Arc::from(s)))
134        }
135        TAG_BYTES => {
136            let mut len_buf = [0u8; 8];
137            r.read_exact(&mut len_buf)?;
138            let len = u64::from_le_bytes(len_buf) as usize;
139            let mut bytes_buf = vec![0u8; len];
140            r.read_exact(&mut bytes_buf)?;
141            Ok(Value::Bytes(Arc::from(bytes_buf)))
142        }
143        TAG_TIMESTAMP => {
144            let mut buf = [0u8; 8];
145            r.read_exact(&mut buf)?;
146            let micros = i64::from_le_bytes(buf);
147            Ok(Value::Timestamp(
148                graphos_common::types::Timestamp::from_micros(micros),
149            ))
150        }
151        TAG_LIST => {
152            let mut len_buf = [0u8; 8];
153            r.read_exact(&mut len_buf)?;
154            let len = u64::from_le_bytes(len_buf) as usize;
155            let mut items = Vec::with_capacity(len);
156            for _ in 0..len {
157                items.push(deserialize_value(r)?);
158            }
159            Ok(Value::List(Arc::from(items)))
160        }
161        TAG_MAP => {
162            let mut len_buf = [0u8; 8];
163            r.read_exact(&mut len_buf)?;
164            let len = u64::from_le_bytes(len_buf) as usize;
165            let mut map = BTreeMap::new();
166            for _ in 0..len {
167                // Read key
168                let mut key_len_buf = [0u8; 8];
169                r.read_exact(&mut key_len_buf)?;
170                let key_len = u64::from_le_bytes(key_len_buf) as usize;
171                let mut key_buf = vec![0u8; key_len];
172                r.read_exact(&mut key_buf)?;
173                let key_str = String::from_utf8(key_buf).map_err(|e| {
174                    std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
175                })?;
176                // Read value
177                let val = deserialize_value(r)?;
178                map.insert(graphos_common::types::PropertyKey::new(key_str), val);
179            }
180            Ok(Value::Map(Arc::new(map)))
181        }
182        _ => Err(std::io::Error::new(
183            std::io::ErrorKind::InvalidData,
184            format!("Unknown value tag: {}", tag[0]),
185        )),
186    }
187}
188
189/// Serializes a row (slice of Values) to bytes.
190///
191/// Format: `[num_columns: u64][value1][value2]...`
192///
193/// Returns the number of bytes written.
194///
195/// # Errors
196///
197/// Returns an error if writing fails.
198pub fn serialize_row<W: Write + ?Sized>(row: &[Value], w: &mut W) -> std::io::Result<usize> {
199    w.write_all(&(row.len() as u64).to_le_bytes())?;
200    let mut total = 8;
201    for value in row {
202        total += serialize_value(value, w)?;
203    }
204    Ok(total)
205}
206
207/// Deserializes a row from bytes.
208///
209/// # Arguments
210///
211/// * `r` - Reader to read from
212/// * `expected_columns` - Expected number of columns (for validation, 0 to skip)
213///
214/// # Errors
215///
216/// Returns an error if reading fails or column count mismatches.
217pub fn deserialize_row<R: Read + ?Sized>(
218    r: &mut R,
219    expected_columns: usize,
220) -> std::io::Result<Vec<Value>> {
221    let mut len_buf = [0u8; 8];
222    r.read_exact(&mut len_buf)?;
223    let num_columns = u64::from_le_bytes(len_buf) as usize;
224
225    if expected_columns > 0 && num_columns != expected_columns {
226        return Err(std::io::Error::new(
227            std::io::ErrorKind::InvalidData,
228            format!(
229                "Column count mismatch: expected {}, got {}",
230                expected_columns, num_columns
231            ),
232        ));
233    }
234
235    let mut row = Vec::with_capacity(num_columns);
236    for _ in 0..num_columns {
237        row.push(deserialize_value(r)?);
238    }
239    Ok(row)
240}
241
/// Round-trip, size-accounting, and error-path tests for the spill serializer.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Serializes `value`, decodes it again, and returns the decoded value.
    ///
    /// Also checks two invariants for every value passed through it: the
    /// byte count reported by `serialize_value` matches what was written,
    /// and the decoder consumes exactly the encoded bytes.
    fn roundtrip_value(value: Value) -> Value {
        let mut buf = Vec::new();
        let written = serialize_value(&value, &mut buf).unwrap();
        assert_eq!(written, buf.len(), "reported size must match bytes written");

        let encoded_len = buf.len() as u64;
        let mut cursor = Cursor::new(buf);
        let decoded = deserialize_value(&mut cursor).unwrap();
        assert_eq!(
            cursor.position(),
            encoded_len,
            "decoder must consume exactly the encoded bytes"
        );
        decoded
    }

    #[test]
    fn test_serialize_null() {
        let result = roundtrip_value(Value::Null);
        assert_eq!(result, Value::Null);
    }

    #[test]
    fn test_serialize_bool() {
        assert_eq!(roundtrip_value(Value::Bool(true)), Value::Bool(true));
        assert_eq!(roundtrip_value(Value::Bool(false)), Value::Bool(false));
    }

    #[test]
    fn test_serialize_int64() {
        assert_eq!(roundtrip_value(Value::Int64(0)), Value::Int64(0));
        assert_eq!(
            roundtrip_value(Value::Int64(i64::MAX)),
            Value::Int64(i64::MAX)
        );
        assert_eq!(
            roundtrip_value(Value::Int64(i64::MIN)),
            Value::Int64(i64::MIN)
        );
        assert_eq!(roundtrip_value(Value::Int64(-42)), Value::Int64(-42));
    }

    #[test]
    fn test_serialize_float64() {
        assert_eq!(roundtrip_value(Value::Float64(0.0)), Value::Float64(0.0));
        assert_eq!(
            roundtrip_value(Value::Float64(std::f64::consts::PI)),
            Value::Float64(std::f64::consts::PI)
        );
        // Note: NaN != NaN, so we test differently
        let nan_result = roundtrip_value(Value::Float64(f64::NAN));
        assert!(matches!(nan_result, Value::Float64(f) if f.is_nan()));
    }

    #[test]
    fn test_serialize_string() {
        let result = roundtrip_value(Value::String(Arc::from("hello world")));
        assert_eq!(result.as_str(), Some("hello world"));

        // Empty string
        let result = roundtrip_value(Value::String(Arc::from("")));
        assert_eq!(result.as_str(), Some(""));

        // Unicode
        let result = roundtrip_value(Value::String(Arc::from("héllo 世界 🌍")));
        assert_eq!(result.as_str(), Some("héllo 世界 🌍"));
    }

    #[test]
    fn test_serialize_bytes() {
        let data = vec![0u8, 1, 2, 255, 128];
        let result = roundtrip_value(Value::Bytes(Arc::from(data.clone())));
        assert_eq!(result.as_bytes(), Some(&data[..]));

        // Empty bytes
        let result = roundtrip_value(Value::Bytes(Arc::from(vec![])));
        assert_eq!(result.as_bytes(), Some(&[][..]));
    }

    #[test]
    fn test_serialize_timestamp() {
        let ts = graphos_common::types::Timestamp::from_micros(1234567890);
        let result = roundtrip_value(Value::Timestamp(ts));
        assert_eq!(result.as_timestamp(), Some(ts));
    }

    #[test]
    fn test_serialize_list() {
        let list = Value::List(Arc::from(vec![
            Value::Int64(1),
            Value::String(Arc::from("two")),
            Value::Bool(true),
        ]));
        let result = roundtrip_value(list.clone());
        assert_eq!(result, list);

        // Nested list
        let nested = Value::List(Arc::from(vec![
            Value::List(Arc::from(vec![Value::Int64(1), Value::Int64(2)])),
            Value::List(Arc::from(vec![Value::Int64(3)])),
        ]));
        let result = roundtrip_value(nested.clone());
        assert_eq!(result, nested);

        // Empty list
        let empty = Value::List(Arc::from(vec![]));
        let result = roundtrip_value(empty.clone());
        assert_eq!(result, empty);
    }

    #[test]
    fn test_serialize_map() {
        let mut map = BTreeMap::new();
        map.insert(
            graphos_common::types::PropertyKey::new("name"),
            Value::String(Arc::from("Alice")),
        );
        map.insert(
            graphos_common::types::PropertyKey::new("age"),
            Value::Int64(30),
        );

        let value = Value::Map(Arc::new(map));
        let result = roundtrip_value(value.clone());
        assert_eq!(result, value);
    }

    #[test]
    fn test_serialize_row() {
        let row = vec![
            Value::Int64(1),
            Value::String(Arc::from("test")),
            Value::Bool(true),
            Value::Null,
        ];

        let mut buf = Vec::new();
        let written = serialize_row(&row, &mut buf).unwrap();
        assert_eq!(written, buf.len());

        let mut cursor = Cursor::new(buf);
        let result = deserialize_row(&mut cursor, 4).unwrap();
        assert_eq!(result, row);
    }

    #[test]
    fn test_serialize_row_column_count_check() {
        let row = vec![Value::Int64(1), Value::Int64(2)];

        let mut buf = Vec::new();
        serialize_row(&row, &mut buf).unwrap();

        // Wrong expected column count
        let mut cursor = Cursor::new(buf.clone());
        let result = deserialize_row(&mut cursor, 3);
        assert!(result.is_err());

        // Skip check with 0
        let mut cursor = Cursor::new(buf);
        let result = deserialize_row(&mut cursor, 0).unwrap();
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_serialize_multiple_rows() {
        let rows = vec![
            vec![Value::Int64(1), Value::String(Arc::from("a"))],
            vec![Value::Int64(2), Value::String(Arc::from("b"))],
            vec![Value::Int64(3), Value::String(Arc::from("c"))],
        ];

        let mut buf = Vec::new();
        for row in &rows {
            serialize_row(row, &mut buf).unwrap();
        }

        let mut cursor = Cursor::new(buf);
        for expected in &rows {
            let result = deserialize_row(&mut cursor, 2).unwrap();
            assert_eq!(&result, expected);
        }
    }

    #[test]
    fn test_serialization_size() {
        // Verify expected sizes, both as written and as reported.
        let mut buf = Vec::new();

        // Null: 1 byte (tag only)
        let written = serialize_value(&Value::Null, &mut buf).unwrap();
        assert_eq!(buf.len(), 1);
        assert_eq!(written, 1);
        buf.clear();

        // Bool: 2 bytes (tag + value)
        let written = serialize_value(&Value::Bool(true), &mut buf).unwrap();
        assert_eq!(buf.len(), 2);
        assert_eq!(written, 2);
        buf.clear();

        // Int64: 9 bytes (tag + 8)
        let written = serialize_value(&Value::Int64(42), &mut buf).unwrap();
        assert_eq!(buf.len(), 9);
        assert_eq!(written, 9);
        buf.clear();

        // String "hi": 11 bytes (tag + 8 length + 2)
        let written = serialize_value(&Value::String(Arc::from("hi")), &mut buf).unwrap();
        assert_eq!(buf.len(), 11);
        assert_eq!(written, 11);
        buf.clear();

        // List [Int64]: 18 bytes (tag + 8 count + 9 element)
        let list = Value::List(Arc::from(vec![Value::Int64(7)]));
        let written = serialize_value(&list, &mut buf).unwrap();
        assert_eq!(buf.len(), 18);
        assert_eq!(written, 18);
        buf.clear();

        // Map {"k": Int64}: 27 bytes (tag + 8 count + 8 key length + 1 key + 9 value)
        let mut map = BTreeMap::new();
        map.insert(graphos_common::types::PropertyKey::new("k"), Value::Int64(7));
        let written = serialize_value(&Value::Map(Arc::new(map)), &mut buf).unwrap();
        assert_eq!(buf.len(), 27);
        assert_eq!(written, 27);
    }

    #[test]
    fn test_deserialize_empty_input() {
        let mut cursor = Cursor::new(Vec::<u8>::new());
        let err = deserialize_value(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_deserialize_unknown_tag() {
        let mut cursor = Cursor::new(vec![0xFFu8]);
        let err = deserialize_value(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_deserialize_truncated_payload() {
        let mut buf = Vec::new();
        serialize_value(&Value::Int64(42), &mut buf).unwrap();
        // Keep the tag and only part of the 8-byte payload.
        buf.truncate(5);

        let mut cursor = Cursor::new(buf);
        let err = deserialize_value(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_deserialize_truncated_string() {
        let mut buf = Vec::new();
        serialize_value(&Value::String(Arc::from("hello")), &mut buf).unwrap();
        // Drop the last two payload bytes; the length prefix still says 5.
        buf.truncate(buf.len() - 2);

        let mut cursor = Cursor::new(buf);
        let err = deserialize_value(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_deserialize_invalid_utf8_string() {
        // Tag, length prefix of 1, then a byte that is never valid UTF-8.
        let mut buf = vec![TAG_STRING];
        buf.extend_from_slice(&1u64.to_le_bytes());
        buf.push(0xFF);

        let mut cursor = Cursor::new(buf);
        let err = deserialize_value(&mut cursor).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }
}