use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
/// Numeric identifier of a transaction.
pub type TxnId = u64;
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TxnState {
/// Started and neither committed nor aborted yet (the state `Transaction::new` assigns).
Active,
/// Marked as committed.
Committed,
/// Marked as aborted.
Aborted,
}
/// Tag identifying the kind of a write-ahead-log record.
///
/// The `u8` discriminant is the byte `TxnWalEntry` writes as the record-type
/// field, and the `TryFrom<u8>` impl maps it back. The numeric values are
/// therefore part of the serialized format and must stay in sync with that impl.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalRecordType {
// 0x0_: data-level records.
/// Key/value record; produced by `TxnWalEntry::new_data` and `new_aries_data`.
Data = 0x01,
PageUpdate = 0x02,
Delete = 0x03,
// 0x1_: transaction control records.
/// Produced by `TxnWalEntry::new_begin`.
TxnBegin = 0x10,
/// Produced by `TxnWalEntry::new_commit`.
TxnCommit = 0x11,
/// Produced by `TxnWalEntry::new_abort`.
TxnAbort = 0x12,
Savepoint = 0x13,
RollbackToSavepoint = 0x14,
TxnEnd = 0x15,
// 0x2_: checkpoint records.
Checkpoint = 0x20,
/// Produced by `TxnWalEntry::new_checkpoint_end`; its `value` holds serialized
/// `AriesCheckpointData`.
CheckpointEnd = 0x21,
SchemaChange = 0x30,
/// Produced by `TxnWalEntry::new_clr`.
CompensationLogRecord = 0x40,
// 0x5_ / 0x6_: no constructor for these is visible in this file.
Compaction = 0x50,
Flush = 0x51,
BatchBegin = 0x60,
BatchCommit = 0x61,
}
impl TryFrom<u8> for WalRecordType {
type Error = ();
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0x01 => Ok(WalRecordType::Data),
0x02 => Ok(WalRecordType::PageUpdate),
0x03 => Ok(WalRecordType::Delete),
0x10 => Ok(WalRecordType::TxnBegin),
0x11 => Ok(WalRecordType::TxnCommit),
0x12 => Ok(WalRecordType::TxnAbort),
0x13 => Ok(WalRecordType::Savepoint),
0x14 => Ok(WalRecordType::RollbackToSavepoint),
0x15 => Ok(WalRecordType::TxnEnd),
0x20 => Ok(WalRecordType::Checkpoint),
0x21 => Ok(WalRecordType::CheckpointEnd),
0x30 => Ok(WalRecordType::SchemaChange),
0x40 => Ok(WalRecordType::CompensationLogRecord),
0x50 => Ok(WalRecordType::Compaction),
0x51 => Ok(WalRecordType::Flush),
0x60 => Ok(WalRecordType::BatchBegin),
0x61 => Ok(WalRecordType::BatchCommit),
_ => Err(()),
}
}
}
/// Log sequence number of a WAL record.
pub type Lsn = u64;
/// Numeric identifier of a page.
pub type PageId = u64;
/// One transaction's entry in the checkpoint's transaction list
/// (see `AriesCheckpointData::active_transactions`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AriesTransactionEntry {
/// Transaction this entry describes.
pub txn_id: TxnId,
/// State of the transaction when the entry was recorded.
pub state: TxnState,
/// NOTE(review): presumably the LSN of the last record written by this transaction — confirm.
pub last_lsn: Lsn,
/// NOTE(review): presumably the next LSN to undo during rollback, if any — confirm.
pub undo_next_lsn: Option<Lsn>,
}
/// One page's entry in the checkpoint's dirty-page list
/// (see `AriesCheckpointData::dirty_pages`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AriesDirtyPageEntry {
/// Page this entry describes.
pub page_id: PageId,
/// NOTE(review): presumably the LSN of the first record that dirtied the page (ARIES recLSN) — confirm.
pub rec_lsn: Lsn,
}
/// Checkpoint payload that `TxnWalEntry::new_checkpoint_end` serializes with
/// bincode into the record's `value` and `get_checkpoint_data` decodes again.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AriesCheckpointData {
/// Transaction entries captured by the checkpoint.
pub active_transactions: Vec<AriesTransactionEntry>,
/// Dirty-page entries captured by the checkpoint.
pub dirty_pages: Vec<AriesDirtyPageEntry>,
/// NOTE(review): presumably the LSN of the matching begin-checkpoint record — confirm.
pub begin_checkpoint_lsn: Lsn,
}
/// A write buffered inside a `Transaction`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxnWrite {
/// Key being written.
pub key: Vec<u8>,
/// New value; `None` is what `Transaction::delete` records.
pub value: Option<Vec<u8>>,
/// Name of the table the key belongs to.
pub table: String,
}
/// A `(table, key)` pair recorded in a transaction's read set.
/// Hashable so it can be stored in a `HashSet`.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct TxnRead {
/// Key that was read.
pub key: Vec<u8>,
/// Name of the table the key belongs to.
pub table: String,
}
/// A single write-ahead-log record.
///
/// The fields from `lsn` onward are only present in the byte layout that starts
/// with `FORMAT_VERSION`; `from_bytes` fills them with defaults for the older
/// layout, and `#[serde(default)]` does the same for serde input lacking them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxnWalEntry {
/// Kind of record.
pub record_type: WalRecordType,
/// Transaction the record belongs to (`new_checkpoint_end` uses 0).
pub txn_id: TxnId,
/// Caller-supplied timestamp; the name suggests microseconds.
pub timestamp_us: u64,
/// Optional key. An empty key and `None` share one byte encoding and are read back as `None`.
pub key: Option<Vec<u8>>,
/// Optional value; for `CheckpointEnd` records this holds serialized `AriesCheckpointData`.
pub value: Option<Vec<u8>>,
/// Optional table name.
pub table: Option<String>,
/// CRC32 over the other fields; constructors leave it at 0 until `compute_checksum` runs.
pub checksum: u32,
/// Log sequence number of this record; constructors leave it at 0.
#[serde(default)]
pub lsn: Lsn,
/// NOTE(review): presumably the LSN of the previous record of the same transaction — confirm.
#[serde(default)]
pub prev_lsn: Option<Lsn>,
/// Page the record refers to, if any.
#[serde(default)]
pub page_id: Option<PageId>,
/// Opaque undo payload, if any.
#[serde(default)]
pub undo_info: Option<Vec<u8>>,
/// Set by `new_clr`; NOTE(review): presumably the next LSN to undo after this record — confirm.
#[serde(default)]
pub undo_next_lsn: Option<Lsn>,
}
impl TxnWalEntry {
/// Builds a `TxnBegin` record for `txn_id` stamped with `timestamp_us`.
///
/// The record carries no key, value or table. `checksum` and `lsn` start at 0
/// and every optional ARIES field is `None`.
pub fn new_begin(txn_id: TxnId, timestamp_us: u64) -> Self {
    let record_type = WalRecordType::TxnBegin;
    Self {
        // Identity.
        record_type,
        txn_id,
        timestamp_us,
        // No payload.
        table: None,
        key: None,
        value: None,
        // Filled in later by the caller / `compute_checksum`.
        lsn: 0,
        checksum: 0,
        // ARIES bookkeeping is unset.
        page_id: None,
        prev_lsn: None,
        undo_info: None,
        undo_next_lsn: None,
    }
}
pub fn new_commit(txn_id: TxnId, timestamp_us: u64) -> Self {
Self {
record_type: WalRecordType::TxnCommit,
txn_id,
timestamp_us,
key: None,
value: None,
table: None,
checksum: 0,
lsn: 0,
prev_lsn: None,
page_id: None,
undo_info: None,
undo_next_lsn: None,
}
}
pub fn new_abort(txn_id: TxnId, timestamp_us: u64) -> Self {
Self {
record_type: WalRecordType::TxnAbort,
txn_id,
timestamp_us,
key: None,
value: None,
table: None,
checksum: 0,
lsn: 0,
prev_lsn: None,
page_id: None,
undo_info: None,
undo_next_lsn: None,
}
}
/// Builds a `Data` record carrying `key`/`value` for `table`.
///
/// `checksum` and `lsn` start at 0 and the ARIES fields (`prev_lsn`, `page_id`,
/// `undo_info`, `undo_next_lsn`) are left unset.
pub fn new_data(
    txn_id: TxnId,
    timestamp_us: u64,
    table: String,
    key: Vec<u8>,
    value: Option<Vec<u8>>,
) -> Self {
    let (table, key) = (Some(table), Some(key));
    Self {
        record_type: WalRecordType::Data,
        txn_id,
        timestamp_us,
        table,
        key,
        value,
        lsn: 0,
        checksum: 0,
        page_id: None,
        prev_lsn: None,
        undo_info: None,
        undo_next_lsn: None,
    }
}
#[allow(clippy::too_many_arguments)]
pub fn new_aries_data(
txn_id: TxnId,
timestamp_us: u64,
table: String,
key: Vec<u8>,
value: Option<Vec<u8>>,
page_id: PageId,
prev_lsn: Option<Lsn>,
undo_info: Option<Vec<u8>>,
) -> Self {
Self {
record_type: WalRecordType::Data,
txn_id,
timestamp_us,
key: Some(key),
value,
table: Some(table),
checksum: 0,
lsn: 0, prev_lsn,
page_id: Some(page_id),
undo_info,
undo_next_lsn: None,
}
}
#[allow(clippy::too_many_arguments)]
pub fn new_clr(
txn_id: TxnId,
timestamp_us: u64,
table: String,
key: Vec<u8>,
value: Option<Vec<u8>>,
page_id: PageId,
prev_lsn: Lsn,
undo_next_lsn: Lsn,
) -> Self {
Self {
record_type: WalRecordType::CompensationLogRecord,
txn_id,
timestamp_us,
key: Some(key),
value,
table: Some(table),
checksum: 0,
lsn: 0,
prev_lsn: Some(prev_lsn),
page_id: Some(page_id),
undo_info: None, undo_next_lsn: Some(undo_next_lsn),
}
}
/// Builds a `CheckpointEnd` record whose `value` is the bincode encoding of
/// `checkpoint_data`.
///
/// The record uses `txn_id` 0 and has no key or table.
///
/// # Errors
/// Returns a message describing the bincode failure if `checkpoint_data`
/// cannot be serialized.
pub fn new_checkpoint_end(
    timestamp_us: u64,
    checkpoint_data: AriesCheckpointData,
) -> Result<Self, String> {
    let encoded = match bincode::serialize(&checkpoint_data) {
        Ok(bytes) => bytes,
        Err(e) => return Err(format!("Failed to serialize checkpoint data: {}", e)),
    };
    let entry = Self {
        record_type: WalRecordType::CheckpointEnd,
        txn_id: 0,
        timestamp_us,
        table: None,
        key: None,
        value: Some(encoded),
        lsn: 0,
        checksum: 0,
        page_id: None,
        prev_lsn: None,
        undo_info: None,
        undo_next_lsn: None,
    };
    Ok(entry)
}
/// Decodes the checkpoint payload stored in `value`.
///
/// Returns `None` when the record is not a `CheckpointEnd`, when it has no
/// value, or when the value does not decode (the decode error is discarded).
pub fn get_checkpoint_data(&self) -> Option<AriesCheckpointData> {
    match (self.record_type, self.value.as_deref()) {
        (WalRecordType::CheckpointEnd, Some(encoded)) => bincode::deserialize(encoded).ok(),
        _ => None,
    }
}
/// Recomputes the CRC32 over the current field values and stores it in `checksum`.
///
/// `lsn` and the ARIES fields are part of the checksummed bytes, so call this
/// after they have their final values.
pub fn compute_checksum(&mut self) {
    self.checksum = crc32fast::hash(&self.serialize_for_checksum());
}
/// Returns `true` when the stored `checksum` equals the CRC32 of the current
/// field values.
pub fn verify_checksum(&self) -> bool {
    let expected = crc32fast::hash(&self.serialize_for_checksum());
    self.checksum == expected
}
fn serialize_for_checksum(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.push(self.record_type as u8);
buf.extend(&self.txn_id.to_le_bytes());
buf.extend(&self.timestamp_us.to_le_bytes());
if let Some(ref key) = self.key {
buf.extend(&(key.len() as u32).to_le_bytes());
buf.extend(key);
} else {
buf.extend(&0u32.to_le_bytes());
}
if let Some(ref value) = self.value {
buf.extend(&(value.len() as u32).to_le_bytes());
buf.extend(value);
} else {
buf.extend(&0u32.to_le_bytes());
}
if let Some(ref table) = self.table {
buf.extend(&(table.len() as u32).to_le_bytes());
buf.extend(table.as_bytes());
} else {
buf.extend(&0u32.to_le_bytes());
}
buf.extend(&self.lsn.to_le_bytes());
match self.prev_lsn {
Some(lsn) => {
buf.push(1u8);
buf.extend(&lsn.to_le_bytes());
}
None => buf.push(0u8),
}
match self.page_id {
Some(pid) => {
buf.push(1u8);
buf.extend(&pid.to_le_bytes());
}
None => buf.push(0u8),
}
if let Some(ref undo) = self.undo_info {
buf.extend(&(undo.len() as u32).to_le_bytes());
buf.extend(undo);
} else {
buf.extend(&0u32.to_le_bytes());
}
match self.undo_next_lsn {
Some(lsn) => {
buf.push(1u8);
buf.extend(&lsn.to_le_bytes());
}
None => buf.push(0u8),
}
buf
}
/// Version byte that `to_bytes` writes first. `from_bytes` treats input as the
/// current layout when it starts with this byte followed by a valid record-type
/// byte. Note that the value equals the tag of `WalRecordType::Data`.
pub const FORMAT_VERSION: u8 = 0x01;
/// Serializes the entry as `FORMAT_VERSION` byte · field payload · stored
/// `checksum` (little-endian `u32`).
///
/// The checksum is written as currently stored, not recomputed; call
/// `compute_checksum` first if any field changed.
pub fn to_bytes(&self) -> Vec<u8> {
    let version = [Self::FORMAT_VERSION];
    let body = self.serialize_for_checksum();
    let crc = self.checksum.to_le_bytes();
    [&version[..], &body[..], &crc[..]].concat()
}
/// Parses an entry from the bytes produced by `to_bytes`, or from the older
/// layout that has no version byte and no ARIES fields.
///
/// Layout detection: input is treated as the current (V1) layout when it
/// starts with `FORMAT_VERSION` followed by a valid record-type byte;
/// otherwise it is parsed as the legacy layout, whose ARIES fields default to
/// `lsn = 0` / `None`.
/// NOTE(review): because `FORMAT_VERSION` equals the `Data` tag, a legacy
/// `Data` record whose first `txn_id` byte is itself a valid record type is
/// indistinguishable from V1 here — confirm whether such records can occur.
/// NOTE(review): legacy entries are verified against the checksum of the
/// current field layout (with defaulted ARIES fields) — confirm this matches
/// how legacy writers computed it.
///
/// Zero-length key/value/table/undo fields are returned as `None`. Bytes
/// following the checksum are ignored.
///
/// # Errors
/// Returns a descriptive message when the input is empty or truncated, has an
/// unknown record type, contains a non-UTF-8 table name, has an optional-field
/// presence tag other than 0 or 1, or fails checksum verification.
pub fn from_bytes(data: &[u8]) -> Result<Self, String> {
    // Returns the next `len` bytes at `*offset` and advances the cursor.
    // On an out-of-bounds range the cursor is left untouched and `None` is
    // returned; `checked_add` keeps the bounds test free of `usize` overflow.
    fn take<'a>(buf: &'a [u8], offset: &mut usize, len: usize) -> Option<&'a [u8]> {
        let end = (*offset).checked_add(len)?;
        let bytes = buf.get(*offset..end)?;
        *offset = end;
        Some(bytes)
    }

    // Reads a little-endian u32 at the cursor.
    fn take_u32(buf: &[u8], offset: &mut usize) -> Option<u32> {
        let bytes = take(buf, offset, 4)?;
        Some(u32::from_le_bytes(bytes.try_into().ok()?))
    }

    // Reads a little-endian u64 at the cursor.
    fn take_u64(buf: &[u8], offset: &mut usize) -> Option<u64> {
        let bytes = take(buf, offset, 8)?;
        Some(u64::from_le_bytes(bytes.try_into().ok()?))
    }

    // Reads a presence tag followed by a u64 when the tag is 1.
    // The writer only ever emits 0 or 1. Any other tag is rejected: the
    // checksum is verified over re-serialized fields, so a corrupted tag read
    // as "absent" would otherwise go unnoticed.
    fn take_opt_u64(
        buf: &[u8],
        offset: &mut usize,
        field: &str,
    ) -> Result<Option<u64>, String> {
        let tag = take(buf, offset, 1)
            .ok_or_else(|| format!("WAL V1 entry truncated at {} tag", field))?[0];
        match tag {
            0 => Ok(None),
            1 => take_u64(buf, offset)
                .map(Some)
                .ok_or_else(|| format!("WAL V1 entry truncated at {} value", field)),
            other => Err(format!(
                "WAL V1 entry has invalid {} tag: {}",
                field, other
            )),
        }
    }

    if data.is_empty() {
        return Err("WAL entry empty".to_string());
    }

    let is_v1 = data.len() >= 2
        && data[0] == Self::FORMAT_VERSION
        && WalRecordType::try_from(data[1]).is_ok();
    // V1 input carries one leading version byte before the payload.
    let payload = if is_v1 { &data[1..] } else { data };

    // record type (1) + txn_id (8) + timestamp (8) + key_len (4)
    if payload.len() < 21 {
        return Err(format!(
            "WAL entry too short: {} bytes, need at least 21",
            payload.len()
        ));
    }

    let record_type = WalRecordType::try_from(payload[0])
        .map_err(|_| format!("Invalid WAL record type: {}", payload[0]))?;
    let mut offset = 1usize;
    let txn_id = take_u64(payload, &mut offset)
        .ok_or("Failed to parse txn_id: slice too short")?;
    let timestamp_us = take_u64(payload, &mut offset)
        .ok_or("Failed to parse timestamp: slice too short")?;

    let key_len = take_u32(payload, &mut offset).ok_or_else(|| {
        format!(
            "WAL entry truncated at key_len: offset {} + 4 > {}",
            offset,
            payload.len()
        )
    })? as usize;
    let key = take(payload, &mut offset, key_len).ok_or_else(|| {
        format!(
            "WAL entry truncated at key: need {} bytes at offset {}, have {}",
            key_len,
            offset,
            payload.len()
        )
    })?;
    let key = if key.is_empty() { None } else { Some(key.to_vec()) };

    let value_len = take_u32(payload, &mut offset).ok_or_else(|| {
        format!(
            "WAL entry truncated at value_len: offset {} + 4 > {}",
            offset,
            payload.len()
        )
    })? as usize;
    let value = take(payload, &mut offset, value_len).ok_or_else(|| {
        format!(
            "WAL entry truncated at value: need {} bytes at offset {}, have {}",
            value_len,
            offset,
            payload.len()
        )
    })?;
    let value = if value.is_empty() { None } else { Some(value.to_vec()) };

    let table_len = take_u32(payload, &mut offset).ok_or_else(|| {
        format!(
            "WAL entry truncated at table_len: offset {} + 4 > {}",
            offset,
            payload.len()
        )
    })? as usize;
    let table = take(payload, &mut offset, table_len).ok_or_else(|| {
        format!(
            "WAL entry truncated at table: need {} bytes at offset {}, have {}",
            table_len,
            offset,
            payload.len()
        )
    })?;
    let table = if table.is_empty() {
        None
    } else {
        Some(
            String::from_utf8(table.to_vec())
                .map_err(|e| format!("Invalid UTF-8 in table name: {}", e))?,
        )
    };

    let (lsn, prev_lsn, page_id, undo_info, undo_next_lsn) = if is_v1 {
        let lsn = take_u64(payload, &mut offset).ok_or_else(|| {
            format!(
                "WAL V1 entry truncated at lsn: offset {} + 8 > {}",
                offset,
                payload.len()
            )
        })?;
        let prev_lsn = take_opt_u64(payload, &mut offset, "prev_lsn")?;
        let page_id = take_opt_u64(payload, &mut offset, "page_id")?;
        let undo_len = take_u32(payload, &mut offset)
            .ok_or("WAL V1 entry truncated at undo_info_len")? as usize;
        let undo = take(payload, &mut offset, undo_len)
            .ok_or("WAL V1 entry truncated at undo_info data")?;
        let undo_info = if undo.is_empty() { None } else { Some(undo.to_vec()) };
        let undo_next_lsn = take_opt_u64(payload, &mut offset, "undo_next_lsn")?;
        (lsn, prev_lsn, page_id, undo_info, undo_next_lsn)
    } else {
        (0u64, None, None, None, None)
    };

    let checksum = take_u32(payload, &mut offset).ok_or_else(|| {
        format!(
            "WAL entry truncated at checksum: offset {} + 4 > {}",
            offset,
            payload.len()
        )
    })?;

    let entry = Self {
        record_type,
        txn_id,
        timestamp_us,
        key,
        value,
        table,
        checksum,
        lsn,
        prev_lsn,
        page_id,
        undo_info,
        undo_next_lsn,
    };
    if !entry.verify_checksum() {
        return Err(format!(
            "WAL entry checksum mismatch for txn_id {}: expected valid checksum, got {}",
            entry.txn_id, entry.checksum
        ));
    }
    Ok(entry)
}
}
/// In-memory state of one transaction: its identity, timestamps, buffered
/// writes and recorded reads.
#[derive(Debug)]
pub struct Transaction {
/// Identifier assigned when the transaction was begun.
pub id: TxnId,
/// Current lifecycle state.
pub state: TxnState,
/// Timestamp taken when the transaction was begun.
pub start_ts: u64,
/// Timestamp assigned by `TransactionManager::mark_committed`; `None` until then.
pub commit_ts: Option<u64>,
/// Writes buffered so far, in the order they were issued.
pub writes: Vec<TxnWrite>,
/// `(table, key)` pairs registered through `record_read`.
pub read_set: HashSet<TxnRead>,
/// Isolation level the transaction was started with.
pub isolation: IsolationLevel,
}
/// Isolation level requested for a transaction.
/// `SnapshotIsolation` is the default (used by `TransactionManager::begin`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
ReadCommitted,
#[default]
SnapshotIsolation,
Serializable,
}
impl Transaction {
    /// Creates an `Active` transaction with no buffered writes, an empty read
    /// set and no commit timestamp.
    pub fn new(id: TxnId, start_ts: u64, isolation: IsolationLevel) -> Self {
        let (writes, read_set) = (Vec::new(), HashSet::new());
        Self {
            id,
            isolation,
            start_ts,
            commit_ts: None,
            state: TxnState::Active,
            writes,
            read_set,
        }
    }

