Skip to main content

alembic_engine/
journal.rs

1//! Keep track of successfully applied ops to enable resume after an error.
2//!
3//! When resuming, the journal must match the previous run's non-delete op sequence (including op hashes).
4
5use crate::{BackendId, Op};
6use alembic_core::{TypeName, Uid};
7use anyhow::{anyhow, Result};
8use serde::{Deserialize, Serialize};
9use std::fs;
10use std::fs::{File, OpenOptions};
11use std::hash::{DefaultHasher, Hash, Hasher};
12use std::io::{Read, Seek, Write};
13use std::path::{Path, PathBuf};
14use tempfile::NamedTempFile;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct Journal {
18    #[serde(skip)]
19    file: Option<(File, PathBuf)>,
20    ops: Vec<OpWithMeta>,
21}
22
23impl Journal {
24    pub fn stable_file_name(directory: &Path, adapter_name: &str, ops: &[Op]) -> PathBuf {
25        let mut hasher = DefaultHasher::new();
26        ops.hash(&mut hasher);
27        let hash = hasher.finish();
28        let file_name: PathBuf = format!("{}_journal_{}.yaml", adapter_name, hash).into();
29        directory.join(file_name)
30    }
31
32    /// tries to load a Journal from `file_path`, otherwise creates a new one.
33    /// in either case, the new Journal instance will be backed by the file at `file_path`.
34    /// delete ops will not be saved in the journal.
35    pub fn load_or_create(directory: &Path, adapter_name: &str, ops: &[Op]) -> Result<Self> {
36        let file_name = Self::stable_file_name(directory, adapter_name, ops);
37        if fs::metadata(&file_name).is_ok() {
38            Self::new_from_existing_file(directory, adapter_name, ops)
39        } else {
40            Self::new_with_file(directory, adapter_name, ops)
41        }
42    }
43
44    /// loads a journal from the file with `file_path` and sets its backing file to that file
45    fn new_from_existing_file(
46        directory: &Path,
47        adapter_name: &str,
48        expected_ops: &[Op],
49    ) -> Result<Self> {
50        let file_name = Self::stable_file_name(directory, adapter_name, expected_ops);
51
52        let mut file = OpenOptions::new()
53            .read(true)
54            .write(true)
55            .create(false)
56            .append(false)
57            .open(&file_name)?;
58        let mut contents = String::new();
59        file.read_to_string(&mut contents)?;
60
61        let mut journal: Journal = serde_yaml::from_str(&contents)?;
62
63        let journal_keys = journal
64            .ops
65            .iter()
66            .map(|op_with_meta| {
67                (
68                    op_with_meta.op_uid,
69                    &op_with_meta.op_typename,
70                    op_with_meta.op_hash,
71                )
72            })
73            .collect::<Vec<(Uid, &TypeName, u64)>>();
74
75        let expected_keys = expected_ops
76            .iter()
77            .filter(|op| !matches!(op, Op::Delete { .. }))
78            .map(|op| (op.uid(), op.type_name(), op.hashed()))
79            .collect::<Vec<(Uid, &TypeName, u64)>>();
80        if journal_keys != expected_keys {
81            return Err(anyhow!(
82                "the ops in the loaded journal file `{}` don't match the expected ops",
83                file_name.display()
84            ));
85        }
86
87        journal.file = Some((file, file_name));
88        Ok(journal)
89    }
90
91    /// creates a journal with a new backing file
92    fn new_with_file(directory: &Path, adapter_name: &str, ops: &[Op]) -> Result<Self> {
93        let file_name = Self::stable_file_name(directory, adapter_name, ops);
94        let mut journal = Self::new_ephemeral(ops);
95
96        // create and write to the file to check that it works before applying any ops
97        let mut file = OpenOptions::new()
98            .read(true)
99            .write(true)
100            .create(true)
101            .truncate(true)
102            .append(false)
103            .open(&file_name)?;
104        file.set_len(0)?;
105        file.rewind()?;
106
107        journal.file = Some((file, file_name));
108        journal.save()?;
109
110        Ok(journal)
111    }
112
113    /// creates a journal without a backing file set
114    pub fn new_ephemeral(ops: &[Op]) -> Self {
115        Self {
116            file: None,
117            ops: ops
118                .iter()
119                .filter(|op| !matches!(op, Op::Delete { .. }))
120                .map(OpWithMeta::new)
121                .collect(),
122        }
123    }
124
125    pub fn done_ops(&self) -> Vec<(Uid, TypeName, u64)> {
126        self.ops
127            .iter()
128            .filter(|owm| owm.done)
129            .map(|owm| (owm.op_uid, owm.op_typename.clone(), owm.op_hash))
130            .collect()
131    }
132
133    pub fn done_ops_count(&self) -> usize {
134        self.ops.iter().filter(|op| op.done).count()
135    }
136
137    pub fn is_completed(&self) -> bool {
138        self.ops.iter().all(|op| op.done)
139    }
140
141    /// will mark the first op that is not done (will fail if there's no such op).
142    /// uses a linear search from the start.
143    pub fn mark_op_as_done(&mut self, op: &Op) -> Result<()> {
144        let op_hash = op.hashed();
145        let op_uid = op.uid();
146        let op_typename = op.type_name();
147
148        let Some(op_index) = self.ops.iter().position(|op| {
149            !op.done
150                && op.op_hash == op_hash
151                && op.op_uid == op_uid
152                && &op.op_typename == op_typename
153        }) else {
154            return Err(anyhow!(
155                "no matching op found in journal, can't mark any as done"
156            ));
157        };
158
159        // index comes from call to `position` above, so it must be in range
160        self.ops[op_index].done = true;
161
162        Ok(())
163    }
164
165    pub fn save(&mut self) -> Result<()> {
166        let str = serde_yaml::to_string(self)?;
167
168        let (_, path) = self
169            .file
170            .as_ref()
171            .ok_or_else(|| anyhow!("can't save journal because it's missing a backing file"))?;
172
173        let path = path.clone();
174        let dir = path
175            .parent()
176            .ok_or_else(|| anyhow!("file path has no parent directory"))?;
177
178        let mut temp_file = NamedTempFile::new_in(dir)?;
179        temp_file.write_all(str.as_bytes())?;
180        temp_file.as_file().sync_all()?; // fsync data + metadata before it can become visible
181        temp_file.persist(&path)?;
182        File::open(dir)?.sync_all()?;
183        let new_file = OpenOptions::new().read(true).write(true).open(&path)?;
184        self.file = Some((new_file, path));
185
186        Ok(())
187    }
188
189    pub fn delete_backing_file(&mut self) -> Result<()> {
190        if let Some((file, file_path)) = self.file.take() {
191            drop(file);
192            fs::remove_file(file_path)?;
193        }
194        Ok(())
195    }
196}
197
198impl Drop for Journal {
199    fn drop(&mut self) {
200        if let Some((file, _)) = self.file.take() {
201            let _ = file.sync_all();
202        }
203    }
204}
205
206// we're only storing the uid and typename for the Op to keep this struct small and readable
207#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
208struct OpWithMeta {
209    op_uid: Uid,
210    op_typename: TypeName,
211    op_hash: u64,
212    done: bool,
213    backend_id: Option<BackendId>, // FIXME: not sure if/when this is needed
214}
215
216impl OpWithMeta {
217    fn new(op: &Op) -> Self {
218        OpWithMeta {
219            op_uid: op.uid(),
220            op_typename: op.type_name().clone(),
221            op_hash: op.hashed(),
222            done: false,
223            backend_id: None,
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use alembic_core::{Object, TypeName};
    use tempfile::tempdir;

    /// Builds an object with the given uid and type name and default/empty remaining fields.
    fn test_object(uid: u128, type_name: &'static str) -> Object {
        Object {
            uid: Uid::from_u128(uid),
            type_name: TypeName::new(type_name),
            key: Default::default(),
            attrs: Default::default(),
            source: None,
        }
    }

    /// Builds a `Create` op for an object with the given uid and type name.
    fn create_op(uid: u128, type_name: &'static str) -> Op {
        Op::Create {
            uid: Uid::from_u128(uid),
            type_name: TypeName::new(type_name),
            desired: test_object(uid, type_name),
        }
    }

    /// Two creates and one update, none of them a delete.
    fn test_ops() -> Vec<Op> {
        vec![
            create_op(1, "dcim.device"),
            create_op(2, "dcim.device"),
            Op::Update {
                uid: Uid::from_u128(3),
                type_name: TypeName::new("dcim.site"),
                desired: test_object(3, "dcim.site"),
                changes: vec![],
                backend_id: None,
            },
        ]
    }

    #[test]
    fn save_and_load_journal() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        {
            let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
            journal.save().unwrap();
            drop(journal);
        }
        {
            let journal = Journal::new_from_existing_file(dir.path(), "test", &ops).unwrap();
            assert_eq!(
                journal
                    .ops
                    .iter()
                    .map(|owm| (owm.op_uid, &owm.op_typename))
                    .collect::<Vec<(Uid, &TypeName)>>(),
                ops.iter()
                    .map(|op| (op.uid(), op.type_name()))
                    .collect::<Vec<(Uid, &TypeName)>>()
            );
        }
    }

    #[test]
    fn load_and_save_existing_journal() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        {
            let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
            journal.save().unwrap();
        }
        {
            let mut journal = Journal::new_from_existing_file(dir.path(), "test", &ops).unwrap();
            journal.save().unwrap();
            assert_eq!(journal.ops.len(), 3);
        }
    }

    // In the tests below the temp dir is bound to a local so that it outlives the journal;
    // a `tempdir()` temporary would be deleted as soon as the `let` statement ends.

    #[test]
    fn mark_ops_as_done() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.mark_op_as_done(&ops[0]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[1]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[2]).unwrap();
        assert!(journal.is_completed());
    }

    #[test]
    fn mark_ops_as_done_backwards() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.mark_op_as_done(&ops[2]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[1]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[0]).unwrap();
        assert!(journal.is_completed());
    }

    #[test]
    fn mark_invalid_op_as_done() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal
            .mark_op_as_done(&create_op(999, "dcim.site"))
            .expect_err("should fail");
        assert!(!journal.is_completed());
    }

    #[test]
    fn mark_same_op_as_done_twice() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.mark_op_as_done(&ops[1]).unwrap();
        journal.mark_op_as_done(&ops[1]).expect_err("should fail");
    }

    #[test]
    fn delete_backing_file() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.save().unwrap();
        let file_path = Journal::stable_file_name(dir.path(), "test", &ops);
        assert!(file_path.exists());
        journal.delete_backing_file().unwrap();
        assert!(!file_path.exists());
    }
}