use crate::access::S3Access;
use crate::auth::S3Auth;
use crate::config::{S3ConfigProvider, StaticConfigProvider};
use crate::host::S3Host;
use crate::http::{Body, Request};
use crate::route::S3Route;
use crate::s3_trait::S3;
use crate::validation::NameValidation;
use crate::{HttpError, HttpRequest, HttpResponse};
use std::any::TypeId;
use std::fmt;
use std::sync::Arc;
use bytes::Bytes;
use futures::future::BoxFuture;
use tracing::{debug, error};
/// Step-by-step constructor for [`S3Service`].
///
/// Only the [`S3`] implementation is mandatory; every other component is
/// optional and stays `None` until the matching `set_*` method is called.
pub struct S3ServiceBuilder {
    /// The S3 implementation, shared behind an `Arc`.
    s3: Arc<dyn S3>,

    /// Configuration provider; when left unset, `build` substitutes
    /// `StaticConfigProvider::default()`.
    config: Option<Arc<dyn S3ConfigProvider>>,

    /// Optional [`S3Host`] component.
    host: Option<Box<dyn S3Host>>,

    /// Optional [`S3Auth`] component.
    auth: Option<Box<dyn S3Auth>>,

    /// Optional [`S3Access`] component.
    access: Option<Box<dyn S3Access>>,

    /// Optional [`S3Route`] component.
    route: Option<Box<dyn S3Route>>,

    /// Optional [`NameValidation`] component.
    validation: Option<Box<dyn NameValidation>>,
}
impl S3ServiceBuilder {
    /// Starts a builder around the given [`S3`] implementation.
    ///
    /// All optional components (config, host, auth, access, route,
    /// validation) start out unset.
    #[must_use]
    pub fn new(s3: impl S3) -> Self {
        let s3: Arc<dyn S3> = Arc::new(s3);
        Self {
            s3,
            config: None,
            host: None,
            auth: None,
            access: None,
            route: None,
            validation: None,
        }
    }

    /// Installs the configuration provider, replacing any earlier one.
    pub fn set_config(&mut self, config: Arc<dyn S3ConfigProvider>) {
        self.config = Some(config);
    }

    /// Installs the [`S3Host`] component, replacing any earlier one.
    pub fn set_host(&mut self, host: impl S3Host) {
        let boxed: Box<dyn S3Host> = Box::new(host);
        self.host = Some(boxed);
    }

    /// Installs the [`S3Auth`] component, replacing any earlier one.
    pub fn set_auth(&mut self, auth: impl S3Auth) {
        let boxed: Box<dyn S3Auth> = Box::new(auth);
        self.auth = Some(boxed);
    }

    /// Installs the [`S3Access`] component, replacing any earlier one.
    pub fn set_access(&mut self, access: impl S3Access) {
        let boxed: Box<dyn S3Access> = Box::new(access);
        self.access = Some(boxed);
    }

    /// Installs the [`S3Route`] component, replacing any earlier one.
    pub fn set_route(&mut self, route: impl S3Route) {
        let boxed: Box<dyn S3Route> = Box::new(route);
        self.route = Some(boxed);
    }

    /// Installs the [`NameValidation`] component, replacing any earlier one.
    pub fn set_validation(&mut self, validation: impl NameValidation) {
        let boxed: Box<dyn NameValidation> = Box::new(validation);
        self.validation = Some(boxed);
    }