    /// Buffers a write of `value` to `key` in `table`.
    pub fn put(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
        self.buffer_write(table, key, Some(value));
    }

    /// Buffers a delete of `key` in `table` (a write whose value is `None`).
    pub fn delete(&mut self, table: &str, key: Vec<u8>) {
        self.buffer_write(table, key, None);
    }

    /// Appends one pending write to the buffer.
    fn buffer_write(&mut self, table: &str, key: Vec<u8>, value: Option<Vec<u8>>) {
        let table = table.to_string();
        self.writes.push(TxnWrite { key, value, table });
    }

    /// Adds `(table, key)` to the read set; duplicates are collapsed by the set.
    pub fn record_read(&mut self, table: &str, key: Vec<u8>) {
        let read = TxnRead {
            table: table.to_string(),
            key,
        };
        self.read_set.insert(read);
    }

    /// Returns the most recently buffered write for `(table, key)`, if any,
    /// so a transaction can observe its own pending writes.
    pub fn get_local(&self, table: &str, key: &[u8]) -> Option<&TxnWrite> {
        // Scan newest-first so a later write shadows earlier ones.
        self.writes
            .iter()
            .rfind(|w| w.key == key && w.table == table)
    }

    /// Returns `true` while no write has been buffered.
    pub fn is_read_only(&self) -> bool {
        self.writes.is_empty()
    }
}
/// Counters maintained by `TransactionManager`.
#[derive(Debug, Clone, Default)]
pub struct TxnStats {
/// Transactions begun and not yet marked committed or aborted.
pub active_count: u64,
/// Transactions marked committed.
pub committed_count: u64,
/// Transactions marked aborted (including conflict aborts).
pub aborted_count: u64,
/// Subset of aborts recorded through `mark_conflict_abort`.
pub conflict_aborts: u64,
}
/// Hands out transaction ids and timestamps and keeps transaction statistics.
pub struct TransactionManager {
// Next transaction id to hand out; starts at 1.
next_txn_id: AtomicU64,
// Shared counter for start and commit timestamps; starts at 1.
timestamp_counter: AtomicU64,
// Only ever raised via `advance_watermark`; starts at 0.
committed_watermark: AtomicU64,
// Counters guarded by a read/write lock.
stats: parking_lot::RwLock<TxnStats>,
}
impl TransactionManager {
    /// Creates a manager whose id and timestamp counters start at 1, with a
    /// watermark of 0 and zeroed statistics.
    pub fn new() -> Self {
        Self {
            next_txn_id: AtomicU64::new(1),
            timestamp_counter: AtomicU64::new(1),
            committed_watermark: AtomicU64::new(0),
            stats: parking_lot::RwLock::new(TxnStats::default()),
        }
    }

