use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use super::executor::{MemoryFs, TxnExecutor, TxnFilesystem};
use super::types::{TxnError, TxnOperation, TxnResult, TxnResultType};
pub struct TxnBuilder {
dataset: String,
operations: Vec<TxnOperation>,
fs: Option<Box<dyn TxnFilesystem>>,
timeout: Option<u64>,
}
impl TxnBuilder {
pub fn new(dataset: &str) -> Self {
Self {
dataset: dataset.to_string(),
operations: Vec::new(),
fs: None,
timeout: None,
}
}
pub fn with_fs(mut self, fs: Box<dyn TxnFilesystem>) -> Self {
self.fs = Some(fs);
self
}
pub fn with_timeout(mut self, timeout: u64) -> Self {
self.timeout = Some(timeout);
self
}
pub fn create(mut self, path: &str, content: &[u8]) -> Self {
self.operations.push(TxnOperation::Create {
path: path.to_string(),
content: content.to_vec(),
mode: 0o644,
});
self
}
pub fn create_with_mode(mut self, path: &str, content: &[u8], mode: u32) -> Self {
self.operations.push(TxnOperation::Create {
path: path.to_string(),
content: content.to_vec(),
mode,
});
self
}
pub fn write(mut self, path: &str, offset: u64, content: &[u8]) -> Self {
self.operations.push(TxnOperation::Write {
path: path.to_string(),
offset,
content: content.to_vec(),
original: None,
});
self
}
pub fn append(mut self, path: &str, content: &[u8]) -> Self {
self.operations.push(TxnOperation::Write {
path: path.to_string(),
offset: u64::MAX,
content: content.to_vec(),
original: None,
});
self
}
pub fn truncate(mut self, path: &str, new_size: u64) -> Self {
self.operations.push(TxnOperation::Truncate {
path: path.to_string(),
new_size,
original_size: None,
});
self
}
pub fn delete(mut self, path: &str) -> Self {
self.operations.push(TxnOperation::Delete {
path: path.to_string(),
content: None,
mode: None,
});
self
}
pub fn rename(mut self, old_path: &str, new_path: &str) -> Self {
self.operations.push(TxnOperation::Rename {
old_path: old_path.to_string(),
new_path: new_path.to_string(),
});
self
}
pub fn mkdir(mut self, path: &str, mode: u32) -> Self {
self.operations.push(TxnOperation::Mkdir {
path: path.to_string(),
mode,
});
self
}
pub fn rmdir(mut self, path: &str) -> Self {
self.operations.push(TxnOperation::Rmdir {
path: path.to_string(),
mode: None,
});
self
}
pub fn link(mut self, source: &str, target: &str) -> Self {
self.operations.push(TxnOperation::Link {
source: source.to_string(),
target: target.to_string(),
});
self
}
pub fn symlink(mut self, path: &str, target: &str) -> Self {
self.operations.push(TxnOperation::Symlink {
path: path.to_string(),
target: target.to_string(),
});
self
}
pub fn set_attr(mut self, path: &str, attr: &str, value: &[u8]) -> Self {
self.operations.push(TxnOperation::SetAttr {
path: path.to_string(),
attr: attr.to_string(),
value: value.to_vec(),
original: None,
});
self
}
pub fn remove_attr(mut self, path: &str, attr: &str) -> Self {
self.operations.push(TxnOperation::RemoveAttr {
path: path.to_string(),
attr: attr.to_string(),
original: None,
});
self
}
pub fn chmod(mut self, path: &str, mode: u32) -> Self {
self.operations.push(TxnOperation::Chmod {
path: path.to_string(),
mode,
original_mode: None,
});
self
}
pub fn chown(mut self, path: &str, uid: u32, gid: u32) -> Self {
self.operations.push(TxnOperation::Chown {
path: path.to_string(),
uid,
gid,
original_uid: None,
original_gid: None,
});
self
}
pub fn operation(mut self, op: TxnOperation) -> Self {
self.operations.push(op);
self
}
pub fn operations(mut self, ops: Vec<TxnOperation>) -> Self {
self.operations.extend(ops);
self
}
pub fn op_count(&self) -> usize {
self.operations.len()
}
pub fn is_empty(&self) -> bool {
self.operations.is_empty()
}
pub fn dataset(&self) -> &str {
&self.dataset
}
pub fn build(self) -> Vec<TxnOperation> {
self.operations
}
pub fn commit(self) -> TxnResultType<TxnResult> {
self.commit_with_timestamp(current_timestamp())
}
pub fn commit_with_timestamp(self, timestamp: u64) -> TxnResultType<TxnResult> {
if self.operations.is_empty() {
return Err(TxnError::Internal("no operations to commit".into()));
}
let fs = self.fs.unwrap_or_else(|| Box::new(MemoryFs::new()));
let mut executor = TxnExecutor::new(&self.dataset, fs, timestamp);
let txn_id = executor.begin(timestamp)?;
for op in self.operations {
executor.add_operation(txn_id, op, timestamp)?;
}
executor.commit(txn_id, timestamp)
}
pub fn commit_with_executor(
self,
executor: &mut TxnExecutor,
timestamp: u64,
) -> TxnResultType<TxnResult> {
if self.operations.is_empty() {
return Err(TxnError::Internal("no operations to commit".into()));
}
let txn_id = executor.begin(timestamp)?;
for op in self.operations {
executor.add_operation(txn_id, op, timestamp)?;
}
executor.commit(txn_id, timestamp)
}
}
pub struct BatchBuilder {
transactions: Vec<TxnBuilder>,
fs: Option<Box<dyn TxnFilesystem>>,
stop_on_error: bool,
}
impl BatchBuilder {
pub fn new() -> Self {
Self {
transactions: Vec::new(),
fs: None,
stop_on_error: true,
}
}
pub fn with_fs(mut self, fs: Box<dyn TxnFilesystem>) -> Self {
self.fs = Some(fs);
self
}
pub fn stop_on_error(mut self, stop: bool) -> Self {
self.stop_on_error = stop;
self
}
pub fn add(mut self, txn: TxnBuilder) -> Self {
self.transactions.push(txn);
self
}
pub fn txn_count(&self) -> usize {
self.transactions.len()
}
pub fn execute(self) -> BatchResult {
self.execute_with_timestamp(current_timestamp())
}
pub fn execute_with_timestamp(self, timestamp: u64) -> BatchResult {
let mut results = Vec::new();
let mut errors = Vec::new();
for (i, txn) in self.transactions.into_iter().enumerate() {
match txn.commit_with_timestamp(timestamp) {
Ok(result) => results.push(result),
Err(e) => {
errors.push((i, e));
if self.stop_on_error {
break;
}
}
}
}
BatchResult {
successful: results,
failed: errors,
}
}
}
impl Default for BatchBuilder {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct BatchResult {
pub successful: Vec<TxnResult>,
pub failed: Vec<(usize, TxnError)>,
}
impl BatchResult {
pub fn all_succeeded(&self) -> bool {
self.failed.is_empty()
}
pub fn success_count(&self) -> usize {
self.successful.len()
}
pub fn failure_count(&self) -> usize {
self.failed.len()
}
pub fn total_bytes_written(&self) -> u64 {
self.successful.iter().map(|r| r.bytes_written).sum()
}
pub fn total_ops_executed(&self) -> u32 {
self.successful.iter().map(|r| r.ops_executed).sum()
}
}
fn current_timestamp() -> u64 {
crate::time::now()
}
pub fn quick_create(dataset: &str, path: &str, content: &[u8]) -> TxnResultType<TxnResult> {
TxnBuilder::new(dataset).create(path, content).commit()
}
pub fn quick_delete(dataset: &str, path: &str) -> TxnResultType<TxnResult> {
TxnBuilder::new(dataset).delete(path).commit()
}
pub fn quick_rename(dataset: &str, old_path: &str, new_path: &str) -> TxnResultType<TxnResult> {
TxnBuilder::new(dataset).rename(old_path, new_path).commit()
}
pub fn quick_mkdir(dataset: &str, path: &str, mode: u32) -> TxnResultType<TxnResult> {
TxnBuilder::new(dataset).mkdir(path, mode).commit()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_builder_create() {
let builder = TxnBuilder::new("test/pool")
.create("/a.txt", b"hello")
.create("/b.txt", b"world");
assert_eq!(builder.op_count(), 2);
assert_eq!(builder.dataset(), "test/pool");
}
#[test]
fn test_builder_commit() {
let result = TxnBuilder::new("test/pool")
.with_fs(Box::new(MemoryFs::new()))
.create("/test.txt", b"content")
.commit()
.unwrap();
assert!(result.is_success());
assert_eq!(result.ops_executed, 1);
}
#[test]
fn test_builder_multiple_ops() {
let result = TxnBuilder::new("test/pool")
.with_fs(Box::new(MemoryFs::new()))
.create("/a.txt", b"hello")
.create("/b.txt", b"world")
.mkdir("/dir", 0o755)
.commit()
.unwrap();
assert!(result.is_success());
assert_eq!(result.ops_executed, 3);
}
#[test]
fn test_builder_empty() {
let result = TxnBuilder::new("test/pool")
.with_fs(Box::new(MemoryFs::new()))
.commit();
assert!(result.is_err());
}
#[test]
fn test_builder_chain() {
let builder = TxnBuilder::new("test/pool")
.create("/file.txt", b"content")
.chmod("/file.txt", 0o600)
.chown("/file.txt", 1000, 1000)
.set_attr("/file.txt", "user.tag", b"important");
assert_eq!(builder.op_count(), 4);
}
#[test]
fn test_builder_build() {
let ops = TxnBuilder::new("test/pool")
.create("/a.txt", b"hello")
.delete("/old.txt")
.build();
assert_eq!(ops.len(), 2);
}
#[test]
fn test_batch_builder() {
let batch = BatchBuilder::new()
.add(TxnBuilder::new("pool1").create("/a.txt", b"a"))
.add(TxnBuilder::new("pool2").create("/b.txt", b"b"));
assert_eq!(batch.txn_count(), 2);
}
#[test]
fn test_batch_execute() {
let result = BatchBuilder::new()
.add(
TxnBuilder::new("pool1")
.with_fs(Box::new(MemoryFs::new()))
.create("/a.txt", b"a"),
)
.add(
TxnBuilder::new("pool2")
.with_fs(Box::new(MemoryFs::new()))
.create("/b.txt", b"b"),
)
.execute();
assert!(result.all_succeeded());
assert_eq!(result.success_count(), 2);
}
#[test]
fn test_quick_create() {
let result = quick_create("test/pool", "/quick.txt", b"quick content");
assert!(result.is_ok());
}
#[test]
fn test_builder_with_timeout() {
let builder = TxnBuilder::new("test/pool")
.with_timeout(300)
.create("/test.txt", b"content");
assert_eq!(builder.op_count(), 1);
}
#[test]
fn test_builder_rename() {
let fs = MemoryFs::new();
fs.create("/old.txt", b"content", 0o644).unwrap();
let result = TxnBuilder::new("test/pool")
.with_fs(Box::new(fs))
.rename("/old.txt", "/new.txt")
.commit();
assert!(result.is_ok() || result.is_err()); }
#[test]
fn test_batch_result() {
let result = BatchResult {
successful: vec![
TxnResult {
txn_id: super::super::types::TxnId::new(1),
state: super::super::types::TxnState::Committed,
ops_executed: 5,
started_at: 0,
completed_at: 100,
bytes_written: 1000,
},
TxnResult {
txn_id: super::super::types::TxnId::new(2),
state: super::super::types::TxnState::Committed,
ops_executed: 3,
started_at: 100,
completed_at: 200,
bytes_written: 500,
},
],
failed: vec![],
};
assert!(result.all_succeeded());
assert_eq!(result.total_ops_executed(), 8);
assert_eq!(result.total_bytes_written(), 1500);
}
}