1use crate::{BackendId, Op};
6use alembic_core::{TypeName, Uid};
7use anyhow::{anyhow, Result};
8use serde::{Deserialize, Serialize};
9use std::fs;
10use std::fs::{File, OpenOptions};
11use std::hash::{DefaultHasher, Hash, Hasher};
12use std::io::{Read, Seek, Write};
13use std::path::{Path, PathBuf};
14use tempfile::NamedTempFile;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct Journal {
18 #[serde(skip)]
19 file: Option<(File, PathBuf)>,
20 ops: Vec<OpWithMeta>,
21}
22
23impl Journal {
24 pub fn stable_file_name(directory: &Path, adapter_name: &str, ops: &[Op]) -> PathBuf {
25 let mut hasher = DefaultHasher::new();
26 ops.hash(&mut hasher);
27 let hash = hasher.finish();
28 let file_name: PathBuf = format!("{}_journal_{}.yaml", adapter_name, hash).into();
29 directory.join(file_name)
30 }
31
32 pub fn load_or_create(directory: &Path, adapter_name: &str, ops: &[Op]) -> Result<Self> {
36 let file_name = Self::stable_file_name(directory, adapter_name, ops);
37 if fs::metadata(&file_name).is_ok() {
38 Self::new_from_existing_file(directory, adapter_name, ops)
39 } else {
40 Self::new_with_file(directory, adapter_name, ops)
41 }
42 }
43
44 fn new_from_existing_file(
46 directory: &Path,
47 adapter_name: &str,
48 expected_ops: &[Op],
49 ) -> Result<Self> {
50 let file_name = Self::stable_file_name(directory, adapter_name, expected_ops);
51
52 let mut file = OpenOptions::new()
53 .read(true)
54 .write(true)
55 .create(false)
56 .append(false)
57 .open(&file_name)?;
58 let mut contents = String::new();
59 file.read_to_string(&mut contents)?;
60
61 let mut journal: Journal = serde_yaml::from_str(&contents)?;
62
63 let journal_keys = journal
64 .ops
65 .iter()
66 .map(|op_with_meta| {
67 (
68 op_with_meta.op_uid,
69 &op_with_meta.op_typename,
70 op_with_meta.op_hash,
71 )
72 })
73 .collect::<Vec<(Uid, &TypeName, u64)>>();
74
75 let expected_keys = expected_ops
76 .iter()
77 .filter(|op| !matches!(op, Op::Delete { .. }))
78 .map(|op| (op.uid(), op.type_name(), op.hashed()))
79 .collect::<Vec<(Uid, &TypeName, u64)>>();
80 if journal_keys != expected_keys {
81 return Err(anyhow!(
82 "the ops in the loaded journal file `{}` don't match the expected ops",
83 file_name.display()
84 ));
85 }
86
87 journal.file = Some((file, file_name));
88 Ok(journal)
89 }
90
91 fn new_with_file(directory: &Path, adapter_name: &str, ops: &[Op]) -> Result<Self> {
93 let file_name = Self::stable_file_name(directory, adapter_name, ops);
94 let mut journal = Self::new_ephemeral(ops);
95
96 let mut file = OpenOptions::new()
98 .read(true)
99 .write(true)
100 .create(true)
101 .truncate(true)
102 .append(false)
103 .open(&file_name)?;
104 file.set_len(0)?;
105 file.rewind()?;
106
107 journal.file = Some((file, file_name));
108 journal.save()?;
109
110 Ok(journal)
111 }
112
113 pub fn new_ephemeral(ops: &[Op]) -> Self {
115 Self {
116 file: None,
117 ops: ops
118 .iter()
119 .filter(|op| !matches!(op, Op::Delete { .. }))
120 .map(OpWithMeta::new)
121 .collect(),
122 }
123 }
124
125 pub fn done_ops(&self) -> Vec<(Uid, TypeName, u64)> {
126 self.ops
127 .iter()
128 .filter(|owm| owm.done)
129 .map(|owm| (owm.op_uid, owm.op_typename.clone(), owm.op_hash))
130 .collect()
131 }
132
133 pub fn done_ops_count(&self) -> usize {
134 self.ops.iter().filter(|op| op.done).count()
135 }
136
137 pub fn is_completed(&self) -> bool {
138 self.ops.iter().all(|op| op.done)
139 }
140
141 pub fn mark_op_as_done(&mut self, op: &Op) -> Result<()> {
144 let op_hash = op.hashed();
145 let op_uid = op.uid();
146 let op_typename = op.type_name();
147
148 let Some(op_index) = self.ops.iter().position(|op| {
149 !op.done
150 && op.op_hash == op_hash
151 && op.op_uid == op_uid
152 && &op.op_typename == op_typename
153 }) else {
154 return Err(anyhow!(
155 "no matching op found in journal, can't mark any as done"
156 ));
157 };
158
159 self.ops[op_index].done = true;
161
162 Ok(())
163 }
164
165 pub fn save(&mut self) -> Result<()> {
166 let str = serde_yaml::to_string(self)?;
167
168 let (_, path) = self
169 .file
170 .as_ref()
171 .ok_or_else(|| anyhow!("can't save journal because it's missing a backing file"))?;
172
173 let path = path.clone();
174 let dir = path
175 .parent()
176 .ok_or_else(|| anyhow!("file path has no parent directory"))?;
177
178 let mut temp_file = NamedTempFile::new_in(dir)?;
179 temp_file.write_all(str.as_bytes())?;
180 temp_file.as_file().sync_all()?; temp_file.persist(&path)?;
182 File::open(dir)?.sync_all()?;
183 let new_file = OpenOptions::new().read(true).write(true).open(&path)?;
184 self.file = Some((new_file, path));
185
186 Ok(())
187 }
188
189 pub fn delete_backing_file(&mut self) -> Result<()> {
190 if let Some((file, file_path)) = self.file.take() {
191 drop(file);
192 fs::remove_file(file_path)?;
193 }
194 Ok(())
195 }
196}
197
198impl Drop for Journal {
199 fn drop(&mut self) {
200 if let Some((file, _)) = self.file.take() {
201 let _ = file.sync_all();
202 }
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
208struct OpWithMeta {
209 op_uid: Uid,
210 op_typename: TypeName,
211 op_hash: u64,
212 done: bool,
213 backend_id: Option<BackendId>, }
215
216impl OpWithMeta {
217 fn new(op: &Op) -> Self {
218 OpWithMeta {
219 op_uid: op.uid(),
220 op_typename: op.type_name().clone(),
221 op_hash: op.hashed(),
222 done: false,
223 backend_id: None,
224 }
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use alembic_core::{Object, TypeName};
    use tempfile::tempdir;

    /// Three non-delete ops: two creates of `dcim.device` and one update of `dcim.site`.
    fn test_ops() -> Vec<Op> {
        vec![
            Op::Create {
                uid: Uid::from_u128(1),
                type_name: TypeName::new("dcim.device"),
                desired: Object {
                    uid: Uid::from_u128(1),
                    type_name: TypeName::new("dcim.device"),
                    key: Default::default(),
                    attrs: Default::default(),
                    source: None,
                },
            },
            Op::Create {
                uid: Uid::from_u128(2),
                type_name: TypeName::new("dcim.device"),
                desired: Object {
                    uid: Uid::from_u128(2),
                    type_name: TypeName::new("dcim.device"),
                    key: Default::default(),
                    attrs: Default::default(),
                    source: None,
                },
            },
            Op::Update {
                uid: Uid::from_u128(3),
                type_name: TypeName::new("dcim.site"),
                desired: Object {
                    uid: Uid::from_u128(3),
                    type_name: TypeName::new("dcim.site"),
                    key: Default::default(),
                    attrs: Default::default(),
                    source: None,
                },
                changes: vec![],
                backend_id: None,
            },
        ]
    }

    #[test]
    fn save_and_load_journal() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        {
            let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
            journal.save().unwrap();
            drop(journal);
        }
        {
            let journal = Journal::new_from_existing_file(dir.path(), "test", &ops).unwrap();
            assert_eq!(
                journal
                    .ops
                    .iter()
                    .map(|owm| (owm.op_uid, &owm.op_typename))
                    .collect::<Vec<(Uid, &TypeName)>>(),
                ops.iter()
                    .map(|op| (op.uid(), op.type_name()))
                    .collect::<Vec<(Uid, &TypeName)>>()
            );
        }
    }

    #[test]
    fn load_and_save_existing_journal() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        {
            let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
            journal.save().unwrap();
        }
        {
            let mut journal = Journal::new_from_existing_file(dir.path(), "test", &ops).unwrap();
            journal.save().unwrap();
            assert_eq!(journal.ops.len(), 3);
        }
    }

    #[test]
    fn load_or_create_restores_done_ops() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        {
            // First call finds no file and creates the journal.
            let mut journal = Journal::load_or_create(dir.path(), "test", &ops).unwrap();
            journal.mark_op_as_done(&ops[0]).unwrap();
            journal.save().unwrap();
        }
        // Second call finds the file and must see the persisted progress.
        let journal = Journal::load_or_create(dir.path(), "test", &ops).unwrap();
        assert_eq!(journal.done_ops_count(), 1);
        assert!(!journal.is_completed());
    }

    #[test]
    fn mark_ops_as_done() {
        // Keep the temporary directory bound so it outlives the journal's backing file.
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.mark_op_as_done(&ops[0]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[1]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[2]).unwrap();
        assert!(journal.is_completed());
    }

    #[test]
    fn mark_ops_as_done_backwards() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.mark_op_as_done(&ops[2]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[1]).unwrap();
        assert!(!journal.is_completed());
        journal.mark_op_as_done(&ops[0]).unwrap();
        assert!(journal.is_completed());
    }

    #[test]
    fn mark_invalid_op_as_done() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal
            .mark_op_as_done(&Op::Create {
                uid: Uid::from_u128(999),
                type_name: TypeName::new("dcim.site"),
                desired: Object {
                    uid: Uid::from_u128(999),
                    type_name: TypeName::new("dcim.site"),
                    key: Default::default(),
                    attrs: Default::default(),
                    source: None,
                },
            })
            .expect_err("should fail");
        assert!(!journal.is_completed());
    }

    #[test]
    fn mark_same_op_as_done_twice() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.mark_op_as_done(&ops[1]).unwrap();
        journal.mark_op_as_done(&ops[1]).expect_err("should fail");
    }

    #[test]
    fn delete_backing_file() {
        let dir = tempdir().unwrap();
        let ops = test_ops();
        let mut journal = Journal::new_with_file(dir.path(), "test", &ops).unwrap();
        journal.save().unwrap();
        let file_path = Journal::stable_file_name(dir.path(), "test", &ops);
        assert!(file_path.exists());
        journal.delete_backing_file().unwrap();
        assert!(!file_path.exists());
    }
}