aws-sdk-s3 1.131.0

AWS SDK for Amazon Simple Storage Service
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
// Code generated by software.amazon.smithy.rust.codegen.smithy-rs. DO NOT EDIT.
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

#![allow(dead_code)]

//! Interceptor for handling Smithy `@httpChecksum` request checksumming with AWS SigV4

use crate::presigning::PresigningMarker;
use aws_runtime::content_encoding::AwsChunkedBodyOptions;
use aws_smithy_checksums::body::calculate;
use aws_smithy_checksums::body::ChecksumCache;
use aws_smithy_checksums::http::HttpChecksum;
use aws_smithy_checksums::ChecksumAlgorithm;
use aws_smithy_runtime::client::sdk_feature::SmithySdkFeature;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::{BeforeSerializationInterceptorContextMut, BeforeTransmitInterceptorContextMut, Input};
use aws_smithy_runtime_api::client::interceptors::{dyn_dispatch_hint, Intercept};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::Request;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::checksum_config::RequestChecksumCalculation;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use http_1x::{HeaderMap, HeaderName};
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{fmt, mem};

/// Errors that can occur while building a checksum-validated HTTP request.
#[derive(Debug)]
pub(crate) enum Error {
    /// A checksum *header* was requested for a streaming body; streaming bodies
    /// must carry their checksum as a trailer instead.
    ChecksumHeadersAreUnsupportedForStreamingBody,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Error::ChecksumHeadersAreUnsupportedForStreamingBody => concat!(
                "Checksum header insertion is only supported for non-streaming HTTP bodies. ",
                "To checksum validate a streaming body, the checksums must be sent as trailers."
            ),
        };
        f.write_str(message)
    }
}

impl std::error::Error for Error {}

/// Per-operation state stored in the config bag and shared between the hooks of
/// `RequestChecksumInterceptor`.
#[derive(Debug, Default, Clone)]
struct RequestChecksumInterceptorState {
    /// The checksum algorithm to calculate, as the raw name supplied by the operation input
    checksum_algorithm: Option<String>,
    /// This value is set in the model on the `httpChecksum` trait
    request_checksum_required: bool,
    /// Whether a checksum should be calculated; behind an `Arc` so every clone of this
    /// state observes the same flag.
    calculate_checksum: Arc<AtomicBool>,
    /// Cache handed to header/trailer checksum computation so values computed on one
    /// attempt can be reused or compared on later attempts.
    checksum_cache: ChecksumCache,
}

impl RequestChecksumInterceptorState {
    /// Parses the stored algorithm name; `None` when unset or not a recognised algorithm.
    fn checksum_algorithm(&self) -> Option<ChecksumAlgorithm> {
        let name = self.checksum_algorithm.as_deref()?;
        name.parse::<ChecksumAlgorithm>().ok()
    }

    /// Reads the shared "calculate a checksum" flag.
    fn calculate_checksum(&self) -> bool {
        let flag: &AtomicBool = &self.calculate_checksum;
        flag.load(Ordering::SeqCst)
    }
}

impl Storable for RequestChecksumInterceptorState {
    type Storer = StoreReplace<Self>;
}

/// Boxed callback that receives the currently selected algorithm (if any) and may return a
/// replacement.
type CustomDefaultFn = Box<dyn Fn(Option<ChecksumAlgorithm>, &ConfigBag) -> Option<ChecksumAlgorithm> + Send + Sync + 'static>;

/// Config-bag entry that lets a caller override the request checksum algorithm chosen by
/// the interceptor.
pub(crate) struct DefaultRequestChecksumOverride {
    custom_default: CustomDefaultFn,
}

impl fmt::Debug for DefaultRequestChecksumOverride {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The boxed closure has no meaningful `Debug` form, so only the type name is printed.
        f.write_str("DefaultRequestChecksumOverride")
    }
}

impl Storable for DefaultRequestChecksumOverride {
    type Storer = StoreReplace<Self>;
}

