// nodedb_columnar/wal_record.rs
//! WAL record types for columnar operations.
//!
//! Each mutation (INSERT, DELETE, compaction commit) produces a WAL record
//! that is written before the mutation is applied. On crash recovery, WAL
//! records are replayed to reconstruct the memtable, delete bitmaps, and
//! segment metadata.
//!
//! Records are serialized as MessagePack for compact wire representation.

10use serde::{Deserialize, Serialize};
11use sonic_rs;
12use zerompk::{FromMessagePack, ToMessagePack};
13
14/// A WAL record for a columnar collection operation.
15#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
16pub enum ColumnarWalRecord {
17    /// A row was inserted into the memtable.
18    ///
19    /// Contains the collection name and the row data as packed binary
20    /// (the columnar wire format, not MessagePack). On replay, the row
21    /// is re-inserted into the memtable.
22    InsertRow {
23        collection: String,
24        /// Row data as packed binary values. Each value is encoded per its
25        /// column type: i64 as 8 LE bytes, f64 as 8 LE bytes, strings as
26        /// length-prefixed UTF-8, etc.
27        row_data: Vec<u8>,
28    },
29
30    /// Rows were marked as deleted in a segment's delete bitmap.
31    ///
32    /// On replay, these row indices are re-applied to the segment's
33    /// delete bitmap.
34    DeleteRows {
35        collection: String,
36        segment_id: u32,
37        row_indices: Vec<u32>,
38    },
39
40    /// A compaction was committed: old segments replaced with new ones.
41    ///
42    /// This is the atomic commit point of the 3-phase compaction protocol.
43    /// On replay:
44    /// - If new segments exist on disk: complete the metadata swap.
45    /// - If new segments don't exist: the compaction was interrupted before
46    ///   writing; discard and treat old segments as authoritative.
47    CompactionCommit {
48        collection: String,
49        old_segment_ids: Vec<u32>,
50        new_segment_ids: Vec<u32>,
51    },
52
53    /// The memtable was flushed to a new segment.
54    ///
55    /// On replay, if the segment file exists, update metadata to include it.
56    /// If it doesn't exist, the flush was interrupted; rows are already in
57    /// the memtable via InsertRow records.
58    MemtableFlushed {
59        collection: String,
60        segment_id: u32,
61        row_count: u64,
62    },
63}
64
65impl ColumnarWalRecord {
66    /// Collection name this record belongs to.
67    pub fn collection(&self) -> &str {
68        match self {
69            Self::InsertRow { collection, .. }
70            | Self::DeleteRows { collection, .. }
71            | Self::CompactionCommit { collection, .. }
72            | Self::MemtableFlushed { collection, .. } => collection,
73        }
74    }
75
76    /// Serialize the record to bytes.
77    pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
78        zerompk::to_msgpack_vec(self)
79            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
80    }
81
82    /// Deserialize a record from bytes.
83    pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
84        zerompk::from_msgpack(data)
85            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
86    }
87}
88
89/// Encode a row of values into the columnar wire format for WAL records.
90///
91/// Each value is written as: [type_tag: u8][value_bytes].
92/// This is more compact than MessagePack for typed columns and enables
93/// direct replay into the memtable without schema interpretation overhead.
94pub fn encode_row_for_wal(
95    values: &[nodedb_types::value::Value],
96) -> Result<Vec<u8>, crate::error::ColumnarError> {
97    use nodedb_types::value::Value;
98
99    let mut buf = Vec::with_capacity(values.len() * 10); // Rough estimate.
100
101    for value in values {
102        match value {
103            Value::Null => buf.push(0),
104            Value::Integer(v) => {
105                buf.push(1);
106                buf.extend_from_slice(&v.to_le_bytes());
107            }
108            Value::Float(v) => {
109                buf.push(2);
110                buf.extend_from_slice(&v.to_le_bytes());
111            }
112            Value::Bool(v) => {
113                buf.push(3);
114                buf.push(*v as u8);
115            }
116            Value::String(s) => {
117                buf.push(4);
118                let bytes = s.as_bytes();
119                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
120                buf.extend_from_slice(bytes);
121            }
122            Value::Bytes(b) => {
123                buf.push(5);
124                buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
125                buf.extend_from_slice(b);
126            }
127            Value::DateTime(dt) => {
128                buf.push(6);
129                buf.extend_from_slice(&dt.micros.to_le_bytes());
130            }
131            Value::Decimal(d) => {
132                buf.push(7);
133                buf.extend_from_slice(&d.serialize());
134            }
135            Value::Uuid(s) => {
136                buf.push(8);
137                let bytes = s.as_bytes();
138                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
139                buf.extend_from_slice(bytes);
140            }
141            Value::Array(arr) => {
142                // Vectors stored as: tag(9) + count(u32) + f32 values.
143                buf.push(9);
144                buf.extend_from_slice(&(arr.len() as u32).to_le_bytes());
145                for v in arr {
146                    let f = match v {
147                        Value::Float(f) => *f as f32,
148                        Value::Integer(n) => *n as f32,
149                        _ => 0.0,
150                    };
151                    buf.extend_from_slice(&f.to_le_bytes());
152                }
153            }
154            _ => {
155                // Geometry and other complex types: serialize as JSON bytes.
156                buf.push(10);
157                let json = sonic_rs::to_vec(value).map_err(|e| {
158                    crate::error::ColumnarError::Serialization(format!(
159                        "failed to serialize value as JSON: {e}"
160                    ))
161                })?;
162                buf.extend_from_slice(&(json.len() as u32).to_le_bytes());
163                buf.extend_from_slice(&json);
164            }
165        }
166    }
167
168    Ok(buf)
169}
170
/// Maximum length for a variable-length field in a WAL record (256 MiB).
/// Prevents OOM from crafted/corrupt records with bogus length prefixes.
const MAX_FIELD_LEN: usize = 256 * 1024 * 1024;
174
175/// Read exactly `n` bytes from `data` at `cursor`, advancing cursor.
176/// Returns `Err` if not enough bytes remain.
177fn read_slice<'a>(
178    data: &'a [u8],
179    cursor: &mut usize,
180    n: usize,
181    context: &str,
182) -> Result<&'a [u8], crate::error::ColumnarError> {
183    let end = cursor.checked_add(n).ok_or_else(|| {
184        crate::error::ColumnarError::Serialization(format!("overflow in {context}"))
185    })?;
186    if end > data.len() {
187        return Err(crate::error::ColumnarError::Serialization(format!(
188            "truncated {context}: need {n} bytes at offset {cursor}, have {}",
189            data.len().saturating_sub(*cursor)
190        )));
191    }
192    let slice = &data[*cursor..end];
193    *cursor = end;
194    Ok(slice)
195}
196
197/// Read a u32 length prefix, validate it against MAX_FIELD_LEN, then read
198/// that many bytes. Returns the payload slice.
199fn read_length_prefixed<'a>(
200    data: &'a [u8],
201    cursor: &mut usize,
202    context: &str,
203) -> Result<&'a [u8], crate::error::ColumnarError> {
204    let len_bytes = read_slice(data, cursor, 4, context)?;
205    let len = u32::from_le_bytes(len_bytes.try_into().map_err(|_| {
206        crate::error::ColumnarError::Serialization(format!("truncated {context} len"))
207    })?) as usize;
208    if len > MAX_FIELD_LEN {
209        return Err(crate::error::ColumnarError::Serialization(format!(
210            "{context} length {len} exceeds maximum {MAX_FIELD_LEN}"
211        )));
212    }
213    read_slice(data, cursor, len, context)
214}
215
216/// Decode a row from the columnar wire format back into Values.
217pub fn decode_row_from_wal(
218    data: &[u8],
219) -> Result<Vec<nodedb_types::value::Value>, crate::error::ColumnarError> {
220    use nodedb_types::value::Value;
221
222    let mut values = Vec::new();
223    let mut cursor = 0;
224
225    while cursor < data.len() {
226        let tag_slice = read_slice(data, &mut cursor, 1, "tag")?;
227        let tag = tag_slice[0];
228
229        let value = match tag {
230            0 => Value::Null,
231            1 => {
232                let bytes = read_slice(data, &mut cursor, 8, "i64")?;
233                let v = i64::from_le_bytes(bytes.try_into().map_err(|_| {
234                    crate::error::ColumnarError::Serialization("truncated i64".into())
235                })?);
236                Value::Integer(v)
237            }
238            2 => {
239                let bytes = read_slice(data, &mut cursor, 8, "f64")?;
240                let v = f64::from_le_bytes(bytes.try_into().map_err(|_| {
241                    crate::error::ColumnarError::Serialization("truncated f64".into())
242                })?);
243                Value::Float(v)
244            }
245            3 => {
246                let bytes = read_slice(data, &mut cursor, 1, "bool")?;
247                Value::Bool(bytes[0] != 0)
248            }
249            4 | 5 | 8 => {
250                let bytes = read_length_prefixed(
251                    data,
252                    &mut cursor,
253                    match tag {
254                        4 => "string",
255                        5 => "bytes",
256                        8 => "uuid",
257                        _ => unreachable!(),
258                    },
259                )?;
260                match tag {
261                    4 => Value::String(String::from_utf8_lossy(bytes).into_owned()),
262                    5 => Value::Bytes(bytes.to_vec()),
263                    8 => Value::Uuid(String::from_utf8_lossy(bytes).into_owned()),
264                    _ => unreachable!(),
265                }
266            }
267            6 => {
268                let bytes = read_slice(data, &mut cursor, 8, "timestamp")?;
269                let micros = i64::from_le_bytes(bytes.try_into().map_err(|_| {
270                    crate::error::ColumnarError::Serialization("truncated timestamp".into())
271                })?);
272                Value::DateTime(nodedb_types::datetime::NdbDateTime::from_micros(micros))
273            }
274            7 => {
275                let bytes = read_slice(data, &mut cursor, 16, "decimal")?;
276                let mut arr = [0u8; 16];
277                arr.copy_from_slice(bytes);
278                Value::Decimal(rust_decimal::Decimal::deserialize(arr))
279            }
280            9 => {
281                let count_bytes = read_slice(data, &mut cursor, 4, "vector count")?;
282                let count = u32::from_le_bytes(count_bytes.try_into().map_err(|_| {
283                    crate::error::ColumnarError::Serialization("truncated vector count".into())
284                })?) as usize;
285                if count > MAX_FIELD_LEN / 4 {
286                    return Err(crate::error::ColumnarError::Serialization(format!(
287                        "vector count {count} exceeds maximum {}",
288                        MAX_FIELD_LEN / 4
289                    )));
290                }
291                let mut arr = Vec::with_capacity(count);
292                for _ in 0..count {
293                    let fb = read_slice(data, &mut cursor, 4, "vector f32")?;
294                    let f = f32::from_le_bytes(fb.try_into().map_err(|_| {
295                        crate::error::ColumnarError::Serialization("truncated f32".into())
296                    })?);
297                    arr.push(Value::Float(f as f64));
298                }
299                Value::Array(arr)
300            }
301            10 => {
302                let json_bytes = read_length_prefixed(data, &mut cursor, "json")?;
303                sonic_rs::from_slice(json_bytes).unwrap_or(Value::Null)
304            }
305            _ => {
306                return Err(crate::error::ColumnarError::Serialization(format!(
307                    "unknown WAL value tag: {tag}"
308                )));
309            }
310        };
311
312        values.push(value);
313    }
314
315    Ok(values)
316}
317
#[cfg(test)]
mod tests {
    use nodedb_types::datetime::NdbDateTime;
    use nodedb_types::value::Value;

