use crate::error::StorageError;
use crate::types::{Key, TypeName, Value};
use alloc::format;
use alloc::string::String;
use alloc::vec::Vec;
use core::cmp::Ordering;
use core::fmt;
/// Configuration for change-data-capture (CDC) recording.
///
/// `Default` yields `enabled: false` and `retention_max_txns: 0`.
#[derive(Debug, Clone, Default)]
pub struct CdcConfig {
    /// Whether CDC is turned on. NOTE(review): how this flag is honoured is
    /// decided by code outside this file — confirm there.
    pub enabled: bool,
    /// NOTE(review): presumably the maximum number of transactions whose CDC
    /// entries are retained; the meaning of the default `0` (e.g. "unlimited")
    /// is defined by the consumer outside this file — confirm.
    pub retention_max_txns: u64,
}
/// Kind of change recorded in a CDC entry.
///
/// The explicit discriminants are the tag byte that `CdcRecord::serialize`
/// writes first. `Corrupted` is not a decodable tag (`from_u8` rejects 255).
/// It is the op placed in the stand-in record produced when stored bytes
/// cannot be decoded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ChangeOp {
    Insert = 0,
    Update = 1,
    Delete = 2,
    Corrupted = 255,
}
impl ChangeOp {
    /// Decodes a serialized tag byte into an op.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupted` for any byte other than 0, 1 or 2.
    fn from_u8(v: u8) -> Result<Self, StorageError> {
        // Slot `i` holds the variant whose discriminant is `i`, so the tag
        // byte doubles as the index; anything past the end is invalid.
        const DECODABLE: [ChangeOp; 3] = [ChangeOp::Insert, ChangeOp::Update, ChangeOp::Delete];
        DECODABLE.get(usize::from(v)).copied().ok_or_else(|| {
            StorageError::Corrupted(format!("invalid ChangeOp discriminant byte: {v}"))
        })
    }
}
/// A captured change to a single table entry, before it is validated and
/// copied into a [`CdcRecord`] by `CdcRecord::from_event`.
pub(crate) struct CdcEvent {
    /// Table the change applies to. `CdcRecord::from_event` rejects names
    /// longer than `u16::MAX` bytes.
    pub table_name: String,
    /// Kind of change.
    pub op: ChangeOp,
    /// Key bytes of the affected entry. `CdcRecord::from_event` rejects keys
    /// of `NONE_SENTINEL` bytes or more.
    pub key: Vec<u8>,
    /// Value bytes after the change, if any.
    pub new_value: Option<Vec<u8>>,
    /// Value bytes before the change, if any.
    pub old_value: Option<Vec<u8>>,
}
/// Key of a CDC log entry: a transaction id plus a sequence number.
///
/// Encoded as 12 big-endian bytes (8 for `transaction_id`, then 4 for
/// `sequence`). With unsigned big-endian integers, byte-wise lexicographic
/// order equals numeric `(transaction_id, sequence)` order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct CdcKey {
    pub transaction_id: u64,
    pub sequence: u32,
}
impl CdcKey {
    /// Length in bytes of the encoded key.
    pub const SERIALIZED_SIZE: usize = 12;

    /// Builds a key from its two components.
    pub fn new(transaction_id: u64, sequence: u32) -> Self {
        Self {
            transaction_id,
            sequence,
        }
    }

    /// Encodes the key: `transaction_id` big-endian, then `sequence` big-endian.
    // Big-endian is deliberate (byte order must sort like numeric order), so
    // the lint against big-endian conversions is silenced here.
    #[allow(clippy::big_endian_bytes)]
    pub(crate) fn to_be_bytes(self) -> [u8; Self::SERIALIZED_SIZE] {
        let mut out = [0u8; Self::SERIALIZED_SIZE];
        let (txn_part, seq_part) = out.split_at_mut(8);
        txn_part.copy_from_slice(&self.transaction_id.to_be_bytes());
        seq_part.copy_from_slice(&self.sequence.to_be_bytes());
        out
    }

