nodedb_columnar/
wal_record.rs1use serde::{Deserialize, Serialize};
11use sonic_rs;
12use zerompk::{FromMessagePack, ToMessagePack};
13
14#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
16pub enum ColumnarWalRecord {
17 InsertRow {
23 collection: String,
24 row_data: Vec<u8>,
28 },
29
30 DeleteRows {
35 collection: String,
36 segment_id: u32,
37 row_indices: Vec<u32>,
38 },
39
40 CompactionCommit {
48 collection: String,
49 old_segment_ids: Vec<u32>,
50 new_segment_ids: Vec<u32>,
51 },
52
53 MemtableFlushed {
59 collection: String,
60 segment_id: u32,
61 row_count: u64,
62 },
63}
64
65impl ColumnarWalRecord {
66 pub fn collection(&self) -> &str {
68 match self {
69 Self::InsertRow { collection, .. }
70 | Self::DeleteRows { collection, .. }
71 | Self::CompactionCommit { collection, .. }
72 | Self::MemtableFlushed { collection, .. } => collection,
73 }
74 }
75
76 pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
78 zerompk::to_msgpack_vec(self)
79 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
80 }
81
82 pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
84 zerompk::from_msgpack(data)
85 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
86 }
87}
88
89pub fn encode_row_for_wal(values: &[nodedb_types::value::Value]) -> Vec<u8> {
95 use nodedb_types::value::Value;
96
97 let mut buf = Vec::with_capacity(values.len() * 10); for value in values {
100 match value {
101 Value::Null => buf.push(0),
102 Value::Integer(v) => {
103 buf.push(1);
104 buf.extend_from_slice(&v.to_le_bytes());
105 }
106 Value::Float(v) => {
107 buf.push(2);
108 buf.extend_from_slice(&v.to_le_bytes());
109 }
110 Value::Bool(v) => {
111 buf.push(3);
112 buf.push(*v as u8);
113 }
114 Value::String(s) => {
115 buf.push(4);
116 let bytes = s.as_bytes();
117 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
118 buf.extend_from_slice(bytes);
119 }
120 Value::Bytes(b) => {
121 buf.push(5);
122 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
123 buf.extend_from_slice(b);
124 }
125 Value::DateTime(dt) => {
126 buf.push(6);
127 buf.extend_from_slice(&dt.micros.to_le_bytes());
128 }
129 Value::Decimal(d) => {
130 buf.push(7);
131 buf.extend_from_slice(&d.serialize());
132 }
133 Value::Uuid(s) => {
134 buf.push(8);
135 let bytes = s.as_bytes();
136 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
137 buf.extend_from_slice(bytes);
138 }
139 Value::Array(arr) => {
140 buf.push(9);
142 buf.extend_from_slice(&(arr.len() as u32).to_le_bytes());
143 for v in arr {
144 let f = match v {
145 Value::Float(f) => *f as f32,
146 Value::Integer(n) => *n as f32,
147 _ => 0.0,
148 };
149 buf.extend_from_slice(&f.to_le_bytes());
150 }
151 }
152 _ => {
153 buf.push(10);
155 let json = sonic_rs::to_vec(value).unwrap_or_default();
156 buf.extend_from_slice(&(json.len() as u32).to_le_bytes());
157 buf.extend_from_slice(&json);
158 }
159 }
160 }
161
162 buf
163}
164
165pub fn decode_row_from_wal(
167 data: &[u8],
168) -> Result<Vec<nodedb_types::value::Value>, crate::error::ColumnarError> {
169 use nodedb_types::value::Value;
170
171 let mut values = Vec::new();
172 let mut cursor = 0;
173
174 while cursor < data.len() {
175 let tag = data[cursor];
176 cursor += 1;
177
178 let value = match tag {
179 0 => Value::Null,
180 1 => {
181 let v = i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
182 crate::error::ColumnarError::Serialization("truncated i64".into())
183 })?);
184 cursor += 8;
185 Value::Integer(v)
186 }
187 2 => {
188 let v = f64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
189 crate::error::ColumnarError::Serialization("truncated f64".into())
190 })?);
191 cursor += 8;
192 Value::Float(v)
193 }
194 3 => {
195 let v = data[cursor] != 0;
196 cursor += 1;
197 Value::Bool(v)
198 }
199 4 | 5 | 8 => {
200 let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
201 crate::error::ColumnarError::Serialization("truncated len".into())
202 })?) as usize;
203 cursor += 4;
204 let bytes = &data[cursor..cursor + len];
205 cursor += len;
206 match tag {
207 4 => Value::String(String::from_utf8_lossy(bytes).into_owned()),
208 5 => Value::Bytes(bytes.to_vec()),
209 8 => Value::Uuid(String::from_utf8_lossy(bytes).into_owned()),
210 _ => unreachable!(),
211 }
212 }
213 6 => {
214 let micros =
215 i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
216 crate::error::ColumnarError::Serialization("truncated timestamp".into())
217 })?);
218 cursor += 8;
219 Value::DateTime(nodedb_types::datetime::NdbDateTime::from_micros(micros))
220 }
221 7 => {
222 let mut bytes = [0u8; 16];
223 bytes.copy_from_slice(&data[cursor..cursor + 16]);
224 cursor += 16;
225 Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
226 }
227 9 => {
228 let count =
229 u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
230 crate::error::ColumnarError::Serialization("truncated vector count".into())
231 })?) as usize;
232 cursor += 4;
233 let mut arr = Vec::with_capacity(count);
234 for _ in 0..count {
235 let f =
236 f32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
237 crate::error::ColumnarError::Serialization("truncated f32".into())
238 })?);
239 cursor += 4;
240 arr.push(Value::Float(f as f64));
241 }
242 Value::Array(arr)
243 }
244 10 => {
245 let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
246 crate::error::ColumnarError::Serialization("truncated json len".into())
247 })?) as usize;
248 cursor += 4;
249 let json_bytes = &data[cursor..cursor + len];
250 cursor += len;
251 sonic_rs::from_slice(json_bytes).unwrap_or(Value::Null)
252 }
253 _ => {
254 return Err(crate::error::ColumnarError::Serialization(format!(
255 "unknown WAL value tag: {tag}"
256 )));
257 }
258 };
259
260 values.push(value);
261 }
262
263 Ok(values)
264}
265
#[cfg(test)]
mod tests {
    use nodedb_types::datetime::NdbDateTime;
    use nodedb_types::value::Value;

