mountpoint_s3_client/s3_crt_client/
get_object.rs1use std::ops::Deref;
2use std::os::unix::prelude::OsStrExt;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::task::{Context, Poll};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::channel::mpsc::UnboundedReceiver;
11use futures::stream::FusedStream;
12use futures::{Stream, StreamExt};
13use mountpoint_s3_crt::http::request_response::{Header, Headers};
14use mountpoint_s3_crt::s3::client::{MetaRequest, MetaRequestResult};
15use pin_project::pin_project;
16use tracing::trace;
17
18use crate::error_metadata::ClientErrorMetadata;
19use crate::object_client::{
20 Checksum, ChecksumMode, ClientBackpressureHandle, GetBodyPart, GetObjectError, GetObjectParams, GetObjectResponse,
21 ObjectChecksumError, ObjectClientError, ObjectClientResult, ObjectMetadata,
22};
23
24use super::{CancellingMetaRequest, ResponseHeadersError, S3CrtClient, S3Operation, S3RequestError, parse_checksum};
25
26impl S3CrtClient {
27 pub(super) async fn get_object(
30 &self,
31 bucket: &str,
32 key: &str,
33 params: &GetObjectParams,
34 ) -> Result<S3GetObjectResponse, ObjectClientError<GetObjectError, S3RequestError>> {
35 let requested_checksums = params.checksum_mode.as_ref() == Some(&ChecksumMode::Enabled);
36 let next_offset = params.range.as_ref().map(|r| r.start).unwrap_or(0);
37 let (event_sender, mut event_receiver) = futures::channel::mpsc::unbounded();
38 let meta_request = {
39 let span =
40 request_span!(self.inner, "get_object", bucket, key, range=?params.range, if_match=?params.if_match);
41
42 let mut message = self
43 .inner
44 .new_request_template("GET", bucket)
45 .map_err(S3RequestError::construction_failure)?;
46
47 message
49 .set_header(&Header::new("accept", "*/*"))
50 .map_err(S3RequestError::construction_failure)?;
51
52 if requested_checksums {
53 message
55 .set_header(&Header::new("x-amz-checksum-mode", "enabled"))
56 .map_err(S3RequestError::construction_failure)?;
57 }
58
59 if let Some(etag) = params.if_match.as_ref() {
60 message
62 .set_header(&Header::new("If-Match", etag.as_str()))
63 .map_err(S3RequestError::construction_failure)?;
64 }
65
66 if let Some(range) = params.range.as_ref() {
67 let range_value = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
69 message
70 .set_header(&Header::new("Range", range_value))
71 .map_err(S3RequestError::construction_failure)?;
72 }
73
74 let key = format!("/{key}");
75 message
76 .set_request_path(key)
77 .map_err(S3RequestError::construction_failure)?;
78
79 let mut options = message.into_options(S3Operation::GetObject);
80 options.part_size(self.inner.read_part_size as u64);
81
82 let mut headers_sender = Some(event_sender.clone());
83 let part_sender = event_sender.clone();
84
85 self.inner.meta_request_with_callbacks(
86 options,
87 span,
88 |_| (),
89 move |headers, status| {
90 if (200..300).contains(&status) {
93 if let Some(headers_sender) = headers_sender.take() {
96 _ = headers_sender.unbounded_send(S3GetObjectEvent::Headers(headers.clone()));
97 }
98 }
99 },
100 move |offset, data| {
101 let owned_buffer = data
102 .to_owned_buffer()
103 .expect("buffers returned from GetObject can always be acquired");
104 let bytes = Bytes::from_owner(owned_buffer);
105 let body_part = GetBodyPart { offset, data: bytes };
106 _ = part_sender.unbounded_send(S3GetObjectEvent::BodyPart(body_part));
107 },
108 parse_get_object_error,
109 move |result| {
110 if let Err(e) = result {
111 _ = event_sender.unbounded_send(S3GetObjectEvent::Error(e));
112 }
113 event_sender.close_channel();
114 },
115 )?
116 };
117
118 let headers = match event_receiver.next().await {
119 Some(S3GetObjectEvent::Headers(headers)) => headers,
120 Some(S3GetObjectEvent::Error(e)) => {
121 return Err(e);
122 }
123 event => {
124 trace!(?event, "unexpected GetObject event while waiting for headers");
126 return Err(S3RequestError::internal_failure(ResponseHeadersError::MissingHeaders).into());
127 }
128 };
129
130 let backpressure_handle = if self.inner.enable_backpressure {
131 let read_window_end_offset =
132 Arc::new(AtomicU64::new(next_offset + self.inner.initial_read_window_size as u64));
133 Some(S3BackpressureHandle {
134 read_window_end_offset,
135 meta_request: meta_request.clone(),
136 })
137 } else {
138 None
139 };
140 Ok(S3GetObjectResponse {
141 meta_request,
142 event_receiver,
143 requested_checksums,
144 backpressure_handle,
145 headers,
146 next_offset,
147 })
148 }
149}
150
151#[derive(Debug)]
152enum S3GetObjectEvent {
153 Headers(Headers),
154 BodyPart(GetBodyPart),
155 Error(ObjectClientError<GetObjectError, S3RequestError>),
156}
157
158#[derive(Clone, Debug)]
159pub struct S3BackpressureHandle {
160 read_window_end_offset: Arc<AtomicU64>,
163 meta_request: MetaRequest,
164}
165
166impl ClientBackpressureHandle for S3BackpressureHandle {
167 fn increment_read_window(&mut self, len: usize) {
168 self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
169 self.meta_request.increment_read_window(len as u64);
170 }
171
172 fn ensure_read_window(&mut self, desired_end_offset: u64) {
173 trace!(desired_end_offset, "applying new read window for meta request");
174 let diff = desired_end_offset.saturating_sub(self.read_window_end_offset()) as usize;
175 self.increment_read_window(diff);
176 }
177
178 fn read_window_end_offset(&self) -> u64 {
179 self.read_window_end_offset.load(Ordering::SeqCst)
180 }
181}
182
183#[derive(Debug)]
189#[pin_project]
190pub struct S3GetObjectResponse {
191 meta_request: CancellingMetaRequest,
192 #[pin]
193 event_receiver: UnboundedReceiver<S3GetObjectEvent>,
194 requested_checksums: bool,
195 backpressure_handle: Option<S3BackpressureHandle>,
196 headers: Headers,
197 next_offset: u64,
199}
200
201#[cfg_attr(not(docsrs), async_trait)]
202impl GetObjectResponse for S3GetObjectResponse {
203 type BackpressureHandle = S3BackpressureHandle;
204 type ClientError = S3RequestError;
205
206 fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
207 self.backpressure_handle.as_mut()
208 }
209
210 fn get_object_metadata(&self) -> ObjectMetadata {
211 self.headers
212 .iter()
213 .filter_map(|(key, value)| {
214 let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?;
215 let value = value.to_str()?;
216 Some((metadata_header.to_string(), value.to_string()))
217 })
218 .collect()
219 }
220
221 fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
222 if !self.requested_checksums {
223 return Err(ObjectChecksumError::DidNotRequestChecksums);
224 }
225
226 parse_checksum(&self.headers).map_err(|e| ObjectChecksumError::HeadersError(Box::new(e)))
227 }
228}
229
230impl Stream for S3GetObjectResponse {
231 type Item = ObjectClientResult<GetBodyPart, GetObjectError, S3RequestError>;
232
233 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
234 if self.event_receiver.is_terminated() {
235 return Poll::Ready(None);
236 }
237
238 let this = self.project();
239 match this.event_receiver.poll_next(cx) {
240 Poll::Ready(None) => Poll::Ready(None),
241 Poll::Ready(Some(S3GetObjectEvent::BodyPart(part))) => {
242 *this.next_offset = part.offset + part.data.len() as u64;
243 Poll::Ready(Some(Ok(part)))
244 }
245 Poll::Ready(Some(S3GetObjectEvent::Headers(_))) => {
246 unreachable!("headers are only sent once and received before returning the stream")
247 }
248 Poll::Ready(Some(S3GetObjectEvent::Error(e))) => Poll::Ready(Some(Err(e))),
249 Poll::Pending => {
250 if let Some(handle) = &this.backpressure_handle
255 && *this.next_offset >= handle.read_window_end_offset()
256 {
257 let err = ObjectClientError::from(S3RequestError::EmptyReadWindow);
258 return Poll::Ready(Some(Err(err)));
259 }
260 Poll::Pending
261 }
262 }
263 }
264}
265
266fn parse_get_object_error(result: &MetaRequestResult) -> Option<GetObjectError> {
267 let client_error_metadata = ClientErrorMetadata::from_meta_request_result(result);
268 match result.response_status {
269 404 => {
270 let body = result.error_response_body.as_ref()?;
271 let root = xmltree::Element::parse(body.as_bytes()).ok()?;
272 let error_code = root.get_child("Code")?;
273 let error_str = error_code.get_text()?;
274 match error_str.deref() {
275 "NoSuchBucket" => Some(GetObjectError::NoSuchBucket(client_error_metadata)),
276 "NoSuchKey" => Some(GetObjectError::NoSuchKey(client_error_metadata)),
277 _ => None,
278 }
279 }
280 412 => Some(GetObjectError::PreconditionFailed(client_error_metadata)),
281 _ => None,
282 }
283}
284
#[cfg(test)]
mod tests {
    use std::ffi::{OsStr, OsString};

