use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client as S3Client;
use super::BlobBackend;
use crate::protocol::{sha256_hex, BlobDescriptor};
/// Connection and publishing settings for the `S3Backend` blob store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Config {
    /// Custom endpoint URL (e.g. an S3-compatible server). When `None`, the
    /// SDK's default endpoint for `region` is used.
    pub endpoint: Option<String>,
    /// Bucket holding the `<sha256>.blob` objects.
    pub bucket: String,
    /// Region handed to the SDK client.
    pub region: String,
    /// Public base URL (e.g. a CDN). When set, it is used for blob URLs in
    /// descriptors instead of the server's own base URL.
    pub public_url: Option<String>,
}
/// Blob storage backed by an S3 (or S3-compatible, via a custom endpoint) bucket.
///
/// Each blob is stored under the object key `<sha256>.blob`. Trait methods that
/// talk to S3 are synchronous and drive the async SDK client through `block_on`.
pub struct S3Backend {
    /// SDK client built from the region/endpoint in `config`.
    client: S3Client,
    /// Settings this backend was created with (bucket, region, URLs).
    config: S3Config,
    /// Known blobs: sha256 hex digest -> size in bytes. Filled from a bucket
    /// listing at construction and updated by insert/delete.
    index: std::collections::HashMap<String, u64>,
}
impl S3Backend {
pub async fn new(config: S3Config) -> Result<Self, String> {
let mut aws_config_builder = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(aws_config::Region::new(config.region.clone()));
if let Some(ref endpoint) = config.endpoint {
aws_config_builder = aws_config_builder.endpoint_url(endpoint);
}
let aws_config = aws_config_builder.load().await;
let client = S3Client::new(&aws_config);
let mut backend = S3Backend {
client,
config,
index: std::collections::HashMap::new(),
};
backend.rebuild_index().await?;
Ok(backend)
}
async fn rebuild_index(&mut self) -> Result<(), String> {
let mut continuation_token: Option<String> = None;
loop {
let mut req = self.client.list_objects_v2().bucket(&self.config.bucket);
if let Some(ref token) = continuation_token {
req = req.continuation_token(token);
}
let resp = req
.send()
.await
.map_err(|e| format!("s3 list objects: {e}"))?;
for obj in resp.contents() {
if let Some(key) = obj.key() {
if let Some(hash) = key.strip_suffix(".blob") {
if hash.len() == 64 {
let size = obj.size.unwrap_or(0) as u64;
self.index.insert(hash.to_string(), size);
}
}
}
}
if resp.is_truncated() == Some(true) {
continuation_token = resp.next_continuation_token().map(|s| s.to_string());
} else {
break;
}
}
tracing::info!(
storage.backend = "s3",
storage.bucket = %self.config.bucket,
storage.existing_blobs = self.index.len(),
"initialized S3 blob storage"
);
Ok(())
}
fn object_key(sha256: &str) -> String {
format!("{}.blob", sha256)
}
fn blob_url(&self, sha256: &str, base_url: &str) -> String {
if let Some(ref cdn) = self.config.public_url {
format!("{}/{}", cdn.trim_end_matches('/'), sha256)
} else {
format!("{}/{}", base_url, sha256)
}
}
fn block_on<F: std::future::Future<Output = T>, T>(future: F) -> T {
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(future))
}
}
impl BlobBackend for S3Backend {
    /// Uploads `data` as `<sha256>.blob` and returns its descriptor.
    ///
    /// The trait signature cannot report failure, so an upload error is only
    /// logged. The blob is added to the local index only after a successful
    /// upload. This keeps `exists`, `len` and `total_bytes` from counting a
    /// blob that never reached the bucket.
    fn insert(&mut self, data: Vec<u8>, base_url: &str) -> BlobDescriptor {
        let hash = sha256_hex(&data);
        let size = data.len() as u64;
        let key = Self::object_key(&hash);
        let result = Self::block_on(async {
            self.client
                .put_object()
                .bucket(&self.config.bucket)
                .key(&key)
                .content_type("application/octet-stream")
                .body(ByteStream::from(data))
                .send()
                .await
        });
        match result {
            Ok(_) => {
                self.index.insert(hash.clone(), size);
            }
            Err(e) => {
                tracing::warn!(
                    storage.backend = "s3",
                    blob.sha256 = %hash,
                    error.message = %e,
                    "failed to upload blob to S3"
                );
            }
        }
        let url = self.blob_url(&hash, base_url);
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        BlobDescriptor {
            sha256: hash,
            size,
            content_type: Some("application/octet-stream".into()),
            url: Some(url),
            uploaded: Some(ts),
        }
    }

    /// Downloads the blob with digest `sha256`, or returns `None` if it cannot be read.
    ///
    /// A missing object (`NoSuchKey`) returns `None` quietly. Any other
    /// failure, such as transport, permission or body-read errors, is logged
    /// before returning `None`, so outages are not mistaken for absent blobs.
    fn get(&self, sha256: &str) -> Option<Vec<u8>> {
        let key = Self::object_key(sha256);
        let result = Self::block_on(async {
            self.client
                .get_object()
                .bucket(&self.config.bucket)
                .key(&key)
                .send()
                .await
        });
        let output = match result {
            Ok(output) => output,
            Err(e) => {
                let not_found = e
                    .as_service_error()
                    .is_some_and(|se| se.is_no_such_key());
                if !not_found {
                    tracing::warn!(
                        storage.backend = "s3",
                        blob.sha256 = %sha256,
                        error.message = %e,
                        "failed to fetch blob from S3"
                    );
                }
                return None;
            }
        };
        match Self::block_on(async { output.body.collect().await }) {
            Ok(b) => Some(b.into_bytes().to_vec()),
            Err(e) => {
                tracing::warn!(
                    storage.backend = "s3",
                    blob.sha256 = %sha256,
                    error.message = %e,
                    "failed to read S3 object body"
                );
                None
            }
        }
    }

