actionqueue_storage/snapshot/
writer.rs1use std::fs::{File, OpenOptions};
7use std::io::{Seek, Write};
8use std::path::PathBuf;
9
10use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError};
11use crate::snapshot::model::Snapshot;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum SnapshotFsWriterInitError {
16 IoError(String),
18}
19
20impl std::fmt::Display for SnapshotFsWriterInitError {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 match self {
23 SnapshotFsWriterInitError::IoError(e) => {
24 write!(f, "I/O error when opening snapshot file: {e}")
25 }
26 }
27 }
28}
29
30impl std::error::Error for SnapshotFsWriterInitError {}
31
32impl std::convert::From<std::io::Error> for SnapshotFsWriterInitError {
33 fn from(err: std::io::Error) -> Self {
34 SnapshotFsWriterInitError::IoError(err.to_string())
35 }
36}
37
38pub trait SnapshotWriter {
40 fn write(&mut self, snapshot: &Snapshot) -> Result<(), SnapshotWriterError>;
42
43 fn flush(&mut self) -> Result<(), SnapshotWriterError>;
45
46 fn close(self) -> Result<(), SnapshotWriterError>;
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum SnapshotWriterError {
53 IoError(String),
55 EncodeError(String),
57 MappingError(SnapshotMappingError),
59 Closed,
61}
62
63impl std::fmt::Display for SnapshotWriterError {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 Self::IoError(e) => write!(f, "I/O error: {e}"),
67 Self::EncodeError(e) => write!(f, "Encode error: {e}"),
68 Self::MappingError(e) => write!(f, "Mapping error: {e}"),
69 Self::Closed => write!(f, "snapshot writer is closed"),
70 }
71 }
72}
73
74impl std::error::Error for SnapshotWriterError {}
75
76pub struct SnapshotFsWriter {
86 file: File,
87 target_path: PathBuf,
88 temp_path: PathBuf,
89 is_closed: bool,
90}
91
92impl SnapshotFsWriter {
93 pub fn new(path: PathBuf) -> Result<Self, SnapshotFsWriterInitError> {
109 let file_name = path.file_name().ok_or_else(|| {
113 SnapshotFsWriterInitError::IoError(format!(
114 "snapshot path has no filename component: {}",
115 path.display()
116 ))
117 })?;
118 let mut temp_name = file_name.to_os_string();
119 temp_name.push(".tmp");
120 let temp_path = path.with_file_name(temp_name);
121 let file = OpenOptions::new().create(true).write(true).truncate(true).open(&temp_path)?;
122
123 Ok(SnapshotFsWriter { file, target_path: path, temp_path, is_closed: false })
124 }
125
126 fn seek_to_beginning(&mut self) -> Result<(), SnapshotWriterError> {
128 self.file
129 .seek(std::io::SeekFrom::Start(0))
130 .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
131 Ok(())
132 }
133}
134
135impl SnapshotWriter for SnapshotFsWriter {
136 fn write(&mut self, snapshot: &Snapshot) -> Result<(), SnapshotWriterError> {
154 if self.is_closed {
155 return Err(SnapshotWriterError::Closed);
156 }
157
158 validate_snapshot(snapshot).map_err(SnapshotWriterError::MappingError)?;
159
160 self.seek_to_beginning()?;
162
163 let payload = serde_json::to_vec(snapshot)
165 .map_err(|e| SnapshotWriterError::EncodeError(e.to_string()))?;
166
167 let version = snapshot.version;
169 self.file
170 .write_all(&version.to_le_bytes())
171 .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
172
173 let payload_len = u32::try_from(payload.len()).map_err(|_| {
175 SnapshotWriterError::EncodeError(format!(
176 "snapshot payload too large: {} bytes exceeds u32::MAX",
177 payload.len()
178 ))
179 })?;
180 self.file
181 .write_all(&payload_len.to_le_bytes())
182 .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
183
184 let crc = crc32fast::hash(&payload);
186 self.file
187 .write_all(&crc.to_le_bytes())
188 .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
189
190 self.file.write_all(&payload).map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
192
193 Ok(())
194 }
195
196 fn flush(&mut self) -> Result<(), SnapshotWriterError> {
207 if self.is_closed {
208 return Err(SnapshotWriterError::Closed);
209 }
210
211 self.file.sync_all().map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
212
213 Ok(())
214 }
215
216 fn close(mut self) -> Result<(), SnapshotWriterError> {
232 self.flush()?;
234
235 std::fs::rename(&self.temp_path, &self.target_path)
237 .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
238
239 if let Some(parent) = self.target_path.parent() {
241 let dir = File::open(parent).map_err(|e| {
242 SnapshotWriterError::IoError(format!(
243 "failed to open snapshot parent directory for fsync: {e}"
244 ))
245 })?;
246 dir.sync_all().map_err(|e| {
247 SnapshotWriterError::IoError(format!(
248 "failed to fsync snapshot parent directory: {e}"
249 ))
250 })?;
251 }
252
253 self.is_closed = true;
255
256 tracing::info!("snapshot written and persisted");
257
258 Ok(())
259 }
260}
261
262impl Drop for SnapshotFsWriter {
263 fn drop(&mut self) {
264 if !self.is_closed {
267 let _ = std::fs::remove_file(&self.temp_path);
268 }
269 }
270}
271
272#[cfg(test)]
273mod tests {
274 use std::fs;
275 use std::sync::atomic::{AtomicUsize, Ordering};
276
277 use super::*;
278 use crate::snapshot::mapping::{SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
279 use crate::snapshot::model::SnapshotMetadata;
280
281 static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
282
283 fn temp_snapshot_path() -> std::path::PathBuf {
284 let dir = std::env::temp_dir();
285 let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
286 let path = dir.join(format!(
287 "actionqueue_snapshot_writer_test_{}_{}.snap",
288 std::process::id(),
289 count
290 ));
291 let _ = fs::remove_file(&path);
292 let _ = fs::remove_file(temp_sibling(&path));
293 path
294 }
295
296 fn temp_sibling(path: &std::path::Path) -> std::path::PathBuf {
299 let mut name =
300 path.file_name().expect("test path must have a filename component").to_os_string();
301 name.push(".tmp");
302 path.with_file_name(name)
303 }
304
305 fn create_test_snapshot(payload: &[u8]) -> Snapshot {
306 let task_spec = actionqueue_core::task::task_spec::TaskSpec::new(
307 actionqueue_core::ids::TaskId::new(),
308 actionqueue_core::task::task_spec::TaskPayload::with_content_type(
309 payload.to_vec(),
310 "application/octet-stream",
311 ),
312 actionqueue_core::task::run_policy::RunPolicy::Once,
313 actionqueue_core::task::constraints::TaskConstraints::default(),
314 actionqueue_core::task::metadata::TaskMetadata::default(),
315 )
316 .expect("test task spec should be valid");
317 Snapshot {
318 version: 4,
319 timestamp: 1234567890,
320 metadata: SnapshotMetadata {
321 schema_version: SNAPSHOT_SCHEMA_VERSION,
322 wal_sequence: 42,
323 task_count: 1,
324 run_count: 0,
325 },
326 tasks: vec![test_snapshot_task(task_spec)],
327 runs: Vec::new(),
328 engine: crate::snapshot::model::SnapshotEngineControl::default(),
329 dependency_declarations: Vec::new(),
330 budgets: Vec::new(),
331 subscriptions: Vec::new(),
332 actors: Vec::new(),
333 tenants: Vec::new(),
334 role_assignments: Vec::new(),
335 capability_grants: Vec::new(),
336 ledger_entries: Vec::new(),
337 }
338 }
339
340 fn test_snapshot_task(
341 task_spec: actionqueue_core::task::task_spec::TaskSpec,
342 ) -> crate::snapshot::model::SnapshotTask {
343 crate::snapshot::model::SnapshotTask {
344 task_spec,
345 created_at: 0,
346 updated_at: None,
347 canceled_at: None,
348 }
349 }
350
351 #[test]
352 fn test_new_creates_temp_file() {
353 let path = temp_snapshot_path();
354 let temp_path = temp_sibling(&path);
355 let writer =
356 SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
357 assert!(temp_path.exists(), "temp file should exist after new()");
358 assert!(!path.exists(), "target file should not exist after new()");
359 drop(writer);
360 let _ = fs::remove_file(&path);
361 let _ = fs::remove_file(&temp_path);
362 }
363
364 #[test]
365 fn test_write_persists_snapshot_payload() {
366 let path = temp_snapshot_path();
367 let mut writer =
368 SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
369 let snapshot = create_test_snapshot(&[1, 2, 3]);
370
371 writer.write(&snapshot).expect("snapshot write should succeed");
372 writer.flush().expect("snapshot flush should succeed");
373 writer.close().expect("snapshot close should succeed");
374
375 let bytes = fs::read(&path).expect("snapshot file should be readable");
376 assert!(bytes.len() > 12);
377
378 let _ = fs::remove_file(path);
379 }
380
381 #[test]
382 fn test_reopen_truncates_existing_snapshot_file() {
383 let path = temp_snapshot_path();
384
385 {
386 let mut writer = SnapshotFsWriter::new(path.clone())
387 .expect("first snapshot writer creation should succeed");
388 let large_snapshot = create_test_snapshot(&[9; 128]);
389 writer.write(&large_snapshot).expect("first write should succeed");
390 writer.close().expect("first close should succeed");
391 }
392 let len_before = fs::metadata(&path).expect("metadata should be readable").len();
393
394 {
395 let mut writer = SnapshotFsWriter::new(path.clone())
396 .expect("second snapshot writer creation should succeed");
397 let small_snapshot = create_test_snapshot(&[1]);
398 writer.write(&small_snapshot).expect("second write should succeed");
399 writer.close().expect("second close should succeed");
400 }
401 let len_after = fs::metadata(&path).expect("metadata should be readable").len();
402
403 assert!(len_after < len_before);
404
405 let _ = fs::remove_file(path);
406 }
407
408 #[test]
409 fn test_new_returns_error_when_parent_directory_is_missing() {
410 let parent = std::env::temp_dir().join(format!(
411 "actionqueue_snapshot_writer_missing_parent_{}_{}",
412 std::process::id(),
413 TEST_COUNTER.fetch_add(1, Ordering::SeqCst)
414 ));
415 let _ = fs::remove_dir_all(&parent);
416 let path = parent.join("snapshot.bin");
417
418 let result = SnapshotFsWriter::new(path);
419 assert!(matches!(result, Err(SnapshotFsWriterInitError::IoError(_))));
420 }
421
422 #[test]
423 fn test_write_rejects_mapping_violation() {
424 let path = temp_snapshot_path();
425 let temp_path = temp_sibling(&path);
426 let mut writer =
427 SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
428
429 let mut snapshot = create_test_snapshot(&[1, 2, 3]);
430 snapshot.metadata.task_count = 99;
431
432 let result = writer.write(&snapshot);
433 assert!(matches!(
434 result,
435 Err(SnapshotWriterError::MappingError(SnapshotMappingError::TaskCountMismatch {
436 declared: 99,
437 actual: 1
438 }))
439 ));
440
441 drop(writer);
442 let _ = fs::remove_file(&path);
443 let _ = fs::remove_file(&temp_path);
444 }
445
446 #[test]
447 fn test_target_file_absent_until_close() {
448 let path = temp_snapshot_path();
449 let mut writer =
450 SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
451 let snapshot = create_test_snapshot(&[1, 2, 3]);
452
453 writer.write(&snapshot).expect("snapshot write should succeed");
454 writer.flush().expect("snapshot flush should succeed");
455
456 assert!(!path.exists(), "target file should not exist before close()");
458 assert!(temp_sibling(&path).exists(), "temp file should exist before close()");
459
460 writer.close().expect("snapshot close should succeed");
461
462 assert!(path.exists(), "target file should exist after close()");
464 assert!(!temp_sibling(&path).exists(), "temp file should not exist after close()");
465
466 let _ = fs::remove_file(path);
467 }
468
469 #[test]
470 fn test_drop_without_close_preserves_original() {
471 let path = temp_snapshot_path();
472
473 {
475 let mut writer = SnapshotFsWriter::new(path.clone())
476 .expect("first snapshot writer creation should succeed");
477 let snapshot = create_test_snapshot(&[10, 20, 30]);
478 writer.write(&snapshot).expect("first write should succeed");
479 writer.close().expect("first close should succeed");
480 }
481
482 let original_bytes = fs::read(&path).expect("original snapshot should be readable");
483 assert!(!original_bytes.is_empty(), "original snapshot should not be empty");
484
485 {
487 let mut writer = SnapshotFsWriter::new(path.clone())
488 .expect("second snapshot writer creation should succeed");
489 let different_snapshot = create_test_snapshot(&[99; 64]);
490 writer.write(&different_snapshot).expect("second write should succeed");
491 }
493
494 let preserved_bytes =
496 fs::read(&path).expect("snapshot should still be readable after aborted write");
497 assert_eq!(
498 original_bytes, preserved_bytes,
499 "original snapshot content should be preserved when writer is dropped without close()"
500 );
501
502 assert!(!temp_sibling(&path).exists(), "temp file should be cleaned up on drop");
504
505 let _ = fs::remove_file(path);
506 }
507}