    /// Decodes the first `SERIALIZED_SIZE` bytes of `data`; extra bytes are ignored.
    ///
    /// Input shorter than `SERIALIZED_SIZE` trips a `debug_assert!`. In release
    /// builds it decodes as the all-zero key instead of panicking.
    // Same rationale as `to_be_bytes` for allowing big-endian conversions.
    #[allow(clippy::big_endian_bytes)]
    pub(crate) fn from_be_bytes(data: &[u8]) -> Self {
        debug_assert!(
            data.len() >= Self::SERIALIZED_SIZE,
            "CdcKey::from_be_bytes: truncated data ({} < {})",
            data.len(),
            Self::SERIALIZED_SIZE,
        );
        let Some(raw) = data.get(..Self::SERIALIZED_SIZE) else {
            return Self::new(0, 0);
        };
        let mut txn = [0u8; 8];
        let mut seq = [0u8; 4];
        txn.copy_from_slice(&raw[..8]);
        seq.copy_from_slice(&raw[8..]);
        Self::new(u64::from_be_bytes(txn), u32::from_be_bytes(seq))
    }
}
impl PartialOrd for CdcKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to the total order so the two impls can never disagree.
        Some(Ord::cmp(self, other))
    }
}
impl Ord for CdcKey {
    /// Orders by `transaction_id`, then by `sequence`. This is the same order
    /// in which the big-endian encodings sort byte-wise.
    fn cmp(&self, other: &Self) -> Ordering {
        (self.transaction_id, self.sequence).cmp(&(other.transaction_id, other.sequence))
    }
}
impl Value for CdcKey {
    type SelfType<'a>
        = CdcKey
    where
        Self: 'a;
    type AsBytes<'a>
        = [u8; CdcKey::SERIALIZED_SIZE]
    where
        Self: 'a;

    /// Every encoded key is exactly `SERIALIZED_SIZE` bytes.
    fn fixed_width() -> Option<usize> {
        Some(CdcKey::SERIALIZED_SIZE)
    }

    /// Decodes via `CdcKey::from_be_bytes`. Short input yields the zero key in
    /// release builds.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        CdcKey::from_be_bytes(data)
    }

    /// Encodes via `CdcKey::to_be_bytes`; the key is `Copy`, so it is read out by value.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'b,
    {
        CdcKey::to_be_bytes(*value)
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::cdc::CdcKey")
    }
}
impl Key for CdcKey {
    /// Compares encoded keys byte-wise over their common prefix, capped at
    /// `SERIALIZED_SIZE` bytes. Ties are broken by total input length.
    ///
    /// For well-formed 12-byte inputs this is plain lexicographic order, which
    /// matches `Ord for CdcKey` because of the big-endian encoding.
    fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
        let shared = data1.len().min(data2.len()).min(Self::SERIALIZED_SIZE);
        match data1[..shared].cmp(&data2[..shared]) {
            Ordering::Equal => data1.len().cmp(&data2.len()),
            unequal => unequal,
        }
    }
}
/// Length prefix reserved in the `CdcRecord` encoding to mean "this optional
/// field is absent". Real field lengths must therefore stay below it.
const NONE_SENTINEL: u32 = 0xFFFF_FFFF;
#[derive(Clone)]
pub(crate) struct CdcRecord {
pub op: ChangeOp,
pub table_name: String,
pub key: Vec<u8>,
pub new_value: Option<Vec<u8>>,
pub old_value: Option<Vec<u8>>,
}
impl fmt::Debug for CdcRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CdcRecord")
.field("op", &self.op)
.field("table_name", &self.table_name)
.field("key", &self.key)
.field("new_value", &self.new_value)
.field("old_value", &self.old_value)
.finish()
}
}
impl CdcRecord {
    /// Builds a record from a captured [`CdcEvent`]. Each field is checked
    /// against the limits of the serialized format and then copied.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupted` in either of these cases:
    /// - `table_name` is longer than `u16::MAX` bytes.
    /// - `key`, `new_value` or `old_value` is `NONE_SENTINEL` bytes or longer.
    ///   That length prefix is reserved to mean "absent".
    pub fn from_event(event: &CdcEvent) -> Result<Self, StorageError> {
        if u16::try_from(event.table_name.len()).is_err() {
            return Err(StorageError::Corrupted(format!(
                "CDC table_name exceeds u16::MAX bytes ({})",
                event.table_name.len()
            )));
        }
        Self::check_serializable_len("key", event.key.len())?;
        if let Some(value) = &event.new_value {
            Self::check_serializable_len("new_value", value.len())?;
        }
        if let Some(value) = &event.old_value {
            Self::check_serializable_len("old_value", value.len())?;
        }
        Ok(Self {
            op: event.op,
            table_name: event.table_name.clone(),
            key: event.key.clone(),
            new_value: event.new_value.clone(),
            old_value: event.old_value.clone(),
        })
    }

