use std::{ops::Range, sync::Arc, time::Duration};
use async_trait::async_trait;
use bytes::Bytes;
use futures::TryStreamExt;
use object_store::{
ClientOptions, Error as ObjError, GetOptions, GetRange, MultipartUpload, ObjectStore,
ObjectStoreExt, PutMode, PutOptions, PutPayload, UpdateVersion,
aws::{AmazonS3, AmazonS3Builder, S3ConditionalPut},
path::Path as ObjPath,
};
use super::{ObjectMeta, StorageError, StorageProvider, retry};
/// [`StorageProvider`] implementation backed by an `object_store` S3 client.
///
/// Object keys are produced by `key`, which joins `prefix` (when non-empty)
/// in front of the caller-supplied URI.
#[derive(Debug)]
pub struct S3StorageProvider {
/// Bucket name as handed to the constructor; returned by `bucket()`.
bucket: String,
/// Key prefix prepended by `key`; empty means keys are used as given.
/// The `*_prefix` constructors store it with surrounding `/` trimmed.
prefix: String,
/// Shared S3 client that performs every request.
store: Arc<AmazonS3>,
}
impl S3StorageProvider {
    /// Creates a provider for `bucket`, configured through
    /// `AmazonS3Builder::from_env`, with ETag-based conditional puts, the
    /// tuned HTTP client options and the shared retry configuration.
    ///
    /// # Errors
    /// Returns [`StorageError::Permanent`] when the underlying client cannot
    /// be built.
    pub fn new(bucket: impl Into<String>) -> Result<Self, StorageError> {
        let bucket: String = bucket.into();
        let built = AmazonS3Builder::from_env()
            .with_bucket_name(&bucket)
            .with_conditional_put(S3ConditionalPut::ETagMatch)
            .with_client_options(tuned_client_options())
            .with_retry(retry::config())
            .build();
        match built {
            Ok(store) => Ok(Self::from_object_store(bucket, store)),
            Err(e) => Err(StorageError::Permanent {
                uri: format!("s3://{bucket}"),
                source: Box::new(e),
            }),
        }
    }

    /// Same as [`Self::new`], but every key is placed under `prefix`
    /// (surrounding `/` are trimmed before it is stored).
    ///
    /// # Errors
    /// Propagates the error from [`Self::new`].
    pub fn new_with_prefix(
        bucket: impl Into<String>,
        prefix: impl Into<String>,
    ) -> Result<Self, StorageError> {
        Self::new(bucket).map(|provider| Self {
            prefix: normalize_prefix(prefix),
            ..provider
        })
    }

    /// Creates a provider that talks to an explicit `endpoint` with static
    /// credentials, path-style requests and plain HTTP allowed.
    ///
    /// NOTE(review): unlike [`Self::new`], this constructor applies neither
    /// `tuned_client_options()` nor `retry::config()` — confirm that is
    /// intentional.
    ///
    /// # Errors
    /// Returns [`StorageError::Permanent`] when the underlying client cannot
    /// be built.
    pub fn new_with_endpoint(
        endpoint: impl Into<String>,
        bucket: impl Into<String>,
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
        region: impl Into<String>,
    ) -> Result<Self, StorageError> {
        let bucket: String = bucket.into();
        let endpoint: String = endpoint.into();
        let built = AmazonS3Builder::new()
            .with_endpoint(endpoint.as_str())
            .with_bucket_name(&bucket)
            .with_access_key_id(access_key.into())
            .with_secret_access_key(secret_key.into())
            .with_region(region.into())
            .with_allow_http(true)
            .with_virtual_hosted_style_request(false)
            .with_conditional_put(S3ConditionalPut::ETagMatch)
            .build();
        match built {
            Ok(store) => Ok(Self::from_object_store(bucket, store)),
            Err(e) => Err(StorageError::Permanent {
                uri: format!("s3://{bucket} @ {endpoint}"),
                source: Box::new(e),
            }),
        }
    }

