Skip to main content

dog_blob/
s3_store.rs

1use async_stream;
2use async_trait::async_trait;
3use aws_config::{BehaviorVersion, Region};
4use aws_credential_types::Credentials;
5use aws_sdk_s3::{primitives::ByteStream as AwsByteStream, Client};
6use futures::StreamExt;
7use std::env;
8
9use crate::{
10    BlobError, BlobInfo, BlobMetadata, BlobResult, BlobStore, ByteRange, ByteStream, GetResult, ObjectHead, PutResult,
11    StoreCapabilities,
12};
13
/// S3-compatible configuration from environment variables
///
/// Holds the four settings needed to reach an S3-compatible endpoint.
///
/// `Debug` is implemented by hand rather than derived so that
/// `secret_access_key` never ends up in logs or panic messages: it is always
/// rendered as `<redacted>`.
pub struct S3Config {
    /// Region name handed to the SDK.
    pub region: String,
    /// Access key id used to build static credentials.
    pub access_key_id: String,
    /// Secret access key used to build static credentials (redacted in `Debug`).
    pub secret_access_key: String,
    /// Base URL of the S3-compatible endpoint.
    pub endpoint_url: String,
}

impl std::fmt::Debug for S3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3Config")
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            // Never print the secret itself.
            .field("secret_access_key", &"<redacted>")
            .field("endpoint_url", &self.endpoint_url)
            .finish()
    }
}
22
23impl S3Config {
24    pub fn from_env() -> BlobResult<Self> {
25        fn get_env(key: &str) -> BlobResult<String> {
26            env::var(key).map_err(|_| BlobError::invalid(format!("{} environment variable required", key)))
27        }
28
29        Ok(Self {
30            region: get_env("RUSTFS_REGION")?,
31            access_key_id: get_env("RUSTFS_ACCESS_KEY_ID")?,
32            secret_access_key: get_env("RUSTFS_SECRET_ACCESS_KEY")?,
33            endpoint_url: get_env("RUSTFS_ENDPOINT_URL")?,
34        })
35    }
36}
37
38/// Generic S3-compatible blob store implementation
39#[derive(Clone)]
40pub struct S3CompatibleStore {
41    client: Client,
42    bucket: String,
43}
44
45impl S3CompatibleStore {
46    pub async fn new(bucket: String) -> BlobResult<Self> {
47        let config = S3Config::from_env()?;
48        let client = Self::create_client(config).await;
49        Ok(Self { client, bucket })
50    }
51
52    pub async fn with_config(bucket: String, config: S3Config) -> Self {
53        let client = Self::create_client(config).await;
54        Self { client, bucket }
55    }
56
57    async fn create_client(config: S3Config) -> Client {
58        let credentials = Credentials::new(
59            config.access_key_id,
60            config.secret_access_key,
61            None,
62            None,
63            "s3-compatible",
64        );
65
66        let aws_config = aws_config::defaults(BehaviorVersion::latest())
67            .region(Region::new(config.region))
68            .credentials_provider(credentials)
69            .endpoint_url(config.endpoint_url)
70            .load()
71            .await;
72
73        Client::from_conf(
74            aws_sdk_s3::config::Builder::from(&aws_config)
75                .force_path_style(true) // Required for S3-compatible services
76                .build(),
77        )
78    }
79
80    async fn collect_stream(&self, stream: &mut ByteStream) -> BlobResult<Vec<u8>> {
81        let mut data = Vec::new();
82        while let Some(chunk) = stream.next().await {
83            let chunk = chunk.map_err(Self::map_aws_error)?;
84            data.extend_from_slice(&chunk);
85        }
86        Ok(data)
87    }
88
89    fn format_range(&self, range: &ByteRange) -> String {
90        match range.end {
91            Some(end) => format!("bytes={}-{}", range.start, end),
92            None => format!("bytes={}-", range.start),
93        }
94    }
95
96    fn resolve_range(&self, range: &ByteRange, content_length: u64) -> crate::store::ResolvedRange {
97        crate::store::ResolvedRange {
98            start: range.start,
99            end: range.end.unwrap_or(content_length.saturating_sub(1)),
100            total_size: content_length,
101        }
102    }
103
104    fn map_aws_error(err: impl std::error::Error + Send + Sync + 'static) -> BlobError {
105        BlobError::backend(err)
106    }
107
108    /// Add metadata fields to S3 put request
109    pub fn add_metadata_to_request(
110        mut request: aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder,
111        metadata: &BlobMetadata,
112    ) -> aws_sdk_s3::operation::put_object::builders::PutObjectFluentBuilder {
113        // Helper macro to reduce repetition
114        macro_rules! add_optional_metadata {
115            ($field:expr, $key:literal) => {
116                if let Some(value) = $field {
117                    request = request.metadata($key, value);
118                }
119            };
120            ($field:expr, $key:literal, to_string) => {
121                if let Some(value) = $field {
122                    request = request.metadata($key, &value.to_string());
123                }
124            };
125        }
126
127        add_optional_metadata!(&metadata.title, "title");
128        add_optional_metadata!(&metadata.artist, "artist");
129        add_optional_metadata!(&metadata.album, "album");
130        add_optional_metadata!(&metadata.genre, "genre");
131        add_optional_metadata!(metadata.year, "year", to_string);
132        add_optional_metadata!(metadata.duration, "duration", to_string);
133        add_optional_metadata!(metadata.bitrate, "bitrate", to_string);
134        add_optional_metadata!(metadata.sample_rate, "sample_rate", to_string);
135        add_optional_metadata!(metadata.channels, "channels", to_string);
136        add_optional_metadata!(&metadata.encoding, "encoding");
137        add_optional_metadata!(&metadata.thumbnail_url, "thumbnail_url");
138        add_optional_metadata!(&metadata.album_art_url, "album_art_url");
139        add_optional_metadata!(metadata.latitude, "latitude", to_string);
140        add_optional_metadata!(metadata.longitude, "longitude", to_string);
141        add_optional_metadata!(&metadata.location_name, "location_name");
142
143        // Add custom attributes
144        for (key, value) in &metadata.custom {
145            request = request.metadata(key, value);
146        }
147
148        request
149    }
150
151    /// Extract rich metadata from S3 head_object response
152    pub fn extract_blob_metadata(head_result: &aws_sdk_s3::operation::head_object::HeadObjectOutput) -> BlobMetadata {
153        let mut metadata = BlobMetadata::default();
154
155        if let Some(s3_metadata) = head_result.metadata() {
156            // Audio metadata
157            metadata.title = s3_metadata.get("title").map(|s| s.to_string());
158            metadata.artist = s3_metadata.get("artist").map(|s| s.to_string());
159            metadata.album = s3_metadata.get("album").map(|s| s.to_string());
160            metadata.genre = s3_metadata.get("genre").map(|s| s.to_string());
161            metadata.year = s3_metadata.get("year").and_then(|s| s.parse().ok());
162            metadata.duration = s3_metadata.get("duration").and_then(|s| s.parse().ok());
163            metadata.bitrate = s3_metadata.get("bitrate").and_then(|s| s.parse().ok());
164
165            // Visual metadata
166            metadata.thumbnail_url = s3_metadata.get("thumbnail_url").map(|s| s.to_string());
167            metadata.album_art_url = s3_metadata.get("album_art_url").map(|s| s.to_string());
168
169            // Location metadata
170            metadata.latitude = s3_metadata.get("latitude").and_then(|s| s.parse().ok());
171            metadata.longitude = s3_metadata.get("longitude").and_then(|s| s.parse().ok());
172            metadata.location_name = s3_metadata.get("location_name").map(|s| s.to_string());
173
174            // Technical metadata
175            metadata.encoding = s3_metadata.get("encoding").map(|s| s.to_string());
176            metadata.sample_rate = s3_metadata.get("sample_rate").and_then(|s| s.parse().ok());
177            metadata.channels = s3_metadata.get("channels").and_then(|s| s.parse().ok());
178
179            // Custom attributes (any metadata not in standard fields)
180            for (key, value) in s3_metadata {
181                if !matches!(key.as_str(), 
182                    "filename" | "title" | "artist" | "album" | "genre" | "year" | 
183                    "duration" | "bitrate" | "thumbnail_url" | "album_art_url" |
184                    "latitude" | "longitude" | "location_name" | "encoding" |
185                    "sample_rate" | "channels"
186                ) {
187                    metadata.custom.insert(key.clone(), value.clone());
188                }
189            }
190        }
191
192        // Set mime_type from content_type
193        metadata.mime_type = head_result.content_type().map(|s| s.to_string());
194
195        metadata
196    }
197}
198
199#[async_trait]
200impl BlobStore for S3CompatibleStore {
201    fn as_any(&self) -> &dyn std::any::Any {
202        self
203    }
204
205    async fn put(
206        &self,
207        key: &str,
208        content_type: Option<&str>,
209        mut stream: ByteStream,
210    ) -> BlobResult<PutResult> {
211        let data = self.collect_stream(&mut stream).await?;
212        let aws_stream = AwsByteStream::from(data.clone());
213
214        let mut request = self.client
215            .put_object()
216            .bucket(&self.bucket)
217            .key(key)
218            .body(aws_stream);
219
220        if let Some(ct) = content_type {
221            request = request.content_type(ct);
222        }
223
224        let result = request.send().await.map_err(Self::map_aws_error)?;
225
226        Ok(PutResult {
227            etag: result.e_tag,
228            size_bytes: data.len() as u64,
229            checksum: None,
230        })
231    }
232
233    async fn put_with_metadata(
234        &self,
235        key: &str,
236        content_type: Option<&str>,
237        filename: Option<&str>,
238        mut stream: ByteStream,
239    ) -> BlobResult<PutResult> {
240        let data = self.collect_stream(&mut stream).await?;
241        let aws_stream = AwsByteStream::from(data.clone());
242
243        let mut request = self.client
244            .put_object()
245            .bucket(&self.bucket)
246            .key(key)
247            .body(aws_stream);
248
249        if let Some(ct) = content_type {
250            request = request.content_type(ct);
251        }
252
253        // Add filename as metadata if provided
254        if let Some(filename) = filename {
255            request = request.metadata("filename", filename);
256        }
257
258        let result = request.send().await.map_err(Self::map_aws_error)?;
259
260        Ok(PutResult {
261            etag: result.e_tag,
262            size_bytes: data.len() as u64,
263            checksum: None,
264        })
265    }
266
267    async fn get(&self, key: &str, range: Option<ByteRange>) -> BlobResult<GetResult> {
268        let mut request = self.client.get_object().bucket(&self.bucket).key(key);
269
270        if let Some(ref range) = range {
271            request = request.range(self.format_range(range));
272        }
273
274        let result = request.send().await.map_err(Self::map_aws_error)?;
275        let content_length = result.content_length.unwrap_or(0) as u64;
276
277        let resolved_range = range.map(|r| self.resolve_range(&r, content_length));
278
279        Ok(GetResult {
280            stream: Box::pin(async_stream::stream! {
281                let mut body = result.body;
282                while let Some(chunk) = body.next().await {
283                    match chunk {
284                        Ok(bytes) => yield Ok(bytes.into()),
285                        Err(e) => yield Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
286                    }
287                }
288            }),
289            size_bytes: content_length,
290            content_type: result.content_type,
291            etag: result.e_tag,
292            resolved_range,
293        })
294    }
295
296    async fn head(&self, key: &str) -> BlobResult<ObjectHead> {
297        let result = self.client
298            .head_object()
299            .bucket(&self.bucket)
300            .key(key)
301            .send()
302            .await
303            .map_err(Self::map_aws_error)?;
304
305        Ok(ObjectHead {
306            size_bytes: result.content_length.unwrap_or(0) as u64,
307            content_type: result.content_type,
308            etag: result.e_tag,
309            last_modified: result.last_modified.map(|dt| dt.secs()),
310        })
311    }
312
313    async fn delete(&self, key: &str) -> BlobResult<()> {
314        self.client
315            .delete_object()
316            .bucket(&self.bucket)
317            .key(key)
318            .send()
319            .await
320            .map_err(Self::map_aws_error)?;
321        Ok(())
322    }
323
324    async fn list(&self, prefix: Option<&str>, limit: Option<usize>) -> BlobResult<Vec<BlobInfo>> {
325        let mut request = self.client
326            .list_objects_v2()
327            .bucket(&self.bucket);
328
329        if let Some(prefix) = prefix {
330            request = request.prefix(prefix);
331        }
332
333        if let Some(limit) = limit {
334            request = request.max_keys(limit as i32);
335        }
336
337        let result = request.send().await.map_err(Self::map_aws_error)?;
338
339        let mut blobs = Vec::new();
340        if let Some(objects) = result.contents {
341            for object in objects {
342                if let Some(key) = object.key {
343                    // Get additional metadata including filename from head_object
344                    let head_result = self.client
345                        .head_object()
346                        .bucket(&self.bucket)
347                        .key(&key)
348                        .send()
349                        .await
350                        .map_err(Self::map_aws_error)?;
351
352                    // Extract filename from metadata if available
353                    let filename = head_result.metadata()
354                        .and_then(|metadata| metadata.get("filename"))
355                        .map(|f| f.to_string());
356
357                    // Extract rich metadata from S3 object metadata
358                    let metadata = Self::extract_blob_metadata(&head_result);
359
360                    blobs.push(BlobInfo {
361                        key: key.clone(),
362                        size_bytes: object.size.unwrap_or(0) as u64,
363                        content_type: head_result.content_type.clone(),
364                        filename,
365                        etag: object.e_tag,
366                        last_modified: object.last_modified.map(|dt| dt.secs()),
367                        metadata,
368                    });
369                }
370            }
371        }
372
373        Ok(blobs)
374    }
375
376    fn capabilities(&self) -> StoreCapabilities {
377        StoreCapabilities::basic().with_range().with_signed_urls()
378    }
379}