impl DefaultRequestChecksumOverride {
    /// Wraps `custom_default` so it can be stored in a `ConfigBag`.
    pub(crate) fn new<F>(custom_default: F) -> Self
    where
        F: Fn(Option<ChecksumAlgorithm>, &ConfigBag) -> Option<ChecksumAlgorithm> + Send + Sync + 'static,
    {
        let boxed: CustomDefaultFn = Box::new(custom_default);
        Self { custom_default: boxed }
    }

    /// Invokes the stored callback with the `original` algorithm choice and the config bag,
    /// returning whatever algorithm (if any) the callback decides on.
    pub(crate) fn custom_default(&self, original: Option<ChecksumAlgorithm>, config_bag: &ConfigBag) -> Option<ChecksumAlgorithm> {
        let override_fn = &self.custom_default;
        override_fn(original, config_bag)
    }
}

/// Interceptor that decides whether and how to checksum an outgoing request body.
pub(crate) struct RequestChecksumInterceptor<AP, CM> {
    /// Given the operation input, yields the requested algorithm name (if any) and whether
    /// the model marks the checksum as required.
    algorithm_provider: AP,
    /// Applied to the request before the retry loop; returns `true` when the request already
    /// carries a user-supplied checksum.
    checksum_mutator: CM,
}

impl<AP, CM> fmt::Debug for RequestChecksumInterceptor<AP, CM> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The generic callbacks are not `Debug`, so only the type name is printed.
        f.write_str("RequestChecksumInterceptor")
    }
}

impl<AP, CM> RequestChecksumInterceptor<AP, CM> {
    /// Builds an interceptor from its algorithm provider and checksum mutator callbacks.
    pub(crate) fn new(algorithm_provider: AP, checksum_mutator: CM) -> Self {
        RequestChecksumInterceptor {
            checksum_mutator,
            algorithm_provider,
        }
    }
}