    /// Same as [`Self::new_with_endpoint`], but every key is placed under
    /// `prefix` (surrounding `/` are trimmed before it is stored).
    ///
    /// # Errors
    /// Propagates the error from [`Self::new_with_endpoint`].
    pub fn new_with_endpoint_and_prefix(
        endpoint: impl Into<String>,
        bucket: impl Into<String>,
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
        region: impl Into<String>,
        prefix: impl Into<String>,
    ) -> Result<Self, StorageError> {
        Self::new_with_endpoint(endpoint, bucket, access_key, secret_key, region).map(
            |provider| Self {
                prefix: normalize_prefix(prefix),
                ..provider
            },
        )
    }

    /// Wraps an already-built [`AmazonS3`] client; the provider starts with
    /// an empty prefix.
    pub fn from_object_store(bucket: impl Into<String>, store: AmazonS3) -> Self {
        let bucket = bucket.into();
        let store = Arc::new(store);
        Self {
            bucket,
            prefix: String::new(),
            store,
        }
    }

    /// Name of the bucket this provider was constructed with.
    pub fn bucket(&self) -> &str {
        self.bucket.as_str()
    }

    /// Key prefix applied to every URI; empty when the provider is unscoped.
    pub fn prefix(&self) -> &str {
        self.prefix.as_str()
    }

    /// Turns a caller URI into the object key: leading `/` are dropped and
    /// the provider prefix, if any, is joined in front with a single `/`.
    fn key(&self, uri: &str) -> String {
        let relative = uri.trim_start_matches('/');
        match self.prefix.as_str() {
            "" => relative.to_owned(),
            prefix => format!("{prefix}/{relative}"),
        }
    }

