use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::fmt;
/// Magic value for the transaction log.
///
/// Read most-significant byte first, the seven non-zero bytes of the literal
/// are the ASCII characters `LCPFSTX`. How the value is written to or checked
/// against the log is not visible in this file.
pub const TXN_LOG_MAGIC: u64 = 0x4C435046535458_u64;
/// Version number of the transaction log format.
pub const TXN_LOG_VERSION: u32 = 1;
/// Upper bound on the number of operations in one transaction (65536).
///
/// NOTE(review): enforcement is not visible here; presumably a violation is
/// reported as `TxnError::TooManyOps` — confirm against the caller.
pub const MAX_OPS_PER_TXN: usize = 65536;
/// Upper bound on the size of one transaction: 1024 * 1024 * 1024.
///
/// NOTE(review): presumably bytes (1 GiB), compared against the sum of
/// `TxnOperation::estimated_size` — confirm against the caller.
pub const MAX_TXN_SIZE: u64 = 1024 * 1024 * 1024;
/// Default transaction timeout.
///
/// NOTE(review): the unit is not stated in this file; 3600 suggests seconds
/// (one hour) — TODO confirm.
pub const DEFAULT_TXN_TIMEOUT: u64 = 3600;
/// Identifier of a transaction: a thin newtype around a raw `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxnId(pub u64);

impl TxnId {
    /// Wraps the raw numeric identifier `id`.
    pub fn new(id: u64) -> Self {
        TxnId(id)
    }

    /// Returns the raw numeric identifier.
    pub fn value(&self) -> u64 {
        let TxnId(raw) = *self;
        raw
    }
}

/// Renders as `txn-` followed by the id as 16 zero-padded lowercase hex digits.
impl fmt::Display for TxnId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("txn-{:016x}", self.value()))
    }
}
/// Kind of a transaction log record, stored as a `u8` discriminant (1 through 7).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TxnLogType {
    Begin = 1,
    Operation = 2,
    Prepare = 3,
    Commit = 4,
    Abort = 5,
    Rollback = 6,
    Checkpoint = 7,
}

impl TxnLogType {
    /// Maps a raw discriminant back to its variant.
    ///
    /// Returns `None` for any value that is not one of the declared
    /// discriminants (`1..=7`).
    pub fn from_u8(val: u8) -> Option<Self> {
        let kind = match val {
            1 => Self::Begin,
            2 => Self::Operation,
            3 => Self::Prepare,
            4 => Self::Commit,
            5 => Self::Abort,
            6 => Self::Rollback,
            7 => Self::Checkpoint,
            _ => return None,
        };
        Some(kind)
    }
}
/// One record of the transaction log.
///
/// The constructors leave `checksum` at 0; call
/// [`TxnLogEntry::calculate_checksum`] once all other fields are final.
#[derive(Debug, Clone)]
pub struct TxnLogEntry {
    /// Transaction this record belongs to.
    pub txn_id: TxnId,
    /// Kind of record.
    pub entry_type: TxnLogType,
    /// Operation index supplied by the caller for `Operation` and `Rollback`
    /// records; every other constructor sets it to 0.
    pub op_index: u32,
    /// Log sequence number supplied by the caller.
    pub lsn: u64,
    /// Caller-supplied timestamp (unit and epoch are not defined in this file).
    pub timestamp: u64,
    /// Operation payload; the constructors set it only for `Operation` and
    /// `Rollback` records.
    pub operation: Option<TxnOperation>,
    /// XOR checksum over the other fields; see `calculate_checksum`.
    pub checksum: u64,
}

impl TxnLogEntry {
    /// Single place where a record is assembled; the checksum starts at 0.
    ///
    /// Every public constructor funnels through here so the field layout is
    /// written once.
    fn build(
        txn_id: TxnId,
        entry_type: TxnLogType,
        op_index: u32,
        lsn: u64,
        timestamp: u64,
        operation: Option<TxnOperation>,
    ) -> Self {
        Self {
            txn_id,
            entry_type,
            op_index,
            lsn,
            timestamp,
            operation,
            checksum: 0,
        }
    }

    /// Creates a `Begin` record (no payload, `op_index` 0).
    pub fn begin(txn_id: TxnId, lsn: u64, timestamp: u64) -> Self {
        Self::build(txn_id, TxnLogType::Begin, 0, lsn, timestamp, None)
    }

