vein_adapter/
storage.rs

1use std::{
2    io::ErrorKind,
3    path::{Path, PathBuf},
4    time::{Duration, SystemTime, UNIX_EPOCH},
5};
6
7use anyhow::{Context, Result};
8use tokio::{
9    fs::{self, File, OpenOptions},
10    io::AsyncWriteExt,
11};
12
// Retry configuration constants
/// Upper bound on how many times a retryable filesystem operation is tried
/// in total (the first try counts as attempt 1).
const MAX_ATTEMPTS: u32 = 3;
/// Fixed pause, in milliseconds, slept between two consecutive attempts.
const BACKOFF_MS: u64 = 100;
16
/// Filesystem-backed storage rooted at a single directory.
///
/// The handle only holds the root path, so cloning it copies that path and
/// nothing else.
#[derive(Debug, Clone)]
pub struct FilesystemStorage {
    /// Directory that every storage-relative path is joined onto.
    root: PathBuf,
}
21
22impl FilesystemStorage {
23    pub fn new(root: PathBuf) -> Self {
24        FilesystemStorage { root }
25    }
26
27    pub async fn prepare(&self) -> Result<()> {
28        fs::create_dir_all(&self.root)
29            .await
30            .with_context(|| format!("creating storage root {}", self.root.display()))
31    }
32
33    pub fn resolve(&self, relative: &str) -> PathBuf {
34        self.root.join(relative)
35    }
36
37    pub async fn open_read(&self, relative: &str) -> Result<Option<FileHandle>> {
38        let path = self.resolve(relative);
39
40        // Retry logic for file open
41        let mut attempt = 0;
42        let file = loop {
43            attempt += 1;
44            match File::open(&path).await {
45                Ok(file) => break file,
46                Err(e) if e.kind() == ErrorKind::NotFound => return Ok(None),
47                Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => {
48                    tracing::debug!(
49                        "open_read attempt {}/{} failed with {:?}, retrying in {}ms: {}",
50                        attempt,
51                        MAX_ATTEMPTS,
52                        e.kind(),
53                        BACKOFF_MS,
54                        path.display()
55                    );
56                    tokio::time::sleep(Duration::from_millis(BACKOFF_MS)).await;
57                    continue;
58                }
59                Err(e) => {
60                    return Err(anyhow::Error::from(e).context(format!(
61                        "opening cached asset {} (after {} attempts)",
62                        path.display(),
63                        attempt
64                    )));
65                }
66            }
67        };
68
69        let metadata = file
70            .metadata()
71            .await
72            .with_context(|| format!("reading metadata {}", path.display()))?;
73
74        Ok(Some(FileHandle {
75            file,
76            size: metadata.len(),
77            path,
78        }))
79    }
80
81    pub async fn create_temp_writer(&self, relative: &str) -> Result<TempFile> {
82        let final_path = self.resolve(relative);
83        if let Some(parent) = final_path.parent() {
84            fs::create_dir_all(parent)
85                .await
86                .with_context(|| format!("creating storage dir {}", parent.display()))?;
87        }
88
89        let tmp_path = temp_path_for(&final_path);
90
91        // Retry logic for temp file creation
92        let mut attempt = 0;
93        let file = loop {
94            attempt += 1;
95            match OpenOptions::new()
96                .create(true)
97                .write(true)
98                .truncate(true)
99                .open(&tmp_path)
100                .await
101            {
102                Ok(file) => break file,
103                Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => {
104                    tracing::debug!(
105                        "create_temp_writer attempt {}/{} failed with {:?}, retrying in {}ms: {}",
106                        attempt,
107                        MAX_ATTEMPTS,
108                        e.kind(),
109                        BACKOFF_MS,
110                        tmp_path.display()
111                    );
112                    tokio::time::sleep(Duration::from_millis(BACKOFF_MS)).await;
113                    continue;
114                }
115                Err(e) => {
116                    return Err(anyhow::Error::from(e).context(format!(
117                        "creating temp file {} (after {} attempts)",
118                        tmp_path.display(),
119                        attempt
120                    )));
121                }
122            }
123        };
124
125        Ok(TempFile {
126            tmp_path,
127            final_path,
128            file,
129        })
130    }
131}
132
133pub struct FileHandle {
134    pub file: File,
135    pub size: u64,
136    pub path: PathBuf,
137}
138
139pub struct TempFile {
140    tmp_path: PathBuf,
141    final_path: PathBuf,
142    file: File,
143}
144
145impl TempFile {
146    pub fn file_mut(&mut self) -> &mut File {
147        &mut self.file
148    }
149
150    pub async fn commit(self) -> Result<()> {
151        let Self {
152            tmp_path,
153            final_path,
154            mut file,
155        } = self;
156
157        file.flush()
158            .await
159            .with_context(|| format!("flushing {}", tmp_path.display()))?;
160        drop(file);
161
162        // Retry logic for atomic rename
163        let mut attempt = 0;
164        loop {
165            attempt += 1;
166            match fs::rename(&tmp_path, &final_path).await {
167                Ok(()) => return Ok(()),
168                Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => {
169                    tracing::debug!(
170                        "commit (rename) attempt {}/{} failed with {:?}, \
171                         retrying in {}ms: {} -> {}",
172                        attempt,
173                        MAX_ATTEMPTS,
174                        e.kind(),
175                        BACKOFF_MS,
176                        tmp_path.display(),
177                        final_path.display()
178                    );
179                    tokio::time::sleep(Duration::from_millis(BACKOFF_MS)).await;
180                    continue;
181                }
182                Err(e) => {
183                    return Err(anyhow::Error::from(e).context(format!(
184                        "moving {} to {} (after {} attempts)",
185                        tmp_path.display(),
186                        final_path.display(),
187                        attempt
188                    )));
189                }
190            }
191        }
192    }
193
194    pub async fn rollback(self) -> Result<()> {
195        let Self {
196            tmp_path, mut file, ..
197        } = self;
198        file.flush()
199            .await
200            .with_context(|| format!("flushing {}", tmp_path.display()))?;
201        drop(file);
202        match fs::remove_file(&tmp_path).await {
203            Ok(()) => Ok(()),
204            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
205            Err(e) => Err(anyhow::Error::from(e)
206                .context(format!("removing temp file {}", tmp_path.display()))),
207        }
208    }
209}
210
/// Derives a temp-file path that sits next to `final_path`.
///
/// The name is `<file name>.tmp-<pid>-<nanos>-<seq>`, or
/// `tmp-<pid>-<nanos>-<seq>` when the final path has no UTF-8 file name.
/// `<seq>` is a per-process counter, so two calls in the same process never
/// produce the same name. This holds even when the clock is coarse, or when it
/// reads before the Unix epoch and the timestamp falls back to 0.
fn temp_path_for(final_path: &Path) -> PathBuf {
    // Process-wide sequence number; only uniqueness matters, not ordering
    // relative to other memory operations, hence `Relaxed`.
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let pid = std::process::id();
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let tmp_name = match final_path.file_name().and_then(|s| s.to_str()) {
        Some(name) => format!("{name}.tmp-{pid}-{timestamp}-{seq}"),
        None => format!("tmp-{pid}-{timestamp}-{seq}"),
    };
    final_path.with_file_name(tmp_name)
}
223
/// Determines if an I/O error should be retried.
///
/// Retryable errors are `ErrorKind::WouldBlock`, `ErrorKind::Interrupted`
/// and, on Unix targets only, a raw `EBUSY` ("Device or resource busy").
///
/// No raw `EAGAIN` check is needed: on Unix the standard library already
/// reports it as `ErrorKind::WouldBlock`. Its numeric value also differs
/// between platforms (11 on Linux, 35 on macOS and the BSDs), so matching a
/// literal 11 would retry unrelated errors elsewhere.
fn should_retry(error: &std::io::Error) -> bool {
    // EBUSY is 16 on Linux, macOS and the BSDs. Raw OS codes are a different
    // numbering on non-Unix targets, so the check is gated on `unix`.
    const EBUSY: i32 = 16;

    if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted) {
        return true;
    }
    cfg!(unix) && error.raw_os_error() == Some(EBUSY)
}
231
#[cfg(test)]
mod tests {
    //! Unit tests for the filesystem storage. Tests that never await run as
    //! plain `#[test]` functions; only tests doing async I/O use the tokio
    //! test runtime.