    /// Returns true if the blob is in the local index or a HEAD request for it
    /// succeeds. Any HEAD error counts as "does not exist".
    fn exists(&self, sha256: &str) -> bool {
        if self.index.contains_key(sha256) {
            return true;
        }
        let key = Self::object_key(sha256);
        let result = Self::block_on(async {
            self.client
                .head_object()
                .bucket(&self.config.bucket)
                .key(&key)
                .send()
                .await
        });
        result.is_ok()
    }

    /// Deletes the blob from the bucket and the local index.
    ///
    /// Returns whether the blob was indexed. If the S3 delete request fails,
    /// the error is logged, the index entry is kept (the object may still be
    /// stored) and `false` is returned rather than reporting a deletion that
    /// did not happen.
    fn delete(&mut self, sha256: &str) -> bool {
        let key = Self::object_key(sha256);
        let result = Self::block_on(async {
            self.client
                .delete_object()
                .bucket(&self.config.bucket)
                .key(&key)
                .send()
                .await
        });
        if let Err(e) = result {
            tracing::warn!(
                storage.backend = "s3",
                blob.sha256 = %sha256,
                error.message = %e,
                "failed to delete blob from S3"
            );
            return false;
        }
        self.index.remove(sha256).is_some()
    }

    /// Number of blobs in the local index.
    fn len(&self) -> usize {
        self.index.len()
    }

    /// Sum of the sizes recorded in the local index.
    fn total_bytes(&self) -> u64 {
        self.index.values().sum()
    }

    /// Streams `reader` into a temp file while hashing it, then uploads the
    /// file as `<sha256>.blob`.
    ///
    /// The declared `_size` is not checked. The descriptor reports the number
    /// of bytes actually read. The temp file is removed on every exit path.
    ///
    /// # Errors
    ///
    /// Returns a message if the temp file cannot be created or written, the
    /// reader fails, or the S3 upload fails.
    fn insert_stream(
        &mut self,
        reader: &mut dyn std::io::Read,
        _size: u64,
        base_url: &str,
    ) -> Result<BlobDescriptor, String> {
        use crate::protocol::STREAM_CHUNK_SIZE;
        use sha2::{Digest, Sha256};
        use std::io::Write;
        use std::sync::atomic::{AtomicU64, Ordering};

        /// Deletes the spooled temp file when dropped.
        struct TempFileGuard(std::path::PathBuf);
        impl Drop for TempFileGuard {
            fn drop(&mut self) {
                let _ = std::fs::remove_file(&self.0);
            }
        }

        // A timestamp alone can collide (coarse clocks, several processes
        // sharing the temp dir). Adding the pid and a per-process counter
        // makes the name unique. Relaxed ordering is enough because
        // `fetch_add` is atomic and only uniqueness matters.
        static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let tmp_path = std::env::temp_dir().join(format!(
            "blossom_s3_{}_{}_{}",
            std::process::id(),
            TMP_COUNTER.fetch_add(1, Ordering::Relaxed),
            nanos
        ));
        // `create_new` refuses to truncate an existing file. The cleanup guard
        // is created only after our own file exists, so we never delete
        // someone else's file.
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&tmp_path)
            .map_err(|e| format!("create temp: {e}"))?;
        let guard = TempFileGuard(tmp_path);

        let mut hasher = Sha256::new();
        // Heap-allocated so a large chunk size cannot overflow the thread's stack.
        let mut buf = vec![0u8; STREAM_CHUNK_SIZE];
        let mut total = 0u64;
        loop {
            let n = match reader.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => n,
                // Interrupted reads are transient and should be retried.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(format!("read stream: {e}")),
            };
            hasher.update(&buf[..n]);
            file.write_all(&buf[..n])
                .map_err(|e| format!("write temp: {e}"))?;
            total += n as u64;
        }
        file.flush().map_err(|e| format!("flush temp: {e}"))?;
        // Close our handle before the SDK reopens the file for upload.
        drop(file);

        let hash = hex::encode(hasher.finalize());
        let key = Self::object_key(&hash);
        let upload_result = Self::block_on(async {
            let body = ByteStream::from_path(&guard.0)
                .await
                .map_err(|e| format!("read temp for s3: {e}"))?;
            self.client
                .put_object()
                .bucket(&self.config.bucket)
                .key(&key)
                .content_type("application/octet-stream")
                .body(body)
                .send()
                .await
                .map_err(|e| format!("s3 upload: {e}"))
        });
        upload_result?;
        self.index.insert(hash.clone(), total);
        let url = self.blob_url(&hash, base_url);
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        Ok(BlobDescriptor {
            sha256: hash,
            size: total,
            content_type: Some("application/octet-stream".into()),
            url: Some(url),
            uploaded: Some(ts),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `S3Config` is a plain data holder: the values it is built with are the
    /// values it exposes.
    #[test]
    fn test_s3_config_creation() {
        let cfg = S3Config {
            bucket: String::from("test-blobs"),
            region: String::from("us-east-1"),
            endpoint: Some(String::from("http://localhost:9000")),
            public_url: Some(String::from("https://cdn.example.com")),
        };
        assert_eq!(cfg.bucket.as_str(), "test-blobs");
        assert!(cfg.public_url.is_some());
    }

    /// Object keys are the digest with a `.blob` suffix.
    #[test]
    fn test_object_key_format() {
        assert_eq!(S3Backend::object_key("abc123"), "abc123.blob");
    }
}