use std::{fmt, ops::Range, sync::Arc, time::SystemTime};
use async_trait::async_trait;
use bytes::Bytes;
use thiserror::Error;
pub mod azure;
pub mod local_fs;
mod retry;
pub mod s3;
pub use azure::AzureStorageProvider;
pub use local_fs::LocalFsStorageProvider;
pub use s3::S3StorageProvider;
/// Metadata describing a stored object, as returned by [`StorageProvider::head`] and
/// alongside the bytes from [`StorageProvider::get`].
///
/// Derives `PartialEq`/`Eq` so whole values can be compared (e.g. in assertions)
/// instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMeta {
    /// Object length in bytes.
    pub size: u64,
    /// Entity tag, if the backend reports one. Presumably the value callers pass back
    /// as `expected_etag` to [`StorageProvider::put_if_match`] — confirm per provider.
    pub etag: Option<String>,
    /// Last-modification timestamp reported by the backend.
    pub last_modified: SystemTime,
}
#[derive(Debug, Error)]
pub enum StorageError {
#[error("not found: {uri}")]
NotFound { uri: String },
#[error("precondition failed: {uri}")]
PreconditionFailed { uri: String },
#[error("transient error after retry: {uri} — {source}")]
TransientExhausted {
uri: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("permanent error: {uri} — {source}")]
Permanent {
uri: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
}
#[async_trait]
pub trait StorageProvider: Send + Sync + fmt::Debug {
async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError>;
async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError>;
async fn get_range(&self, uri: &str, range: Range<u64>) -> Result<Bytes, StorageError>;
async fn tail(&self, uri: &str, len: u64) -> Result<(Bytes, u64), StorageError> {
let meta = self.head(uri).await?;
let len = len.min(meta.size);
if len == 0 {
return Ok((Bytes::new(), meta.size));
}
let start = meta.size - len;
let bytes = self.get_range(uri, start..meta.size).await?;
Ok((bytes, meta.size))
}
async fn put_atomic(&self, uri: &str, bytes: Bytes) -> Result<Option<String>, StorageError>;
async fn put_if_match(
&self,
uri: &str,
bytes: Bytes,
expected_etag: Option<&str>,
) -> Result<Option<String>, StorageError>;
async fn put_multipart(
&self,
uri: &str,
) -> Result<Box<dyn object_store::MultipartUpload>, StorageError>;
async fn delete(&self, uri: &str) -> Result<(), StorageError>;
async fn list_with_prefix_metadata(
&self,
_prefix: &str,
) -> Result<Vec<(String, ObjectMeta)>, StorageError> {
Ok(Vec::new())
}
async fn list_with_prefix(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
Ok(self
.list_with_prefix_metadata(prefix)
.await?
.into_iter()
.map(|(key, _)| key)
.collect())
}
fn object_store_handle(
&self,
_uri: &str,
) -> Option<(Arc<dyn object_store::ObjectStore>, object_store::path::Path)> {
None
}
}
/// Unit tests for the trait's default methods, driven by a minimal in-memory provider.
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, error::Error, ops::Range, sync::Mutex};

    use async_trait::async_trait;
    use bytes::Bytes;

    use super::*;

    /// Fixed etag the mock reports for every object and every write.
    const MOCK_ETAG: &str = "mock-etag";

    /// In-memory `StorageProvider` keyed by URI. It implements only the required methods,
    /// so the trait's default methods are exercised as-is.
    #[derive(Debug, Default)]
    struct InMemoryMock {
        objects: Mutex<HashMap<String, Bytes>>,
    }

    impl InMemoryMock {
        /// Builds a mock pre-populated with a single object.
        fn with(uri: &str, bytes: &[u8]) -> Self {
            let mock = Self::default();
            mock.objects
                .lock()
                .expect("lock")
                .insert(uri.into(), Bytes::copy_from_slice(bytes));
            mock
        }
    }

    fn not_found(uri: &str) -> StorageError {
        StorageError::NotFound { uri: uri.into() }
    }

    // No method below awaits while holding the std `Mutex` guard, so the futures stay `Send`.
    #[async_trait]
    impl StorageProvider for InMemoryMock {
        async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
            let map = self.objects.lock().expect("lock");
            match map.get(uri) {
                Some(b) => Ok(ObjectMeta {
                    size: b.len() as u64,
                    etag: Some(MOCK_ETAG.into()),
                    last_modified: SystemTime::UNIX_EPOCH,
                }),
                None => Err(not_found(uri)),
            }
        }

        async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
            let map = self.objects.lock().expect("lock");
            match map.get(uri) {
                Some(b) => Ok((
                    b.clone(),
                    ObjectMeta {
                        size: b.len() as u64,
                        etag: Some(MOCK_ETAG.into()),
                        last_modified: SystemTime::UNIX_EPOCH,
                    },
                )),
                None => Err(not_found(uri)),
            }
        }

        async fn get_range(&self, uri: &str, range: Range<u64>) -> Result<Bytes, StorageError> {
            let map = self.objects.lock().expect("lock");
            let b = map.get(uri).ok_or_else(|| not_found(uri))?;
            let len = b.len() as u64;
            // `Bytes::slice` panics on a reversed or out-of-bounds range. Report it as an
            // error instead, so a bad range computed by a default method shows up as a
            // normal `Err` rather than a panic inside the mock.
            if range.start > range.end || range.end > len {
                let boxed: Box<dyn Error + Send + Sync> =
                    format!("range {range:?} out of bounds for {len}-byte object").into();
                return Err(StorageError::Permanent {
                    uri: uri.into(),
                    source: boxed,
                });
            }
            Ok(b.slice(range.start as usize..range.end as usize))
        }

        /// Create-only write: fails with `PreconditionFailed` if `uri` already exists.
        async fn put_atomic(
            &self,
            uri: &str,
            bytes: Bytes,
        ) -> Result<Option<String>, StorageError> {
            let mut map = self.objects.lock().expect("lock");
            if map.contains_key(uri) {
                return Err(StorageError::PreconditionFailed { uri: uri.into() });
            }
            map.insert(uri.into(), bytes);
            Ok(Some(MOCK_ETAG.into()))
        }

        /// Unconditional overwrite: the mock ignores `expected_etag`.
        async fn put_if_match(
            &self,
            uri: &str,
            bytes: Bytes,
            _expected_etag: Option<&str>,
        ) -> Result<Option<String>, StorageError> {
            self.objects.lock().expect("lock").insert(uri.into(), bytes);
            Ok(Some(MOCK_ETAG.into()))
        }

        async fn put_multipart(
            &self,
            uri: &str,
        ) -> Result<Box<dyn object_store::MultipartUpload>, StorageError> {
            let boxed: Box<dyn Error + Send + Sync> = "multipart unsupported".into();
            Err(StorageError::Permanent {
                uri: uri.into(),
                source: boxed,
            })
        }

        /// Removing a missing key is not an error.
        async fn delete(&self, uri: &str) -> Result<(), StorageError> {
            self.objects.lock().expect("lock").remove(uri);
            Ok(())
        }
    }

    #[tokio::test]
    async fn default_tail_returns_trailing_bytes_and_size() {
        let mock = InMemoryMock::with("k", b"abcdefgh");
        let (bytes, size) = mock.tail("k", 3).await.expect("tail");
        assert_eq!(size, 8);
        assert_eq!(&bytes[..], b"fgh");
    }

    #[tokio::test]
    async fn default_tail_clamps_len_to_object_size() {
        let mock = InMemoryMock::with("k", b"abc");
        let (bytes, size) = mock.tail("k", 100).await.expect("tail over-long");
        assert_eq!(size, 3);
        assert_eq!(&bytes[..], b"abc", "len clamps to the whole object");
    }

    #[tokio::test]
    async fn default_tail_zero_len_returns_empty_with_size() {
        let mock = InMemoryMock::with("k", b"abc");
        let (bytes, size) = mock.tail("k", 0).await.expect("tail zero");
        assert_eq!(size, 3);
        assert!(bytes.is_empty(), "zero-len tail still discloses size");
    }

    #[tokio::test]
    async fn default_tail_of_empty_object_returns_empty() {
        let mock = InMemoryMock::with("k", b"");
        let (bytes, size) = mock.tail("k", 4).await.expect("tail empty object");
        assert_eq!(size, 0);
        assert!(bytes.is_empty(), "an empty object has no trailing bytes");
    }

    #[tokio::test]
    async fn default_tail_propagates_not_found() {
        let mock = InMemoryMock::default();
        assert!(matches!(
            mock.tail("missing", 4).await,
            Err(StorageError::NotFound { .. })
        ));
    }

    #[tokio::test]
    async fn default_list_with_prefix_is_empty() {
        let mock = InMemoryMock::with("a/b", b"x");
        assert!(
            mock.list_with_prefix("a/").await.expect("list").is_empty(),
            "the default list never enumerates objects",
        );
    }

    #[test]
    fn default_object_store_handle_is_none() {
        let mock = InMemoryMock::default();
        assert!(mock.object_store_handle("k").is_none());
    }

    #[tokio::test]
    async fn mock_byte_ops_round_trip() {
        let mock = InMemoryMock::default();
        assert_eq!(
            mock.put_atomic("k", Bytes::from_static(b"hello"))
                .await
                .expect("put_atomic"),
            Some(MOCK_ETAG.to_string()),
        );
        assert!(matches!(
            mock.put_atomic("k", Bytes::from_static(b"x")).await,
            Err(StorageError::PreconditionFailed { .. })
        ));
        assert_eq!(mock.head("k").await.expect("head").size, 5);
        let (bytes, _) = mock.get("k").await.expect("get");
        assert_eq!(&bytes[..], b"hello");
        assert_eq!(&mock.get_range("k", 1..3).await.expect("range")[..], b"el");
        mock.put_if_match("k", Bytes::from_static(b"world!"), Some(MOCK_ETAG))
            .await
            .expect("put_if_match");
        assert_eq!(mock.head("k").await.expect("head2").size, 6);
        mock.delete("k").await.expect("delete");
        mock.delete("k").await.expect("delete idempotent");
        assert!(matches!(
            mock.get("k").await,
            Err(StorageError::NotFound { .. })
        ));
        assert!(matches!(
            mock.head("missing").await,
            Err(StorageError::NotFound { .. })
        ));
        assert!(matches!(
            mock.get_range("missing", 0..1).await,
            Err(StorageError::NotFound { .. })
        ));
    }

    #[tokio::test]
    async fn mock_get_range_rejects_out_of_bounds() {
        let mock = InMemoryMock::with("k", b"abc");
        assert!(matches!(
            mock.get_range("k", 2..10).await,
            Err(StorageError::Permanent { .. })
        ));
    }

    #[tokio::test]
    async fn mock_put_multipart_surfaces_permanent_error() {
        let mock = InMemoryMock::default();
        assert!(matches!(
            mock.put_multipart("k").await,
            Err(StorageError::Permanent { .. })
        ));
    }

    #[test]
    fn storage_error_display_covers_every_variant() {
        let cases: [(StorageError, &str); 4] = [
            (StorageError::NotFound { uri: "u".into() }, "not found"),
            (
                StorageError::PreconditionFailed { uri: "u".into() },
                "precondition failed",
            ),
            (
                StorageError::TransientExhausted {
                    uri: "u".into(),
                    source: "boom".into(),
                },
                "transient",
            ),
            (
                StorageError::Permanent {
                    uri: "u".into(),
                    source: "boom".into(),
                },
                "permanent",
            ),
        ];
        for (err, needle) in cases {
            assert!(
                err.to_string().contains(needle),
                "{err:?} display should contain {needle:?}",
            );
        }
    }

    #[test]
    fn object_meta_is_clone_and_debug() {
        let meta = ObjectMeta {
            size: 7,
            etag: Some("e".into()),
            last_modified: SystemTime::UNIX_EPOCH,
        };
        let cloned = meta.clone();
        assert_eq!(cloned.size, 7);
        assert_eq!(cloned.etag.as_deref(), Some("e"));
        assert!(format!("{meta:?}").contains("ObjectMeta"));
    }
}