use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
use super::log::TxnLog;
use super::types::{
DEFAULT_TXN_TIMEOUT, MAX_OPS_PER_TXN, MAX_TXN_SIZE, RecoveryStats, TxnError, TxnId,
TxnOperation, TxnResult, TxnResultType, TxnState,
};
/// Filesystem operations that `TxnExecutor` drives on behalf of a transaction.
///
/// Several mutating methods return the state they replaced. `TxnExecutor` uses
/// those return values to build the inverse operation it runs on rollback.
pub trait TxnFilesystem: Send + Sync {
/// Creates a file at `path` with the given `content` and `mode`.
fn create(&self, path: &str, content: &[u8], mode: u32) -> TxnResultType<()>;
/// Writes `content` at `offset`. The executor treats the returned bytes as the
/// previous contents of the written range.
fn write(&self, path: &str, offset: u64, content: &[u8]) -> TxnResultType<Vec<u8>>;
/// Reads up to `len` bytes starting at `offset`.
fn read(&self, path: &str, offset: u64, len: u64) -> TxnResultType<Vec<u8>>;
/// Resizes the file to `new_size`. The executor treats the returned value as
/// the size before the call.
fn truncate(&self, path: &str, new_size: u64) -> TxnResultType<u64>;
/// Removes the file, returning its `(content, mode)` so it can be re-created.
fn delete(&self, path: &str) -> TxnResultType<(Vec<u8>, u32)>;
/// Moves the entry at `old_path` to `new_path`.
fn rename(&self, old_path: &str, new_path: &str) -> TxnResultType<()>;
/// Creates a directory at `path` with the given `mode`.
fn mkdir(&self, path: &str, mode: u32) -> TxnResultType<()>;
/// Removes the directory. The executor treats the returned value as its mode.
fn rmdir(&self, path: &str) -> TxnResultType<u32>;
/// Creates `target` as a link to `source`.
fn link(&self, source: &str, target: &str) -> TxnResultType<()>;
/// Creates a symbolic link at `path` pointing to `target`.
fn symlink(&self, path: &str, target: &str) -> TxnResultType<()>;
/// Looks up attribute `attr` of `path`; `None` when it is not set.
fn get_attr(&self, path: &str, attr: &str) -> TxnResultType<Option<Vec<u8>>>;
/// Sets attribute `attr` to `value`. The executor treats the returned value as
/// the previous value, if any.
fn set_attr(&self, path: &str, attr: &str, value: &[u8]) -> TxnResultType<Option<Vec<u8>>>;
/// Removes attribute `attr`. The executor treats the returned value as the
/// removed value, if any.
fn remove_attr(&self, path: &str, attr: &str) -> TxnResultType<Option<Vec<u8>>>;
/// Returns the mode of `path`.
fn get_mode(&self, path: &str) -> TxnResultType<u32>;
/// Changes the mode of `path`. The executor treats the returned value as the
/// previous mode.
fn chmod(&self, path: &str, mode: u32) -> TxnResultType<u32>;
/// Returns the `(uid, gid)` owner of `path`.
fn get_owner(&self, path: &str) -> TxnResultType<(u32, u32)>;
/// Changes the owner of `path`. The executor treats the returned pair as the
/// previous `(uid, gid)`.
fn chown(&self, path: &str, uid: u32, gid: u32) -> TxnResultType<(u32, u32)>;
/// Reports whether anything exists at `path`.
fn exists(&self, path: &str) -> bool;
/// Reports whether `path` is a file.
fn is_file(&self, path: &str) -> bool;
/// Reports whether `path` is a directory.
fn is_directory(&self, path: &str) -> bool;
/// Flushes pending state; called by the executor after a commit is logged.
fn sync(&self) -> TxnResultType<()>;
}
/// In-memory record of one transaction: its identity, state, and the
/// operations queued so far.
#[derive(Debug)]
pub struct Transaction {
/// Identifier assigned by the executor.
id: TxnId,
/// Name of the dataset this transaction belongs to.
dataset: String,
/// Current lifecycle state; starts as `TxnState::Active`.
state: TxnState,
/// Operations queued so far, in insertion order.
operations: Vec<TxnOperation>,
/// Timestamp supplied when the transaction was created.
started_at: u64,
/// Timeout compared against elapsed time divided by 1_000_000_000 in
/// `is_timed_out` (so presumably seconds against nanosecond timestamps —
/// TODO confirm the timestamp unit).
timeout: u64,
/// Running sum of `TxnOperation::estimated_size()` over queued operations.
estimated_size: u64,
}
impl Transaction {
    /// Creates an active, empty transaction for `dataset` started at `timestamp`.
    ///
    /// The timeout is initialised to `DEFAULT_TXN_TIMEOUT`.
    pub fn new(id: TxnId, dataset: &str, timestamp: u64) -> Self {
        Self {
            id,
            dataset: dataset.to_string(),
            state: TxnState::Active,
            operations: Vec::new(),
            started_at: timestamp,
            timeout: DEFAULT_TXN_TIMEOUT,
            estimated_size: 0,
        }
    }

    /// Returns the transaction identifier.
    pub fn id(&self) -> TxnId {
        self.id
    }

