Skip to main content

nodedb_columnar/
wal_record.rs

//! WAL record types for columnar operations.
//!
//! Each mutation (INSERT, DELETE, memtable flush, compaction commit) produces a
//! WAL record that is written before the mutation is applied. On crash recovery,
//! WAL records are replayed to reconstruct the memtable, delete bitmaps, and
//! segment metadata.
//!
//! Records are serialized as MessagePack for compact wire representation; the
//! row payload carried by an insert record uses its own compact, type-tagged
//! row format instead.
9
10use serde::{Deserialize, Serialize};
11use sonic_rs;
12use zerompk::{FromMessagePack, ToMessagePack};
13
14/// A WAL record for a columnar collection operation.
15#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
16pub enum ColumnarWalRecord {
17    /// A row was inserted into the memtable.
18    ///
19    /// Contains the collection name and the row data as packed binary
20    /// (the columnar wire format, not MessagePack). On replay, the row
21    /// is re-inserted into the memtable.
22    InsertRow {
23        collection: String,
24        /// Row data as packed binary values. Each value is encoded per its
25        /// column type: i64 as 8 LE bytes, f64 as 8 LE bytes, strings as
26        /// length-prefixed UTF-8, etc.
27        row_data: Vec<u8>,
28    },
29
30    /// Rows were marked as deleted in a segment's delete bitmap.
31    ///
32    /// On replay, these row indices are re-applied to the segment's
33    /// delete bitmap.
34    DeleteRows {
35        collection: String,
36        segment_id: u32,
37        row_indices: Vec<u32>,
38    },
39
40    /// A compaction was committed: old segments replaced with new ones.
41    ///
42    /// This is the atomic commit point of the 3-phase compaction protocol.
43    /// On replay:
44    /// - If new segments exist on disk: complete the metadata swap.
45    /// - If new segments don't exist: the compaction was interrupted before
46    ///   writing; discard and treat old segments as authoritative.
47    CompactionCommit {
48        collection: String,
49        old_segment_ids: Vec<u32>,
50        new_segment_ids: Vec<u32>,
51    },
52
53    /// The memtable was flushed to a new segment.
54    ///
55    /// On replay, if the segment file exists, update metadata to include it.
56    /// If it doesn't exist, the flush was interrupted; rows are already in
57    /// the memtable via InsertRow records.
58    MemtableFlushed {
59        collection: String,
60        segment_id: u32,
61        row_count: u64,
62    },
63}
64
65impl ColumnarWalRecord {
66    /// Collection name this record belongs to.
67    pub fn collection(&self) -> &str {
68        match self {
69            Self::InsertRow { collection, .. }
70            | Self::DeleteRows { collection, .. }
71            | Self::CompactionCommit { collection, .. }
72            | Self::MemtableFlushed { collection, .. } => collection,
73        }
74    }
75
76    /// Serialize the record to bytes.
77    pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
78        zerompk::to_msgpack_vec(self)
79            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
80    }
81
82    /// Deserialize a record from bytes.
83    pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
84        zerompk::from_msgpack(data)
85            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
86    }
87}
88
89/// Encode a row of values into the columnar wire format for WAL records.
90///
91/// Each value is written as: [type_tag: u8][value_bytes].
92/// This is more compact than MessagePack for typed columns and enables
93/// direct replay into the memtable without schema interpretation overhead.
94pub fn encode_row_for_wal(values: &[nodedb_types::value::Value]) -> Vec<u8> {
95    use nodedb_types::value::Value;
96
97    let mut buf = Vec::with_capacity(values.len() * 10); // Rough estimate.
98
99    for value in values {
100        match value {
101            Value::Null => buf.push(0),
102            Value::Integer(v) => {
103                buf.push(1);
104                buf.extend_from_slice(&v.to_le_bytes());
105            }
106            Value::Float(v) => {
107                buf.push(2);
108                buf.extend_from_slice(&v.to_le_bytes());
109            }
110            Value::Bool(v) => {
111                buf.push(3);
112                buf.push(*v as u8);
113            }
114            Value::String(s) => {
115                buf.push(4);
116                let bytes = s.as_bytes();
117                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
118                buf.extend_from_slice(bytes);
119            }
120            Value::Bytes(b) => {
121                buf.push(5);
122                buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
123                buf.extend_from_slice(b);
124            }
125            Value::DateTime(dt) => {
126                buf.push(6);
127                buf.extend_from_slice(&dt.micros.to_le_bytes());
128            }
129            Value::Decimal(d) => {
130                buf.push(7);
131                buf.extend_from_slice(&d.serialize());
132            }
133            Value::Uuid(s) => {
134                buf.push(8);
135                let bytes = s.as_bytes();
136                buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
137                buf.extend_from_slice(bytes);
138            }
139            Value::Array(arr) => {
140                // Vectors stored as: tag(9) + count(u32) + f32 values.
141                buf.push(9);
142                buf.extend_from_slice(&(arr.len() as u32).to_le_bytes());
143                for v in arr {
144                    let f = match v {
145                        Value::Float(f) => *f as f32,
146                        Value::Integer(n) => *n as f32,
147                        _ => 0.0,
148                    };
149                    buf.extend_from_slice(&f.to_le_bytes());
150                }
151            }
152            _ => {
153                // Geometry and other complex types: serialize as JSON bytes.
154                buf.push(10);
155                let json = sonic_rs::to_vec(value).unwrap_or_default();
156                buf.extend_from_slice(&(json.len() as u32).to_le_bytes());
157                buf.extend_from_slice(&json);
158            }
159        }
160    }
161
162    buf
163}
164
165/// Decode a row from the columnar wire format back into Values.
166pub fn decode_row_from_wal(
167    data: &[u8],
168) -> Result<Vec<nodedb_types::value::Value>, crate::error::ColumnarError> {
169    use nodedb_types::value::Value;
170
171    let mut values = Vec::new();
172    let mut cursor = 0;
173
174    while cursor < data.len() {
175        let tag = data[cursor];
176        cursor += 1;
177
178        let value = match tag {
179            0 => Value::Null,
180            1 => {
181                let v = i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
182                    crate::error::ColumnarError::Serialization("truncated i64".into())
183                })?);
184                cursor += 8;
185                Value::Integer(v)
186            }
187            2 => {
188                let v = f64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
189                    crate::error::ColumnarError::Serialization("truncated f64".into())
190                })?);
191                cursor += 8;
192                Value::Float(v)
193            }
194            3 => {
195                let v = data[cursor] != 0;
196                cursor += 1;
197                Value::Bool(v)
198            }
199            4 | 5 | 8 => {
200                let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
201                    crate::error::ColumnarError::Serialization("truncated len".into())
202                })?) as usize;
203                cursor += 4;
204                let bytes = &data[cursor..cursor + len];
205                cursor += len;
206                match tag {
207                    4 => Value::String(String::from_utf8_lossy(bytes).into_owned()),
208                    5 => Value::Bytes(bytes.to_vec()),
209                    8 => Value::Uuid(String::from_utf8_lossy(bytes).into_owned()),
210                    _ => unreachable!(),
211                }
212            }
213            6 => {
214                let micros =
215                    i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
216                        crate::error::ColumnarError::Serialization("truncated timestamp".into())
217                    })?);
218                cursor += 8;
219                Value::DateTime(nodedb_types::datetime::NdbDateTime::from_micros(micros))
220            }
221            7 => {
222                let mut bytes = [0u8; 16];
223                bytes.copy_from_slice(&data[cursor..cursor + 16]);
224                cursor += 16;
225                Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
226            }
227            9 => {
228                let count =
229                    u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
230                        crate::error::ColumnarError::Serialization("truncated vector count".into())
231                    })?) as usize;
232                cursor += 4;
233                let mut arr = Vec::with_capacity(count);
234                for _ in 0..count {
235                    let f =
236                        f32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
237                            crate::error::ColumnarError::Serialization("truncated f32".into())
238                        })?);
239                    cursor += 4;
240                    arr.push(Value::Float(f as f64));
241                }
242                Value::Array(arr)
243            }
244            10 => {
245                let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
246                    crate::error::ColumnarError::Serialization("truncated json len".into())
247                })?) as usize;
248                cursor += 4;
249                let json_bytes = &data[cursor..cursor + len];
250                cursor += len;
251                sonic_rs::from_slice(json_bytes).unwrap_or(Value::Null)
252            }
253            _ => {
254                return Err(crate::error::ColumnarError::Serialization(format!(
255                    "unknown WAL value tag: {tag}"
256                )));
257            }
258        };
259
260        values.push(value);
261    }
262
263    Ok(values)
264}
265
#[cfg(test)]
mod tests {
    use nodedb_types::datetime::NdbDateTime;
    use nodedb_types::value::Value;