    /// Resolves `uri` to an `object_store` path via [`Self::key`].
    ///
    /// # Errors
    /// Returns [`StorageError::Permanent`] (carrying the original `uri`) when
    /// the key is not a valid object path.
    fn path(&self, uri: &str) -> Result<ObjPath, StorageError> {
        match ObjPath::parse(self.key(uri)) {
            Ok(parsed) => Ok(parsed),
            Err(e) => Err(StorageError::Permanent {
                uri: uri.into(),
                source: Box::new(e),
            }),
        }
    }
}
/// Strips every leading and trailing `/` from `prefix`; interior slashes are
/// left untouched.
fn normalize_prefix(prefix: impl Into<String>) -> String {
    let raw: String = prefix.into();
    let trimmed = raw.trim_start_matches('/').trim_end_matches('/');
    trimmed.to_owned()
}
/// Value passed to `ClientOptions::with_pool_max_idle_per_host` by
/// `tuned_client_options`.
const S3_POOL_MAX_IDLE_PER_HOST: usize = 1024;
/// Value passed to `ClientOptions::with_pool_idle_timeout` by
/// `tuned_client_options` (10 seconds).
const S3_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
/// Value passed to `ClientOptions::with_connect_timeout` by
/// `tuned_client_options` (5 seconds).
const S3_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// HTTP client options used by the environment-configured constructor: the
/// idle-pool size, idle-pool timeout and connect timeout come from the
/// `S3_*` constants.
fn tuned_client_options() -> ClientOptions {
    let options = ClientOptions::new();
    let options = options.with_pool_max_idle_per_host(S3_POOL_MAX_IDLE_PER_HOST);
    let options = options.with_pool_idle_timeout(S3_POOL_IDLE_TIMEOUT);
    options.with_connect_timeout(S3_CONNECT_TIMEOUT)
}
/// Maps an `object_store` error onto the crate's [`StorageError`], tagging it
/// with the `uri` the caller asked for.
///
/// * `NotFound` stays `NotFound`.
/// * `AlreadyExists` and `Precondition` both become `PreconditionFailed`.
/// * `Generic` becomes `TransientExhausted`, keeping the inner source.
/// * Every other variant becomes `Permanent`, boxing the original error.
fn translate(uri: &str, e: ObjError) -> StorageError {
    let uri = uri.to_owned();
    match e {
        ObjError::NotFound { .. } => StorageError::NotFound { uri },
        ObjError::AlreadyExists { .. } | ObjError::Precondition { .. } => {
            StorageError::PreconditionFailed { uri }
        }
        ObjError::Generic { source, .. } => StorageError::TransientExhausted { uri, source },
        other => StorageError::Permanent {
            uri,
            source: Box::new(other),
        },
    }
}
#[async_trait]
impl StorageProvider for S3StorageProvider {
    /// Fetches size, ETag and last-modified time of the object at `uri`.
    ///
    /// # Errors
    /// `NotFound` when the object does not exist; other failures are mapped
    /// by `translate`.
    async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
        let path = self.path(uri)?;
        let meta = self
            .store
            .head(&path)
            .await
            .map_err(|e| translate(uri, e))?;
        Ok(ObjectMeta {
            size: meta.size,
            etag: meta.e_tag,
            last_modified: meta.last_modified.into(),
        })
    }

    /// Downloads the whole object at `uri` together with its metadata.
    ///
    /// The request and body download run inside `retry::with_reissue`
    /// (presumably so a failed body read re-issues the request — see that
    /// helper for the exact policy).
    async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
        let path = self.path(uri)?;
        retry::with_reissue(|| async {
            let result = self.store.get(&path).await.map_err(|e| translate(uri, e))?;
            // Capture the metadata first: `bytes()` consumes `result`.
            let meta = ObjectMeta {
                size: result.meta.size,
                etag: result.meta.e_tag.clone(),
                last_modified: result.meta.last_modified.into(),
            };
            let bytes = result.bytes().await.map_err(|e| translate(uri, e))?;
            Ok((bytes, meta))
        })
        .await
    }

    /// Downloads the byte `range` of the object at `uri`, driven by
    /// `retry::complete_range`.
    async fn get_range(&self, uri: &str, range: Range<u64>) -> Result<Bytes, StorageError> {
        let path = self.path(uri)?;
        retry::complete_range(uri, range, |r| async {
            self.store
                .get_range(&path, r)
                .await
                .map_err(|e| translate(uri, e))
        })
        .await
    }

    /// Returns the last `len` bytes of the object at `uri` plus the object's
    /// total size.
    ///
    /// `len == 0` is answered with a `head` request and an empty buffer
    /// instead of a suffix-range request.
    async fn tail(&self, uri: &str, len: u64) -> Result<(Bytes, u64), StorageError> {
        if len == 0 {
            let meta = self.head(uri).await?;
            return Ok((Bytes::new(), meta.size));
        }
        let path = self.path(uri)?;
        retry::with_reissue(|| async {
            let opts = GetOptions {
                range: Some(GetRange::Suffix(len)),
                ..Default::default()
            };
            let result = self
                .store
                .get_opts(&path, opts)
                .await
                .map_err(|e| translate(uri, e))?;
            // Read the total size before `bytes()` consumes `result`.
            let size = result.meta.size;
            let bytes = result.bytes().await.map_err(|e| translate(uri, e))?;
            Ok((bytes, size))
        })
        .await
    }

    /// Creates the object at `uri` only if it does not exist yet and returns
    /// the ETag reported by the store, if any.
    ///
    /// # Errors
    /// `PreconditionFailed` when the object already exists.
    async fn put_atomic(&self, uri: &str, bytes: Bytes) -> Result<Option<String>, StorageError> {
        let path = self.path(uri)?;
        let opts = PutOptions {
            mode: PutMode::Create,
            ..Default::default()
        };
        self.store
            .put_opts(&path, PutPayload::from_bytes(bytes), opts)
            .await
            .map(|r| r.e_tag)
            .map_err(|e| translate(uri, e))
    }

    /// Conditional write: with `expected_etag == None` this is create-only;
    /// with `Some(etag)` the object is replaced only if its current ETag
    /// matches. Returns the new ETag reported by the store, if any.
    ///
    /// # Errors
    /// `PreconditionFailed` when the condition does not hold.
    async fn put_if_match(
        &self,
        uri: &str,
        bytes: Bytes,
        expected_etag: Option<&str>,
    ) -> Result<Option<String>, StorageError> {
        let path = self.path(uri)?;
        let opts = match expected_etag {
            None => PutOptions {
                mode: PutMode::Create,
                ..Default::default()
            },
            Some(expected) => PutOptions {
                mode: PutMode::Update(UpdateVersion {
                    e_tag: Some(expected.to_string()),
                    version: None,
                }),
                ..Default::default()
            },
        };
        self.store
            .put_opts(&path, PutPayload::from_bytes(bytes), opts)
            .await
            .map(|r| r.e_tag)
            .map_err(|e| translate(uri, e))
    }

    /// Starts a multipart upload targeting `uri`.
    async fn put_multipart(&self, uri: &str) -> Result<Box<dyn MultipartUpload>, StorageError> {
        let path = self.path(uri)?;
        self.store
            .put_multipart(&path)
            .await
            .map_err(|e| translate(uri, e))
    }

    /// Deletes the object at `uri`. A missing object is treated as success.
    async fn delete(&self, uri: &str) -> Result<(), StorageError> {
        let path = self.path(uri)?;
        match self.store.delete(&path).await {
            Ok(()) => Ok(()),
            Err(ObjError::NotFound { .. }) => Ok(()),
            Err(e) => Err(translate(uri, e)),
        }
    }

    /// Lists every object under `prefix` together with its metadata.
    ///
    /// `prefix` is resolved relative to the provider prefix, exactly like
    /// the URI of every other operation, and the returned keys are relative
    /// to the provider prefix as well, so each one can be passed straight
    /// back to `get`/`head`/`delete`. For a provider without a prefix the
    /// keys are the full object keys.
    ///
    /// # Errors
    /// Listing failures are mapped by `translate`, tagged with `prefix`.
    async fn list_with_prefix_metadata(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, ObjectMeta)>, StorageError> {
        // Scope the listing through `key` so a prefixed provider does not
        // list (or leak) objects outside its own prefix. `ObjPath::from` is
        // kept rather than `parse` so trailing or doubled `/` in `prefix`
        // stay accepted.
        let scoped = self.key(prefix);
        let path = ObjPath::from(scoped.as_str());
        // `"<provider prefix>/"`, stripped from every returned location.
        let scope = (!self.prefix.is_empty()).then(|| format!("{}/", self.prefix));
        let mut stream = self.store.list(Some(&path));
        let mut out = Vec::new();
        while let Some(meta) = stream.try_next().await.map_err(|e| translate(prefix, e))? {
            let full = meta.location.to_string();
            let key = scope
                .as_deref()
                .and_then(|s| full.strip_prefix(s))
                .map(str::to_owned)
                .unwrap_or(full);
            out.push((
                key,
                ObjectMeta {
                    size: meta.size,
                    etag: meta.e_tag,
                    last_modified: meta.last_modified.into(),
                },
            ));
        }
        Ok(out)
    }

    /// Exposes the underlying store and the resolved path for `uri`, or
    /// `None` when `uri` does not resolve to a valid object path.
    fn object_store_handle(&self, uri: &str) -> Option<(Arc<dyn ObjectStore>, ObjPath)> {
        let path = self.path(uri).ok()?;
        Some((Arc::clone(&self.store) as Arc<dyn ObjectStore>, path))
    }
}
#[cfg(test)]
mod tests {
    use std::{fs::create_dir_all, net::SocketAddr};

