use std::future::Future;
use std::time::Duration;
use aws_config::BehaviorVersion;
use aws_credential_types::Credentials;
use aws_sdk_s3::{
config::{Builder, Region},
presigning::PresigningConfig,
primitives::ByteStream,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use md5::{Digest, Md5};
use crate::config::TransferConfig;
use crate::error::AppError;
/// A single object returned by a bucket listing.
#[derive(Debug, Clone)]
pub struct R2Object {
    /// Key relative to the prefix it was listed under.
    pub key: String,
    /// ETag with surrounding quotes stripped.
    pub etag: String,
    /// Object size in bytes.
    pub size: u64,
    /// Last-modified timestamp reported by the store (Unix epoch when absent).
    pub last_modified: DateTime<Utc>,
    /// Hex MD5 from user metadata when known; always `None` for plain listings.
    pub content_md5: Option<String>,
}
/// Per-object metadata as returned by a HEAD request.
#[derive(Debug, Clone)]
pub struct R2ObjectMeta {
    /// ETag with surrounding quotes stripped.
    pub etag: String,
    /// Object size in bytes.
    pub size: u64,
    /// Hex MD5 digest recorded under the object's `content-md5` user
    /// metadata key, if present.
    pub content_md5: Option<String>,
}
/// Outcome of a conditional (`If-None-Match: *`) put used for lock acquisition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireResult {
    /// The object did not exist and was created by this call.
    Acquired,
    /// The object already existed; nothing was written.
    AlreadyExists,
}
/// Thin wrapper around the AWS S3 SDK client, configured for Cloudflare R2.
pub struct R2Client {
    // Underlying S3-compatible SDK client (endpoint points at R2).
    client: aws_sdk_s3::Client,
    /// Bucket that all operations target.
    pub bucket: String,
    // Transfer tuning (retry_count, timeout_secs) applied to get/put operations.
    transfer: TransferConfig,
}
impl R2Client {
    /// Well-known key of the metadata document stored in the bucket.
    pub const METADATA_KEY: &'static str = "metadata.json";

    /// Prefix under which all of `project`'s objects live (trailing slash
    /// included, so it can be passed straight to [`Self::list_objects`]).
    pub fn project_prefix(project: &str) -> String {
        format!("projects/{}/", project)
    }

    /// Key of the advisory lock object for `project`.
    pub fn lock_key(project: &str) -> String {
        format!("locks/{}.lock", project)
    }

    /// Key of a named template; `uppercase` selects the `.RPP` spelling of the
    /// extension instead of `.rpp`.
    pub fn template_key(template_name: &str, uppercase: bool) -> String {
        if uppercase {
            format!("templates/{}.RPP", template_name)
        } else {
            format!("templates/{}.rpp", template_name)
        }
    }

    /// Builds a client that talks to the Cloudflare R2 endpoint derived from
    /// the configured account id, using static credentials from `config`.
    pub async fn new(config: &crate::config::Config) -> Result<Self, AppError> {
        // R2 exposes an S3-compatible API at a per-account endpoint.
        let endpoint = format!("https://{}.r2.cloudflarestorage.com", config.r2.account_id);
        let creds =
            Credentials::from_keys(&config.r2.access_key_id, &config.r2.secret_access_key, None);
        let sdk_config = aws_config::defaults(BehaviorVersion::latest())
            // R2's S3-compatible API uses the literal region name "auto".
            .region(Region::new("auto"))
            .endpoint_url(&endpoint)
            .credentials_provider(creds)
            .load()
            .await;
        // Path-style addressing keeps the bucket out of the hostname.
        let s3_config = Builder::from(&sdk_config).force_path_style(true).build();
        let client = aws_sdk_s3::Client::from_conf(s3_config);
        Ok(Self {
            client,
            bucket: config.r2.bucket.clone(),
            transfer: config.transfer.clone(),
        })
    }

