Skip to main content

alimentar/backend/
s3.rs

1//! S3-compatible storage backend.
2//!
3//! Supports AWS S3, MinIO, Ceph, Cloudflare R2, Scaleway, OVH, and other
4//! S3-compatible object stores.
5
6use std::sync::Arc;
7
8use aws_config::BehaviorVersion;
9use aws_sdk_s3::{
10    config::{Credentials, Region},
11    primitives::ByteStream,
12    Client,
13};
14use bytes::Bytes;
15use tokio::runtime::Runtime;
16
17use super::StorageBackend;
18use crate::error::{Error, Result};
19
/// Selects where the S3 backend obtains its credentials.
///
/// `Debug` is implemented by hand rather than derived so that the secret key
/// held by [`CredentialSource::Static`] is never written to logs, error
/// messages or panic output.
#[derive(Clone)]
pub enum CredentialSource {
    /// Use the default credential chain (environment variables such as
    /// `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`, config files, etc.).
    Environment,
    /// Use static credentials.
    Static {
        /// Access key ID.
        access_key: String,
        /// Secret access key.
        secret_key: String,
    },
    /// Anonymous/public access.
    Anonymous,
}

impl std::fmt::Debug for CredentialSource {
    /// Formats the variant name and, for `Static`, the access key only; the
    /// secret key is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Environment => f.write_str("Environment"),
            Self::Static { access_key, .. } => f
                .debug_struct("Static")
                .field("access_key", access_key)
                // Never print the real secret.
                .field("secret_key", &"<redacted>")
                .finish(),
            Self::Anonymous => f.write_str("Anonymous"),
        }
    }
}
35
36/// A storage backend using S3-compatible object storage.
37///
38/// This backend supports AWS S3 and any S3-compatible service like MinIO,
39/// Ceph, Cloudflare R2, Scaleway, OVH, Wasabi, and Backblaze B2.
40///
41/// # Example
42///
43/// ```no_run
44/// use alimentar::backend::{CredentialSource, S3Backend};
45///
46/// // AWS S3
47/// let backend = S3Backend::new(
48///     "my-bucket",
49///     "us-east-1",
50///     None, // Use default AWS endpoint
51///     CredentialSource::Environment,
52/// )
53/// .unwrap();
54///
55/// // MinIO (local)
56/// let backend = S3Backend::new(
57///     "datasets",
58///     "us-east-1",
59///     Some("http://localhost:9000".to_string()),
60///     CredentialSource::Static {
61///         access_key: "minioadmin".to_string(),
62///         secret_key: "minioadmin".to_string(),
63///     },
64/// )
65/// .unwrap();
66/// ```
67pub struct S3Backend {
68    client: Client,
69    bucket: String,
70    runtime: Arc<Runtime>,
71}
72
73impl std::fmt::Debug for S3Backend {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("S3Backend")
76            .field("bucket", &self.bucket)
77            .finish_non_exhaustive()
78    }
79}
80
81impl S3Backend {
82    /// Creates a new S3 backend.
83    ///
84    /// # Arguments
85    ///
86    /// * `bucket` - The S3 bucket name
87    /// * `region` - AWS region (e.g., "us-east-1")
88    /// * `endpoint` - Optional custom endpoint for S3-compatible services
89    /// * `credentials` - Credential source for authentication
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the runtime or client cannot be created.
94    pub fn new(
95        bucket: impl Into<String>,
96        region: impl Into<String>,
97        endpoint: Option<String>,
98        credentials: CredentialSource,
99    ) -> Result<Self> {
100        let runtime =
101            Runtime::new().map_err(|e| Error::storage(format!("Failed to create runtime: {e}")))?;
102
103        let bucket = bucket.into();
104        let region = region.into();
105
106        let client = runtime
107            .block_on(async { Self::create_client(&region, endpoint, credentials).await })?;
108
109        Ok(Self {
110            client,
111            bucket,
112            runtime: Arc::new(runtime),
113        })
114    }
115
116    async fn create_client(
117        region: &str,
118        endpoint: Option<String>,
119        credentials: CredentialSource,
120    ) -> Result<Client> {
121        let region = Region::new(region.to_string());
122
123        let mut config_loader = aws_config::defaults(BehaviorVersion::latest()).region(region);
124
125        // Set credentials based on source
126        match credentials {
127            CredentialSource::Environment => {
128                // Use default credential chain (env vars, config files, etc.)
129            }
130            CredentialSource::Static {
131                access_key,
132                secret_key,
133            } => {
134                let creds = Credentials::new(access_key, secret_key, None, None, "alimentar");
135                config_loader = config_loader.credentials_provider(creds);
136            }
137            CredentialSource::Anonymous => {
138                let creds = Credentials::new("", "", None, None, "anonymous");
139                config_loader = config_loader.credentials_provider(creds);
140            }
141        }
142
143        let sdk_config = config_loader.load().await;
144
145        let mut s3_config = aws_sdk_s3::config::Builder::from(&sdk_config);
146
147        // Set custom endpoint for S3-compatible services
148        if let Some(endpoint_url) = endpoint {
149            s3_config = s3_config.endpoint_url(&endpoint_url).force_path_style(true);
150            // Required for MinIO and most S3-compatible services
151        }
152
153        Ok(Client::from_conf(s3_config.build()))
154    }
155
156    /// Returns the bucket name.
157    pub fn bucket(&self) -> &str {
158        &self.bucket
159    }
160
161    fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
162        self.runtime.block_on(future)
163    }
164}
165
166impl StorageBackend for S3Backend {
167    fn list(&self, prefix: &str) -> Result<Vec<String>> {
168        self.block_on(async {
169            let mut keys = Vec::new();
170            let mut continuation_token: Option<String> = None;
171
172            loop {
173                let mut request = self
174                    .client
175                    .list_objects_v2()
176                    .bucket(&self.bucket)
177                    .prefix(prefix);
178
179                if let Some(token) = continuation_token.take() {
180                    request = request.continuation_token(token);
181                }
182
183                let response = request
184                    .send()
185                    .await
186                    .map_err(|e| Error::storage(format!("S3 list error: {e}")))?;
187
188                if let Some(contents) = response.contents {
189                    for object in contents {
190                        if let Some(key) = object.key {
191                            keys.push(key);
192                        }
193                    }
194                }
195
196                if response.is_truncated.unwrap_or(false) {
197                    continuation_token = response.next_continuation_token;
198                } else {
199                    break;
200                }
201            }
202
203            Ok(keys)
204        })
205    }
206
207    fn get(&self, key: &str) -> Result<Bytes> {
208        self.block_on(async {
209            let response = self
210                .client
211                .get_object()
212                .bucket(&self.bucket)
213                .key(key)
214                .send()
215                .await
216                .map_err(|e| Error::storage(format!("S3 get error for key '{}': {}", key, e)))?;
217
218            let body = response
219                .body
220                .collect()
221                .await
222                .map_err(|e| Error::storage(format!("S3 body read error: {e}")))?;
223
224            Ok(body.into_bytes())
225        })
226    }
227
228    fn put(&self, key: &str, data: Bytes) -> Result<()> {
229        self.block_on(async {
230            self.client
231                .put_object()
232                .bucket(&self.bucket)
233                .key(key)
234                .body(ByteStream::from(data))
235                .send()
236                .await
237                .map_err(|e| Error::storage(format!("S3 put error for key '{}': {}", key, e)))?;
238
239            Ok(())
240        })
241    }
242
243    fn delete(&self, key: &str) -> Result<()> {
244        self.block_on(async {
245            self.client
246                .delete_object()
247                .bucket(&self.bucket)
248                .key(key)
249                .send()
250                .await
251                .map_err(|e| Error::storage(format!("S3 delete error for key '{}': {}", key, e)))?;
252
253            Ok(())
254        })
255    }
256
257    fn exists(&self, key: &str) -> Result<bool> {
258        self.block_on(async {
259            match self
260                .client
261                .head_object()
262                .bucket(&self.bucket)
263                .key(key)
264                .send()
265                .await
266            {
267                Ok(_) => Ok(true),
268                Err(e) => {
269                    // Check if it's a "not found" error
270                    let service_error = e.into_service_error();
271                    if service_error.is_not_found() {
272                        Ok(false)
273                    } else {
274                        Err(Error::storage(format!(
275                            "S3 exists error for key '{}': {}",
276                            key, service_error
277                        )))
278                    }
279                }
280            }
281        })
282    }
283
284    fn size(&self, key: &str) -> Result<u64> {
285        self.block_on(async {
286            let response = self
287                .client
288                .head_object()
289                .bucket(&self.bucket)
290                .key(key)
291                .send()
292                .await
293                .map_err(|e| Error::storage(format!("S3 head error for key '{}': {}", key, e)))?;
294
295            let size = response
296                .content_length
297                .and_then(|l| u64::try_from(l).ok())
298                .unwrap_or(0);
299            Ok(size)
300        })
301    }
302}
303
304// S3Backend is automatically Send + Sync because:
305// - Client is Send + Sync
306// - Arc<Runtime> is Send + Sync
307// - String is Send + Sync
308
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Environment` variant can be built and is recognized as such.
    #[test]
    fn test_credential_source_environment() {
        let source = CredentialSource::Environment;
        let is_environment = matches!(source, CredentialSource::Environment);
        assert!(is_environment);
    }

    /// The `Static` variant can be built from a key pair and is recognized
    /// as such.
    #[test]
    fn test_credential_source_static() {
        let access_key = String::from("test");
        let secret_key = String::from("secret");
        let source = CredentialSource::Static {
            access_key,
            secret_key,
        };
        let is_static = matches!(source, CredentialSource::Static { .. });
        assert!(is_static);
    }

    /// The `Anonymous` variant can be built and is recognized as such.
    #[test]
    fn test_credential_source_anonymous() {
        let source = CredentialSource::Anonymous;
        let is_anonymous = matches!(source, CredentialSource::Anonymous);
        assert!(is_anonymous);
    }

    // Exercising the backend itself needs a running S3-compatible service;
    // the MinIO-based integration tests live in tests/s3_integration.rs.
}