    /// Begins a transaction with the default isolation level.
    pub fn begin(&self) -> Transaction {
        self.begin_with_isolation(IsolationLevel::default())
    }

    /// Begins a transaction with the given isolation level: allocates a fresh
    /// id and start timestamp and bumps the active counter.
    pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> Transaction {
        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
        let start_ts = self.timestamp_counter.fetch_add(1, Ordering::SeqCst);
        {
            let mut stats = self.stats.write();
            stats.active_count += 1;
        }
        Transaction::new(txn_id, start_ts, isolation)
    }

    /// Allocates and returns a new timestamp from the shared counter.
    ///
    /// Despite the getter-like name this consumes a timestamp on every call.
    pub fn get_commit_ts(&self) -> u64 {
        self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
    }

    /// Marks `txn` as committed, assigns it a commit timestamp and updates the
    /// counters.
    ///
    /// The current state of `txn` is not checked, so calling this more than
    /// once for the same transaction counts it more than once.
    pub fn mark_committed(&self, txn: &mut Transaction) {
        txn.state = TxnState::Committed;
        txn.commit_ts = Some(self.get_commit_ts());
        let mut stats = self.stats.write();
        stats.active_count = stats.active_count.saturating_sub(1);
        stats.committed_count += 1;
    }

    /// Marks `txn` as aborted and updates the counters.
    ///
    /// The current state of `txn` is not checked.
    pub fn mark_aborted(&self, txn: &mut Transaction) {
        self.finish_abort(txn, false);
    }

