use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::error::OpsResult;
use crate::process::{BoxedProc, ProcStatus, Process, STATUS_SHIFT};
/// Shared, reference-counted function pointer that builds a boxed [`Process`]
/// from a byte slice, or returns an error when the bytes cannot be decoded.
pub type LoaderFn = Arc<fn(&[u8]) -> OpsResult<Box<dyn Process + 'static>>>;
/// An ordered set of processes together with the on-disk status file that
/// records them.
pub struct Ops {
// Registered processes, in the order they were added.
procs: Vec<BoxedProc>,
// Location of the status file; it is deleted once every phase has succeeded.
path: PathBuf,
// When true, a failed prepare or run phase triggers a revert before the error
// is returned.
is_revert: bool,
// Open read/write handle to the status file at `path`.
file: File,
// Length of the status file as tracked by this struct; used when truncating.
file_size: u64,
}
/// Per-process capacity guess used to pre-size the buffer that collects the
/// encoded processes before they are written to the status file.
const PREDICTED_SIZE_FOR_ONE_PROC: usize = 512;
impl Ops {
    /// Opens the status file at `path` (creating it when missing) and returns an
    /// `Ops` with no registered processes and a tracked size of zero.
    ///
    /// Existing file content is left untouched here; [`Ops::init`] rewrites the
    /// file from the start and truncates it to the written length.
    ///
    /// # Errors
    /// Returns an error when the file cannot be opened or created.
    pub fn empty(path: &Path) -> OpsResult<Ops> {
        let file = OpenOptions::new()
            .create(true)
            // Explicitly keep existing content: opening must never wipe a file.
            .truncate(false)
            .read(true)
            .write(true)
            .open(path)?;
        Ok(Self {
            procs: Vec::new(),
            path: path.to_path_buf(),
            is_revert: true,
            file,
            file_size: 0,
        })
    }