    /// Consumes the builder and produces the [`S3Service`].
    ///
    /// When no configuration provider was set, the service uses
    /// `StaticConfigProvider::default()`. Optional components that were never
    /// set remain `None` in the resulting service.
    #[must_use]
    pub fn build(self) -> S3Service {
        let Self {
            s3,
            config,
            host,
            auth,
            access,
            route,
            validation,
        } = self;

        let config: Arc<dyn S3ConfigProvider> = match config {
            Some(provider) => provider,
            None => Arc::new(StaticConfigProvider::default()),
        };

        let inner = Inner {
            s3,
            config,
            host,
            auth,
            access,
            route,
            validation,
        };
        S3Service { inner: Arc::new(inner) }
    }
}
/// The request-handling service produced by [`S3ServiceBuilder::build`].
///
/// The whole state lives behind a single `Arc`, so cloning the service only
/// bumps a reference count and every clone shares the same components.
#[derive(Clone)]
pub struct S3Service {
    /// Shared, immutable service state.
    inner: Arc<Inner>,
}
/// State shared by every clone of [`S3Service`].
///
/// Mirrors the fields of [`S3ServiceBuilder`], except that `config` is no
/// longer optional: the builder always supplies a provider.
struct Inner {
    /// The S3 implementation.
    s3: Arc<dyn S3>,

    /// Configuration provider (always present).
    config: Arc<dyn S3ConfigProvider>,

    /// Optional [`S3Host`] component.
    host: Option<Box<dyn S3Host>>,

    /// Optional [`S3Auth`] component.
    auth: Option<Box<dyn S3Auth>>,

    /// Optional [`S3Access`] component.
    access: Option<Box<dyn S3Access>>,

    /// Optional [`S3Route`] component.
    route: Option<Box<dyn S3Route>>,