    /// Creates an `Operation` record carrying `op` at position `op_index`.
    pub fn operation(
        txn_id: TxnId,
        op_index: u32,
        lsn: u64,
        timestamp: u64,
        op: TxnOperation,
    ) -> Self {
        Self::build(
            txn_id,
            TxnLogType::Operation,
            op_index,
            lsn,
            timestamp,
            Some(op),
        )
    }

    /// Creates a `Prepare` record (no payload, `op_index` 0).
    pub fn prepare(txn_id: TxnId, lsn: u64, timestamp: u64) -> Self {
        Self::build(txn_id, TxnLogType::Prepare, 0, lsn, timestamp, None)
    }

    /// Creates a `Commit` record (no payload, `op_index` 0).
    pub fn commit(txn_id: TxnId, lsn: u64, timestamp: u64) -> Self {
        Self::build(txn_id, TxnLogType::Commit, 0, lsn, timestamp, None)
    }

    /// Creates an `Abort` record (no payload, `op_index` 0).
    pub fn abort(txn_id: TxnId, lsn: u64, timestamp: u64) -> Self {
        Self::build(txn_id, TxnLogType::Abort, 0, lsn, timestamp, None)
    }

    /// Creates a `Rollback` record carrying `op` at position `op_index`.
    pub fn rollback(
        txn_id: TxnId,
        op_index: u32,
        lsn: u64,
        timestamp: u64,
        op: TxnOperation,
    ) -> Self {
        Self::build(
            txn_id,
            TxnLogType::Rollback,
            op_index,
            lsn,
            timestamp,
            Some(op),
        )
    }

    /// Computes the checksum of the current field values.
    ///
    /// The value is the XOR of the transaction id, the record kind's
    /// discriminant, `op_index`, `lsn`, `timestamp` and, when a payload is
    /// present, the payload's own checksum. The stored `checksum` field is
    /// not part of the input.
    ///
    /// Both `calculate_checksum` and `verify_checksum` use this one
    /// definition, so they cannot drift apart.
    fn compute_checksum(&self) -> u64 {
        let header = self.txn_id.0
            ^ (self.entry_type as u64)
            ^ u64::from(self.op_index)
            ^ self.lsn
            ^ self.timestamp;
        self.operation
            .as_ref()
            .map_or(header, |op| header ^ op.checksum())
    }

    /// Recomputes the checksum from the current field values and stores it.
    pub fn calculate_checksum(&mut self) {
        self.checksum = self.compute_checksum();
    }

    /// Returns `true` when the stored checksum matches the current field values.
    pub fn verify_checksum(&self) -> bool {
        self.compute_checksum() == self.checksum
    }
}
/// A single filesystem mutation recorded inside a transaction.
///
/// Several variants carry optional "original" state. [`TxnOperation::inverse`]
/// needs that state to build the undo operation and returns `None` when it is
/// missing.
#[derive(Debug, Clone)]
pub enum TxnOperation {
    /// Create `path` with the given `content` and `mode`.
    Create {
        path: String,
        content: Vec<u8>,
        mode: u32,
    },
    /// Write `content` to `path` at `offset`.
    Write {
        path: String,
        offset: u64,
        content: Vec<u8>,
        /// Bytes the inverse writes back at `offset`; `None` means no inverse.
        original: Option<Vec<u8>>,
    },
    /// Truncate `path` to `new_size` bytes.
    Truncate {
        path: String,
        new_size: u64,
        /// Size the inverse truncates back to; `None` means no inverse.
        original_size: Option<u64>,
    },
    /// Delete `path`.
    Delete {
        path: String,
        /// Content the inverse recreates; both this and `mode` must be `Some`.
        content: Option<Vec<u8>>,
        /// Mode the inverse recreates; both this and `content` must be `Some`.
        mode: Option<u32>,
    },
    /// Rename `old_path` to `new_path`.
    Rename {
        old_path: String,
        new_path: String,
    },
    /// Create directory `path` with `mode`.
    Mkdir {
        path: String,
        mode: u32,
    },
    /// Remove directory `path`.
    Rmdir {
        path: String,
        /// Mode the inverse `Mkdir` uses; `None` means no inverse.
        mode: Option<u32>,
    },
    /// Link operation; `target` is the path reported by `path()` and the one
    /// the inverse deletes.
    Link {
        source: String,
        target: String,
    },
    /// Symlink at `path` pointing to `target`.
    Symlink {
        path: String,
        target: String,
    },
    /// Set attribute `attr` on `path` to `value`.
    SetAttr {
        path: String,
        attr: String,
        value: Vec<u8>,
        /// Previous value; when `None` the inverse removes the attribute.
        original: Option<Vec<u8>>,
    },
    /// Remove attribute `attr` from `path`.
    RemoveAttr {
        path: String,
        attr: String,
        /// Value the inverse sets again; `None` means no inverse.
        original: Option<Vec<u8>>,
    },
    /// Change the mode of `path` to `mode`.
    Chmod {
        path: String,
        mode: u32,
        /// Mode the inverse restores; `None` means no inverse.
        original_mode: Option<u32>,
    },
    /// Change the owner of `path` to `uid`:`gid`.
    Chown {
        path: String,
        uid: u32,
        gid: u32,
        /// Owner the inverse restores; both originals must be `Some`.
        original_uid: Option<u32>,
        /// Group the inverse restores; both originals must be `Some`.
        original_gid: Option<u32>,
    },
}