    use super::*;

    #[test]
    fn wal_record_roundtrip() {
        let records = vec![
            ColumnarWalRecord::InsertRow {
                collection: "test".into(),
                row_data: vec![1, 2, 3],
            },
            ColumnarWalRecord::DeleteRows {
                collection: "test".into(),
                segment_id: 0,
                row_indices: vec![5, 10, 15],
            },
            ColumnarWalRecord::CompactionCommit {
                collection: "test".into(),
                old_segment_ids: vec![0, 1],
                new_segment_ids: vec![2],
            },
            ColumnarWalRecord::MemtableFlushed {
                collection: "test".into(),
                segment_id: 3,
                row_count: 1024,
            },
        ];

        for record in &records {
            let bytes = record.to_bytes().expect("serialize");
            let restored = ColumnarWalRecord::from_bytes(&bytes).expect("deserialize");
            assert_eq!(restored.collection(), record.collection());
            // The record type does not implement `PartialEq`. Comparing the
            // derived `Debug` output checks that every field survived.
            assert_eq!(format!("{restored:?}"), format!("{record:?}"));
        }
    }

    #[test]
    fn row_wire_format_roundtrip() {
        let values = vec![
            Value::Integer(42),
            Value::Float(0.75),
            Value::Bool(true),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000)),
            Value::Decimal(rust_decimal::Decimal::new(314, 2)),
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
            Value::Null,
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)]),
        ];

        let encoded = encode_row_for_wal(&values);
        let decoded = decode_row_from_wal(&encoded).expect("decode");

        assert_eq!(decoded.len(), values.len());
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Float(0.75));
        assert_eq!(decoded[2], Value::Bool(true));
        assert_eq!(decoded[3], Value::String("hello".into()));
        assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD]));
        assert_eq!(
            decoded[5],
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000))
        );
        assert_eq!(
            decoded[6],
            Value::Decimal(rust_decimal::Decimal::new(314, 2))
        );
        assert_eq!(
            decoded[7],
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into())
        );
        assert_eq!(decoded[8], Value::Null);
        // 1.0 and 2.0 are exactly representable as f32, so the vector
        // encoding round-trips them without loss.
        assert_eq!(
            decoded[9],
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)])
        );
    }

    #[test]
    fn decode_empty_row() {
        assert!(decode_row_from_wal(&[]).expect("decode").is_empty());
    }

    #[test]
    fn decode_rejects_unknown_tag() {
        assert!(decode_row_from_wal(&[0xFF]).is_err());
    }
}