    /// Wraps an already existing status file at `path` together with the given
    /// processes. The tracked size is taken from the file's metadata.
    ///
    /// # Errors
    /// Returns an error when the file cannot be opened or its metadata read.
    pub fn new(path: &Path, ops: Vec<BoxedProc>) -> OpsResult<Self> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;
        let file_size = file.metadata()?.len();
        Ok(Self {
            procs: ops,
            is_revert: true,
            file,
            file_size,
            path: path.to_path_buf(),
        })
    }

    /// Boxes `process` under the id reported by `T::id()` and appends it to the
    /// list of registered processes.
    pub fn add<T: Process + 'static>(&mut self, process: T) -> OpsResult<()> {
        let ops_boxed = BoxedProc::new(T::id(), Box::new(process));
        self.procs.push(ops_boxed);
        Ok(())
    }

    /// Enables or disables the automatic revert performed when the prepare or
    /// run phase fails.
    pub fn set_revert(&mut self, is_revert: bool) {
        self.is_revert = is_revert;
    }

    /// Returns the registered processes in registration order.
    pub fn processes(&self) -> &[BoxedProc] {
        &self.procs
    }

    /// Encodes every registered process, assigns each its byte offset inside the
    /// status file and writes the concatenated encoding to disk.
    ///
    /// The file is rewritten from its start, truncated to exactly the written
    /// length (so stale bytes from an earlier, longer file cannot survive) and
    /// synced.
    ///
    /// # Errors
    /// Returns an error when seeking, writing, truncating or syncing fails.
    pub fn init(mut self) -> OpsResult<Self> {
        let mut total = Vec::with_capacity(self.procs.len() * PREDICTED_SIZE_FOR_ONE_PROC);
        let mut offset: usize = 0;
        for proc in &mut self.procs {
            // Encode once so the recorded offsets describe exactly the bytes
            // that end up in the file.
            let encoded = proc.encode();
            let encoded_len = encoded.len();
            total.extend_from_slice(encoded.as_slice());
            proc.set_offset(offset);
            offset += encoded_len;
        }
        self.file_size = total.len() as u64;
        self.file.seek(SeekFrom::Start(0))?;
        self.file.write_all(&total)?;
        self.file.set_len(self.file_size)?;
        self.file.sync_all()?;
        Ok(self)
    }

    /// Runs the prepare, run and clean phases in that order and deletes the
    /// status file once all of them have succeeded.
    ///
    /// # Errors
    /// Returns the first error encountered. After a failure the status file is
    /// left on disk. A failed prepare or run phase is reverted first when revert
    /// is enabled; a failed clean phase is never reverted.
    pub fn all(mut self) -> OpsResult<()> {
        self.prepare()?;
        self.run()?;
        self.clean()?;
        // Close the handle before the file is deleted.
        drop(self.file);
        std::fs::remove_file(self.path.as_path())?;
        Ok(())
    }

    /// Reverts every process whose status is `PREPARED` or `RAN`.
    #[inline]
    pub fn revert(&mut self) -> OpsResult<()> {
        self.revert_all()
    }

    /// Calls `print` on every registered process, in registration order.
    pub fn print(&self) {
        for process in &self.procs {
            process.print();
        }
    }

    /// Prepare phase with the revert-on-failure policy applied.
    fn prepare(&mut self) -> OpsResult<()> {
        let outcome = self.try_prepare();
        self.revert_on_failure(outcome)
    }

    /// Run phase with the revert-on-failure policy applied.
    fn run(&mut self) -> OpsResult<()> {
        let outcome = self.try_run();
        self.revert_on_failure(outcome)
    }

    /// Passes a successful `outcome` through. On failure it reverts first when
    /// revert is enabled, then returns the original error; if the revert itself
    /// fails, the revert error is returned instead.
    fn revert_on_failure(&mut self, outcome: OpsResult<()>) -> OpsResult<()> {
        match outcome {
            Ok(()) => Ok(()),
            Err(err) => {
                if self.is_revert {
                    self.revert_all()?;
                }
                Err(err)
            }
        }
    }

    /// Cleans every process in registration order, shrinking the status file by
    /// the process's encoded length after each successful clean.
    ///
    /// NOTE(review): processes are visited in registration order while bytes are
    /// removed from the tail of the file; confirm this matches what the recovery
    /// code expects to find after a partially completed clean.
    fn clean(&mut self) -> OpsResult<()> {
        for process in &self.procs {
            process.clean()?;
            let record_len = process.encode().len() as u64;
            Self::shrink_by(&mut self.file, &mut self.file_size, record_len)?;
        }
        Ok(())
    }

    /// Reverts all processes; thin wrapper around [`Ops::try_revert`].
    fn revert_all(&mut self) -> OpsResult<()> {
        self.try_revert()
    }

    /// Prepares every process in reverse registration order and records the
    /// `PREPARED` status in the status file after each one.
    fn try_prepare(&mut self) -> OpsResult<()> {
        let prepared: u8 = ProcStatus::PREPARED.into();
        for process in self.procs.iter().rev() {
            process.prepare()?;
            let position = (process.offcet() + STATUS_SHIFT) as u64;
            Self::persist_status(&mut self.file, position, prepared)?;
        }
        Ok(())
    }

    /// Runs every process in reverse registration order and records the `RAN`
    /// status in the status file after each one.
    fn try_run(&mut self) -> OpsResult<()> {
        let ran: u8 = ProcStatus::RAN.into();
        for process in self.procs.iter().rev() {
            process.run()?;
            let position = (process.offcet() + STATUS_SHIFT) as u64;
            Self::persist_status(&mut self.file, position, ran)?;
        }
        Ok(())
    }

    /// Walks the processes in reverse registration order and undoes the phase
    /// matching each process's status; processes in any other status are
    /// skipped. The status file shrinks by the encoded length of every reverted
    /// process.
    fn try_revert(&mut self) -> OpsResult<()> {
        for process in self.procs.iter().rev() {
            match process.status() {
                ProcStatus::PREPARED => process.revert_prepare()?,
                ProcStatus::RAN => process.revert_run()?,
                _ => continue,
            };
            let record_len = process.encode().len() as u64;
            Self::shrink_by(&mut self.file, &mut self.file_size, record_len)?;
        }
        Ok(())
    }

    /// Writes the single status byte at `position` and syncs the file.
    fn persist_status(file: &mut File, position: u64, status: u8) -> OpsResult<()> {
        file.seek(SeekFrom::Start(position))?;
        file.write_all(&[status])?;
        file.sync_all()?;
        Ok(())
    }

    /// Shrinks the tracked size by `record_len`, truncates the file to the new
    /// size and syncs it.
    ///
    /// The subtraction saturates at zero: when the tracked size is smaller than
    /// the record (for example because `init` was never called) the file is
    /// emptied instead of the arithmetic underflowing.
    fn shrink_by(file: &mut File, file_size: &mut u64, record_len: u64) -> OpsResult<()> {
        *file_size = file_size.saturating_sub(record_len);
        file.set_len(*file_size)?;
        file.sync_all()?;
        Ok(())
    }
}
#[cfg(test)]
mod test {
use serde::{Deserialize, Serialize};
use serde_flow::encoder::{bincode, FlowEncoder};
use tempfile::{tempdir, TempDir};
use super::*;
use crate::{
error::OpsError,
process::{
constants::{self, TEST_FILE_ID},
file,
},
recover::OpsRecovery,
};
/// Test-only process whose phases fail while a matching marker file exists.
#[derive(Serialize, Deserialize, Default)]
struct TestOp {
/// Marker path: the prepare phase fails while this path exists.
pub prepare: PathBuf,
/// Marker path: the run phase fails while this path exists.
pub run: PathBuf,
/// Marker path: the clean phase fails while this path exists.
pub clean: PathBuf,
/// Encoded form of this value, handed out by `as_bytes`; excluded from
/// serialization.
#[serde(skip)]
pub bytes: Vec<u8>,
}
impl TestOp {
    /// Builds a test operation from its three marker paths and caches its own
    /// serialized form in `bytes`.
    pub fn new(prepare: PathBuf, run: PathBuf, clean: PathBuf) -> Self {
        // `bytes` is skipped by serde, so this placeholder never reaches the
        // encoding; it is replaced right after serialization.
        let placeholder = vec![1, 2, 3];
        let mut op = Self {
            prepare,
            run,
            clean,
            bytes: placeholder,
        };
        let encoded = bincode::Encoder::serialize(&op).unwrap();
        op.bytes = encoded;
        op
    }