impl TxnOperation {
    /// Folds the UTF-8 bytes of `text` into `seed` with the rolling step
    /// `acc * 31 + byte`, wrapping on overflow.
    fn fold_str(seed: u64, text: &str) -> u64 {
        text.bytes()
            .fold(seed, |acc, byte| acc.wrapping_mul(31).wrapping_add(u64::from(byte)))
    }

    /// Length in bytes of an optional buffer; `None` counts as 0.
    fn opt_len(buf: &Option<Vec<u8>>) -> u64 {
        buf.as_ref().map_or(0, |bytes| bytes.len() as u64)
    }

    /// Builds a `Delete` of `path` that carries no saved content or mode.
    fn removal_of(path: &str) -> Self {
        Self::Delete {
            path: String::from(path),
            content: None,
            mode: None,
        }
    }

    /// Returns the primary path of the operation.
    ///
    /// This is `old_path` for `Rename`, `target` for `Link`, and the `path`
    /// field for every other variant.
    pub fn path(&self) -> &str {
        match self {
            Self::Create { path, .. }
            | Self::Write { path, .. }
            | Self::Truncate { path, .. }
            | Self::Delete { path, .. }
            | Self::Mkdir { path, .. }
            | Self::Rmdir { path, .. }
            | Self::Symlink { path, .. }
            | Self::SetAttr { path, .. }
            | Self::RemoveAttr { path, .. }
            | Self::Chmod { path, .. }
            | Self::Chown { path, .. } => path,
            Self::Rename { old_path, .. } => old_path,
            Self::Link { target, .. } => target,
        }
    }

    /// Returns a short human-readable description of the operation.
    ///
    /// Modes are printed in octal.
    pub fn description(&self) -> String {
        match self {
            Self::Create { path, .. } => alloc::format!("create {}", path),
            Self::Write {
                path,
                offset,
                content,
                ..
            } => alloc::format!(
                "write {} bytes to {} at offset {}",
                content.len(),
                path,
                offset
            ),
            Self::Truncate { path, new_size, .. } => {
                alloc::format!("truncate {} to {} bytes", path, new_size)
            }
            Self::Delete { path, .. } => alloc::format!("delete {}", path),
            Self::Rename { old_path, new_path } => {
                alloc::format!("rename {} to {}", old_path, new_path)
            }
            Self::Mkdir { path, mode } => alloc::format!("mkdir {} mode {:o}", path, mode),
            Self::Rmdir { path, .. } => alloc::format!("rmdir {}", path),
            // The text names `target` first, then `source`.
            Self::Link { source, target } => alloc::format!("link {} to {}", target, source),
            Self::Symlink { path, target } => alloc::format!("symlink {} -> {}", path, target),
            Self::SetAttr { path, attr, .. } => alloc::format!("setattr {}:{}", path, attr),
            Self::RemoveAttr { path, attr, .. } => alloc::format!("rmattr {}:{}", path, attr),
            Self::Chmod { path, mode, .. } => alloc::format!("chmod {} {:o}", path, mode),
            Self::Chown { path, uid, gid, .. } => {
                alloc::format!("chown {} {}:{}", path, uid, gid)
            }
        }
    }

