Skip to main content

dag_executor/storage/
file_storage.rs

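//! JSON key/value storage: a file-backed store with SHA-256 checksums and a selectable
//! write-durability mode (in-place, atomic rename, or fsync'd), plus an in-memory store.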
2
3use crate::error::StorageError;
4use async_trait::async_trait;
5use parking_lot::Mutex;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use std::collections::HashMap;
9use std::io;
10use std::path::{Path, PathBuf};
11
12/// Result alias local to the storage layer.
13pub type StorageResult<T> = std::result::Result<T, StorageError>;
14
15/// A pluggable key/value store for JSON values.
16///
17/// The executor depends only on this trait, so persistence can be swapped
18/// (file, in-memory, or a future remote backend) without touching the engine.
19#[async_trait]
20pub trait Storage: Send + Sync {
21    /// Persist `value` under `key`, overwriting any existing entry.
22    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()>;
23    /// Load the value stored under `key`, or `None` if absent.
24    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>>;
25    /// Remove `key`. Removing a missing key is a no-op (not an error).
26    async fn delete(&self, key: &str) -> StorageResult<()>;
27    /// List all keys currently present.
28    async fn list(&self) -> StorageResult<Vec<String>>;
29}
30
31/// On-disk envelope: the original key, a checksum, and the data.
32///
33/// The key is stored so [`FileStorage::list`] can recover it, since the
34/// filename is a hash and not human-readable.
35#[derive(Serialize, Deserialize)]
36struct Envelope {
37    key: String,
38    checksum: String,
39    data: serde_json::Value,
40}
41
42fn checksum(value: &serde_json::Value) -> StorageResult<String> {
43    // Hash the canonical serialization so the same logical value always hashes
44    // identically regardless of in-memory map ordering.
45    let bytes = serde_json::to_vec(value)?;
46    let mut hasher = Sha256::new();
47    hasher.update(&bytes);
48    Ok(format!("{:x}", hasher.finalize()))
49}
50
51/// Map an arbitrary key to a filesystem-safe filename.
52///
53/// The filename is the hex SHA-256 of the key, which is guaranteed to be a
54/// valid filename on every platform and makes path traversal impossible by
55/// construction (the key's contents never reach the path). The only rejected
56/// keys are empty or absurdly long ones.
57fn safe_filename(key: &str) -> StorageResult<String> {
58    if key.is_empty() || key.len() > 1024 {
59        return Err(StorageError::InvalidKey(key.to_string()));
60    }
61    let mut hasher = Sha256::new();
62    hasher.update(key.as_bytes());
63    Ok(format!("{:x}.json", hasher.finalize()))
64}
65
/// Best-effort `fsync` of a file's parent directory, so that a rename into it
/// is durable across power loss.
///
/// Failures are deliberately ignored. The caller in this module has already
/// `fsync`'d the data file, so this is a belt-and-suspenders step.
///
/// A bare file name has the empty path as its [`Path::parent`]. Such a file is
/// treated as living in the current directory, so that directory is still
/// synced.
#[cfg(unix)]
fn sync_parent_dir(path: &Path) {
    let parent = match path.parent() {
        // `Path::new("x").parent()` is `Some("")`, which cannot be opened;
        // such a file lives in the current directory.
        Some(p) if p.as_os_str().is_empty() => Path::new("."),
        Some(p) => p,
        // A root or empty path has no parent directory to sync.
        None => return,
    };
    if let Ok(dir) = std::fs::File::open(parent) {
        let _ = dir.sync_all();
    }
}
77
/// No-op directory sync for non-Unix targets.
///
/// Directory handles are not opened and flushed here; on Windows that would
/// need platform-specific open flags. Durability of the rename is left to the
/// filesystem's own metadata journaling (e.g. NTFS).
#[cfg(not(unix))]
fn sync_parent_dir(_: &Path) {}
82
/// Write-durability strategy for [`FileStorage`].
///
/// The modes trade save latency against what survives a crash. Every mode
/// writes the same envelope format, so a directory written in one mode can be
/// read under any other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Durability {
    /// Overwrite the target file in place (open + truncate + write).
    ///
    /// Fastest: once a key's file exists, each save reuses the same file (same
    /// inode / MFT entry) with no file creation or rename.
    ///
    /// Not crash-safe: the previous record is truncated away before the new
    /// bytes are written. A crash mid-write can therefore leave an empty or
    /// partially written record, and the old value is lost.
    ///
    /// Such a record is never returned as data:
    /// - [`Storage::load`] on [`FileStorage`] reports it as an error, typically
    ///   a deserialization error for a truncated file, or
    ///   `StorageError::ChecksumMismatch` if it parses but the data does not
    ///   match its checksum.
    /// - [`Storage::list`] skips it.
    ///
    /// This fits a re-run-incomplete-work model only if the caller treats such
    /// a load error as "not yet done".
    Fast,
    /// Write to a unique temp file, then atomically `rename` it over the target.
    ///
    /// A process crash never destroys the previous good record: readers see
    /// either the old or the new value. The cost is a file create + rename per
    /// save. The bytes may still be sitting in the OS page cache when the save
    /// returns, so this protects against process crashes but not necessarily
    /// against power loss or a kernel panic.
    Atomic,
    /// Like [`Durability::Atomic`], but `fsync`s the data to physical storage
    /// before the rename and best-effort `fsync`s the directory afterward.
    ///
    /// The strongest guarantee (intended to survive power loss / kernel panic)
    /// and the slowest, since each save waits on a disk flush. The directory
    /// sync is skipped on non-Unix targets; on Windows, NTFS metadata
    /// journaling is relied on to make the rename durable.
    Durable,
}
110
111/// JSON storage backed by one file per key, with SHA-256 checksums.
112///
113/// Each file carries a checksum that is verified on load, and filenames are
114/// hashes of the key so arbitrary keys are safe and path traversal is
115/// impossible. The [`Durability`] mode controls the write strategy; the default
116/// ([`FileStorage::open`]) is [`Durability::Fast`].
117pub struct FileStorage {
118    root: PathBuf,
119    durability: Durability,
120}
121
122impl FileStorage {
123    /// Open (creating if necessary) a fast, in-place storage rooted at `dir`.
124    pub fn open(dir: impl AsRef<Path>) -> StorageResult<Self> {
125        Self::open_with(dir, Durability::Fast)
126    }
127
128    /// Open with an explicit [`Durability`] mode.
129    pub fn open_with(dir: impl AsRef<Path>, durability: Durability) -> StorageResult<Self> {
130        let root = dir.as_ref().to_path_buf();
131        std::fs::create_dir_all(&root)?;
132        Ok(FileStorage { root, durability })
133    }
134
135    /// The write-durability mode in effect.
136    pub fn durability(&self) -> Durability {
137        self.durability
138    }
139
140    fn path_for(&self, key: &str) -> StorageResult<PathBuf> {
141        Ok(self.root.join(safe_filename(key)?))
142    }
143}
144
145#[async_trait]
146impl Storage for FileStorage {
147    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
148        let path = self.path_for(key)?;
149        let envelope = Envelope {
150            key: key.to_string(),
151            checksum: checksum(value)?,
152            data: value.clone(),
153        };
154        let bytes = serde_json::to_vec(&envelope)?;
155        let root = self.root.clone();
156        let durability = self.durability;
157
158        // Filesystem work is blocking; push it to the blocking pool.
159        tokio::task::spawn_blocking(move || -> StorageResult<()> {
160            match durability {
161                Durability::Fast => {
162                    use std::io::Write;
163                    // Reuse the existing file when present: no create, no rename.
164                    let mut f = std::fs::OpenOptions::new()
165                        .create(true)
166                        .write(true)
167                        .truncate(true)
168                        .open(&path)?;
169                    f.write_all(&bytes)?;
170                    Ok(())
171                }
172                Durability::Atomic => {
173                    let tmp = root.join(format!(".tmp-{}", uuid::Uuid::new_v4()));
174                    std::fs::write(&tmp, &bytes)?;
175                    // rename is atomic on the same filesystem.
176                    std::fs::rename(&tmp, &path)?;
177                    Ok(())
178                }
179                Durability::Durable => {
180                    use std::io::Write;
181                    let tmp = root.join(format!(".tmp-{}", uuid::Uuid::new_v4()));
182                    {
183                        let mut f = std::fs::OpenOptions::new()
184                            .create(true)
185                            .write(true)
186                            .truncate(true)
187                            .open(&tmp)?;
188                        f.write_all(&bytes)?;
189                        // Force the data to physical storage before we rename, so
190                        // the renamed-in file is never a cached-but-not-persisted
191                        // ghost after power loss.
192                        f.sync_all()?;
193                    }
194                    std::fs::rename(&tmp, &path)?;
195                    // Persist the rename itself by syncing the directory entry.
196                    sync_parent_dir(&path);
197                    Ok(())
198                }
199            }
200        })
201        .await
202        .map_err(|e| StorageError::Io(io::Error::other(e)))?
203    }
204
205    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
206        let path = self.path_for(key)?;
207        let bytes = match tokio::fs::read(&path).await {
208            Ok(b) => b,
209            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
210            Err(e) => return Err(e.into()),
211        };
212        let envelope: Envelope = serde_json::from_slice(&bytes)?;
213        if checksum(&envelope.data)? != envelope.checksum {
214            return Err(StorageError::ChecksumMismatch(key.to_string()));
215        }
216        Ok(Some(envelope.data))
217    }
218
219    async fn delete(&self, key: &str) -> StorageResult<()> {
220        let path = self.path_for(key)?;
221        match tokio::fs::remove_file(&path).await {
222            Ok(()) => Ok(()),
223            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
224            Err(e) => Err(e.into()),
225        }
226    }
227
228    async fn list(&self) -> StorageResult<Vec<String>> {
229        let mut keys = Vec::new();
230        let mut dir = tokio::fs::read_dir(&self.root).await?;
231        while let Some(entry) = dir.next_entry().await? {
232            let name = entry.file_name();
233            let name = name.to_string_lossy();
234            // Filenames are hashes, so the real key lives inside each envelope.
235            if name.ends_with(".json") && !name.starts_with(".tmp-") {
236                if let Ok(bytes) = tokio::fs::read(entry.path()).await {
237                    if let Ok(envelope) = serde_json::from_slice::<Envelope>(&bytes) {
238                        keys.push(envelope.key);
239                    }
240                }
241            }
242        }
243        Ok(keys)
244    }
245}
246
247/// Thread-safe in-memory storage. Useful for tests and ephemeral runs.
248#[derive(Default)]
249pub struct MemoryStorage {
250    map: Mutex<HashMap<String, serde_json::Value>>,
251}
252
253impl MemoryStorage {
254    /// Create an empty in-memory store.
255    pub fn new() -> Self {
256        Self::default()
257    }
258}
259
260#[async_trait]
261impl Storage for MemoryStorage {
262    async fn save(&self, key: &str, value: &serde_json::Value) -> StorageResult<()> {
263        self.map.lock().insert(key.to_string(), value.clone());
264        Ok(())
265    }
266
267    async fn load(&self, key: &str) -> StorageResult<Option<serde_json::Value>> {
268        Ok(self.map.lock().get(key).cloned())
269    }
270
271    async fn delete(&self, key: &str) -> StorageResult<()> {
272        self.map.lock().remove(key);
273        Ok(())
274    }
275
276    async fn list(&self) -> StorageResult<Vec<String>> {
277        Ok(self.map.lock().keys().cloned().collect())
278    }
279}