    /// Returns the name of the dataset this transaction belongs to.
    pub fn dataset(&self) -> &str {
        &self.dataset
    }

    /// Returns the current lifecycle state.
    pub fn state(&self) -> TxnState {
        self.state
    }

    /// Returns the queued operations in insertion order.
    pub fn operations(&self) -> &[TxnOperation] {
        &self.operations
    }

    /// Returns the number of queued operations.
    pub fn op_count(&self) -> usize {
        self.operations.len()
    }

    /// Returns the timestamp supplied at creation.
    pub fn started_at(&self) -> u64 {
        self.started_at
    }

    /// Overrides the timeout used by [`Transaction::is_timed_out`].
    pub fn set_timeout(&mut self, timeout: u64) {
        self.timeout = timeout;
    }

    /// Returns `true` once `(now - started_at) / 1_000_000_000` exceeds the timeout.
    ///
    /// A `now` earlier than `started_at` counts as zero elapsed time.
    pub fn is_timed_out(&self, now: u64) -> bool {
        let elapsed = now.saturating_sub(self.started_at) / 1_000_000_000;
        elapsed > self.timeout
    }

    /// Queues `op` and returns its index within the transaction.
    ///
    /// # Errors
    ///
    /// * `TxnError::InvalidState` if the transaction is not `Active`.
    /// * `TxnError::TooManyOps` if `MAX_OPS_PER_TXN` operations are already queued.
    /// * `TxnError::TooLarge` if adding `op` would push the estimated size past
    ///   `MAX_TXN_SIZE`.
    pub fn add_operation(&mut self, op: TxnOperation) -> TxnResultType<u32> {
        if self.state != TxnState::Active {
            return Err(TxnError::InvalidState {
                txn_id: self.id,
                current: self.state,
                expected: &[TxnState::Active],
            });
        }
        if self.operations.len() >= MAX_OPS_PER_TXN {
            return Err(TxnError::TooManyOps {
                txn_id: self.id,
                count: self.operations.len(),
            });
        }

        // Saturate instead of wrapping: a wrapped sum could slip under the limit
        // in release builds and corrupt the running size.
        let projected_size = self.estimated_size.saturating_add(op.estimated_size());
        if projected_size > MAX_TXN_SIZE {
            return Err(TxnError::TooLarge {
                txn_id: self.id,
                size: projected_size,
            });
        }

        self.estimated_size = projected_size;
        let index = self.operations.len() as u32;
        self.operations.push(op);
        Ok(index)
    }
}
/// Runs transactions against a `TxnFilesystem`, recording each step in a `TxnLog`.
pub struct TxnExecutor {
/// Log that receives begin/operation/prepare/commit/abort/rollback records.
log: TxnLog,
/// Filesystem the operations are applied to.
fs: Box<dyn TxnFilesystem>,
/// Transactions begun through this executor and not yet removed by
/// `commit` or `abort`.
transactions: BTreeMap<TxnId, Transaction>,
/// Raw value used for the next `TxnId`; starts at 1.
next_txn_id: u64,
}
impl TxnExecutor {
/// Builds an executor for `dataset` on top of `fs`.
///
/// A fresh `TxnLog` is created for the dataset (the literal `1` is forwarded as
/// its second constructor argument) and transaction ids start at 1.
pub fn new(dataset: &str, fs: Box<dyn TxnFilesystem>, timestamp: u64) -> Self {
    let log = TxnLog::new(dataset, 1, timestamp);
    let transactions = BTreeMap::new();
    Self {
        log,
        fs,
        transactions,
        next_txn_id: 1,
    }
}
/// Returns the dataset name, as reported by the underlying log.
pub fn dataset(&self) -> &str {
    let log = &self.log;
    log.dataset()
}
/// Starts a new transaction and returns its id.
///
/// The id counter is advanced before the begin record is logged, so an id is
/// consumed even when logging fails.
///
/// # Errors
///
/// Propagates any error from `TxnLog::begin`.
pub fn begin(&mut self, timestamp: u64) -> TxnResultType<TxnId> {
    let txn_id = TxnId::new(self.next_txn_id);
    self.next_txn_id += 1;

    self.log.begin(txn_id, timestamp)?;

    self.transactions.insert(
        txn_id,
        Transaction::new(txn_id, self.log.dataset(), timestamp),
    );
    Ok(txn_id)
}
/// Queues `op` on transaction `txn_id` and records it in the log.
///
/// The operation is added to the in-memory transaction first and logged
/// second; returns the operation's index within the transaction.
///
/// # Errors
///
/// * `TxnError::NotFound` if `txn_id` is unknown to this executor.
/// * Any error from `Transaction::add_operation` or `TxnLog::log_operation`.
pub fn add_operation(
    &mut self,
    txn_id: TxnId,
    op: TxnOperation,
    timestamp: u64,
) -> TxnResultType<u32> {
    let index = match self.transactions.get_mut(&txn_id) {
        Some(txn) => txn.add_operation(op.clone())?,
        None => return Err(TxnError::NotFound(txn_id)),
    };
    self.log.log_operation(txn_id, op, timestamp)?;
    Ok(index)
}
/// Applies every queued operation of `txn_id` to the filesystem and commits it.
///
/// Steps: verify the transaction is `Active`, log prepare, run the operations
/// in order while collecting their inverses, log commit, sync the filesystem,
/// and drop the transaction from the executor.
///
/// If an operation fails, the already-applied operations are rolled back in
/// reverse order, the abort is logged, the transaction is marked `Aborted`
/// (it stays in the executor's map), and `TxnError::OperationFailed` is returned.
///
/// # Errors
///
/// * `TxnError::NotFound` / `TxnError::InvalidState` for an unknown or
///   non-active transaction.
/// * `TxnError::OperationFailed` when an operation fails and rollback succeeds.
/// * Any error from the log or from rollback itself; in that case the
///   transaction is left in the `Prepared` state.
pub fn commit(&mut self, txn_id: TxnId, timestamp: u64) -> TxnResultType<TxnResult> {
    // Only a known, still-active transaction may be committed.
    match self.transactions.get(&txn_id) {
        None => return Err(TxnError::NotFound(txn_id)),
        Some(txn) if txn.state != TxnState::Active => {
            return Err(TxnError::InvalidState {
                txn_id,
                current: txn.state,
                expected: &[TxnState::Active],
            });
        }
        Some(_) => {}
    }

    self.log.prepare(txn_id, timestamp)?;

    // Mark as prepared and snapshot the operations so the loop below is free
    // to borrow `self` mutably on the failure path.
    let pending: Vec<TxnOperation> = match self.transactions.get_mut(&txn_id) {
        Some(txn) => {
            txn.state = TxnState::Prepared;
            txn.operations.clone()
        }
        None => Vec::new(),
    };

    let mut undo_log: Vec<(u32, TxnOperation)> = Vec::new();
    let mut bytes_written = 0u64;

    for (position, op) in pending.iter().enumerate() {
        let op_index = position as u32;
        let failure = match self.execute_operation(op) {
            Ok(inverse) => {
                bytes_written += op.estimated_size();
                undo_log.push((op_index, inverse));
                continue;
            }
            Err(e) => e,
        };

        // Undo what was applied so far, then record the abort.
        self.rollback_operations(txn_id, &undo_log, timestamp)?;
        self.log.abort(txn_id, timestamp)?;
        if let Some(txn) = self.transactions.get_mut(&txn_id) {
            txn.state = TxnState::Aborted;
        }
        return Err(TxnError::OperationFailed {
            txn_id,
            op_index,
            message: failure.to_string(),
        });
    }

    self.log.commit(txn_id, timestamp)?;
    // The commit record is already logged; a sync failure is ignored here.
    let _ = self.fs.sync();

    let (ops_executed, started_at) = self
        .transactions
        .remove(&txn_id)
        .map(|txn| (txn.operations.len() as u32, txn.started_at))
        .unwrap_or((0, 0));

    Ok(TxnResult {
        txn_id,
        state: TxnState::Committed,
        ops_executed,
        started_at,
        completed_at: timestamp,
        bytes_written,
    })
}
/// Aborts transaction `txn_id` without applying any of its operations.
///
/// The abort is logged and the transaction is removed from the executor. The
/// result reports zero executed operations and zero bytes written.
///
/// # Errors
///
/// * `TxnError::NotFound` if `txn_id` is unknown to this executor.
/// * `TxnError::InvalidState` if the transaction is already `Committed`.
/// * Any error from `TxnLog::abort`; the transaction is then left in place.
pub fn abort(&mut self, txn_id: TxnId, timestamp: u64) -> TxnResultType<TxnResult> {
    let (current, started_at) = match self.transactions.get(&txn_id) {
        Some(txn) => (txn.state, txn.started_at),
        None => return Err(TxnError::NotFound(txn_id)),
    };

    if current == TxnState::Committed {
        return Err(TxnError::InvalidState {
            txn_id,
            current,
            expected: &[TxnState::Active, TxnState::Prepared],
        });
    }

    self.log.abort(txn_id, timestamp)?;
    self.transactions.remove(&txn_id);

    Ok(TxnResult {
        txn_id,
        state: TxnState::Aborted,
        ops_executed: 0,
        started_at,
        completed_at: timestamp,
        bytes_written: 0,
    })
}
/// Applies `op` to the filesystem and returns the operation that undoes it.
///
/// The returned inverse is built from what the filesystem reports it replaced
/// (previous bytes, size, mode, owner, attribute value, ...).
///
/// NOTE(review): for `SetAttr` and `RemoveAttr`, a missing previous value is
/// turned into an empty value (`unwrap_or_default`), so rolling back leaves an
/// empty attribute in place instead of no attribute.
///
/// # Errors
///
/// Propagates the error of the underlying filesystem call; nothing is returned
/// for rollback in that case.
fn execute_operation(&self, op: &TxnOperation) -> TxnResultType<TxnOperation> {
    match op {
        TxnOperation::Create {
            path,
            content,
            mode,
        } => {
            self.fs.create(path, content, *mode)?;
            Ok(TxnOperation::Delete {
                path: path.clone(),
                content: Some(content.clone()),
                mode: Some(*mode),
            })
        }
        TxnOperation::Write {
            path,
            offset,
            content,
            ..
        } => {
            // The filesystem hands back the bytes it overwrote; writing them
            // back at the same offset is the inverse. Move them, no copy needed.
            let original = self.fs.write(path, *offset, content)?;
            Ok(TxnOperation::Write {
                path: path.clone(),
                offset: *offset,
                content: original,
                original: Some(content.clone()),
            })
        }
        TxnOperation::Truncate { path, new_size, .. } => {
            let original_size = self.fs.truncate(path, *new_size)?;
            Ok(TxnOperation::Truncate {
                path: path.clone(),
                new_size: original_size,
                original_size: Some(*new_size),
            })
        }
        TxnOperation::Delete { path, .. } => {
            let (content, mode) = self.fs.delete(path)?;
            Ok(TxnOperation::Create {
                path: path.clone(),
                content,
                mode,
            })
        }
        TxnOperation::Rename { old_path, new_path } => {
            self.fs.rename(old_path, new_path)?;
            // Inverse is the same rename with the paths swapped.
            Ok(TxnOperation::Rename {
                old_path: new_path.clone(),
                new_path: old_path.clone(),
            })
        }
        TxnOperation::Mkdir { path, mode } => {
            self.fs.mkdir(path, *mode)?;
            Ok(TxnOperation::Rmdir {
                path: path.clone(),
                mode: Some(*mode),
            })
        }
        TxnOperation::Rmdir { path, .. } => {
            let mode = self.fs.rmdir(path)?;
            Ok(TxnOperation::Mkdir {
                path: path.clone(),
                mode,
            })
        }
        TxnOperation::Link { source, target } => {
            self.fs.link(source, target)?;
            Ok(TxnOperation::Delete {
                path: target.clone(),
                content: None,
                mode: None,
            })
        }
        TxnOperation::Symlink { path, target } => {
            self.fs.symlink(path, target)?;
            Ok(TxnOperation::Delete {
                path: path.clone(),
                content: None,
                mode: None,
            })
        }
        TxnOperation::SetAttr {
            path, attr, value, ..
        } => {
            let original = self.fs.set_attr(path, attr, value)?;
            Ok(TxnOperation::SetAttr {
                path: path.clone(),
                attr: attr.clone(),
                value: original.unwrap_or_default(),
                original: Some(value.clone()),
            })
        }
        TxnOperation::RemoveAttr { path, attr, .. } => {
            let original = self.fs.remove_attr(path, attr)?;
            Ok(TxnOperation::SetAttr {
                path: path.clone(),
                attr: attr.clone(),
                value: original.unwrap_or_default(),
                original: None,
            })
        }
        TxnOperation::Chmod { path, mode, .. } => {
            let original_mode = self.fs.chmod(path, *mode)?;
            Ok(TxnOperation::Chmod {
                path: path.clone(),
                mode: original_mode,
                original_mode: Some(*mode),
            })
        }
        TxnOperation::Chown { path, uid, gid, .. } => {
            let (original_uid, original_gid) = self.fs.chown(path, *uid, *gid)?;
            Ok(TxnOperation::Chown {
                path: path.clone(),
                uid: original_uid,
                gid: original_gid,
                original_uid: Some(*uid),
                original_gid: Some(*gid),
            })
        }
    }
}
/// Undoes already-applied operations by running their inverses newest-first.
///
/// `executed_ops` pairs each original operation index with its inverse. After
/// an inverse succeeds, a rollback record is written to the log.
///
/// # Errors
///
/// * `TxnError::RollbackFailed` for the first inverse that fails; the remaining
///   inverses are not attempted.
/// * Any error from `TxnLog::log_rollback`.
fn rollback_operations(
    &mut self,
    txn_id: TxnId,
    executed_ops: &[(u32, TxnOperation)],
    timestamp: u64,
) -> TxnResultType<()> {
    for (index, rollback_op) in executed_ops.iter().rev() {
        self.execute_rollback_op(rollback_op)
            .map_err(|e| TxnError::RollbackFailed {
                txn_id,
                op_index: *index,
                message: e.to_string(),
            })?;
        self.log
            .log_rollback(txn_id, *index, rollback_op.clone(), timestamp)?;
    }
    Ok(())
}
/// Applies a single inverse operation to the filesystem, discarding whatever
/// previous-state value the filesystem call returns.
///
/// # Errors
///
/// Propagates the error of the underlying filesystem call.
fn execute_rollback_op(&self, op: &TxnOperation) -> TxnResultType<()> {
    let fs = &self.fs;
    match op {
        TxnOperation::Create {
            path,
            content,
            mode,
        } => fs.create(path, content, *mode),
        TxnOperation::Write {
            path,
            offset,
            content,
            ..
        } => fs.write(path, *offset, content).map(|_| ()),
        TxnOperation::Truncate { path, new_size, .. } => {
            fs.truncate(path, *new_size).map(|_| ())
        }
        TxnOperation::Delete { path, .. } => fs.delete(path).map(|_| ()),
        TxnOperation::Rename { old_path, new_path } => fs.rename(old_path, new_path),
        TxnOperation::Mkdir { path, mode } => fs.mkdir(path, *mode),
        TxnOperation::Rmdir { path, .. } => fs.rmdir(path).map(|_| ()),
        TxnOperation::Link { source, target } => fs.link(source, target),
        TxnOperation::Symlink { path, target } => fs.symlink(path, target),
        TxnOperation::SetAttr {
            path, attr, value, ..
        } => fs.set_attr(path, attr, value).map(|_| ()),
        TxnOperation::RemoveAttr { path, attr, .. } => fs.remove_attr(path, attr).map(|_| ()),
        TxnOperation::Chmod { path, mode, .. } => fs.chmod(path, *mode).map(|_| ()),
        TxnOperation::Chown { path, uid, gid, .. } => fs.chown(path, *uid, *gid).map(|_| ()),
    }
}
/// Replays or aborts the transactions the log reports as needing recovery.
///
/// * `Prepared` transactions are replayed operation by operation. If every
///   operation succeeds the commit is logged (rolled forward); at the first
///   failure replay stops and the abort is logged instead (rolled back).
/// * `Active` and `RollingBack` transactions are aborted in the log.
/// * Any other state is left untouched.
///
/// Every transaction returned by the log counts towards `txns_recovered`.
///
/// NOTE(review): operations replayed before a failure are not undone here, and
/// `recovery_time_ns` is always 0 because the only clock value available is
/// the `timestamp` argument.
///
/// # Errors
///
/// Propagates any error from `TxnLog::commit` or `TxnLog::abort`.
pub fn recover(&mut self, timestamp: u64) -> TxnResultType<RecoveryStats> {
    let start = timestamp;
    let mut stats = RecoveryStats::new();
    let recovery_txns = self.log.get_recovery_txns();

    for (txn_id, state, ops) in recovery_txns {
        match state {
            TxnState::Prepared => {
                // Stop at the first failing operation: the transaction must end
                // up either committed or aborted, never both.
                let mut replay_failed = false;
                for op in &ops {
                    if self.execute_operation(op).is_err() {
                        replay_failed = true;
                        break;
                    }
                    stats.ops_replayed += 1;
                }

                if replay_failed {
                    self.log.abort(txn_id, timestamp)?;
                    stats.txns_rolled_back += 1;
                } else {
                    self.log.commit(txn_id, timestamp)?;
                    stats.txns_rolled_forward += 1;
                }
            }
            TxnState::Active | TxnState::RollingBack => {
                self.log.abort(txn_id, timestamp)?;
                stats.txns_rolled_back += 1;
            }
            _ => {}
        }
        stats.txns_recovered += 1;
    }

    stats.recovery_time_ns = timestamp.saturating_sub(start);
    Ok(stats)
}
pub fn active_txn_count(&self) -> usize {
self.transactions.len()
}
/// Asks the log to checkpoint at `timestamp` and returns the value it reports.
///
/// # Errors
///
/// Propagates any error from `TxnLog::checkpoint`.
pub fn checkpoint(&mut self, timestamp: u64) -> TxnResultType<u64> {
    let log = &mut self.log;
    log.checkpoint(timestamp)
}
}
/// In-memory `TxnFilesystem` implementation backed by mutex-guarded maps.
#[derive(Debug, Default)]
pub struct MemoryFs {
/// File path -> (content, mode).
files: Mutex<BTreeMap<String, (Vec<u8>, u32)>>,
/// Directory path -> mode.
directories: Mutex<BTreeMap<String, u32>>,
/// Path -> (attribute name -> value).
attrs: Mutex<BTreeMap<String, BTreeMap<String, Vec<u8>>>>,
/// Path -> (uid, gid); paths without an entry are reported as (0, 0).
owners: Mutex<BTreeMap<String, (u32, u32)>>,
}
impl MemoryFs {
    /// Creates an empty filesystem containing only the root directory `/`
    /// with mode `0o755`.
    pub fn new() -> Self {
        // `lock()` works through a shared reference, so no `mut` binding is needed.
        let fs = Self::default();
        fs.directories.lock().insert("/".to_string(), 0o755);
        fs
    }
}
impl TxnFilesystem for MemoryFs {
/// Stores a new file at `path`.
///
/// # Errors
///
/// `TxnError::PathExists` if a file is already stored under `path`.
fn create(&self, path: &str, content: &[u8], mode: u32) -> TxnResultType<()> {
    let mut table = self.files.lock();
    if !table.contains_key(path) {
        table.insert(path.to_string(), (content.to_vec(), mode));
        Ok(())
    } else {
        Err(TxnError::PathExists(path.to_string()))
    }
}
/// Overwrites `content.len()` bytes of the file starting at `offset`.
///
/// The file is zero-extended first when the write reaches past its end. The
/// returned bytes are the contents of the written range after that extension
/// and before the overwrite.
///
/// # Errors
///
/// `TxnError::PathNotFound` if no file is stored under `path`.
fn write(&self, path: &str, offset: u64, content: &[u8]) -> TxnResultType<Vec<u8>> {
    let mut table = self.files.lock();
    let data = match table.get_mut(path) {
        Some((bytes, _mode)) => bytes,
        None => return Err(TxnError::PathNotFound(path.to_string())),
    };

    let start = offset as usize;
    let end = start + content.len();
    if data.len() < end {
        data.resize(end, 0);
    }

    let window = &mut data[start..end];
    let previous = window.to_vec();
    window.copy_from_slice(content);
    Ok(previous)
}
/// Returns up to `len` bytes of the file starting at `offset`.
///
/// The range is clamped to the end of the file; an `offset` at or past the end
/// yields an empty vector. Any `len`, including `u64::MAX`, is accepted.
///
/// # Errors
///
/// `TxnError::PathNotFound` if no file is stored under `path`.
fn read(&self, path: &str, offset: u64, len: u64) -> TxnResultType<Vec<u8>> {
    let files = self.files.lock();
    let (data, _) = files
        .get(path)
        .ok_or_else(|| TxnError::PathNotFound(path.to_string()))?;

    // Do the range arithmetic in u64 with saturation so a huge `len` cannot
    // overflow and produce an inverted (panicking) slice range.
    let size = data.len() as u64;
    if offset >= size {
        return Ok(Vec::new());
    }
    let end = offset.saturating_add(len).min(size);

    // Both bounds are <= data.len(), so the casts back to usize are lossless.
    Ok(data[offset as usize..end as usize].to_vec())
}
/// Resizes the file to `new_size` bytes, zero-filling when it grows, and
/// returns the size it had before.
///
/// # Errors
///
/// `TxnError::PathNotFound` if no file is stored under `path`.
fn truncate(&self, path: &str, new_size: u64) -> TxnResultType<u64> {
    let mut table = self.files.lock();
    match table.get_mut(path) {
        Some((bytes, _mode)) => {
            let previous_len = bytes.len() as u64;
            bytes.resize(new_size as usize, 0);
            Ok(previous_len)
        }
        None => Err(TxnError::PathNotFound(path.to_string())),
    }
}
/// Removes the file at `path` and returns its `(content, mode)`.
///
/// # Errors
///
/// `TxnError::PathNotFound` if no file is stored under `path`.
fn delete(&self, path: &str) -> TxnResultType<(Vec<u8>, u32)> {
    match self.files.lock().remove(path) {
        Some(entry) => Ok(entry),
        None => Err(TxnError::PathNotFound(path.to_string())),
    }
}
/// Moves the file stored under `old_path` to `new_path`, replacing whatever
/// file was stored there.
///
/// # Errors
///
/// `TxnError::PathNotFound` if no file is stored under `old_path`.
fn rename(&self, old_path: &str, new_path: &str) -> TxnResultType<()> {
    let mut table = self.files.lock();
    match table.remove(old_path) {
        Some(entry) => {
            table.insert(new_path.to_string(), entry);
            Ok(())
        }
        None => Err(TxnError::PathNotFound(old_path.to_string())),
    }
}
/// Records a new directory at `path` with the given `mode`.
///
/// # Errors
///
/// `TxnError::PathExists` if a directory is already recorded under `path`.
fn mkdir(&self, path: &str, mode: u32) -> TxnResultType<()> {
    let mut table = self.directories.lock();
    if !table.contains_key(path) {
        table.insert(path.to_string(), mode);
        Ok(())
    } else {
        Err(TxnError::PathExists(path.to_string()))
    }
}
/// Removes the directory at `path` and returns the mode it had.
///
/// # Errors
///
/// `TxnError::PathNotFound` if no directory is recorded under `path`.
fn rmdir(&self, path: &str) -> TxnResultType<u32> {
    match self.directories.lock().remove(path) {
        Some(mode) => Ok(mode),
        None => Err(TxnError::PathNotFound(path.to_string())),
    }
}
/// Creates `target` as a copy of the content and mode of `source`.
///
/// # Errors
///
/// * `TxnError::PathNotFound` if no file is stored under `source`.
/// * Any error from `create` for `target`.
fn link(&self, source: &str, target: &str) -> TxnResultType<()> {
    let table = self.files.lock();
    let snapshot = table
        .get(source)
        .map(|(bytes, mode)| (bytes.clone(), *mode));
    // `create` locks the file table itself, so release the guard first.
    drop(table);

    match snapshot {
        Some((content, mode)) => self.create(target, &content, mode),
        None => Err(TxnError::PathNotFound(source.to_string())),
    }
}
/// Stores a symlink as a regular file at `path` whose content is the bytes of
/// `target` and whose mode is `0o777`.
///
/// # Errors
///
/// Any error from `create` for `path`.
fn symlink(&self, path: &str, target: &str) -> TxnResultType<()> {
    let link_body = target.as_bytes();
    self.create(path, link_body, 0o777)
}
/// Returns a copy of attribute `attr` of `path`, or `None` when either the
/// path has no attributes or the attribute is not set. Never fails.
fn get_attr(&self, path: &str, attr: &str) -> TxnResultType<Option<Vec<u8>>> {
    let table = self.attrs.lock();
    let value = match table.get(path) {
        Some(per_path) => per_path.get(attr).cloned(),
        None => None,
    };
    Ok(value)
}
/// Sets attribute `attr` of `path` to `value` and returns the value it
/// replaced, if any. The path is not checked against the file or directory
/// tables. Never fails.
fn set_attr(&self, path: &str, attr: &str, value: &[u8]) -> TxnResultType<Option<Vec<u8>>> {
    let mut table = self.attrs.lock();
    let previous = table
        .entry(path.to_string())
        .or_default()
        .insert(attr.to_string(), value.to_vec());
    Ok(previous)
}
/// Removes attribute `attr` of `path` and returns its value, or `None` when it
/// was not set. Never fails.
fn remove_attr(&self, path: &str, attr: &str) -> TxnResultType<Option<Vec<u8>>> {
    match self.attrs.lock().get_mut(path) {
        Some(per_path) => Ok(per_path.remove(attr)),
        None => Ok(None),
    }
}
/// Returns the mode of `path`, checking files first and directories second.
///
/// # Errors
///
/// `TxnError::PathNotFound` if `path` is neither a file nor a directory.
fn get_mode(&self, path: &str) -> TxnResultType<u32> {
    // The file-table guard is a temporary of this statement, so it is released
    // before the directory table is locked.
    let file_mode = self.files.lock().get(path).map(|(_, mode)| *mode);
    match file_mode {
        Some(mode) => Ok(mode),
        None => self
            .directories
            .lock()
            .get(path)
            .copied()
            .ok_or_else(|| TxnError::PathNotFound(path.to_string())),
    }
}
/// Sets the mode of `path` (file first, then directory) and returns the mode
/// it had before.
///
/// # Errors
///
/// `TxnError::PathNotFound` if `path` is neither a file nor a directory.
fn chmod(&self, path: &str, mode: u32) -> TxnResultType<u32> {
    {
        let mut files = self.files.lock();
        if let Some(entry) = files.get_mut(path) {
            let previous = entry.1;
            entry.1 = mode;
            return Ok(previous);
        }
        // Guard goes out of scope here, before the directory table is locked.
    }

    match self.directories.lock().get_mut(path) {
        Some(slot) => {
            let previous = *slot;
            *slot = mode;
            Ok(previous)
        }
        None => Err(TxnError::PathNotFound(path.to_string())),
    }
}
/// Returns the recorded `(uid, gid)` of `path`, or `(0, 0)` when none has been
/// recorded. Never fails.
fn get_owner(&self, path: &str) -> TxnResultType<(u32, u32)> {
    let recorded = self.owners.lock().get(path).copied();
    Ok(recorded.unwrap_or((0, 0)))
}
/// Records `(uid, gid)` as the owner of `path` and returns the previous owner,
/// or `(0, 0)` when none had been recorded. Never fails.
fn chown(&self, path: &str, uid: u32, gid: u32) -> TxnResultType<(u32, u32)> {
    let previous = self.owners.lock().insert(path.to_string(), (uid, gid));
    Ok(previous.unwrap_or((0, 0)))
}
/// Reports whether `path` is a known file or directory.
fn exists(&self, path: &str) -> bool {
    let files = self.files.lock();
    if files.contains_key(path) {
        return true;
    }
    self.directories.lock().contains_key(path)
}
/// Reports whether a file is stored under `path`.
fn is_file(&self, path: &str) -> bool {
    let files = self.files.lock();
    files.contains_key(path)
}
/// Reports whether a directory is recorded under `path`.
fn is_directory(&self, path: &str) -> bool {
    let directories = self.directories.lock();
    directories.contains_key(path)
}
/// No-op: everything lives in memory, so there is nothing to flush.
fn sync(&self) -> TxnResultType<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;

