Skip to main content

oximedia_cloud/
generic.rs

1//! Generic S3-compatible storage implementation for `MinIO`, Wasabi, Backblaze B2, etc.
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use chrono::Utc;
6use reqwest::{Client, StatusCode};
7use std::collections::HashMap;
8use url::Url;
9
10use crate::error::{CloudError, Result};
11use crate::security::Credentials;
12use crate::types::{
13    CloudStorage, DeleteResult, ListResult, ObjectInfo, ObjectMetadata, StorageClass, StorageStats,
14    UploadOptions,
15};
16
17/// Generic S3-compatible storage backend
18pub struct GenericStorage {
19    client: Client,
20    endpoint: Url,
21    credentials: Credentials,
22    bucket: String,
23}
24
25impl GenericStorage {
26    /// Create a new generic S3-compatible storage backend
27    ///
28    /// # Errors
29    ///
30    /// Returns an error if configuration is invalid
31    pub fn new(endpoint: Url, credentials: Credentials, bucket: String) -> Result<Self> {
32        credentials.validate()?;
33
34        Ok(Self {
35            client: Client::new(),
36            endpoint,
37            credentials,
38            bucket,
39        })
40    }
41
42    /// Create bucket
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if bucket creation fails
47    pub async fn create_bucket(&self) -> Result<()> {
48        let url = format!("{}/{}", self.endpoint, self.bucket);
49
50        let response = self
51            .client
52            .put(&url)
53            .header("Authorization", self.auth_header("PUT", &self.bucket, ""))
54            .send()
55            .await?;
56
57        if !response.status().is_success() {
58            return Err(CloudError::Storage("Failed to create bucket".to_string()));
59        }
60
61        Ok(())
62    }
63
64    /// Delete bucket
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if bucket deletion fails
69    pub async fn delete_bucket(&self) -> Result<()> {
70        let url = format!("{}/{}", self.endpoint, self.bucket);
71
72        let response = self
73            .client
74            .delete(&url)
75            .header(
76                "Authorization",
77                self.auth_header("DELETE", &self.bucket, ""),
78            )
79            .send()
80            .await?;
81
82        if !response.status().is_success() {
83            return Err(CloudError::Storage("Failed to delete bucket".to_string()));
84        }
85
86        Ok(())
87    }
88
89    /// Generate authorization header (simplified AWS Signature V4)
90    fn auth_header(&self, _method: &str, _path: &str, _query: &str) -> String {
91        // Simplified authentication - in production would implement full AWS Signature V4
92        let date = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
93
94        // Basic authentication for compatible services
95        format!(
96            "AWS4-HMAC-SHA256 Credential={}/{}/us-east-1/s3/aws4_request",
97            self.credentials.access_key, date
98        )
99    }
100
101    /// Build object URL
102    fn object_url(&self, key: &str) -> String {
103        format!("{}/{}/{}", self.endpoint, self.bucket, key)
104    }
105}
106
107#[async_trait]
108impl CloudStorage for GenericStorage {
109    async fn upload(&self, key: &str, data: Bytes) -> Result<()> {
110        let url = self.object_url(key);
111
112        let response = self
113            .client
114            .put(&url)
115            .header(
116                "Authorization",
117                self.auth_header("PUT", &format!("/{key}"), ""),
118            )
119            .header("Content-Type", "application/octet-stream")
120            .body(data)
121            .send()
122            .await?;
123
124        if !response.status().is_success() {
125            return Err(CloudError::Storage("Failed to upload object".to_string()));
126        }
127
128        Ok(())
129    }
130
131    async fn upload_with_options(
132        &self,
133        key: &str,
134        data: Bytes,
135        options: UploadOptions,
136    ) -> Result<()> {
137        let url = self.object_url(key);
138
139        let mut request = self.client.put(&url).header(
140            "Authorization",
141            self.auth_header("PUT", &format!("/{key}"), ""),
142        );
143
144        if let Some(content_type) = options.content_type {
145            request = request.header("Content-Type", content_type);
146        } else {
147            request = request.header("Content-Type", "application/octet-stream");
148        }
149
150        if let Some(cache_control) = options.cache_control {
151            request = request.header("Cache-Control", cache_control);
152        }
153
154        if let Some(content_encoding) = options.content_encoding {
155            request = request.header("Content-Encoding", content_encoding);
156        }
157
158        // Add user metadata with x-amz-meta- prefix
159        for (key, value) in options.metadata {
160            request = request.header(format!("x-amz-meta-{key}"), value);
161        }
162
163        let response = request.body(data).send().await?;
164
165        if !response.status().is_success() {
166            return Err(CloudError::Storage("Failed to upload object".to_string()));
167        }
168
169        Ok(())
170    }
171
172    async fn download(&self, key: &str) -> Result<Bytes> {
173        let url = self.object_url(key);
174
175        let response = self
176            .client
177            .get(&url)
178            .header(
179                "Authorization",
180                self.auth_header("GET", &format!("/{key}"), ""),
181            )
182            .send()
183            .await?;
184
185        if !response.status().is_success() {
186            return Err(CloudError::Storage("Failed to download object".to_string()));
187        }
188
189        let data = response.bytes().await?;
190        Ok(data)
191    }
192
193    async fn download_range(&self, key: &str, start: u64, end: u64) -> Result<Bytes> {
194        let url = self.object_url(key);
195        let range_header = format!("bytes={start}-{end}");
196
197        let response = self
198            .client
199            .get(&url)
200            .header(
201                "Authorization",
202                self.auth_header("GET", &format!("/{key}"), ""),
203            )
204            .header("Range", range_header)
205            .send()
206            .await?;
207
208        if !response.status().is_success() && response.status() != StatusCode::PARTIAL_CONTENT {
209            return Err(CloudError::Storage(
210                "Failed to download object range".to_string(),
211            ));
212        }
213
214        let data = response.bytes().await?;
215        Ok(data)
216    }
217
218    async fn list(&self, prefix: &str) -> Result<Vec<ObjectInfo>> {
219        let url = format!(
220            "{}{}?list-type=2&prefix={}",
221            self.endpoint, self.bucket, prefix
222        );
223
224        let response = self
225            .client
226            .get(&url)
227            .header(
228                "Authorization",
229                self.auth_header("GET", "", &format!("list-type=2&prefix={prefix}")),
230            )
231            .send()
232            .await?;
233
234        if !response.status().is_success() {
235            return Err(CloudError::Storage("Failed to list objects".to_string()));
236        }
237
238        let text = response.text().await?;
239
240        // Parse XML response (simplified - in production would use proper XML parser)
241        let mut objects = Vec::new();
242
243        // This is a simplified parser - in production would use quick-xml or similar
244        for line in text.lines() {
245            if line.contains("<Key>") {
246                if let Some(key) = extract_xml_value(line, "Key") {
247                    // Create a minimal ObjectInfo
248                    objects.push(ObjectInfo {
249                        key,
250                        size: 0,
251                        last_modified: Utc::now(),
252                        etag: None,
253                        storage_class: None,
254                        content_type: None,
255                    });
256                }
257            }
258        }
259
260        Ok(objects)
261    }
262
263    async fn list_paginated(
264        &self,
265        prefix: &str,
266        continuation_token: Option<String>,
267        max_keys: usize,
268    ) -> Result<ListResult> {
269        let mut url = format!(
270            "{}{}?list-type=2&prefix={}&max-keys={}",
271            self.endpoint, self.bucket, prefix, max_keys
272        );
273
274        if let Some(token) = continuation_token {
275            url.push_str(&format!("&continuation-token={token}"));
276        }
277
278        let response = self
279            .client
280            .get(&url)
281            .header("Authorization", self.auth_header("GET", "", ""))
282            .send()
283            .await?;
284
285        if !response.status().is_success() {
286            return Err(CloudError::Storage("Failed to list objects".to_string()));
287        }
288
289        let text = response.text().await?;
290        let mut objects = Vec::new();
291
292        // Simplified XML parsing
293        for line in text.lines() {
294            if line.contains("<Key>") {
295                if let Some(key) = extract_xml_value(line, "Key") {
296                    objects.push(ObjectInfo {
297                        key,
298                        size: 0,
299                        last_modified: Utc::now(),
300                        etag: None,
301                        storage_class: None,
302                        content_type: None,
303                    });
304                }
305            }
306        }
307
308        let is_truncated = text.contains("<IsTruncated>true</IsTruncated>");
309
310        Ok(ListResult {
311            objects,
312            continuation_token: None,
313            is_truncated,
314            common_prefixes: Vec::new(),
315        })
316    }
317
318    async fn delete(&self, key: &str) -> Result<()> {
319        let url = self.object_url(key);
320
321        let response = self
322            .client
323            .delete(&url)
324            .header(
325                "Authorization",
326                self.auth_header("DELETE", &format!("/{key}"), ""),
327            )
328            .send()
329            .await?;
330
331        if !response.status().is_success() && response.status() != StatusCode::NOT_FOUND {
332            return Err(CloudError::Storage("Failed to delete object".to_string()));
333        }
334
335        Ok(())
336    }
337
338    async fn delete_batch(&self, keys: &[String]) -> Result<Vec<DeleteResult>> {
339        let mut results = Vec::new();
340
341        for key in keys {
342            match self.delete(key).await {
343                Ok(()) => results.push(DeleteResult {
344                    key: key.clone(),
345                    success: true,
346                    error: None,
347                }),
348                Err(e) => results.push(DeleteResult {
349                    key: key.clone(),
350                    success: false,
351                    error: Some(e.to_string()),
352                }),
353            }
354        }
355
356        Ok(results)
357    }
358
359    async fn get_metadata(&self, key: &str) -> Result<ObjectMetadata> {
360        let url = self.object_url(key);
361
362        let response = self
363            .client
364            .head(&url)
365            .header(
366                "Authorization",
367                self.auth_header("HEAD", &format!("/{key}"), ""),
368            )
369            .send()
370            .await?;
371
372        if !response.status().is_success() {
373            return Err(CloudError::Storage(
374                "Failed to get object metadata".to_string(),
375            ));
376        }
377
378        let headers = response.headers();
379        let size = headers
380            .get("Content-Length")
381            .and_then(|v| v.to_str().ok())
382            .and_then(|s| s.parse().ok())
383            .unwrap_or(0);
384
385        let content_type = headers
386            .get("Content-Type")
387            .and_then(|v| v.to_str().ok())
388            .map(ToString::to_string);
389
390        let info = ObjectInfo {
391            key: key.to_string(),
392            size,
393            last_modified: Utc::now(),
394            etag: headers
395                .get("ETag")
396                .and_then(|v| v.to_str().ok())
397                .map(ToString::to_string),
398            storage_class: None,
399            content_type,
400        };
401
402        let mut user_metadata = HashMap::new();
403        for (key, value) in headers {
404            if let Some(meta_key) = key.as_str().strip_prefix("x-amz-meta-") {
405                if let Ok(value_str) = value.to_str() {
406                    user_metadata.insert(meta_key.to_string(), value_str.to_string());
407                }
408            }
409        }
410
411        Ok(ObjectMetadata {
412            info,
413            user_metadata,
414            system_metadata: HashMap::new(),
415            tags: HashMap::new(),
416            content_encoding: headers
417                .get("Content-Encoding")
418                .and_then(|v| v.to_str().ok())
419                .map(ToString::to_string),
420            content_language: None,
421            cache_control: headers
422                .get("Cache-Control")
423                .and_then(|v| v.to_str().ok())
424                .map(ToString::to_string),
425            content_disposition: None,
426        })
427    }
428
429    async fn update_metadata(&self, key: &str, _metadata: HashMap<String, String>) -> Result<()> {
430        // S3-compatible APIs require copying the object to update metadata
431        self.copy(key, key).await
432    }
433
434    async fn exists(&self, key: &str) -> Result<bool> {
435        let url = self.object_url(key);
436
437        let response = self
438            .client
439            .head(&url)
440            .header(
441                "Authorization",
442                self.auth_header("HEAD", &format!("/{key}"), ""),
443            )
444            .send()
445            .await?;
446
447        Ok(response.status().is_success())
448    }
449
450    async fn copy(&self, source_key: &str, dest_key: &str) -> Result<()> {
451        let url = self.object_url(dest_key);
452        let copy_source = format!("/{}/{}", self.bucket, source_key);
453
454        let response = self
455            .client
456            .put(&url)
457            .header(
458                "Authorization",
459                self.auth_header("PUT", &format!("/{dest_key}"), ""),
460            )
461            .header("x-amz-copy-source", copy_source)
462            .send()
463            .await?;
464
465        if !response.status().is_success() {
466            return Err(CloudError::Storage("Failed to copy object".to_string()));
467        }
468
469        Ok(())
470    }
471
472    async fn presigned_download_url(&self, key: &str, expires_in_secs: u64) -> Result<String> {
473        // Simplified presigned URL generation
474        let url = self.object_url(key);
475        Ok(format!("{url}?expires={expires_in_secs}"))
476    }
477
478    async fn presigned_upload_url(&self, key: &str, expires_in_secs: u64) -> Result<String> {
479        let url = self.object_url(key);
480        Ok(format!("{url}?expires={expires_in_secs}"))
481    }
482
483    async fn set_storage_class(&self, _key: &str, _class: StorageClass) -> Result<()> {
484        // Most S3-compatible services don't support storage classes
485        Ok(())
486    }
487
488    async fn get_stats(&self, prefix: &str) -> Result<StorageStats> {
489        let objects = self.list(prefix).await?;
490
491        let mut stats = StorageStats::default();
492        for obj in objects {
493            stats.total_size += obj.size;
494            stats.object_count += 1;
495        }
496
497        Ok(stats)
498    }
499}
500
501/// Extract value from XML tag (simplified)
502fn extract_xml_value(line: &str, tag: &str) -> Option<String> {
503    let start_tag = format!("<{tag}>");
504    let end_tag = format!("</{tag}>");
505
506    if let Some(start) = line.find(&start_tag) {
507        if let Some(end) = line.find(&end_tag) {
508            let value_start = start + start_tag.len();
509            if value_start < end {
510                return Some(line[value_start..end].to_string());
511            }
512        }
513    }
514    None
515}
516
517#[cfg(test)]
518mod tests {
519    use super::*;
520
521    #[test]
522    fn test_extract_xml_value() {
523        let line = "<Key>test-object.txt</Key>";
524        let value = extract_xml_value(line, "Key");
525        assert_eq!(value, Some("test-object.txt".to_string()));
526
527        let line2 = "<Size>1024</Size>";
528        let value2 = extract_xml_value(line2, "Size");
529        assert_eq!(value2, Some("1024".to_string()));
530    }
531
532    #[test]
533    fn test_extract_xml_value_no_match() {
534        let line = "<Name>bucket</Name>";
535        let value = extract_xml_value(line, "Key");
536        assert_eq!(value, None);
537    }
538
539    #[test]
540    fn test_generic_storage_creation() {
541        let endpoint = Url::parse("https://s3.example.com").expect("endpoint should be valid");
542        let credentials = Credentials::new("access".to_string(), "secret".to_string());
543        let storage = GenericStorage::new(endpoint, credentials, "test-bucket".to_string());
544        assert!(storage.is_ok());
545    }
546}