    /// Marks `txn` as aborted because of a conflict: counts as a regular
    /// abort and additionally bumps `conflict_aborts`.
    pub fn mark_conflict_abort(&self, txn: &mut Transaction) {
        self.finish_abort(txn, true);
    }

    /// Shared abort bookkeeping. All counters are updated under one write
    /// lock, so a concurrent `stats()` call never observes an abort counted
    /// in `aborted_count` but not yet in `conflict_aborts`.
    fn finish_abort(&self, txn: &mut Transaction, conflict: bool) {
        txn.state = TxnState::Aborted;
        let mut stats = self.stats.write();
        stats.active_count = stats.active_count.saturating_sub(1);
        stats.aborted_count += 1;
        if conflict {
            stats.conflict_aborts += 1;
        }
    }

    /// Returns the current watermark value.
    ///
    /// NOTE(review): this reads `committed_watermark`, which only changes via
    /// `advance_watermark`; it does not inspect active transactions — confirm
    /// callers keep the watermark up to date.
    pub fn oldest_active_ts(&self) -> u64 {
        self.committed_watermark.load(Ordering::SeqCst)
    }

    /// Raises the watermark to `new_watermark` if that is larger than the
    /// current value; it never moves backwards.
    pub fn advance_watermark(&self, new_watermark: u64) {
        self.committed_watermark
            .fetch_max(new_watermark, Ordering::SeqCst);
    }