    /// Optional [`NameValidation`] component.
    validation: Option<Box<dyn NameValidation>>,
}
impl S3Service {
#[tracing::instrument(
level = "debug",
skip(self, req),
fields(start_time=?crate::time::now_utc())
)]
pub async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
debug!(?req);
let t0 = crate::time::Instant::now();
let mut req = Request::from(req);
let ccx = crate::ops::CallContext {
s3: &self.inner.s3,
config: &self.inner.config,
host: self.inner.host.as_deref(),
auth: self.inner.auth.as_deref(),
access: self.inner.access.as_deref(),
route: self.inner.route.as_deref(),
validation: self.inner.validation.as_deref(),
};
let result = match crate::ops::call(&mut req, &ccx).await {
Ok(resp) => Ok(HttpResponse::from(resp)),
Err(err) => Err(HttpError::new(Box::new(err))),
};
let duration = t0.elapsed();
match result {
Ok(ref resp) => {
if resp.status().is_server_error() {
error!(?duration, ?resp);
} else {
debug!(?duration, ?resp);
}
}
Err(ref err) => error!(?duration, ?err),
}
result
}
async fn call_owned(self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
self.call(req).await
}
}
/// Opaque `Debug` output: prints the type name only and marks the struct as
/// non-exhaustive instead of exposing any field.
impl fmt::Debug for S3Service {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("S3Service");
        repr.finish_non_exhaustive()
    }
}
/// Lets the service be driven directly by hyper.
impl hyper::service::Service<http::Request<hyper::body::Incoming>> for S3Service {
    type Response = HttpResponse;
    type Error = HttpError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Converts the incoming hyper body into the crate's `Body` and runs the
    /// request on a clone of this service, so the boxed future owns
    /// everything it needs.
    fn call(&self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
        let req = req.map(Body::from);
        Box::pin(self.clone().call_owned(req))
    }
}
/// Lets the service be used as a tower `Service` for any compatible HTTP body.
impl<B> tower::Service<http::Request<B>> for S3Service
where
    B: http_body::Body<Data = Bytes> + Send + 'static,
    B::Error: std::error::Error + Send + Sync + 'static,
{
    type Response = HttpResponse;
    type Error = HttpError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Always reports readiness.
    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    /// Converts the request body into the crate's `Body` and runs the request
    /// on a clone of this service.
    ///
    /// When `B` is exactly `hyper::body::Incoming`, the body is recovered with
    /// its concrete type and converted through `Body::from`; every other body
    /// type is wrapped with `Body::http_body_unsync`.
    fn call(&mut self, req: http::Request<B>) -> Self::Future {
        let body_is_incoming = TypeId::of::<B>() == TypeId::of::<hyper::body::Incoming>();

        let req = if !body_is_incoming {
            req.map(Body::http_body_unsync)
        } else {
            let (parts, body) = req.into_parts();

            // Move the body out through `Any` to get it back as the concrete
            // `Incoming` type without unsafe code.
            let mut slot = Some(body);
            let erased: &mut dyn std::any::Any = &mut slot;
            let incoming = erased
                .downcast_mut::<Option<hyper::body::Incoming>>()
                // Cannot fail: the `TypeId` check proved `B` is `Incoming`.
                .unwrap()
                .take()
                // Cannot fail: `slot` was filled with `Some` just above.
                .unwrap();

            http::Request::from_parts(parts, Body::from(incoming))
        };

        Box::pin(self.clone().call_owned(req))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use crate::config::{HotReloadConfigProvider, S3Config, StaticConfigProvider};
    use crate::validation::NameValidation;
    use crate::{S3Error, S3Request, S3Response};

    use stdx::mem::output_size;

    /// Minimal [`S3`] implementation relying on the trait's default methods.
    struct MockS3;

    impl S3 for MockS3 {}

    /// Validation that accepts every bucket name.
    struct RelaxedValidation;

    impl NameValidation for RelaxedValidation {
        fn validate_bucket_name(&self, _name: &str) -> bool {
            true
        }
    }

    /// Fresh builder around [`MockS3`] with nothing else configured.
    fn mock_builder() -> S3ServiceBuilder {
        S3ServiceBuilder::new(MockS3)
    }

    /// Prints the size of the future returned by an async function.
    macro_rules! print_future_size {
        ($f:path) => {
            println!("{:<24}: {}", stringify!($f), output_size(&$f));
        };
    }

    /// Prints the size of a type.
    macro_rules! print_type_size {
        ($t:path) => {
            println!("{:<24}: {}", stringify!($t), std::mem::size_of::<$t>());
        };
    }

    /// Reports the relevant type/future sizes and enforces the size budgets
    /// of the request-handling futures.
    #[test]
    fn future_size() {
        print_type_size!(std::time::Instant);

        print_type_size!(HttpRequest);
        print_type_size!(HttpResponse);
        print_type_size!(HttpError);

        print_type_size!(S3Request<()>);
        print_type_size!(S3Response<()>);
        print_type_size!(S3Error);

        print_type_size!(S3Service);

        print_future_size!(crate::ops::call);
        print_future_size!(S3Service::call);
        print_future_size!(S3Service::call_owned);

        assert!(output_size(&crate::ops::call) <= 1600);
        assert!(output_size(&S3Service::call) <= 3000);
        assert!(output_size(&S3Service::call_owned) <= 3300);
    }

    /// A validation set on the builder reaches the service.
    #[test]
    fn test_service_builder_validation() {
        let mut b = mock_builder();
        b.set_validation(RelaxedValidation);

        let svc = b.build();
        assert!(svc.inner.validation.is_some());
    }

    /// Without `set_validation` the service carries no validation.
    #[test]
    fn test_service_builder_default_validation() {
        let svc = mock_builder().build();
        assert!(svc.inner.validation.is_none());
    }

    /// Without `set_config` the service exposes the default limits.
    #[test]
    fn test_service_builder_default_config() {
        let svc = mock_builder().build();

        let snapshot = svc.inner.config.snapshot();
        assert_eq!(snapshot.xml_max_body_size, 20 * 1024 * 1024);
        assert_eq!(snapshot.post_object_max_file_size, 5 * 1024 * 1024 * 1024);
    }

    /// A custom provider's values are visible through the service.
    #[test]
    fn test_service_builder_custom_config() {
        let settings = S3Config {
            xml_max_body_size: 10 * 1024 * 1024,
            post_object_max_file_size: 2 * 1024 * 1024 * 1024,
            ..S3Config::default()
        };
        let provider = Arc::new(HotReloadConfigProvider::new(Arc::new(settings)));

        let mut b = mock_builder();
        b.set_config(provider);
        let svc = b.build();

        let snapshot = svc.inner.config.snapshot();
        assert_eq!(snapshot.xml_max_body_size, 10 * 1024 * 1024);
        assert_eq!(snapshot.post_object_max_file_size, 2 * 1024 * 1024 * 1024);
    }

    /// Updating a hot-reload provider after `build` changes what the service sees.
    #[test]
    fn test_service_builder_hot_reload_config() {
        let provider = Arc::new(HotReloadConfigProvider::new(Arc::new(S3Config::default())));

        let mut b = mock_builder();
        b.set_config(provider.clone());
        let svc = b.build();

        let before = svc.inner.config.snapshot();
        assert_eq!(before.xml_max_body_size, 20 * 1024 * 1024);

        provider.update(Arc::new(S3Config {
            xml_max_body_size: 30 * 1024 * 1024,
            ..S3Config::default()
        }));

        let after = svc.inner.config.snapshot();
        assert_eq!(after.xml_max_body_size, 30 * 1024 * 1024);
    }

    /// A static provider's values are visible through the service.
    #[test]
    fn test_service_builder_static_config() {
        let settings = S3Config {
            xml_max_body_size: 10 * 1024 * 1024,
            ..S3Config::default()
        };
        let provider = Arc::new(StaticConfigProvider::new(Arc::new(settings)));

        let mut b = mock_builder();
        b.set_config(provider);
        let svc = b.build();

        let snapshot = svc.inner.config.snapshot();
        assert_eq!(snapshot.xml_max_body_size, 10 * 1024 * 1024);
    }

    /// An auth component set on the builder reaches the service.
    #[test]
    fn test_service_builder_set_auth() {
        use crate::auth::SimpleAuth;

        let mut b = mock_builder();
        b.set_auth(SimpleAuth::from_single("AK", "SK"));

        let svc = b.build();
        assert!(svc.inner.auth.is_some());
    }

    /// A host component set on the builder reaches the service.
    #[test]
    fn test_service_builder_set_host() {
        use crate::host::SingleDomain;

        let mut b = mock_builder();
        b.set_host(SingleDomain::new("s3.example.com").unwrap());

        let svc = b.build();
        assert!(svc.inner.host.is_some());
    }

    /// A route component set on the builder reaches the service.
    #[test]
    fn test_service_builder_set_route() {
        use crate::route::S3Route;

        /// Route that never matches and never handles anything.
        #[derive(Clone)]
        struct NoopRoute;

        #[async_trait::async_trait]
        impl S3Route for NoopRoute {
            fn is_match(&self, _: &http::Method, _: &http::Uri, _: &http::HeaderMap, _: &mut http::Extensions) -> bool {
                false
            }

            async fn call(&self, _: crate::S3Request<crate::Body>) -> crate::S3Result<crate::S3Response<crate::Body>> {
                Err(crate::s3_error!(NotImplemented))
            }
        }

        let mut b = mock_builder();
        b.set_route(NoopRoute);

        let svc = b.build();
        assert!(svc.inner.route.is_some());
    }

    /// An access component set on the builder reaches the service.
    #[test]
    fn test_service_builder_set_access() {
        use crate::access::{S3Access, S3AccessContext};

        /// Access check that lets everything through.
        #[derive(Clone)]
        struct AllowAll;

        #[async_trait::async_trait]
        impl S3Access for AllowAll {
            async fn check(&self, _: &mut S3AccessContext<'_>) -> crate::S3Result<()> {
                Ok(())
            }
        }

        let mut b = mock_builder();
        b.set_access(AllowAll);

        let svc = b.build();
        assert!(svc.inner.access.is_some());
    }

    /// The `Debug` output names the type.
    #[test]
    fn test_service_debug() {
        let svc = mock_builder().build();
        let rendered = format!("{svc:?}");
        assert!(rendered.contains("S3Service"));
    }

    /// A cloned service is still usable and formats like the original.
    #[test]
    fn test_service_clone() {
        let svc = mock_builder().build();
        let copy = svc.clone();
        assert!(format!("{copy:?}").contains("S3Service"));
    }
}