    use super::*;

    /// Every record variant survives a `to_bytes` / `from_bytes` round trip.
    #[test]
    fn wal_record_roundtrip() {
        let records = vec![
            ColumnarWalRecord::InsertRow {
                collection: "test".into(),
                row_data: vec![1, 2, 3],
            },
            ColumnarWalRecord::DeleteRows {
                collection: "test".into(),
                segment_id: 0,
                row_indices: vec![5, 10, 15],
            },
            ColumnarWalRecord::CompactionCommit {
                collection: "test".into(),
                old_segment_ids: vec![0, 1],
                new_segment_ids: vec![2],
            },
            ColumnarWalRecord::MemtableFlushed {
                collection: "test".into(),
                segment_id: 3,
                row_count: 1024,
            },
        ];

        for record in &records {
            let bytes = record.to_bytes().expect("serialize");
            let restored = ColumnarWalRecord::from_bytes(&bytes).expect("deserialize");
            assert_eq!(restored.collection(), record.collection());
            // Re-encoding the restored record must reproduce the same bytes;
            // this covers every field, not only the collection name.
            assert_eq!(restored.to_bytes().expect("reserialize"), bytes);
        }
    }

    /// Every natively encoded value kind survives an encode / decode round trip.
    #[test]
    fn row_wire_format_roundtrip() {
        let values = vec![
            Value::Integer(42),
            Value::Float(0.75),
            Value::Bool(true),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000)),
            Value::Decimal(rust_decimal::Decimal::new(314, 2)),
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
            Value::Null,
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)]),
        ];

        let encoded = encode_row_for_wal(&values);
        let decoded = decode_row_from_wal(&encoded).expect("decode");

        assert_eq!(decoded.len(), values.len());
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Float(0.75));
        assert_eq!(decoded[2], Value::Bool(true));
        assert_eq!(decoded[3], Value::String("hello".into()));
        assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD]));
        assert_eq!(
            decoded[5],
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000))
        );
        assert_eq!(
            decoded[6],
            Value::Decimal(rust_decimal::Decimal::new(314, 2))
        );
        assert_eq!(
            decoded[7],
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into())
        );
        assert_eq!(decoded[8], Value::Null);
        // 1.0 and 2.0 are exactly representable as f32, so narrowing is lossless here.
        assert_eq!(
            decoded[9],
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)])
        );
    }

    /// An empty row encodes to no bytes and decodes back to an empty row.
    #[test]
    fn empty_row_roundtrip() {
        let encoded = encode_row_for_wal(&[]);
        assert!(encoded.is_empty());

        let decoded = decode_row_from_wal(&encoded).expect("decode");
        assert!(decoded.is_empty());
    }

    /// A tag byte outside the known range is reported as an error.
    #[test]
    fn unknown_tag_is_rejected() {
        assert!(decode_row_from_wal(&[0xFF]).is_err());
    }
}