Skip to main content

apiary_storage/
s3.rs

1//! S3-compatible object storage backend.
2//!
3//! [`S3Backend`] implements the [`StorageBackend`] trait using the `object_store`
4//! crate, supporting any S3-compatible endpoint: AWS S3, MinIO, GCS (via S3
5//! compatibility), Ceph, etc.
6//!
//! Conditional writes use `put_opts` with `PutMode::Create`, which maps to
//! the `If-None-Match: *` HTTP header (available on S3 since 2024).
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::TryStreamExt;
13use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
14use object_store::path::Path as ObjectPath;
15use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
16use tracing::{debug, instrument};
17
18use apiary_core::error::ApiaryError;
19use apiary_core::storage::StorageBackend;
20use apiary_core::Result;
21
22/// A [`StorageBackend`] backed by any S3-compatible object storage.
23///
24/// Configured from a URI like `s3://bucket/prefix?region=eu-west-1`.
25/// Uses the `object_store` crate for S3 operations with built-in
26/// retry logic and connection pooling.
27pub struct S3Backend {
28    store: Box<dyn ObjectStore>,
29    prefix: String,
30}
31
32impl S3Backend {
33    /// Create a new `S3Backend` from an S3 URI.
34    ///
35    /// # URI Format
36    ///
37    /// `s3://bucket/prefix?region=us-east-1`
38    ///
39    /// Environment variables `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
40    /// and `AWS_REGION` are used as fallbacks for credentials and region.
41    pub fn new(uri: &str) -> Result<Self> {
42        let (bucket, prefix) = parse_s3_uri(uri)?;
43
44        let mut builder = AmazonS3Builder::from_env()
45            .with_bucket_name(&bucket)
46            // Enable conditional put so PutMode::Create works on all
47            // S3-compatible stores (MinIO, Ceph, etc.).
48            .with_conditional_put(S3ConditionalPut::ETagMatch);
49
50        // Extract region from query params if present
51        if let Some(region) = extract_query_param(uri, "region") {
52            builder = builder.with_region(&region);
53        }
54
55        // Extract endpoint for MinIO / custom S3-compatible services
56        if let Some(endpoint) = extract_query_param(uri, "endpoint") {
57            builder = builder.with_endpoint(&endpoint).with_allow_http(true);
58        }
59
60        // When the endpoint comes from the AWS_ENDPOINT_URL env var instead
61        // of a query parameter, we still need allow_http for plain-HTTP
62        // endpoints (common with MinIO and other local S3 replacements).
63        if let Ok(env_endpoint) = std::env::var("AWS_ENDPOINT_URL") {
64            if env_endpoint.starts_with("http://") {
65                builder = builder.with_allow_http(true);
66            }
67        }
68
69        let store = builder.build().map_err(|e| {
70            ApiaryError::storage(format!("Failed to create S3 client for {uri}"), e)
71        })?;
72
73        debug!(bucket = %bucket, prefix = %prefix, "S3Backend initialised");
74
75        Ok(Self {
76            store: Box::new(store),
77            prefix,
78        })
79    }
80
81    /// Build the full object path from a key.
82    fn full_path(&self, key: &str) -> ObjectPath {
83        if self.prefix.is_empty() {
84            ObjectPath::from(key)
85        } else {
86            ObjectPath::from(format!("{}/{}", self.prefix, key))
87        }
88    }
89}
90
91#[async_trait]
92impl StorageBackend for S3Backend {
93    #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
94    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
95        let path = self.full_path(key);
96        self.store
97            .put(&path, PutPayload::from(data))
98            .await
99            .map_err(|e| ApiaryError::storage(format!("S3 put failed for {key}"), e))?;
100        Ok(())
101    }
102
103    #[instrument(skip(self), fields(key = %key))]
104    async fn get(&self, key: &str) -> Result<Bytes> {
105        let path = self.full_path(key);
106        let result = self.store.get(&path).await.map_err(|e| match e {
107            object_store::Error::NotFound { .. } => ApiaryError::NotFound {
108                key: key.to_string(),
109            },
110            other => ApiaryError::storage(format!("S3 get failed for {key}"), other),
111        })?;
112        let bytes = result
113            .bytes()
114            .await
115            .map_err(|e| ApiaryError::storage(format!("S3 get bytes failed for {key}"), e))?;
116        Ok(bytes)
117    }
118
119    #[instrument(skip(self), fields(prefix = %prefix))]
120    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
121        let full_prefix = if self.prefix.is_empty() {
122            ObjectPath::from(prefix)
123        } else {
124            ObjectPath::from(format!("{}/{}", self.prefix, prefix))
125        };
126
127        let mut results = Vec::new();
128        let mut stream = self.store.list(Some(&full_prefix));
129
130        while let Some(meta) = stream
131            .try_next()
132            .await
133            .map_err(|e| ApiaryError::storage(format!("S3 list failed for prefix {prefix}"), e))?
134        {
135            let full_key = meta.location.to_string();
136            // Strip the backend prefix to return keys relative to the storage root
137            let key = if self.prefix.is_empty() {
138                full_key
139            } else {
140                full_key
141                    .strip_prefix(&format!("{}/", self.prefix))
142                    .unwrap_or(&full_key)
143                    .to_string()
144            };
145            results.push(key);
146        }
147
148        results.sort();
149        Ok(results)
150    }
151
152    #[instrument(skip(self), fields(key = %key))]
153    async fn delete(&self, key: &str) -> Result<()> {
154        let path = self.full_path(key);
155        // S3 delete is idempotent — does not error if key is missing
156        self.store
157            .delete(&path)
158            .await
159            .map_err(|e| ApiaryError::storage(format!("S3 delete failed for {key}"), e))?;
160        Ok(())
161    }
162
163    #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
164    async fn put_if_not_exists(&self, key: &str, data: Bytes) -> Result<bool> {
165        let path = self.full_path(key);
166        let opts = PutOptions {
167            mode: PutMode::Create,
168            ..Default::default()
169        };
170        match self
171            .store
172            .put_opts(&path, PutPayload::from(data), opts)
173            .await
174        {
175            Ok(_) => Ok(true),
176            Err(object_store::Error::AlreadyExists { .. }) => Ok(false),
177            // Some S3-compatible stores return Precondition instead of AlreadyExists
178            Err(object_store::Error::Precondition { .. }) => Ok(false),
179            Err(e) => Err(ApiaryError::storage(
180                format!("S3 conditional put failed for {key}"),
181                e,
182            )),
183        }
184    }
185
186    #[instrument(skip(self), fields(key = %key))]
187    async fn exists(&self, key: &str) -> Result<bool> {
188        let path = self.full_path(key);
189        match self.store.head(&path).await {
190            Ok(_) => Ok(true),
191            Err(object_store::Error::NotFound { .. }) => Ok(false),
192            Err(e) => Err(ApiaryError::storage(format!("S3 head failed for {key}"), e)),
193        }
194    }
195}
196
197/// Parse an S3 URI into (bucket, prefix).
198///
199/// `s3://bucket/prefix/path` → `("bucket", "prefix/path")`
200/// `s3://bucket` → `("bucket", "")`
201fn parse_s3_uri(uri: &str) -> Result<(String, String)> {
202    let stripped = uri
203        .strip_prefix("s3://")
204        .ok_or_else(|| ApiaryError::Config {
205            message: format!("S3 URI must start with 's3://': {uri}"),
206        })?;
207
208    // Remove query string before parsing path
209    let path_part = stripped.split('?').next().unwrap_or(stripped);
210
211    let mut parts = path_part.splitn(2, '/');
212    let bucket = parts.next().unwrap_or("").to_string();
213    let prefix = parts.next().unwrap_or("").to_string();
214
215    if bucket.is_empty() {
216        return Err(ApiaryError::Config {
217            message: format!("S3 URI must include a bucket name: {uri}"),
218        });
219    }
220
221    Ok((bucket, prefix))
222}
223
/// Extract a query parameter value from a URI.
///
/// The query string is everything after the first `?`; later `?` characters
/// are kept as part of the query, so a value such as an endpoint URL that
/// itself contains `?` is neither truncated nor hides the parameters that
/// follow it. Values are returned verbatim (no percent-decoding).
///
/// Returns `None` when the URI has no query string, when `param` is absent,
/// or when its first occurrence has no `=value` part.
fn extract_query_param(uri: &str, param: &str) -> Option<String> {
    let (_, query) = uri.split_once('?')?;
    for pair in query.split('&') {
        match pair.split_once('=') {
            Some((name, value)) if name == param => return Some(value.to_string()),
            // A bare `param` with no value counts as "no value".
            None if pair == param => return None,
            _ => {}
        }
    }
    None
}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `uri` and return `(bucket, prefix)` as borrowed-friendly strings,
    /// panicking if parsing fails.
    fn parsed(uri: &str) -> (String, String) {
        parse_s3_uri(uri).unwrap()
    }

    #[test]
    fn test_parse_s3_uri_with_prefix() {
        assert_eq!(
            parsed("s3://my-bucket/apiary/data"),
            ("my-bucket".to_string(), "apiary/data".to_string())
        );
    }

    #[test]
    fn test_parse_s3_uri_no_prefix() {
        assert_eq!(
            parsed("s3://my-bucket"),
            ("my-bucket".to_string(), String::new())
        );
    }

    #[test]
    fn test_parse_s3_uri_with_query() {
        // The query string must not leak into the prefix.
        assert_eq!(
            parsed("s3://my-bucket/prefix?region=eu-west-1"),
            ("my-bucket".to_string(), "prefix".to_string())
        );
    }

    #[test]
    fn test_parse_s3_uri_invalid() {
        // Wrong scheme, then a missing bucket name.
        for uri in ["http://example.com", "s3://"].iter() {
            assert!(parse_s3_uri(uri).is_err());
        }
    }

    #[test]
    fn test_extract_query_param() {
        // (uri, parameter name, expected value)
        let cases = [
            ("s3://bucket?region=us-east-1", "region", Some("us-east-1")),
            (
                "s3://bucket?region=us-east-1&endpoint=http://minio:9000",
                "endpoint",
                Some("http://minio:9000"),
            ),
            ("s3://bucket?region=us-east-1", "missing", None),
            ("s3://bucket", "region", None),
        ];

        for &(uri, param, expected) in &cases {
            assert_eq!(extract_query_param(uri, param).as_deref(), expected);
        }
    }
}