wsi_streamer/io/
s3_reader.rs1use async_trait::async_trait;
2use aws_sdk_s3::Client;
3use bytes::Bytes;
4
5use super::RangeReader;
6use crate::error::IoError;
7
8#[derive(Clone)]
13pub struct S3RangeReader {
14 client: Client,
15 bucket: String,
16 key: String,
17 size: u64,
18 identifier: String,
19}
20
21impl S3RangeReader {
22 pub async fn new(client: Client, bucket: String, key: String) -> Result<Self, IoError> {
27 let head = client
28 .head_object()
29 .bucket(&bucket)
30 .key(&key)
31 .send()
32 .await
33 .map_err(|e| {
34 let is_not_found = e
37 .as_service_error()
38 .map(|se| se.is_not_found())
39 .unwrap_or(false);
40
41 if is_not_found {
42 return IoError::NotFound(format!("s3://{}/{}", bucket, key));
43 }
44
45 let status_is_404 = e
47 .raw_response()
48 .map(|r| r.status().as_u16() == 404)
49 .unwrap_or(false);
50
51 if status_is_404 {
52 return IoError::NotFound(format!("s3://{}/{}", bucket, key));
53 }
54
55 let err_str = e.to_string();
57 if err_str.contains("NotFound")
58 || err_str.contains("NoSuchKey")
59 || err_str.contains("404")
60 {
61 return IoError::NotFound(format!("s3://{}/{}", bucket, key));
62 }
63
64 IoError::S3(err_str)
65 })?;
66
67 let size = head.content_length().unwrap_or(0) as u64;
68 let identifier = format!("s3://{}/{}", bucket, key);
69
70 Ok(Self {
71 client,
72 bucket,
73 key,
74 size,
75 identifier,
76 })
77 }
78
79 pub fn bucket(&self) -> &str {
81 &self.bucket
82 }
83
84 pub fn key(&self) -> &str {
86 &self.key
87 }
88}
89
90#[async_trait]
91impl RangeReader for S3RangeReader {
92 async fn read_exact_at(&self, offset: u64, len: usize) -> Result<Bytes, IoError> {
93 if offset + len as u64 > self.size {
95 return Err(IoError::RangeOutOfBounds {
96 offset,
97 requested: len as u64,
98 size: self.size,
99 });
100 }
101
102 if len == 0 {
104 return Ok(Bytes::new());
105 }
106
107 let range = format!("bytes={}-{}", offset, offset + len as u64 - 1);
109
110 let resp = self
111 .client
112 .get_object()
113 .bucket(&self.bucket)
114 .key(&self.key)
115 .range(range)
116 .send()
117 .await
118 .map_err(|e| IoError::S3(e.to_string()))?;
119
120 let data = resp
121 .body
122 .collect()
123 .await
124 .map_err(|e| IoError::Connection(e.to_string()))?
125 .into_bytes();
126
127 Ok(data)
128 }
129
130 fn size(&self) -> u64 {
131 self.size
132 }
133
134 fn identifier(&self) -> &str {
135 &self.identifier
136 }
137}
138
139pub async fn create_s3_client(endpoint_url: Option<&str>, region: &str) -> Client {
151 let region = aws_config::Region::new(region.to_string());
152 let mut config_loader =
153 aws_config::defaults(aws_config::BehaviorVersion::latest()).region(region);
154
155 if let Some(endpoint) = endpoint_url {
156 config_loader = config_loader.endpoint_url(endpoint);
157 }
158
159 let sdk_config = config_loader.load().await;
160
161 let s3_config = if endpoint_url.is_some() {
163 aws_sdk_s3::config::Builder::from(&sdk_config)
164 .force_path_style(true)
165 .build()
166 } else {
167 aws_sdk_s3::config::Builder::from(&sdk_config).build()
168 };
169
170 Client::from_conf(s3_config)
171}
172
173#[cfg(test)]
174mod tests {
175 }