atomic_ops/process/file/
ser.rs

1use std::{
2    fs::OpenOptions,
3    io::Read,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use serde_flow::{
10    encoder::{bincode, FlowEncoder},
11    flow::Bytes,
12};
13
14use crate::{
15    error::{OpsError, OpsResult},
16    operation::LoaderFn,
17    process::{constants::SERIALIZE_FILE_ID, Process},
18};
19
20const SER_REMOVE_FILE: &str = ".ser_rm";
21
22#[derive(Serialize, Deserialize, Default)]
23pub struct Op {
24    to: PathBuf,
25    content: Vec<u8>,
26
27    #[serde(skip)]
28    bytes: Vec<u8>,
29}
30
31impl Op {
32    pub fn new<T: Serialize + DeserializeOwned + Bytes<T>>(
33        to: &Path,
34        entity: &T,
35    ) -> OpsResult<Self> {
36        let mut object = Self {
37            to: to.to_path_buf(),
38            content: entity
39                .encode::<bincode::Encoder>()
40                .map_err(|_| OpsError::SerializeFailed)?,
41            bytes: Vec::new(),
42        };
43        object.bytes =
44            bincode::Encoder::serialize(&object).map_err(|_| OpsError::SerializeFailed)?;
45        Ok(object)
46    }
47
48    pub fn from_bytes(bytes: &[u8]) -> OpsResult<Self> {
49        let mut decoded: Op =
50            bincode::Encoder::deserialize(bytes).map_err(|_| OpsError::SerializeFailed)?;
51        decoded.bytes = bytes.to_vec();
52        Ok(decoded)
53    }
54
55    #[must_use]
56    #[inline]
57    fn remove_name(&self) -> PathBuf {
58        let remove_filename = format!("{}{SER_REMOVE_FILE}", self.to.to_string_lossy());
59        PathBuf::from(remove_filename)
60    }
61}
62pub fn load(bytes: &[u8]) -> OpsResult<Box<dyn Process>> {
63    let value = Op::from_bytes(bytes)?;
64    Ok(Box::new(value))
65}
66
67pub fn loader() -> (u8, LoaderFn) {
68    (SERIALIZE_FILE_ID, Arc::new(load))
69}
70
71impl Process for Op {
72    fn prepare(&self) -> OpsResult<()> {
73        if self.to.exists() {
74            let remove_filename = self.remove_name();
75            std::fs::rename(self.to.as_path(), remove_filename)?;
76        }
77        Ok(())
78    }
79
80    fn run(&self) -> OpsResult<()> {
81        std::fs::write(self.to.as_path(), &self.content)?;
82
83        let checksum = serde_flow::encoder::CASTAGNOLI.checksum(&self.content);
84        let mut attempts = 3;
85        while attempts > 0 {
86            let mut written_bytes = Vec::new();
87            let mut to_file = OpenOptions::new().read(true).open(&self.to)?;
88            to_file.read_to_end(&mut written_bytes)?;
89
90            let written_checksum = serde_flow::encoder::CASTAGNOLI.checksum(&written_bytes);
91            if checksum == written_checksum {
92                return Ok(());
93            }
94            attempts -= 1;
95        }
96        Ok(())
97    }
98
99    fn clean(&self) -> OpsResult<()> {
100        let remove_filename = self.remove_name();
101        if remove_filename.exists() {
102            std::fs::remove_file(remove_filename)?;
103        }
104        Ok(())
105    }
106
107    fn revert_prepare(&self) -> OpsResult<()> {
108        let remove_filename = self.remove_name();
109        if remove_filename.exists() {
110            std::fs::rename(remove_filename, self.to.as_path())?;
111        }
112        Ok(())
113    }
114
115    fn revert_run(&self) -> OpsResult<()> {
116        let remove_filename = self.remove_name();
117        // tolerate this error, because it's not very important
118        let _ = std::fs::remove_file(self.to.as_path());
119        if remove_filename.exists() {
120            // this must be executed
121            std::fs::rename(remove_filename, self.to.as_path())?;
122        }
123        Ok(())
124    }
125
126    fn as_bytes(&self) -> &[u8] {
127        &self.bytes
128    }
129
130    fn id() -> u8
131    where
132        Self: Sized,
133    {
134        SERIALIZE_FILE_ID
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use serde::{Deserialize, Serialize};
141    use serde_flow::Flow;
142    use tempfile::tempdir;
143
144    use super::*;
145
146    #[derive(Serialize, Deserialize, Flow)]
147    #[flow(variant = 1, bytes)]
148    pub struct TestNumber {
149        pub number: u32,
150    }
151
152    #[test]
153    fn test_prepare_none() {
154        let tempdir = tempdir().unwrap();
155        let path = tempdir.path().join("new_file");
156        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
157
158        let entity = TestNumber { number: 1234 };
159        let ops = Op::new(path.as_path(), &entity).unwrap();
160        // Run
161        ops.prepare().unwrap();
162        // Test
163        assert!(!path.exists());
164        assert!(!saved_path.exists());
165    }
166
167    #[test]
168    fn test_prepare_exists() {
169        let tempdir = tempdir().unwrap();
170        let path = tempdir.path().join("new_file");
171        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
172
173        let old_entity = TestNumber { number: 5678 };
174        std::fs::write(&path, old_entity.encode::<bincode::Encoder>().unwrap()).unwrap();
175
176        // new entity
177        let entity = TestNumber { number: 1234 };
178        let ops = Op::new(path.as_path(), &entity).unwrap();
179        // Run
180        ops.prepare().unwrap();
181
182        // Test
183        assert!(saved_path.exists());
184        let entity_bytes = std::fs::read(saved_path).unwrap();
185        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
186        assert_eq!(5678, entity_decoded.number);
187    }
188
189    #[test]
190    fn test_run_none() {
191        let tempdir = tempdir().unwrap();
192        let path = tempdir.path().join("new_file");
193
194        let entity = TestNumber { number: 1234 };
195        let ops = Op::new(path.as_path(), &entity).unwrap();
196        // Run
197        ops.prepare().unwrap();
198        ops.run().unwrap();
199
200        // Test
201        assert!(path.exists());
202        let entity_bytes = std::fs::read(&path).unwrap();
203        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
204        assert_eq!(1234, entity_decoded.number);
205    }
206
207    #[test]
208    fn test_run_exists() {
209        let tempdir = tempdir().unwrap();
210        let path = tempdir.path().join("new_file");
211        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
212
213        let old_entity = TestNumber { number: 5678 };
214        std::fs::write(&path, old_entity.encode::<bincode::Encoder>().unwrap()).unwrap();
215
216        // new entity
217        let entity = TestNumber { number: 1234 };
218        let ops = Op::new(path.as_path(), &entity).unwrap();
219        // Run
220        ops.prepare().unwrap();
221        ops.run().unwrap();
222
223        // Test
224        assert!(saved_path.exists());
225        let entity_bytes = std::fs::read(saved_path).unwrap();
226        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
227        assert_eq!(5678, entity_decoded.number);
228
229        // new file must be present
230        assert!(path.exists());
231        let entity_bytes = std::fs::read(&path).unwrap();
232        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
233        assert_eq!(1234, entity_decoded.number);
234    }
235
236    #[test]
237    fn test_clean_none() {
238        let tempdir = tempdir().unwrap();
239        let path = tempdir.path().join("new_file");
240        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
241
242        let entity = TestNumber { number: 1234 };
243        let ops = Op::new(path.as_path(), &entity).unwrap();
244        // Run
245        ops.prepare().unwrap();
246        ops.run().unwrap();
247        ops.clean().unwrap();
248
249        // Test
250        assert!(path.exists());
251        assert!(!saved_path.exists());
252        let entity_bytes = std::fs::read(&path).unwrap();
253        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
254        assert_eq!(1234, entity_decoded.number);
255    }
256
257    #[test]
258    fn test_clean_exists() {
259        let tempdir = tempdir().unwrap();
260        let path = tempdir.path().join("new_file");
261        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
262
263        let old_entity = TestNumber { number: 5678 };
264        std::fs::write(&path, old_entity.encode::<bincode::Encoder>().unwrap()).unwrap();
265
266        // new entity
267        let entity = TestNumber { number: 1234 };
268        let ops = Op::new(path.as_path(), &entity).unwrap();
269        // Run
270        ops.prepare().unwrap();
271        ops.run().unwrap();
272        ops.clean().unwrap();
273
274        // Test
275        assert!(!saved_path.exists());
276
277        // new file must be present
278        assert!(path.exists());
279        let entity_bytes = std::fs::read(&path).unwrap();
280        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
281        assert_eq!(1234, entity_decoded.number);
282    }
283
284    #[test]
285    fn test_revert_prepare_none() {
286        let tempdir = tempdir().unwrap();
287        let path = tempdir.path().join("new_file");
288        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
289
290        let entity = TestNumber { number: 1234 };
291        let ops = Op::new(path.as_path(), &entity).unwrap();
292        // Run
293        ops.prepare().unwrap();
294        // Test
295        assert!(!path.exists());
296        assert!(!saved_path.exists());
297    }
298
299    #[test]
300    fn test_revert_prepare_exists() {
301        let tempdir = tempdir().unwrap();
302        let path = tempdir.path().join("new_file");
303        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
304
305        let old_entity = TestNumber { number: 5678 };
306        std::fs::write(&path, old_entity.encode::<bincode::Encoder>().unwrap()).unwrap();
307
308        // new entity
309        let entity = TestNumber { number: 1234 };
310        let ops = Op::new(path.as_path(), &entity).unwrap();
311
312        // Run
313        ops.prepare().unwrap();
314        ops.revert_prepare().unwrap();
315
316        // Test
317        assert!(!saved_path.exists());
318        let entity_bytes = std::fs::read(path).unwrap();
319        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
320        assert_eq!(5678, entity_decoded.number);
321    }
322
323    #[test]
324    fn test_revert_run_none() {
325        let tempdir = tempdir().unwrap();
326        let path = tempdir.path().join("new_file");
327
328        let entity = TestNumber { number: 1234 };
329        let ops = Op::new(path.as_path(), &entity).unwrap();
330        // Run
331        ops.prepare().unwrap();
332        ops.run().unwrap();
333        ops.revert_run().unwrap();
334
335        // Test
336        assert!(!path.exists());
337    }
338
339    #[test]
340    fn test_revert_run_exists() {
341        let tempdir = tempdir().unwrap();
342        let path = tempdir.path().join("new_file");
343        let saved_path = tempdir.path().join(format!("new_file{SER_REMOVE_FILE}"));
344
345        let old_entity = TestNumber { number: 5678 };
346        std::fs::write(&path, old_entity.encode::<bincode::Encoder>().unwrap()).unwrap();
347
348        // new entity
349        let entity = TestNumber { number: 1234 };
350        let ops = Op::new(path.as_path(), &entity).unwrap();
351        // Run
352        ops.prepare().unwrap();
353        ops.run().unwrap();
354        ops.revert_run().unwrap();
355
356        // Test
357        assert!(!saved_path.exists());
358        // old file must be present
359        assert!(path.exists());
360        let entity_bytes = std::fs::read(&path).unwrap();
361        let entity_decoded = TestNumber::decode::<bincode::Encoder>(&entity_bytes).unwrap();
362        assert_eq!(5678, entity_decoded.number);
363    }
364}