1use serde::{Deserialize, Serialize};
11use sonic_rs;
12use zerompk::{FromMessagePack, ToMessagePack};
13
/// A single record in the columnar write-ahead log.
///
/// Every variant names the `collection` it applies to (see
/// [`ColumnarWalRecord::collection`]). Records are turned into bytes with
/// `zerompk` MessagePack via `to_bytes` / `from_bytes`; serde derives are
/// present as well.
///
/// NOTE(review): the on-disk encoding may depend on the order of variants and
/// fields — confirm compatibility before reordering or inserting any of them.
#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
pub enum ColumnarWalRecord {
    /// A row added to a collection.
    InsertRow {
        /// Collection the row belongs to.
        collection: String,
        /// Encoded row bytes — presumably produced by `encode_row_for_wal`;
        /// confirm against the writer.
        row_data: Vec<u8>,
    },

    /// Presumably marks individual rows of one segment as deleted.
    DeleteRows {
        /// Collection the rows belong to.
        collection: String,
        /// Segment holding the affected rows (presumably a segment id
        /// assigned at flush/compaction time — confirm).
        segment_id: u32,
        /// Row positions within `segment_id` (presumably zero-based — confirm).
        row_indices: Vec<u32>,
    },

    /// Presumably commits a compaction that replaced `old_segment_ids` with
    /// `new_segment_ids`.
    CompactionCommit {
        /// Collection whose segments were compacted.
        collection: String,
        /// Segments consumed by the compaction.
        old_segment_ids: Vec<u32>,
        /// Segments produced by the compaction.
        new_segment_ids: Vec<u32>,
    },