#[dyn_dispatch_hint]
impl<AP, CM> Intercept for RequestChecksumInterceptor<AP, CM>
where
    AP: Fn(&Input) -> (Option<String>, bool) + Send + Sync,
    CM: Fn(&mut Request, &ConfigBag) -> Result<bool, BoxError> + Send + Sync,
{
    fn name(&self) -> &'static str {
        "RequestChecksumInterceptor"
    }

    fn modify_before_serialization(
        &self,
        context: &mut BeforeSerializationInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let (checksum_algorithm, request_checksum_required) = (self.algorithm_provider)(context.input());

        cfg.interceptor_state().store_put(RequestChecksumInterceptorState {
            checksum_algorithm,
            request_checksum_required,
            checksum_cache: ChecksumCache::new(),
            calculate_checksum: Arc::new(AtomicBool::new(false)),
        });

        Ok(())
    }

    /// Setup state for calculating checksum and setting UA features
    fn modify_before_retry_loop(
        &self,
        context: &mut BeforeTransmitInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let user_set_checksum_value = (self.checksum_mutator)(context.request_mut(), cfg).expect("Checksum header mutation should not fail");
        let is_presigned = cfg.load::<PresigningMarker>().is_some();

        // If the user manually set a checksum header or if this is a presigned request, we short circuit
        if user_set_checksum_value || is_presigned {
            // Disable aws-chunked encoding since either the user has set a custom checksum
            cfg.interceptor_state().store_put(AwsChunkedBodyOptions::disable_chunked_encoding());

            return Ok(());
        }

        let state = cfg
            .get_mut_from_interceptor_state::<RequestChecksumInterceptorState>()
            .expect("set in `read_before_serialization`");

        // If the algorithm fails to parse it is not one we support and we error
        let checksum_algorithm = state
            .checksum_algorithm
            .clone()
            .map(|s| ChecksumAlgorithm::from_str(s.as_str()))
            .transpose()?;

        let mut state = std::mem::take(state);

        if calculate_checksum(cfg, &state) {
            state.calculate_checksum.store(true, Ordering::Release);

            // If a checksum override is set in the ConfigBag we use that instead (currently only used by S3Express)
            // If we have made it this far without a checksum being set we set the default (currently Crc32)
            let checksum_algorithm = incorporate_custom_default(checksum_algorithm, cfg).unwrap_or_default();
            state.checksum_algorithm = Some(checksum_algorithm.as_str().to_owned());

            // NOTE: We have to do this in modify_before_retry_loop since UA interceptor also runs
            // in modify_before_signing but is registered before this interceptor (client level vs operation level).
            track_metric_for_selected_checksum_algorithm(cfg, &checksum_algorithm);
        } else {
            // No checksum calculation needed so disable aws-chunked encoding
            cfg.interceptor_state().store_put(AwsChunkedBodyOptions::disable_chunked_encoding());
        }

        cfg.interceptor_state().store_put(state);
        Ok(())
    }

    /// Calculate a checksum and modify the request to do either of the following:
    /// - include the checksum as a header for signing with in-memory request bodies.
    /// - include the checksum as a trailer for streaming request bodies.
    fn modify_before_signing(
        &self,
        context: &mut BeforeTransmitInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let state = cfg.load::<RequestChecksumInterceptorState>().expect("set in `read_before_serialization`");

        if !state.calculate_checksum() {
            return Ok(());
        }

        let checksum_algorithm = state.checksum_algorithm().expect("set in `modify_before_retry_loop`");
        let mut checksum = checksum_algorithm.into_impl();

        match context.request().body().bytes() {
            Some(data) => {
                tracing::debug!("applying {checksum_algorithm:?} of the request body as a header");
                checksum.update(data);

                for (hdr_name, hdr_value) in get_or_cache_headers(checksum.headers(), &state.checksum_cache).iter() {
                    context.request_mut().headers_mut().insert(hdr_name.clone(), hdr_value.clone());
                }
            }
            None => {
                tracing::debug!("applying {checksum_algorithm:?} of the request body as a trailer");
                context
                    .request_mut()
                    .headers_mut()
                    .insert(HeaderName::from_static("x-amz-trailer"), checksum.header_name());

                // Take checksum header into account for `AwsChunkedBodyOptions`'s trailer length
                let trailer_len = HttpChecksum::size(checksum.as_ref());
                let chunked_body_options = AwsChunkedBodyOptions::default().with_trailer_len(trailer_len);
                cfg.interceptor_state().store_put(chunked_body_options);
            }
        }

        Ok(())
    }

    fn modify_before_transmit(
        &self,
        ctx: &mut BeforeTransmitInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        if ctx.request().body().bytes().is_some() {
            // Nothing to do for non-streaming bodies since the checksum was added to the the header
            // in `modify_before_signing` and signing has already been done by the time this hook is called.
            return Ok(());
        }

        let state = cfg.load::<RequestChecksumInterceptorState>().expect("set in `read_before_serialization`");

        if !state.calculate_checksum() {
            return Ok(());
        }

        let request = ctx.request_mut();

        let mut body = {
            let body = mem::replace(request.body_mut(), SdkBody::taken());

            let checksum_algorithm = state.checksum_algorithm().expect("set in `modify_before_retry_loop`");
            let checksum_cache = state.checksum_cache.clone();

            body.map(move |body| {
                let checksum = checksum_algorithm.into_impl();
                let body = calculate::ChecksumBody::new(body, checksum).with_cache(checksum_cache.clone());

                SdkBody::from_body_1_x(body)
            })
        };

        mem::swap(request.body_mut(), &mut body);

        Ok(())
    }
}

/// Applies the `DefaultRequestChecksumOverride` stored in `cfg`, if there is one, to
/// `checksum`; without an override the input is returned unchanged.
fn incorporate_custom_default(checksum: Option<ChecksumAlgorithm>, cfg: &ConfigBag) -> Option<ChecksumAlgorithm> {
    if let Some(override_entry) = cfg.load::<DefaultRequestChecksumOverride>() {
        return override_entry.custom_default(checksum, cfg);
    }
    checksum
}