    /// Computes a lightweight checksum of the operation.
    ///
    /// The bytes of [`TxnOperation::path`] are folded with the `* 31 + byte`
    /// rolling step, then variant-specific data is mixed in: either further
    /// strings are folded, or scalar values are XORed. Buffers contribute
    /// only their length, not their bytes, and the saved "original" state is
    /// not covered.
    pub fn checksum(&self) -> u64 {
        let seed = Self::fold_str(0, self.path());
        match self {
            Self::Create { content, mode, .. } => {
                seed ^ (content.len() as u64) ^ u64::from(*mode)
            }
            Self::Write {
                offset, content, ..
            } => seed ^ *offset ^ (content.len() as u64),
            Self::Truncate { new_size, .. } => seed ^ *new_size,
            Self::Delete { .. } => seed ^ 0xDEAD,
            Self::Rename { new_path, .. } => Self::fold_str(seed, new_path),
            Self::Mkdir { mode, .. } | Self::Chmod { mode, .. } => seed ^ u64::from(*mode),
            Self::Rmdir { .. } => seed ^ 0xD17D,
            Self::Link { source, .. } => Self::fold_str(seed, source),
            Self::Symlink { target, .. } => Self::fold_str(seed, target),
            Self::SetAttr { attr, value, .. } => {
                Self::fold_str(seed, attr) ^ (value.len() as u64)
            }
            Self::RemoveAttr { attr, .. } => Self::fold_str(seed, attr),
            // uid occupies the low 32 bits, gid the high 32 bits.
            Self::Chown { uid, gid, .. } => seed ^ u64::from(*uid) ^ (u64::from(*gid) << 32),
        }
    }

    /// Builds the operation that undoes this one.
    ///
    /// Returns `None` when the saved "original" state needed for the undo is
    /// missing. Where the inverse has an "original" slot, it is filled with
    /// this operation's new value, so the inverse can be inverted again. The
    /// `Delete`/`Rmdir` inverses produced for `Create`, `Mkdir`, `Link` and
    /// `Symlink` carry no saved state and are therefore not invertible.
    pub fn inverse(&self) -> Option<TxnOperation> {
        let undo = match self {
            Self::Create { path, .. } | Self::Symlink { path, .. } => Self::removal_of(path),
            Self::Link { target, .. } => Self::removal_of(target),
            Self::Write {
                path,
                offset,
                content,
                original,
            } => Self::Write {
                path: path.clone(),
                offset: *offset,
                content: original.as_ref()?.clone(),
                original: Some(content.clone()),
            },
            Self::Truncate {
                path,
                new_size,
                original_size,
            } => Self::Truncate {
                path: path.clone(),
                new_size: (*original_size)?,
                original_size: Some(*new_size),
            },
            Self::Delete {
                path,
                content,
                mode,
            } => match (content, mode) {
                (Some(saved_content), Some(saved_mode)) => Self::Create {
                    path: path.clone(),
                    content: saved_content.clone(),
                    mode: *saved_mode,
                },
                _ => return None,
            },
            Self::Rename { old_path, new_path } => Self::Rename {
                old_path: new_path.clone(),
                new_path: old_path.clone(),
            },
            Self::Mkdir { path, .. } => Self::Rmdir {
                path: path.clone(),
                mode: None,
            },
            Self::Rmdir { path, mode } => Self::Mkdir {
                path: path.clone(),
                mode: (*mode)?,
            },
            Self::SetAttr {
                path,
                attr,
                value,
                original,
            } => match original {
                Some(previous) => Self::SetAttr {
                    path: path.clone(),
                    attr: attr.clone(),
                    value: previous.clone(),
                    original: Some(value.clone()),
                },
                // The attribute did not exist before, so undoing removes it.
                None => Self::RemoveAttr {
                    path: path.clone(),
                    attr: attr.clone(),
                    original: Some(value.clone()),
                },
            },
            Self::RemoveAttr {
                path,
                attr,
                original,
            } => Self::SetAttr {
                path: path.clone(),
                attr: attr.clone(),
                value: original.as_ref()?.clone(),
                original: None,
            },
            Self::Chmod {
                path,
                mode,
                original_mode,
            } => Self::Chmod {
                path: path.clone(),
                mode: (*original_mode)?,
                original_mode: Some(*mode),
            },
            Self::Chown {
                path,
                uid,
                gid,
                original_uid,
                original_gid,
            } => Self::Chown {
                path: path.clone(),
                uid: (*original_uid)?,
                gid: (*original_gid)?,
                original_uid: Some(*uid),
                original_gid: Some(*gid),
            },
        };
        Some(undo)
    }

