//! # S3 high-performance algorithms
//! High-performance algorithms for batch operations in Amazon S3.
//!
//! <https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance-guidelines.html>
//!
//! - Upload multiple files with `S3Algo::upload_files`.
//! - List files with `S3Algo::s3_list_objects` or `S3Algo::s3_list_prefix`,
//!   and then execute deletion or copy on all the files.
use crate::timeout::*;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::meta::region::RegionProviderChain;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::retry::RetryConfig;
use aws_sdk_s3::Client;
use futures::future::{Future, TryFutureExt};
use futures::prelude::*;
use futures::stream;
use futures_retry::{FutureRetry, RetryPolicy};
use futures_stopwatch::try_stopwatch;
use snafu::futures::TryFutureExt as S;
use snafu::ResultExt;
use std::marker::Unpin;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
mod config;
pub mod err;
mod list_actions;
mod upload;
pub use list_actions::*;
pub use upload::*;
pub mod timeout;
pub use config::*;
pub use err::Error;
#[cfg(test)]
mod test;
/// Entry point for the high-performance S3 batch algorithms.
///
/// Bundles an AWS SDK S3 [`Client`] with the [`Config`] that tunes the
/// batch operations (timeouts, retries, parallelism).
#[derive(Clone)]
pub struct S3Algo {
/// The underlying AWS SDK S3 client used for all requests.
s3: Client,
/// Tuning parameters for the batch algorithms.
config: Config,
}
impl S3Algo {
/// Create an `S3Algo` using the default [`Config`].
pub fn new(s3: Client) -> Self {
Self {
s3,
config: Config::default(),
}
}
/// Create an `S3Algo` with a custom [`Config`].
pub fn with_config(s3: Client, config: Config) -> Self {
Self { s3, config }
}
}
/// Result of a single S3 request.
///
/// Produced once per successful request; mainly consumed by progress
/// callbacks for monitoring and debugging.
#[derive(Debug, Clone, Copy)]
pub struct RequestReport {
/// The number of this request in a series of multiple requests (0 if not applicable)
pub seq: usize,
/// Size of request - in bytes or in number of objects, depending on the type of request.
pub size: usize,
/// The total time including all retries
pub total_time: Duration,
/// The time of the successful request (the last attempt only)
pub success_time: Duration,
/// Number of attempts. A value of `1` means no retries - success on first attempt.
pub attempts: usize,
/// Estimated sec/unit that was used in this request. Useful for
/// debugging the upload algorithm and not much more.
pub est: f64,
}
/// Issue a single S3 request with retries and sane default timeouts.
/// An easier, less general version of `s3_request`.
///
/// `extra_initial_timeout_s` is added on top of the configured base timeout
/// for the first attempt (and grows with backoff on retries). Use `0.0` for
/// small, fixed-size S3 operations; for operations whose duration scales with
/// e.g. a byte or object count, pass something proportional to that count.
pub async fn s3_single_request<F, G, R>(
    future_factory: F,
    extra_initial_timeout_s: f64,
) -> Result<(RequestReport, R), Error>
where
    F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
    G: Future<Output = Result<R, Error>> + Send,
{
    // One-shot timeout state: the request is reported to `s3_request` as
    // having size 1 unit, so `seconds_per_unit` becomes the extra initial
    // timeout for this single request.
    let timeout_state = Arc::new(Mutex::new(TimeoutState::new(
        AlgorithmConfig::default(),
        SpecificTimings {
            seconds_per_unit: extra_initial_timeout_s,
            minimum_units_for_estimation: 0, // irrelevant for a one-off request
        },
    )));
    // Adapt the simple factory to the (future, expected_size) shape that
    // `s3_request` wants.
    let wrapped_factory = move || {
        let make_request = future_factory.clone();
        async move { Ok((make_request(), 1)) }
    };
    s3_request(wrapped_factory, |_, size| size, 10, timeout_state).await
}
/// Every request to S3 should be issued with `s3_request`, which puts the appropriate timeouts and
/// retries the request, as well as times it.
///
/// `future_factory` is a bit funky, being a closure that returns a future that resolves to another
/// future. We need the closure F to run the request multiple times. Its return type G is a future
/// because it might need to for example open a file using async, which might then be used in H to
/// stream from the file...
/// This is needed so that we can get e.g. the length of the file before streaming to S3.
///
/// `get_size(report, expected)`: get the real size of the request. For some types of requests
/// (e.g. DeleteObjects/PutObject), we know the size upfront, so real size = expected.
/// For others (ListObjectsV2), we need the result of the action to know the size.
/// The size returned from this function is only used to construct the `RequestReport`, which in
/// turn is only useful for eventual progress closures. So the existence of `get_size` parameter is
/// due to the feature of monitoring progress.
///
/// The "expected" size returned by `future_factory` on the other hand is needed to calculate the
/// timeout.
pub(crate) async fn s3_request<F, G, H, T, R, S>(
    future_factory: F,
    get_size: S,
    n_retries: usize,
    timeout: Arc<Mutex<T>>,
) -> Result<(RequestReport, R), Error>
where
    F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
    G: Future<Output = Result<(H, usize), Error>> + Send,
    H: Future<Output = Result<R, Error>> + Send,
    S: Fn(&R, usize) -> usize + Unpin + Clone + Send + Sync + 'static,
    T: timeout::Timeout,
{
    // Attempt counter, incremented in the (FnMut) future-factory closure each
    // time FutureRetry starts a new attempt.
    //
    // BUGFIX: this must be incremented in the closure body, NOT inside the
    // `async move` block. `usize` is `Copy`, so an `async move` block captures
    // its own copy; incrementing it there never updates the closure's counter,
    // and every attempt would call `get_timeout(.., 1)`, defeating the
    // per-attempt timeout escalation.
    let mut attempt = 0_usize;
    // Error counter used by the retry policy to give up after `n_retries`.
    let mut failures = 0_usize;
    try_stopwatch(
        // Time the entire operation (across all retries)
        FutureRetry::new(
            // Future factory - creates a future for one attempt of the request
            move || {
                attempt += 1;
                let attempt_no = attempt;
                let (future_factory, timeout, get_size) =
                    (future_factory.clone(), timeout.clone(), get_size.clone());
                async move {
                    let (request, expected_size) = future_factory().await?;
                    // Snapshot the current estimate and the timeout for this
                    // attempt (the timeout grows with the attempt number).
                    let (est, timeout_value) = {
                        let t = timeout.lock().await;
                        (t.get_estimate(), t.get_timeout(expected_size, attempt_no))
                    };
                    try_stopwatch(
                        tokio::time::timeout(timeout_value, request)
                            .with_context(|| err::Timeout {})
                            .map(|result| result.and_then(|x| x)), // flatten the Result<Result<(), err>, timeout err>
                    )
                    .map_ok(move |(response, success_time)| {
                        let real_size = get_size(&response, expected_size);
                        (response, success_time, real_size, est)
                    })
                    .await
                }
            },
            // Retry policy: constant 200 ms wait; forward the error once
            // `n_retries` failures have been seen.
            move |e| {
                failures += 1;
                if failures > n_retries {
                    RetryPolicy::ForwardError(e)
                } else {
                    RetryPolicy::WaitRetry(Duration::from_millis(200)) // TODO adjust the time, maybe depending on retries
                }
            },
        ),
    )
    .await
    .map(
        move |(((response, success_time, size, est), attempts), total_time)| {
            (
                RequestReport {
                    seq: 0,
                    size,
                    total_time,
                    success_time,
                    attempts,
                    est,
                },
                response,
            )
        },
    )
    .map_err(|(err, _attempts)| err)
}
/// Build an S3 [`Client`] using the default region/credentials chain, with a
/// standard retry configuration (3 attempts, 10 s initial backoff).
pub async fn retriable_s3_client() -> Client {
    // Resolve region and credentials from the environment / AWS config files.
    let sdk_config = aws_config::defaults(BehaviorVersion::v2023_11_09())
        .region(RegionProviderChain::default_provider())
        .load()
        .await;
    let retries = RetryConfig::standard()
        .with_max_attempts(3)
        .with_initial_backoff(Duration::from_secs(10));
    let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
        .retry_config(retries)
        .build();
    aws_sdk_s3::Client::from_conf(s3_config)
}
/// Build an S3 [`Client`] for integration testing against a local S3-compatible
/// server on `http://localhost:9000`, using the "testing" credentials profile,
/// path-style addressing, and a standard retry configuration.
pub async fn testing_sdk_client() -> Client {
    let credentials = DefaultCredentialsChain::builder()
        .profile_name("testing")
        .build()
        .await;
    // NOTE(review): "EuWest1" is not the canonical AWS region format
    // ("eu-west-1"). A local S3 server typically ignores the region, but
    // confirm this string is intentional.
    let sdk_config = aws_config::defaults(BehaviorVersion::v2023_11_09())
        .region(RegionProviderChain::first_try("EuWest1"))
        .endpoint_url("http://localhost:9000")
        .credentials_provider(credentials)
        .load()
        .await;
    let retries = RetryConfig::standard()
        .with_max_attempts(3)
        .with_initial_backoff(Duration::from_secs(10));
    let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
        .retry_config(retries)
        // Path-style URLs are required by most local S3 emulators (e.g. MinIO).
        .force_path_style(true)
        .build();
    Client::from_conf(s3_config)
}