vibesql_executor/memory/
row_serialization.rs

//! Row serialization for disk spilling
//!
//! This module provides compact binary serialization for rows and sort keys,
//! optimized for external sort and aggregate operations.
//!
//! # Format
//!
//! Each row is serialized as:
//! ```text
//! [num_values: u16] [value1] [value2] ... [valueN]
//! ```
//!
//! Each value is serialized as:
//! ```text
//! [type_tag: u8] [data...]
//! ```
//!
//! Variable-length values carry a `u32` prefix inside their data section: a byte length for
//! strings, intervals and blobs, and an element count for vectors.
//!
//! # Design Decisions
//!
//! - **No length prefix for entire row**: Rows are written sequentially and read back in chunks,
//!   so no random access within a run is needed.
//! - **Compact type tags**: A single-byte discriminant per value; booleans are encoded entirely
//!   in the tag.
//! - **Little-endian**: Matches most modern hardware, leaving room for zero-copy reads.
24
25use std::io::{self, Read, Write};
26
27use vibesql_types::{Interval, SqlValue};
28
/// Type tags for serialized values.
///
/// Every encoded value starts with exactly one of these bytes. The serializer and the
/// deserializer in this module both match on these constants, so each one must stay unique.
mod tags {
    /// SQL `NULL`; no payload.
    pub const NULL: u8 = 0;
    /// `Integer`; 8-byte little-endian payload.
    pub const INTEGER: u8 = 1;
    /// `Smallint`; 2-byte little-endian payload.
    pub const SMALLINT: u8 = 2;
    /// `Bigint`; 8-byte little-endian payload.
    pub const BIGINT: u8 = 3;
    /// `Unsigned`; 8-byte little-endian payload.
    pub const UNSIGNED: u8 = 4;
    /// `Numeric`; 8-byte little-endian `f64` payload.
    pub const NUMERIC: u8 = 5;
    /// `Float`; 4-byte little-endian `f32` payload.
    pub const FLOAT: u8 = 6;
    /// `Real`; 8-byte little-endian `f64` payload.
    pub const REAL: u8 = 7;
    /// `Double`; 8-byte little-endian `f64` payload.
    pub const DOUBLE: u8 = 8;
    /// `Boolean(false)`; no payload.
    pub const BOOLEAN_FALSE: u8 = 9;
    /// `Boolean(true)`; no payload.
    pub const BOOLEAN_TRUE: u8 = 10;
    /// `Character`; `u32` byte length followed by UTF-8 bytes.
    pub const CHARACTER: u8 = 11;
    /// `Varchar`; `u32` byte length followed by UTF-8 bytes.
    pub const VARCHAR: u8 = 12;
    /// `Date`; year (`i32`), month (`u8`), day (`u8`).
    pub const DATE: u8 = 13;
    /// `Time`; hour, minute, second (`u8` each), nanosecond (`u32`).
    pub const TIME: u8 = 14;
    /// `Timestamp`; a `DATE` payload immediately followed by a `TIME` payload.
    pub const TIMESTAMP: u8 = 15;
    /// `Interval`; `u32` byte length followed by the interval's UTF-8 text.
    pub const INTERVAL: u8 = 16;
    /// `Vector`; `u32` element count followed by little-endian `f32`s.
    pub const VECTOR: u8 = 17;
    /// `Blob`; `u32` byte length followed by the raw bytes.
    pub const BLOB: u8 = 18;
}
51
52/// Serialize a single SqlValue to the writer
53pub fn serialize_value<W: Write>(value: &SqlValue, writer: &mut W) -> io::Result<()> {
54    match value {
55        SqlValue::Null => {
56            writer.write_all(&[tags::NULL])?;
57        }
58        SqlValue::Integer(v) => {
59            writer.write_all(&[tags::INTEGER])?;
60            writer.write_all(&v.to_le_bytes())?;
61        }
62        SqlValue::Smallint(v) => {
63            writer.write_all(&[tags::SMALLINT])?;
64            writer.write_all(&v.to_le_bytes())?;
65        }
66        SqlValue::Bigint(v) => {
67            writer.write_all(&[tags::BIGINT])?;
68            writer.write_all(&v.to_le_bytes())?;
69        }
70        SqlValue::Unsigned(v) => {
71            writer.write_all(&[tags::UNSIGNED])?;
72            writer.write_all(&v.to_le_bytes())?;
73        }
74        SqlValue::Numeric(v) => {
75            writer.write_all(&[tags::NUMERIC])?;
76            writer.write_all(&v.to_le_bytes())?;
77        }
78        SqlValue::Float(v) => {
79            writer.write_all(&[tags::FLOAT])?;
80            writer.write_all(&v.to_le_bytes())?;
81        }
82        SqlValue::Real(v) => {
83            writer.write_all(&[tags::REAL])?;
84            writer.write_all(&v.to_le_bytes())?;
85        }
86        SqlValue::Double(v) => {
87            writer.write_all(&[tags::DOUBLE])?;
88            writer.write_all(&v.to_le_bytes())?;
89        }
90        SqlValue::Boolean(false) => {
91            writer.write_all(&[tags::BOOLEAN_FALSE])?;
92        }
93        SqlValue::Boolean(true) => {
94            writer.write_all(&[tags::BOOLEAN_TRUE])?;
95        }
96        SqlValue::Character(s) | SqlValue::Varchar(s) => {
97            let tag = if matches!(value, SqlValue::Character(_)) {
98                tags::CHARACTER
99            } else {
100                tags::VARCHAR
101            };
102            writer.write_all(&[tag])?;
103            let bytes = s.as_bytes();
104            let len = bytes.len() as u32;
105            writer.write_all(&len.to_le_bytes())?;
106            writer.write_all(bytes)?;
107        }
108        SqlValue::Date(d) => {
109            writer.write_all(&[tags::DATE])?;
110            // Serialize date components: year (i32), month (u8), day (u8)
111            writer.write_all(&d.year.to_le_bytes())?;
112            writer.write_all(&[d.month])?;
113            writer.write_all(&[d.day])?;
114        }
115        SqlValue::Time(t) => {
116            writer.write_all(&[tags::TIME])?;
117            // Serialize time components: hour (u8), minute (u8), second (u8), nanosecond (u32)
118            writer.write_all(&[t.hour])?;
119            writer.write_all(&[t.minute])?;
120            writer.write_all(&[t.second])?;
121            writer.write_all(&t.nanosecond.to_le_bytes())?;
122        }
123        SqlValue::Timestamp(ts) => {
124            writer.write_all(&[tags::TIMESTAMP])?;
125            // Serialize date and time components
126            writer.write_all(&ts.date.year.to_le_bytes())?;
127            writer.write_all(&[ts.date.month])?;
128            writer.write_all(&[ts.date.day])?;
129            writer.write_all(&[ts.time.hour])?;
130            writer.write_all(&[ts.time.minute])?;
131            writer.write_all(&[ts.time.second])?;
132            writer.write_all(&ts.time.nanosecond.to_le_bytes())?;
133        }
134        SqlValue::Interval(i) => {
135            writer.write_all(&[tags::INTERVAL])?;
136            // Serialize interval as string (the original string value preserves all info)
137            let bytes = i.value.as_bytes();
138            let len = bytes.len() as u32;
139            writer.write_all(&len.to_le_bytes())?;
140            writer.write_all(bytes)?;
141        }
142        SqlValue::Vector(v) => {
143            writer.write_all(&[tags::VECTOR])?;
144            let len = v.len() as u32;
145            writer.write_all(&len.to_le_bytes())?;
146            for f in v {
147                writer.write_all(&f.to_le_bytes())?;
148            }
149        }
150        SqlValue::Blob(b) => {
151            writer.write_all(&[tags::BLOB])?;
152            let len = b.len() as u32;
153            writer.write_all(&len.to_le_bytes())?;
154            writer.write_all(b)?;
155        }
156    }
157    Ok(())
158}
159
160/// Deserialize a single SqlValue from the reader
161pub fn deserialize_value<R: Read>(reader: &mut R) -> io::Result<SqlValue> {
162    let mut tag = [0u8; 1];
163    reader.read_exact(&mut tag)?;
164
165    match tag[0] {
166        tags::NULL => Ok(SqlValue::Null),
167        tags::INTEGER => {
168            let mut buf = [0u8; 8];
169            reader.read_exact(&mut buf)?;
170            Ok(SqlValue::Integer(i64::from_le_bytes(buf)))
171        }
172        tags::SMALLINT => {
173            let mut buf = [0u8; 2];
174            reader.read_exact(&mut buf)?;
175            Ok(SqlValue::Smallint(i16::from_le_bytes(buf)))
176        }
177        tags::BIGINT => {
178            let mut buf = [0u8; 8];
179            reader.read_exact(&mut buf)?;
180            Ok(SqlValue::Bigint(i64::from_le_bytes(buf)))
181        }
182        tags::UNSIGNED => {
183            let mut buf = [0u8; 8];
184            reader.read_exact(&mut buf)?;
185            Ok(SqlValue::Unsigned(u64::from_le_bytes(buf)))
186        }
187        tags::NUMERIC => {
188            let mut buf = [0u8; 8];
189            reader.read_exact(&mut buf)?;
190            Ok(SqlValue::Numeric(f64::from_le_bytes(buf)))
191        }
192        tags::FLOAT => {
193            let mut buf = [0u8; 4];
194            reader.read_exact(&mut buf)?;
195            Ok(SqlValue::Float(f32::from_le_bytes(buf)))
196        }
197        tags::REAL => {
198            // Real is now f64 (SQLite REAL is 8-byte IEEE float)
199            let mut buf = [0u8; 8];
200            reader.read_exact(&mut buf)?;
201            Ok(SqlValue::Real(f64::from_le_bytes(buf)))
202        }
203        tags::DOUBLE => {
204            let mut buf = [0u8; 8];
205            reader.read_exact(&mut buf)?;
206            Ok(SqlValue::Double(f64::from_le_bytes(buf)))
207        }
208        tags::BOOLEAN_FALSE => Ok(SqlValue::Boolean(false)),
209        tags::BOOLEAN_TRUE => Ok(SqlValue::Boolean(true)),
210        tags::CHARACTER | tags::VARCHAR => {
211            let mut len_buf = [0u8; 4];
212            reader.read_exact(&mut len_buf)?;
213            let len = u32::from_le_bytes(len_buf) as usize;
214
215            let mut str_buf = vec![0u8; len];
216            reader.read_exact(&mut str_buf)?;
217
218            let s = String::from_utf8(str_buf)
219                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
220
221            if tag[0] == tags::CHARACTER {
222                Ok(SqlValue::Character(s.into()))
223            } else {
224                Ok(SqlValue::Varchar(s.into()))
225            }
226        }
227        tags::DATE => {
228            let mut year_buf = [0u8; 4];
229            let mut month_buf = [0u8; 1];
230            let mut day_buf = [0u8; 1];
231            reader.read_exact(&mut year_buf)?;
232            reader.read_exact(&mut month_buf)?;
233            reader.read_exact(&mut day_buf)?;
234            let year = i32::from_le_bytes(year_buf);
235            let month = month_buf[0];
236            let day = day_buf[0];
237            Ok(SqlValue::Date(
238                vibesql_types::Date::new(year, month, day)
239                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
240            ))
241        }
242        tags::TIME => {
243            let mut hour_buf = [0u8; 1];
244            let mut minute_buf = [0u8; 1];
245            let mut second_buf = [0u8; 1];
246            let mut nano_buf = [0u8; 4];
247            reader.read_exact(&mut hour_buf)?;
248            reader.read_exact(&mut minute_buf)?;
249            reader.read_exact(&mut second_buf)?;
250            reader.read_exact(&mut nano_buf)?;
251            Ok(SqlValue::Time(
252                vibesql_types::Time::new(
253                    hour_buf[0],
254                    minute_buf[0],
255                    second_buf[0],
256                    u32::from_le_bytes(nano_buf),
257                )
258                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
259            ))
260        }
261        tags::TIMESTAMP => {
262            let mut year_buf = [0u8; 4];
263            let mut month_buf = [0u8; 1];
264            let mut day_buf = [0u8; 1];
265            let mut hour_buf = [0u8; 1];
266            let mut minute_buf = [0u8; 1];
267            let mut second_buf = [0u8; 1];
268            let mut nano_buf = [0u8; 4];
269            reader.read_exact(&mut year_buf)?;
270            reader.read_exact(&mut month_buf)?;
271            reader.read_exact(&mut day_buf)?;
272            reader.read_exact(&mut hour_buf)?;
273            reader.read_exact(&mut minute_buf)?;
274            reader.read_exact(&mut second_buf)?;
275            reader.read_exact(&mut nano_buf)?;
276            let date =
277                vibesql_types::Date::new(i32::from_le_bytes(year_buf), month_buf[0], day_buf[0])
278                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
279            let time = vibesql_types::Time::new(
280                hour_buf[0],
281                minute_buf[0],
282                second_buf[0],
283                u32::from_le_bytes(nano_buf),
284            )
285            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
286            Ok(SqlValue::Timestamp(vibesql_types::Timestamp::new(date, time)))
287        }
288        tags::INTERVAL => {
289            let mut len_buf = [0u8; 4];
290            reader.read_exact(&mut len_buf)?;
291            let len = u32::from_le_bytes(len_buf) as usize;
292
293            let mut str_buf = vec![0u8; len];
294            reader.read_exact(&mut str_buf)?;
295
296            let s = String::from_utf8(str_buf)
297                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
298
299            Ok(SqlValue::Interval(Interval::new(s)))
300        }
301        tags::VECTOR => {
302            let mut len_buf = [0u8; 4];
303            reader.read_exact(&mut len_buf)?;
304            let len = u32::from_le_bytes(len_buf) as usize;
305
306            let mut v = Vec::with_capacity(len);
307            for _ in 0..len {
308                let mut buf = [0u8; 4];
309                reader.read_exact(&mut buf)?;
310                v.push(f32::from_le_bytes(buf));
311            }
312            Ok(SqlValue::Vector(v))
313        }
314        tags::BLOB => {
315            let mut len_buf = [0u8; 4];
316            reader.read_exact(&mut len_buf)?;
317            let len = u32::from_le_bytes(len_buf) as usize;
318
319            let mut b = vec![0u8; len];
320            reader.read_exact(&mut b)?;
321            Ok(SqlValue::Blob(b))
322        }
323        _ => {
324            Err(io::Error::new(io::ErrorKind::InvalidData, format!("unknown type tag: {}", tag[0])))
325        }
326    }
327}
328
329/// Serialize a row to the writer
330///
331/// Format: [num_values: u16] [value1] [value2] ...
332pub fn serialize_row<W: Write>(row: &vibesql_storage::Row, writer: &mut W) -> io::Result<()> {
333    let num_values = row.values.len() as u16;
334    writer.write_all(&num_values.to_le_bytes())?;
335
336    for value in &row.values {
337        serialize_value(value, writer)?;
338    }
339
340    Ok(())
341}
342
343/// Deserialize a row from the reader
344pub fn deserialize_row<R: Read>(reader: &mut R) -> io::Result<vibesql_storage::Row> {
345    let mut len_buf = [0u8; 2];
346    reader.read_exact(&mut len_buf)?;
347    let num_values = u16::from_le_bytes(len_buf) as usize;
348
349    let mut values = Vec::with_capacity(num_values);
350    for _ in 0..num_values {
351        values.push(deserialize_value(reader)?);
352    }
353
354    Ok(vibesql_storage::Row::from_vec(values))
355}
356
357/// Serialize a row with its sort keys
358///
359/// Format: [row] [num_keys: u16] [key1_value] [key1_dir: u8] ...
360pub fn serialize_row_with_keys<W: Write>(
361    row: &vibesql_storage::Row,
362    sort_keys: &[(SqlValue, vibesql_ast::OrderDirection)],
363    writer: &mut W,
364) -> io::Result<()> {
365    serialize_row(row, writer)?;
366
367    let num_keys = sort_keys.len() as u16;
368    writer.write_all(&num_keys.to_le_bytes())?;
369
370    for (value, direction) in sort_keys {
371        serialize_value(value, writer)?;
372        let dir_byte = match direction {
373            vibesql_ast::OrderDirection::Asc => 0u8,
374            vibesql_ast::OrderDirection::Desc => 1u8,
375        };
376        writer.write_all(&[dir_byte])?;
377    }
378
379    Ok(())
380}
381
382/// Deserialize a row with its sort keys
383pub fn deserialize_row_with_keys<R: Read>(
384    reader: &mut R,
385) -> io::Result<(vibesql_storage::Row, Vec<(SqlValue, vibesql_ast::OrderDirection)>)> {
386    let row = deserialize_row(reader)?;
387
388    let mut len_buf = [0u8; 2];
389    reader.read_exact(&mut len_buf)?;
390    let num_keys = u16::from_le_bytes(len_buf) as usize;
391
392    let mut keys = Vec::with_capacity(num_keys);
393    for _ in 0..num_keys {
394        let value = deserialize_value(reader)?;
395        let mut dir_buf = [0u8; 1];
396        reader.read_exact(&mut dir_buf)?;
397        let direction = if dir_buf[0] == 0 {
398            vibesql_ast::OrderDirection::Asc
399        } else {
400            vibesql_ast::OrderDirection::Desc
401        };
402        keys.push((value, direction));
403    }
404
405    Ok((row, keys))
406}
407
408/// Estimate the serialized size of a row in bytes
409///
410/// Used for memory budget tracking when deciding whether to spill.
411pub fn estimate_serialized_size(row: &vibesql_storage::Row) -> usize {
412    let mut size = 2; // num_values header
413
414    for value in &row.values {
415        size += estimate_value_size(value);
416    }
417
418    size
419}
420
421/// Estimate the serialized size of a value
422fn estimate_value_size(value: &SqlValue) -> usize {
423    match value {
424        SqlValue::Null => 1,
425        SqlValue::Integer(_) | SqlValue::Bigint(_) => 1 + 8,
426        SqlValue::Smallint(_) => 1 + 2,
427        SqlValue::Unsigned(_) | SqlValue::Numeric(_) | SqlValue::Double(_) | SqlValue::Real(_) => 1 + 8,  // Real is now f64
428        SqlValue::Float(_) => 1 + 4,
429        SqlValue::Boolean(_) => 1,
430        SqlValue::Character(s) | SqlValue::Varchar(s) => 1 + 4 + s.len(),
431        SqlValue::Date(_) => 1 + 4 + 1 + 1, // tag + year + month + day
432        SqlValue::Time(_) => 1 + 1 + 1 + 1 + 4, // tag + hour + minute + second + nanosecond
433        SqlValue::Timestamp(_) => 1 + 4 + 1 + 1 + 1 + 1 + 1 + 4, // tag + date + time components
434        SqlValue::Interval(i) => 1 + 4 + i.value.len(), // tag + length + string
435        SqlValue::Vector(v) => 1 + 4 + (v.len() * 4),
436        SqlValue::Blob(b) => 1 + 4 + b.len(),
437    }
438}
439
#[cfg(test)]
mod tests {
    use std::io::{Cursor, ErrorKind};