    /// Builds an executor for dataset "test/pool" over a fresh `MemoryFs`.
    fn test_executor() -> TxnExecutor {
        TxnExecutor::new("test/pool", Box::new(MemoryFs::new()), 1000)
    }

    /// Shorthand for a `Create` operation with mode `0o644`.
    fn create_op(path: &'static str, content: Vec<u8>) -> TxnOperation {
        TxnOperation::Create {
            path: path.into(),
            content,
            mode: 0o644,
        }
    }

    /// Beginning a transaction registers it with the executor.
    #[test]
    fn test_begin_transaction() {
        let mut executor = test_executor();
        let id = executor.begin(1000).unwrap();
        assert_eq!(executor.active_txn_count(), 1);
        assert!(executor.transactions.contains_key(&id));
    }

    /// The first queued operation gets index 0 and is stored on the transaction.
    #[test]
    fn test_add_operation() {
        let mut executor = test_executor();
        let id = executor.begin(1000).unwrap();
        let index = executor
            .add_operation(id, create_op("/test.txt", vec![1, 2, 3]), 1001)
            .unwrap();
        assert_eq!(index, 0);
        assert_eq!(executor.transactions.get(&id).unwrap().op_count(), 1);
    }

    /// Committing applies the queued operation and drops the transaction.
    #[test]
    fn test_commit_transaction() {
        let mut executor = test_executor();
        let id = executor.begin(1000).unwrap();
        executor
            .add_operation(id, create_op("/test.txt", vec![1, 2, 3]), 1001)
            .unwrap();

        let outcome = executor.commit(id, 1002).unwrap();
        assert!(outcome.is_success());
        assert_eq!(outcome.ops_executed, 1);
        assert_eq!(executor.active_txn_count(), 0);
        assert!(executor.fs.exists("/test.txt"));
    }