    /// Estimates the size of the operation.
    ///
    /// The estimate is a fixed base of 64 plus the byte lengths of every
    /// string and buffer the variant carries, including saved "original"
    /// buffers. Scalar fields are not counted separately.
    pub fn estimated_size(&self) -> u64 {
        const BASE: u64 = 64;
        match self {
            Self::Create { path, content, .. } => BASE + path.len() as u64 + content.len() as u64,
            Self::Write {
                path,
                content,
                original,
                ..
            } => BASE + path.len() as u64 + content.len() as u64 + Self::opt_len(original),
            Self::Delete { path, content, .. } => {
                BASE + path.len() as u64 + Self::opt_len(content)
            }
            Self::Rename { old_path, new_path } => {
                BASE + old_path.len() as u64 + new_path.len() as u64
            }
            Self::Link { source, target } => BASE + source.len() as u64 + target.len() as u64,
            Self::Symlink { path, target } => BASE + path.len() as u64 + target.len() as u64,
            Self::SetAttr {
                path,
                attr,
                value,
                original,
            } => {
                BASE + path.len() as u64
                    + attr.len() as u64
                    + value.len() as u64
                    + Self::opt_len(original)
            }
            Self::RemoveAttr {
                path,
                attr,
                original,
            } => BASE + path.len() as u64 + attr.len() as u64 + Self::opt_len(original),
            Self::Truncate { path, .. }
            | Self::Mkdir { path, .. }
            | Self::Rmdir { path, .. }
            | Self::Chmod { path, .. }
            | Self::Chown { path, .. } => BASE + path.len() as u64,
        }
    }
}
/// State of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    Active,
    Prepared,
    Committed,
    Aborted,
    RollingBack,
}

impl TxnState {
    /// Returns `true` for `Committed` and `Aborted`, `false` for every other state.
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Committed | Self::Aborted => true,
            Self::Active | Self::Prepared | Self::RollingBack => false,
        }
    }

    /// Returns `true` for `Active`, `Prepared` and `RollingBack`, `false`
    /// for every other state.
    pub fn is_recoverable(&self) -> bool {
        match self {
            Self::Active | Self::Prepared | Self::RollingBack => true,
            Self::Committed | Self::Aborted => false,
        }
    }
}

/// Renders the state as a lowercase snake_case name.
impl fmt::Display for TxnState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Active => "active",
            Self::Prepared => "prepared",
            Self::Committed => "committed",
            Self::Aborted => "aborted",
            Self::RollingBack => "rolling_back",
        };
        f.write_str(label)
    }
}
/// Summary of a finished transaction.
#[derive(Debug, Clone)]
pub struct TxnResult {
    /// Transaction the summary describes.
    pub txn_id: TxnId,
    /// State the transaction ended in.
    pub state: TxnState,
    /// Number of operations executed.
    pub ops_executed: u32,
    /// Start timestamp; `duration_ns` treats the difference to
    /// `completed_at` as nanoseconds.
    pub started_at: u64,
    /// Completion timestamp, in the same unit as `started_at`.
    pub completed_at: u64,
    /// Number of bytes written.
    pub bytes_written: u64,
}

impl TxnResult {
    /// Returns `true` only when the final state is `Committed`.
    pub fn is_success(&self) -> bool {
        matches!(self.state, TxnState::Committed)
    }