    use s3s::{auth::SimpleAuth, service::S3ServiceBuilder};
    use s3s_fs::FileSystem;
    use tempfile::TempDir;
    use tokio::net::TcpListener;

    use super::*;

    // Fixture identity shared by the in-process S3 server and the provider.
    const HARNESS_BUCKET: &str = "infino-s3-unit";
    const HARNESS_REGION: &str = "us-east-1";
    const HARNESS_ACCESS_KEY: &str = "AKIAIOSFODNN7EXAMPLE";
    const HARNESS_SECRET_KEY: &str = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";

    /// Starts an `s3s-fs` server on an ephemeral localhost port, backed by a
    /// temp directory that already contains the harness bucket. The returned
    /// `TempDir` must be kept alive for as long as the server is used.
    async fn spawn_s3s_fs() -> (SocketAddr, TempDir) {
        use hyper_util::{
            rt::{TokioExecutor, TokioIo},
            server::conn::auto::Builder as ConnBuilder,
        };

        let root = TempDir::new().expect("s3s-fs root tempdir");
        create_dir_all(root.path().join(HARNESS_BUCKET)).expect("create bucket dir");
        let backend = FileSystem::new(root.path()).expect("s3s-fs FileSystem");
        let service = {
            let mut svc_builder = S3ServiceBuilder::new(backend);
            svc_builder.set_auth(SimpleAuth::from_single(
                HARNESS_ACCESS_KEY,
                HARNESS_SECRET_KEY,
            ));
            svc_builder.build()
        };

        let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
        let addr = listener.local_addr().expect("local_addr");

        tokio::spawn(async move {
            let http = ConnBuilder::new(TokioExecutor::new());
            // Serve each connection on its own task; stop on the first
            // accept error.
            while let Ok((stream, _peer)) = listener.accept().await {
                let service = service.clone();
                let http = http.clone();
                tokio::spawn(async move {
                    let _ = http.serve_connection(TokioIo::new(stream), service).await;
                });
            }
        });

        (addr, root)
    }