    /// Aborting drops the transaction without touching the filesystem.
    #[test]
    fn test_abort_transaction() {
        let mut executor = test_executor();
        let id = executor.begin(1000).unwrap();
        executor
            .add_operation(id, create_op("/test.txt", vec![1, 2, 3]), 1001)
            .unwrap();

        let outcome = executor.abort(id, 1002).unwrap();
        assert!(!outcome.is_success());
        assert_eq!(executor.active_txn_count(), 0);
        assert!(!executor.fs.exists("/test.txt"));
    }

    /// Several operations in one transaction are all applied on commit.
    #[test]
    fn test_multiple_operations() {
        let mut executor = test_executor();
        let id = executor.begin(1000).unwrap();
        executor
            .add_operation(id, create_op("/a.txt", b"hello".to_vec()), 1001)
            .unwrap();
        executor
            .add_operation(id, create_op("/b.txt", b"world".to_vec()), 1002)
            .unwrap();
        executor
            .add_operation(
                id,
                TxnOperation::Mkdir {
                    path: "/dir".into(),
                    mode: 0o755,
                },
                1003,
            )
            .unwrap();

        let outcome = executor.commit(id, 1004).unwrap();
        assert!(outcome.is_success());
        assert_eq!(outcome.ops_executed, 3);
        assert!(executor.fs.exists("/a.txt"));
        assert!(executor.fs.exists("/b.txt"));
        assert!(executor.fs.is_directory("/dir"));
    }

