use fsqlite_types::{CommitSeq, PageNumber, TxnId};
use tracing::{debug, trace, warn};
use crate::cell_visibility::{CellDeltaKind, CellKey};
/// Tag stored in the first byte of every cell-delta WAL frame (ASCII `C`, 0x43).
pub const CELL_DELTA_FRAME_TYPE: u8 = b'C';
/// Size in bytes of the fixed header that precedes the cell payload:
/// type tag (1) + page number (4) + key digest (16) + op (1)
/// + commit sequence (8) + transaction id (8) + payload length (4).
pub const CELL_DELTA_HEADER_SIZE: usize = 1 + 4 + 16 + 1 + 8 + 8 + 4;
/// Size in bytes of the trailing checksum, which is stored as one `u32`.
pub const CELL_DELTA_CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Smallest possible frame: header plus checksum around an empty payload.
pub const CELL_DELTA_MIN_FRAME_SIZE: usize = CELL_DELTA_HEADER_SIZE + CELL_DELTA_CHECKSUM_SIZE;
/// Largest payload length (1 MiB) that decoding accepts.
pub const CELL_DELTA_MAX_DATA_LEN: u32 = 1 << 20;
/// Operation code of a cell-delta WAL frame.
///
/// The explicit discriminants are the one-byte values written to and read
/// from the serialized frame, so they must not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CellDeltaOp {
/// Wire value 1; converts to and from `CellDeltaKind::Insert`.
Insert = 1,
/// Wire value 2; converts to and from `CellDeltaKind::Update`.
Update = 2,
/// Wire value 3; converts to and from `CellDeltaKind::Delete`.
Delete = 3,
}
impl CellDeltaOp {
#[must_use]
pub fn from_byte(b: u8) -> Option<Self> {
match b {
1 => Some(Self::Insert),
2 => Some(Self::Update),
3 => Some(Self::Delete),
_ => None,
}
}
#[must_use]
pub fn from_kind(kind: &CellDeltaKind) -> Self {
match kind {
CellDeltaKind::Insert => Self::Insert,
CellDeltaKind::Update => Self::Update,
CellDeltaKind::Delete => Self::Delete,
}
}
#[must_use]
pub fn to_kind(self) -> CellDeltaKind {
match self {
Self::Insert => CellDeltaKind::Insert,
Self::Update => CellDeltaKind::Update,
Self::Delete => CellDeltaKind::Delete,
}
}
}
/// In-memory form of one cell-delta WAL frame.
///
/// `serialize` turns it into bytes (fixed header, payload, trailing checksum)
/// and `deserialize` parses and validates those bytes back into this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CellDeltaWalFrame {
/// Page number carried by the frame; 4 bytes big-endian on the wire.
pub page_number: PageNumber,
/// 16-byte key digest; `new` copies it from `CellKey::key_digest`.
pub key_digest: [u8; 16],
/// Operation code; a single byte on the wire.
pub op: CellDeltaOp,
/// Commit sequence number; 8 bytes big-endian on the wire.
pub commit_seq: CommitSeq,
/// Transaction id; 8 bytes big-endian on the wire.
pub txn_id: TxnId,
/// Variable-length payload. Its length is stored in a 4-byte big-endian
/// field, and `deserialize` rejects lengths above `CELL_DELTA_MAX_DATA_LEN`.
pub cell_data: Vec<u8>,
}
impl CellDeltaWalFrame {
    /// Builds a frame from its parts, taking the 16-byte digest from `cell_key`.
    #[must_use]
    pub fn new(
        page_number: PageNumber,
        cell_key: &CellKey,
        op: CellDeltaOp,
        commit_seq: CommitSeq,
        txn_id: TxnId,
        cell_data: Vec<u8>,
    ) -> Self {
        Self {
            page_number,
            key_digest: cell_key.key_digest,
            op,
            commit_seq,
            txn_id,
            cell_data,
        }
    }

    /// Number of bytes `serialize` produces for this frame:
    /// header + payload + checksum.
    #[must_use]
    pub fn serialized_size(&self) -> usize {
        CELL_DELTA_HEADER_SIZE + self.cell_data.len() + CELL_DELTA_CHECKSUM_SIZE
    }