    /// Returns `completed_at - started_at`, or 0 when `completed_at` is the
    /// earlier of the two.
    pub fn duration_ns(&self) -> u64 {
        self.completed_at
            .checked_sub(self.started_at)
            .unwrap_or(0)
    }
}
/// Counters describing a recovery pass.
///
/// All fields start at 0 (`Default`) and can be combined with
/// [`RecoveryStats::merge`].
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Number of transactions recovered.
    pub txns_recovered: u32,
    /// Number of transactions rolled forward.
    pub txns_rolled_forward: u32,
    /// Number of transactions rolled back.
    pub txns_rolled_back: u32,
    /// Number of operations replayed.
    pub ops_replayed: u32,
    /// Number of operations undone.
    pub ops_undone: u32,
    /// Number of log entries processed.
    pub log_entries_processed: u64,
    /// Recovery time in nanoseconds.
    pub recovery_time_ns: u64,
    /// Number of errors.
    pub errors: u32,
}

impl RecoveryStats {
    /// Creates a statistics record with every counter at 0.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds every counter of `other` onto the matching counter of `self`.
    ///
    /// Additions saturate at the counter's maximum value. A plain `+=` would
    /// panic in builds with overflow checks and wrap around otherwise;
    /// neither is acceptable for a statistics record.
    pub fn merge(&mut self, other: &RecoveryStats) {
        self.txns_recovered = self.txns_recovered.saturating_add(other.txns_recovered);
        self.txns_rolled_forward = self
            .txns_rolled_forward
            .saturating_add(other.txns_rolled_forward);
        self.txns_rolled_back = self.txns_rolled_back.saturating_add(other.txns_rolled_back);
        self.ops_replayed = self.ops_replayed.saturating_add(other.ops_replayed);
        self.ops_undone = self.ops_undone.saturating_add(other.ops_undone);
        self.log_entries_processed = self
            .log_entries_processed
            .saturating_add(other.log_entries_processed);
        self.recovery_time_ns = self.recovery_time_ns.saturating_add(other.recovery_time_ns);
        self.errors = self.errors.saturating_add(other.errors);
    }
}
/// Errors reported by the transaction layer.
///
/// The `Display` implementation below defines the user-facing text of every
/// variant.
#[derive(Debug, Clone)]
pub enum TxnError {
    /// Displayed as "transaction not found".
    NotFound(TxnId),
    /// Displayed as "transaction already exists".
    AlreadyExists(TxnId),
    /// The transaction is in state `current` but one of `expected` was required.
    InvalidState {
        txn_id: TxnId,
        current: TxnState,
        expected: &'static [TxnState],
    },
    /// The transaction has too many operations; `count` is the reported number.
    TooManyOps {
        txn_id: TxnId,
        count: usize,
    },
    /// The transaction is too large; `size` is reported in bytes.
    TooLarge {
        txn_id: TxnId,
        size: u64,
    },
    /// Writing to the log failed; the payload is a free-form message.
    LogWriteFailed(String),
    /// Reading from the log failed; the payload is a free-form message.
    LogReadFailed(String),
    /// The checksum at `lsn` did not match.
    ChecksumMismatch {
        lsn: u64,
        expected: u64,
        actual: u64,
    },
    /// Operation `op_index` of the transaction failed.
    OperationFailed {
        txn_id: TxnId,
        op_index: u32,
        message: String,
    },
    /// Rolling back operation `op_index` of the transaction failed.
    RollbackFailed {
        txn_id: TxnId,
        op_index: u32,
        message: String,
    },
    /// Displayed as "path not found".
    PathNotFound(String),
    /// Displayed as "path already exists".
    PathExists(String),
    /// Displayed as "not a file".
    NotAFile(String),
    /// Displayed as "not a directory".
    NotADirectory(String),
    /// Displayed as "directory not empty".
    DirectoryNotEmpty(String),
    /// Displayed as "permission denied".
    PermissionDenied(String),
    /// Displayed as "transaction timeout".
    Timeout(TxnId),
    /// Displayed as "internal error"; the payload is a free-form message.
    Internal(String),
}

