use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ColumnarWalRecord {
InsertRow {
collection: String,
row_data: Vec<u8>,
},
DeleteRows {
collection: String,
segment_id: u32,
row_indices: Vec<u32>,
},
CompactionCommit {
collection: String,
old_segment_ids: Vec<u32>,
new_segment_ids: Vec<u32>,
},
MemtableFlushed {
collection: String,
segment_id: u32,
row_count: u64,
},
}
impl ColumnarWalRecord {
pub fn collection(&self) -> &str {
match self {
Self::InsertRow { collection, .. }
| Self::DeleteRows { collection, .. }
| Self::CompactionCommit { collection, .. }
| Self::MemtableFlushed { collection, .. } => collection,
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
rmp_serde::to_vec_named(self)
.map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
}
pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
rmp_serde::from_slice(data)
.map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
}
}
pub fn encode_row_for_wal(values: &[nodedb_types::value::Value]) -> Vec<u8> {
use nodedb_types::value::Value;
let mut buf = Vec::with_capacity(values.len() * 10);
for value in values {
match value {
Value::Null => buf.push(0),
Value::Integer(v) => {
buf.push(1);
buf.extend_from_slice(&v.to_le_bytes());
}
Value::Float(v) => {
buf.push(2);
buf.extend_from_slice(&v.to_le_bytes());
}
Value::Bool(v) => {
buf.push(3);
buf.push(*v as u8);
}
Value::String(s) => {
buf.push(4);
let bytes = s.as_bytes();
buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(bytes);
}
Value::Bytes(b) => {
buf.push(5);
buf.extend_from_slice(&(b.len() as u32).to_le_bytes());
buf.extend_from_slice(b);
}
Value::DateTime(dt) => {
buf.push(6);
buf.extend_from_slice(&dt.micros.to_le_bytes());
}
Value::Decimal(d) => {
buf.push(7);
buf.extend_from_slice(&d.serialize());
}
Value::Uuid(s) => {
buf.push(8);
let bytes = s.as_bytes();
buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(bytes);
}
Value::Array(arr) => {
buf.push(9);
buf.extend_from_slice(&(arr.len() as u32).to_le_bytes());
for v in arr {
let f = match v {
Value::Float(f) => *f as f32,
Value::Integer(n) => *n as f32,
_ => 0.0,
};
buf.extend_from_slice(&f.to_le_bytes());
}
}
_ => {
buf.push(10);
let json = serde_json::to_vec(value).unwrap_or_default();
buf.extend_from_slice(&(json.len() as u32).to_le_bytes());
buf.extend_from_slice(&json);
}
}
}
buf
}
pub fn decode_row_from_wal(
data: &[u8],
) -> Result<Vec<nodedb_types::value::Value>, crate::error::ColumnarError> {
use nodedb_types::value::Value;
let mut values = Vec::new();
let mut cursor = 0;
while cursor < data.len() {
let tag = data[cursor];
cursor += 1;
let value = match tag {
0 => Value::Null,
1 => {
let v = i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated i64".into())
})?);
cursor += 8;
Value::Integer(v)
}
2 => {
let v = f64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated f64".into())
})?);
cursor += 8;
Value::Float(v)
}
3 => {
let v = data[cursor] != 0;
cursor += 1;
Value::Bool(v)
}
4 | 5 | 8 => {
let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated len".into())
})?) as usize;
cursor += 4;
let bytes = &data[cursor..cursor + len];
cursor += len;
match tag {
4 => Value::String(String::from_utf8_lossy(bytes).into_owned()),
5 => Value::Bytes(bytes.to_vec()),
8 => Value::Uuid(String::from_utf8_lossy(bytes).into_owned()),
_ => unreachable!(),
}
}
6 => {
let micros =
i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated timestamp".into())
})?);
cursor += 8;
Value::DateTime(nodedb_types::datetime::NdbDateTime::from_micros(micros))
}
7 => {
let mut bytes = [0u8; 16];
bytes.copy_from_slice(&data[cursor..cursor + 16]);
cursor += 16;
Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
}
9 => {
let count =
u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated vector count".into())
})?) as usize;
cursor += 4;
let mut arr = Vec::with_capacity(count);
for _ in 0..count {
let f =
f32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated f32".into())
})?);
cursor += 4;
arr.push(Value::Float(f as f64));
}
Value::Array(arr)
}
10 => {
let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated json len".into())
})?) as usize;
cursor += 4;
let json_bytes = &data[cursor..cursor + len];
cursor += len;
serde_json::from_slice(json_bytes).unwrap_or(Value::Null)
}
_ => {
return Err(crate::error::ColumnarError::Serialization(format!(
"unknown WAL value tag: {tag}"
)));
}
};
values.push(value);
}
Ok(values)
}
#[cfg(test)]
mod tests {
use nodedb_types::datetime::NdbDateTime;
use nodedb_types::value::Value;
use super::*;
#[test]
fn wal_record_roundtrip() {
let records = vec![
ColumnarWalRecord::InsertRow {
collection: "test".into(),
row_data: vec![1, 2, 3],
},
ColumnarWalRecord::DeleteRows {
collection: "test".into(),
segment_id: 0,
row_indices: vec![5, 10, 15],
},
ColumnarWalRecord::CompactionCommit {
collection: "test".into(),
old_segment_ids: vec![0, 1],
new_segment_ids: vec![2],
},
ColumnarWalRecord::MemtableFlushed {
collection: "test".into(),
segment_id: 3,
row_count: 1024,
},
];
for record in &records {
let bytes = record.to_bytes().expect("serialize");
let restored = ColumnarWalRecord::from_bytes(&bytes).expect("deserialize");
assert_eq!(restored.collection(), record.collection());
}
}
#[test]
fn row_wire_format_roundtrip() {
let values = vec![
Value::Integer(42),
Value::Float(0.75),
Value::Bool(true),
Value::String("hello".into()),
Value::Bytes(vec![0xDE, 0xAD]),
Value::DateTime(NdbDateTime::from_micros(1_700_000_000)),
Value::Decimal(rust_decimal::Decimal::new(314, 2)),
Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
Value::Null,
Value::Array(vec![Value::Float(1.0), Value::Float(2.0)]),
];
let encoded = encode_row_for_wal(&values);
let decoded = decode_row_from_wal(&encoded).expect("decode");
assert_eq!(decoded.len(), values.len());
assert_eq!(decoded[0], Value::Integer(42));
assert_eq!(decoded[1], Value::Float(0.75));
assert_eq!(decoded[2], Value::Bool(true));
assert_eq!(decoded[3], Value::String("hello".into()));
assert_eq!(decoded[4], Value::Bytes(vec![0xDE, 0xAD]));
assert_eq!(
decoded[5],
Value::DateTime(NdbDateTime::from_micros(1_700_000_000))
);
assert_eq!(
decoded[7],
Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into())
);
assert_eq!(decoded[8], Value::Null);
}
}