/// Returns the checksum headers to send.
///
/// If `checksum_cache` already holds headers, those are returned, and a warning is logged
/// when they differ from `calculated_headers`. Otherwise `calculated_headers` are stored
/// in the cache and returned.
fn get_or_cache_headers(calculated_headers: HeaderMap, checksum_cache: &ChecksumCache) -> HeaderMap {
    match checksum_cache.get() {
        Some(cached_headers) => {
            if cached_headers != calculated_headers {
                tracing::warn!(cached = ?cached_headers, calculated = ?calculated_headers, "calculated checksum differs from cached checksum!");
            }
            cached_headers
        }
        None => {
            checksum_cache.set(calculated_headers.clone());
            calculated_headers
        }
    }
}

/// Decides whether a request checksum should be calculated, and records the matching
/// user-agent feature in `cfg`.
///
/// The user's `RequestChecksumCalculation` preference is read from `cfg`. When it is absent
/// (e.g. a client config built without `SdkConfig`), `WhenSupported` is assumed.
/// - `WhenSupported`: always calculate (this interceptor is only added when checksums are supported).
/// - `WhenRequired`: calculate only if the `httpChecksum` trait marks the checksum as required.
/// - any other value: log a warning, record no feature, and calculate.
fn calculate_checksum(cfg: &mut ConfigBag, state: &RequestChecksumInterceptorState) -> bool {
    let setting = cfg
        .load::<RequestChecksumCalculation>()
        .unwrap_or(&RequestChecksumCalculation::WhenSupported);

    let (feature, should_calculate) = match setting {
        RequestChecksumCalculation::WhenRequired => (
            Some(SmithySdkFeature::FlexibleChecksumsReqWhenRequired),
            state.request_checksum_required,
        ),
        RequestChecksumCalculation::WhenSupported => (Some(SmithySdkFeature::FlexibleChecksumsReqWhenSupported), true),
        unsupported => {
            tracing::warn!(
                more_info = "Unsupported value of RequestChecksumCalculation when setting user-agent metrics",
                unsupported = ?unsupported
            );
            (None, true)
        }
    };

    if let Some(feature) = feature {
        cfg.interceptor_state().store_append(feature);
    }
    should_calculate
}

/// Records the user-agent feature metric corresponding to the selected checksum algorithm.
///
/// MD5, and any algorithm without a corresponding metric, only produce a warning and
/// record nothing.
fn track_metric_for_selected_checksum_algorithm(cfg: &mut ConfigBag, checksum_algorithm: &ChecksumAlgorithm) {
    let feature = match checksum_algorithm {
        ChecksumAlgorithm::Crc32 => SmithySdkFeature::FlexibleChecksumsReqCrc32,
        ChecksumAlgorithm::Crc32c => SmithySdkFeature::FlexibleChecksumsReqCrc32c,
        ChecksumAlgorithm::Crc64Nvme => SmithySdkFeature::FlexibleChecksumsReqCrc64,
        // `Md5` is marked deprecated upstream, hence the allow.
        #[allow(deprecated)]
        ChecksumAlgorithm::Md5 => {
            tracing::warn!(more_info = "Unsupported ChecksumAlgorithm MD5 set");
            return;
        }
        ChecksumAlgorithm::Sha1 => SmithySdkFeature::FlexibleChecksumsReqSha1,
        ChecksumAlgorithm::Sha256 => SmithySdkFeature::FlexibleChecksumsReqSha256,
        unsupported => {
            tracing::warn!(
                more_info = "Unsupported value of ChecksumAlgorithm detected when setting user-agent metrics",
                unsupported = ?unsupported
            );
            return;
        }
    };

    cfg.interceptor_state().store_append(feature);
}

