use crate::StorageBackend;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use aws_sdk_s3::Client;
use bytes::Bytes;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, warn};
/// Connection and transfer settings for [`MinIOBackend`].
///
/// `Debug` is implemented by hand (not derived) so that formatting a config, e.g. in a log
/// line, never prints the access key or secret key.
#[derive(Clone)]
pub struct MinIOConfig {
    /// Base URL of the MinIO server, e.g. `http://localhost:9000`.
    pub endpoint: String,
    /// Bucket in which all objects are stored.
    pub bucket: String,
    /// Access key ID used to sign requests. Masked by `Debug`.
    pub access_key: String,
    /// Secret access key used to sign requests. Masked by `Debug`.
    pub secret_key: String,
    /// Passed to the S3 client's `force_path_style` option.
    pub path_style: bool,
    /// Objects of at most this many bytes are uploaded with a single request; larger objects
    /// use a multipart upload split into parts of this size.
    pub part_size: u64,
    /// Maximum number of multipart part uploads in flight at once.
    pub max_concurrent_parts: usize,
    /// Retry budget for operations wrapped by the backend's retry helper.
    pub max_retries: u32,
    /// Delay before the first retry, in milliseconds; doubled after each further failure
    /// and capped at 10 seconds.
    pub initial_retry_delay_ms: u64,
}

