atomic_ops/process/file/
ser.rs1use std::{
2 fs::OpenOptions,
3 io::Read,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use serde_flow::{
10 encoder::{bincode, FlowEncoder},
11 flow::Bytes,
12};
13
14use crate::{
15 error::{OpsError, OpsResult},
16 operation::LoaderFn,
17 process::{constants::SERIALIZE_FILE_ID, Process},
18};
19
/// Suffix appended to an operation's target path to name the backup file that
/// `prepare` moves a pre-existing target to (see `Op::remove_name`).
const SER_REMOVE_FILE: &str = ".ser_rm";
21
22#[derive(Serialize, Deserialize, Default)]
23pub struct Op {
24 to: PathBuf,
25 content: Vec<u8>,
26
27 #[serde(skip)]
28 bytes: Vec<u8>,
29}
30
31impl Op {
32 pub fn new<T: Serialize + DeserializeOwned + Bytes<T>>(
33 to: &Path,
34 entity: &T,
35 ) -> OpsResult<Self> {
36 let mut object = Self {
37 to: to.to_path_buf(),
38 content: entity
39 .encode::<bincode::Encoder>()
40 .map_err(|_| OpsError::SerializeFailed)?,
41 bytes: Vec::new(),
42 };
43 object.bytes =
44 bincode::Encoder::serialize(&object).map_err(|_| OpsError::SerializeFailed)?;
45 Ok(object)
46 }
47
48 pub fn from_bytes(bytes: &[u8]) -> OpsResult<Self> {
49 let mut decoded: Op =
50 bincode::Encoder::deserialize(bytes).map_err(|_| OpsError::SerializeFailed)?;
51 decoded.bytes = bytes.to_vec();
52 Ok(decoded)
53 }
54
55 #[must_use]
56 #[inline]
57 fn remove_name(&self) -> PathBuf {
58 let remove_filename = format!("{}{SER_REMOVE_FILE}", self.to.to_string_lossy());
59 PathBuf::from(remove_filename)
60 }
61}
62pub fn load(bytes: &[u8]) -> OpsResult<Box<dyn Process>> {
63 let value = Op::from_bytes(bytes)?;
64 Ok(Box::new(value))
65}
66
67pub fn loader() -> (u8, LoaderFn) {
68 (SERIALIZE_FILE_ID, Arc::new(load))
69}
70
71impl Process for Op {
72 fn prepare(&self) -> OpsResult<()> {
73 if self.to.exists() {
74 let remove_filename = self.remove_name();
75 std::fs::rename(self.to.as_path(), remove_filename)?;
76 }
77 Ok(())
78 }
79
80 fn run(&self) -> OpsResult<()> {
81 std::fs::write(self.to.as_path(), &self.content)?;
82
83 let checksum = serde_flow::encoder::CASTAGNOLI.checksum(&self.content);
84 let mut attempts = 3;
85 while attempts > 0 {
86 let mut written_bytes = Vec::new();
87 let mut to_file = OpenOptions::new().read(true).open(&self.to)?;
88 to_file.read_to_end(&mut written_bytes)?;
89
90 let written_checksum = serde_flow::encoder::CASTAGNOLI.checksum(&written_bytes);
91 if checksum == written_checksum {
92 return Ok(());
93 }
94 attempts -= 1;
95 }
96 Ok(())
97 }
98
99 fn clean(&self) -> OpsResult<()> {
100 let remove_filename = self.remove_name();
101 if remove_filename.exists() {
102 std::fs::remove_file(remove_filename)?;
103 }
104 Ok(())
105 }
106
107 fn revert_prepare(&self) -> OpsResult<()> {
108 let remove_filename = self.remove_name();
109 if remove_filename.exists() {
110 std::fs::rename(remove_filename, self.to.as_path())?;
111 }
112 Ok(())
113 }
114
115 fn revert_run(&self) -> OpsResult<()> {
116 let remove_filename = self.remove_name();
117 let _ = std::fs::remove_file(self.to.as_path());
119 if remove_filename.exists() {
120 std::fs::rename(remove_filename, self.to.as_path())?;
122 }
123 Ok(())
124 }
125
126 fn as_bytes(&self) -> &[u8] {
127 &self.bytes
128 }
129
130 fn id() -> u8
131 where
132 Self: Sized,
133 {
134 SERIALIZE_FILE_ID
135 }
136}
137
#[cfg(test)]
mod tests {
    use serde::{Deserialize, Serialize};
    use serde_flow::Flow;
    use tempfile::tempdir;

    use super::*;

    /// Minimal flow entity used as the payload in these tests.
    #[derive(Serialize, Deserialize, Flow)]
    #[flow(variant = 1, bytes)]
    pub struct TestNumber {
        pub number: u32,
    }

    /// Name of the target file created inside each temporary directory.
    const FILE_NAME: &str = "new_file";

    /// Returns the target path and the matching backup path inside `dir`.
    fn paths(dir: &Path) -> (PathBuf, PathBuf) {
        (
            dir.join(FILE_NAME),
            dir.join(format!("{FILE_NAME}{SER_REMOVE_FILE}")),
        )
    }

    /// Writes an encoded `TestNumber` straight to `path`, simulating a file
    /// that existed before the operation started.
    fn write_number(path: &Path, number: u32) {
        let entity = TestNumber { number };
        std::fs::write(path, entity.encode::<bincode::Encoder>().unwrap()).unwrap();
    }

    /// Reads `path` and decodes the `TestNumber` stored in it.
    fn read_number(path: &Path) -> u32 {
        let entity_bytes = std::fs::read(path).unwrap();
        TestNumber::decode::<bincode::Encoder>(&entity_bytes)
            .unwrap()
            .number
    }

    /// Creates the operation under test, targeting `path` with `number` as payload.
    fn new_op(path: &Path, number: u32) -> Op {
        Op::new(path, &TestNumber { number }).unwrap()
    }

    #[test]
    fn test_prepare_none() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();

        // Nothing existed beforehand, so nothing is created or moved.
        assert!(!path.exists());
        assert!(!saved_path.exists());
    }

    #[test]
    fn test_prepare_exists() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());
        write_number(&path, 5678);

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();

        // The previous file now lives at the backup path.
        assert!(saved_path.exists());
        assert_eq!(5678, read_number(&saved_path));
    }

    #[test]
    fn test_run_none() {
        let dir = tempdir().unwrap();
        let (path, _) = paths(dir.path());

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        ops.run().unwrap();

        assert!(path.exists());
        assert_eq!(1234, read_number(&path));
    }

    #[test]
    fn test_run_exists() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());
        write_number(&path, 5678);

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        ops.run().unwrap();

        // The backup still holds the old value.
        assert!(saved_path.exists());
        assert_eq!(5678, read_number(&saved_path));

        // The target holds the new value.
        assert!(path.exists());
        assert_eq!(1234, read_number(&path));
    }

    #[test]
    fn test_clean_none() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        ops.run().unwrap();
        ops.clean().unwrap();

        assert!(path.exists());
        assert!(!saved_path.exists());
        assert_eq!(1234, read_number(&path));
    }

    #[test]
    fn test_clean_exists() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());
        write_number(&path, 5678);

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        ops.run().unwrap();
        ops.clean().unwrap();

        // The backup is gone and the new value stays in place.
        assert!(!saved_path.exists());

        assert!(path.exists());
        assert_eq!(1234, read_number(&path));
    }

    #[test]
    fn test_revert_prepare_none() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        // Reverting when nothing was backed up must succeed and create nothing.
        ops.revert_prepare().unwrap();

        assert!(!path.exists());
        assert!(!saved_path.exists());
    }

    #[test]
    fn test_revert_prepare_exists() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());
        write_number(&path, 5678);

        let ops = new_op(&path, 1234);

        ops.prepare().unwrap();
        ops.revert_prepare().unwrap();

        // The backup was moved back to the target path.
        assert!(!saved_path.exists());
        assert_eq!(5678, read_number(&path));
    }

    #[test]
    fn test_revert_run_none() {
        let dir = tempdir().unwrap();
        let (path, _) = paths(dir.path());

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        ops.run().unwrap();
        ops.revert_run().unwrap();

        // There was no previous file, so the target is simply removed.
        assert!(!path.exists());
    }

    #[test]
    fn test_revert_run_exists() {
        let dir = tempdir().unwrap();
        let (path, saved_path) = paths(dir.path());
        write_number(&path, 5678);

        let ops = new_op(&path, 1234);
        ops.prepare().unwrap();
        ops.run().unwrap();
        ops.revert_run().unwrap();

        // The old value is restored and the backup is consumed.
        assert!(!saved_path.exists());
        assert!(path.exists());
        assert_eq!(5678, read_number(&path));
    }
}