use crate::timeout::*;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::meta::region::RegionProviderChain;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::retry::RetryConfig;
use aws_sdk_s3::Client;
use futures::future::{Future, TryFutureExt};
use futures::prelude::*;
use futures::stream;
use futures_retry::{FutureRetry, RetryPolicy};
use futures_stopwatch::try_stopwatch;
use snafu::futures::TryFutureExt as S;
use snafu::ResultExt;
use std::marker::Unpin;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
mod config;
pub mod err;
mod list_actions;
mod upload;
pub use list_actions::*;
pub use upload::*;
pub mod timeout;
pub use config::*;
pub use err::Error;
#[cfg(test)]
mod test;
/// Handle that pairs an S3 [`Client`] with the crate's [`Config`].
///
/// Construct it with [`S3Algo::new`] (default configuration) or
/// [`S3Algo::with_config`] (explicit configuration).
#[derive(Clone)]
pub struct S3Algo {
// S3 client stored by the constructors.
s3: Client,
// Configuration stored by the constructors (`Config::default()` when built via `new`).
config: Config,
}
impl S3Algo {
pub fn new(s3: Client) -> Self {
Self {
s3,
config: Config::default(),
}
}
pub fn with_config(s3: Client, config: Config) -> Self {
Self { s3, config }
}
}
/// Measurements describing one successfully completed request, as produced by `s3_request`.
#[derive(Debug, Clone, Copy)]
pub struct RequestReport {
/// Sequence number. `s3_request` always sets this to 0; presumably callers that issue many
/// requests overwrite it with the request's position (TODO confirm against callers).
pub seq: usize,
/// Size of the request as returned by the `get_size` callback given to `s3_request`.
/// The unit (bytes, objects, ...) is decided by the caller.
pub size: usize,
/// Time measured around the whole retry loop, i.e. across all attempts.
pub total_time: Duration,
/// Time measured around the attempt that finally succeeded.
pub success_time: Duration,
/// Attempt count reported by `FutureRetry` together with the successful result.
pub attempts: usize,
/// Value of `Timeout::get_estimate()` sampled just before the successful attempt was started.
/// NOTE(review): unit and exact meaning are defined by the `Timeout` implementation.
pub est: f64,
}
pub async fn s3_single_request<F, G, R>(
future_factory: F,
extra_initial_timeout_s: f64,
) -> Result<(RequestReport, R), Error>
where
F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
G: Future<Output = Result<R, Error>> + Send,
{
let timeout = TimeoutState::new(
AlgorithmConfig::default(),
SpecificTimings {
seconds_per_unit: extra_initial_timeout_s,
minimum_units_for_estimation: 0, },
);
s3_request(
move || {
let factory = future_factory.clone();
async move { Ok((factory(), 1)) }
},
|_, size| size,
10,
Arc::new(Mutex::new(timeout)),
)
.await
}
/// Runs one logical request with a per-attempt timeout and bounded retries, timing both the
/// successful attempt and the whole retry loop.
///
/// * `future_factory` - called for every attempt; asynchronously yields the actual request
///   future together with the expected size of the request.
/// * `get_size` - computes the size stored in the report from the response and the expected size.
/// * `n_retries` - number of failures that are retried; failure number `n_retries + 1` is
///   forwarded to the caller. Retries are spaced by a fixed 200 ms wait.
/// * `timeout` - shared timeout state; queried (under its lock) before each attempt for the
///   current estimate and for the timeout of this attempt.
///
/// Returns the `RequestReport` (with `seq` set to 0) and the response on success, or the error of
/// the last attempt. An attempt that exceeds its timeout fails with the `err::Timeout` context.
pub(crate) async fn s3_request<F, G, H, T, R, S>(
    future_factory: F,
    get_size: S,
    n_retries: usize,
    timeout: Arc<Mutex<T>>,
) -> Result<(RequestReport, R), Error>
where
    F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
    G: Future<Output = Result<(H, usize), Error>> + Send,
    H: Future<Output = Result<R, Error>> + Send,
    S: Fn(&R, usize) -> usize + Unpin + Clone + Send + Sync + 'static,
    T: timeout::Timeout,
{
    // Number of attempts started so far. It has to be incremented by the factory closure itself:
    // an integer is `Copy`, so incrementing it inside the `async move` block would only change
    // that block's private copy and every attempt would report itself as attempt 1.
    let mut attempts_started = 0;
    // Number of failed attempts seen by the retry policy.
    let mut failures = 0;
    // Outer stopwatch: measures the total time across all attempts.
    try_stopwatch(
        FutureRetry::new(
            // Attempt factory: invoked once per attempt.
            move || {
                attempts_started += 1;
                let attempt = attempts_started;
                let (future_factory, timeout, get_size) =
                    (future_factory.clone(), timeout.clone(), get_size.clone());
                async move {
                    let (request, expected_size) = future_factory().await?;
                    // Keep the lock only while reading the estimate and the timeout value.
                    let (est, timeout_value) = {
                        let t = timeout.lock().await;
                        (t.get_estimate(), t.get_timeout(expected_size, attempt))
                    };
                    // Inner stopwatch: measures this attempt only.
                    try_stopwatch(
                        tokio::time::timeout(timeout_value, request)
                            .with_context(|| err::Timeout {})
                            // Flatten the timeout result wrapping the request's own result.
                            .map(|result| result.and_then(|x| x)),
                    )
                    .map_ok(move |(response, success_time)| {
                        let real_size = get_size(&response, expected_size);
                        (response, success_time, real_size, est)
                    })
                    .await
                }
            },
            // Retry policy: give up once more than `n_retries` attempts have failed.
            move |e| {
                failures += 1;
                if failures > n_retries {
                    RetryPolicy::ForwardError(e)
                } else {
                    RetryPolicy::WaitRetry(Duration::from_millis(200))
                }
            },
        ),
    )
    .await
    .map(
        move |(((response, success_time, size, est), attempts), total_time)| {
            let report = RequestReport {
                seq: 0,
                size,
                total_time,
                success_time,
                attempts,
                est,
            };
            (report, response)
        },
    )
    // The retry wrapper also reports the attempt count on failure; only the error is returned.
    .map_err(|(err, _attempts)| err)
}
pub async fn retriable_s3_client() -> Client {
let retry_config = RetryConfig::standard()
.with_max_attempts(3)
.with_initial_backoff(Duration::from_secs(10));
let region_provider = RegionProviderChain::default_provider();
let sdk_config = aws_config::defaults(BehaviorVersion::v2026_01_12())
.region(region_provider)
.load()
.await;
let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
s3_config_builder.set_retry_config(Some(retry_config));
aws_sdk_s3::Client::from_conf(s3_config_builder.build())
}
/// Creates an S3 [`Client`] aimed at a local endpoint (`http://localhost:9000`).
///
/// Credentials come from the default credentials chain restricted to the `testing` profile, the
/// region string is `"EuWest1"`, and path-style addressing is forced. Retries use the standard
/// configuration with at most 3 attempts and an initial backoff of 10 seconds.
pub async fn testing_sdk_client() -> Client {
    // Build the credentials chain first, then load the shared configuration with it.
    let credentials = DefaultCredentialsChain::builder()
        .profile_name("testing")
        .build()
        .await;
    let shared_config = aws_config::defaults(BehaviorVersion::v2026_01_12())
        .region(RegionProviderChain::first_try("EuWest1"))
        .endpoint_url("http://localhost:9000")
        .credentials_provider(credentials)
        .load()
        .await;

    let retries = RetryConfig::standard()
        .with_max_attempts(3)
        .with_initial_backoff(Duration::from_secs(10));

    let mut builder = aws_sdk_s3::config::Builder::from(&shared_config);
    builder.set_retry_config(Some(retries));
    builder.set_force_path_style(Some(true));
    Client::from_conf(builder.build())
}