1use std::fs::{File, OpenOptions};
2use std::io::{Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use crate::error::OpsResult;
7use crate::process::{BoxedProc, ProcStatus, Process, STATUS_SHIFT};
8
/// Shared handle to a plain function that builds a boxed [`Process`] from a
/// byte slice, or returns an error when the bytes cannot be decoded.
pub type LoaderFn = Arc<fn(&[u8]) -> OpsResult<Box<dyn Process + 'static>>>;
10
/// A list of processes together with the on-disk status file that records
/// how far each of them has progressed.
pub struct Ops {
    /// Registered processes, in insertion order.
    procs: Vec<BoxedProc>,
    /// Location of the status file; `all` removes this path after a fully
    /// successful pass.
    path: PathBuf,
    /// When `true`, a failure during the prepare or run phase triggers a revert.
    is_revert: bool,
    /// Open read/write handle to the status file.
    file: File,
    /// Tracked length of the status file in bytes; reduced as records are
    /// truncated during clean and revert.
    file_size: u64,
}
18
/// Capacity hint, in bytes per process, used to pre-size the buffer that
/// `Ops::init` fills with the encoded processes.
const PREDICTED_SIZE_FOR_ONE_PROC: usize = 512;
20impl Ops {
21 pub fn empty(path: &Path) -> OpsResult<Ops> {
22 let file = OpenOptions::new()
23 .create(true)
24 .read(true)
25 .write(true)
26 .open(path)?;
27 Ok(Self {
28 procs: Vec::new(),
29 path: path.to_path_buf(),
30 is_revert: true,
31 file,
32 file_size: 0,
33 })
34 }
35
36 pub fn new(path: &Path, ops: Vec<BoxedProc>) -> OpsResult<Self> {
37 let file = OpenOptions::new().read(true).write(true).open(path)?;
38 let file_size = file.metadata()?.len();
39 Ok(Self {
40 procs: ops,
41 is_revert: true,
42 file,
43 file_size,
44 path: path.to_path_buf(),
45 })
46 }
47
48 pub fn add<T: Process + 'static>(&mut self, process: T) -> OpsResult<()> {
49 let ops_boxed = BoxedProc::new(T::id(), Box::new(process));
50 self.procs.push(ops_boxed);
51 Ok(())
52 }
53
54 pub fn set_revert(&mut self, is_revert: bool) {
55 self.is_revert = is_revert;
56 }
57
58 pub fn processes(&self) -> &[BoxedProc] {
59 &self.procs
60 }
61
62 pub fn init(mut self) -> OpsResult<Self> {
64 let mut total = Vec::with_capacity(self.procs.len() * PREDICTED_SIZE_FOR_ONE_PROC);
65 let mut offset: usize = 0;
66 for proc in &mut self.procs {
67 total.extend_from_slice(proc.encode().as_slice());
68 println!("proc = {}, Offset = {offset}", proc.id());
69 proc.set_offset(offset);
70 offset += proc.encode().len();
71 }
72 total.shrink_to_fit();
73 self.file_size = total.len() as u64;
74
75 self.file.write_all(&total)?;
77 Ok(self)
78 }
79
80 pub fn all(mut self) -> OpsResult<()> {
81 self.prepare()?;
82 self.run()?;
83 self.clean()?;
84
85 drop(self.file);
86 std::fs::remove_file(self.path.as_path())?;
87 Ok(())
88 }
89
90 #[inline]
91 pub fn revert(&mut self) -> OpsResult<()> {
92 self.revert_all()
93 }
94
95 pub fn print(&self) {
96 for process in &self.procs {
97 process.print();
98 }
99 }
100
101 #[inline]
102 fn prepare(&mut self) -> OpsResult<()> {
103 if let Err(err) = self.try_prepare() {
104 if self.is_revert {
105 self.revert_all()?;
106 }
107 return Err(err);
108 }
109 Ok(())
110 }
111
112 #[inline]
113 fn run(&mut self) -> OpsResult<()> {
114 if let Err(err) = self.try_run() {
115 if self.is_revert {
116 self.revert_all()?;
117 }
118 return Err(err);
119 }
120 Ok(())
121 }
122
123 #[inline]
124 fn clean(&mut self) -> OpsResult<()> {
125 for process in &self.procs {
126 process.clean()?;
127 let length = process.encode().len();
128 self.file_size -= length as u64;
129 self.file.set_len(self.file_size)?;
130 self.file.sync_all()?;
131 }
132 Ok(())
133 }
134
135 #[inline]
136 fn revert_all(&mut self) -> OpsResult<()> {
137 self.try_revert()
138 }
139
140 #[inline]
143 fn try_prepare(&mut self) -> OpsResult<()> {
144 for process in self.procs.iter().rev() {
145 process.prepare()?;
146 self.file
147 .seek(SeekFrom::Start((process.offcet() + STATUS_SHIFT) as u64))?;
148 let new_status: u8 = ProcStatus::PREPARED.into();
149 self.file.write_all(&[new_status])?;
150 self.file.sync_all()?;
151 }
152 Ok(())
153 }
154
155 #[inline]
156 fn try_run(&mut self) -> OpsResult<()> {
157 for process in self.procs.iter().rev() {
158 process.run()?;
159 self.file
160 .seek(SeekFrom::Start((process.offcet() + STATUS_SHIFT) as u64))?;
161 let new_status: u8 = ProcStatus::RAN.into();
162 self.file.write_all(&[new_status])?;
163 self.file.sync_all()?;
164 }
165 Ok(())
166 }
167
168 #[inline]
169 fn try_revert(&mut self) -> OpsResult<()> {
170 for process in self.procs.iter().rev() {
171 match process.status() {
172 ProcStatus::PREPARED => process.revert_prepare()?,
173 ProcStatus::RAN => process.revert_run()?,
174 _ => continue,
175 };
176
177 self.file_size -= process.encode().len() as u64;
178 self.file.set_len(self.file_size)?;
179 self.file.sync_all()?;
180 }
181 Ok(())
182 }
183}
184
185#[cfg(test)]
186mod test {
187 use serde::{Deserialize, Serialize};
188 use serde_flow::encoder::{bincode, FlowEncoder};
189 use tempfile::{tempdir, TempDir};
190
191 use super::*;
192 use crate::{
193 error::OpsError,
194 process::{
195 constants::{self, TEST_FILE_ID},
196 file,
197 },
198 recover::OpsRecovery,
199 };
200
    /// Test-only process whose phases fail when a marker file exists.
    #[derive(Serialize, Deserialize, Default)]
    struct TestOp {
        /// `prepare` fails while this path exists.
        pub prepare: PathBuf,
        /// `run` fails while this path exists.
        pub run: PathBuf,
        /// `clean` fails while this path exists.
        pub clean: PathBuf,

        /// Bytes returned by `as_bytes`; excluded from (de)serialization.
        #[serde(skip)]
        pub bytes: Vec<u8>,
    }
210
211 impl TestOp {
212 pub fn new(prepare: PathBuf, run: PathBuf, clean: PathBuf) -> Self {
213 let mut object = Self {
214 prepare,
215 run,
216 clean,
217 bytes: vec![1, 2, 3],
218 };
219
220 object.bytes = bincode::Encoder::serialize(&object).unwrap();
221 object
222 }
223
224 pub fn from_bytes(bytes: &[u8]) -> OpsResult<Self> {
225 let mut decoded: TestOp =
226 bincode::Encoder::deserialize(bytes).map_err(|_| OpsError::SerializeFailed)?;
227 decoded.bytes = bytes.to_vec();
228 Ok(decoded)
229 }
230 }
231
232 pub fn test_load(bytes: &[u8]) -> OpsResult<Box<dyn Process>> {
233 let value = TestOp::from_bytes(bytes)?;
234 Ok(Box::new(value))
235 }
236
237 pub fn test_loader() -> (u8, LoaderFn) {
238 (TEST_FILE_ID, Arc::new(test_load))
239 }
240
241 impl Process for TestOp {
242 fn prepare(&self) -> OpsResult<()> {
243 if self.prepare.exists() {
244 return Err(OpsError::Unknown);
245 }
246 Ok(())
247 }
248
249 fn run(&self) -> OpsResult<()> {
250 if self.run.exists() {
251 return Err(OpsError::Unknown);
252 }
253 Ok(())
254 }
255
256 fn clean(&self) -> OpsResult<()> {
257 if self.clean.exists() {
258 return Err(OpsError::Unknown);
259 }
260 Ok(())
261 }
262
263 fn revert_prepare(&self) -> OpsResult<()> {
264 Ok(())
265 }
266
267 fn revert_run(&self) -> OpsResult<()> {
268 Ok(())
269 }
270
271 fn as_bytes(&self) -> &[u8] {
272 &self.bytes
273 }
274
275 fn id() -> u8
276 where
277 Self: Sized,
278 {
279 constants::TEST_FILE_ID
280 }
281 }
282
    /// Per-test fixture: a temporary directory, the paths used inside it, and
    /// flags selecting which `TestOp` phase should fail.
    struct TestHelper {
        /// Temporary directory that holds every path below.
        pub tempdir: TempDir,
        /// File handed to the remove operation.
        pub remove_path: PathBuf,
        /// Source file of the copy operation.
        pub copy_from_path: PathBuf,
        /// Destination file of the copy operation.
        pub copy_to_path: PathBuf,
        /// Status file passed to `Ops`.
        pub status: PathBuf,

        /// Request a failure in the prepare phase.
        pub is_prepare_fail: bool,
        /// Request a failure in the run phase.
        pub is_run_fail: bool,
        /// Request a failure in the clean phase.
        pub is_clean_fail: bool,
    }
294 impl Default for TestHelper {
295 fn default() -> Self {
296 let tempdir = tempdir().unwrap();
297 Self {
298 remove_path: tempdir.path().to_path_buf().join("file_for_remove"),
299 copy_from_path: tempdir.path().to_path_buf().join("copy_from"),
300 copy_to_path: tempdir.path().to_path_buf().join("copy_to"),
301 status: tempdir.path().to_path_buf().join("status"),
302 is_prepare_fail: false,
303 is_run_fail: false,
304 is_clean_fail: false,
305 tempdir,
306 }
307 }
308 }
309 impl TestHelper {
310 pub fn update_fails(&self) {
311 let prepare_test = self.tempdir.path().join("prepare.fail");
312 let run_test = self.tempdir.path().join("run.fail");
313 let clean_test = self.tempdir.path().join("clean.fail");
314
315 if prepare_test.exists() {
316 std::fs::remove_file(&prepare_test).unwrap();
317 }
318 if run_test.exists() {
319 std::fs::remove_file(&run_test).unwrap();
320 }
321 if clean_test.exists() {
322 std::fs::remove_file(&clean_test).unwrap();
323 }
324
325 if self.is_prepare_fail {
326 std::fs::write(prepare_test, vec![1]).unwrap();
327 return;
328 }
329
330 if self.is_run_fail {
331 std::fs::write(run_test, vec![1]).unwrap();
332 return;
333 }
334
335 if self.is_clean_fail {
336 std::fs::write(clean_test, vec![1]).unwrap();
337 }
338 }
339
340 fn test_ops(&self) -> TestOp {
341 let prepare = self.tempdir.path().join("prepare.fail");
342 let run = self.tempdir.path().join("run.fail");
343 let clean = self.tempdir.path().join("clean.fail");
344 TestOp::new(prepare, run, clean)
345 }
346
347 fn remove_path(&self) -> &Path {
348 self.remove_path.as_path()
349 }
350
351 fn copy_from_path(&self) -> &Path {
352 self.copy_from_path.as_path()
353 }
354
355 fn copy_to_path(&self) -> &Path {
356 self.copy_to_path.as_path()
357 }
358
359 fn status(&self) -> &Path {
360 self.status.as_path()
361 }
362 }
363
364 #[test]
365 fn test_init() {
366 let th = TestHelper::default();
367 std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
368 std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
369
370 let mut ops = Ops::empty(th.status()).unwrap();
371
372 ops.add(file::remove::Op::new(th.remove_path())).unwrap();
373 ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
374 .unwrap();
375
376 ops.init().unwrap();
377 }
378
379 #[test]
380 fn test_all() {
381 let th = TestHelper::default();
382
383 std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
384 std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
385
386 let mut ops = Ops::empty(th.status()).unwrap();
387
388 ops.add(file::remove::Op::new(th.remove_path())).unwrap();
389 ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
390 .unwrap();
391
392 ops.init().unwrap().all().unwrap();
393 assert!(!th.status().exists());
394 assert!(!th.remove_path().exists());
395 assert!(th.copy_to_path().exists());
396 assert!(th.copy_from_path().exists());
397 }
398
399 #[test]
400 fn test_all_failed_prepared() {
401 let th = TestHelper {
402 is_prepare_fail: true,
403 ..Default::default()
404 };
405 th.update_fails();
406
407 std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
408 std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
409
410 let mut ops = Ops::empty(th.status()).unwrap();
411
412 ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
413 .unwrap();
414 ops.add(th.test_ops()).unwrap();
415 ops.add(file::remove::Op::new(th.remove_path())).unwrap();
416
417 let result = ops.init().unwrap().all();
418 assert!(result.is_err());
419 assert!(th.status().exists());
420 assert!(th.remove_path().exists());
421 assert!(!th.copy_to_path().exists());
423 assert!(th.copy_from_path().exists());
424 }
425
426 #[test]
427 fn test_recover_after_failed_in_clean() {
428 let th = TestHelper {
429 is_clean_fail: true,
430 ..Default::default()
431 };
432 th.update_fails();
433
434 std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
435 std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
436
437 let mut ops = Ops::empty(th.status()).unwrap();
438 ops.set_revert(false);
439
440 ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
441 .unwrap();
442 ops.add(th.test_ops()).unwrap();
443 ops.add(file::remove::Op::new(th.remove_path())).unwrap();
444
445 let ops_result = ops.init().unwrap().all();
446 assert!(ops_result.is_err());
447
448 let mut recover = OpsRecovery::default();
449 recover.add_loaders(vec![
450 file::copy::loader(),
451 file::remove::loader(),
452 test_loader(),
453 ]);
454 let revert_result = recover.recover(th.status());
455 assert!(revert_result.is_ok());
456 assert!(th.status().exists());
457 assert!(th.remove_path().exists());
458 }
459
460 #[test]
461 fn test_all_failed_in_run() {
462 let th = TestHelper {
463 is_run_fail: true,
464 ..Default::default()
465 };
466 th.update_fails();
467
468 std::fs::write(th.remove_path(), vec![1, 2, 3]).unwrap();
469 std::fs::write(th.copy_from_path(), vec![1, 2, 3]).unwrap();
470
471 let mut ops = Ops::empty(th.status()).unwrap();
472
473 ops.add(file::copy::Op::new(th.copy_from_path(), th.copy_to_path()).unwrap())
474 .unwrap();
475 ops.add(th.test_ops()).unwrap();
476 ops.add(file::remove::Op::new(th.remove_path())).unwrap();
477
478 let result = ops.init().unwrap().all();
479 assert!(result.is_err());
480 assert!(th.status().exists());
481 assert!(th.remove_path().exists());
482 assert!(!th.copy_to_path().exists());
484 assert!(th.copy_from_path().exists());
485 }
486
487 #[test]
488 fn test_recovered_failed_copy() {
489 let temp_dir = tempdir().unwrap();
490 let root_dir = temp_dir.path().to_path_buf();
491 let pb_status = root_dir.clone().join("status");
492
493 let prepare_test = root_dir.clone().join("prepare");
494 let run_test = root_dir.clone().join("run");
495 let clean_test = root_dir.clone().join("clean");
496
497 std::fs::write(run_test.as_path(), vec![1, 2, 3]).unwrap();
498
499 let pb_remove = root_dir.clone().join("to_remove");
500 let pb_copy_from = root_dir.clone().join("from_file.txt");
501 let pb_copy_to = root_dir.clone().join("to_file.txt");
502
503 std::fs::write(pb_remove.as_path(), vec![1, 2, 3]).unwrap();
504 std::fs::write(pb_copy_from.as_path(), vec![1, 2, 3]).unwrap();
505
506 let path = pb_status.as_path();
507 let remove_path = pb_remove.as_path();
508
509 let mut ops = Ops::empty(path).unwrap();
510 ops.set_revert(false);
511
512 ops.add(file::copy::Op::new(pb_copy_from.as_path(), pb_copy_to.as_path()).unwrap())
513 .unwrap();
514 ops.add(TestOp::new(
515 prepare_test.clone(),
516 run_test.clone(),
517 clean_test.clone(),
518 ))
519 .unwrap();
520 ops.add(file::remove::Op::new(remove_path)).unwrap();
521
522 let result = ops.init().unwrap().all();
524 assert!(result.is_err());
525 assert!(pb_status.exists());
526 assert!(pb_remove.exists());
527
528 std::fs::remove_file(run_test.as_path()).unwrap();
530
531 let mut recover = OpsRecovery::default();
533 recover.add_loaders(vec![
534 file::copy::loader(),
535 file::remove::loader(),
536 test_loader(),
537 ]);
538 let ops = recover.recover(pb_status.as_path()).unwrap();
539 ops.all().unwrap();
540
541 assert!(!pb_status.exists());
543 assert!(!pb_remove.exists());
544 assert!(pb_copy_to.exists());
545 assert!(pb_copy_from.exists());
546 }
547}