mountpoint_s3_client/s3_crt_client/
get_object.rs

1use std::ops::Deref;
2use std::os::unix::prelude::OsStrExt;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use async_trait::async_trait;
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::stream::FusedStream;
11use futures::{Stream, StreamExt};
12use mountpoint_s3_crt::http::request_response::{Header, Headers};
13use mountpoint_s3_crt::s3::client::{MetaRequest, MetaRequestResult};
14use pin_project::pin_project;
15use tracing::trace;
16
17use crate::object_client::{
18    Checksum, ChecksumMode, ClientBackpressureHandle, GetBodyPart, GetObjectError, GetObjectParams, ObjectClientError,
19    ObjectClientResult, ObjectMetadata,
20};
21
22use super::{
23    parse_checksum, CancellingMetaRequest, GetObjectResponse, ObjectChecksumError, ResponseHeadersError, S3CrtClient,
24    S3Operation, S3RequestError,
25};
26
27impl S3CrtClient {
28    /// Create and begin a new GetObject request. The returned [S3GetObjectResponse] is a [Stream] of
29    /// body parts of the object, which will be delivered in order.
30    pub(super) async fn get_object(
31        &self,
32        bucket: &str,
33        key: &str,
34        params: &GetObjectParams,
35    ) -> Result<S3GetObjectResponse, ObjectClientError<GetObjectError, S3RequestError>> {
36        let requested_checksums = params.checksum_mode.as_ref() == Some(&ChecksumMode::Enabled);
37        let next_offset = params.range.as_ref().map(|r| r.start).unwrap_or(0);
38        let (event_sender, mut event_receiver) = futures::channel::mpsc::unbounded();
39        let meta_request = {
40            let span =
41                request_span!(self.inner, "get_object", bucket, key, range=?params.range, if_match=?params.if_match);
42
43            let mut message = self
44                .inner
45                .new_request_template("GET", bucket)
46                .map_err(S3RequestError::construction_failure)?;
47
48            // Overwrite "accept" header since this returns raw object data.
49            message
50                .set_header(&Header::new("accept", "*/*"))
51                .map_err(S3RequestError::construction_failure)?;
52
53            if requested_checksums {
54                // Add checksum header to receive object checksums.
55                message
56                    .set_header(&Header::new("x-amz-checksum-mode", "enabled"))
57                    .map_err(S3RequestError::construction_failure)?;
58            }
59
60            if let Some(etag) = params.if_match.as_ref() {
61                // Return the object only if its entity tag (ETag) is matched
62                message
63                    .set_header(&Header::new("If-Match", etag.as_str()))
64                    .map_err(S3RequestError::construction_failure)?;
65            }
66
67            if let Some(range) = params.range.as_ref() {
68                // Range HTTP header is bounded below *inclusive*
69                let range_value = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
70                message
71                    .set_header(&Header::new("Range", range_value))
72                    .map_err(S3RequestError::construction_failure)?;
73            }
74
75            let key = format!("/{key}");
76            message
77                .set_request_path(key)
78                .map_err(S3RequestError::construction_failure)?;
79
80            let mut options = message.into_options(S3Operation::GetObject);
81            options.part_size(self.inner.read_part_size as u64);
82
83            let mut headers_sender = Some(event_sender.clone());
84            let part_sender = event_sender.clone();
85            self.inner.meta_request_with_callbacks(
86                options,
87                span,
88                |_| (),
89                move |headers, status| {
90                    // Only send headers if we have a 2xx status code. If we only get other status codes,
91                    // then on_meta_request_result will send an error.
92                    if (200..300).contains(&status) {
93                        // Headers can be returned multiple times, but the metadata/checksums don't change.
94                        // We only send the first occurence to the channel.
95                        if let Some(headers_sender) = headers_sender.take() {
96                            _ = headers_sender.unbounded_send(S3GetObjectEvent::Headers(headers.clone()));
97                        }
98                    }
99                },
100                move |offset, data| {
101                    _ = part_sender.unbounded_send(S3GetObjectEvent::BodyPart(offset, data.into()));
102                },
103                parse_get_object_error,
104                move |result| {
105                    if let Err(e) = result {
106                        _ = event_sender.unbounded_send(S3GetObjectEvent::Error(e));
107                    }
108                    event_sender.close_channel();
109                },
110            )?
111        };
112
113        let headers = match event_receiver.next().await {
114            Some(S3GetObjectEvent::Headers(headers)) => headers,
115            Some(S3GetObjectEvent::Error(e)) => {
116                return Err(e);
117            }
118            event => {
119                // If we did not received the headers first, the request must have failed.
120                trace!(?event, "unexpected GetObject event while waiting for headers");
121                return Err(S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into());
122            }
123        };
124
125        let backpressure_handle = if self.inner.enable_backpressure {
126            let read_window_end_offset =
127                Arc::new(AtomicU64::new(next_offset + self.inner.initial_read_window_size as u64));
128            Some(S3BackpressureHandle {
129                read_window_end_offset,
130                meta_request: meta_request.clone(),
131            })
132        } else {
133            None
134        };
135        Ok(S3GetObjectResponse {
136            meta_request,
137            event_receiver,
138            requested_checksums,
139            backpressure_handle,
140            headers,
141            next_offset,
142        })
143    }
144}
145
146#[derive(Debug)]
147enum S3GetObjectEvent {
148    Headers(Headers),
149    BodyPart(u64, Box<[u8]>),
150    Error(ObjectClientError<GetObjectError, S3RequestError>),
151}
152
153#[derive(Clone, Debug)]
154pub struct S3BackpressureHandle {
155    /// Upper bound of the current read window. When backpressure is enabled, [S3GetObjectRequest]
156    /// can return data up to this offset *exclusively*.
157    read_window_end_offset: Arc<AtomicU64>,
158    meta_request: MetaRequest,
159}
160
161impl ClientBackpressureHandle for S3BackpressureHandle {
162    fn increment_read_window(&mut self, len: usize) {
163        self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
164        self.meta_request.increment_read_window(len as u64);
165    }
166
167    fn ensure_read_window(&mut self, desired_end_offset: u64) {
168        let diff = desired_end_offset.saturating_sub(self.read_window_end_offset()) as usize;
169        self.increment_read_window(diff);
170    }
171
172    fn read_window_end_offset(&self) -> u64 {
173        self.read_window_end_offset.load(Ordering::SeqCst)
174    }
175}
176
177/// A streaming response to a GetObject request.
178///
179/// This struct implements [`futures::Stream`], which you can use to read the body of the object.
180/// Each item of the stream is a part of the object body together with the part's offset within the
181/// object.
182#[derive(Debug)]
183#[pin_project]
184pub struct S3GetObjectResponse {
185    meta_request: CancellingMetaRequest,
186    #[pin]
187    event_receiver: UnboundedReceiver<S3GetObjectEvent>,
188    requested_checksums: bool,
189    backpressure_handle: Option<S3BackpressureHandle>,
190    headers: Headers,
191    /// Next offset of the data to be polled from [poll_next]
192    next_offset: u64,
193}
194
195#[cfg_attr(not(docsrs), async_trait)]
196impl GetObjectResponse for S3GetObjectResponse {
197    type BackpressureHandle = S3BackpressureHandle;
198    type ClientError = S3RequestError;
199
200    fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
201        self.backpressure_handle.as_mut()
202    }
203
204    fn get_object_metadata(&self) -> ObjectMetadata {
205        self.headers
206            .iter()
207            .filter_map(|(key, value)| {
208                let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?;
209                let value = value.to_str()?;
210                Some((metadata_header.to_string(), value.to_string()))
211            })
212            .collect()
213    }
214
215    fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
216        if !self.requested_checksums {
217            return Err(ObjectChecksumError::DidNotRequestChecksums);
218        }
219
220        parse_checksum(&self.headers).map_err(|e| ObjectChecksumError::HeadersError(Box::new(e)))
221    }
222}
223
224impl Stream for S3GetObjectResponse {
225    type Item = ObjectClientResult<GetBodyPart, GetObjectError, S3RequestError>;
226
227    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
228        if self.event_receiver.is_terminated() {
229            return Poll::Ready(None);
230        }
231
232        let this = self.project();
233        match this.event_receiver.poll_next(cx) {
234            Poll::Ready(None) => Poll::Ready(None),
235            Poll::Ready(Some(S3GetObjectEvent::BodyPart(offset, part))) => {
236                *this.next_offset = offset + part.len() as u64;
237                Poll::Ready(Some(Ok((offset, part))))
238            }
239            Poll::Ready(Some(S3GetObjectEvent::Headers(_))) => {
240                unreachable!("headers are only sent once and received before returning the stream")
241            }
242            Poll::Ready(Some(S3GetObjectEvent::Error(e))) => Poll::Ready(Some(Err(e))),
243            Poll::Pending => {
244                // If the request is still not finished but the read window is not enough to poll
245                // the next chunk we want to return error instead of keeping the request blocked.
246                // This prevents a risk of deadlock from using the [S3CrtClient], users must implement
247                // their own logic to block the request if they really want to block a [S3GetObjectResponse].
248                if let Some(handle) = &this.backpressure_handle {
249                    if *this.next_offset >= handle.read_window_end_offset() {
250                        return Poll::Ready(Some(Err(ObjectClientError::ClientError(
251                            S3RequestError::EmptyReadWindow,
252                        ))));
253                    }
254                }
255                Poll::Pending
256            }
257        }
258    }
259}
260
261fn parse_get_object_error(result: &MetaRequestResult) -> Option<GetObjectError> {
262    match result.response_status {
263        404 => {
264            let body = result.error_response_body.as_ref()?;
265            let root = xmltree::Element::parse(body.as_bytes()).ok()?;
266            let error_code = root.get_child("Code")?;
267            let error_str = error_code.get_text()?;
268            match error_str.deref() {
269                "NoSuchBucket" => Some(GetObjectError::NoSuchBucket),
270                "NoSuchKey" => Some(GetObjectError::NoSuchKey),
271                _ => None,
272            }
273        }
274        412 => Some(GetObjectError::PreconditionFailed),
275        _ => None,
276    }
277}
278
#[cfg(test)]
mod tests {
    use std::ffi::{OsStr, OsString};

