1use async_stream;
2use async_trait::async_trait;
3use aws_config::{BehaviorVersion, Region};
4use aws_credential_types::Credentials;
5use aws_sdk_s3::{primitives::ByteStream as AwsByteStream, Client};
6use futures::StreamExt;
7use std::env;
8
9use crate::{
10 BlobError, BlobInfo, BlobMetadata, BlobResult, BlobStore, ByteRange, ByteStream, GetResult, ObjectHead, PutResult,
11 StoreCapabilities,
12};
13
/// Connection settings for an S3-compatible object store.
///
/// `Debug` is implemented by hand instead of derived so that
/// `secret_access_key` never ends up in logs or panic messages.
#[derive(Clone)]
pub struct S3Config {
    /// Region name passed to the SDK.
    pub region: String,
    /// Access key id half of the static credentials.
    pub access_key_id: String,
    /// Secret half of the static credentials; redacted in `Debug` output.
    pub secret_access_key: String,
    /// Base URL of the S3-compatible endpoint.
    pub endpoint_url: String,
}

impl std::fmt::Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            // Never print the secret itself, only a marker that it exists.
            .field("secret_access_key", &"<redacted>")
            .field("endpoint_url", &self.endpoint_url)
            .finish()
    }
}
22
23impl S3Config {
24 pub fn from_env() -> BlobResult<Self> {
25 fn get_env(key: &str) -> BlobResult<String> {
26 env::var(key).map_err(|_| BlobError::invalid(format!("{} environment variable required", key)))
27 }
28
29 Ok(Self {
30 region: get_env("RUSTFS_REGION")?,
31 access_key_id: get_env("RUSTFS_ACCESS_KEY_ID")?,
32 secret_access_key: get_env("RUSTFS_SECRET_ACCESS_KEY")?,
33 endpoint_url: get_env("RUSTFS_ENDPOINT_URL")?,
34 })
35 }
36}
37
38#[derive(Clone)]
40pub struct S3CompatibleStore {
41 client: Client,
42 bucket: String,
43}
44
45impl S3CompatibleStore {
46 pub async fn new(bucket: String) -> BlobResult<Self> {
47 let config = S3Config::from_env()?;
48 let client = Self::create_client(config).await;
49 Ok(Self { client, bucket })
50 }
51
52 pub async fn with_config(bucket: String, config: S3Config) -> Self {
53 let client = Self::create_client(config).await;
54 Self { client, bucket }
55 }
56
57 async fn create_client(config: S3Config) -> Client {
58 let credentials = Credentials::new(
59 config.access_key_id,
60 config.secret_access_key,
61 None,
62 None,
63 "s3-compatible",
64 );
65
66 let aws_config = aws_config::defaults(BehaviorVersion::latest())
67 .region(Region::new(config.region))
68 .credentials_provider(credentials)
69 .endpoint_url(config.endpoint_url)
70 .load()
71 .await;
72
73 Client::from_conf(
74 aws_sdk_s3::config::Builder::from(&aws_config)
75 .force_path_style(true) .build(),
77 )
78 }
79
80 async fn collect_stream(&self, stream: &mut ByteStream) -> BlobResult<Vec<u8>> {
81 let mut data = Vec::new();
82 while let Some(chunk) = stream.next().await {
83 let chunk = chunk.map_err(Self::map_aws_error)?;
84 data.extend_from_slice(&chunk);
85 }
86 Ok(data)
87 }
88
89 fn format_range(&self, range: &ByteRange) -> String {
90 match range.end {
91 Some(end) => format!("bytes={}-{}", range.start, end),
92 None => format!("bytes={}-", range.start),
93 }
94 }
95
96 fn resolve_range(&self, range: &ByteRange, content_length: u64) -> crate::store::ResolvedRange {
97 crate::store::ResolvedRange {
98 start: range.start,
99 end: range.end.unwrap_or(content_length.saturating_sub(1)),
100 total_size: content_length,
101 }
102 }
103
104 fn map_aws_error(err: impl std::error::Error + Send + Sync + 'static) -> BlobError {
105 BlobError::backend(err)
106 }
107
108 pub fn add_metadata_to_request(
110 mut request: aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder,
111 metadata: &BlobMetadata,
112 ) -> aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder {
113 macro_rules! add_optional_metadata {
115 ($field:expr, $key:literal) => {
116 if let Some(value) = $field {
117 request = request.metadata($key, value);
118 }
119 };
120 ($field:expr, $key:literal, to_string) => {
121 if let Some(value) = $field {
122 request = request.metadata($key, &value.to_string());
123 }
124 };
125 }
126
127 add_optional_metadata!(&metadata.title, "title");
128 add_optional_metadata!(&metadata.artist, "artist");
129 add_optional_metadata!(&metadata.album, "album");
130 add_optional_metadata!(&metadata.genre, "genre");
131 add_optional_metadata!(metadata.year, "year", to_string);
132 add_optional_metadata!(metadata.duration, "duration", to_string);
133 add_optional_metadata!(metadata.bitrate, "bitrate", to_string);
134 add_optional_metadata!(metadata.sample_rate, "sample_rate", to_string);
135 add_optional_metadata!(metadata.channels, "channels", to_string);
136 add_optional_metadata!(&metadata.encoding, "encoding");
137 add_optional_metadata!(&metadata.thumbnail_url, "thumbnail_url");
138 add_optional_metadata!(&metadata.album_art_url, "album_art_url");
139 add_optional_metadata!(metadata.latitude, "latitude", to_string);
140 add_optional_metadata!(metadata.longitude, "longitude", to_string);
141 add_optional_metadata!(&metadata.location_name, "location_name");
142
143 for (key, value) in &metadata.custom {
145 request = request.metadata(key, value);
146 }
147
148 request
149 }
150
151 pub fn extract_blob_metadata(head_result: &aws_sdk_s3::operation::head_object::HeadObjectOutput) -> BlobMetadata {
153 let mut metadata = BlobMetadata::default();
154
155 if let Some(s3_metadata) = head_result.metadata() {
156 metadata.title = s3_metadata.get("title").map(|s| s.to_string());
158 metadata.artist = s3_metadata.get("artist").map(|s| s.to_string());
159 metadata.album = s3_metadata.get("album").map(|s| s.to_string());
160 metadata.genre = s3_metadata.get("genre").map(|s| s.to_string());
161 metadata.year = s3_metadata.get("year").and_then(|s| s.parse().ok());
162 metadata.duration = s3_metadata.get("duration").and_then(|s| s.parse().ok());
163 metadata.bitrate = s3_metadata.get("bitrate").and_then(|s| s.parse().ok());
164
165 metadata.thumbnail_url = s3_metadata.get("thumbnail_url").map(|s| s.to_string());
167 metadata.album_art_url = s3_metadata.get("album_art_url").map(|s| s.to_string());
168
169 metadata.latitude = s3_metadata.get("latitude").and_then(|s| s.parse().ok());
171 metadata.longitude = s3_metadata.get("longitude").and_then(|s| s.parse().ok());
172 metadata.location_name = s3_metadata.get("location_name").map(|s| s.to_string());
173
174 metadata.encoding = s3_metadata.get("encoding").map(|s| s.to_string());
176 metadata.sample_rate = s3_metadata.get("sample_rate").and_then(|s| s.parse().ok());
177 metadata.channels = s3_metadata.get("channels").and_then(|s| s.parse().ok());
178
179 for (key, value) in s3_metadata {
181 if !matches!(key.as_str(),
182 "filename" | "title" | "artist" | "album" | "genre" | "year" |
183 "duration" | "bitrate" | "thumbnail_url" | "album_art_url" |
184 "latitude" | "longitude" | "location_name" | "encoding" |
185 "sample_rate" | "channels"
186 ) {
187 metadata.custom.insert(key.clone(), value.clone());
188 }
189 }
190 }
191
192 metadata.mime_type = head_result.content_type().map(|s| s.to_string());
194
195 metadata
196 }
197}
198
199#[async_trait]
200impl BlobStore for S3CompatibleStore {
201 fn as_any(&self) -> &dyn std::any::Any {
202 self
203 }
204
205 async fn put(
206 &self,
207 key: &str,
208 content_type: Option<&str>,
209 mut stream: ByteStream,
210 ) -> BlobResult<PutResult> {
211 let data = self.collect_stream(&mut stream).await?;
212 let aws_stream = AwsByteStream::from(data.clone());
213
214 let mut request = self.client
215 .put_object()
216 .bucket(&self.bucket)
217 .key(key)
218 .body(aws_stream);
219
220 if let Some(ct) = content_type {
221 request = request.content_type(ct);
222 }
223
224 let result = request.send().await.map_err(Self::map_aws_error)?;
225
226 Ok(PutResult {
227 etag: result.e_tag,
228 size_bytes: data.len() as u64,
229 checksum: None,
230 })
231 }
232
233 async fn put_with_metadata(
234 &self,
235 key: &str,
236 content_type: Option<&str>,
237 filename: Option<&str>,
238 mut stream: ByteStream,
239 ) -> BlobResult<PutResult> {
240 let data = self.collect_stream(&mut stream).await?;
241 let aws_stream = AwsByteStream::from(data.clone());
242
243 let mut request = self.client
244 .put_object()
245 .bucket(&self.bucket)
246 .key(key)
247 .body(aws_stream);
248
249 if let Some(ct) = content_type {
250 request = request.content_type(ct);
251 }
252
253 if let Some(filename) = filename {
255 request = request.metadata("filename", filename);
256 }
257
258 let result = request.send().await.map_err(Self::map_aws_error)?;
259
260 Ok(PutResult {
261 etag: result.e_tag,
262 size_bytes: data.len() as u64,
263 checksum: None,
264 })
265 }
266
267 async fn get(&self, key: &str, range: Option<ByteRange>) -> BlobResult<GetResult> {
268 let mut request = self.client.get_object().bucket(&self.bucket).key(key);
269
270 if let Some(ref range) = range {
271 request = request.range(self.format_range(range));
272 }
273
274 let result = request.send().await.map_err(Self::map_aws_error)?;
275 let content_length = result.content_length.unwrap_or(0) as u64;
276
277 let resolved_range = range.map(|r| self.resolve_range(&r, content_length));
278
279 Ok(GetResult {
280 stream: Box::pin(async_stream::stream! {
281 let mut body = result.body;
282 while let Some(chunk) = body.next().await {
283 match chunk {
284 Ok(bytes) => yield Ok(bytes.into()),
285 Err(e) => yield Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
286 }
287 }
288 }),
289 size_bytes: content_length,
290 content_type: result.content_type,
291 etag: result.e_tag,
292 resolved_range,
293 })
294 }
295
296 async fn head(&self, key: &str) -> BlobResult<ObjectHead> {
297 let result = self.client
298 .head_object()
299 .bucket(&self.bucket)
300 .key(key)
301 .send()
302 .await
303 .map_err(Self::map_aws_error)?;
304
305 Ok(ObjectHead {
306 size_bytes: result.content_length.unwrap_or(0) as u64,
307 content_type: result.content_type,
308 etag: result.e_tag,
309 last_modified: result.last_modified.map(|dt| dt.secs()),
310 })
311 }
312
313 async fn delete(&self, key: &str) -> BlobResult<()> {
314 self.client
315 .delete_object()
316 .bucket(&self.bucket)
317 .key(key)
318 .send()
319 .await
320 .map_err(Self::map_aws_error)?;
321 Ok(())
322 }
323
324 async fn list(&self, prefix: Option<&str>, limit: Option<usize>) -> BlobResult<Vec<BlobInfo>> {
325 let mut request = self.client
326 .list_objects_v2()
327 .bucket(&self.bucket);
328
329 if let Some(prefix) = prefix {
330 request = request.prefix(prefix);
331 }
332
333 if let Some(limit) = limit {
334 request = request.max_keys(limit as i32);
335 }
336
337 let result = request.send().await.map_err(Self::map_aws_error)?;
338
339 let mut blobs = Vec::new();
340 if let Some(objects) = result.contents {
341 for object in objects {
342 if let Some(key) = object.key {
343 let head_result = self.client
345 .head_object()
346 .bucket(&self.bucket)
347 .key(&key)
348 .send()
349 .await
350 .map_err(Self::map_aws_error)?;
351
352 let filename = head_result.metadata()
354 .and_then(|metadata| metadata.get("filename"))
355 .map(|f| f.to_string());
356
357 let metadata = Self::extract_blob_metadata(&head_result);
359
360 blobs.push(BlobInfo {
361 key: key.clone(),
362 size_bytes: object.size.unwrap_or(0) as u64,
363 content_type: head_result.content_type.clone(),
364 filename,
365 etag: object.e_tag,
366 last_modified: object.last_modified.map(|dt| dt.secs()),
367 metadata,
368 });
369 }
370 }
371 }
372
373 Ok(blobs)
374 }
375
376 fn capabilities(&self) -> StoreCapabilities {
377 StoreCapabilities::basic().with_range().with_signed_urls()
378 }
379}