    /// Presumably records that the in-memory buffer was flushed into a new
    /// segment.
    MemtableFlushed {
        /// Collection whose memtable was flushed.
        collection: String,
        /// Segment the flushed rows were written to.
        segment_id: u32,
        /// Number of rows written by the flush.
        row_count: u64,
    },
}
64
65impl ColumnarWalRecord {
66 pub fn collection(&self) -> &str {
68 match self {
69 Self::InsertRow { collection, .. }
70 | Self::DeleteRows { collection, .. }
71 | Self::CompactionCommit { collection, .. }
72 | Self::MemtableFlushed { collection, .. } => collection,
73 }
74 }
75
76 pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
78 zerompk::to_msgpack_vec(self)
79 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
80 }
81
82 pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
84 zerompk::from_msgpack(data)
85 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
86 }
87}
88
89pub fn encode_row_for_wal(
95 values: &[nodedb_types::value::Value],
96) -> Result<Vec<u8>, crate::error::ColumnarError> {
97 use nodedb_types::value::Value;
98
99 let mut buf = Vec::with_capacity(values.len() * 10); for value in values {
102 match value {
103 Value::Null => buf.push(0),
104 Value::Integer(v) => {
105 buf.push(1);
106 buf.extend_from_slice(&v.to_le_bytes());
107 }
108 Value::Float(v) => {
109 buf.push(2);
110 buf.extend_from_slice(&v.to_le_bytes());
111 }
112 Value::Bool(v) => {
113 buf.push(3);
114 buf.push(*v as u8);
115 }
116 Value::String(s) => {
117 buf.push(4);
118 let bytes = s.as_bytes();
119 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
120 buf.extend_from_slice(bytes);
121 }
122 Value::Bytes(b) => {
123 buf.push(5);
124 buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
125 buf.extend_from_slice(b);
126 }
127 Value::DateTime(dt) => {
128 buf.push(6);
129 buf.extend_from_slice(&dt.micros.to_le_bytes());
130 }
131 Value::Decimal(d) => {
132 buf.push(7);
133 buf.extend_from_slice(&d.serialize());
134 }
135 Value::Uuid(s) => {
136 buf.push(8);
137 let bytes = s.as_bytes();
138 buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
139 buf.extend_from_slice(bytes);
140 }
141 Value::Array(arr) => {
142 buf.push(9);
144 buf.extend_from_slice(&(arr.len() as u32).to_le_bytes());
145 for v in arr {
146 let f = match v {
147 Value::Float(f) => *f as f32,
148 Value::Integer(n) => *n as f32,
149 _ => 0.0,
150 };
151 buf.extend_from_slice(&f.to_le_bytes());
152 }
153 }
154 _ => {
155 buf.push(10);
157 let json = sonic_rs::to_vec(value).map_err(|e| {
158 crate::error::ColumnarError::Serialization(format!(
159 "failed to serialize value as JSON: {e}"
160 ))
161 })?;
162 buf.extend_from_slice(&(json.len() as u32).to_le_bytes());
163 buf.extend_from_slice(&json);
164 }
165 }
166 }
167
168 Ok(buf)
169}
170
/// Largest length, in bytes, accepted for a single length-prefixed WAL field
/// (2^28 = 256 MiB).
///
/// The decoder checks declared lengths against this before reading the
/// payload. Vector element counts are capped at a quarter of it, so that
/// `count * 4` bytes stays within the same budget.
const MAX_FIELD_LEN: usize = 1 << 28;
174
175fn read_slice<'a>(
178 data: &'a [u8],
179 cursor: &mut usize,
180 n: usize,
181 context: &str,
182) -> Result<&'a [u8], crate::error::ColumnarError> {
183 let end = cursor.checked_add(n).ok_or_else(|| {
184 crate::error::ColumnarError::Serialization(format!("overflow in {context}"))
185 })?;
186 if end > data.len() {
187 return Err(crate::error::ColumnarError::Serialization(format!(
188 "truncated {context}: need {n} bytes at offset {cursor}, have {}",
189 data.len().saturating_sub(*cursor)
190 )));
191 }
192 let slice = &data[*cursor..end];
193 *cursor = end;
194 Ok(slice)
195}
196
197fn read_length_prefixed<'a>(
200 data: &'a [u8],
201 cursor: &mut usize,
202 context: &str,
203) -> Result<&'a [u8], crate::error::ColumnarError> {
204 let len_bytes = read_slice(data, cursor, 4, context)?;
205 let len = u32::from_le_bytes(len_bytes.try_into().map_err(|_| {
206 crate::error::ColumnarError::Serialization(format!("truncated {context} len"))
207 })?) as usize;
208 if len > MAX_FIELD_LEN {
209 return Err(crate::error::ColumnarError::Serialization(format!(
210 "{context} length {len} exceeds maximum {MAX_FIELD_LEN}"
211 )));
212 }
213 read_slice(data, cursor, len, context)
214}
215
216pub fn decode_row_from_wal(
218 data: &[u8],
219) -> Result<Vec<nodedb_types::value::Value>, crate::error::ColumnarError> {
220 use nodedb_types::value::Value;
221
222 let mut values = Vec::new();
223 let mut cursor = 0;
224
225 while cursor < data.len() {
226 let tag_slice = read_slice(data, &mut cursor, 1, "tag")?;
227 let tag = tag_slice[0];
228
229 let value = match tag {
230 0 => Value::Null,
231 1 => {
232 let bytes = read_slice(data, &mut cursor, 8, "i64")?;
233 let v = i64::from_le_bytes(bytes.try_into().map_err(|_| {
234 crate::error::ColumnarError::Serialization("truncated i64".into())
235 })?);
236 Value::Integer(v)
237 }
238 2 => {
239 let bytes = read_slice(data, &mut cursor, 8, "f64")?;
240 let v = f64::from_le_bytes(bytes.try_into().map_err(|_| {
241 crate::error::ColumnarError::Serialization("truncated f64".into())
242 })?);
243 Value::Float(v)
244 }
245 3 => {
246 let bytes = read_slice(data, &mut cursor, 1, "bool")?;
247 Value::Bool(bytes[0] != 0)
248 }
249 4 | 5 | 8 => {
250 let bytes = read_length_prefixed(
251 data,
252 &mut cursor,
253 match tag {
254 4 => "string",
255 5 => "bytes",
256 8 => "uuid",
257 _ => unreachable!(),
258 },
259 )?;
260 match tag {
261 4 => Value::String(String::from_utf8_lossy(bytes).into_owned()),
262 5 => Value::Bytes(bytes.to_vec()),
263 8 => Value::Uuid(String::from_utf8_lossy(bytes).into_owned()),
264 _ => unreachable!(),
265 }
266 }
267 6 => {
268 let bytes = read_slice(data, &mut cursor, 8, "timestamp")?;
269 let micros = i64::from_le_bytes(bytes.try_into().map_err(|_| {
270 crate::error::ColumnarError::Serialization("truncated timestamp".into())
271 })?);
272 Value::DateTime(nodedb_types::datetime::NdbDateTime::from_micros(micros))
273 }
274 7 => {
275 let bytes = read_slice(data, &mut cursor, 16, "decimal")?;
276 let mut arr = [0u8; 16];
277 arr.copy_from_slice(bytes);
278 Value::Decimal(rust_decimal::Decimal::deserialize(arr))
279 }
280 9 => {
281 let count_bytes = read_slice(data, &mut cursor, 4, "vector count")?;
282 let count = u32::from_le_bytes(count_bytes.try_into().map_err(|_| {
283 crate::error::ColumnarError::Serialization("truncated vector count".into())
284 })?) as usize;
285 if count > MAX_FIELD_LEN / 4 {
286 return Err(crate::error::ColumnarError::Serialization(format!(
287 "vector count {count} exceeds maximum {}",
288 MAX_FIELD_LEN / 4
289 )));
290 }
291 let mut arr = Vec::with_capacity(count);
292 for _ in 0..count {
293 let fb = read_slice(data, &mut cursor, 4, "vector f32")?;
294 let f = f32::from_le_bytes(fb.try_into().map_err(|_| {
295 crate::error::ColumnarError::Serialization("truncated f32".into())
296 })?);
297 arr.push(Value::Float(f as f64));
298 }
299 Value::Array(arr)
300 }
301 10 => {
302 let json_bytes = read_length_prefixed(data, &mut cursor, "json")?;
303 sonic_rs::from_slice(json_bytes).unwrap_or(Value::Null)
304 }
305 _ => {
306 return Err(crate::error::ColumnarError::Serialization(format!(
307 "unknown WAL value tag: {tag}"
308 )));
309 }
310 };
311
312 values.push(value);
313 }
314
315 Ok(values)
316}
317
#[cfg(test)]
mod tests {
    use nodedb_types::datetime::NdbDateTime;
    use nodedb_types::value::Value;

