mountpoint_s3_client/s3_crt_client/
get_object.rs1use std::ops::Deref;
2use std::os::unix::prelude::OsStrExt;
3use std::pin::Pin;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use async_trait::async_trait;
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::stream::FusedStream;
11use futures::{Stream, StreamExt};
12use mountpoint_s3_crt::http::request_response::{Header, Headers};
13use mountpoint_s3_crt::s3::client::{MetaRequest, MetaRequestResult};
14use pin_project::{pin_project, pinned_drop};
15use tracing::trace;
16
17use crate::object_client::{
18 Checksum, ChecksumMode, ClientBackpressureHandle, GetBodyPart, GetObjectError, GetObjectParams, ObjectClientError,
19 ObjectClientResult, ObjectMetadata,
20};
21
22use super::{
23 parse_checksum, GetObjectResponse, ObjectChecksumError, ResponseHeadersError, S3CrtClient, S3Operation,
24 S3RequestError,
25};
26
27impl S3CrtClient {
28 pub(super) async fn get_object(
31 &self,
32 bucket: &str,
33 key: &str,
34 params: &GetObjectParams,
35 ) -> Result<S3GetObjectResponse, ObjectClientError<GetObjectError, S3RequestError>> {
36 let requested_checksums = params.checksum_mode.as_ref() == Some(&ChecksumMode::Enabled);
37 let next_offset = params.range.as_ref().map(|r| r.start).unwrap_or(0);
38 let (event_sender, mut event_receiver) = futures::channel::mpsc::unbounded();
39 let meta_request = {
40 let span =
41 request_span!(self.inner, "get_object", bucket, key, range=?params.range, if_match=?params.if_match);
42
43 let mut message = self
44 .inner
45 .new_request_template("GET", bucket)
46 .map_err(S3RequestError::construction_failure)?;
47
48 message
50 .set_header(&Header::new("accept", "*/*"))
51 .map_err(S3RequestError::construction_failure)?;
52
53 if requested_checksums {
54 message
56 .set_header(&Header::new("x-amz-checksum-mode", "enabled"))
57 .map_err(S3RequestError::construction_failure)?;
58 }
59
60 if let Some(etag) = params.if_match.as_ref() {
61 message
63 .set_header(&Header::new("If-Match", etag.as_str()))
64 .map_err(S3RequestError::construction_failure)?;
65 }
66
67 if let Some(range) = params.range.as_ref() {
68 let range_value = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
70 message
71 .set_header(&Header::new("Range", range_value))
72 .map_err(S3RequestError::construction_failure)?;
73 }
74
75 let key = format!("/{key}");
76 message
77 .set_request_path(key)
78 .map_err(S3RequestError::construction_failure)?;
79
80 let mut options = message.into_options(S3Operation::GetObject);
81 options.part_size(self.inner.read_part_size as u64);
82
83 let mut headers_sender = Some(event_sender.clone());
84 let part_sender = event_sender.clone();
85 self.inner.meta_request_with_callbacks(
86 options,
87 span,
88 |_| (),
89 move |headers, status| {
90 if (200..300).contains(&status) {
93 if let Some(headers_sender) = headers_sender.take() {
96 _ = headers_sender.unbounded_send(S3GetObjectEvent::Headers(headers.clone()));
97 }
98 }
99 },
100 move |offset, data| {
101 _ = part_sender.unbounded_send(S3GetObjectEvent::BodyPart(offset, data.into()));
102 },
103 parse_get_object_error,
104 move |result| {
105 if let Err(e) = result {
106 _ = event_sender.unbounded_send(S3GetObjectEvent::Error(e));
107 }
108 event_sender.close_channel();
109 },
110 )?
111 };
112
113 let headers = match event_receiver.next().await {
114 Some(S3GetObjectEvent::Headers(headers)) => headers,
115 Some(S3GetObjectEvent::Error(e)) => {
116 return Err(e);
117 }
118 event => {
119 trace!(?event, "unexpected GetObject event while waiting for headers");
121 return Err(S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into());
122 }
123 };
124
125 let backpressure_handle = if self.inner.enable_backpressure {
126 let read_window_end_offset =
127 Arc::new(AtomicU64::new(next_offset + self.inner.initial_read_window_size as u64));
128 Some(S3BackpressureHandle {
129 read_window_end_offset,
130 meta_request: meta_request.clone(),
131 })
132 } else {
133 None
134 };
135 Ok(S3GetObjectResponse {
136 meta_request,
137 event_receiver,
138 requested_checksums,
139 backpressure_handle,
140 headers,
141 next_offset,
142 })
143 }
144}
145
146#[derive(Debug)]
147enum S3GetObjectEvent {
148 Headers(Headers),
149 BodyPart(u64, Box<[u8]>),
150 Error(ObjectClientError<GetObjectError, S3RequestError>),
151}
152
153#[derive(Clone, Debug)]
154pub struct S3BackpressureHandle {
155 read_window_end_offset: Arc<AtomicU64>,
158 meta_request: MetaRequest,
159}
160
161impl ClientBackpressureHandle for S3BackpressureHandle {
162 fn increment_read_window(&mut self, len: usize) {
163 self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
164 self.meta_request.increment_read_window(len as u64);
165 }
166
167 fn ensure_read_window(&mut self, desired_end_offset: u64) {
168 let diff = desired_end_offset.saturating_sub(self.read_window_end_offset()) as usize;
169 self.increment_read_window(diff);
170 }
171
172 fn read_window_end_offset(&self) -> u64 {
173 self.read_window_end_offset.load(Ordering::SeqCst)
174 }
175}
176
177#[derive(Debug)]
183#[pin_project(PinnedDrop)]
184pub struct S3GetObjectResponse {
185 meta_request: MetaRequest,
186 #[pin]
187 event_receiver: UnboundedReceiver<S3GetObjectEvent>,
188 requested_checksums: bool,
189 backpressure_handle: Option<S3BackpressureHandle>,
190 headers: Headers,
191 next_offset: u64,
193}
194
195#[cfg_attr(not(docsrs), async_trait)]
196impl GetObjectResponse for S3GetObjectResponse {
197 type BackpressureHandle = S3BackpressureHandle;
198 type ClientError = S3RequestError;
199
200 fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
201 self.backpressure_handle.as_mut()
202 }
203
204 fn get_object_metadata(&self) -> ObjectMetadata {
205 self.headers
206 .iter()
207 .filter_map(|(key, value)| {
208 let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?;
209 let value = value.to_str()?;
210 Some((metadata_header.to_string(), value.to_string()))
211 })
212 .collect()
213 }
214
215 fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
216 if !self.requested_checksums {
217 return Err(ObjectChecksumError::DidNotRequestChecksums);
218 }
219
220 parse_checksum(&self.headers).map_err(|e| ObjectChecksumError::HeadersError(Box::new(e)))
221 }
222}
223
224#[pinned_drop]
225impl PinnedDrop for S3GetObjectResponse {
226 fn drop(self: Pin<&mut Self>) {
227 self.meta_request.cancel();
228 }
229}
230
231impl Stream for S3GetObjectResponse {
232 type Item = ObjectClientResult<GetBodyPart, GetObjectError, S3RequestError>;
233
234 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
235 if self.event_receiver.is_terminated() {
236 return Poll::Ready(None);
237 }
238
239 let this = self.project();
240 match this.event_receiver.poll_next(cx) {
241 Poll::Ready(None) => Poll::Ready(None),
242 Poll::Ready(Some(S3GetObjectEvent::BodyPart(offset, part))) => {
243 *this.next_offset = offset + part.len() as u64;
244 Poll::Ready(Some(Ok((offset, part))))
245 }
246 Poll::Ready(Some(S3GetObjectEvent::Headers(_))) => {
247 unreachable!("headers are only sent once and received before returning the stream")
248 }
249 Poll::Ready(Some(S3GetObjectEvent::Error(e))) => Poll::Ready(Some(Err(e))),
250 Poll::Pending => {
251 if let Some(handle) = &this.backpressure_handle {
256 if *this.next_offset >= handle.read_window_end_offset() {
257 return Poll::Ready(Some(Err(ObjectClientError::ClientError(
258 S3RequestError::EmptyReadWindow,
259 ))));
260 }
261 }
262 Poll::Pending
263 }
264 }
265 }
266}
267
268fn parse_get_object_error(result: &MetaRequestResult) -> Option<GetObjectError> {
269 match result.response_status {
270 404 => {
271 let body = result.error_response_body.as_ref()?;
272 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
273 let error_code = root.get_child("Code")?;
274 let error_str = error_code.get_text()?;
275 match error_str.deref() {
276 "NoSuchBucket" => Some(GetObjectError::NoSuchBucket),
277 "NoSuchKey" => Some(GetObjectError::NoSuchKey),
278 _ => None,
279 }
280 }
281 412 => Some(GetObjectError::PreconditionFailed),
282 _ => None,
283 }
284}
285
#[cfg(test)]
mod tests {
    use std::ffi::{OsStr, OsString};