impl fmt::Debug for MinIOConfig {
    /// Formats every setting except the credentials, which are shown as `"***"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MinIOConfig")
            .field("endpoint", &self.endpoint)
            .field("bucket", &self.bucket)
            .field("access_key", &"***")
            .field("secret_key", &"***")
            .field("path_style", &self.path_style)
            .field("part_size", &self.part_size)
            .field("max_concurrent_parts", &self.max_concurrent_parts)
            .field("max_retries", &self.max_retries)
            .field("initial_retry_delay_ms", &self.initial_retry_delay_ms)
            .finish()
    }
}
impl Default for MinIOConfig {
fn default() -> Self {
MinIOConfig {
endpoint: String::new(),
bucket: String::new(),
access_key: String::new(),
secret_key: String::new(),
path_style: true,
part_size: 100 * 1024 * 1024, max_concurrent_parts: 8,
max_retries: 3,
initial_retry_delay_ms: 100,
}
}
}
/// Running transfer counters for a backend.
///
/// The backend keeps this behind an `Arc` and derives `Clone`, so a backend and all of its
/// clones update the same counters. All accesses in this file use `Ordering::Relaxed`; the
/// values are statistics only.
#[derive(Debug)]
struct MinIOStats {
/// Bytes sent by successful uploads (single-request objects and individual multipart parts).
total_bytes_uploaded: AtomicU64,
/// Bytes received by successful `get` calls.
total_bytes_downloaded: AtomicU64,
/// Number of successful `delete` calls.
total_objects_deleted: AtomicU64,
}
impl MinIOStats {
fn new() -> Self {
MinIOStats {
total_bytes_uploaded: AtomicU64::new(0),
total_bytes_downloaded: AtomicU64::new(0),
total_objects_deleted: AtomicU64::new(0),
}
}
}
/// S3-compatible storage backend bound to a single MinIO bucket.
///
/// `config` and `stats` are held behind `Arc`, so cloning a backend shares its settings
/// and its transfer counters with the original.
#[derive(Clone)]
pub struct MinIOBackend {
/// S3 client built in `with_config` from `config`.
client: Client,
/// Settings this backend was built with.
config: Arc<MinIOConfig>,
/// Counters reported by `stats()`.
stats: Arc<MinIOStats>,
/// Copy of `config.endpoint`, returned by `endpoint()`.
endpoint: String,
/// Copy of `config.bucket`, returned by `bucket()`.
bucket: String,
// Copies of the credentials. They are not read anywhere in this file (the leading underscore
// silences the unused-field lint), and the `Debug` impl masks them.
// NOTE(review): these duplicate `config`; consider dropping them.
_access_key: String,
_secret_key: String,
}
impl MinIOBackend {
/// Creates a backend for `bucket` on the MinIO server at `endpoint` with default tuning.
///
/// Arguments are validated locally before any request is sent:
/// - `endpoint` must start with `http://` or `https://` and name a host after the scheme;
///   trailing `/` characters are removed.
/// - `bucket` must be 3 to 63 characters of lowercase ASCII letters, digits and hyphens, and
///   must not start or end with a hyphen.
/// - `access_key` and `secret_key` must be non-empty.
///
/// All other settings come from `MinIOConfig::default()`. Construction then continues in
/// [`Self::with_config`], which contacts the server and creates the bucket if needed.
///
/// # Errors
///
/// Returns an error describing the first failed validation, or any error from
/// [`Self::with_config`].
pub async fn new(
    endpoint: &str,
    bucket: &str,
    access_key: &str,
    secret_key: &str,
) -> anyhow::Result<Self> {
    if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
        return Err(anyhow::anyhow!(
            "Invalid endpoint: must start with http:// or https://"
        ));
    }
    // Without this check, "http://" would pass and be trimmed to the malformed "http:".
    let has_host = endpoint
        .split_once("://")
        .is_some_and(|(_, rest)| !rest.trim_end_matches('/').is_empty());
    if !has_host {
        return Err(anyhow::anyhow!("Invalid endpoint: missing host after scheme"));
    }
    let endpoint = endpoint.trim_end_matches('/').to_string();
    if bucket.is_empty() {
        return Err(anyhow::anyhow!("bucket name cannot be empty"));
    }
    // S3-compatible servers reject bucket names shorter than 3 characters. Failing here gives
    // a clearer message than the server's response would.
    if bucket.len() < 3 {
        return Err(anyhow::anyhow!("bucket name must be at least 3 characters"));
    }
    if bucket.len() > 63 {
        return Err(anyhow::anyhow!("bucket name must be 63 characters or less"));
    }
    if !bucket
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
    {
        return Err(anyhow::anyhow!(
            "bucket name must contain only lowercase letters, numbers, and hyphens"
        ));
    }
    if bucket.starts_with('-') || bucket.ends_with('-') {
        return Err(anyhow::anyhow!(
            "bucket name cannot start or end with a hyphen"
        ));
    }
    if access_key.is_empty() {
        return Err(anyhow::anyhow!("access key cannot be empty"));
    }
    if secret_key.is_empty() {
        return Err(anyhow::anyhow!("secret key cannot be empty"));
    }
    let config = MinIOConfig {
        endpoint,
        bucket: bucket.to_string(),
        access_key: access_key.to_string(),
        secret_key: secret_key.to_string(),
        ..Default::default()
    };
    Self::with_config(config).await
}
/// Builds a backend from an explicit [`MinIOConfig`].
///
/// Configures an S3 client with static credentials, the configured path-style setting and
/// the fixed region `us-east-1`, then sends `CreateBucket` for `config.bucket`. A response
/// saying the bucket already exists, or is already owned by the caller, is accepted. Unlike
/// [`Self::new`], no field of `config` is validated here.
///
/// # Errors
///
/// Returns an error if `CreateBucket` fails for any reason other than the bucket already
/// existing.
pub async fn with_config(config: MinIOConfig) -> Result<Self> {
    debug!(
        "Initializing MinIO backend: endpoint={}, bucket={}, path_style={}",
        config.endpoint, config.bucket, config.path_style
    );
    let client = Client::from_conf(
        aws_sdk_s3::config::Builder::new()
            .behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
            .endpoint_url(&config.endpoint)
            .credentials_provider(aws_sdk_s3::config::Credentials::new(
                config.access_key.clone(),
                config.secret_key.clone(),
                None,
                None,
                "MinIOBackend",
            ))
            .force_path_style(config.path_style)
            .region(aws_sdk_s3::config::Region::new("us-east-1"))
            .build(),
    );
    if let Err(err) = client.create_bucket().bucket(&config.bucket).send().await {
        let bucket_present = matches!(
            err.as_service_error(),
            Some(se) if se.is_bucket_already_owned_by_you() || se.is_bucket_already_exists()
        );
        if !bucket_present {
            return Err(err).context(format!(
                "Failed to access or create MinIO bucket: {}",
                config.bucket
            ));
        }
        debug!(
            "MinIO bucket '{}' already exists, continuing",
            config.bucket
        );
    } else {
        debug!("MinIO bucket '{}' created successfully", config.bucket);
    }
    debug!("Successfully connected to MinIO bucket: {}", config.bucket);
    let shared_config = Arc::new(config.clone());
    Ok(MinIOBackend {
        client,
        config: shared_config,
        stats: Arc::new(MinIOStats::new()),
        endpoint: config.endpoint,
        bucket: config.bucket,
        _access_key: config.access_key,
        _secret_key: config.secret_key,
    })
}
pub fn stats(&self) -> (u64, u64, u64) {
(
self.stats.total_bytes_uploaded.load(Ordering::Relaxed),
self.stats.total_bytes_downloaded.load(Ordering::Relaxed),
self.stats.total_objects_deleted.load(Ordering::Relaxed),
)
}
/// Checks that `key` is usable as an object key before any request is made.
///
/// Rejects empty keys, keys starting with `/`, and keys longer than 1024 bytes, which is
/// the S3/MinIO object-name limit. Rejecting over-long keys locally avoids sending (and
/// retrying) requests the server would refuse.
///
/// # Errors
///
/// Returns an error naming the violated rule.
fn validate_key(key: &str) -> Result<()> {
    /// Maximum object-key length in bytes accepted by S3-compatible servers.
    const MAX_KEY_BYTES: usize = 1024;
    if key.is_empty() {
        return Err(anyhow!("key cannot be empty"));
    }
    if key.starts_with('/') {
        return Err(anyhow!("key cannot start with '/'"));
    }
    if key.len() > MAX_KEY_BYTES {
        return Err(anyhow!(
            "key cannot exceed {} bytes (got {})",
            MAX_KEY_BYTES,
            key.len()
        ));
    }
    Ok(())
}
/// Runs `operation`, retrying failed attempts with exponential back-off.
///
/// The operation is attempted once and then retried up to `config.max_retries` more times,
/// giving at most `max_retries + 1` attempts; `max_retries == 0` disables retrying. The
/// first retry waits `config.initial_retry_delay_ms`. The delay doubles after every further
/// failure and is capped at 10 seconds. Every error is treated as retryable.
///
/// `operation` is a factory so that each attempt gets a fresh future.
///
/// # Errors
///
/// Returns the error from the last attempt, with context stating how many attempts were made.
async fn with_retry<F, T>(&self, mut operation: F) -> Result<T>
where
    F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>,
{
    /// Upper bound for the back-off delay between attempts.
    const MAX_RETRY_DELAY_MS: u64 = 10_000;
    let max_retries = self.config.max_retries;
    let total_attempts = u64::from(max_retries) + 1;
    let mut retries_done: u32 = 0;
    let mut delay_ms = self.config.initial_retry_delay_ms;
    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            // Budget exhausted: the initial attempt plus `retries_done` retries have all failed.
            Err(e) if retries_done >= max_retries => {
                return Err(e).context(format!(
                    "Failed after {} attempts",
                    u64::from(retries_done) + 1
                ));
            }
            Err(e) => {
                retries_done += 1;
                // After the increment, `retries_done` equals the 1-based number of the
                // attempt that just failed.
                warn!(
                    "Operation failed (attempt {}/{}), retrying in {}ms: {}",
                    retries_done, total_attempts, delay_ms, e
                );
                tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
                // Saturate so that a very large configured delay cannot overflow.
                delay_ms = delay_ms.saturating_mul(2).min(MAX_RETRY_DELAY_MS);
            }
        }
    }
}
/// Builds a backend from the `MINIO_ENDPOINT`, `MINIO_BUCKET`, `MINIO_ACCESS_KEY` and
/// `MINIO_SECRET_KEY` environment variables, then delegates to [`Self::new`].
///
/// # Errors
///
/// Returns an error for the first variable, in that order, that cannot be read. A variable
/// that is present but not valid Unicode is also reported as "not set". Otherwise returns
/// any error from [`Self::new`].
pub async fn from_env() -> anyhow::Result<Self> {
    let read = |name: &str| {
        std::env::var(name)
            .map_err(|_| anyhow::anyhow!("{} environment variable not set", name))
    };
    let endpoint = read("MINIO_ENDPOINT")?;
    let bucket = read("MINIO_BUCKET")?;
    let access_key = read("MINIO_ACCESS_KEY")?;
    let secret_key = read("MINIO_SECRET_KEY")?;
    Self::new(&endpoint, &bucket, &access_key, &secret_key).await
}
/// Returns the endpoint URL as stored at construction (`new` strips trailing slashes).
pub fn endpoint(&self) -> &str {
    self.endpoint.as_str()
}
/// Returns the name of the bucket this backend reads from and writes to.
pub fn bucket(&self) -> &str {
    self.bucket.as_str()
}
async fn put_simple(&self, key: &str, data: &[u8]) -> Result<()> {
let client = self.client.clone();
let bucket = self.config.bucket.clone();
let key_clone = key.to_string();
let stats = self.stats.clone();
let body = Bytes::copy_from_slice(data);
self.with_retry(|| {
let client = client.clone();
let bucket = bucket.clone();
let key = key_clone.clone();
let stats = stats.clone();
let body = body.clone();
Box::pin(async move {
debug!(
"Putting object to MinIO (simple): {} ({} bytes)",
key,
body.len()
);
client
.put_object()
.bucket(&bucket)
.key(&key)
.body(body.clone().into())
.send()
.await
.map_err(|e| anyhow!("Failed to put object: {}", e))?;
stats
.total_bytes_uploaded
.fetch_add(body.len() as u64, Ordering::Relaxed);
Ok(())
})
})
.await
}
async fn put_multipart(&self, key: &str, data: &[u8]) -> Result<()> {
debug!(
"Putting large object to MinIO (multipart): {} ({} bytes)",
key,
data.len()
);
let client = self.client.clone();
let bucket = self.config.bucket.clone();
let key_clone = key.to_string();
let multipart = client
.create_multipart_upload()
.bucket(&bucket)
.key(&key_clone)
.send()
.await
.map_err(|e| anyhow!("Failed to initiate multipart upload: {}", e))?;
let upload_id = multipart
.upload_id()
.ok_or_else(|| anyhow!("No upload ID returned from MinIO"))?
.to_string();
debug!(
"Initiated multipart upload for {}: {}",
key_clone, upload_id
);
let mut part_handles = vec![];
let part_size = self.config.part_size as usize;
let mut part_number = 1;
for chunk in data.chunks(part_size) {
let client = client.clone();
let bucket = bucket.clone();
let key = key_clone.clone();
let upload_id = upload_id.clone();
let stats = self.stats.clone();
let chunk_data = chunk.to_vec();
let part_num = part_number;
let handle = tokio::spawn(async move {
debug!(
"Uploading part {} ({} bytes) for key: {}",
part_num,
chunk_data.len(),
key
);
let response = client
.upload_part()
.bucket(&bucket)
.key(&key)
.upload_id(&upload_id)
.part_number(part_num)
.body(Bytes::from(chunk_data.clone()).into())
.send()
.await
.map_err(|e| anyhow!("Failed to upload part {}: {}", part_num, e))?;
let etag = response
.e_tag()
.ok_or_else(|| anyhow!("No ETag returned for part {}", part_num))?
.to_string();
stats
.total_bytes_uploaded
.fetch_add(chunk_data.len() as u64, Ordering::Relaxed);
Ok::<_, anyhow::Error>((part_num, etag))
});
part_handles.push(handle);
if part_handles.len() >= self.config.max_concurrent_parts {
if let Some(handle) = part_handles.pop() {
let _ = handle.await??;
}
}
part_number += 1;
}
let mut parts = vec![];
for handle in part_handles {
let (part_num, etag) = handle.await??;
parts.push((part_num, etag));
}
parts.sort_by_key(|p| p.0);
let part_list: Vec<_> = parts
.into_iter()
.map(|(part_num, etag)| {
aws_sdk_s3::types::CompletedPart::builder()
.part_number(part_num)
.e_tag(etag)
.build()
})
.collect();
client
.complete_multipart_upload()
.bucket(&bucket)
.key(&key_clone)
.upload_id(&upload_id)
.multipart_upload(
aws_sdk_s3::types::CompletedMultipartUpload::builder()
.set_parts(Some(part_list))
.build(),
)
.send()
.await
.map_err(|e| anyhow!("Failed to complete multipart upload: {}", e))?;
debug!("Successfully completed multipart upload for {}", key_clone);
Ok(())
}
}
impl fmt::Debug for MinIOBackend {
    /// Formats the endpoint and bucket; both credentials are shown as `"***"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const REDACTED: &str = "***";
        let mut builder = f.debug_struct("MinIOBackend");
        builder.field("endpoint", &self.endpoint);
        builder.field("bucket", &self.bucket);
        builder.field("access_key", &REDACTED);
        builder.field("secret_key", &REDACTED);
        builder.finish()
    }
}
#[async_trait]
impl StorageBackend for MinIOBackend {
async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
Self::validate_key(key)?;
let client = self.client.clone();
let bucket = self.config.bucket.clone();
let key_clone = key.to_string();
let stats = self.stats.clone();
self.with_retry(|| {
let client = client.clone();
let bucket = bucket.clone();
let key = key_clone.clone();
let stats = stats.clone();
Box::pin(async move {
debug!("Getting object from MinIO: {}", key);
let response = client
.get_object()
.bucket(&bucket)
.key(&key)
.send()
.await
.map_err(|e| anyhow!("Failed to get object: {}", e))?;
let body = response
.body
.collect()
.await
.map_err(|e| anyhow!("Failed to read object body: {}", e))?;
let data = body.into_bytes().to_vec();
stats
.total_bytes_downloaded
.fetch_add(data.len() as u64, Ordering::Relaxed);
Ok(data)
})
})
.await
}
/// Stores `data` under `key`.
///
/// Payloads of at most `config.part_size` bytes go through a single request; larger
/// payloads use a multipart upload.
///
/// # Errors
///
/// Returns an error if `key` is invalid or the chosen upload path fails.
async fn put(&self, key: &str, data: &[u8]) -> anyhow::Result<()> {
    Self::validate_key(key)?;
    let fits_single_request = data.len() as u64 <= self.config.part_size;
    if fits_single_request {
        self.put_simple(key, data).await
    } else {
        self.put_multipart(key, data).await
    }
}
/// Returns whether an object named `key` exists, using a `HeadObject` request.
///
/// A "not found" outcome (the modeled `NotFound` error or an HTTP 404) yields `Ok(false)`
/// and is not retried. Any other failure, including authorization errors, is retried and
/// then returned as an error.
///
/// # Errors
///
/// Returns an error if `key` is invalid or the request keeps failing for a reason other
/// than the object being absent.
async fn exists(&self, key: &str) -> anyhow::Result<bool> {
    Self::validate_key(key)?;
    let client = self.client.clone();
    let bucket = self.config.bucket.clone();
    let key_clone = key.to_string();
    self.with_retry(|| {
        let client = client.clone();
        let bucket = bucket.clone();
        let key = key_clone.clone();
        Box::pin(async move {
            debug!("Checking if object exists in MinIO: {}", key);
            match client.head_object().bucket(&bucket).key(&key).send().await {
                Ok(_) => {
                    debug!("Object exists: {}", key);
                    Ok(true)
                }
                Err(e) => {
                    // Classify using the structured error, not its `Display` text. The SDK
                    // renders many unrelated failures (e.g. 403) as a bare "service error",
                    // which the old text heuristic wrongly reported as "not found".
                    let modeled_not_found = e
                        .as_service_error()
                        .is_some_and(|service_error| service_error.is_not_found());
                    let http_not_found = e
                        .raw_response()
                        .is_some_and(|raw| raw.status().as_u16() == 404);
                    if modeled_not_found || http_not_found {
                        debug!("Object does not exist: {}", key);
                        Ok(false)
                    } else {
                        Err(anyhow!("Failed to check object existence: {}", e))
                    }
                }
            }
        })
    })
    .await
}
async fn delete(&self, key: &str) -> anyhow::Result<()> {
Self::validate_key(key)?;
let client = self.client.clone();
let bucket = self.config.bucket.clone();
let key_clone = key.to_string();
let stats = self.stats.clone();
self.with_retry(|| {
let client = client.clone();
let bucket = bucket.clone();
let key = key_clone.clone();
let stats = stats.clone();
Box::pin(async move {
debug!("Deleting object from MinIO: {}", key);
client
.delete_object()
.bucket(&bucket)
.key(&key)
.send()
.await
.map_err(|e| anyhow!("Failed to delete object: {}", e))?;
stats.total_objects_deleted.fetch_add(1, Ordering::Relaxed);
Ok(())
})
})
.await
}
/// Lists every object key that starts with `prefix` (all keys when `prefix` is empty).
///
/// Follows `ListObjectsV2` continuation tokens until the listing is no longer truncated,
/// then returns the keys sorted lexicographically. The whole paginated listing is one unit
/// of retry, so a failure on any page restarts from the first page.
///
/// # Errors
///
/// Returns an error if a page request keeps failing, or if the server reports a truncated
/// page without a continuation token. Continuing in that case would restart the listing
/// from the beginning and loop forever.
async fn list_objects(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
    let client = self.client.clone();
    let bucket = self.config.bucket.clone();
    let prefix_clone = prefix.to_string();
    self.with_retry(|| {
        let client = client.clone();
        let bucket = bucket.clone();
        let prefix = prefix_clone.clone();
        Box::pin(async move {
            debug!("Listing objects in MinIO with prefix: '{}'", prefix);
            let mut result: Vec<String> = vec![];
            let mut continuation_token: Option<String> = None;
            loop {
                let mut request = client.list_objects_v2().bucket(&bucket);
                if !prefix.is_empty() {
                    request = request.prefix(&prefix);
                }
                if let Some(token) = continuation_token.take() {
                    request = request.continuation_token(token);
                }
                let response = request
                    .send()
                    .await
                    .map_err(|e| anyhow!("Failed to list objects: {}", e))?;
                result.extend(
                    response
                        .contents()
                        .iter()
                        .filter_map(|obj| obj.key())
                        .map(String::from),
                );
                match (response.is_truncated(), response.next_continuation_token()) {
                    (Some(true), Some(token)) => continuation_token = Some(token.to_string()),
                    (Some(true), None) => {
                        return Err(anyhow!(
                            "Failed to list objects: truncated response without continuation token"
                        ));
                    }
                    _ => break,
                }
            }
            // Equal keys are indistinguishable, so an unstable sort gives the same result.
            result.sort_unstable();
            debug!("Found {} objects with prefix: '{}'", result.len(), prefix);
            Ok(result)
        })
    })
    .await
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    #[ignore = "requires MinIO server"]
    async fn test_new_valid_config() {
        let backend = MinIOBackend::new(
            "http://localhost:9000",
            "mediagit-test",
            "minioadmin",
            "minioadmin",
        )
        .await;
        assert!(backend.is_ok());
        let backend = backend.unwrap();
        assert_eq!(backend.endpoint(), "http://localhost:9000");
        assert_eq!(backend.bucket(), "mediagit-test");
    }

    /// An `https://` endpoint must pass endpoint validation. The empty bucket name then
    /// fails locally, so this test needs no server (and no TLS).
    #[tokio::test]
    async fn test_new_https_endpoint() {
        let result =
            MinIOBackend::new("https://localhost:9000", "", "minioadmin", "minioadmin").await;
        let message = result.unwrap_err().to_string();
        assert!(
            !message.contains("must start with http"),
            "https endpoint was rejected: {message}"
        );
        assert!(message.contains("bucket name"));
    }

    #[tokio::test]
    #[ignore = "requires MinIO server"]
    async fn test_new_removes_trailing_slash() {
        let backend = MinIOBackend::new(
            "http://localhost:9000/",
            "mediagit-test",
            "minioadmin",
            "minioadmin",
        )
        .await;
        assert!(backend.is_ok());
        let backend = backend.unwrap();
        assert_eq!(backend.endpoint(), "http://localhost:9000");
    }

    #[tokio::test]
    async fn test_invalid_endpoint_format() {
        let result = MinIOBackend::new("localhost:9000", "bucket", "key", "secret").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("must start with http"));
    }

    #[tokio::test]
    async fn test_empty_bucket_name() {
        let result = MinIOBackend::new("http://localhost:9000", "", "key", "secret").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("bucket name"));
    }

    #[tokio::test]
    async fn test_bucket_name_too_long() {
        let long_bucket = "a".repeat(64);
        let result =
            MinIOBackend::new("http://localhost:9000", &long_bucket, "key", "secret").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("63 characters"));
    }

    #[tokio::test]
    async fn test_bucket_name_invalid_characters() {
        let result =
            MinIOBackend::new("http://localhost:9000", "INVALID_BUCKET", "key", "secret").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("lowercase letters"));
    }

    #[tokio::test]
    async fn test_bucket_name_starts_with_hyphen() {
        let result = MinIOBackend::new("http://localhost:9000", "-invalid", "key", "secret").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_bucket_name_ends_with_hyphen() {
        let result = MinIOBackend::new("http://localhost:9000", "invalid-", "key", "secret").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_empty_access_key() {
        let result = MinIOBackend::new("http://localhost:9000", "bucket", "", "secret").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("access key"));
    }

    #[tokio::test]
    async fn test_empty_secret_key() {
        let result = MinIOBackend::new("http://localhost:9000", "bucket", "key", "").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("secret key"));
    }

    #[test]
    fn test_validate_key_rules() {
        assert!(MinIOBackend::validate_key("").is_err());
        assert!(MinIOBackend::validate_key("/leading/slash").is_err());
        assert!(MinIOBackend::validate_key("objects/ab/cdef").is_ok());
    }

    #[tokio::test]
    #[ignore = "requires MinIO server"]
    async fn test_debug_impl() {
        let backend = MinIOBackend::new(
            "http://localhost:9000",
            "mediagit-test",
            "minioadmin",
            "minioadmin",
        )
        .await
        .unwrap();
        let debug_str = format!("{:?}", backend);
        assert!(debug_str.contains("MinIOBackend"));
        assert!(debug_str.contains("localhost:9000"));
        assert!(debug_str.contains("***"));
    }

    #[tokio::test]
    #[ignore = "requires MinIO server"]
    async fn test_clone() {
        let backend1 = MinIOBackend::new(
            "http://localhost:9000",
            "mediagit-test",
            "minioadmin",
            "minioadmin",
        )
        .await
        .unwrap();
        let backend2 = backend1.clone();
        assert_eq!(backend2.endpoint(), backend1.endpoint());
        assert_eq!(backend2.bucket(), backend1.bucket());
    }

    #[tokio::test]
    #[ignore = "requires MinIO server"]
    async fn test_valid_bucket_names() {
        let result = MinIOBackend::new(
            "http://localhost:9000",
            "mediagit-test",
            "minioadmin",
            "minioadmin",
        )
        .await;
        assert!(
            result.is_ok(),
            "Bucket name 'mediagit-test' should be valid"
        );
        let result2 = MinIOBackend::new(
            "http://localhost:9000",
            "mediagit-repos",
            "minioadmin",
            "minioadmin",
        )
        .await;
        assert!(
            result2.is_ok(),
            "Bucket name 'mediagit-repos' should be valid"
        );
    }

    // NOTE(review): the process environment is global and tests run in parallel by default.
    // Any other test that reads these variables concurrently can observe the temporary state.
    #[tokio::test]
    async fn test_from_env_missing_variables() {
        let endpoint = std::env::var("MINIO_ENDPOINT").ok();
        let bucket = std::env::var("MINIO_BUCKET").ok();
        let access_key = std::env::var("MINIO_ACCESS_KEY").ok();
        let secret_key = std::env::var("MINIO_SECRET_KEY").ok();
        std::env::remove_var("MINIO_ENDPOINT");
        std::env::remove_var("MINIO_BUCKET");
        std::env::remove_var("MINIO_ACCESS_KEY");
        std::env::remove_var("MINIO_SECRET_KEY");
        let result = MinIOBackend::from_env().await;
        assert!(result.is_err());
        if let Some(v) = endpoint {
            std::env::set_var("MINIO_ENDPOINT", v);
        }
        if let Some(v) = bucket {
            std::env::set_var("MINIO_BUCKET", v);
        }
        if let Some(v) = access_key {
            std::env::set_var("MINIO_ACCESS_KEY", v);
        }
        if let Some(v) = secret_key {
            std::env::set_var("MINIO_SECRET_KEY", v);
        }
    }

    #[tokio::test]
    #[ignore = "requires MinIO server"]
    async fn test_from_env_all_variables() {
        let endpoint = std::env::var("MINIO_ENDPOINT").ok();
        let bucket = std::env::var("MINIO_BUCKET").ok();
        let access_key = std::env::var("MINIO_ACCESS_KEY").ok();
        let secret_key = std::env::var("MINIO_SECRET_KEY").ok();
        std::env::set_var("MINIO_ENDPOINT", "http://localhost:9000");
        std::env::set_var("MINIO_BUCKET", "mediagit-test");
        std::env::set_var("MINIO_ACCESS_KEY", "minioadmin");
        std::env::set_var("MINIO_SECRET_KEY", "minioadmin");
        let result = MinIOBackend::from_env().await;
        if let Err(ref e) = result {
            eprintln!("from_env error: {}", e);
        }
        assert!(result.is_ok());
        let backend = result.unwrap();
        assert_eq!(backend.endpoint(), "http://localhost:9000");
        assert_eq!(backend.bucket(), "mediagit-test");
        if let Some(v) = endpoint {
            std::env::set_var("MINIO_ENDPOINT", v);
        } else {
            std::env::remove_var("MINIO_ENDPOINT");
        }
        if let Some(v) = bucket {
            std::env::set_var("MINIO_BUCKET", v);
        } else {
            std::env::remove_var("MINIO_BUCKET");
        }
        if let Some(v) = access_key {
            std::env::set_var("MINIO_ACCESS_KEY", v);
        } else {
            std::env::remove_var("MINIO_ACCESS_KEY");
        }
        if let Some(v) = secret_key {
            std::env::set_var("MINIO_SECRET_KEY", v);
        } else {
            std::env::remove_var("MINIO_SECRET_KEY");
        }
    }
}