    use super::*;

    /// Serialize `value`, read it back, and check that the deserializer consumed exactly the
    /// bytes that were written, so a following value would start at the right offset.
    fn roundtrip(value: &SqlValue) -> SqlValue {
        let mut buf = Vec::new();
        serialize_value(value, &mut buf).unwrap();
        let written = buf.len() as u64;

        let mut reader = Cursor::new(buf);
        let result = deserialize_value(&mut reader).unwrap();
        assert_eq!(reader.position(), written, "deserializer did not consume the whole value");
        result
    }

    #[test]
    fn test_roundtrip_null() {
        let value = SqlValue::Null;
        assert_eq!(roundtrip(&value), value);
    }

    #[test]
    fn test_roundtrip_integer() {
        let value = SqlValue::Integer(12345);
        assert_eq!(roundtrip(&value), value);
    }

    #[test]
    fn test_roundtrip_string() {
        let value = SqlValue::Varchar("hello world".into());
        assert_eq!(roundtrip(&value), value);
    }

    #[test]
    fn test_roundtrip_date() {
        let value = SqlValue::Date(vibesql_types::Date::new(2024, 3, 15).unwrap());
        assert_eq!(roundtrip(&value), value);
    }

    #[test]
    fn test_roundtrip_time() {
        let value = SqlValue::Time(vibesql_types::Time::new(13, 45, 30, 123_456_789).unwrap());
        match roundtrip(&value) {
            SqlValue::Time(t) => assert_eq!(
                (t.hour, t.minute, t.second, t.nanosecond),
                (13, 45, 30, 123_456_789)
            ),
            other => panic!("Expected Time variant, got {:?}", other),
        }
    }