    /// Encodes the frame as `header || cell_data || checksum`.
    ///
    /// All multi-byte integers are big-endian. The checksum covers every byte
    /// that precedes it (header and payload).
    ///
    /// A payload longer than `CELL_DELTA_MAX_DATA_LEN` is still encoded, but
    /// `deserialize` will refuse the result, so a warning is logged. A payload
    /// whose length does not fit in `u32` has its length field saturated to
    /// `u32::MAX`.
    #[must_use]
    pub fn serialize(&self) -> Vec<u8> {
        let payload_len = self.cell_data.len();
        let data_len = u32::try_from(payload_len).unwrap_or(u32::MAX);
        if data_len > CELL_DELTA_MAX_DATA_LEN {
            // The signature cannot report an error, so at least make the
            // unreadable frame visible instead of failing silently on replay.
            warn!(
                data_len = payload_len,
                max = CELL_DELTA_MAX_DATA_LEN,
                "cell_delta_wal_frame_payload_exceeds_max"
            );
        }

        let mut buf = Vec::with_capacity(self.serialized_size());
        buf.push(CELL_DELTA_FRAME_TYPE);
        buf.extend_from_slice(&self.page_number.get().to_be_bytes());
        buf.extend_from_slice(&self.key_digest);
        buf.push(self.op as u8);
        buf.extend_from_slice(&self.commit_seq.get().to_be_bytes());
        buf.extend_from_slice(&self.txn_id.get().to_be_bytes());
        buf.extend_from_slice(&data_len.to_be_bytes());
        buf.extend_from_slice(&self.cell_data);

        // Checksum is computed over everything written so far.
        let checksum = crc32c_checksum(&buf);
        buf.extend_from_slice(&checksum.to_be_bytes());

        trace!(
            pgno = self.page_number.get(),
            op = ?self.op,
            commit_seq = self.commit_seq.get(),
            data_len = self.cell_data.len(),
            frame_size = buf.len(),
            "cell_delta_wal_frame_serialized"
        );
        buf
    }

    /// Parses and validates one frame from the start of `buf`.
    ///
    /// Returns `None` when the buffer is shorter than the minimum frame, the
    /// type tag is wrong, the page number, op byte or transaction id is
    /// rejected by its constructor, the declared payload length exceeds
    /// `CELL_DELTA_MAX_DATA_LEN` or runs past the end of `buf`, or the stored
    /// checksum does not match. Bytes after the frame's checksum are ignored.
    #[must_use]
    pub fn deserialize(buf: &[u8]) -> Option<Self> {
        // Byte offsets of the fixed header fields, in wire order.
        const PAGE_NUMBER_AT: usize = 1;
        const KEY_DIGEST_AT: usize = 5;
        const OP_AT: usize = 21;
        const COMMIT_SEQ_AT: usize = 22;
        const TXN_ID_AT: usize = 30;
        const DATA_LEN_AT: usize = 38;

        if buf.len() < CELL_DELTA_MIN_FRAME_SIZE {
            warn!(
                buf_len = buf.len(),
                min_size = CELL_DELTA_MIN_FRAME_SIZE,
                "cell_delta_wal_frame_too_short"
            );
            return None;
        }
        if buf[0] != CELL_DELTA_FRAME_TYPE {
            return None;
        }

        // The length check above guarantees the whole header is present.
        let page_number = PageNumber::new(Self::read_u32_be(buf, PAGE_NUMBER_AT))?;
        let mut key_digest = [0u8; 16];
        key_digest.copy_from_slice(&buf[KEY_DIGEST_AT..OP_AT]);
        let op = CellDeltaOp::from_byte(buf[OP_AT])?;
        let commit_seq = CommitSeq::new(Self::read_u64_be(buf, COMMIT_SEQ_AT));
        let txn_id = TxnId::new(Self::read_u64_be(buf, TXN_ID_AT))?;

        let data_len = Self::read_u32_be(buf, DATA_LEN_AT);
        if data_len > CELL_DELTA_MAX_DATA_LEN {
            warn!(
                data_len,
                max = CELL_DELTA_MAX_DATA_LEN,
                "cell_delta_wal_frame_data_too_large"
            );
            return None;
        }

        // Lossless: `data_len` was just bounded by CELL_DELTA_MAX_DATA_LEN.
        let payload_len = data_len as usize;
        let expected_total_size = CELL_DELTA_HEADER_SIZE + payload_len + CELL_DELTA_CHECKSUM_SIZE;
        if buf.len() < expected_total_size {
            warn!(
                buf_len = buf.len(),
                expected_size = expected_total_size,
                "cell_delta_wal_frame_truncated"
            );
            return None;
        }

        let data_start = CELL_DELTA_HEADER_SIZE;
        let data_end = data_start + payload_len;
        let cell_data = buf[data_start..data_end].to_vec();

        // The checksum sits directly after the payload and covers all bytes before it.
        let checksum_start = data_end;
        let stored_checksum = Self::read_u32_be(buf, checksum_start);
        let computed_checksum = crc32c_checksum(&buf[..checksum_start]);
        if stored_checksum != computed_checksum {
            warn!(
                stored = stored_checksum,
                computed = computed_checksum,
                "cell_delta_wal_frame_checksum_mismatch"
            );
            return None;
        }

        trace!(
            pgno = page_number.get(),
            op = ?op,
            commit_seq = commit_seq.get(),
            data_len,
            "cell_delta_wal_frame_deserialized"
        );
        Some(Self {
            page_number,
            key_digest,
            op,
            commit_seq,
            txn_id,
            cell_data,
        })
    }

