1use async_trait::async_trait;
11use bytes::Bytes;
12use futures::TryStreamExt;
13use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
14use object_store::path::Path as ObjectPath;
15use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
16use tracing::{debug, instrument};
17
18use apiary_core::error::ApiaryError;
19use apiary_core::storage::StorageBackend;
20use apiary_core::Result;
21
22pub struct S3Backend {
28 store: Box<dyn ObjectStore>,
29 prefix: String,
30}
31
32impl S3Backend {
33 pub fn new(uri: &str) -> Result<Self> {
42 let (bucket, prefix) = parse_s3_uri(uri)?;
43
44 let mut builder = AmazonS3Builder::from_env()
45 .with_bucket_name(&bucket)
46 .with_conditional_put(S3ConditionalPut::ETagMatch);
49
50 if let Some(region) = extract_query_param(uri, "region") {
52 builder = builder.with_region(®ion);
53 }
54
55 if let Some(endpoint) = extract_query_param(uri, "endpoint") {
57 builder = builder.with_endpoint(&endpoint).with_allow_http(true);
58 }
59
60 if let Ok(env_endpoint) = std::env::var("AWS_ENDPOINT_URL") {
64 if env_endpoint.starts_with("http://") {
65 builder = builder.with_allow_http(true);
66 }
67 }
68
69 let store = builder.build().map_err(|e| {
70 ApiaryError::storage(format!("Failed to create S3 client for {uri}"), e)
71 })?;
72
73 debug!(bucket = %bucket, prefix = %prefix, "S3Backend initialised");
74
75 Ok(Self {
76 store: Box::new(store),
77 prefix,
78 })
79 }
80
81 fn full_path(&self, key: &str) -> ObjectPath {
83 if self.prefix.is_empty() {
84 ObjectPath::from(key)
85 } else {
86 ObjectPath::from(format!("{}/{}", self.prefix, key))
87 }
88 }
89}
90
91#[async_trait]
92impl StorageBackend for S3Backend {
93 #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
94 async fn put(&self, key: &str, data: Bytes) -> Result<()> {
95 let path = self.full_path(key);
96 self.store
97 .put(&path, PutPayload::from(data))
98 .await
99 .map_err(|e| ApiaryError::storage(format!("S3 put failed for {key}"), e))?;
100 Ok(())
101 }
102
103 #[instrument(skip(self), fields(key = %key))]
104 async fn get(&self, key: &str) -> Result<Bytes> {
105 let path = self.full_path(key);
106 let result = self.store.get(&path).await.map_err(|e| match e {
107 object_store::Error::NotFound { .. } => ApiaryError::NotFound {
108 key: key.to_string(),
109 },
110 other => ApiaryError::storage(format!("S3 get failed for {key}"), other),
111 })?;
112 let bytes = result
113 .bytes()
114 .await
115 .map_err(|e| ApiaryError::storage(format!("S3 get bytes failed for {key}"), e))?;
116 Ok(bytes)
117 }
118
119 #[instrument(skip(self), fields(prefix = %prefix))]
120 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
121 let full_prefix = if self.prefix.is_empty() {
122 ObjectPath::from(prefix)
123 } else {
124 ObjectPath::from(format!("{}/{}", self.prefix, prefix))
125 };
126
127 let mut results = Vec::new();
128 let mut stream = self.store.list(Some(&full_prefix));
129
130 while let Some(meta) = stream
131 .try_next()
132 .await
133 .map_err(|e| ApiaryError::storage(format!("S3 list failed for prefix {prefix}"), e))?
134 {
135 let full_key = meta.location.to_string();
136 let key = if self.prefix.is_empty() {
138 full_key
139 } else {
140 full_key
141 .strip_prefix(&format!("{}/", self.prefix))
142 .unwrap_or(&full_key)
143 .to_string()
144 };
145 results.push(key);
146 }
147
148 results.sort();
149 Ok(results)
150 }
151
152 #[instrument(skip(self), fields(key = %key))]
153 async fn delete(&self, key: &str) -> Result<()> {
154 let path = self.full_path(key);
155 self.store
157 .delete(&path)
158 .await
159 .map_err(|e| ApiaryError::storage(format!("S3 delete failed for {key}"), e))?;
160 Ok(())
161 }
162
163 #[instrument(skip(self, data), fields(key = %key, size = data.len()))]
164 async fn put_if_not_exists(&self, key: &str, data: Bytes) -> Result<bool> {
165 let path = self.full_path(key);
166 let opts = PutOptions {
167 mode: PutMode::Create,
168 ..Default::default()
169 };
170 match self
171 .store
172 .put_opts(&path, PutPayload::from(data), opts)
173 .await
174 {
175 Ok(_) => Ok(true),
176 Err(object_store::Error::AlreadyExists { .. }) => Ok(false),
177 Err(object_store::Error::Precondition { .. }) => Ok(false),
179 Err(e) => Err(ApiaryError::storage(
180 format!("S3 conditional put failed for {key}"),
181 e,
182 )),
183 }
184 }
185
186 #[instrument(skip(self), fields(key = %key))]
187 async fn exists(&self, key: &str) -> Result<bool> {
188 let path = self.full_path(key);
189 match self.store.head(&path).await {
190 Ok(_) => Ok(true),
191 Err(object_store::Error::NotFound { .. }) => Ok(false),
192 Err(e) => Err(ApiaryError::storage(format!("S3 head failed for {key}"), e)),
193 }
194 }
195}
196
197fn parse_s3_uri(uri: &str) -> Result<(String, String)> {
202 let stripped = uri
203 .strip_prefix("s3://")
204 .ok_or_else(|| ApiaryError::Config {
205 message: format!("S3 URI must start with 's3://': {uri}"),
206 })?;
207
208 let path_part = stripped.split('?').next().unwrap_or(stripped);
210
211 let mut parts = path_part.splitn(2, '/');
212 let bucket = parts.next().unwrap_or("").to_string();
213 let prefix = parts.next().unwrap_or("").to_string();
214
215 if bucket.is_empty() {
216 return Err(ApiaryError::Config {
217 message: format!("S3 URI must include a bucket name: {uri}"),
218 });
219 }
220
221 Ok((bucket, prefix))
222}
223
/// Return the value of query parameter `param` in `uri`, if present.
///
/// Everything after the first `?` is treated as the query string and split on
/// `&` into `key=value` pairs. The value is everything after the first `=`, so
/// `endpoint=http://host:9000` works. Pairs without `=` are skipped rather than
/// ending the search. Values are returned verbatim, without percent-decoding.
fn extract_query_param(uri: &str, param: &str) -> Option<String> {
    let (_, query) = uri.split_once('?')?;
    query.split('&').find_map(|pair| {
        let (key, value) = pair.split_once('=')?;
        (key == param).then(|| value.to_string())
    })
}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience for building the expected `(bucket, prefix)` pair.
    fn pair(bucket: &str, prefix: &str) -> (String, String) {
        (bucket.to_string(), prefix.to_string())
    }

    #[test]
    fn test_parse_s3_uri_with_prefix() {
        let parsed = parse_s3_uri("s3://my-bucket/apiary/data").unwrap();
        assert_eq!(parsed, pair("my-bucket", "apiary/data"));
    }

    #[test]
    fn test_parse_s3_uri_no_prefix() {
        let parsed = parse_s3_uri("s3://my-bucket").unwrap();
        assert_eq!(parsed, pair("my-bucket", ""));
    }

    #[test]
    fn test_parse_s3_uri_with_query() {
        let parsed = parse_s3_uri("s3://my-bucket/prefix?region=eu-west-1").unwrap();
        assert_eq!(parsed, pair("my-bucket", "prefix"));
    }

    #[test]
    fn test_parse_s3_uri_invalid() {
        for bad in ["http://example.com", "s3://"] {
            assert!(parse_s3_uri(bad).is_err(), "expected an error for {bad}");
        }
    }

    #[test]
    fn test_extract_query_param() {
        // (uri, parameter name, expected value)
        let cases: [(&str, &str, Option<&str>); 4] = [
            ("s3://bucket?region=us-east-1", "region", Some("us-east-1")),
            (
                "s3://bucket?region=us-east-1&endpoint=http://minio:9000",
                "endpoint",
                Some("http://minio:9000"),
            ),
            ("s3://bucket?region=us-east-1", "missing", None),
            ("s3://bucket", "region", None),
        ];
        for (uri, param, expected) in cases {
            assert_eq!(
                extract_query_param(uri, param).as_deref(),
                expected,
                "uri={uri} param={param}"
            );
        }
    }
}