mod lock;
use self::lock::{CommitLock, StateLock};
use super::ScopeFileSystem;
use crate::Result;
/// A file-system transaction: changes are staged in a `.temp` child scope of
/// the root [`ScopeFileSystem`] and applied to the root when the transaction
/// is committed.
#[derive(Debug)]
pub struct Transaction {
/// Root scope; returned by `readable_fs` and modified only while a commit is applied.
root_fs: ScopeFileSystem,
/// The `.temp` child scope of `root_fs`; returned by `writable_fs` and also holds the lock files.
temp_fs: ScopeFileSystem,
}
impl Transaction {
pub async fn new(root_fs: &ScopeFileSystem) -> Result<Self> {
Self::ensure_committed(root_fs).await?;
let root_fs = root_fs.clone();
let temp_fs = root_fs.child_fs(".temp");
temp_fs.ensure_exist().await?;
let state_lock = StateLock::default();
state_lock.save(&temp_fs).await?;
Ok(Self { root_fs, temp_fs })
}
pub async fn ensure_committed(root_fs: &ScopeFileSystem) -> Result<()> {
let temp_fs = root_fs.child_fs(".temp");
let state_lock = match StateLock::load(&temp_fs).await {
Ok(lock) => Some(lock),
Err(e) if e.is_not_found() => None,
Err(e) => return Err(e),
};
if let Some(state_lock) = state_lock
&& state_lock.is_running()
{
panic!("Transaction already in progress by process {state_lock} in directory '{temp_fs}'",);
}
let commit_lock = match CommitLock::load(&temp_fs).await {
Ok(lock) => Some(lock),
Err(e) if e.is_not_found() => None,
Err(e) => return Err(e),
};
if let Some(commit_lock) = commit_lock {
let transaction = Self {
root_fs: root_fs.clone(),
temp_fs: temp_fs.clone(),
};
transaction.execute_commit(&commit_lock).await?;
temp_fs.remove().await?;
}
Ok(())
}
pub fn readable_fs(&self) -> &ScopeFileSystem {
&self.root_fs
}
pub fn writable_fs(&self) -> &ScopeFileSystem {
&self.temp_fs
}
pub async fn commit(
self,
added_relative_path: Vec<String>,
removed_relative_path: Vec<String>,
) -> Result<()> {
match StateLock::load(&self.temp_fs).await {
Ok(state_lock) if state_lock.is_current() => {}
Ok(state_lock) => panic!(
"State lock mismatch in '{}': expected current process (pid={}), found {}. \
This indicates a race condition.",
self.temp_fs,
std::process::id(),
state_lock
),
Err(e) => {
let _ = self.temp_fs.remove().await;
return Err(e);
}
}
let commit_lock = CommitLock::new(added_relative_path, removed_relative_path);
if let Err(e) = commit_lock.save(&self.temp_fs).await {
let _ = self.temp_fs.remove().await;
return Err(e);
}
if let Err(e) = self.execute_commit(&commit_lock).await {
let _ = StateLock::remove(&self.temp_fs).await;
return Err(e);
}
let _ = self.temp_fs.remove().await;
Ok(())
}
async fn execute_commit(&self, commit_lock: &CommitLock) -> Result<()> {
for path in commit_lock.removed_files() {
self.root_fs.remove_file(path).await?;
}
for path in commit_lock.added_files() {
ScopeFileSystem::move_to(&self.temp_fs, &self.root_fs, path).await?;
}
Ok(())
}
pub async fn rollback(self) -> Result<()> {
self.temp_fs.remove().await
}
}
#[cfg(test)]
mod test {
    use super::{Result, ScopeFileSystem, Transaction};

    /// Smoke test on an in-memory file system: staged writes stay out of the
    /// root until `commit`, and the `.temp` directory exists only while the
    /// transaction is open.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_smoke() -> Result<()> {
        let root = ScopeFileSystem::new_memory_fs("/".into());
        root.ensure_exist().await?;

        // Seed the root with three files.
        let seeded = [("a.txt", "a"), ("b.txt", "b"), ("c.txt", "c")];
        for (name, body) in seeded {
            root.write(name, body.as_bytes()).await?;
        }

        // The staging directory appears only once a transaction is opened.
        assert!(root.stat("/.temp").await.is_err());
        let tx = Transaction::new(&root).await?;
        assert!(root.stat("/.temp").await.is_ok());

        // Stage an overwrite of `a.txt` and a brand-new `d.txt`.
        let staged = tx.writable_fs();
        staged.write("a.txt", "aa".as_bytes()).await?;
        staged.write("d.txt", "dd".as_bytes()).await?;

        // The readable side still shows the seeded content only.
        let visible = tx.readable_fs();
        for (name, body) in seeded {
            assert_eq!(visible.read(name).await?, body.as_bytes());
        }
        assert!(visible.read("d.txt").await.is_err());

        // The writable side contains exactly what was staged.
        assert_eq!(staged.read("a.txt").await?, "aa".as_bytes());
        for untouched in ["b.txt", "c.txt"] {
            assert!(staged.read(untouched).await.is_err());
        }
        assert_eq!(staged.read("d.txt").await?, "dd".as_bytes());

        let added = vec!["a.txt".into(), "d.txt".into()];
        let removed = vec!["b.txt".into()];
        tx.commit(added, removed).await?;

        // After the commit the staging directory is gone and the root holds
        // the new content.
        assert!(root.stat("/.temp").await.is_err());
        for (name, body) in [("a.txt", "aa"), ("c.txt", "c"), ("d.txt", "dd")] {
            assert_eq!(root.read(name).await?, body.as_bytes());
        }
        Ok(())
    }
}