1use std::{
2 io::ErrorKind,
3 path::{Path, PathBuf},
4 time::{Duration, SystemTime, UNIX_EPOCH},
5};
6
7use anyhow::{Context, Result};
8use tokio::{
9 fs::{self, File, OpenOptions},
10 io::AsyncWriteExt,
11};
12
13const MAX_ATTEMPTS: u32 = 3;
15const BACKOFF_MS: u64 = 100;
16
17#[derive(Clone)]
18pub struct FilesystemStorage {
19 root: PathBuf,
20}
21
22impl FilesystemStorage {
23 pub fn new(root: PathBuf) -> Self {
24 FilesystemStorage { root }
25 }
26
27 pub async fn prepare(&self) -> Result<()> {
28 fs::create_dir_all(&self.root)
29 .await
30 .with_context(|| format!("creating storage root {}", self.root.display()))
31 }
32
33 pub fn resolve(&self, relative: &str) -> PathBuf {
34 self.root.join(relative)
35 }
36
37 pub async fn open_read(&self, relative: &str) -> Result<Option<FileHandle>> {
38 let path = self.resolve(relative);
39
40 let mut attempt = 0;
42 let file = loop {
43 attempt += 1;
44 match File::open(&path).await {
45 Ok(file) => break file,
46 Err(e) if e.kind() == ErrorKind::NotFound => return Ok(None),
47 Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => {
48 tracing::debug!(
49 "open_read attempt {}/{} failed with {:?}, retrying in {}ms: {}",
50 attempt,
51 MAX_ATTEMPTS,
52 e.kind(),
53 BACKOFF_MS,
54 path.display()
55 );
56 tokio::time::sleep(Duration::from_millis(BACKOFF_MS)).await;
57 continue;
58 }
59 Err(e) => {
60 return Err(anyhow::Error::from(e).context(format!(
61 "opening cached asset {} (after {} attempts)",
62 path.display(),
63 attempt
64 )));
65 }
66 }
67 };
68
69 let metadata = file
70 .metadata()
71 .await
72 .with_context(|| format!("reading metadata {}", path.display()))?;
73
74 Ok(Some(FileHandle {
75 file,
76 size: metadata.len(),
77 path,
78 }))
79 }
80
81 pub async fn create_temp_writer(&self, relative: &str) -> Result<TempFile> {
82 let final_path = self.resolve(relative);
83 if let Some(parent) = final_path.parent() {
84 fs::create_dir_all(parent)
85 .await
86 .with_context(|| format!("creating storage dir {}", parent.display()))?;
87 }
88
89 let tmp_path = temp_path_for(&final_path);
90
91 let mut attempt = 0;
93 let file = loop {
94 attempt += 1;
95 match OpenOptions::new()
96 .create(true)
97 .write(true)
98 .truncate(true)
99 .open(&tmp_path)
100 .await
101 {
102 Ok(file) => break file,
103 Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => {
104 tracing::debug!(
105 "create_temp_writer attempt {}/{} failed with {:?}, retrying in {}ms: {}",
106 attempt,
107 MAX_ATTEMPTS,
108 e.kind(),
109 BACKOFF_MS,
110 tmp_path.display()
111 );
112 tokio::time::sleep(Duration::from_millis(BACKOFF_MS)).await;
113 continue;
114 }
115 Err(e) => {
116 return Err(anyhow::Error::from(e).context(format!(
117 "creating temp file {} (after {} attempts)",
118 tmp_path.display(),
119 attempt
120 )));
121 }
122 }
123 };
124
125 Ok(TempFile {
126 tmp_path,
127 final_path,
128 file,
129 })
130 }
131}
132
133pub struct FileHandle {
134 pub file: File,
135 pub size: u64,
136 pub path: PathBuf,
137}
138
139pub struct TempFile {
140 tmp_path: PathBuf,
141 final_path: PathBuf,
142 file: File,
143}
144
145impl TempFile {
146 pub fn file_mut(&mut self) -> &mut File {
147 &mut self.file
148 }
149
150 pub async fn commit(self) -> Result<()> {
151 let Self {
152 tmp_path,
153 final_path,
154 mut file,
155 } = self;
156
157 file.flush()
158 .await
159 .with_context(|| format!("flushing {}", tmp_path.display()))?;
160 drop(file);
161
162 let mut attempt = 0;
164 loop {
165 attempt += 1;
166 match fs::rename(&tmp_path, &final_path).await {
167 Ok(()) => return Ok(()),
168 Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => {
169 tracing::debug!(
170 "commit (rename) attempt {}/{} failed with {:?}, \
171 retrying in {}ms: {} -> {}",
172 attempt,
173 MAX_ATTEMPTS,
174 e.kind(),
175 BACKOFF_MS,
176 tmp_path.display(),
177 final_path.display()
178 );
179 tokio::time::sleep(Duration::from_millis(BACKOFF_MS)).await;
180 continue;
181 }
182 Err(e) => {
183 return Err(anyhow::Error::from(e).context(format!(
184 "moving {} to {} (after {} attempts)",
185 tmp_path.display(),
186 final_path.display(),
187 attempt
188 )));
189 }
190 }
191 }
192 }
193
194 pub async fn rollback(self) -> Result<()> {
195 let Self {
196 tmp_path, mut file, ..
197 } = self;
198 file.flush()
199 .await
200 .with_context(|| format!("flushing {}", tmp_path.display()))?;
201 drop(file);
202 match fs::remove_file(&tmp_path).await {
203 Ok(()) => Ok(()),
204 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
205 Err(e) => Err(anyhow::Error::from(e)
206 .context(format!("removing temp file {}", tmp_path.display()))),
207 }
208 }
209}
210
211fn temp_path_for(final_path: &Path) -> PathBuf {
212 let timestamp = SystemTime::now()
213 .duration_since(UNIX_EPOCH)
214 .unwrap_or_default()
215 .as_nanos();
216 let pid = std::process::id();
217 let tmp_name = match final_path.file_name().and_then(|s| s.to_str()) {
218 Some(name) => format!("{name}.tmp-{pid}-{timestamp}"),
219 None => format!("tmp-{pid}-{timestamp}"),
220 };
221 final_path.with_file_name(tmp_name)
222}
223
224fn should_retry(error: &std::io::Error) -> bool {
226 matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted)
227 || matches!(error.raw_os_error(), Some(16) | Some(11))
228 }
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use tokio::io::AsyncReadExt;
236
237 #[tokio::test]
238 async fn test_new_creates_storage() {
239 let temp_dir = tempfile::tempdir().unwrap();
240 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
241 assert_eq!(storage.root, temp_dir.path());
242 }
243
244 #[tokio::test]
245 async fn test_prepare_creates_root_directory() {
246 let temp_dir = tempfile::tempdir().unwrap();
247 let storage_path = temp_dir.path().join("storage_root");
248 let storage = FilesystemStorage::new(storage_path.clone());
249
250 assert!(!storage_path.exists());
251 storage.prepare().await.unwrap();
252 assert!(storage_path.exists());
253 assert!(storage_path.is_dir());
254 }
255
256 #[tokio::test]
257 async fn test_prepare_succeeds_if_directory_exists() {
258 let temp_dir = tempfile::tempdir().unwrap();
259 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
260
261 storage.prepare().await.unwrap();
262 storage.prepare().await.unwrap();
263 }
264
265 #[tokio::test]
266 async fn test_resolve_joins_paths() {
267 let temp_dir = tempfile::tempdir().unwrap();
268 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
269
270 let resolved = storage.resolve("gems/rack-3.0.0.gem");
271 assert_eq!(resolved, temp_dir.path().join("gems/rack-3.0.0.gem"));
272 }
273
274 #[tokio::test]
275 async fn test_resolve_handles_nested_paths() {
276 let temp_dir = tempfile::tempdir().unwrap();
277 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
278
279 let resolved = storage.resolve("a/b/c/file.gem");
280 assert_eq!(resolved, temp_dir.path().join("a/b/c/file.gem"));
281 }
282
283 #[tokio::test]
284 async fn test_open_read_returns_none_for_missing_file() {
285 let temp_dir = tempfile::tempdir().unwrap();
286 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
287
288 let result = storage.open_read("missing.gem").await.unwrap();
289 assert!(result.is_none());
290 }
291
292 #[tokio::test]
293 async fn test_open_read_returns_handle_for_existing_file() {
294 let temp_dir = tempfile::tempdir().unwrap();
295 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
296 storage.prepare().await.unwrap();
297
298 let path = storage.resolve("test.gem");
299 fs::write(&path, b"content").await.unwrap();
300
301 let handle = storage.open_read("test.gem").await.unwrap();
302 assert!(handle.is_some());
303
304 let mut handle = handle.unwrap();
305 let mut buf = Vec::new();
306 handle.file.read_to_end(&mut buf).await.unwrap();
307 assert_eq!(buf, b"content");
308 }
309
310 #[tokio::test]
311 async fn test_create_temp_writer_creates_file() {
312 let temp_dir = tempfile::tempdir().unwrap();
313 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
314 storage.prepare().await.unwrap();
315
316 let mut temp_file = storage.create_temp_writer("test/file.gem").await.unwrap();
317 temp_file.file_mut().write_all(b"data").await.unwrap();
318 temp_file.commit().await.unwrap();
319
320 let final_path = storage.resolve("test/file.gem");
321 let data = fs::read(final_path).await.unwrap();
322 assert_eq!(data, b"data");
323 }
324
325 #[tokio::test]
326 async fn test_commit_moves_file() {
327 let temp_dir = tempfile::tempdir().unwrap();
328 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
329 storage.prepare().await.unwrap();
330
331 let mut temp_file = storage.create_temp_writer("test/file.gem").await.unwrap();
332 temp_file.file_mut().write_all(b"data").await.unwrap();
333 let tmp_path = temp_file.tmp_path.clone();
334 let final_path = temp_file.final_path.clone();
335 temp_file.commit().await.unwrap();
336
337 assert!(!tmp_path.exists());
338 assert!(final_path.exists());
339 }
340
341 #[tokio::test]
342 async fn test_rollback_removes_temp_file() {
343 let temp_dir = tempfile::tempdir().unwrap();
344 let storage = FilesystemStorage::new(temp_dir.path().to_path_buf());
345 storage.prepare().await.unwrap();
346
347 let temp_file = storage.create_temp_writer("test/file.gem").await.unwrap();
348 let tmp_path = temp_file.tmp_path.clone();
349 temp_file.rollback().await.unwrap();
350
351 assert!(!tmp_path.exists());
352 assert!(!storage.resolve("test/file.gem").exists());
353 }
354
355 #[tokio::test]
356 async fn test_temp_path_generation() {
357 let final_path = PathBuf::from("foo/bar.gem");
358 let tmp_path = temp_path_for(&final_path);
359 assert!(
360 tmp_path
361 .file_name()
362 .unwrap()
363 .to_str()
364 .unwrap()
365 .starts_with("bar.gem.tmp-")
366 );
367 }
368
369 #[test]
370 fn test_should_retry_logic() {
371 let would_block = std::io::Error::from(ErrorKind::WouldBlock);
373 assert!(should_retry(&would_block));
374
375 let interrupted = std::io::Error::from(ErrorKind::Interrupted);
376 assert!(should_retry(&interrupted));
377
378 let not_found = std::io::Error::from(ErrorKind::NotFound);
380 assert!(!should_retry(¬_found));
381
382 let permission_denied = std::io::Error::from(ErrorKind::PermissionDenied);
383 assert!(!should_retry(&permission_denied));
384 }
385}