// actionqueue_storage/snapshot/writer.rs

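//! Snapshot writer interface and file system implementation.
//!
//! This module defines the [`SnapshotWriter`] trait and a file-system backend,
//! [`SnapshotFsWriter`], which publishes snapshots atomically (write to a temp
//! file, then rename) using a versioned, CRC-32-checked frame so readers can
//! validate compatibility and integrity.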
5
6use std::fs::{File, OpenOptions};
7use std::io::{Seek, Write};
8use std::path::PathBuf;
9
10use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError};
11use crate::snapshot::model::Snapshot;
12
/// Failure raised while constructing a [`SnapshotFsWriter`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotFsWriterInitError {
    /// The temporary snapshot file could not be set up; carries the rendered
    /// message of the underlying error.
    IoError(String),
}

impl std::fmt::Display for SnapshotFsWriterInitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IoError(detail) => {
                write!(f, "I/O error when opening snapshot file: {}", detail)
            }
        }
    }
}

impl std::error::Error for SnapshotFsWriterInitError {}

/// Lets `?` lift `std::io::Error` values into the init error, keeping only
/// their display text.
impl From<std::io::Error> for SnapshotFsWriterInitError {
    fn from(err: std::io::Error) -> Self {
        Self::IoError(err.to_string())
    }
}
37
38/// A snapshot writer that persists state to storage.
39pub trait SnapshotWriter {
40    /// Write a snapshot to storage.
41    fn write(&mut self, snapshot: &Snapshot) -> Result<(), SnapshotWriterError>;
42
43    /// Flush pending writes to durable storage.
44    fn flush(&mut self) -> Result<(), SnapshotWriterError>;
45
46    /// Close the writer, releasing any resources.
47    fn close(self) -> Result<(), SnapshotWriterError>;
48}
49
50/// Errors that can occur during snapshot writing.
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub enum SnapshotWriterError {
53    /// I/O error during write.
54    IoError(String),
55    /// The snapshot could not be encoded.
56    EncodeError(String),
57    /// The snapshot violated mapping/parity invariants before encode.
58    MappingError(SnapshotMappingError),
59    /// The writer was closed.
60    Closed,
61}
62
63impl std::fmt::Display for SnapshotWriterError {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            Self::IoError(e) => write!(f, "I/O error: {e}"),
67            Self::EncodeError(e) => write!(f, "Encode error: {e}"),
68            Self::MappingError(e) => write!(f, "Mapping error: {e}"),
69            Self::Closed => write!(f, "snapshot writer is closed"),
70        }
71    }
72}
73
74impl std::error::Error for SnapshotWriterError {}
75
/// File-system [`SnapshotWriter`] that publishes snapshots through an atomic
/// write-to-temp-then-rename sequence.
///
/// Encoded bytes go to a sibling of the target path whose name is the target
/// file name with `.tmp` appended. Each snapshot is framed like a WAL record:
/// version, payload length, CRC-32 of the payload, then the payload itself.
/// [`close()`](SnapshotWriter::close) syncs the temp file, renames it over the
/// target, and fsyncs the parent directory. Dropping the writer without calling
/// `close()` deletes the temp file, so any previously published snapshot is
/// left untouched.
pub struct SnapshotFsWriter {
    /// Handle to the temporary sibling file that receives the encoded frame.
    file: File,
    /// Final location the temp file is renamed to on close.
    target_path: PathBuf,
    /// Temporary sibling path (target file name with `.tmp` appended).
    temp_path: PathBuf,
    /// Set by `close()`; when true, `Drop` skips the temp-file cleanup.
    is_closed: bool,
}
91
92impl SnapshotFsWriter {
93    /// Creates a new snapshot writer at the given path.
94    ///
95    /// Instead of opening the target path directly, a temporary sibling file
96    /// (target path with `.tmp` suffix appended) is created and written to. The
97    /// target file is only replaced atomically when [`close()`](SnapshotWriter::close)
98    /// is called, ensuring crash safety.
99    ///
100    /// # Arguments
101    ///
102    /// * `path` - The filesystem path where the snapshot file should ultimately reside
103    ///
104    /// # Errors
105    ///
106    /// Returns [`SnapshotFsWriterInitError::IoError`] if the temporary file cannot
107    /// be created or opened for writing.
108    pub fn new(path: PathBuf) -> Result<Self, SnapshotFsWriterInitError> {
109        // Build temp path by appending ".tmp" suffix (not replacing extension),
110        // so "snapshot.bin" becomes "snapshot.bin.tmp", avoiding collisions when
111        // the target path already uses a ".tmp" extension.
112        let file_name = path.file_name().ok_or_else(|| {
113            SnapshotFsWriterInitError::IoError(format!(
114                "snapshot path has no filename component: {}",
115                path.display()
116            ))
117        })?;
118        let mut temp_name = file_name.to_os_string();
119        temp_name.push(".tmp");
120        let temp_path = path.with_file_name(temp_name);
121        let file = OpenOptions::new().create(true).write(true).truncate(true).open(&temp_path)?;
122
123        Ok(SnapshotFsWriter { file, target_path: path, temp_path, is_closed: false })
124    }
125
126    /// Seeks to the beginning of the file for writing.
127    fn seek_to_beginning(&mut self) -> Result<(), SnapshotWriterError> {
128        self.file
129            .seek(std::io::SeekFrom::Start(0))
130            .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
131        Ok(())
132    }
133}
134
135impl SnapshotWriter for SnapshotFsWriter {
136    /// Write a snapshot to storage.
137    ///
138    /// The snapshot is encoded using the same framing format as the WAL:
139    /// - 4 bytes version (currently 4)
140    /// - 4 bytes length of payload
141    /// - 4 bytes CRC-32 of payload
142    /// - JSON-serialized Snapshot
143    ///
144    /// # Arguments
145    ///
146    /// * `snapshot` - The snapshot to write
147    ///
148    /// # Errors
149    ///
150    /// Returns `SnapshotWriterError::Closed` if the writer has been closed.
151    /// Returns `SnapshotWriterError::IoError` if the write operation fails.
152    /// Returns `SnapshotWriterError::EncodeError` if serialization fails.
153    fn write(&mut self, snapshot: &Snapshot) -> Result<(), SnapshotWriterError> {
154        if self.is_closed {
155            return Err(SnapshotWriterError::Closed);
156        }
157
158        validate_snapshot(snapshot).map_err(SnapshotWriterError::MappingError)?;
159
160        // Seek to beginning to overwrite
161        self.seek_to_beginning()?;
162
163        // Serialize the snapshot to JSON for deterministic representation
164        let payload = serde_json::to_vec(snapshot)
165            .map_err(|e| SnapshotWriterError::EncodeError(e.to_string()))?;
166
167        // Version frame (4 bytes, little-endian)
168        let version = snapshot.version;
169        self.file
170            .write_all(&version.to_le_bytes())
171            .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
172
173        // Length frame (4 bytes, little-endian)
174        let payload_len = u32::try_from(payload.len()).map_err(|_| {
175            SnapshotWriterError::EncodeError(format!(
176                "snapshot payload too large: {} bytes exceeds u32::MAX",
177                payload.len()
178            ))
179        })?;
180        self.file
181            .write_all(&payload_len.to_le_bytes())
182            .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
183
184        // CRC-32 frame (4 bytes, little-endian)
185        let crc = crc32fast::hash(&payload);
186        self.file
187            .write_all(&crc.to_le_bytes())
188            .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
189
190        // Write payload
191        self.file.write_all(&payload).map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
192
193        Ok(())
194    }
195
196    /// Flushes pending writes to durable storage.
197    ///
198    /// This ensures that all buffered data is written to disk before
199    /// returning. The flush operation is synchronous and will block
200    /// until the data is persisted.
201    ///
202    /// # Errors
203    ///
204    /// Returns `SnapshotWriterError::Closed` if the writer has been closed.
205    /// Returns `SnapshotWriterError::IoError` if the flush operation fails.
206    fn flush(&mut self) -> Result<(), SnapshotWriterError> {
207        if self.is_closed {
208            return Err(SnapshotWriterError::Closed);
209        }
210
211        self.file.sync_all().map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
212
213        Ok(())
214    }
215
216    /// Closes the writer, atomically replacing the target snapshot file.
217    ///
218    /// This performs:
219    /// 1. `sync_all()` on the temp file to ensure data is durable
220    /// 2. `rename(temp, target)` for atomic replacement
221    /// 3. `fsync` on the parent directory to make the rename durable
222    ///
223    /// Once closed, the writer cannot be used for further operations.
224    /// If `close()` is not called (e.g. due to a crash or error), the
225    /// temp file is cleaned up on drop and the original snapshot is preserved.
226    ///
227    /// # Errors
228    ///
229    /// Returns `SnapshotWriterError::IoError` if the flush, rename, or
230    /// directory sync operation fails.
231    fn close(mut self) -> Result<(), SnapshotWriterError> {
232        // Flush all data to disk before renaming
233        self.flush()?;
234
235        // Atomic rename of temp file over target
236        std::fs::rename(&self.temp_path, &self.target_path)
237            .map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
238
239        // Fsync parent directory to make the rename durable
240        if let Some(parent) = self.target_path.parent() {
241            let dir = File::open(parent).map_err(|e| {
242                SnapshotWriterError::IoError(format!(
243                    "failed to open snapshot parent directory for fsync: {e}"
244                ))
245            })?;
246            dir.sync_all().map_err(|e| {
247                SnapshotWriterError::IoError(format!(
248                    "failed to fsync snapshot parent directory: {e}"
249                ))
250            })?;
251        }
252
253        // Mark as closed so Drop does not attempt temp cleanup
254        self.is_closed = true;
255
256        tracing::info!("snapshot written and persisted");
257
258        Ok(())
259    }
260}
261
262impl Drop for SnapshotFsWriter {
263    fn drop(&mut self) {
264        // If the writer was not explicitly closed, remove the temp file so
265        // that the original snapshot (if any) remains intact.
266        if !self.is_closed {
267            let _ = std::fs::remove_file(&self.temp_path);
268        }
269    }
270}
271
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    use actionqueue_core::ids::TaskId;
    use actionqueue_core::task::constraints::TaskConstraints;
    use actionqueue_core::task::metadata::TaskMetadata;
    use actionqueue_core::task::run_policy::RunPolicy;
    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};

    use super::*;
    use crate::snapshot::mapping::{SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
    use crate::snapshot::model::{SnapshotEngineControl, SnapshotMetadata, SnapshotTask};

    /// Counter that keeps per-test file names unique within one test run.
    static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Returns a fresh snapshot path in the system temp directory, first
    /// deleting any leftover target or temp sibling from an earlier run.
    fn temp_snapshot_path() -> PathBuf {
        let seq = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        let file_name =
            format!("actionqueue_snapshot_writer_test_{}_{}.snap", std::process::id(), seq);
        let path = std::env::temp_dir().join(file_name);
        for stale in [path.clone(), temp_sibling(&path)] {
            let _ = fs::remove_file(stale);
        }
        path
    }

    /// Mirrors the temp path `SnapshotFsWriter` uses internally: the target
    /// file name with ".tmp" appended.
    fn temp_sibling(path: &Path) -> PathBuf {
        let base = path.file_name().expect("test path must have a filename component");
        let mut sibling = base.to_os_string();
        sibling.push(".tmp");
        path.with_file_name(sibling)
    }

    /// Builds a valid single-task snapshot whose task carries `payload` as
    /// opaque bytes.
    fn create_test_snapshot(payload: &[u8]) -> Snapshot {
        let spec = TaskSpec::new(
            TaskId::new(),
            TaskPayload::with_content_type(payload.to_vec(), "application/octet-stream"),
            RunPolicy::Once,
            TaskConstraints::default(),
            TaskMetadata::default(),
        )
        .expect("test task spec should be valid");

        Snapshot {
            version: 4,
            timestamp: 1234567890,
            metadata: SnapshotMetadata {
                schema_version: SNAPSHOT_SCHEMA_VERSION,
                wal_sequence: 42,
                task_count: 1,
                run_count: 0,
            },
            tasks: vec![test_snapshot_task(spec)],
            runs: vec![],
            engine: SnapshotEngineControl::default(),
            dependency_declarations: vec![],
            budgets: vec![],
            subscriptions: vec![],
            actors: vec![],
            tenants: vec![],
            role_assignments: vec![],
            capability_grants: vec![],
            ledger_entries: vec![],
        }
    }

    /// Wraps a task spec in a `SnapshotTask` with a zero creation time and no
    /// update/cancel timestamps.
    fn test_snapshot_task(task_spec: TaskSpec) -> SnapshotTask {
        SnapshotTask { task_spec, created_at: 0, updated_at: None, canceled_at: None }
    }

    #[test]
    fn test_new_creates_temp_file() {
        let path = temp_snapshot_path();
        let temp_path = temp_sibling(&path);

        let writer =
            SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
        assert!(temp_path.exists(), "temp file should exist after new()");
        assert!(!path.exists(), "target file should not exist after new()");

        drop(writer);
        for leftover in [&path, &temp_path] {
            let _ = fs::remove_file(leftover);
        }
    }

    #[test]
    fn test_write_persists_snapshot_payload() {
        let path = temp_snapshot_path();
        let mut writer =
            SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
        let snapshot = create_test_snapshot(&[1, 2, 3]);

        writer.write(&snapshot).expect("snapshot write should succeed");
        writer.flush().expect("snapshot flush should succeed");
        writer.close().expect("snapshot close should succeed");

        // More than the 12 header bytes means a payload made it to disk.
        let bytes = fs::read(&path).expect("snapshot file should be readable");
        assert!(bytes.len() > 12);

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_reopen_truncates_existing_snapshot_file() {
        let path = temp_snapshot_path();

        let mut first = SnapshotFsWriter::new(path.clone())
            .expect("first snapshot writer creation should succeed");
        first.write(&create_test_snapshot(&[9; 128])).expect("first write should succeed");
        first.close().expect("first close should succeed");
        let len_before = fs::metadata(&path).expect("metadata should be readable").len();

        let mut second = SnapshotFsWriter::new(path.clone())
            .expect("second snapshot writer creation should succeed");
        second.write(&create_test_snapshot(&[1])).expect("second write should succeed");
        second.close().expect("second close should succeed");
        let len_after = fs::metadata(&path).expect("metadata should be readable").len();

        assert!(len_after < len_before);

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_new_returns_error_when_parent_directory_is_missing() {
        let seq = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
        let parent = std::env::temp_dir().join(format!(
            "actionqueue_snapshot_writer_missing_parent_{}_{}",
            std::process::id(),
            seq
        ));
        let _ = fs::remove_dir_all(&parent);

        let outcome = SnapshotFsWriter::new(parent.join("snapshot.bin"));
        assert!(matches!(outcome, Err(SnapshotFsWriterInitError::IoError(_))));
    }

    #[test]
    fn test_write_rejects_mapping_violation() {
        let path = temp_snapshot_path();
        let temp_path = temp_sibling(&path);
        let mut writer =
            SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");

        // Declare more tasks than the snapshot actually contains.
        let snapshot = {
            let mut bad = create_test_snapshot(&[1, 2, 3]);
            bad.metadata.task_count = 99;
            bad
        };

        let result = writer.write(&snapshot);
        assert!(matches!(
            result,
            Err(SnapshotWriterError::MappingError(SnapshotMappingError::TaskCountMismatch {
                declared: 99,
                actual: 1
            }))
        ));

        drop(writer);
        for leftover in [&path, &temp_path] {
            let _ = fs::remove_file(leftover);
        }
    }

    #[test]
    fn test_target_file_absent_until_close() {
        let path = temp_snapshot_path();
        let temp_path = temp_sibling(&path);
        let mut writer =
            SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
        let snapshot = create_test_snapshot(&[1, 2, 3]);

        writer.write(&snapshot).expect("snapshot write should succeed");
        writer.flush().expect("snapshot flush should succeed");

        // Before close(), only the temp sibling may exist.
        assert!(!path.exists(), "target file should not exist before close()");
        assert!(temp_path.exists(), "temp file should exist before close()");

        writer.close().expect("snapshot close should succeed");

        // After close(), the rename has moved the temp file onto the target.
        assert!(path.exists(), "target file should exist after close()");
        assert!(!temp_path.exists(), "temp file should not exist after close()");

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_drop_without_close_preserves_original() {
        let path = temp_snapshot_path();

        // Publish an initial snapshot through the normal close() path.
        let mut first = SnapshotFsWriter::new(path.clone())
            .expect("first snapshot writer creation should succeed");
        first.write(&create_test_snapshot(&[10, 20, 30])).expect("first write should succeed");
        first.close().expect("first close should succeed");

        let original_bytes = fs::read(&path).expect("original snapshot should be readable");
        assert!(!original_bytes.is_empty(), "original snapshot should not be empty");

        // Stage a different snapshot, then abandon the writer without close().
        let mut abandoned = SnapshotFsWriter::new(path.clone())
            .expect("second snapshot writer creation should succeed");
        abandoned.write(&create_test_snapshot(&[99; 64])).expect("second write should succeed");
        drop(abandoned);

        let preserved_bytes =
            fs::read(&path).expect("snapshot should still be readable after aborted write");
        assert_eq!(
            original_bytes, preserved_bytes,
            "original snapshot content should be preserved when writer is dropped without close()"
        );

        // Drop must also have removed the abandoned temp file.
        assert!(!temp_sibling(&path).exists(), "temp file should be cleaned up on drop");

        let _ = fs::remove_file(path);
    }
}