s3_algo/
lib.rs

//! # S3 high-performance algorithms
//! High-performance algorithms for batch operations in Amazon S3.
//!
//! <https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance-guidelines.html>
//!
//! - Upload multiple files with `S3Algo::upload_files`.
//! - List files with `S3Algo::s3_list_objects` or `S3Algo::s3_list_prefix`,
//!   then delete or copy all of the listed files.
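//!
//! A minimal construction sketch (it only exercises the constructors and client helpers defined
//! in this file, and assumes the crate is consumed under the name `s3_algo`):
//!
//! ```no_run
//! # async fn example() {
//! use s3_algo::{retriable_s3_client, S3Algo};
//!
//! // Build an SDK client with this crate's default retry settings, then wrap it.
//! let algo = S3Algo::new(retriable_s3_client().await);
//! # let _ = algo;
//! # }
//! ```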

use crate::timeout::*;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::meta::region::RegionProviderChain;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::retry::RetryConfig;
use aws_sdk_s3::Client;
use futures::future::{Future, TryFutureExt};
use futures::prelude::*;
use futures::stream;
use futures_retry::{FutureRetry, RetryPolicy};
use futures_stopwatch::try_stopwatch;
use snafu::futures::TryFutureExt as S;
use snafu::ResultExt;
use std::marker::Unpin;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

mod config;
pub mod err;
mod list_actions;
pub mod timeout;
mod upload;

pub use config::*;
pub use err::Error;
pub use list_actions::*;
pub use upload::*;

#[cfg(test)]
mod test;

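/// Entry point for the batch operations of this crate: a thin wrapper around an
/// [`aws_sdk_s3::Client`] together with the [`Config`] used by those operations.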
#[derive(Clone)]
pub struct S3Algo {
    s3: Client,
    config: Config,
}

impl S3Algo {
    pub fn new(s3: Client) -> Self {
        Self {
            s3,
            config: Config::default(),
        }
    }
    pub fn with_config(s3: Client, config: Config) -> Self {
        Self { s3, config }
    }
}

/// Result of a single S3 request.
#[derive(Debug, Clone, Copy)]
pub struct RequestReport {
    /// The sequence number of this request within a series of multiple requests (0 if not applicable).
    pub seq: usize,
    /// Size of the request, in bytes or in number of objects, depending on the type of request.
    pub size: usize,
    /// The total time, including all retries.
    pub total_time: Duration,
    /// The duration of the final, successful attempt.
    pub success_time: Duration,
    /// Number of attempts. A value of `1` means success on the first attempt (no retries).
    pub attempts: usize,
    /// The estimated seconds-per-unit that was used for this request. Mostly useful for
    /// debugging the upload algorithm.
    pub est: f64,
}

/// Issue a single S3 request with retries and appropriate timeouts using sane defaults.
/// Basically an easier, less general version of `s3_request`.
///
/// `extra_initial_timeout_s`: initial timeout of the request in seconds (it will increase with
/// backoff), added to `cfg.base_timeout`. It can be set to 0 if the S3 operation is a small one,
/// but if the operation's size depends on, for example, a byte count or an object count, set it
/// to something proportional to that size.
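///
/// A minimal sketch of the call shape (the trivial future here stands in for a real S3 call; any
/// future resolving to `Result<_, Error>` works):
///
/// ```no_run
/// # async fn example() -> Result<(), s3_algo::Error> {
/// let (report, value) = s3_algo::s3_single_request(
///     // The closure is called again on every retry.
///     || async { Ok::<_, s3_algo::Error>(42) },
///     // No extra initial timeout on top of the configured base timeout.
///     0.0,
/// )
/// .await?;
/// assert_eq!(value, 42);
/// println!("{} attempt(s), took {:?}", report.attempts, report.total_time);
/// # Ok(())
/// # }
/// ```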
pub async fn s3_single_request<F, G, R>(
    future_factory: F,
    extra_initial_timeout_s: f64,
) -> Result<(RequestReport, R), Error>
where
    F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
    G: Future<Output = Result<R, Error>> + Send,
{
    // Configure a one-off timeout state that yields the desired extra initial timeout on the
    // first try, by telling `s3_request` that the request has size `1`.
    let timeout = TimeoutState::new(
        AlgorithmConfig::default(),
        SpecificTimings {
            seconds_per_unit: extra_initial_timeout_s,
            minimum_units_for_estimation: 0, // doesn't matter
        },
    );

    s3_request(
        move || {
            let factory = future_factory.clone();
            async move { Ok((factory(), 1)) }
        },
        |_, size| size,
        10,
        Arc::new(Mutex::new(timeout)),
    )
    .await
}

/// Every request to S3 should be issued with `s3_request`, which applies the appropriate timeout,
/// retries the request as needed, and times it.
///
/// `future_factory` is a bit funky: it is a closure returning a future that resolves to yet
/// another future. The closure `F` is needed so that the request can be run multiple times. Its
/// return type `G` is itself a future because it might need to do some async work first, such as
/// opening a file, whose result is then used in `H` to stream the file to S3. This also makes it
/// possible to learn e.g. the length of the file before streaming starts.
///
/// `get_size(response, expected)`: gets the real size of the request. For some types of requests
/// (e.g. DeleteObjects/PutObject) the size is known upfront, so real size = expected.
/// For others (ListObjectsV2), the result of the action is needed to know the size.
/// The size returned from this function is only used to construct the `RequestReport`, which in
/// turn is only useful for progress closures, so the `get_size` parameter exists solely to
/// support progress monitoring.
///
/// The "expected" size returned by `future_factory`, on the other hand, is needed to calculate
/// the timeout.
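///
/// A sketch of the shape `future_factory` is expected to have (illustrative only; a real factory
/// would open a file or build an SDK request in the outer future):
///
/// ```ignore
/// let future_factory = || async {
///     let expected_size = 1;                  // e.g. a file length or an object count
///     let request = async { Ok("response") }; // the actual S3 call (`H`) goes here
///     Ok((request, expected_size))
/// };
/// ```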
pub(crate) async fn s3_request<F, G, H, T, R, S>(
    future_factory: F,
    get_size: S,
    n_retries: usize,
    timeout: Arc<Mutex<T>>,
) -> Result<(RequestReport, R), Error>
where
    F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
    G: Future<Output = Result<(H, usize), Error>> + Send,
    H: Future<Output = Result<R, Error>> + Send,
    S: Fn(&R, usize) -> usize + Unpin + Clone + Send + Sync + 'static,
    T: timeout::Timeout,
{
    let mut attempts1 = 0;
    let mut attempts2 = 0;
    try_stopwatch(
        // Time the entire operation (across all retries)
        FutureRetry::new(
            // Future factory - creates a fresh request future for every attempt
            move || {
                let (future_factory, timeout, get_size) =
                    (future_factory.clone(), timeout.clone(), get_size.clone());
                // Count the attempt here, in the closure body, rather than inside the async
                // block: the async block only captures a copy of the counter, so incrementing
                // it there would not persist across retries.
                attempts1 += 1;
                let attempt = attempts1;

                async move {
                    let (request, expected_size) = future_factory().await?;
                    let (est, timeout_value) = {
                        let t = timeout.lock().await;
                        (t.get_estimate(), t.get_timeout(expected_size, attempt))
                    };
                    try_stopwatch(
                        tokio::time::timeout(timeout_value, request)
                            .with_context(|| err::Timeout {})
                            .map(|result| result.and_then(|x| x)), // flatten Result<Result<R, Error>, timeout Error>
                    )
                    .map_ok(move |(response, success_time)| {
                        let real_size = get_size(&response, expected_size);
                        (response, success_time, real_size, est)
                    })
                    .await
                }
            },
            // Retry policy: wait 200 ms between attempts, at most `n_retries` times
            move |e| {
                attempts2 += 1;
                if attempts2 > n_retries {
                    RetryPolicy::ForwardError(e)
                } else {
                    RetryPolicy::WaitRetry(Duration::from_millis(200)) // TODO: adjust the wait, possibly depending on the number of retries
                }
            },
        ),
    )
    .await
    .map(
        move |(((response, success_time, size, est), attempts), total_time)| {
            (
                RequestReport {
                    seq: 0,
                    size,
                    total_time,
                    success_time,
                    attempts,
                    est,
                },
                response,
            )
        },
    )
    .map_err(|(err, _attempts)| err)
}

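/// Build an S3 [`Client`] from the default AWS configuration (region and credentials resolved by
/// the default provider chain), with the SDK's standard retry policy set to at most 3 attempts
/// and a 10 s initial backoff.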
pub async fn retriable_s3_client() -> Client {
    let retry_config = RetryConfig::standard()
        .with_max_attempts(3)
        .with_initial_backoff(Duration::from_secs(10));

    let region_provider = RegionProviderChain::default_provider();
    let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
        .region(region_provider)
        .load()
        .await;

    let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
    s3_config_builder.set_retry_config(Some(retry_config));

    aws_sdk_s3::Client::from_conf(s3_config_builder.build())
}

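/// Build an S3 [`Client`] intended for integration tests: it uses the `testing` AWS profile for
/// credentials, points at a local S3-compatible endpoint on `http://localhost:9000` (e.g. MinIO),
/// and enables path-style addressing.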
pub async fn testing_sdk_client() -> Client {
    let retry_config = RetryConfig::standard()
        .with_max_attempts(3)
        .with_initial_backoff(Duration::from_secs(10));

    let credentials_provider = DefaultCredentialsChain::builder()
        .profile_name("testing")
        .build()
        .await;
    let region_provider = RegionProviderChain::first_try("eu-west-1");
    let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
        .region(region_provider)
        .endpoint_url("http://localhost:9000")
        .credentials_provider(credentials_provider)
        .load()
        .await;

    let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
    s3_config_builder.set_retry_config(Some(retry_config));
    s3_config_builder.set_force_path_style(Some(true));

    Client::from_conf(s3_config_builder.build())
}