    /// Returns `true` when `buf` is non-empty and starts with the cell-delta type tag.
    ///
    /// Only the first byte is inspected; the rest of the frame is not validated.
    #[inline]
    #[must_use]
    pub fn is_cell_delta_frame(buf: &[u8]) -> bool {
        buf.first() == Some(&CELL_DELTA_FRAME_TYPE)
    }

    /// Reads a big-endian `u32` at `at`; the caller guarantees `at + 4 <= buf.len()`.
    fn read_u32_be(buf: &[u8], at: usize) -> u32 {
        let mut bytes = [0u8; 4];
        bytes.copy_from_slice(&buf[at..at + 4]);
        u32::from_be_bytes(bytes)
    }

    /// Reads a big-endian `u64` at `at`; the caller guarantees `at + 8 <= buf.len()`.
    fn read_u64_be(buf: &[u8], at: usize) -> u64 {
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(&buf[at..at + 8]);
        u64::from_be_bytes(bytes)
    }
}
/// Returns the CRC-32C checksum of `data`, delegating to `crc32c::crc32c`.
///
/// Kept as a single private entry point so frame encoding and decoding are
/// guaranteed to use the same checksum routine.
#[inline]
fn crc32c_checksum(data: &[u8]) -> u32 {
crc32c::crc32c(data)
}
#[must_use]
pub fn serialize_cell_delta_batch(frames: &[CellDeltaWalFrame]) -> Vec<u8> {
let total_size: usize = frames.iter().map(CellDeltaWalFrame::serialized_size).sum();
let mut buf = Vec::with_capacity(total_size);
for frame in frames {
buf.extend_from_slice(&frame.serialize());
}
debug!(
frame_count = frames.len(),
total_bytes = buf.len(),
"cell_delta_batch_serialized"
);
buf
}
/// Decodes consecutive cell-delta frames from the start of `buf`.
///
/// Decoding stops, without error, at the first position that does not hold a
/// complete, valid cell-delta frame: wrong type tag, incomplete header,
/// declared size larger than the remaining bytes, or a frame rejected by
/// [`CellDeltaWalFrame::deserialize`]. The frames decoded up to that point are
/// returned; any remaining bytes are left unread.
#[must_use]
pub fn deserialize_cell_delta_batch(buf: &[u8]) -> Vec<CellDeltaWalFrame> {
    let mut frames = Vec::new();
    let mut offset = 0;
    while offset < buf.len() {
        let remaining = &buf[offset..];
        if !CellDeltaWalFrame::is_cell_delta_frame(remaining) {
            break;
        }
        if remaining.len() < CELL_DELTA_HEADER_SIZE {
            break;
        }
        // The payload length occupies the last four header bytes (big-endian).
        let data_len =
            u32::from_be_bytes([remaining[38], remaining[39], remaining[40], remaining[41]]);
        // `data_len` is unvalidated input here, so the frame size is computed
        // with checked arithmetic: on targets where `usize` is 32 bits or
        // narrower a hostile length would otherwise overflow the addition.
        let frame_size = match usize::try_from(data_len)
            .ok()
            .and_then(|len| len.checked_add(CELL_DELTA_HEADER_SIZE + CELL_DELTA_CHECKSUM_SIZE))
        {
            Some(size) => size,
            None => break,
        };
        if remaining.len() < frame_size {
            break;
        }
        match CellDeltaWalFrame::deserialize(&remaining[..frame_size]) {
            Some(frame) => {
                frames.push(frame);
                offset += frame_size;
            }
            None => break,
        }
    }
    debug!(
        frame_count = frames.len(),
        bytes_consumed = offset,
        "cell_delta_batch_deserialized"
    );
    frames
}
/// Counters reported by `log_summary` under the `wal_recovery_summary` event.
///
/// All counters start at zero via `Default`. Nothing in this file updates
/// them; presumably WAL recovery code elsewhere fills them in — TODO confirm.
#[derive(Debug, Clone, Default)]
pub struct CellDeltaRecoverySummary {
/// Presumably the number of cell-delta frames seen — verify against the caller.
pub cell_delta_frames: u64,
/// Presumably the number of full-page frames seen — verify against the caller.
pub full_page_frames: u64,
/// Presumably the total bytes of cell-delta frames — verify against the caller.
pub cell_delta_bytes: u64,
/// Presumably the count of distinct pages that had cell deltas — verify against the caller.
pub pages_with_cell_deltas: u64,
/// Presumably the number of deltas inserted during recovery — verify against the caller.
pub deltas_inserted: u64,
}
impl CellDeltaRecoverySummary {
    /// Emits every counter as a structured field of one `info`-level
    /// `wal_recovery_summary` event.
    pub fn log_summary(&self) {
        // Destructuring makes the compiler flag any field added later that is
        // not yet included in the log event.
        let Self {
            cell_delta_frames,
            full_page_frames,
            cell_delta_bytes,
            pages_with_cell_deltas,
            deltas_inserted,
        } = *self;
        tracing::info!(
            cell_delta_frames,
            full_page_frames,
            cell_delta_bytes,
            pages_with_cell_deltas,
            deltas_inserted,
            "wal_recovery_summary"
        );
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use fsqlite_types::{BtreeRef, SemanticKeyKind, TableId};

    /// Recognisable non-trivial digest shared by several tests.
    const SEQ_DIGEST: [u8; 16] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];

