use std::time::Duration;
use aws_sdk_s3::{Client, primitives::ByteStream};
use fraiseql_error::{FileError, FraiseQLError, Result};
use super::validate_key;
#[cfg(test)]
mod tests;
/// Object-storage backend that reads and writes objects in a single S3
/// (or S3-compatible, when a custom endpoint is configured) bucket.
///
/// Cloning is cheap enough to hand a copy to each task: it clones the SDK
/// client handle and the bucket name.
#[derive(Debug, Clone)]
pub struct S3Backend {
    /// SDK client used for every request issued by this backend.
    client: Client,
    /// Name of the bucket that all object keys are resolved against.
    bucket: String,
}
impl S3Backend {
/// Creates a backend bound to `bucket`.
///
/// AWS configuration is loaded through `aws_config::defaults` with the latest
/// behavior version. `region`, when given, overrides the region on that loader.
/// `endpoint`, when given, overrides the endpoint URL and switches the client
/// to path-style addressing.
pub async fn new(bucket: &str, region: Option<&str>, endpoint: Option<&str>) -> Self {
    let base_loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
    let loader = match region {
        Some(name) => base_loader.region(aws_config::Region::new(name.to_owned())),
        None => base_loader,
    };
    let sdk_config = loader.load().await;

    let client = match endpoint {
        // Custom endpoint: build a dedicated S3 config on top of the shared one.
        Some(url) => {
            let s3_conf = aws_sdk_s3::config::Builder::from(&sdk_config)
                .endpoint_url(url)
                .force_path_style(true)
                .build();
            Client::from_conf(s3_conf)
        },
        None => Client::new(&sdk_config),
    };

    Self {
        client,
        bucket: bucket.to_owned(),
    }
}
}
/// Wraps `err` in a `FileError::Backend`, keeping it as the error source.
///
/// `op` names the S3 operation that failed and is embedded in the message as
/// `S3 {op} failed: {err}`.
fn storage_err_src(op: &str, err: impl std::error::Error + Send + Sync + 'static) -> FraiseQLError {
    let backend = FileError::Backend {
        // Rendered first: `err` is moved into the boxed source on the next line.
        message: format!("S3 {op} failed: {err}"),
        source: Some(Box::new(err)),
    };
    FraiseQLError::File(backend)
}
impl S3Backend {
/// Stores `data` under `key` with the given `content_type`.
///
/// Returns the key on success.
///
/// # Errors
///
/// Returns whatever `validate_key` reports for a rejected key, or a backend
/// error if the `put_object` request fails.
pub async fn upload(&self, key: &str, data: &[u8], content_type: &str) -> Result<String> {
    validate_key(key)?;

    // The request body needs owned bytes, so the slice is copied once here.
    let payload = ByteStream::from(data.to_vec());
    let request = self
        .client
        .put_object()
        .bucket(&self.bucket)
        .key(key)
        .body(payload)
        .content_type(content_type);

    match request.send().await {
        Ok(_) => Ok(key.to_owned()),
        Err(e) => Err(storage_err_src("put_object", e)),
    }
}
/// Fetches the full contents of the object stored under `key`.
///
/// # Errors
///
/// - Whatever `validate_key` reports for a rejected key.
/// - `FileError::NotFound` when S3 answers with a `NoSuchKey` service error
///   or an HTTP 404 status.
/// - A backend error for any other request failure, or if reading the
///   response body fails.
pub async fn download(&self, key: &str) -> Result<Vec<u8>> {
    validate_key(key)?;
    let resp = self
        .client
        .get_object()
        .bucket(&self.bucket)
        .key(key)
        .send()
        .await
        .map_err(|e| {
            // Inspect the typed service error and the raw HTTP status instead of
            // the `Display` text: the SDK error's `Display` output is a generic
            // summary and does not carry the service error code.
            let missing = e.as_service_error().is_some_and(|svc| svc.is_no_such_key())
                || e.raw_response().is_some_and(|raw| raw.status().as_u16() == 404);
            if missing {
                FraiseQLError::File(FileError::NotFound {
                    id: key.to_string(),
                })
            } else {
                storage_err_src("get_object", e)
            }
        })?;
    let body = resp.body.collect().await.map_err(|e| storage_err_src("get_object body", e))?;
    Ok(body.into_bytes().to_vec())
}
/// Deletes the object stored under `key`.
///
/// # Errors
///
/// Returns whatever `validate_key` reports for a rejected key, or a backend
/// error if the `delete_object` request fails.
pub async fn delete(&self, key: &str) -> Result<()> {
    validate_key(key)?;

    let outcome = self.client.delete_object().bucket(&self.bucket).key(key).send().await;
    // The response payload is not needed; only success or failure matters.
    outcome.map(|_| ()).map_err(|e| storage_err_src("delete_object", e))
}
/// Reports whether an object is stored under `key`, using a `head_object`
/// request.
///
/// Returns `Ok(false)` when S3 answers with a `NotFound` service error or an
/// HTTP 404 status.
///
/// # Errors
///
/// Returns whatever `validate_key` reports for a rejected key, or a backend
/// error for any request failure other than "not found".
pub async fn exists(&self, key: &str) -> Result<bool> {
    validate_key(key)?;
    match self.client.head_object().bucket(&self.bucket).key(key).send().await {
        Ok(_) => Ok(true),
        Err(err) => {
            // Inspect the typed service error and the raw HTTP status instead of
            // the `Display` text: the SDK error's `Display` output is a generic
            // summary and does not carry the service error code.
            let missing = err.as_service_error().is_some_and(|svc| svc.is_not_found())
                || err.raw_response().is_some_and(|raw| raw.status().as_u16() == 404);
            if missing {
                Ok(false)
            } else {
                Err(storage_err_src("head_object", err))
            }
        },
    }
}
/// Produces a presigned GET URL for `key` that is valid for `expiry`.
///
/// # Errors
///
/// Returns whatever `validate_key` reports for a rejected key, or a backend
/// error if the presigning configuration is rejected or signing fails.
pub async fn presigned_url(&self, key: &str, expiry: Duration) -> Result<String> {
    validate_key(key)?;

    let signing_cfg = match aws_sdk_s3::presigning::PresigningConfig::expires_in(expiry) {
        Ok(cfg) => cfg,
        Err(e) => return Err(storage_err_src("presigning config", e)),
    };

    let request = self.client.get_object().bucket(&self.bucket).key(key);
    let signed = request
        .presigned(signing_cfg)
        .await
        .map_err(|e| storage_err_src("presigned URL", e))?;

    Ok(signed.uri().to_string())
}
/// Lists one page of objects whose keys start with `prefix`.
///
/// * `prefix` is forwarded to S3 unchanged.
/// * `cursor` is the continuation token from a previous page; `None` or an
///   empty string starts from the beginning.
/// * `limit` is sent as the page size, saturating at `i32::MAX`.
///
/// Each entry reports `application/octet-stream` as its content type because
/// this method does not read a content type from the listing. An entry with no
/// last-modified timestamp gets the current time instead.
///
/// # Errors
///
/// Returns a backend error if the `list_objects_v2` request fails.
pub async fn list(
    &self,
    prefix: &str,
    cursor: Option<&str>,
    limit: usize,
) -> Result<super::types::ListResult> {
    // An empty cursor means "no cursor", mirroring how empty next-tokens are
    // treated below; forwarding it would send a blank continuation token.
    let continuation_token = cursor.filter(|c| !c.is_empty()).map(str::to_owned);
    let resp = self
        .client
        .list_objects_v2()
        .bucket(&self.bucket)
        .prefix(prefix)
        .max_keys(i32::try_from(limit).unwrap_or(i32::MAX))
        .set_continuation_token(continuation_token)
        .send()
        .await
        .map_err(|e| storage_err_src("list_objects_v2", e))?;

    let objects: Vec<_> = resp
        .contents()
        .iter()
        .map(|obj| super::types::ObjectInfo {
            key: obj.key().unwrap_or("").to_string(),
            // A missing or negative size is reported as 0 rather than being
            // wrapped into a huge unsigned value by a lossy cast.
            size: obj.size().and_then(|bytes| u64::try_from(bytes).ok()).unwrap_or(0),
            content_type: "application/octet-stream".to_string(),
            etag: obj.e_tag().unwrap_or("").to_string(),
            last_modified: obj
                .last_modified()
                .map_or_else(|| chrono::Utc::now().to_rfc3339(), |dt| dt.to_string()),
        })
        .collect();

    let next_cursor =
        resp.next_continuation_token().filter(|t| !t.is_empty()).map(|t| t.to_string());
    Ok(super::types::ListResult {
        objects,
        next_cursor,
    })
}
}
impl super::PresignCapable for S3Backend {
/// Produces a presigned PUT URL for uploading `key` with `content_type`,
/// valid for `expires_in`.
///
/// The returned expiry timestamp is anchored to a clock reading taken before
/// signing starts, so it is never later than the moment the URL stops working.
///
/// # Errors
///
/// Returns whatever `validate_key` reports for a rejected key, or a backend
/// error if the presigning configuration is rejected, signing fails, or
/// `expires_in` cannot be converted to a `chrono::Duration`.
async fn presign_put(
    &self,
    key: &str,
    content_type: &str,
    expires_in: Duration,
) -> Result<super::PresignedUrl> {
    validate_key(key)?;
    // Read the clock before the presigning config is built. Reading it after
    // the awaited signing call would push the reported expiry past the URL's
    // real validity window by however long signing took.
    let issued_at = chrono::Utc::now();
    let presigning_config = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in)
        .map_err(|e| storage_err_src("presigning config", e))?;
    let presigned = self
        .client
        .put_object()
        .bucket(&self.bucket)
        .key(key)
        .content_type(content_type)
        .presigned(presigning_config)
        .await
        .map_err(|e| storage_err_src("presigned PUT URL", e))?;
    let ttl = chrono::Duration::from_std(expires_in)
        .map_err(|e| storage_err_src("duration conversion", e))?;
    Ok(super::PresignedUrl::new(presigned.uri().to_string(), issued_at + ttl, "PUT"))
}
/// Produces a presigned GET URL for downloading `key`, valid for `expires_in`.
///
/// The returned expiry timestamp is anchored to a clock reading taken before
/// signing starts, so it is never later than the moment the URL stops working.
///
/// # Errors
///
/// Returns whatever `validate_key` reports for a rejected key, or a backend
/// error if the presigning configuration is rejected, signing fails, or
/// `expires_in` cannot be converted to a `chrono::Duration`.
async fn presign_get(&self, key: &str, expires_in: Duration) -> Result<super::PresignedUrl> {
    validate_key(key)?;
    // Read the clock before the presigning config is built. Reading it after
    // the awaited signing call would push the reported expiry past the URL's
    // real validity window by however long signing took.
    let issued_at = chrono::Utc::now();
    let presigning_config = aws_sdk_s3::presigning::PresigningConfig::expires_in(expires_in)
        .map_err(|e| storage_err_src("presigning config", e))?;
    let presigned = self
        .client
        .get_object()
        .bucket(&self.bucket)
        .key(key)
        .presigned(presigning_config)
        .await
        .map_err(|e| storage_err_src("presigned GET URL", e))?;
    let ttl = chrono::Duration::from_std(expires_in)
        .map_err(|e| storage_err_src("duration conversion", e))?;
    Ok(super::PresignedUrl::new(presigned.uri().to_string(), issued_at + ttl, "GET"))
}
}