    /// Lists all objects under `prefix`, transparently following pagination.
    ///
    /// Returned keys are made relative to `prefix` and ETags are stripped of
    /// surrounding quotes. `content_md5` is always `None` here because the
    /// list response carries no user metadata; use [`Self::head_object`] to
    /// retrieve it for an individual key.
    ///
    /// # Errors
    /// Returns [`AppError::R2Error`] if any page request fails.
    pub async fn list_objects(&self, prefix: &str) -> Result<Vec<R2Object>, AppError> {
        let mut objects: Vec<R2Object> = Vec::new();
        let mut continuation_token: Option<String> = None;
        loop {
            let mut req = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(prefix);
            if let Some(ref token) = continuation_token {
                req = req.continuation_token(token);
            }
            let resp = req.send().await.map_err(|e| {
                AppError::R2Error(format!(
                    "list_objects failed for prefix '{}': {}",
                    prefix, e
                ))
            })?;
            for obj in resp.contents() {
                let full_key = obj.key().unwrap_or_default();
                let relative_key = full_key
                    .strip_prefix(prefix)
                    .unwrap_or(full_key)
                    .to_string();
                let raw_etag = obj.e_tag().unwrap_or_default();
                let etag = strip_etag_quotes(raw_etag);
                let size = obj.size().unwrap_or(0) as u64;
                // Convert the SDK timestamp to chrono, falling back to the
                // Unix epoch when it is missing or out of range.
                let last_modified = obj
                    .last_modified()
                    .and_then(|dt| {
                        let secs = dt.secs();
                        let nanos = dt.subsec_nanos();
                        DateTime::from_timestamp(secs, nanos)
                    })
                    .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap());
                objects.push(R2Object {
                    key: relative_key,
                    etag,
                    size,
                    last_modified,
                    content_md5: None,
                });
            }
            if resp.is_truncated().unwrap_or(false) {
                match resp.next_continuation_token() {
                    Some(token) => continuation_token = Some(token.to_string()),
                    // Defensive: a truncated response without a continuation
                    // token would otherwise reset the token to None and
                    // re-fetch the first page forever.
                    None => break,
                }
            } else {
                break;
            }
        }
        Ok(objects)
    }

    /// Downloads an object fully into memory, retrying failures with
    /// exponential backoff per the transfer configuration.
    ///
    /// # Errors
    /// Returns [`AppError::NotFound`] for a missing key and
    /// [`AppError::DownloadFailed`] for other request/stream failures.
    /// NOTE(review): `NotFound` is currently retried like any other error
    /// before being surfaced; short-circuiting it would fail faster.
    pub async fn get_object_bytes(&self, key: &str) -> Result<Bytes, AppError> {
        // Use the final path segment as a short operation name for retry logs.
        let op_name = key.rsplit('/').next().unwrap_or(key).to_string();
        let max_retries = self.transfer.retry_count;
        let initial_delay = Duration::from_secs(1);
        let timeout = Duration::from_secs(self.transfer.timeout_secs);
        retry_with_backoff(&op_name, max_retries, initial_delay, timeout, || async {
            let result = self
                .client
                .get_object()
                .bucket(&self.bucket)
                .key(key)
                .send()
                .await;
            let resp = match result {
                Ok(r) => r,
                Err(sdk_err) => {
                    // Prefer the raw HTTP status when the SDK exposes it...
                    let http_status = sdk_err.raw_response().map(|r| r.status().as_u16());
                    if matches!(http_status, Some(404)) {
                        return Err(AppError::NotFound {
                            key: key.to_string(),
                        });
                    }
                    // ...and fall back to sniffing the Debug output, since some
                    // SDK error shapes do not carry a raw response.
                    let debug = format!("{:?}", sdk_err);
                    if debug.contains("NoSuchKey") {
                        return Err(AppError::NotFound {
                            key: key.to_string(),
                        });
                    }
                    return Err(AppError::DownloadFailed {
                        path: key.to_string(),
                        source: Box::new(sdk_err),
                    });
                }
            };
            // Collect the streaming body into a contiguous buffer.
            let body = resp
                .body
                .collect()
                .await
                .map_err(|e| AppError::DownloadFailed {
                    path: key.to_string(),
                    source: Box::new(e),
                })?;
            Ok(body.into_bytes())
        })
        .await
    }

    /// Uploads `body` to `key`, recording its hex MD5 under the `content-md5`
    /// user metadata key and retrying with exponential backoff.
    ///
    /// The body is cloned per attempt so the closure can be re-invoked.
    ///
    /// # Errors
    /// Returns [`AppError::UploadFailed`] once all retries are exhausted.
    pub async fn put_object(&self, key: &str, body: Vec<u8>) -> Result<(), AppError> {
        let md5_hex = compute_md5_hex(&body);
        // Use the final path segment as a short operation name for retry logs.
        let op_name = key.rsplit('/').next().unwrap_or(key).to_string();
        let max_retries = self.transfer.retry_count;
        let initial_delay = Duration::from_secs(1);
        let timeout = Duration::from_secs(self.transfer.timeout_secs);
        retry_with_backoff(&op_name, max_retries, initial_delay, timeout, || async {
            self.client
                .put_object()
                .bucket(&self.bucket)
                .key(key)
                .metadata("content-md5", &md5_hex)
                .body(ByteStream::from(body.clone()))
                .send()
                .await
                .map_err(|e| AppError::UploadFailed {
                    path: key.to_string(),
                    source: Box::new(e),
                })?;
            Ok(())
        })
        .await
    }

    /// Conditionally creates `key` (`If-None-Match: *`), used as an atomic
    /// "acquire lock" primitive. No retries: a conflict is a definitive answer.
    ///
    /// # Errors
    /// Returns [`AppError::UploadFailed`] for failures other than the
    /// precondition conflict (HTTP 412), which maps to
    /// [`AcquireResult::AlreadyExists`].
    pub async fn put_object_if_not_exists(
        &self,
        key: &str,
        body: Vec<u8>,
    ) -> Result<AcquireResult, AppError> {
        let md5_hex = compute_md5_hex(&body);
        let result = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .if_none_match("*")
            .metadata("content-md5", &md5_hex)
            .body(ByteStream::from(body))
            .send()
            .await;
        match result {
            Ok(_) => Ok(AcquireResult::Acquired),
            Err(sdk_err) => {
                let http_status = sdk_err.raw_response().map(|r| r.status().as_u16());
                match http_status {
                    // 412 Precondition Failed: the object already exists.
                    Some(412) => Ok(AcquireResult::AlreadyExists),
                    _ => {
                        // Fallback sniff for SDK errors without a raw response.
                        let debug = format!("{:?}", sdk_err);
                        if debug.contains("PreconditionFailed") {
                            Ok(AcquireResult::AlreadyExists)
                        } else {
                            Err(AppError::UploadFailed {
                                path: key.to_string(),
                                source: Box::new(sdk_err),
                            })
                        }
                    }
                }
            }
        }
    }

    /// Deletes `key`. Idempotent: a missing key (404 / NoSuchKey) is success.
    ///
    /// # Errors
    /// Returns [`AppError::R2Error`] for any other failure.
    pub async fn delete_object(&self, key: &str) -> Result<(), AppError> {
        let result = self
            .client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await;
        match result {
            Ok(_) => Ok(()),
            Err(sdk_err) => {
                let http_status = sdk_err.raw_response().map(|r| r.status().as_u16());
                if matches!(http_status, Some(404)) {
                    return Ok(());
                }
                // Fallback sniff for SDK errors without a raw response.
                let debug = format!("{:?}", sdk_err);
                if debug.contains("NoSuchKey") || debug.contains("404") {
                    return Ok(());
                }
                Err(AppError::R2Error(format!(
                    "delete_object failed for key '{}': {}",
                    key, sdk_err
                )))
            }
        }
    }

    /// Generates a presigned GET URL for `key` valid for `expires_in`.
    ///
    /// # Errors
    /// Returns [`AppError::R2Error`] if the duration is rejected by the SDK or
    /// signing fails.
    pub async fn presign_get_object(
        &self,
        key: &str,
        expires_in: Duration,
    ) -> Result<String, AppError> {
        let presigning_config = PresigningConfig::expires_in(expires_in)
            .map_err(|e| AppError::R2Error(format!("invalid presigning duration: {}", e)))?;
        let presigned = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .presigned(presigning_config)
            .await
            .map_err(|e| {
                AppError::R2Error(format!(
                    "presign_get_object failed for key '{}': {}",
                    key, e
                ))
            })?;
        Ok(presigned.uri().to_string())
    }

    /// HEADs `key`, returning `Ok(None)` when the object does not exist.
    ///
    /// # Errors
    /// Returns [`AppError::R2Error`] for failures other than a missing key.
    pub async fn head_object(&self, key: &str) -> Result<Option<R2ObjectMeta>, AppError> {
        let result = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await;
        match result {
            Ok(resp) => {
                let raw_etag = resp.e_tag().unwrap_or_default();
                let etag = strip_etag_quotes(raw_etag);
                let size = resp.content_length().unwrap_or(0) as u64;
                // "content-md5" matches the user-metadata key written by put_object.
                let content_md5 = resp.metadata().and_then(|m| m.get("content-md5")).cloned();
                Ok(Some(R2ObjectMeta {
                    etag,
                    size,
                    content_md5,
                }))
            }
            Err(sdk_err) => {
                let http_status = sdk_err.raw_response().map(|r| r.status().as_u16());
                if matches!(http_status, Some(404)) {
                    return Ok(None);
                }
                // Fallback sniff for SDK errors without a raw response.
                let debug = format!("{:?}", sdk_err);
                if debug.contains("NoSuchKey")
                    || debug.contains("NotFound")
                    || debug.contains("404")
                {
                    return Ok(None);
                }
                Err(AppError::R2Error(format!(
                    "head_object failed for key '{}': {}",
                    key, sdk_err
                )))
            }
        }
    }
}
/// Runs `operation` up to `max_retries + 1` times, applying a per-attempt
/// timeout and exponential backoff (doubling from `initial_delay`, capped at
/// 30 seconds) between attempts. Returns the first success, or the error from
/// the final attempt.
async fn retry_with_backoff<F, Fut, T>(
    op_name: &str,
    max_retries: u32,
    initial_delay: Duration,
    timeout: Duration,
    mut operation: F,
) -> Result<T, AppError>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, AppError>>,
{
    const MAX_DELAY: Duration = Duration::from_secs(30);
    let mut failure: Option<AppError> = None;
    for attempt in 0..=max_retries {
        if attempt > 0 {
            // Backoff factor 2^(attempt-1), saturating on overflow; the
            // resulting delay never exceeds MAX_DELAY.
            let factor = 1u32.checked_shl(attempt - 1).unwrap_or(u32::MAX);
            let backoff = initial_delay.saturating_mul(factor).min(MAX_DELAY);
            eprintln!(
                "Retrying {} (attempt {}/{})...",
                op_name, attempt, max_retries
            );
            tokio::time::sleep(backoff).await;
        }
        failure = Some(match tokio::time::timeout(timeout, operation()).await {
            Ok(Ok(value)) => return Ok(value),
            Ok(Err(err)) => err,
            // The attempt itself timed out; record a descriptive error.
            Err(_elapsed) => AppError::Other(format!(
                "{} timed out after {}s",
                op_name,
                timeout.as_secs()
            )),
        });
    }
    Err(failure.expect("loop always runs at least once"))
}
/// Returns the lowercase hex MD5 digest of `data`.
pub(crate) fn compute_md5_hex(data: &[u8]) -> String {
    Md5::digest(data)
        .iter()
        .map(|byte| format!("{:02x}", byte))
        .collect()
}
/// Removes any leading and trailing double quotes from an ETag value.
fn strip_etag_quotes(etag: &str) -> String {
    etag.trim_start_matches('"')
        .trim_end_matches('"')
        .to_owned()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    #[test]
    fn strip_etag_quotes_removes_surrounding_quotes() {
        assert_eq!(strip_etag_quotes("\"abc123\""), "abc123");
    }

    #[test]
    fn strip_etag_quotes_leaves_unquoted_etag_unchanged() {
        assert_eq!(strip_etag_quotes("abc123"), "abc123");
    }

    #[test]
    fn strip_etag_quotes_handles_empty_string() {
        assert_eq!(strip_etag_quotes(""), "");
    }

    #[test]
    fn project_prefix_format() {
        let prefix = R2Client::project_prefix("episode-47");
        assert_eq!(prefix, "projects/episode-47/");
    }

    #[test]
    fn lock_key_format() {
        assert_eq!(R2Client::lock_key("episode-47"), "locks/episode-47.lock");
    }

    #[test]
    fn metadata_key_is_correct() {
        assert_eq!(R2Client::METADATA_KEY, "metadata.json");
    }

    #[test]
    fn compute_md5_hex_known_value() {
        // MD5 of the empty input is a well-known constant.
        assert_eq!(compute_md5_hex(b""), "d41d8cd98f00b204e9800998ecf8427e");
    }

    #[test]
    fn compute_md5_hex_nonempty() {
        let digest = compute_md5_hex(b"hello");
        assert_eq!(digest, "5d41402abc4b2a76b9719d911017c592");
    }

    #[tokio::test]
    async fn retry_succeeds_on_first_attempt() {
        let attempts = Arc::new(AtomicU32::new(0));
        let counter = attempts.clone();
        let outcome = retry_with_backoff(
            "test-op",
            3,
            Duration::from_millis(1),
            Duration::from_secs(60),
            || {
                let counter = counter.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Ok::<u32, AppError>(42)
                }
            },
        )
        .await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 42);
        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "expected exactly one attempt"
        );
    }

    #[tokio::test]
    async fn retry_exhausts_all_attempts_on_persistent_failure() {
        let attempts = Arc::new(AtomicU32::new(0));
        let counter = attempts.clone();
        let outcome = retry_with_backoff(
            "test-op",
            2,
            Duration::from_millis(1),
            Duration::from_secs(60),
            || {
                let counter = counter.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Err::<u32, AppError>(AppError::Other("transient".to_string()))
                }
            },
        )
        .await;
        assert!(outcome.is_err());
        assert_eq!(
            attempts.load(Ordering::SeqCst),
            3,
            "expected 3 total attempts (initial + 2 retries)"
        );
    }

    #[tokio::test]
    async fn retry_succeeds_on_third_attempt() {
        let attempts = Arc::new(AtomicU32::new(0));
        let counter = attempts.clone();
        let outcome = retry_with_backoff(
            "test-op",
            3,
            Duration::from_millis(1),
            Duration::from_secs(60),
            || {
                let counter = counter.clone();
                async move {
                    // Fail the first two calls, succeed on the third.
                    if counter.fetch_add(1, Ordering::SeqCst) < 2 {
                        Err(AppError::Other("transient".to_string()))
                    } else {
                        Ok::<u32, AppError>(99)
                    }
                }
            },
        )
        .await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 99);
        assert_eq!(
            attempts.load(Ordering::SeqCst),
            3,
            "expected 3 total attempts"
        );
    }

    #[tokio::test]
    async fn retry_zero_max_retries_makes_single_attempt() {
        let attempts = Arc::new(AtomicU32::new(0));
        let counter = attempts.clone();
        let outcome = retry_with_backoff(
            "test-op",
            0,
            Duration::from_millis(1),
            Duration::from_secs(60),
            || {
                let counter = counter.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Err::<u32, AppError>(AppError::Other("fail".to_string()))
                }
            },
        )
        .await;
        assert!(outcome.is_err());
        assert_eq!(
            attempts.load(Ordering::SeqCst),
            1,
            "expected exactly one attempt when max_retries=0"
        );
    }
}