    /// Provider wired to a fresh in-process `s3s-fs` server.
    async fn harness_provider() -> (S3StorageProvider, TempDir) {
        let (addr, guard) = spawn_s3s_fs().await;
        let provider = S3StorageProvider::new_with_endpoint(
            format!("http://{addr}"),
            HARNESS_BUCKET,
            HARNESS_ACCESS_KEY,
            HARNESS_SECRET_KEY,
            HARNESS_REGION,
        )
        .expect("construct provider against in-process s3s-fs");
        (provider, guard)
    }

    /// Provider pointing at an endpoint that is never contacted; used by the
    /// tests that exercise pure key/path logic only.
    fn endpoint_provider() -> S3StorageProvider {
        S3StorageProvider::new_with_endpoint(
            "http://127.0.0.1:1",
            "test-bucket",
            "AKIATESTKEY",
            "secret/example",
            "us-east-1",
        )
        .expect("construct with endpoint")
    }

    #[test]
    fn translate_not_found_to_typed_variant() {
        let raw = ObjError::NotFound {
            path: "some/key".into(),
            source: "raw".into(),
        };
        match translate("some/key", raw) {
            StorageError::NotFound { uri } => assert_eq!(uri, "some/key"),
            other => panic!("expected NotFound; got {other:?}"),
        }
    }

    #[test]
    fn translate_already_exists_to_precondition_failed() {
        let raw = ObjError::AlreadyExists {
            path: "k".into(),
            source: "raw".into(),
        };
        let err = translate("k", raw);
        assert!(matches!(err, StorageError::PreconditionFailed { uri } if uri == "k"));
    }

    #[test]
    fn translate_precondition_to_precondition_failed() {
        let raw = ObjError::Precondition {
            path: "k".into(),
            source: "raw".into(),
        };
        let err = translate("k", raw);
        assert!(matches!(err, StorageError::PreconditionFailed { uri } if uri == "k"));
    }

    #[test]
    fn translate_generic_to_transient_exhausted() {
        let raw = ObjError::Generic {
            store: "S3",
            source: "boom".into(),
        };
        match translate("k", raw) {
            StorageError::TransientExhausted { uri, .. } => assert_eq!(uri, "k"),
            other => panic!("expected TransientExhausted; got {other:?}"),
        }
    }

    #[test]
    fn translate_other_variant_to_permanent() {
        let raw = ObjError::UnknownConfigurationKey {
            store: "S3",
            key: "foo".into(),
        };
        match translate("k", raw) {
            StorageError::Permanent { uri, .. } => assert_eq!(uri, "k"),
            other => panic!("expected Permanent; got {other:?}"),
        }
    }

    #[test]
    fn path_parses_simple_uri() {
        let provider = endpoint_provider();
        let parsed = provider.path("foo/bar.txt").expect("parse");
        assert_eq!(parsed.to_string(), "foo/bar.txt");
    }

    #[test]
    fn path_parses_nested_uri() {
        let provider = endpoint_provider();
        let parsed = provider
            .path("manifest-lists/list-000042.json")
            .expect("parse");
        assert_eq!(parsed.to_string(), "manifest-lists/list-000042.json");
    }

    #[test]
    fn new_with_endpoint_builds_succeeds_and_exposes_bucket() {
        assert_eq!(endpoint_provider().bucket(), "test-bucket");
    }