    use super::*;

    /// Build a failed `MetaRequestResult` with the given HTTP status and error response body.
    fn make_result(response_status: i32, body: impl Into<OsString>) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers: None,
            error_response_body: Some(body.into()),
        }
    }

    /// Run `parse_get_object_error` on a result with the given status and raw body bytes.
    fn parse_error(response_status: i32, body: &[u8]) -> Option<GetObjectError> {
        let result = make_result(response_status, OsStr::from_bytes(body));
        parse_get_object_error(&result)
    }

    #[test]
    fn parse_404_no_such_key() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>not-a-real-key</Key><RequestId>NTKJWKHQBYNS73A9</RequestId><HostId>Nc9kWNrf4kGoq5NIUnQ4t7u04ZZXGm/i463v+jwCI8sIrZBqeYI8uffLHQ+/qusdMWNuUwqeXHU=</HostId></Error>"#;
        assert_eq!(parse_error(404, body), Some(GetObjectError::NoSuchKey));
    }

    #[test]
    fn parse_404_no_such_bucket() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchBucket</Code><Message>The specified bucket does not exist</Message><BucketName>amzn-s3-demo-bucket</BucketName><RequestId>4VAGDP5HMYTDNB3Y</RequestId><HostId>JMgGqpVKIaaTieG68IODiV2piWw/q9VCTowGvWP36BEz6oIVEXiesn8cDE5ph7if0gpY5WU1Wc8=</HostId></Error>"#;
        assert_eq!(parse_error(404, body), Some(GetObjectError::NoSuchBucket));
    }

    #[test]
    fn parse_403_glacier_storage_class() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidObjectState</Code><Message>The action is not valid for the object's storage class</Message><RequestId>9FEFFF118E15B86F</RequestId><HostId>WVQ5kzhiT+oiUfDCOiOYv8W4Tk9eNcxWi/MK+hTS/av34Xy4rBU3zsavf0aaaaa</HostId></Error>"#;
        assert_eq!(parse_error(403, body), None);
    }

    /// A 412 maps to `PreconditionFailed` regardless of the body contents.
    #[test]
    fn parse_412_precondition_failed() {
        assert_eq!(parse_error(412, b""), Some(GetObjectError::PreconditionFailed));
    }

    /// A 404 whose body is not XML, or whose error code is not modeled, is left unmapped.
    #[test]
    fn parse_404_unrecognized_or_malformed_body() {
        let unknown_code = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>SomethingElse</Code></Error>"#;
        assert_eq!(parse_error(404, unknown_code), None);
        assert_eq!(parse_error(404, b"not xml"), None);
    }
}