    /// A failing operation undoes the operations applied before it.
    #[test]
    fn test_rollback_on_failure() {
        let mut executor = test_executor();
        executor
            .fs
            .create("/existing.txt", b"original", 0o644)
            .unwrap();

        let id = executor.begin(1000).unwrap();
        executor
            .add_operation(id, create_op("/new.txt", b"new".to_vec()), 1001)
            .unwrap();
        // Conflicts with the pre-existing file, so the commit must fail.
        executor
            .add_operation(id, create_op("/existing.txt", b"conflict".to_vec()), 1002)
            .unwrap();

        let outcome = executor.commit(id, 1003);
        assert!(outcome.is_err());
        assert!(!executor.fs.exists("/new.txt"));
    }

    /// A committed rename moves the file to its new path.
    #[test]
    fn test_rename_operation() {
        let mut executor = test_executor();

        let first = executor.begin(1000).unwrap();
        executor
            .add_operation(first, create_op("/old.txt", b"content".to_vec()), 1001)
            .unwrap();
        executor.commit(first, 1002).unwrap();

        let second = executor.begin(1003).unwrap();
        executor
            .add_operation(
                second,
                TxnOperation::Rename {
                    old_path: "/old.txt".into(),
                    new_path: "/new.txt".into(),
                },
                1004,
            )
            .unwrap();
        executor.commit(second, 1005).unwrap();

        assert!(!executor.fs.exists("/old.txt"));
        assert!(executor.fs.exists("/new.txt"));
    }