    use super::*;
    use tokio::io::AsyncReadExt;

    #[test]
    fn test_new_creates_storage() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
        assert_eq!(storage.root, temp_dir.path());
    }

    #[tokio::test]
    async fn test_prepare_creates_root_directory() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage_path = temp_dir.path().join("storage_root");
        let storage = FilesystemStorage::new(storage_path.clone());

        assert!(!storage_path.exists());
        storage.prepare().await.unwrap();
        assert!(storage_path.exists());
        assert!(storage_path.is_dir());
    }

    #[tokio::test]
    async fn test_prepare_succeeds_if_directory_exists() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());

        // Calling prepare twice must be harmless.
        storage.prepare().await.unwrap();
        storage.prepare().await.unwrap();
    }

    #[test]
    fn test_resolve_joins_paths() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());

        let resolved = storage.resolve("gems/rack-3.0.0.gem");
        assert_eq!(resolved, temp_dir.path().join("gems/rack-3.0.0.gem"));
    }

    #[test]
    fn test_resolve_handles_nested_paths() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());

        let resolved = storage.resolve("a/b/c/file.gem");
        assert_eq!(resolved, temp_dir.path().join("a/b/c/file.gem"));
    }

    #[tokio::test]
    async fn test_open_read_returns_none_for_missing_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());

        let result = storage.open_read("missing.gem").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_open_read_returns_handle_for_existing_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
        storage.prepare().await.unwrap();

        let path = storage.resolve("test.gem");
        fs::write(&path, b"content").await.unwrap();

        let handle = storage.open_read("test.gem").await.unwrap();
        assert!(handle.is_some());

        let mut handle = handle.unwrap();
        let mut buf = Vec::new();
        handle.file.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"content");
    }

    #[tokio::test]
    async fn test_create_temp_writer_creates_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
        storage.prepare().await.unwrap();

        let mut temp_file = storage.create_temp_writer("test/file.gem").await.unwrap();
        temp_file.file_mut().write_all(b"data").await.unwrap();
        temp_file.commit().await.unwrap();

        let final_path = storage.resolve("test/file.gem");
        let data = fs::read(final_path).await.unwrap();
        assert_eq!(data, b"data");
    }

    #[tokio::test]
    async fn test_commit_moves_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
        storage.prepare().await.unwrap();

        let mut temp_file = storage.create_temp_writer("test/file.gem").await.unwrap();
        temp_file.file_mut().write_all(b"data").await.unwrap();
        let tmp_path = temp_file.tmp_path.clone();
        let final_path = temp_file.final_path.clone();
        temp_file.commit().await.unwrap();

        assert!(!tmp_path.exists());
        assert!(final_path.exists());
    }

    #[tokio::test]
    async fn test_rollback_removes_temp_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
        storage.prepare().await.unwrap();

        let temp_file = storage.create_temp_writer("test/file.gem").await.unwrap();
        let tmp_path = temp_file.tmp_path.clone();
        temp_file.rollback().await.unwrap();

        assert!(!tmp_path.exists());
        assert!(!storage.resolve("test/file.gem").exists());
    }

    #[test]
    fn test_temp_path_generation() {
        let final_path = PathBuf::from("foo/bar.gem");
        let tmp_path = temp_path_for(&final_path);
        assert!(
            tmp_path
                .file_name()
                .unwrap()
                .to_str()
                .unwrap()
                .starts_with("bar.gem.tmp-")
        );
    }

    #[test]
    fn test_should_retry_logic() {
        // Should retry these errors
        let would_block = std::io::Error::from(ErrorKind::WouldBlock);
        assert!(should_retry(&would_block));

        let interrupted = std::io::Error::from(ErrorKind::Interrupted);
        assert!(should_retry(&interrupted));

        // Should NOT retry these errors
        let not_found = std::io::Error::from(ErrorKind::NotFound);
        assert!(!should_retry(&not_found));

        let permission_denied = std::io::Error::from(ErrorKind::PermissionDenied);
        assert!(!should_retry(&permission_denied));
    }

    #[cfg(unix)]
    #[test]
    fn test_should_retry_on_raw_ebusy() {
        // 16 is EBUSY ("Device or resource busy") on Unix targets.
        let busy = std::io::Error::from_raw_os_error(16);
        assert!(should_retry(&busy));
    }
}