impl fmt::Display for TxnError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        /// Writes `<label>: <payload>`, the shape shared by all
        /// single-payload variants.
        fn labelled(
            out: &mut fmt::Formatter<'_>,
            label: &str,
            payload: &dyn fmt::Display,
        ) -> fmt::Result {
            write!(out, "{}: {}", label, payload)
        }

        match self {
            // Single-payload variants: a fixed label followed by the payload.
            Self::NotFound(id) => labelled(f, "transaction not found", id),
            Self::AlreadyExists(id) => labelled(f, "transaction already exists", id),
            Self::Timeout(id) => labelled(f, "transaction timeout", id),
            Self::LogWriteFailed(msg) => labelled(f, "log write failed", msg),
            Self::LogReadFailed(msg) => labelled(f, "log read failed", msg),
            Self::Internal(msg) => labelled(f, "internal error", msg),
            Self::PathNotFound(path) => labelled(f, "path not found", path),
            Self::PathExists(path) => labelled(f, "path already exists", path),
            Self::NotAFile(path) => labelled(f, "not a file", path),
            Self::NotADirectory(path) => labelled(f, "not a directory", path),
            Self::DirectoryNotEmpty(path) => labelled(f, "directory not empty", path),
            Self::PermissionDenied(path) => labelled(f, "permission denied", path),

            // Structured variants: each has its own sentence.
            Self::InvalidState {
                txn_id,
                current,
                expected,
            } => write!(
                f,
                "transaction {} in state {}, expected {:?}",
                txn_id, current, expected
            ),
            Self::TooManyOps { txn_id, count } => write!(
                f,
                "transaction {} has too many operations: {}",
                txn_id, count
            ),
            Self::TooLarge { txn_id, size } => {
                write!(f, "transaction {} too large: {} bytes", txn_id, size)
            }
            Self::ChecksumMismatch {
                lsn,
                expected,
                actual,
            } => write!(
                f,
                "checksum mismatch at LSN {}: expected {:016x}, got {:016x}",
                lsn, expected, actual
            ),
            Self::OperationFailed {
                txn_id,
                op_index,
                message,
            } => write!(
                f,
                "operation {} in {} failed: {}",
                op_index, txn_id, message
            ),
            Self::RollbackFailed {
                txn_id,
                op_index,
                message,
            } => write!(
                f,
                "rollback of operation {} in {} failed: {}",
                op_index, txn_id, message
            ),
        }
    }
}
/// Convenience alias for a `Result` whose error type is [`TxnError`].
pub type TxnResultType<T> = Result<T, TxnError>;
/// Unit tests for the transaction log types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::ToString;
    use alloc::vec;

    /// `TxnId` keeps its raw value and prints it as 16 hex digits.
    #[test]
    fn test_txn_id() {
        let id = TxnId::new(12345);
        assert_eq!(id.value(), 12345);
        // 12345 == 0x3039
        assert!(id.to_string().contains("3039"));
        assert_eq!(id.to_string(), "txn-0000000000003039");
    }

    /// Known discriminants map to variants; unknown ones are rejected.
    #[test]
    fn test_log_type_from_u8() {
        assert_eq!(TxnLogType::from_u8(1), Some(TxnLogType::Begin));
        assert_eq!(TxnLogType::from_u8(4), Some(TxnLogType::Commit));
        assert_eq!(TxnLogType::from_u8(7), Some(TxnLogType::Checkpoint));
        assert_eq!(TxnLogType::from_u8(0), None);
        assert_eq!(TxnLogType::from_u8(8), None);
        assert_eq!(TxnLogType::from_u8(99), None);
    }

    /// A calculated checksum verifies until a covered field is changed.
    #[test]
    fn test_log_entry_checksum() {
        let mut entry = TxnLogEntry::begin(TxnId::new(1), 100, 1000);
        entry.calculate_checksum();
        assert!(entry.verify_checksum());
        entry.lsn = 999;
        assert!(!entry.verify_checksum());
    }

    #[test]
    fn test_operation_path() {
        let op = TxnOperation::Create {
            path: "/test/file.txt".into(),
            content: vec![1, 2, 3],
            mode: 0o644,
        };
        assert_eq!(op.path(), "/test/file.txt");
    }

    #[test]
    fn test_operation_description() {
        let op = TxnOperation::Rename {
            old_path: "/old".into(),
            new_path: "/new".into(),
        };
        let desc = op.description();
        assert!(desc.contains("rename"));
        assert!(desc.contains("/old"));
        assert!(desc.contains("/new"));
    }

    #[test]
    fn test_operation_inverse_create() {
        let op = TxnOperation::Create {
            path: "/test".into(),
            content: vec![1, 2, 3],
            mode: 0o644,
        };
        let inv = op.inverse().unwrap();
        match inv {
            TxnOperation::Delete { path, .. } => assert_eq!(path, "/test"),
            _ => panic!("expected Delete"),
        }
    }

    #[test]
    fn test_operation_inverse_rename() {
        let op = TxnOperation::Rename {
            old_path: "/a".into(),
            new_path: "/b".into(),
        };
        let inv = op.inverse().unwrap();
        match inv {
            TxnOperation::Rename { old_path, new_path } => {
                assert_eq!(old_path, "/b");
                assert_eq!(new_path, "/a");
            }
            _ => panic!("expected Rename"),
        }
    }

    /// The inverse of a write restores the saved bytes at the same offset.
    #[test]
    fn test_operation_inverse_write() {
        let op = TxnOperation::Write {
            path: "/test".into(),
            offset: 100,
            content: vec![1, 2, 3],
            original: Some(vec![4, 5, 6]),
        };
        let inv = op.inverse().unwrap();
        match inv {
            TxnOperation::Write {
                path,
                offset,
                content,
                original,
            } => {
                assert_eq!(path, "/test");
                assert_eq!(offset, 100);
                assert_eq!(content, vec![4, 5, 6]);
                assert_eq!(original, Some(vec![1, 2, 3]));
            }
            _ => panic!("expected Write"),
        }
    }

    #[test]
    fn test_txn_state_terminal() {
        assert!(!TxnState::Active.is_terminal());
        assert!(!TxnState::Prepared.is_terminal());
        assert!(!TxnState::RollingBack.is_terminal());
        assert!(TxnState::Committed.is_terminal());
        assert!(TxnState::Aborted.is_terminal());
    }

    #[test]
    fn test_txn_state_recoverable() {
        assert!(TxnState::Active.is_recoverable());
        assert!(TxnState::Prepared.is_recoverable());
        assert!(TxnState::RollingBack.is_recoverable());
        assert!(!TxnState::Committed.is_recoverable());
        assert!(!TxnState::Aborted.is_recoverable());
    }

    #[test]
    fn test_txn_result() {
        let result = TxnResult {
            txn_id: TxnId::new(1),
            state: TxnState::Committed,
            ops_executed: 10,
            started_at: 1000,
            completed_at: 2000,
            bytes_written: 4096,
        };
        assert!(result.is_success());
        assert_eq!(result.duration_ns(), 1000);
    }

    /// Merging adds every counter, not just a subset.
    #[test]
    fn test_recovery_stats_merge() {
        let mut s1 = RecoveryStats {
            txns_recovered: 5,
            txns_rolled_forward: 2,
            txns_rolled_back: 3,
            ops_replayed: 10,
            ops_undone: 5,
            log_entries_processed: 100,
            recovery_time_ns: 1000000,
            errors: 1,
        };
        let s2 = RecoveryStats {
            txns_recovered: 3,
            txns_rolled_forward: 1,
            txns_rolled_back: 2,
            ops_replayed: 6,
            ops_undone: 3,
            log_entries_processed: 50,
            recovery_time_ns: 500000,
            errors: 0,
        };
        s1.merge(&s2);
        assert_eq!(s1.txns_recovered, 8);
        assert_eq!(s1.txns_rolled_forward, 3);
        assert_eq!(s1.txns_rolled_back, 5);
        assert_eq!(s1.ops_replayed, 16);
        assert_eq!(s1.ops_undone, 8);
        assert_eq!(s1.log_entries_processed, 150);
        assert_eq!(s1.recovery_time_ns, 1500000);
        assert_eq!(s1.errors, 1);
    }

    /// The estimate is the base of 64 plus path and content lengths.
    #[test]
    fn test_operation_estimated_size() {
        let op = TxnOperation::Create {
            path: "/test/file.txt".into(),
            content: vec![0; 1000],
            mode: 0o644,
        };
        let size = op.estimated_size();
        assert!(size > 1000);
        assert_eq!(size, 64 + 14 + 1000);
    }

    #[test]
    fn test_error_display() {
        let err = TxnError::NotFound(TxnId::new(123));
        let msg = err.to_string();
        assert!(msg.contains("not found"));
    }
}