    #[test]
    fn from_object_store_preserves_bucket() {
        let raw_store = AmazonS3Builder::new()
            .with_endpoint("http://127.0.0.1:1")
            .with_bucket_name("hatch-bucket")
            .with_access_key_id("AKIATESTKEY")
            .with_secret_access_key("secret")
            .with_region("us-east-1")
            .with_allow_http(true)
            .with_virtual_hosted_style_request(false)
            .build()
            .expect("build AmazonS3");
        let provider = S3StorageProvider::from_object_store("hatch-bucket", raw_store);
        assert_eq!(provider.bucket(), "hatch-bucket");
    }

    #[test]
    fn debug_impl_does_not_panic() {
        let rendered = format!("{:?}", endpoint_provider());
        assert!(rendered.contains("S3StorageProvider"));
    }

    #[test]
    fn normalize_prefix_trims_surrounding_slashes() {
        assert_eq!(normalize_prefix("/tbl/"), "tbl");
        assert_eq!(normalize_prefix("///a/b///"), "a/b");
        assert_eq!(normalize_prefix("plain"), "plain");
        assert_eq!(normalize_prefix(""), "");
    }

    #[test]
    fn key_without_prefix_strips_leading_slash() {
        let provider = endpoint_provider();
        assert_eq!(provider.prefix(), "");
        assert_eq!(provider.key("/foo/bar"), "foo/bar");
        assert_eq!(provider.key("foo/bar"), "foo/bar");
    }

    #[test]
    fn key_with_prefix_prepends_and_strips_leading_slash() {
        let mut provider = endpoint_provider();
        provider.prefix = "tbl".into();
        assert_eq!(provider.prefix(), "tbl");
        assert_eq!(provider.key("data/seg-1"), "tbl/data/seg-1");
        assert_eq!(provider.key("/data/seg-1"), "tbl/data/seg-1");
    }

    #[test]
    fn new_with_endpoint_and_prefix_normalizes_and_applies_prefix() {
        let provider = S3StorageProvider::new_with_endpoint_and_prefix(
            "http://127.0.0.1:1",
            "b",
            "AKIATESTKEY",
            "secret",
            "us-east-1",
            "/scoped/tbl/",
        )
        .expect("construct with endpoint + prefix");
        assert_eq!(provider.bucket(), "b");
        assert_eq!(provider.prefix(), "scoped/tbl");
        assert_eq!(provider.key("data/seg-1"), "scoped/tbl/data/seg-1");
    }