    /// Returns a snapshot copy of the statistics.
    pub fn stats(&self) -> TxnStats {
        self.stats.read().clone()
    }
}
impl Default for TransactionManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A transaction starts active and read-only, and committing assigns a timestamp.
    #[test]
    fn test_transaction_lifecycle() {
        let mgr = TransactionManager::new();
        let mut txn = mgr.begin();
        assert_eq!(txn.state, TxnState::Active);
        assert!(txn.is_read_only());
        txn.put("users", vec![1], vec![2, 3, 4]);
        assert!(!txn.is_read_only());
        mgr.mark_committed(&mut txn);
        assert_eq!(txn.state, TxnState::Committed);
        assert!(txn.commit_ts.is_some());
    }

    /// The latest buffered write for a key shadows earlier ones.
    #[test]
    fn test_read_your_writes() {
        let mgr = TransactionManager::new();
        let mut txn = mgr.begin();
        txn.put("users", vec![1], vec![10, 20]);
        txn.put("users", vec![1], vec![30, 40]);
        let local = txn.get_local("users", &[1]);
        assert!(local.is_some());
        assert_eq!(local.unwrap().value, Some(vec![30, 40]));
    }

    /// A plain data record survives a byte round trip.
    #[test]
    fn test_wal_entry_serialization() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();
        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();
        assert_eq!(parsed.txn_id, 42);
        assert_eq!(parsed.timestamp_us, 1234567890);
        assert_eq!(parsed.table, Some("users".to_string()));
        assert_eq!(parsed.key, Some(vec![1, 2, 3]));
        assert_eq!(parsed.value, Some(vec![4, 5, 6]));
        assert!(parsed.verify_checksum());
    }

    /// All ARIES fields survive a byte round trip.
    #[test]
    fn test_wal_entry_aries_roundtrip() {
        let mut entry = TxnWalEntry::new_aries_data(
            99,
            9999999,
            "orders".to_string(),
            vec![10, 20],
            Some(vec![30, 40, 50]),
            42,
            Some(100),
            Some(vec![0xDE, 0xAD]),
        );
        entry.lsn = 200;
        entry.undo_next_lsn = Some(50);
        entry.compute_checksum();
        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();
        assert_eq!(parsed.txn_id, 99);
        assert_eq!(parsed.lsn, 200);
        assert_eq!(parsed.prev_lsn, Some(100));
        assert_eq!(parsed.page_id, Some(42));
        assert_eq!(parsed.undo_info, Some(vec![0xDE, 0xAD]));
        assert_eq!(parsed.undo_next_lsn, Some(50));
        assert_eq!(parsed.key, Some(vec![10, 20]));
        assert_eq!(parsed.value, Some(vec![30, 40, 50]));
        assert!(parsed.verify_checksum());
    }

    /// A compensation log record survives a byte round trip.
    #[test]
    fn test_wal_entry_clr_roundtrip() {
        let mut entry = TxnWalEntry::new_clr(
            77,
            5555555,
            "inventory".to_string(),
            vec![1],
            Some(vec![2]),
            10,
            300,
            250,
        );
        entry.lsn = 400;
        entry.compute_checksum();
        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();
        assert_eq!(parsed.record_type, WalRecordType::CompensationLogRecord);
        assert_eq!(parsed.lsn, 400);
        assert_eq!(parsed.prev_lsn, Some(300));
        assert_eq!(parsed.page_id, Some(10));
        assert_eq!(parsed.undo_next_lsn, Some(250));
        assert!(parsed.undo_info.is_none());
        assert!(parsed.verify_checksum());
    }

    /// Unset ARIES fields come back as their defaults.
    #[test]
    fn test_wal_entry_none_aries_fields_roundtrip() {
        let mut entry = TxnWalEntry::new_begin(1, 100);
        entry.compute_checksum();
        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();
        assert_eq!(parsed.lsn, 0);
        assert_eq!(parsed.prev_lsn, None);
        assert_eq!(parsed.page_id, None);
        assert_eq!(parsed.undo_info, None);
        assert_eq!(parsed.undo_next_lsn, None);
        assert!(parsed.verify_checksum());
    }

    /// Begin/commit/abort keep the counters in step.
    #[test]
    fn test_transaction_stats() {
        let mgr = TransactionManager::new();
        let mut txn1 = mgr.begin();
        let mut txn2 = mgr.begin();
        assert_eq!(mgr.stats().active_count, 2);
        mgr.mark_committed(&mut txn1);
        assert_eq!(mgr.stats().committed_count, 1);
        mgr.mark_aborted(&mut txn2);
        assert_eq!(mgr.stats().aborted_count, 1);
        assert_eq!(mgr.stats().active_count, 0);
    }

    #[test]
    fn test_wal_entry_error_too_short() {
        let short_data = vec![0u8; 10];
        let result = TxnWalEntry::from_bytes(&short_data);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("too short"));
    }

    #[test]
    fn test_wal_entry_error_invalid_record_type() {
        let mut data = vec![0u8; 30];
        data[0] = 255;
        let result = TxnWalEntry::from_bytes(&data);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Invalid WAL record type"));
    }

    #[test]
    fn test_wal_entry_error_truncated_key() {
        let mut entry =
            TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1, 2], Some(vec![3, 4]));
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();
        let huge_len: u32 = 10000;
        // Serialized layout: version (1) + record type (1) + txn_id (8) +
        // timestamp (8), then the 4-byte key length.
        let key_len_start = 1 + 1 + 8 + 8;
        bytes[key_len_start..key_len_start + 4].copy_from_slice(&huge_len.to_le_bytes());
        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("truncated at key"));
    }

    #[test]
    fn test_wal_entry_error_corrupted_checksum() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;
        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("checksum mismatch"));
    }

    #[test]
    fn test_wal_entry_error_invalid_utf8_table() {
        let mut entry = TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1], Some(vec![2]));
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();
        // version (1) + header (17) + key_len (4) + key (1) + value_len (4)
        // + value (1) + table_len (4) -> first byte of the table name.
        let table_start = 1 + 17 + 4 + 1 + 4 + 1 + 4;
        bytes[table_start] = 0xFF;
        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Invalid UTF-8"));
    }
}