1use std::path::{Path, PathBuf};
8
9use async_trait::async_trait;
10use bytes::Bytes;
11use tokio::fs;
12use tracing::{debug, instrument};
13
14use apiary_core::error::ApiaryError;
15use apiary_core::storage::StorageBackend;
16use apiary_core::Result;
17
/// Storage backend whose state is a single root directory on the local
/// filesystem.
///
/// Cloning copies only the stored path.
#[derive(Clone, Debug)]
pub struct LocalBackend {
    /// Root directory that key paths are joined onto.
    base_dir: PathBuf,
}
26
27impl LocalBackend {
28 pub async fn new(base_dir: impl Into<PathBuf>) -> Result<Self> {
32 let base_dir = base_dir.into();
33 fs::create_dir_all(&base_dir).await.map_err(|e| {
34 ApiaryError::storage(
35 format!("Failed to create base directory: {}", base_dir.display()),
36 e,
37 )
38 })?;
39 debug!(base_dir = %base_dir.display(), "LocalBackend initialised");
40 Ok(Self { base_dir })
41 }
42
43 fn key_to_path(&self, key: &str) -> PathBuf {
45 self.base_dir.join(key)
46 }
47
48 pub fn base_dir(&self) -> &Path {
50 &self.base_dir
51 }
52}
53
54#[async_trait]
55impl StorageBackend for LocalBackend {
56 #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
57 async fn put(&self, key: &str, data: Bytes) -> Result<()> {
58 let path = self.key_to_path(key);
59 if let Some(parent) = path.parent() {
60 fs::create_dir_all(parent).await.map_err(|e| {
61 ApiaryError::storage(
62 format!("Failed to create parent directories for {}", path.display()),
63 e,
64 )
65 })?;
66 }
67 fs::write(&path, &data)
68 .await
69 .map_err(|e| ApiaryError::storage(format!("Failed to write {}", path.display()), e))?;
70 debug!("Put {} bytes to {}", data.len(), key);
71 Ok(())
72 }
73
74 #[instrument(skip(self), fields(key = %key))]
75 async fn get(&self, key: &str) -> Result<Bytes> {
76 let path = self.key_to_path(key);
77 let data = fs::read(&path).await.map_err(|e| {
78 if e.kind() == std::io::ErrorKind::NotFound {
79 ApiaryError::NotFound {
80 key: key.to_string(),
81 }
82 } else {
83 ApiaryError::storage(format!("Failed to read {}", path.display()), e)
84 }
85 })?;
86 debug!("Get {} bytes from {}", data.len(), key);
87 Ok(Bytes::from(data))
88 }
89
90 #[instrument(skip(self), fields(prefix = %prefix))]
91 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
92 let base = &self.base_dir;
93 let mut results = Vec::new();
94 list_recursive(base, base, prefix, &mut results).await?;
95 results.sort();
96 debug!("Listed {} keys with prefix '{}'", results.len(), prefix);
97 Ok(results)
98 }
99
100 #[instrument(skip(self), fields(key = %key))]
101 async fn delete(&self, key: &str) -> Result<()> {
102 let path = self.key_to_path(key);
103 match fs::remove_file(&path).await {
104 Ok(()) => {
105 debug!("Deleted {}", key);
106 Ok(())
107 }
108 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
109 debug!("Delete {}: already absent", key);
110 Ok(())
111 }
112 Err(e) => Err(ApiaryError::storage(
113 format!("Failed to delete {}", path.display()),
114 e,
115 )),
116 }
117 }
118
119 #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
120 async fn put_if_not_exists(&self, key: &str, data: Bytes) -> Result<bool> {
121 let path = self.key_to_path(key);
122 if let Some(parent) = path.parent() {
123 fs::create_dir_all(parent).await.map_err(|e| {
124 ApiaryError::storage(
125 format!("Failed to create parent directories for {}", path.display()),
126 e,
127 )
128 })?;
129 }
130
131 let path_clone = path.clone();
134 let data_clone = data.clone();
135 let result = tokio::task::spawn_blocking(move || {
136 use std::io::Write;
137 match std::fs::OpenOptions::new()
138 .write(true)
139 .create_new(true)
140 .open(&path_clone)
141 {
142 Ok(mut file) => {
143 file.write_all(&data_clone)
144 .map_err(|e| ApiaryError::storage("Failed to write file", e))?;
145 Ok(true)
146 }
147 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => Ok(false),
148 Err(e) => Err(ApiaryError::storage(
149 format!("Failed to create {}", path_clone.display()),
150 e,
151 )),
152 }
153 })
154 .await
155 .map_err(|e| ApiaryError::Internal {
156 message: format!("Blocking task panicked: {e}"),
157 })??;
158
159 debug!(
160 "put_if_not_exists {} → {}",
161 key,
162 if result { "created" } else { "already exists" }
163 );
164 Ok(result)
165 }
166
167 #[instrument(skip(self), fields(key = %key))]
168 async fn exists(&self, key: &str) -> Result<bool> {
169 let path = self.key_to_path(key);
170 let exists = path.exists();
171 debug!("exists {} → {}", key, exists);
172 Ok(exists)
173 }
174}
175
176async fn list_recursive(
178 base: &Path,
179 dir: &Path,
180 prefix: &str,
181 results: &mut Vec<String>,
182) -> Result<()> {
183 let mut entries = match fs::read_dir(dir).await {
184 Ok(entries) => entries,
185 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
186 Err(e) => {
187 return Err(ApiaryError::storage(
188 format!("Failed to read directory {}", dir.display()),
189 e,
190 ))
191 }
192 };
193
194 while let Some(entry) = entries.next_entry().await.map_err(|e| {
195 ApiaryError::storage(
196 format!("Failed to read directory entry in {}", dir.display()),
197 e,
198 )
199 })? {
200 let path = entry.path();
201 if path.is_dir() {
202 Box::pin(list_recursive(base, &path, prefix, results)).await?;
203 } else {
204 let relative = path.strip_prefix(base).map_err(|e| ApiaryError::Internal {
205 message: format!("Path prefix strip failed: {e}"),
206 })?;
207 let key = relative
209 .components()
210 .map(|c| c.as_os_str().to_string_lossy().to_string())
211 .collect::<Vec<_>>()
212 .join("/");
213 if key.starts_with(prefix) {
214 results.push(key);
215 }
216 }
217 }
218
219 Ok(())
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a backend rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned alongside the backend so that each
    /// test keeps it alive for as long as the backend is in use.
    async fn test_backend() -> (LocalBackend, TempDir) {
        let scratch = TempDir::new().unwrap();
        let store = LocalBackend::new(scratch.path()).await.unwrap();
        (store, scratch)
    }

    /// A value written with `put` is read back unchanged by `get`.
    #[tokio::test]
    async fn test_put_and_get() {
        let (store, _guard) = test_backend().await;
        let payload = Bytes::from("hello");

        store.put("test/file.txt", payload.clone()).await.unwrap();

        let fetched = store.get("test/file.txt").await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// Reading a key that was never written yields `NotFound`.
    #[tokio::test]
    async fn test_get_not_found() {
        let (store, _guard) = test_backend().await;
        let outcome = store.get("nonexistent").await;
        assert!(matches!(outcome, Err(ApiaryError::NotFound { .. })));
    }

    /// Listing with a prefix returns only the keys below that prefix.
    #[tokio::test]
    async fn test_list() {
        let (store, _guard) = test_backend().await;
        for (key, body) in [
            ("prefix/a.txt", "a"),
            ("prefix/b.txt", "b"),
            ("other/c.txt", "c"),
        ] {
            store.put(key, Bytes::from(body)).await.unwrap();
        }

        let listed = store.list("prefix/").await.unwrap();

        assert_eq!(listed.len(), 2);
        for expected in ["prefix/a.txt", "prefix/b.txt"] {
            assert!(listed.contains(&expected.to_string()));
        }
    }

    /// An empty prefix lists every stored key, including nested ones.
    #[tokio::test]
    async fn test_list_empty_prefix() {
        let (store, _guard) = test_backend().await;
        for (key, body) in [("a.txt", "a"), ("b/c.txt", "c")] {
            store.put(key, Bytes::from(body)).await.unwrap();
        }

        let listed = store.list("").await.unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// After `delete`, the key can no longer be read.
    #[tokio::test]
    async fn test_delete() {
        let (store, _guard) = test_backend().await;
        let key = "to_delete.txt";

        store.put(key, Bytes::from("data")).await.unwrap();
        store.delete(key).await.unwrap();

        let outcome = store.get(key).await;
        assert!(matches!(outcome, Err(ApiaryError::NotFound { .. })));
    }

    /// Deleting a key that does not exist succeeds.
    #[tokio::test]
    async fn test_delete_nonexistent() {
        let (store, _guard) = test_backend().await;
        store.delete("nonexistent").await.unwrap();
    }

    /// `put_if_not_exists` on a fresh key reports creation and stores the data.
    #[tokio::test]
    async fn test_put_if_not_exists_creates() {
        let (store, _guard) = test_backend().await;
        let key = "new_key.txt";

        let was_created = store
            .put_if_not_exists(key, Bytes::from("data"))
            .await
            .unwrap();
        assert!(was_created);

        let fetched = store.get(key).await.unwrap();
        assert_eq!(fetched, Bytes::from("data"));
    }

    /// `put_if_not_exists` on an existing key reports `false` and leaves the
    /// original content in place.
    #[tokio::test]
    async fn test_put_if_not_exists_returns_false_when_exists() {
        let (store, _guard) = test_backend().await;
        let key = "existing.txt";

        store.put(key, Bytes::from("original")).await.unwrap();

        let was_created = store
            .put_if_not_exists(key, Bytes::from("new"))
            .await
            .unwrap();
        assert!(!was_created);

        let fetched = store.get(key).await.unwrap();
        assert_eq!(fetched, Bytes::from("original"));
    }

    /// Two sequential conditional writes to one key: only the first wins and
    /// its content is what remains.
    #[tokio::test]
    async fn test_put_if_not_exists_atomic() {
        let (store, _guard) = test_backend().await;
        let key = "race.txt";

        let first_created = store
            .put_if_not_exists(key, Bytes::from("first"))
            .await
            .unwrap();
        assert!(first_created);

        let second_created = store
            .put_if_not_exists(key, Bytes::from("second"))
            .await
            .unwrap();
        assert!(!second_created);

        let fetched = store.get(key).await.unwrap();
        assert_eq!(fetched, Bytes::from("first"));
    }

    /// `exists` is false before a key is written and true afterwards.
    #[tokio::test]
    async fn test_exists() {
        let (store, _guard) = test_backend().await;
        assert!(!store.exists("missing").await.unwrap());

        store.put("present.txt", Bytes::from("data")).await.unwrap();
        assert!(store.exists("present.txt").await.unwrap());
    }

    /// `put` creates any missing intermediate directories for a nested key.
    #[tokio::test]
    async fn test_put_creates_parent_dirs() {
        let (store, _guard) = test_backend().await;
        let key = "deep/nested/dir/file.txt";

        store.put(key, Bytes::from("deep")).await.unwrap();

        let fetched = store.get(key).await.unwrap();
        assert_eq!(fetched, Bytes::from("deep"));
    }

    /// A second `put` to the same key replaces the earlier content.
    #[tokio::test]
    async fn test_put_overwrites() {
        let (store, _guard) = test_backend().await;
        let key = "overwrite.txt";

        for version in ["v1", "v2"] {
            store.put(key, Bytes::from(version)).await.unwrap();
        }

        let fetched = store.get(key).await.unwrap();
        assert_eq!(fetched, Bytes::from("v2"));
    }

    /// Ten tasks race on `put_if_not_exists` for one key: exactly one must
    /// report creation, and the stored content must be one writer's payload.
    #[tokio::test]
    async fn test_put_if_not_exists_concurrent() {
        use futures::future::join_all;

        let (store, _guard) = test_backend().await;
        let store = std::sync::Arc::new(store);

        let mut writers = Vec::with_capacity(10);
        for i in 0..10 {
            let store = std::sync::Arc::clone(&store);
            let payload = format!("writer-{}", i);
            writers.push(tokio::spawn(async move {
                store
                    .put_if_not_exists("concurrent.txt", Bytes::from(payload))
                    .await
            }));
        }

        let joined = join_all(writers).await;

        let mut success_count = 0usize;
        for join_result in joined {
            let created = join_result
                .expect("Task should not panic")
                .unwrap_or_else(|e| panic!("Storage operation failed unexpectedly: {:?}", e));
            if created {
                success_count += 1;
            }
        }
        assert_eq!(
            success_count, 1,
            "Expected exactly 1 successful write, got {}",
            success_count
        );

        let stored = store.get("concurrent.txt").await.unwrap();
        let stored_text = std::str::from_utf8(&stored).expect("Data should be valid UTF-8");

        let candidates: Vec<String> = (0..10).map(|i| format!("writer-{}", i)).collect();
        assert!(
            candidates.iter().any(|c| c.as_str() == stored_text),
            "Expected one of {:?}, got '{}'",
            candidates,
            stored_text
        );
    }
}