    fn make_cell_key() -> CellKey {
        CellKey {
            btree: BtreeRef::Table(TableId::new(1)),
            kind: SemanticKeyKind::TableRow,
            key_digest: SEQ_DIGEST,
        }
    }

    /// Builds a frame from plain integers, unwrapping the checked id constructors.
    fn frame(
        pgno: u32,
        key_digest: [u8; 16],
        op: CellDeltaOp,
        commit_seq: u64,
        txn_id: u64,
        cell_data: Vec<u8>,
    ) -> CellDeltaWalFrame {
        CellDeltaWalFrame {
            page_number: PageNumber::new(pgno).unwrap(),
            key_digest,
            op,
            commit_seq: CommitSeq::new(commit_seq),
            txn_id: TxnId::new(txn_id).unwrap(),
            cell_data,
        }
    }

    /// Hand-assembles an empty-payload frame with an all-zero digest and a
    /// valid checksum, so that only the supplied header values can cause rejection.
    fn raw_frame(pgno: u32, op_byte: u8, commit_seq: u64, txn_id: u64) -> Vec<u8> {
        let mut buf = vec![CELL_DELTA_FRAME_TYPE];
        buf.extend_from_slice(&pgno.to_be_bytes());
        buf.extend_from_slice(&[0u8; 16]);
        buf.push(op_byte);
        buf.extend_from_slice(&commit_seq.to_be_bytes());
        buf.extend_from_slice(&txn_id.to_be_bytes());
        buf.extend_from_slice(&0u32.to_be_bytes());
        let checksum = crc32c_checksum(&buf);
        buf.extend_from_slice(&checksum.to_be_bytes());
        buf
    }

    #[test]
    fn test_cell_delta_frame_round_trip() {
        let original = frame(
            42,
            SEQ_DIGEST,
            CellDeltaOp::Insert,
            12345,
            67890,
            vec![0xDE, 0xAD, 0xBE, 0xEF],
        );
        let bytes = original.serialize();
        let decoded = CellDeltaWalFrame::deserialize(&bytes);
        assert_eq!(decoded, Some(original));
    }

    #[test]
    fn test_cell_delta_frame_checksum() {
        let original = frame(42, SEQ_DIGEST, CellDeltaOp::Update, 100, 200, vec![1, 2, 3, 4, 5]);
        let mut bytes = original.serialize();
        // Flip every bit of the middle byte so the frame no longer matches its checksum.
        let corrupt_idx = bytes.len() / 2;
        bytes[corrupt_idx] ^= 0xFF;
        assert!(CellDeltaWalFrame::deserialize(&bytes).is_none());
    }