#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_checksums::ChecksumAlgorithm;
    use aws_smithy_runtime_api::client::interceptors::context::{BeforeTransmitInterceptorContextMut, InterceptorContext};
    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use aws_smithy_types::base64;
    use aws_smithy_types::byte_stream::ByteStream;
    use bytes::BytesMut;
    use http_body_util::BodyExt;
    use tempfile::NamedTempFile;

    /// Builds an interceptor whose algorithm provider always requests crc32 (not required)
    /// and whose mutator reports that no user checksum was set.
    fn create_test_interceptor() -> RequestChecksumInterceptor<
        impl Fn(&Input) -> (Option<String>, bool) + Send + Sync,
        impl Fn(&mut Request, &ConfigBag) -> Result<bool, BoxError> + Send + Sync,
    > {
        fn algo(_: &Input) -> (Option<String>, bool) {
            (Some("crc32".to_string()), false)
        }
        fn mutator(_: &mut Request, _: &ConfigBag) -> Result<bool, BoxError> {
            Ok(false)
        }
        RequestChecksumInterceptor::new(algo, mutator)
    }

    #[tokio::test]
    async fn test_checksum_body_is_retryable() {
        use std::io::Write;
        let mut file = NamedTempFile::new().unwrap();
        let algorithm_str = "crc32c";
        let checksum_algorithm: ChecksumAlgorithm = algorithm_str.parse().unwrap();

        let mut crc32c_checksum = checksum_algorithm.into_impl();
        for i in 0..10000 {
            let line = format!("This is a large file created for testing purposes {}", i);
            file.as_file_mut().write_all(line.as_bytes()).unwrap();
            crc32c_checksum.update(line.as_bytes());
        }
        let crc32c_checksum = crc32c_checksum.finalize();

        let request = HttpRequest::new(ByteStream::read_from().path(&file).buffer_size(1024).build().await.unwrap().into_inner());

        // ensure original SdkBody is retryable
        assert!(request.body().try_clone().is_some());

        let interceptor = create_test_interceptor();
        let mut cfg = ConfigBag::base();
        cfg.interceptor_state().store_put(RequestChecksumInterceptorState {
            checksum_algorithm: Some(algorithm_str.to_string()),
            calculate_checksum: Arc::new(AtomicBool::new(true)),
            ..Default::default()
        });
        let runtime_components = RuntimeComponentsBuilder::for_tests().build().unwrap();
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        ctx.enter_serialization_phase();
        let _ = ctx.take_input();
        ctx.set_request(request);
        ctx.enter_before_transmit_phase();
        let mut ctx: BeforeTransmitInterceptorContextMut<'_> = (&mut ctx).into();
        interceptor.modify_before_transmit(&mut ctx, &runtime_components, &mut cfg).unwrap();

        // ensure wrapped SdkBody is retryable
        let mut body = ctx.request().body().try_clone().expect("body is retryable");

        let mut body_data = BytesMut::new();
        let mut header_value = None;
        while let Some(Ok(frame)) = body.frame().await {
            if frame.is_data() {
                let data = frame.into_data().unwrap();
                body_data.extend_from_slice(&data);
            } else {
                let trailers = frame.into_trailers().unwrap();
                if let Some(hv) = trailers.get("x-amz-checksum-crc32c") {
                    header_value = Some(hv.to_str().unwrap().to_owned());
                }
            }
        }
        let body_str = std::str::from_utf8(&body_data).unwrap();
        let expected = "This is a large file created for testing purposes 9999";
        assert!(body_str.ends_with(expected), "expected '{body_str}' to end with '{expected}'");
        let expected_checksum = base64::encode(&crc32c_checksum);
        assert_eq!(
            header_value.as_ref(),
            Some(&expected_checksum),
            "expected checksum '{header_value:?}' to match '{expected_checksum}'"
        );

        // Clone the wrapped body once more and collect it in one go. The body above is already
        // drained, so collecting it would yield no trailers and check nothing.
        let collected_body = ctx
            .request()
            .body()
            .try_clone()
            .expect("body is retryable")
            .collect()
            .await
            .unwrap();
        let trailers = collected_body.trailers().expect("checksum trailers should be present");
        let trailer_value = trailers
            .get("x-amz-checksum-crc32c")
            .expect("crc32c checksum trailer should be present")
            .to_str()
            .unwrap();
        assert_eq!(
            trailer_value, expected_checksum,
            "expected checksum '{trailer_value}' to match '{expected_checksum}'"
        );
    }
}