    /// Decodes a test operation from `bytes` and keeps a copy of the input as
    /// its cached encoding.
    ///
    /// Any decoding failure is reported as `OpsError::SerializeFailed`.
    pub fn from_bytes(bytes: &[u8]) -> OpsResult<Self> {
        let parsed: Result<TestOp, _> = bincode::Encoder::deserialize(bytes);
        match parsed {
            Ok(mut op) => {
                op.bytes = bytes.to_vec();
                Ok(op)
            }
            Err(_) => Err(OpsError::SerializeFailed),
        }
    }
}
/// Loader for [`TestOp`]: decodes `bytes` and returns the result as a boxed
/// process.
pub fn test_load(bytes: &[u8]) -> OpsResult<Box<dyn Process>> {
    match TestOp::from_bytes(bytes) {
        Ok(op) => Ok(Box::new(op)),
        Err(err) => Err(err),
    }
}
/// Returns the id of the test process paired with the loader that decodes it.
pub fn test_loader() -> (u8, LoaderFn) {
    let loader: LoaderFn = Arc::new(test_load);
    (TEST_FILE_ID, loader)
}
impl Process for TestOp {
    /// Fails with `OpsError::Unknown` while the `prepare` marker exists.
    fn prepare(&self) -> OpsResult<()> {
        if self.prepare.exists() {
            Err(OpsError::Unknown)
        } else {
            Ok(())
        }
    }