    use super::*;

    /// Build a failed [MetaRequestResult] with the given HTTP status and error response body.
    fn make_result(response_status: i32, body: impl Into<OsString>) -> MetaRequestResult {
        MetaRequestResult {
            response_status,
            crt_error: 1i32.into(),
            error_response_headers: None,
            error_response_body: Some(body.into()),
        }
    }

    #[test]
    fn parse_404_no_such_key() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>not-a-real-key</Key><RequestId>NTKJWKHQBYNS73A9</RequestId><HostId>Nc9kWNrf4kGoq5NIUnQ4t7u04ZZXGm/i463v+jwCI8sIrZBqeYI8uffLHQ+/qusdMWNuUwqeXHU=</HostId></Error>"#;
        let result = make_result(404, OsStr::from_bytes(&body[..]));
        let result = parse_get_object_error(&result);
        assert!(matches!(result, Some(GetObjectError::NoSuchKey(_))));
    }

    #[test]
    fn parse_404_no_such_bucket() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchBucket</Code><Message>The specified bucket does not exist</Message><BucketName>amzn-s3-demo-bucket</BucketName><RequestId>4VAGDP5HMYTDNB3Y</RequestId><HostId>JMgGqpVKIaaTieG68IODiV2piWw/q9VCTowGvWP36BEz6oIVEXiesn8cDE5ph7if0gpY5WU1Wc8=</HostId></Error>"#;
        let result = make_result(404, OsStr::from_bytes(&body[..]));
        let result = parse_get_object_error(&result);
        assert!(matches!(result, Some(GetObjectError::NoSuchBucket(_))));
    }

    /// A 404 whose error code is neither `NoSuchKey` nor `NoSuchBucket` is not mapped to a
    /// service error.
    #[test]
    fn parse_404_unrecognized_code() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>NoSuchVersion</Code><Message>The specified version does not exist.</Message><RequestId>0000000000000000</RequestId><HostId>0000000000000000</HostId></Error>"#;
        let result = make_result(404, OsStr::from_bytes(&body[..]));
        let result = parse_get_object_error(&result);
        assert_eq!(result, None);
    }

    /// A 412 maps to `PreconditionFailed` based on the status code alone.
    #[test]
    fn parse_412_precondition_failed() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>PreconditionFailed</Code><Message>At least one of the pre-conditions you specified did not hold</Message><Condition>If-Match</Condition><RequestId>0000000000000000</RequestId><HostId>0000000000000000</HostId></Error>"#;
        let result = make_result(412, OsStr::from_bytes(&body[..]));
        let result = parse_get_object_error(&result);
        assert!(matches!(result, Some(GetObjectError::PreconditionFailed(_))));
    }

    #[test]
    fn parse_403_glacier_storage_class() {
        let body = br#"<?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidObjectState</Code><Message>The action is not valid for the object's storage class</Message><RequestId>9FEFFF118E15B86F</RequestId><HostId>WVQ5kzhiT+oiUfDCOiOYv8W4Tk9eNcxWi/MK+hTS/av34Xy4rBU3zsavf0aaaaa</HostId></Error>"#;
        let result = make_result(403, OsStr::from_bytes(&body[..]));
        let result = parse_get_object_error(&result);
        assert_eq!(result, None);
    }
}