    /// Fails when a `u32`-length-prefixed field of `len` bytes would need a
    /// prefix of `NONE_SENTINEL` or more.
    fn check_serializable_len(field: &str, len: usize) -> Result<(), StorageError> {
        if u32::try_from(len).is_ok_and(|prefix| prefix < NONE_SENTINEL) {
            Ok(())
        } else {
            Err(StorageError::Corrupted(format!(
                "CDC {field} exceeds maximum serializable length ({len})"
            )))
        }
    }

    /// Number of bytes `serialize` produces for a record whose fields are
    /// within the limits enforced by [`Self::from_event`].
    pub(crate) fn serialized_size(&self) -> usize {
        let optional_len = |value: &Option<Vec<u8>>| value.as_ref().map_or(0, Vec::len);
        1 + 2
            + self.table_name.len()
            + 4
            + self.key.len()
            + 4
            + optional_len(&self.new_value)
            + 4
            + optional_len(&self.old_value)
    }

    /// Encodes the record. All integers are little-endian:
    ///
    /// - 1 byte: `op` discriminant
    /// - `u16` table-name length, then the name's UTF-8 bytes
    /// - `u32` key length, then the key bytes
    /// - `u32` new-value length (`NONE_SENTINEL` if absent), then the bytes
    /// - `u32` old-value length (`NONE_SENTINEL` if absent), then the bytes
    ///
    /// Records built through [`Self::from_event`] always fit this format. If a
    /// record is assembled directly with a field too long for its length
    /// prefix, that field is truncated so the output still decodes.
    pub(crate) fn serialize(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(self.serialized_size());
        buf.push(self.op as u8);

        let name = self.table_name.as_bytes();
        let name_len = u16::try_from(name.len()).unwrap_or(u16::MAX);
        buf.extend_from_slice(&name_len.to_le_bytes());
        buf.extend_from_slice(&name[..usize::from(name_len)]);

        // The key is never optional, so a length equal to NONE_SENTINEL is
        // unambiguous here and is written as-is.
        let key_len = u32::try_from(self.key.len()).unwrap_or(NONE_SENTINEL - 1);
        buf.extend_from_slice(&key_len.to_le_bytes());
        buf.extend_from_slice(&self.key[..key_len as usize]);

        Self::write_optional_field(&mut buf, self.new_value.as_deref());
        Self::write_optional_field(&mut buf, self.old_value.as_deref());
        buf
    }

    /// Appends one optional field: a `u32` length prefix followed by the
    /// bytes, or only `NONE_SENTINEL` when the field is absent.
    fn write_optional_field(buf: &mut Vec<u8>, value: Option<&[u8]>) {
        match value {
            None => buf.extend_from_slice(&NONE_SENTINEL.to_le_bytes()),
            Some(bytes) => {
                // Clamp the length strictly below NONE_SENTINEL. Otherwise a
                // present value of exactly u32::MAX bytes would be written with
                // the "absent" marker as its length. The decoder would then
                // read those value bytes as the next field.
                let len = u32::try_from(bytes.len())
                    .unwrap_or(u32::MAX)
                    .min(NONE_SENTINEL - 1);
                buf.extend_from_slice(&len.to_le_bytes());
                buf.extend_from_slice(&bytes[..len as usize]);
            }
        }
    }

