1use crate::timeout::*;
11use aws_config::default_provider::credentials::DefaultCredentialsChain;
12use aws_config::meta::region::RegionProviderChain;
13use aws_config::BehaviorVersion;
14use aws_sdk_s3::config::retry::RetryConfig;
15use aws_sdk_s3::Client;
16use futures::future::{Future, TryFutureExt};
17use futures::prelude::*;
18use futures::stream;
19use futures_retry::{FutureRetry, RetryPolicy};
20use futures_stopwatch::try_stopwatch;
21use snafu::futures::TryFutureExt as S;
22use snafu::ResultExt;
23use std::marker::Unpin;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::Mutex;
28
29mod config;
30pub mod err;
31mod list_actions;
32mod upload;
33
34pub use list_actions::*;
35pub use upload::*;
36pub mod timeout;
37pub use config::*;
38pub use err::Error;
39
40#[cfg(test)]
41mod test;
42
43#[derive(Clone)]
44pub struct S3Algo {
45 s3: Client,
46 config: Config,
47}
48impl S3Algo {
49 pub fn new(s3: Client) -> Self {
50 Self {
51 s3,
52 config: Config::default(),
53 }
54 }
55 pub fn with_config(s3: Client, config: Config) -> Self {
56 Self { s3, config }
57 }
58}
59
60#[derive(Debug, Clone, Copy)]
62pub struct RequestReport {
63 pub seq: usize,
65 pub size: usize,
67 pub total_time: Duration,
69 pub success_time: Duration,
71 pub attempts: usize,
73 pub est: f64,
76}
77
78pub async fn s3_single_request<F, G, R>(
86 future_factory: F,
87 extra_initial_timeout_s: f64,
88) -> Result<(RequestReport, R), Error>
89where
90 F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
91 G: Future<Output = Result<R, Error>> + Send,
92{
93 let timeout = TimeoutState::new(
97 AlgorithmConfig::default(),
98 SpecificTimings {
99 seconds_per_unit: extra_initial_timeout_s,
100 minimum_units_for_estimation: 0, },
102 );
103
104 s3_request(
105 move || {
106 let factory = future_factory.clone();
107 async move { Ok((factory(), 1)) }
108 },
109 |_, size| size,
110 10,
111 Arc::new(Mutex::new(timeout)),
112 )
113 .await
114}
115
116pub(crate) async fn s3_request<F, G, H, T, R, S>(
135 future_factory: F,
136 get_size: S,
137 n_retries: usize,
138 timeout: Arc<Mutex<T>>,
139) -> Result<(RequestReport, R), Error>
140where
141 F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
142 G: Future<Output = Result<(H, usize), Error>> + Send,
143 H: Future<Output = Result<R, Error>> + Send,
144 S: Fn(&R, usize) -> usize + Unpin + Clone + Send + Sync + 'static,
145 T: timeout::Timeout,
146{
147 let mut attempts1 = 0;
148 let mut attempts2 = 0;
149 try_stopwatch(
150 FutureRetry::new(
152 move || {
154 let (future_factory, timeout, get_size) =
155 (future_factory.clone(), timeout.clone(), get_size.clone());
156
157 async move {
158 attempts1 += 1;
159 let (request, expected_size) = future_factory().await?;
160 let (est, timeout_value) = {
161 let t = timeout.lock().await;
162 (t.get_estimate(), t.get_timeout(expected_size, attempts1))
163 };
164 try_stopwatch(
165 tokio::time::timeout(timeout_value, request)
166 .with_context(|| err::Timeout {})
167 .map(|result| result.and_then(|x| x)), )
169 .map_ok(move |(response, success_time)| {
170 let real_size = get_size(&response, expected_size);
171 (response, success_time, real_size, est)
172 })
173 .await
174 }
175 },
176 {
178 move |e| {
179 attempts2 += 1;
180 if attempts2 > n_retries {
181 RetryPolicy::ForwardError(e)
182 } else {
183 RetryPolicy::WaitRetry(Duration::from_millis(200)) }
185 }
186 },
187 ),
188 )
189 .await
190 .map(
191 move |(((response, success_time, size, est), attempts), total_time)| {
192 (
193 RequestReport {
194 seq: 0,
195 size,
196 total_time,
197 success_time,
198 attempts,
199 est,
200 },
201 response,
202 )
203 },
204 )
205 .map_err(|(err, _attempts)| err)
206}
207
208pub async fn retriable_s3_client() -> Client {
209 let retry_config = RetryConfig::standard()
210 .with_max_attempts(3)
211 .with_initial_backoff(Duration::from_secs(10));
212
213 let region_provider = RegionProviderChain::default_provider();
214 let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
215 .region(region_provider)
216 .load()
217 .await;
218
219 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
220 s3_config_builder.set_retry_config(Some(retry_config));
221
222 aws_sdk_s3::Client::from_conf(s3_config_builder.build())
223}
224
225pub async fn testing_sdk_client() -> Client {
226 let retry_config = RetryConfig::standard()
227 .with_max_attempts(3)
228 .with_initial_backoff(Duration::from_secs(10));
229
230 let credentials_provider = DefaultCredentialsChain::builder()
231 .profile_name("testing")
232 .build()
233 .await;
234 let region_provider = RegionProviderChain::first_try("EuWest1");
235 let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
236 .region(region_provider)
237 .endpoint_url("http://localhost:9000")
238 .credentials_provider(credentials_provider)
239 .load()
240 .await;
241
242 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&sdk_config);
243 s3_config_builder.set_retry_config(Some(retry_config));
244 s3_config_builder.set_force_path_style(Some(true));
245
246 Client::from_conf(s3_config_builder.build())
247}