    #[test]
    fn object_store_handle_returns_path_under_prefix() {
        let mut provider = endpoint_provider();
        provider.prefix = "tbl".into();
        let (_store, resolved) = provider
            .object_store_handle("data/seg-1")
            .expect("handle for valid uri");
        assert_eq!(resolved.to_string(), "tbl/data/seg-1");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn put_atomic_then_get_round_trips() {
        let (provider, _root) = harness_provider().await;
        let payload = Bytes::from_static(b"hello-unit-s3");
        provider
            .put_atomic("k/hello.txt", payload.clone())
            .await
            .expect("put_atomic");
        let (fetched, meta) = provider.get("k/hello.txt").await.expect("get");
        assert_eq!(fetched, payload);
        assert_eq!(meta.size, payload.len() as u64);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn put_atomic_twice_is_precondition_failed() {
        let (provider, _root) = harness_provider().await;
        let first = Bytes::from_static(b"first");
        provider
            .put_atomic("k/dup", first.clone())
            .await
            .expect("first put");
        let err = provider
            .put_atomic("k/dup", Bytes::from_static(b"second"))
            .await
            .expect_err("second create must fail");
        assert!(
            matches!(err, StorageError::PreconditionFailed { .. }),
            "expected PreconditionFailed; got {err:?}"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn get_missing_is_not_found() {
        let (provider, _root) = harness_provider().await;
        let err = provider
            .get("k/absent")
            .await
            .expect_err("get missing must fail");
        assert!(
            matches!(err, StorageError::NotFound { .. }),
            "expected NotFound; got {err:?}"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn head_missing_is_not_found() {
        let (provider, _root) = harness_provider().await;
        let err = provider
            .head("k/absent")
            .await
            .expect_err("head missing fails");
        assert!(
            matches!(err, StorageError::NotFound { .. }),
            "expected NotFound; got {err:?}"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn head_reports_size() {
        let (provider, _root) = harness_provider().await;
        let payload = Bytes::from_static(b"0123456789");
        provider
            .put_atomic("k/sized", payload.clone())
            .await
            .expect("put_atomic");
        let meta = provider.head("k/sized").await.expect("head");
        assert_eq!(meta.size, payload.len() as u64);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn get_range_returns_subslice() {
        let (provider, _root) = harness_provider().await;
        let payload: Vec<u8> = (0..=255u8).collect();
        provider
            .put_atomic("k/range.bin", Bytes::from(payload.clone()))
            .await
            .expect("put_atomic");
        let slice = provider
            .get_range("k/range.bin", 10..20)
            .await
            .expect("get_range");
        assert_eq!(&slice[..], &payload[10..20]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn tail_returns_trailing_bytes_and_size() {
        let (provider, _root) = harness_provider().await;
        let payload: Vec<u8> = (0..200u8).collect();
        provider
            .put_atomic("k/tail.bin", Bytes::from(payload.clone()))
            .await
            .expect("put_atomic");
        let (trailing, total) = provider.tail("k/tail.bin", 32).await.expect("tail");
        assert_eq!(total, payload.len() as u64);
        assert_eq!(&trailing[..], &payload[payload.len() - 32..]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn tail_zero_len_falls_back_to_head_for_size() {
        let (provider, _root) = harness_provider().await;
        let payload = Bytes::from_static(b"abcdef");
        provider
            .put_atomic("k/tail0.bin", payload.clone())
            .await
            .expect("put_atomic");
        let (trailing, total) = provider
            .tail("k/tail0.bin", 0)
            .await
            .expect("zero-len tail");
        assert!(trailing.is_empty());
        assert_eq!(total, payload.len() as u64);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn delete_removes_object() {
        let (provider, _root) = harness_provider().await;
        provider
            .put_atomic("k/del", Bytes::from_static(b"x"))
            .await
            .expect("put_atomic");
        provider.delete("k/del").await.expect("delete existing");
        let err = provider
            .get("k/del")
            .await
            .expect_err("deleted object gone");
        assert!(matches!(err, StorageError::NotFound { .. }));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn list_with_prefix_returns_matching_keys() {
        let (provider, _root) = harness_provider().await;
        provider
            .put_atomic("list/a.txt", Bytes::from_static(b"a"))
            .await
            .expect("put a");
        provider
            .put_atomic("list/b.txt", Bytes::from_static(b"b"))
            .await
            .expect("put b");
        provider
            .put_atomic("other/c.txt", Bytes::from_static(b"c"))
            .await
            .expect("put c");
        let mut listed = provider.list_with_prefix("list/").await.expect("list");
        listed.sort();
        let expected = vec!["list/a.txt".to_string(), "list/b.txt".to_string()];
        assert_eq!(listed, expected);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn put_if_match_none_is_create_only() {
        let (provider, _root) = harness_provider().await;
        provider
            .put_if_match("k/cas", Bytes::from_static(b"v1"), None)
            .await
            .expect("create-if-absent");
        let err = provider
            .put_if_match("k/cas", Bytes::from_static(b"v2"), None)
            .await
            .expect_err("second create-if-absent must fail");
        assert!(
            matches!(err, StorageError::PreconditionFailed { .. }),
            "expected PreconditionFailed; got {err:?}"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn put_if_match_etag_update_succeeds_with_matching_etag() {
        let (provider, _root) = harness_provider().await;
        let created_etag = provider
            .put_atomic("k/upd", Bytes::from_static(b"v1"))
            .await
            .expect("initial put")
            .expect("s3 returns an etag on create");
        provider
            .put_if_match("k/upd", Bytes::from_static(b"v2"), Some(&created_etag))
            .await
            .expect("update with matching etag");
        let (latest, _) = provider.get("k/upd").await.expect("get latest");
        assert_eq!(&latest[..], b"v2");
    }
}