    /// `write` returns the overwritten bytes and stores the new ones.
    #[test]
    fn test_memory_fs_write() {
        let fs = MemoryFs::new();
        fs.create("/test.txt", b"hello", 0o644).unwrap();

        let previous = fs.write("/test.txt", 0, b"HELLO").unwrap();
        assert_eq!(previous, b"hello");

        let stored = fs.read("/test.txt", 0, 10).unwrap();
        assert_eq!(stored, b"HELLO");
    }

    /// `truncate` returns the old size and shortens the content.
    #[test]
    fn test_memory_fs_truncate() {
        let fs = MemoryFs::new();
        fs.create("/test.txt", b"hello world", 0o644).unwrap();

        let previous_size = fs.truncate("/test.txt", 5).unwrap();
        assert_eq!(previous_size, 11);

        let stored = fs.read("/test.txt", 0, 10).unwrap();
        assert_eq!(stored, b"hello");
    }

    /// Attributes can be set, read back, and removed.
    #[test]
    fn test_memory_fs_attrs() {
        let fs = MemoryFs::new();
        fs.create("/test.txt", b"content", 0o644).unwrap();

        let previous = fs.set_attr("/test.txt", "user.test", b"value").unwrap();
        assert!(previous.is_none());

        let current = fs.get_attr("/test.txt", "user.test").unwrap();
        assert_eq!(current, Some(b"value".to_vec()));

        let removed = fs.remove_attr("/test.txt", "user.test").unwrap();
        assert_eq!(removed, Some(b"value".to_vec()));
    }

    /// A transaction times out only after enough time has elapsed.
    #[test]
    fn test_transaction_timeout() {
        let txn = Transaction::new(TxnId::new(1), "test", 0);
        assert!(!txn.is_timed_out(1_000_000_000));
        assert!(txn.is_timed_out(4_000_000_000_000));
    }
}