    /// Decodes bytes in the layout produced by [`Self::serialize`].
    ///
    /// Invalid UTF-8 in the table name is replaced lossily. Bytes after the
    /// last field are ignored.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupted` in these cases:
    /// - `data` is empty.
    /// - The op tag is unknown.
    /// - The data ends before a length prefix or before the bytes it announces.
    pub(crate) fn deserialize(data: &[u8]) -> Result<Self, StorageError> {
        let Some(&tag) = data.first() else {
            return Err(StorageError::Corrupted("CDC record is empty".into()));
        };
        let op = ChangeOp::from_u8(tag)?;
        let mut pos = 1;

        let name_len = Self::read_le_u16(data, &mut pos, "table name length")?;
        let name = Self::read_field_bytes(data, &mut pos, usize::from(name_len), "table name")?;
        // Lossy rather than failing: `serialize` truncates over-long names at a
        // byte boundary, which can split a UTF-8 character.
        let table_name = String::from_utf8_lossy(name).into_owned();

        let key_len = Self::read_le_u32(data, &mut pos, "key length")?;
        let key = Self::read_field_bytes(data, &mut pos, Self::wire_len(key_len), "key data")?
            .to_vec();

        let new_value =
            Self::read_optional_field(data, &mut pos, "new value length", "new value data")?;
        let old_value =
            Self::read_optional_field(data, &mut pos, "old value length", "old value data")?;

        Ok(Self {
            op,
            table_name,
            key,
            new_value,
            old_value,
        })
    }