    use super::*;

    fn make_result(response_status: i32, body: impl Into<OsString>) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers: None,
            error_response_body: Some(body.into()),
        }
    }

    /// Run [parse_get_object_error] on a failed result with the given status and error body.
    fn parse(response_status: i32, body: &[u8]) -> Option<GetObjectError> {
        let result = make_result(response_status, OsStr::from_bytes(body));
        parse_get_object_error(&result)
    }

    #[test]
    fn parse_404_no_such_key() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>not-a-real-key</Key><RequestId>NTKJWKHQBYNS73A9</RequestId><HostId>Nc9kWNrf4kGoq5NIUnQ4t7u04ZZXGm/i463v+jwCI8sIrZBqeYI8uffLHQ+/qusdMWNuUwqeXHU=</HostId></Error>"#;
        assert_eq!(parse(404, &body[..]), Some(GetObjectError::NoSuchKey));
    }

    #[test]
    fn parse_404_no_such_bucket() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchBucket</Code><Message>The specified bucket does not exist</Message><BucketName>amzn-s3-demo-bucket</BucketName><RequestId>4VAGDP5HMYTDNB3Y</RequestId><HostId>JMgGqpVKIaaTieG68IODiV2piWw/q9VCTowGvWP36BEz6oIVEXiesn8cDE5ph7if0gpY5WU1Wc8=</HostId></Error>"#;
        assert_eq!(parse(404, &body[..]), Some(GetObjectError::NoSuchBucket));
    }

    #[test]
    fn parse_404_unrecognized_code() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SomethingElse</Code><Message>Unknown</Message></Error>"#;
        assert_eq!(parse(404, &body[..]), None);
    }

    #[test]
    fn parse_404_malformed_body() {
        assert_eq!(parse(404, b"this is not xml"), None);
    }

    #[test]
    fn parse_412_precondition_failed() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PreconditionFailed</Code><Message>At least one of the pre-conditions you specified did not hold</Message><Condition>If-Match</Condition></Error>"#;
        assert_eq!(parse(412, &body[..]), Some(GetObjectError::PreconditionFailed));
    }

    #[test]
    fn parse_403_glacier_storage_class() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidObjectState</Code><Message>The action is not valid for the object's storage class</Message><RequestId>9FEFFF118E15B86F</RequestId><HostId>WVQ5kzhiT+oiUfDCOiOYv8W4Tk9eNcxWi/MK+hTS/av34Xy4rBU3zsavf0aaaaa</HostId></Error>"#;
        assert_eq!(parse(403, &body[..]), None);
    }
}