    use super::*;

    #[test]
    fn wal_record_roundtrip() {
        let records = vec![
            ColumnarWalRecord::InsertRow {
                collection: "test".into(),
                row_data: vec![1, 2, 3],
            },
            ColumnarWalRecord::DeleteRows {
                collection: "test".into(),
                segment_id: 0,
                row_indices: vec![5, 10, 15],
            },
            ColumnarWalRecord::CompactionCommit {
                collection: "test".into(),
                old_segment_ids: vec![0, 1],
                new_segment_ids: vec![2],
            },
            ColumnarWalRecord::MemtableFlushed {
                collection: "test".into(),
                segment_id: 3,
                row_count: 1024,
            },
        ];

        for record in &records {
            let bytes = record.to_bytes().expect("serialize");
            let restored = ColumnarWalRecord::from_bytes(&bytes).expect("deserialize");
            assert_eq!(restored.collection(), record.collection());
            // `ColumnarWalRecord` has no `PartialEq`. Comparing the `Debug`
            // output forces every field and the variant, not just the
            // collection name, to survive the roundtrip.
            assert_eq!(format!("{restored:?}"), format!("{record:?}"));
        }
    }

    #[test]
    fn row_wire_format_roundtrip() {
        let values = vec![
            Value::Integer(42),
            Value::Float(0.75),
            Value::Bool(true),
            Value::String("hello".into()),
            Value::Bytes(vec![0xDE, 0xAD]),
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000)),
            Value::Decimal(rust_decimal::Decimal::new(314, 2)),
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
            Value::Null,
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)]),
        ];

        let encoded = encode_row_for_wal(&values).expect("encode");
        let decoded = decode_row_from_wal(&encoded).expect("decode");

        assert_eq!(decoded.len(), values.len());
        assert_eq!(decoded[0], Value::Integer(42));
        assert_eq!(decoded[1], Value::Float(0.75));
        assert_eq!(decoded[2], Value::Bool(true));
        assert_eq!(decoded[3], Value::String("hello".into()));
        assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD]));
        assert_eq!(
            decoded[5],
            Value::DateTime(NdbDateTime::from_micros(1_700_000_000))
        );
        assert_eq!(
            decoded[6],
            Value::Decimal(rust_decimal::Decimal::new(314, 2))
        );
        assert_eq!(
            decoded[7],
            Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into())
        );
        assert_eq!(decoded[8], Value::Null);
        // 1.0 and 2.0 are exactly representable in f32, so the vector path is
        // lossless here.
        assert_eq!(
            decoded[9],
            Value::Array(vec![Value::Float(1.0), Value::Float(2.0)])
        );
        assert_eq!(decoded, values);
    }

    #[test]
    fn empty_row_roundtrip() {
        let encoded = encode_row_for_wal(&[]).expect("encode");
        assert!(encoded.is_empty());
        let decoded = decode_row_from_wal(&encoded).expect("decode");
        assert!(decoded.is_empty());
    }

    #[test]
    fn decode_unknown_tag_returns_error() {
        let result = decode_row_from_wal(&[11]);
        assert!(result.is_err(), "unknown tag must return Err, not panic");
    }

    #[test]
    fn decode_truncated_i64_returns_error() {
        let result = decode_row_from_wal(&[1]);
        assert!(
            result.is_err(),
            "truncated i64 payload must return Err, not panic"
        );
    }

    #[test]
    fn decode_truncated_string_returns_error() {
        let input = {
            let mut v = vec![4u8];
            v.extend_from_slice(&255u32.to_le_bytes());
            v
        };
        let result = decode_row_from_wal(&input);
        assert!(
            result.is_err(),
            "truncated string payload must return Err, not panic"
        );
    }

    #[test]
    fn decode_huge_vector_count_returns_error() {
        let input = {
            let mut v = vec![9u8];
            v.extend_from_slice(&0x7FFF_FFFFu32.to_le_bytes());
            v
        };
        let result = decode_row_from_wal(&input);
        assert!(
            result.is_err(),
            "huge vector count with no payload must return Err, not panic or OOM"
        );
    }

    #[test]
    fn decode_truncated_decimal_returns_error() {
        let input = {
            let mut v = vec![7u8];
            v.extend_from_slice(&[0u8; 4]);
            v
        };
        let result = decode_row_from_wal(&input);
        assert!(
            result.is_err(),
            "truncated decimal payload must return Err, not panic"
        );
    }
}