    /// Fails with `OpsError::Unknown` while the `run` marker exists.
    fn run(&self) -> OpsResult<()> {
        if self.run.exists() {
            Err(OpsError::Unknown)
        } else {
            Ok(())
        }
    }

    /// Fails with `OpsError::Unknown` while the `clean` marker exists.
    fn clean(&self) -> OpsResult<()> {
        if self.clean.exists() {
            Err(OpsError::Unknown)
        } else {
            Ok(())
        }
    }

    /// Nothing to undo: preparing has no side effects.
    fn revert_prepare(&self) -> OpsResult<()> {
        Ok(())
    }

    /// Nothing to undo: running has no side effects.
    fn revert_run(&self) -> OpsResult<()> {
        Ok(())
    }

    /// Returns the cached encoding of this operation.
    fn as_bytes(&self) -> &[u8] {
        self.bytes.as_slice()
    }

    /// Identifier under which the test operation is registered.
    fn id() -> u8
    where
        Self: Sized,
    {
        constants::TEST_FILE_ID
    }
}
/// Per-test fixture: a temporary directory, the paths the tests operate on and
/// the flags selecting which phase of the test operation should fail.
struct TestHelper {
/// Temporary directory that holds every other path of the fixture.
pub tempdir: TempDir,
/// File handed to the remove operation.
pub remove_path: PathBuf,
/// Source file of the copy operation.
pub copy_from_path: PathBuf,
/// Destination file of the copy operation.
pub copy_to_path: PathBuf,
/// Status file handed to `Ops`.
pub status: PathBuf,
/// When set, `update_fails` creates the marker that fails the prepare phase.
pub is_prepare_fail: bool,
/// When set, `update_fails` creates the marker that fails the run phase.
pub is_run_fail: bool,
/// When set, `update_fails` creates the marker that fails the clean phase.
pub is_clean_fail: bool,
}
impl Default for TestHelper {
    /// Creates a fresh temporary directory, derives all fixture paths from it
    /// and leaves every failure flag cleared.
    fn default() -> Self {
        let tempdir = tempdir().unwrap();
        let root = tempdir.path();
        let remove_path = root.join("file_for_remove");
        let copy_from_path = root.join("copy_from");
        let copy_to_path = root.join("copy_to");
        let status = root.join("status");
        Self {
            tempdir,
            remove_path,
            copy_from_path,
            copy_to_path,
            status,
            is_prepare_fail: false,
            is_run_fail: false,
            is_clean_fail: false,
        }
    }
}
impl TestHelper {
    /// Synchronises the marker files with the failure flags.
    ///
    /// All existing markers are removed first. Then at most one marker is
    /// created, for the first flag that is set in the order prepare, run, clean.
    pub fn update_fails(&self) {
        let marker = |name: &str| self.tempdir.path().join(name);
        let prepare_test = marker("prepare.fail");
        let run_test = marker("run.fail");
        let clean_test = marker("clean.fail");

        for stale in [&prepare_test, &run_test, &clean_test] {
            if stale.exists() {
                std::fs::remove_file(stale).unwrap();
            }
        }

        let armed = if self.is_prepare_fail {
            Some(prepare_test)
        } else if self.is_run_fail {
            Some(run_test)
        } else if self.is_clean_fail {
            Some(clean_test)
        } else {
            None
        };
        if let Some(path) = armed {
            std::fs::write(path, vec![1]).unwrap();
        }
    }

    /// Builds a test operation wired to this fixture's marker files.
    fn test_ops(&self) -> TestOp {
        let root = self.tempdir.path();
        TestOp::new(
            root.join("prepare.fail"),
            root.join("run.fail"),
            root.join("clean.fail"),
        )
    }