    use super::*;

    #[test]
    fn wal_record_roundtrip() {
        let records = vec![
            ColumnarWalRecord::InsertRow {
                collection: "test".into(),
                row_data: vec![1, 2, 3],
            },
            ColumnarWalRecord::DeleteRows {
                collection: "test".into(),
                segment_id: 0,
                row_indices: vec![5, 10, 15],
            },
            ColumnarWalRecord::CompactionCommit {
                collection: "test".into(),
                old_segment_ids: vec![0, 1],
                new_segment_ids: vec![2],
            },
            ColumnarWalRecord::MemtableFlushed {
                collection: "test".into(),
                segment_id: 3,
                row_count: 1024,
            },
        ];

        for record in &records {
            let bytes = record.to_bytes().expect("serialize");
            let restored = ColumnarWalRecord::from_bytes(&bytes).expect("deserialize");
            assert_eq!(restored.collection(), record.collection());
        }
    }

    #[test]
    fn row_wire_format_roundtrip() {
        let values = vec![
            Value::Integer(42),
            Value::Float(0.75),
            Value::Bool(true),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000)),
            Value::Decimal(rust_decimal::Decimal::new(314, 2)),
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
            Value::Null,
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)]),
        ];

        let encoded = encode_row_for_wal(&values).expect("encode");
        let decoded = decode_row_from_wal(&encoded).expect("decode");

        assert_eq!(decoded.len(), values.len());
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Float(0.75));
        assert_eq!(decoded[2], Value::Bool(true));
        assert_eq!(decoded[3], Value::String("hello".into()));
        assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD]));
        assert_eq!(
            decoded[5],
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000))
        );
        assert_eq!(
            decoded[7],
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into())
        );
        assert_eq!(decoded[8], Value::Null);
    }

    #[test]
    fn decode_truncated_i64_returns_error() {
        // Tag 1 (i64) requires 8 payload bytes; supply none. The bounds-checked
        // read_slice helper must turn this into a Serialization error.
        let result = decode_row_from_wal(&[1]);
        assert!(
            result.is_err(),
            "truncated i64 payload must return Err, not panic"
        );
    }

    #[test]
    fn decode_truncated_string_returns_error() {
        // Tag 4 (string): length prefix says 255 bytes but the slice ends
        // immediately after the 4-byte length field.
        let input = {
            let mut v = vec![4u8]; // tag = string
            v.extend_from_slice(&255u32.to_le_bytes()); // len = 255
            // no payload bytes follow
            v
        };
        let result = decode_row_from_wal(&input);
        assert!(
            result.is_err(),
            "truncated string payload must return Err, not panic"
        );
    }

    #[test]
    fn decode_huge_vector_count_returns_error() {
        // Tag 9 (vector array): count = 0x7FFFFFFF with no f32 payload. The
        // decoder must error out cleanly without allocating proportional to
        // the declared count.
        let input = {
            let mut v = vec![9u8]; // tag = vector array
            v.extend_from_slice(&0x7FFF_FFFFu32.to_le_bytes()); // count
            // no f32 bytes follow
            v
        };
        let result = decode_row_from_wal(&input);
        assert!(
            result.is_err(),
            "huge vector count with no payload must return Err, not panic or OOM"
        );
    }

    #[test]
    fn decode_truncated_decimal_returns_error() {
        // Tag 7 (Decimal) requires 16 bytes; supply only 4.
        let input = {
            let mut v = vec![7u8]; // tag = decimal
            v.extend_from_slice(&[0u8; 4]); // only 4 bytes, need 16
            v
        };
        let result = decode_row_from_wal(&input);
        assert!(
            result.is_err(),
            "truncated decimal payload must return Err, not panic"
        );
    }
}