    #[test]
    fn test_roundtrip_timestamp() {
        let date = vibesql_types::Date::new(1999, 12, 31).unwrap();
        let time = vibesql_types::Time::new(23, 59, 59, 999).unwrap();
        let value = SqlValue::Timestamp(vibesql_types::Timestamp::new(date, time));
        match roundtrip(&value) {
            SqlValue::Timestamp(ts) => {
                assert_eq!((ts.date.year, ts.date.month, ts.date.day), (1999, 12, 31));
                assert_eq!(
                    (ts.time.hour, ts.time.minute, ts.time.second, ts.time.nanosecond),
                    (23, 59, 59, 999)
                );
            }
            other => panic!("Expected Timestamp variant, got {:?}", other),
        }
    }

    #[test]
    fn test_roundtrip_vector() {
        let vec_data = vec![1.0f32, 2.0f32, 3.0f32, 4.0f32];
        let value = SqlValue::Vector(vec_data.clone());

        // SqlValue's PartialEq doesn't cover Vector, so compare manually
        match roundtrip(&value) {
            SqlValue::Vector(v) => assert_eq!(v, vec_data),
            _ => panic!("Expected Vector variant"),
        }
    }

    #[test]
    fn test_roundtrip_blob() {
        let bytes = vec![0u8, 1, 2, 254, 255];
        match roundtrip(&SqlValue::Blob(bytes.clone())) {
            SqlValue::Blob(b) => assert_eq!(b, bytes),
            other => panic!("Expected Blob variant, got {:?}", other),
        }
    }

