use std::fs::{File, OpenOptions};
use std::io::{Seek, Write};
use std::path::PathBuf;
use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError};
use crate::snapshot::model::Snapshot;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotFsWriterInitError {
IoError(String),
}
impl std::fmt::Display for SnapshotFsWriterInitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SnapshotFsWriterInitError::IoError(e) => {
write!(f, "I/O error when opening snapshot file: {e}")
}
}
}
}
impl std::error::Error for SnapshotFsWriterInitError {}
impl std::convert::From<std::io::Error> for SnapshotFsWriterInitError {
fn from(err: std::io::Error) -> Self {
SnapshotFsWriterInitError::IoError(err.to_string())
}
}
pub trait SnapshotWriter {
fn write(&mut self, snapshot: &Snapshot) -> Result<(), SnapshotWriterError>;
fn flush(&mut self) -> Result<(), SnapshotWriterError>;
fn close(self) -> Result<(), SnapshotWriterError>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SnapshotWriterError {
IoError(String),
EncodeError(String),
MappingError(SnapshotMappingError),
Closed,
}
impl std::fmt::Display for SnapshotWriterError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::IoError(e) => write!(f, "I/O error: {e}"),
Self::EncodeError(e) => write!(f, "Encode error: {e}"),
Self::MappingError(e) => write!(f, "Mapping error: {e}"),
Self::Closed => write!(f, "snapshot writer is closed"),
}
}
}
impl std::error::Error for SnapshotWriterError {}
pub struct SnapshotFsWriter {
file: File,
target_path: PathBuf,
temp_path: PathBuf,
is_closed: bool,
}
impl SnapshotFsWriter {
pub fn new(path: PathBuf) -> Result<Self, SnapshotFsWriterInitError> {
let file_name = path.file_name().ok_or_else(|| {
SnapshotFsWriterInitError::IoError(format!(
"snapshot path has no filename component: {}",
path.display()
))
})?;
let mut temp_name = file_name.to_os_string();
temp_name.push(".tmp");
let temp_path = path.with_file_name(temp_name);
let file = OpenOptions::new().create(true).write(true).truncate(true).open(&temp_path)?;
Ok(SnapshotFsWriter { file, target_path: path, temp_path, is_closed: false })
}
fn seek_to_beginning(&mut self) -> Result<(), SnapshotWriterError> {
self.file
.seek(std::io::SeekFrom::Start(0))
.map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
Ok(())
}
}
impl SnapshotWriter for SnapshotFsWriter {
fn write(&mut self, snapshot: &Snapshot) -> Result<(), SnapshotWriterError> {
if self.is_closed {
return Err(SnapshotWriterError::Closed);
}
validate_snapshot(snapshot).map_err(SnapshotWriterError::MappingError)?;
self.seek_to_beginning()?;
let payload = serde_json::to_vec(snapshot)
.map_err(|e| SnapshotWriterError::EncodeError(e.to_string()))?;
let version = snapshot.version;
self.file
.write_all(&version.to_le_bytes())
.map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
let payload_len = u32::try_from(payload.len()).map_err(|_| {
SnapshotWriterError::EncodeError(format!(
"snapshot payload too large: {} bytes exceeds u32::MAX",
payload.len()
))
})?;
self.file
.write_all(&payload_len.to_le_bytes())
.map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
let crc = crc32fast::hash(&payload);
self.file
.write_all(&crc.to_le_bytes())
.map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
self.file.write_all(&payload).map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
Ok(())
}
fn flush(&mut self) -> Result<(), SnapshotWriterError> {
if self.is_closed {
return Err(SnapshotWriterError::Closed);
}
self.file.sync_all().map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
Ok(())
}
fn close(mut self) -> Result<(), SnapshotWriterError> {
self.flush()?;
std::fs::rename(&self.temp_path, &self.target_path)
.map_err(|e| SnapshotWriterError::IoError(e.to_string()))?;
if let Some(parent) = self.target_path.parent() {
let dir = File::open(parent).map_err(|e| {
SnapshotWriterError::IoError(format!(
"failed to open snapshot parent directory for fsync: {e}"
))
})?;
dir.sync_all().map_err(|e| {
SnapshotWriterError::IoError(format!(
"failed to fsync snapshot parent directory: {e}"
))
})?;
}
self.is_closed = true;
tracing::info!("snapshot written and persisted");
Ok(())
}
}
impl Drop for SnapshotFsWriter {
fn drop(&mut self) {
if !self.is_closed {
let _ = std::fs::remove_file(&self.temp_path);
}
}
}
#[cfg(test)]
mod tests {
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::*;
use crate::snapshot::mapping::{SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
use crate::snapshot::model::SnapshotMetadata;
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
fn temp_snapshot_path() -> std::path::PathBuf {
let dir = std::env::temp_dir();
let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let path = dir.join(format!(
"actionqueue_snapshot_writer_test_{}_{}.snap",
std::process::id(),
count
));
let _ = fs::remove_file(&path);
let _ = fs::remove_file(temp_sibling(&path));
path
}
fn temp_sibling(path: &std::path::Path) -> std::path::PathBuf {
let mut name =
path.file_name().expect("test path must have a filename component").to_os_string();
name.push(".tmp");
path.with_file_name(name)
}
fn create_test_snapshot(payload: &[u8]) -> Snapshot {
let task_spec = actionqueue_core::task::task_spec::TaskSpec::new(
actionqueue_core::ids::TaskId::new(),
actionqueue_core::task::task_spec::TaskPayload::with_content_type(
payload.to_vec(),
"application/octet-stream",
),
actionqueue_core::task::run_policy::RunPolicy::Once,
actionqueue_core::task::constraints::TaskConstraints::default(),
actionqueue_core::task::metadata::TaskMetadata::default(),
)
.expect("test task spec should be valid");
Snapshot {
version: 4,
timestamp: 1234567890,
metadata: SnapshotMetadata {
schema_version: SNAPSHOT_SCHEMA_VERSION,
wal_sequence: 42,
task_count: 1,
run_count: 0,
},
tasks: vec![test_snapshot_task(task_spec)],
runs: Vec::new(),
engine: crate::snapshot::model::SnapshotEngineControl::default(),
dependency_declarations: Vec::new(),
budgets: Vec::new(),
subscriptions: Vec::new(),
actors: Vec::new(),
tenants: Vec::new(),
role_assignments: Vec::new(),
capability_grants: Vec::new(),
ledger_entries: Vec::new(),
}
}
fn test_snapshot_task(
task_spec: actionqueue_core::task::task_spec::TaskSpec,
) -> crate::snapshot::model::SnapshotTask {
crate::snapshot::model::SnapshotTask {
task_spec,
created_at: 0,
updated_at: None,
canceled_at: None,
}
}
#[test]
fn test_new_creates_temp_file() {
let path = temp_snapshot_path();
let temp_path = temp_sibling(&path);
let writer =
SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
assert!(temp_path.exists(), "temp file should exist after new()");
assert!(!path.exists(), "target file should not exist after new()");
drop(writer);
let _ = fs::remove_file(&path);
let _ = fs::remove_file(&temp_path);
}
#[test]
fn test_write_persists_snapshot_payload() {
let path = temp_snapshot_path();
let mut writer =
SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
let snapshot = create_test_snapshot(&[1, 2, 3]);
writer.write(&snapshot).expect("snapshot write should succeed");
writer.flush().expect("snapshot flush should succeed");
writer.close().expect("snapshot close should succeed");
let bytes = fs::read(&path).expect("snapshot file should be readable");
assert!(bytes.len() > 12);
let _ = fs::remove_file(path);
}
#[test]
fn test_reopen_truncates_existing_snapshot_file() {
let path = temp_snapshot_path();
{
let mut writer = SnapshotFsWriter::new(path.clone())
.expect("first snapshot writer creation should succeed");
let large_snapshot = create_test_snapshot(&[9; 128]);
writer.write(&large_snapshot).expect("first write should succeed");
writer.close().expect("first close should succeed");
}
let len_before = fs::metadata(&path).expect("metadata should be readable").len();
{
let mut writer = SnapshotFsWriter::new(path.clone())
.expect("second snapshot writer creation should succeed");
let small_snapshot = create_test_snapshot(&[1]);
writer.write(&small_snapshot).expect("second write should succeed");
writer.close().expect("second close should succeed");
}
let len_after = fs::metadata(&path).expect("metadata should be readable").len();
assert!(len_after < len_before);
let _ = fs::remove_file(path);
}
#[test]
fn test_new_returns_error_when_parent_directory_is_missing() {
let parent = std::env::temp_dir().join(format!(
"actionqueue_snapshot_writer_missing_parent_{}_{}",
std::process::id(),
TEST_COUNTER.fetch_add(1, Ordering::SeqCst)
));
let _ = fs::remove_dir_all(&parent);
let path = parent.join("snapshot.bin");
let result = SnapshotFsWriter::new(path);
assert!(matches!(result, Err(SnapshotFsWriterInitError::IoError(_))));
}
#[test]
fn test_write_rejects_mapping_violation() {
let path = temp_snapshot_path();
let temp_path = temp_sibling(&path);
let mut writer =
SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
let mut snapshot = create_test_snapshot(&[1, 2, 3]);
snapshot.metadata.task_count = 99;
let result = writer.write(&snapshot);
assert!(matches!(
result,
Err(SnapshotWriterError::MappingError(SnapshotMappingError::TaskCountMismatch {
declared: 99,
actual: 1
}))
));
drop(writer);
let _ = fs::remove_file(&path);
let _ = fs::remove_file(&temp_path);
}
#[test]
fn test_target_file_absent_until_close() {
let path = temp_snapshot_path();
let mut writer =
SnapshotFsWriter::new(path.clone()).expect("snapshot writer creation should succeed");
let snapshot = create_test_snapshot(&[1, 2, 3]);
writer.write(&snapshot).expect("snapshot write should succeed");
writer.flush().expect("snapshot flush should succeed");
assert!(!path.exists(), "target file should not exist before close()");
assert!(temp_sibling(&path).exists(), "temp file should exist before close()");
writer.close().expect("snapshot close should succeed");
assert!(path.exists(), "target file should exist after close()");
assert!(!temp_sibling(&path).exists(), "temp file should not exist after close()");
let _ = fs::remove_file(path);
}
#[test]
fn test_drop_without_close_preserves_original() {
let path = temp_snapshot_path();
{
let mut writer = SnapshotFsWriter::new(path.clone())
.expect("first snapshot writer creation should succeed");
let snapshot = create_test_snapshot(&[10, 20, 30]);
writer.write(&snapshot).expect("first write should succeed");
writer.close().expect("first close should succeed");
}
let original_bytes = fs::read(&path).expect("original snapshot should be readable");
assert!(!original_bytes.is_empty(), "original snapshot should not be empty");
{
let mut writer = SnapshotFsWriter::new(path.clone())
.expect("second snapshot writer creation should succeed");
let different_snapshot = create_test_snapshot(&[99; 64]);
writer.write(&different_snapshot).expect("second write should succeed");
}
let preserved_bytes =
fs::read(&path).expect("snapshot should still be readable after aborted write");
assert_eq!(
original_bytes, preserved_bytes,
"original snapshot content should be preserved when writer is dropped without close()"
);
assert!(!temp_sibling(&path).exists(), "temp file should be cleaned up on drop");
let _ = fs::remove_file(path);
}
}