    /// Returns the `len` bytes starting at `*pos` and advances `*pos` past them.
    ///
    /// The cursor arithmetic is checked. A corrupt length therefore produces a
    /// "truncated at {what}" error instead of overflowing and panicking, which
    /// could otherwise happen on 32-bit targets.
    fn read_field_bytes<'d>(
        data: &'d [u8],
        pos: &mut usize,
        len: usize,
        what: &str,
    ) -> Result<&'d [u8], StorageError> {
        let start = *pos;
        let field = start
            .checked_add(len)
            .and_then(|end| data.get(start..end))
            .ok_or_else(|| StorageError::Corrupted(format!("CDC record truncated at {what}")))?;
        *pos = start + len;
        Ok(field)
    }

    /// Reads a little-endian `u16` at `*pos` and advances past it.
    fn read_le_u16(data: &[u8], pos: &mut usize, what: &str) -> Result<u16, StorageError> {
        let b = Self::read_field_bytes(data, pos, 2, what)?;
        Ok(u16::from_le_bytes([b[0], b[1]]))
    }

    /// Reads a little-endian `u32` at `*pos` and advances past it.
    fn read_le_u32(data: &[u8], pos: &mut usize, what: &str) -> Result<u32, StorageError> {
        let b = Self::read_field_bytes(data, pos, 4, what)?;
        Ok(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Reads one optional field: a `u32` length, where `NONE_SENTINEL` means
    /// absent, followed by that many bytes.
    fn read_optional_field(
        data: &[u8],
        pos: &mut usize,
        len_what: &str,
        data_what: &str,
    ) -> Result<Option<Vec<u8>>, StorageError> {
        let len = Self::read_le_u32(data, pos, len_what)?;
        if len == NONE_SENTINEL {
            return Ok(None);
        }
        let bytes = Self::read_field_bytes(data, pos, Self::wire_len(len), data_what)?;
        Ok(Some(bytes.to_vec()))
    }

    /// Converts a decoded `u32` length to `usize`.
    ///
    /// A length that does not fit (only possible where `usize` is narrower
    /// than 32 bits) saturates. The following bounds check then reports
    /// truncation.
    fn wire_len(len: u32) -> usize {
        usize::try_from(len).unwrap_or(usize::MAX)
    }
}
impl Value for CdcRecord {
type SelfType<'a>
= CdcRecord
where
Self: 'a;
type AsBytes<'a>
= Vec<u8>
where
Self: 'a;
fn fixed_width() -> Option<usize> {
None
}
fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
match Self::deserialize(data) {
Ok(record) => record,
Err(_) => CdcRecord {
op: ChangeOp::Corrupted,
table_name: String::new(),
key: Vec::new(),
new_value: None,
old_value: None,
},
}
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
where
Self: 'b,
{
value.serialize()
}
fn type_name() -> TypeName {
TypeName::internal("redb::cdc::CdcRecord")
}
}
#[derive(Debug, Clone)]
pub struct ChangeStream {
pub transaction_id: u64,
pub sequence: u32,
pub op: ChangeOp,
pub table_name: String,
pub key: Vec<u8>,
pub new_value: Option<Vec<u8>>,
pub old_value: Option<Vec<u8>>,
}
impl ChangeStream {
pub(crate) fn from_key_record(key: CdcKey, record: CdcRecord) -> Self {
Self {
transaction_id: key.transaction_id,
sequence: key.sequence,
op: record.op,
table_name: record.table_name,
key: record.key,
new_value: record.new_value,
old_value: record.old_value,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cdc_key_round_trip() {
        let key = CdcKey::new(42, 7);
        let bytes = key.to_be_bytes();
        let decoded = CdcKey::from_be_bytes(&bytes);
        assert_eq!(key, decoded);
    }

    #[test]
    fn cdc_key_ordering() {
        let a = CdcKey::new(1, 0);
        let b = CdcKey::new(1, 1);
        let c = CdcKey::new(2, 0);
        assert!(a < b);
        assert!(b < c);
        let ab = a.to_be_bytes();
        let bb = b.to_be_bytes();
        let cb = c.to_be_bytes();
        assert_eq!(CdcKey::compare(&ab, &bb), core::cmp::Ordering::Less);
        assert_eq!(CdcKey::compare(&bb, &cb), core::cmp::Ordering::Less);
    }

    #[test]
    fn cdc_key_be_ordering_across_byte_boundary() {
        let small = CdcKey::new(1, 0);
        let large = CdcKey::new(256, 0);
        let sb = small.to_be_bytes();
        let lb = large.to_be_bytes();
        assert_eq!(CdcKey::compare(&sb, &lb), core::cmp::Ordering::Less);
        assert!(sb < lb);
    }

    /// The byte-level comparator must agree with `Ord` for every pair of keys,
    /// including extremes of both components.
    #[test]
    fn cdc_key_compare_matches_ord() {
        let keys = [
            CdcKey::new(0, 0),
            CdcKey::new(0, u32::MAX),
            CdcKey::new(1, 0),
            CdcKey::new(255, 7),
            CdcKey::new(256, 0),
            CdcKey::new(u64::MAX, u32::MAX),
        ];
        for a in &keys {
            for b in &keys {
                assert_eq!(
                    CdcKey::compare(&a.to_be_bytes(), &b.to_be_bytes()),
                    a.cmp(b)
                );
            }
        }
    }

    #[test]
    fn cdc_record_round_trip_insert() {
        let record = CdcRecord {
            op: ChangeOp::Insert,
            table_name: String::from("my_table"),
            key: vec![1, 2, 3],
            new_value: Some(vec![4, 5, 6]),
            old_value: None,
        };
        let bytes = record.serialize();
        let decoded = CdcRecord::deserialize(&bytes).unwrap();
        assert_eq!(decoded.op, ChangeOp::Insert);
        assert_eq!(decoded.table_name, "my_table");
        assert_eq!(decoded.key, vec![1, 2, 3]);
        assert_eq!(decoded.new_value, Some(vec![4, 5, 6]));
        assert!(decoded.old_value.is_none());
    }

    #[test]
    fn cdc_record_round_trip_update() {
        let record = CdcRecord {
            op: ChangeOp::Update,
            table_name: String::from("t"),
            key: vec![10],
            new_value: Some(vec![20]),
            old_value: Some(vec![30]),
        };
        let bytes = record.serialize();
        let decoded = CdcRecord::deserialize(&bytes).unwrap();
        assert_eq!(decoded.op, ChangeOp::Update);
        assert_eq!(decoded.new_value, Some(vec![20]));
        assert_eq!(decoded.old_value, Some(vec![30]));
    }

    #[test]
    fn cdc_record_round_trip_delete() {
        let record = CdcRecord {
            op: ChangeOp::Delete,
            table_name: String::from("x"),
            key: vec![99],
            new_value: None,
            old_value: Some(vec![100]),
        };
        let bytes = record.serialize();
        let decoded = CdcRecord::deserialize(&bytes).unwrap();
        assert_eq!(decoded.op, ChangeOp::Delete);
        assert!(decoded.new_value.is_none());
        assert_eq!(decoded.old_value, Some(vec![100]));
    }

    #[test]
    fn cdc_record_empty_values() {
        let record = CdcRecord {
            op: ChangeOp::Insert,
            table_name: String::new(),
            key: vec![],
            new_value: Some(vec![]),
            old_value: None,
        };
        let bytes = record.serialize();
        let decoded = CdcRecord::deserialize(&bytes).unwrap();
        assert_eq!(decoded.table_name, "");
        assert!(decoded.key.is_empty());
        assert_eq!(decoded.new_value, Some(vec![]));
    }

    /// `NONE_SENTINEL` must keep an absent value distinct from an empty one.
    #[test]
    fn cdc_record_absent_and_empty_values_are_distinct() {
        let absent = CdcRecord {
            op: ChangeOp::Update,
            table_name: String::from("t"),
            key: vec![1],
            new_value: None,
            old_value: None,
        };
        let empty = CdcRecord {
            new_value: Some(vec![]),
            old_value: Some(vec![]),
            ..absent.clone()
        };
        let a = CdcRecord::deserialize(&absent.serialize()).unwrap();
        let e = CdcRecord::deserialize(&empty.serialize()).unwrap();
        assert!(a.new_value.is_none());
        assert!(a.old_value.is_none());
        assert_eq!(e.new_value, Some(vec![]));
        assert_eq!(e.old_value, Some(vec![]));
    }

    /// `serialized_size` is used as the buffer capacity and must match the
    /// actual encoded length.
    #[test]
    fn cdc_record_serialized_size_matches_output() {
        let records = vec![
            CdcRecord {
                op: ChangeOp::Insert,
                table_name: String::new(),
                key: vec![],
                new_value: None,
                old_value: None,
            },
            CdcRecord {
                op: ChangeOp::Update,
                table_name: String::from("table"),
                key: vec![1, 2, 3],
                new_value: Some(vec![4; 17]),
                old_value: Some(vec![5; 3]),
            },
        ];
        for record in &records {
            assert_eq!(record.serialized_size(), record.serialize().len());
        }
    }

    /// Every strict prefix of a valid encoding must be rejected with an error,
    /// never a panic.
    #[test]
    fn cdc_record_deserialize_rejects_every_truncation() {
        let record = CdcRecord {
            op: ChangeOp::Update,
            table_name: String::from("tbl"),
            key: vec![1, 2],
            new_value: Some(vec![3]),
            old_value: Some(vec![4, 5]),
        };
        let bytes = record.serialize();
        for cut in 0..bytes.len() {
            assert!(
                CdcRecord::deserialize(&bytes[..cut]).is_err(),
                "prefix of {cut} bytes decoded successfully"
            );
        }
        assert!(CdcRecord::deserialize(&bytes).is_ok());
    }

    /// A length prefix far beyond the available data reports truncation.
    #[test]
    fn cdc_record_deserialize_rejects_oversized_length_prefix() {
        // op = Insert, empty table name, key length 0xFFFF_FFFE with no key bytes.
        let mut bytes = vec![0u8, 0, 0];
        bytes.extend_from_slice(&(u32::MAX - 1).to_le_bytes());
        match CdcRecord::deserialize(&bytes).unwrap_err() {
            crate::error::StorageError::Corrupted(msg) => {
                assert!(msg.contains("key data"));
            }
            other => panic!("expected StorageError::Corrupted, got: {other:?}"),
        }
    }

    /// `Value::from_bytes` cannot fail, so garbage decodes to a placeholder
    /// tagged `ChangeOp::Corrupted`.
    #[test]
    fn cdc_record_from_bytes_maps_garbage_to_corrupted() {
        let decoded = <CdcRecord as Value>::from_bytes(&[7u8]);
        assert_eq!(decoded.op, ChangeOp::Corrupted);
        assert!(decoded.table_name.is_empty());
        assert!(decoded.key.is_empty());
        assert!(decoded.new_value.is_none());
        assert!(decoded.old_value.is_none());
    }

    /// `from_event` copies valid events and enforces the u16 table-name limit
    /// exactly at its boundary.
    #[test]
    fn cdc_record_from_event_validates_table_name() {
        let event = CdcEvent {
            table_name: String::from("t"),
            op: ChangeOp::Update,
            key: vec![1],
            new_value: Some(vec![2]),
            old_value: Some(vec![3]),
        };
        let record = CdcRecord::from_event(&event).unwrap();
        assert_eq!(record.op, ChangeOp::Update);
        assert_eq!(record.table_name, "t");
        assert_eq!(record.key, vec![1]);
        assert_eq!(record.new_value, Some(vec![2]));
        assert_eq!(record.old_value, Some(vec![3]));

        let at_limit = CdcEvent {
            table_name: "x".repeat(usize::from(u16::MAX)),
            op: ChangeOp::Insert,
            key: vec![],
            new_value: None,
            old_value: None,
        };
        let record = CdcRecord::from_event(&at_limit).unwrap();
        let decoded = CdcRecord::deserialize(&record.serialize()).unwrap();
        assert_eq!(decoded.table_name.len(), usize::from(u16::MAX));

        let too_long = CdcEvent {
            table_name: "x".repeat(usize::from(u16::MAX) + 1),
            op: ChangeOp::Insert,
            key: vec![],
            new_value: None,
            old_value: None,
        };
        match CdcRecord::from_event(&too_long).unwrap_err() {
            crate::error::StorageError::Corrupted(msg) => {
                assert!(msg.contains("u16::MAX"));
            }
            other => panic!("expected StorageError::Corrupted, got: {other:?}"),
        }
    }

    /// `ChangeOp::Corrupted` serializes to tag 255, which `deserialize` rejects.
    #[test]
    fn cdc_record_corrupted_op_is_not_decodable() {
        let record = CdcRecord {
            op: ChangeOp::Corrupted,
            table_name: String::new(),
            key: vec![],
            new_value: None,
            old_value: None,
        };
        assert!(CdcRecord::deserialize(&record.serialize()).is_err());
    }

    #[test]
    fn cdc_change_op_invalid_discriminant() {
        let err = ChangeOp::from_u8(255).unwrap_err();
        match err {
            crate::error::StorageError::Corrupted(msg) => {
                assert!(msg.contains("invalid ChangeOp discriminant"));
            }
            other => panic!("expected StorageError::Corrupted, got: {other:?}"),
        }
    }

    #[test]
    fn cdc_record_deserialize_empty_data() {
        let err = CdcRecord::deserialize(&[]).unwrap_err();
        match err {
            crate::error::StorageError::Corrupted(msg) => {
                assert!(msg.contains("empty"));
            }
            other => panic!("expected StorageError::Corrupted, got: {other:?}"),
        }
    }

    #[test]
    fn cdc_record_deserialize_invalid_op() {
        let record = CdcRecord {
            op: ChangeOp::Insert,
            table_name: String::from("t"),
            key: vec![1],
            new_value: None,
            old_value: None,
        };
        let mut bytes = record.serialize();
        bytes[0] = 99;
        let err = CdcRecord::deserialize(&bytes).unwrap_err();
        match err {
            crate::error::StorageError::Corrupted(msg) => {
                assert!(msg.contains("invalid ChangeOp discriminant"));
            }
            other => panic!("expected StorageError::Corrupted, got: {other:?}"),
        }
    }
}