    #[test]
    fn test_unknown_tag_is_rejected() {
        let err = deserialize_value(&mut Cursor::new(vec![0xFFu8])).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }

    #[test]
    fn test_truncated_string_is_rejected() {
        // The length prefix claims 5 bytes but only 2 follow.
        let buf = vec![tags::VARCHAR, 5, 0, 0, 0, b'h', b'i'];
        let err = deserialize_value(&mut Cursor::new(buf)).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_roundtrip_row() {
        let row = vibesql_storage::Row::from_vec(vec![
            SqlValue::Integer(42),
            SqlValue::Varchar("test".into()),
            SqlValue::Double(2.5),
            SqlValue::Null,
        ]);

        let mut buf = Vec::new();
        serialize_row(&row, &mut buf).unwrap();

        let mut reader = Cursor::new(buf);
        let result = deserialize_row(&mut reader).unwrap();
        assert_eq!(result, row);
    }

    #[test]
    fn test_roundtrip_row_with_keys() {
        let row = vibesql_storage::Row::from_vec(vec![
            SqlValue::Integer(42),
            SqlValue::Varchar("test".into()),
        ]);
        let keys = vec![
            (SqlValue::Integer(42), vibesql_ast::OrderDirection::Asc),
            (SqlValue::Varchar("test".into()), vibesql_ast::OrderDirection::Desc),
        ];

        let mut buf = Vec::new();
        serialize_row_with_keys(&row, &keys, &mut buf).unwrap();

        let mut reader = Cursor::new(buf);
        let (result_row, result_keys) = deserialize_row_with_keys(&mut reader).unwrap();
        assert_eq!(result_row, row);
        assert_eq!(result_keys.len(), keys.len());
        assert_eq!(result_keys[0].0, keys[0].0);
        assert_eq!(result_keys[1].0, keys[1].0);
        // Sort directions must survive the roundtrip too.
        assert!(matches!(result_keys[0].1, vibesql_ast::OrderDirection::Asc));
        assert!(matches!(result_keys[1].1, vibesql_ast::OrderDirection::Desc));
    }

    #[test]
    fn test_size_estimation() {
        let row = vibesql_storage::Row::from_vec(vec![
            SqlValue::Integer(42),          // 1 + 8 = 9
            SqlValue::Varchar("hi".into()), // 1 + 4 + 2 = 7
        ]);

        let estimated = estimate_serialized_size(&row);
        let mut buf = Vec::new();
        serialize_row(&row, &mut buf).unwrap();

        assert_eq!(estimated, 2 + 9 + 7); // header + values
        assert_eq!(buf.len(), estimated);
    }

    #[test]
    fn test_size_estimation_matches_every_variant() {
        let values = vec![
            SqlValue::Null,
            SqlValue::Integer(1),
            SqlValue::Smallint(2),
            SqlValue::Bigint(3),
            SqlValue::Unsigned(4),
            SqlValue::Numeric(5.5),
            SqlValue::Float(6.5),
            SqlValue::Real(7.5),
            SqlValue::Double(8.5),
            SqlValue::Boolean(false),
            SqlValue::Boolean(true),
            SqlValue::Character("abc".into()),
            SqlValue::Varchar("héllo".into()),
            SqlValue::Date(vibesql_types::Date::new(2024, 3, 15).unwrap()),
            SqlValue::Time(vibesql_types::Time::new(8, 30, 0, 0).unwrap()),
            SqlValue::Timestamp(vibesql_types::Timestamp::new(
                vibesql_types::Date::new(2024, 3, 15).unwrap(),
                vibesql_types::Time::new(8, 30, 0, 0).unwrap(),
            )),
            SqlValue::Interval(Interval::new("1 day".to_string())),
            SqlValue::Vector(vec![1.0, 2.0, 3.0]),
            SqlValue::Blob(vec![1, 2, 3, 4]),
        ];

        for value in &values {
            let mut buf = Vec::new();
            serialize_value(value, &mut buf).unwrap();
            assert_eq!(
                estimate_value_size(value),
                buf.len(),
                "size estimate mismatch for {:?}",
                value
            );
        }
    }
}