    #[test]
    fn test_cell_delta_frame_variable_length() {
        let cases = vec![
            frame(1, [0; 16], CellDeltaOp::Delete, 1, 1, vec![]),
            frame(2, [1; 16], CellDeltaOp::Insert, 2, 2, vec![0xAB; 100]),
            frame(3, [2; 16], CellDeltaOp::Update, 3, 3, vec![0xCD; 4000]),
        ];
        for case in cases {
            let bytes = case.serialize();
            assert_eq!(CellDeltaWalFrame::deserialize(&bytes), Some(case));
        }
    }

    #[test]
    fn test_cell_delta_frame_type_byte() {
        let bytes = frame(42, [0; 16], CellDeltaOp::Insert, 1, 1, vec![1, 2, 3]).serialize();
        assert_eq!(bytes[0], CELL_DELTA_FRAME_TYPE);
        assert!(CellDeltaWalFrame::is_cell_delta_frame(&bytes));

        // A buffer whose first byte is not the cell-delta tag must not be recognised.
        let fake_page_frame = [0x00, 0x00, 0x00, 0x01];
        assert!(!CellDeltaWalFrame::is_cell_delta_frame(&fake_page_frame));
    }

    #[test]
    fn test_cell_delta_op_conversion() {
        let expectations = [
            (1u8, Some(CellDeltaOp::Insert)),
            (2, Some(CellDeltaOp::Update)),
            (3, Some(CellDeltaOp::Delete)),
            (0, None),
            (4, None),
            (255, None),
        ];
        for (byte, expected) in expectations {
            assert_eq!(CellDeltaOp::from_byte(byte), expected);
        }
    }

    #[test]
    fn test_batch_serialization() {
        let frames = vec![
            frame(1, [1; 16], CellDeltaOp::Insert, 10, 100, vec![1, 2, 3]),
            frame(2, [2; 16], CellDeltaOp::Update, 20, 200, vec![4, 5, 6, 7]),
            frame(3, [3; 16], CellDeltaOp::Delete, 30, 300, vec![]),
        ];
        let bytes = serialize_cell_delta_batch(&frames);
        let decoded = deserialize_cell_delta_batch(&bytes);
        assert_eq!(decoded, frames);
    }

    #[test]
    fn test_serialized_size() {
        let frame_empty = frame(1, [0; 16], CellDeltaOp::Delete, 1, 1, vec![]);
        assert_eq!(frame_empty.serialized_size(), 46);
        assert_eq!(frame_empty.serialize().len(), 46);

        let frame_100 = frame(1, [0; 16], CellDeltaOp::Insert, 1, 1, vec![0; 100]);
        assert_eq!(frame_100.serialized_size(), 146);
        assert_eq!(frame_100.serialize().len(), 146);
    }

    #[test]
    fn test_truncated_frame_rejected() {
        let bytes = frame(42, [0; 16], CellDeltaOp::Insert, 1, 1, vec![1, 2, 3, 4, 5]).serialize();
        for truncate_at in [0, 10, 20, 40, bytes.len() - 1] {
            let truncated = &bytes[..truncate_at];
            assert!(
                CellDeltaWalFrame::deserialize(truncated).is_none(),
                "Should reject frame truncated at {truncate_at}"
            );
        }
    }

    #[test]
    fn test_invalid_page_number_rejected() {
        // Page number 0 with otherwise valid header fields.
        let buf = raw_frame(0, 1, 1, 1);
        assert!(CellDeltaWalFrame::deserialize(&buf).is_none());
    }

    #[test]
    fn test_invalid_txn_id_rejected() {
        // Transaction id 0 with otherwise valid header fields.
        let buf = raw_frame(1, 1, 1, 0);
        assert!(CellDeltaWalFrame::deserialize(&buf).is_none());
    }

    #[test]
    fn test_invalid_op_rejected() {
        // Op byte 99 is not a known operation.
        let buf = raw_frame(1, 99, 1, 1);
        assert!(CellDeltaWalFrame::deserialize(&buf).is_none());
    }

    #[test]
    fn test_from_cell_key() {
        let cell_key = make_cell_key();
        let built = CellDeltaWalFrame::new(
            PageNumber::new(42).unwrap(),
            &cell_key,
            CellDeltaOp::Insert,
            CommitSeq::new(100),
            TxnId::new(200).unwrap(),
            vec![1, 2, 3],
        );
        assert_eq!(built.key_digest, cell_key.key_digest);
    }
}