    /// File handed to the remove operation.
    fn remove_path(&self) -> &Path {
        &self.remove_path
    }

    /// Source file of the copy operation.
    fn copy_from_path(&self) -> &Path {
        &self.copy_from_path
    }

    /// Destination file of the copy operation.
    fn copy_to_path(&self) -> &Path {
        &self.copy_to_path
    }

    /// Status file handed to `Ops`.
    fn status(&self) -> &Path {
        &self.status
    }
}
/// Initialising two file operations must succeed.
#[test]
fn test_init() {
    let helper = TestHelper::default();
    for seeded in [helper.remove_path(), helper.copy_from_path()] {
        std::fs::write(seeded, vec![1, 2, 3]).unwrap();
    }

    let mut ops = Ops::empty(helper.status()).unwrap();
    let remove_op = file::remove::Op::new(helper.remove_path());
    ops.add(remove_op).unwrap();
    let copy_op = file::copy::Op::new(helper.copy_from_path(), helper.copy_to_path()).unwrap();
    ops.add(copy_op).unwrap();

    ops.init().unwrap();
}
/// A fully successful run removes the status file, deletes the file scheduled
/// for removal and produces the copy while keeping its source.
#[test]
fn test_all() {
    let helper = TestHelper::default();
    for seeded in [helper.remove_path(), helper.copy_from_path()] {
        std::fs::write(seeded, vec![1, 2, 3]).unwrap();
    }

    let mut ops = Ops::empty(helper.status()).unwrap();
    let remove_op = file::remove::Op::new(helper.remove_path());
    ops.add(remove_op).unwrap();
    let copy_op = file::copy::Op::new(helper.copy_from_path(), helper.copy_to_path()).unwrap();
    ops.add(copy_op).unwrap();

    let ready = ops.init().unwrap();
    ready.all().unwrap();

    assert!(!helper.status().exists());
    assert!(!helper.remove_path().exists());
    assert!(helper.copy_to_path().exists());
    assert!(helper.copy_from_path().exists());
}
/// A failure in the prepare phase is reported, the status file stays on disk
/// and the file system looks as it did before the run.
#[test]
fn test_all_failed_prepared() {
    let helper = TestHelper {
        is_prepare_fail: true,
        ..Default::default()
    };
    helper.update_fails();
    for seeded in [helper.remove_path(), helper.copy_from_path()] {
        std::fs::write(seeded, vec![1, 2, 3]).unwrap();
    }

    let mut ops = Ops::empty(helper.status()).unwrap();
    let copy_op = file::copy::Op::new(helper.copy_from_path(), helper.copy_to_path()).unwrap();
    ops.add(copy_op).unwrap();
    ops.add(helper.test_ops()).unwrap();
    let remove_op = file::remove::Op::new(helper.remove_path());
    ops.add(remove_op).unwrap();

    let ready = ops.init().unwrap();
    let outcome = ready.all();

    assert!(outcome.is_err());
    assert!(helper.status().exists());
    assert!(helper.remove_path().exists());
    assert!(!helper.copy_to_path().exists());
    assert!(helper.copy_from_path().exists());
}
/// With automatic revert disabled, a failure in the clean phase leaves a status
/// file from which recovery succeeds.
#[test]
fn test_recover_after_failed_in_clean() {
    let helper = TestHelper {
        is_clean_fail: true,
        ..Default::default()
    };
    helper.update_fails();
    for seeded in [helper.remove_path(), helper.copy_from_path()] {
        std::fs::write(seeded, vec![1, 2, 3]).unwrap();
    }

    let mut ops = Ops::empty(helper.status()).unwrap();
    ops.set_revert(false);
    let copy_op = file::copy::Op::new(helper.copy_from_path(), helper.copy_to_path()).unwrap();
    ops.add(copy_op).unwrap();
    ops.add(helper.test_ops()).unwrap();
    let remove_op = file::remove::Op::new(helper.remove_path());
    ops.add(remove_op).unwrap();

    let ready = ops.init().unwrap();
    let ops_result = ready.all();
    assert!(ops_result.is_err());

    let loaders = vec![
        file::copy::loader(),
        file::remove::loader(),
        test_loader(),
    ];
    let mut recover = OpsRecovery::default();
    recover.add_loaders(loaders);
    let revert_result = recover.recover(helper.status());

    assert!(revert_result.is_ok());
    assert!(helper.status().exists());
    assert!(helper.remove_path().exists());
}
/// A failure in the run phase is reported, the status file stays on disk and
/// the file system looks as it did before the run.
#[test]
fn test_all_failed_in_run() {
    let helper = TestHelper {
        is_run_fail: true,
        ..Default::default()
    };
    helper.update_fails();
    for seeded in [helper.remove_path(), helper.copy_from_path()] {
        std::fs::write(seeded, vec![1, 2, 3]).unwrap();
    }

    let mut ops = Ops::empty(helper.status()).unwrap();
    let copy_op = file::copy::Op::new(helper.copy_from_path(), helper.copy_to_path()).unwrap();
    ops.add(copy_op).unwrap();
    ops.add(helper.test_ops()).unwrap();
    let remove_op = file::remove::Op::new(helper.remove_path());
    ops.add(remove_op).unwrap();

    let ready = ops.init().unwrap();
    let outcome = ready.all();

    assert!(outcome.is_err());
    assert!(helper.status().exists());
    assert!(helper.remove_path().exists());
    assert!(!helper.copy_to_path().exists());
    assert!(helper.copy_from_path().exists());
}
/// A run that fails in the run phase (automatic revert disabled) can be
/// recovered from its status file and then completed once the cause of the
/// failure is gone.
#[test]
fn test_recovered_failed_copy() {
    let temp_dir = tempdir().unwrap();
    let root_dir = temp_dir.path().to_path_buf();
    let at = |name: &str| root_dir.join(name);

    let pb_status = at("status");
    let prepare_test = at("prepare");
    let run_test = at("run");
    let clean_test = at("clean");
    // The existing `run` marker makes the test operation fail in its run phase.
    std::fs::write(run_test.as_path(), vec![1, 2, 3]).unwrap();

    let pb_remove = at("to_remove");
    let pb_copy_from = at("from_file.txt");
    let pb_copy_to = at("to_file.txt");
    std::fs::write(pb_remove.as_path(), vec![1, 2, 3]).unwrap();
    std::fs::write(pb_copy_from.as_path(), vec![1, 2, 3]).unwrap();

    let mut ops = Ops::empty(pb_status.as_path()).unwrap();
    ops.set_revert(false);
    let copy_op = file::copy::Op::new(pb_copy_from.as_path(), pb_copy_to.as_path()).unwrap();
    ops.add(copy_op).unwrap();
    ops.add(TestOp::new(prepare_test, run_test.clone(), clean_test))
        .unwrap();
    let remove_op = file::remove::Op::new(pb_remove.as_path());
    ops.add(remove_op).unwrap();

    let ready = ops.init().unwrap();
    let result = ready.all();
    assert!(result.is_err());
    assert!(pb_status.exists());
    assert!(pb_remove.exists());

    // Remove the cause of the failure, then recover and finish the run.
    std::fs::remove_file(run_test.as_path()).unwrap();
    let loaders = vec![
        file::copy::loader(),
        file::remove::loader(),
        test_loader(),
    ];
    let mut recover = OpsRecovery::default();
    recover.add_loaders(loaders);
    let recovered = recover.recover(pb_status.as_path()).unwrap();
    recovered.all().unwrap();

    assert!(!pb_status.exists());
    assert!(!pb_remove.exists());
    assert!(pb_copy_to.exists());
    assert!(pb_copy_from.exists());
}
}