atomic_ops/
operation.rs

1use std::fs::{File, OpenOptions};
2use std::io::{Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use crate::error::OpsResult;
7use crate::process::{BoxedProc, ProcStatus, Process, STATUS_SHIFT};
8
9pub type LoaderFn = Arc<fn(&[u8]) -> OpsResult<Box<dyn Process + 'static>>>;
10
11pub struct Ops {
12    procs: Vec<BoxedProc>,
13    path: PathBuf,
14    is_revert: bool,
15    file: File,
16    file_size: u64,
17}
18
19const PREDICTED_SIZE_FOR_ONE_PROC: usize = 512;
20impl Ops {
21    pub fn empty(path: &Path) -> OpsResult<Ops> {
22        let file = OpenOptions::new()
23            .create(true)
24            .read(true)
25            .write(true)
26            .open(path)?;
27        Ok(Self {
28            procs: Vec::new(),
29            path: path.to_path_buf(),
30            is_revert: true,
31            file,
32            file_size: 0,
33        })
34    }
35
36    pub fn new(path: &Path, ops: Vec<BoxedProc>) -> OpsResult<Self> {
37        let file = OpenOptions::new().read(true).write(true).open(path)?;
38        let file_size = file.metadata()?.len();
39        Ok(Self {
40            procs: ops,
41            is_revert: true,
42            file,
43            file_size,
44            path: path.to_path_buf(),
45        })
46    }
47
48    pub fn add<T: Process + 'static>(&mut self, process: T) -> OpsResult<()> {
49        let ops_boxed = BoxedProc::new(T::id(), Box::new(process));
50        self.procs.push(ops_boxed);
51        Ok(())
52    }
53
54    pub fn set_revert(&mut self, is_revert: bool) {
55        self.is_revert = is_revert;
56    }
57
58    pub fn processes(&self) -> &[BoxedProc] {
59        &self.procs
60    }
61
62    /// Init serializes all processes into a file
63    pub fn init(mut self) -> OpsResult<Self> {
64        let mut total = Vec::with_capacity(self.procs.len() * PREDICTED_SIZE_FOR_ONE_PROC);
65        let mut offset: usize = 0;
66        for proc in &mut self.procs {
67            total.extend_from_slice(proc.encode().as_slice());
68            println!("proc = {}, Offset = {offset}", proc.id());
69            proc.set_offset(offset);
70            offset += proc.encode().len();
71        }
72        total.shrink_to_fit();
73        self.file_size = total.len() as u64;
74
75        // write all serialized processes to the file
76        self.file.write_all(&total)?;
77        Ok(self)
78    }
79
80    pub fn all(mut self) -> OpsResult<()> {
81        self.prepare()?;
82        self.run()?;
83        self.clean()?;
84
85        drop(self.file);
86        std::fs::remove_file(self.path.as_path())?;
87        Ok(())
88    }
89
90    #[inline]
91    pub fn revert(&mut self) -> OpsResult<()> {
92        self.revert_all()
93    }
94
95    pub fn print(&self) {
96        for process in &self.procs {
97            process.print();
98        }
99    }
100
101    #[inline]
102    fn prepare(&mut self) -> OpsResult<()> {
103        if let Err(err) = self.try_prepare() {
104            if self.is_revert {
105                self.revert_all()?;
106            }
107            return Err(err);
108        }
109        Ok(())
110    }
111
112    #[inline]
113    fn run(&mut self) -> OpsResult<()> {
114        if let Err(err) = self.try_run() {
115            if self.is_revert {
116                self.revert_all()?;
117            }
118            return Err(err);
119        }
120        Ok(())
121    }
122
123    #[inline]
124    fn clean(&mut self) -> OpsResult<()> {
125        for process in &self.procs {
126            process.clean()?;
127            let length = process.encode().len();
128            self.file_size -= length as u64;
129            self.file.set_len(self.file_size)?;
130            self.file.sync_all()?;
131        }
132        Ok(())
133    }
134
135    #[inline]
136    fn revert_all(&mut self) -> OpsResult<()> {
137        self.try_revert()
138    }
139
140    //--------------------------------------------------------
141    //
142    #[inline]
143    fn try_prepare(&mut self) -> OpsResult<()> {
144        for process in self.procs.iter().rev() {
145            process.prepare()?;
146            self.file
147                .seek(SeekFrom::Start((process.offcet() + STATUS_SHIFT) as u64))?;
148            let new_status: u8 = ProcStatus::PREPARED.into();
149            self.file.write_all(&[new_status])?;
150            self.file.sync_all()?;
151        }
152        Ok(())
153    }
154
155    #[inline]
156    fn try_run(&mut self) -> OpsResult<()> {
157        for process in self.procs.iter().rev() {
158            process.run()?;
159            self.file
160                .seek(SeekFrom::Start((process.offcet() + STATUS_SHIFT) as u64))?;
161            let new_status: u8 = ProcStatus::RAN.into();
162            self.file.write_all(&[new_status])?;
163            self.file.sync_all()?;
164        }
165        Ok(())
166    }
167
168    #[inline]
169    fn try_revert(&mut self) -> OpsResult<()> {
170        for process in self.procs.iter().rev() {
171            match process.status() {
172                ProcStatus::PREPARED => process.revert_prepare()?,
173                ProcStatus::RAN => process.revert_run()?,
174                _ => continue,
175            };
176
177            self.file_size -= process.encode().len() as u64;
178            self.file.set_len(self.file_size)?;
179            self.file.sync_all()?;
180        }
181        Ok(())
182    }
183}
184
185#[cfg(test)]
186mod test {
187    use serde::{Deserialize, Serialize};
188    use serde_flow::encoder::{bincode, FlowEncoder};
189    use tempfile::{tempdir, TempDir};
190
191    use super::*;
192    use crate::{
193        error::OpsError,
194        process::{
195            constants::{self, TEST_FILE_ID},
196            file,
197        },
198        recover::OpsRecovery,
199    };
200
201    #[derive(Serialize, Deserialize, Default)]
202    struct TestOp {
203        pub prepare: PathBuf,
204        pub run: PathBuf,
205        pub clean: PathBuf,
206
207        #[serde(skip)]
208        pub bytes: Vec<u8>,
209    }
210
211    impl TestOp {
212        pub fn new(prepare: PathBuf, run: PathBuf, clean: PathBuf) -> Self {
213            let mut object = Self {
214                prepare,
215                run,
216                clean,
217                bytes: vec![1, 2, 3],
218            };
219
220            object.bytes = bincode::Encoder::serialize(&object).unwrap();
221            object
222        }
223
224        pub fn from_bytes(bytes: &[u8]) -> OpsResult<Self> {
225            let mut decoded: TestOp =
226                bincode::Encoder::deserialize(bytes).map_err(|_| OpsError::SerializeFailed)?;
227            decoded.bytes = bytes.to_vec();
228            Ok(decoded)
229        }
230    }
231
232    pub fn test_load(bytes: &[u8]) -> OpsResult<Box<dyn Process>> {
233        let value = TestOp::from_bytes(bytes)?;
234        Ok(Box::new(value))
235    }
236
237    pub fn test_loader() -> (u8, LoaderFn) {
238        (TEST_FILE_ID, Arc::new(test_load))
239    }
240
241    impl Process for TestOp {
242        fn prepare(&self) -> OpsResult<()> {
243            if self.prepare.exists() {
244                return Err(OpsError::Unknown);
245            }
246            Ok(())
247        }
248
249        fn run(&self) -> OpsResult<()> {
250            if self.run.exists() {
251                return Err(OpsError::Unknown);
252            }
253            Ok(())
254        }
255
256        fn clean(&self) -> OpsResult<()> {
257            if self.clean.exists() {
258                return Err(OpsError::Unknown);
259            }
260            Ok(())
261        }
262
263        fn revert_prepare(&self) -> OpsResult<()> {
264            Ok(())
265        }
266
267        fn revert_run(&self) -> OpsResult<()> {
268            Ok(())
269        }
270
271        fn as_bytes(&self) -> &[u8] {
272            &self.bytes
273        }
274
275        fn id() -> u8
276        where
277            Self: Sized,
278        {
279            constants::TEST_FILE_ID
280        }
281    }
282
283    struct TestHelper {
284        pub tempdir: TempDir,
285        pub remove_path: PathBuf,
286        pub copy_from_path: PathBuf,
287        pub copy_to_path: PathBuf,
288        pub status: PathBuf,
289
290        pub is_prepare_fail: bool,
291        pub is_run_fail: bool,
292        pub is_clean_fail: bool,
293    }
294    impl Default for TestHelper {
295        fn default() -> Self {
296            let tempdir = tempdir().unwrap();
297            Self {
298                remove_path: tempdir.path().to_path_buf().join("file_for_remove"),
299                copy_from_path: tempdir.path().to_path_buf().join("copy_from"),
300                copy_to_path: tempdir.path().to_path_buf().join("copy_to"),
301                status: tempdir.path().to_path_buf().join("status"),
302                is_prepare_fail: false,
303                is_run_fail: false,
304                is_clean_fail: false,
305                tempdir,
306            }
307        }
308    }
309    impl TestHelper {
310        pub fn update_fails(&self) {
311            let prepare_test = self.tempdir.path().join("prepare.fail");
312            let run_test = self.tempdir.path().join("run.fail");
313            let clean_test = self.tempdir.path().join("clean.fail");
314
315            if prepare_test.exists() {
316                std::fs::remove_file(&prepare_test).unwrap();
317            }
318            if run_test.exists() {
319                std::fs::remove_file(&run_test).unwrap();
320            }
321            if clean_test.exists() {
322                std::fs::remove_file(&clean_test).unwrap();
323            }
324
325            if self.is_prepare_fail {
326                std::fs::write(prepare_test, vec![1]).unwrap();
327                return;
328            }
329
330            if self.is_run_fail {
331                std::fs::write(run_test, vec![1]).unwrap();
332                return;
333            }
334
335            if self.is_clean_fail {
336                std::fs::write(clean_test, vec![1]).unwrap();
337            }
338        }
339
340        fn test_ops(&self) -> TestOp {
341            let prepare = self.tempdir.path().join("prepare.fail");
342            let run = self.tempdir.path().join("run.fail");
343            let clean = self.tempdir.path().join("clean.fail");
344            TestOp::new(prepare, run, clean)
345        }
346
347        fn remove_path(&self) -> &Path {
348            self.remove_path.as_path()
349        }
350
351        fn copy_from_path(&self) -> &Path {
352            self.copy_from_path.as_path()
353        }
354
355        fn copy_to_path(&self) -> &Path {
356            self.copy_to_path.as_path()
357        }
358
359        fn status(&self) -> &Path {
360            self.status.as_path()
361        }
362    }
363
364    #[test]
365    fn test_init() {
366        let th = TestHelper::default();
367        std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
368        std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
369
370        let mut ops = Ops::empty(th.status()).unwrap();
371
372        ops.add(file::remove::Op::new(th.remove_path())).unwrap();
373        ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
374            .unwrap();
375
376        ops.init().unwrap();
377    }
378
379    #[test]
380    fn test_all() {
381        let th = TestHelper::default();
382
383        std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
384        std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
385
386        let mut ops = Ops::empty(th.status()).unwrap();
387
388        ops.add(file::remove::Op::new(th.remove_path())).unwrap();
389        ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
390            .unwrap();
391
392        ops.init().unwrap().all().unwrap();
393        assert!(!th.status().exists());
394        assert!(!th.remove_path().exists());
395        assert!(th.copy_to_path().exists());
396        assert!(th.copy_from_path().exists());
397    }
398
399    #[test]
400    fn test_all_failed_prepared() {
401        let th = TestHelper {
402            is_prepare_fail: true,
403            ..Default::default()
404        };
405        th.update_fails();
406
407        std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
408        std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
409
410        let mut ops = Ops::empty(th.status()).unwrap();
411
412        ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
413            .unwrap();
414        ops.add(th.test_ops()).unwrap();
415        ops.add(file::remove::Op::new(th.remove_path())).unwrap();
416
417        let result = ops.init().unwrap().all();
418        assert!(result.is_err());
419        assert!(th.status().exists());
420        assert!(th.remove_path().exists());
421        // not ready yet
422        assert!(!th.copy_to_path().exists());
423        assert!(th.copy_from_path().exists());
424    }
425
426    #[test]
427    fn test_recover_after_failed_in_clean() {
428        let th = TestHelper {
429            is_clean_fail: true,
430            ..Default::default()
431        };
432        th.update_fails();
433
434        std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
435        std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
436
437        let mut ops = Ops::empty(th.status()).unwrap();
438        ops.set_revert(false);
439
440        ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
441            .unwrap();
442        ops.add(th.test_ops()).unwrap();
443        ops.add(file::remove::Op::new(th.remove_path())).unwrap();
444
445        let ops_result = ops.init().unwrap().all();
446        assert!(ops_result.is_err());
447
448        let mut recover = OpsRecovery::default();
449        recover.add_loaders(vec![
450            file::copy::loader(),
451            file::remove::loader(),
452            test_loader(),
453        ]);
454        let revert_result = recover.recover(th.status());
455        assert!(revert_result.is_ok());
456        assert!(th.status().exists());
457        assert!(th.remove_path().exists());
458    }
459
460    #[test]
461    fn test_all_failed_in_run() {
462        let th = TestHelper {
463            is_run_fail: true,
464            ..Default::default()
465        };
466        th.update_fails();
467
468        std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
469        std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
470
471        let mut ops = Ops::empty(th.status()).unwrap();
472
473        ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
474            .unwrap();
475        ops.add(th.test_ops()).unwrap();
476        ops.add(file::remove::Op::new(th.remove_path())).unwrap();
477
478        let result = ops.init().unwrap().all();
479        assert!(result.is_err());
480        assert!(th.status().exists());
481        assert!(th.remove_path().exists());
482        // not ready yet
483        assert!(!th.copy_to_path().exists());
484        assert!(th.copy_from_path().exists());
485    }
486
487    #[test]
488    fn test_recovered_failed_copy() {
489        let temp_dir = tempdir().unwrap();
490        let root_dir = temp_dir.path().to_path_buf();
491        let pb_status = root_dir.clone().join("status");
492
493        let prepare_test = root_dir.clone().join("prepare");
494        let run_test = root_dir.clone().join("run");
495        let clean_test = root_dir.clone().join("clean");
496
497        std::fs::write(run_test.as_path(), vec![1, 2, 3]).unwrap();
498
499        let pb_remove = root_dir.clone().join("to_remove");
500        let pb_copy_from = root_dir.clone().join("from_file.txt");
501        let pb_copy_to = root_dir.clone().join("to_file.txt");
502
503        std::fs::write(pb_remove.as_path(), vec![1, 2, 3]).unwrap();
504        std::fs::write(pb_copy_from.as_path(), vec![1, 2, 3]).unwrap();
505
506        let path = pb_status.as_path();
507        let remove_path = pb_remove.as_path();
508
509        let mut ops = Ops::empty(path).unwrap();
510        ops.set_revert(false);
511
512        ops.add(file::copy::Op::new(pb_copy_from.as_path(), pb_copy_to.as_path()).unwrap())
513            .unwrap();
514        ops.add(TestOp::new(
515            prepare_test.clone(),
516            run_test.clone(),
517            clean_test.clone(),
518        ))
519        .unwrap();
520        ops.add(file::remove::Op::new(remove_path)).unwrap();
521
522        // Must have an error
523        let result = ops.init().unwrap().all();
524        assert!(result.is_err());
525        assert!(pb_status.exists());
526        assert!(pb_remove.exists());
527
528        // Remove error
529        std::fs::remove_file(run_test.as_path()).unwrap();
530
531        // Recover
532        let mut recover = OpsRecovery::default();
533        recover.add_loaders(vec![
534            file::copy::loader(),
535            file::remove::loader(),
536            test_loader(),
537        ]);
538        let ops = recover.recover(pb_status.as_path()).unwrap();
539        ops.all().unwrap();
540
541        // Test
542        assert!(!pb_status.exists());
543        assert!(!pb_remove.exists());
544        assert!(pb_copy_to.exists());
545        assert!(pb_copy_from.exists());
546    }
547}