use async_trait::async_trait;
use cellos_core::ports::ExportSink;
use cellos_core::{
redact_url_credentials_for_logs, CellosError, ExportArtifactMetadata, ExportReceipt,
ExportReceiptTargetKind,
};
use reqwest::StatusCode;
use std::time::Duration;
use tracing::instrument;
use zeroize::Zeroize;
/// Default whole-request timeout for S3 uploads, in milliseconds (30 s).
/// Used when [`ENV_REQUEST_TIMEOUT_MS`] is unset, unparsable, or zero.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30 * 1_000;
/// Default connect-phase timeout for S3 uploads, in milliseconds (10 s).
/// Used when [`ENV_CONNECT_TIMEOUT_MS`] is unset, unparsable, or zero.
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10 * 1_000;
/// Environment variable overriding the request timeout (milliseconds).
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_EXPORT_S3_TIMEOUT_MS";
/// Environment variable overriding the connect timeout (milliseconds).
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_EXPORT_S3_CONNECT_TIMEOUT_MS";
/// Reads a millisecond timeout from the environment variable `env_var`.
///
/// Surrounding whitespace is ignored. Falls back to `default_ms` when the variable is
/// unset, is not valid Unicode, does not parse as a `u64`, or parses to `0`.
pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
    std::env::var(env_var)
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok())
        // A zero timeout is treated as "not configured" rather than "no time at all".
        .filter(|&ms| ms != 0)
        .unwrap_or(default_ms)
}
/// Prepares the reqwest client configuration used by the S3 export sink.
///
/// Request and connect timeouts come from [`ENV_REQUEST_TIMEOUT_MS`] and
/// [`ENV_CONNECT_TIMEOUT_MS`], falling back to the defaults. When `CELLOS_CA_BUNDLE`
/// is set, every PEM block in that file is parsed as a certificate and added as an
/// additional trusted root.
///
/// # Errors
///
/// Returns a human-readable message when the bundle cannot be read, a block fails
/// to parse as a certificate, or the file contains no PEM blocks at all.
fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
    let request_ms = resolve_timeout_ms(ENV_REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT_MS);
    let connect_ms = resolve_timeout_ms(ENV_CONNECT_TIMEOUT_MS, DEFAULT_CONNECT_TIMEOUT_MS);
    let base = reqwest::Client::builder()
        .timeout(Duration::from_millis(request_ms))
        .connect_timeout(Duration::from_millis(connect_ms));

    // No bundle configured (or the variable is not valid Unicode): keep the builder's
    // default trust roots.
    let path = match std::env::var("CELLOS_CA_BUNDLE") {
        Ok(configured) => configured,
        Err(_) => return Ok(base),
    };

    let pem = std::fs::read(&path).map_err(|e| format!("CELLOS_CA_BUNDLE: read {path}: {e}"))?;
    // Stops at the first block that fails to parse.
    let certs: Vec<reqwest::Certificate> = pem_cert_blocks(&pem)
        .iter()
        .map(|block| {
            reqwest::Certificate::from_pem(block)
                .map_err(|e| format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}"))
        })
        .collect::<Result<_, _>>()?;
    if certs.is_empty() {
        return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
    }

    let loaded = certs.len();
    let builder = certs
        .into_iter()
        .fold(base, |acc, cert| acc.add_root_certificate(cert));
    tracing::debug!(path = %path, count = loaded, "CELLOS_CA_BUNDLE: loaded CA certificates");
    Ok(builder)
}
/// Splits PEM text into individual `-----BEGIN ...` / `-----END ...` blocks.
///
/// Each returned block runs from a line starting with `-----BEGIN ` through the next
/// line starting with `-----END `, inclusive. Every line is re-terminated with `\n`
/// (so CRLF input is normalised). Any PEM label is accepted, not only `CERTIFICATE`.
/// A `BEGIN` seen while a block is open discards the partial block. A trailing block
/// with no `END` is dropped. Invalid UTF-8 is replaced lossily before scanning.
fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
    let decoded = String::from_utf8_lossy(pem);
    let mut found: Vec<Vec<u8>> = Vec::new();
    // `Some(buffer)` while between a BEGIN and its END marker, `None` otherwise.
    let mut open: Option<String> = None;

    for line in decoded.lines() {
        if line.starts_with("-----BEGIN ") {
            open = Some(String::new());
        }
        match open.as_mut() {
            Some(buffer) => {
                buffer.push_str(line);
                buffer.push('\n');
            }
            None => continue,
        }
        if line.starts_with("-----END ") {
            found.extend(open.take().map(String::into_bytes));
        }
    }

    found
}
pub struct PresignedS3ExportSink {
client: reqwest::Client,
presigned_url: String,
cell_id: String,
bucket: String,
key_prefix: Option<String>,
target_name: Option<String>,
region: Option<String>,
max_attempts: usize,
retry_backoff: Duration,
}
impl Drop for PresignedS3ExportSink {
fn drop(&mut self) {
self.presigned_url.zeroize();
}
}
impl PresignedS3ExportSink {
#[allow(clippy::too_many_arguments)]
pub fn new(
presigned_url: impl Into<String>,
cell_id: impl Into<String>,
bucket: impl Into<String>,
key_prefix: Option<String>,
target_name: Option<String>,
region: Option<String>,
max_attempts: usize,
retry_backoff_ms: u64,
) -> Result<Self, CellosError> {
let raw = presigned_url.into();
let trimmed = raw.trim().to_string();
if trimmed.is_empty() {
return Err(CellosError::ExportSink(
"S3 presigned URL is empty after trim".into(),
));
}
let parsed = reqwest::Url::parse(trimmed.as_str())
.map_err(|e| CellosError::ExportSink(format!("invalid S3 presigned URL: {e}")))?;
let scheme = parsed.scheme();
if scheme != "http" && scheme != "https" {
return Err(CellosError::ExportSink(format!(
"S3 presigned URL scheme must be http or https, got {scheme}"
)));
}
let client = http_client_builder()
.map_err(CellosError::ExportSink)?
.build()
.map_err(|e| CellosError::ExportSink(format!("s3 http client init: {e}")))?;
if max_attempts == 0 {
return Err(CellosError::ExportSink(
"S3 export max_attempts must be at least 1".into(),
));
}
Ok(Self {
client,
presigned_url: trimmed,
cell_id: cell_id.into(),
bucket: bucket.into(),
key_prefix,
target_name,
region,
max_attempts,
retry_backoff: Duration::from_millis(retry_backoff_ms),
})
}
fn upload_url(&self, artifact_name: &str) -> String {
let safe = artifact_name.replace(['/', '\\'], "_");
if self.presigned_url.contains("{cell_id}")
|| self.presigned_url.contains("{artifact_name}")
{
return self
.presigned_url
.replace("{cell_id}", &self.cell_id)
.replace("{artifact_name}", &safe);
}
self.presigned_url.clone()
}
fn logical_destination(&self, artifact_name: &str) -> String {
let mut key = self
.key_prefix
.as_deref()
.unwrap_or("")
.trim_matches('/')
.to_string();
if !key.is_empty() {
key.push('/');
}
key.push_str(artifact_name);
format!("s3://{}/{}", self.bucket, key)
}
fn should_retry_status(status: StatusCode) -> bool {
status.is_server_error() || matches!(status.as_u16(), 408 | 425 | 429)
}
}
#[async_trait]
impl ExportSink for PresignedS3ExportSink {
    /// Receipts from this sink are always tagged as S3.
    fn target_kind(&self) -> Option<ExportReceiptTargetKind> {
        Some(ExportReceiptTargetKind::S3)
    }

    /// Logical `s3://bucket/prefix/name` location for `name` (never the presigned URL).
    fn destination_hint(&self, name: &str) -> Option<String> {
        Some(self.logical_destination(name))
    }

    /// Uploads the file at `path` as artifact `name` via HTTP PUT to the presigned URL.
    ///
    /// The file is read fully into memory, and the same bytes are sent on every attempt.
    /// If `metadata.content_type` is set, it is sent as the `Content-Type` header.
    /// Transport errors and statuses accepted by `should_retry_status` are retried until
    /// `max_attempts` attempts have been made, sleeping `retry_backoff` between them.
    ///
    /// The presigned URL is a credential. It reaches logs and error messages only through
    /// `redact_url_credentials_for_logs`. reqwest errors go through
    /// `reqwest::Error::without_url` first, because reqwest includes the request URL in
    /// its error `Display` output.
    ///
    /// # Errors
    ///
    /// Returns [`CellosError::ExportSink`] in any of these cases:
    /// * the file cannot be read;
    /// * the final attempt fails at the transport level;
    /// * a non-success status is returned that is non-retryable or arrives on the final
    ///   attempt. The response body is then included in the error, capped at
    ///   1024 characters.
    #[instrument(skip(self), fields(
        cell_id = %self.cell_id,
        artifact = %name,
        region = self.region.as_deref().unwrap_or("(unset)"),
    ))]
    async fn push(
        &self,
        name: &str,
        path: &str,
        metadata: &ExportArtifactMetadata,
    ) -> Result<ExportReceipt, CellosError> {
        // The error body is server-controlled; cap how much of it is copied into errors.
        const MAX_ERROR_BODY_CHARS: usize = 1024;

        let bytes = tokio::fs::read(path)
            .await
            .map_err(|e| CellosError::ExportSink(format!("read artifact {path}: {e}")))?;
        let bytes_written = bytes.len() as u64;
        let url = self.upload_url(name);
        // Computed once; the only form of the URL that may appear in logs or errors.
        let redacted_url = redact_url_credentials_for_logs(&url);

        for attempt in 1..=self.max_attempts {
            let is_last_attempt = attempt == self.max_attempts;
            let mut req = self.client.put(&url).body(bytes.clone());
            if let Some(ref content_type) = metadata.content_type {
                req = req.header(reqwest::header::CONTENT_TYPE, content_type);
            }
            match req.send().await {
                Ok(resp) if resp.status().is_success() => {
                    tracing::info!(
                        url = %redacted_url,
                        artifact = %name,
                        bucket = %self.bucket,
                        region = self.region.as_deref().unwrap_or("(unset)"),
                        attempts = attempt,
                        "artifact uploaded to S3"
                    );
                    return Ok(ExportReceipt {
                        target_kind: ExportReceiptTargetKind::S3,
                        target_name: self.target_name.clone(),
                        destination: self.logical_destination(name),
                        bytes_written,
                    });
                }
                Ok(resp) => {
                    let status = resp.status();
                    let mut body = resp.text().await.unwrap_or_default();
                    if is_last_attempt || !Self::should_retry_status(status) {
                        let cut = body
                            .char_indices()
                            .nth(MAX_ERROR_BODY_CHARS)
                            .map(|(idx, _)| idx);
                        if let Some(cut) = cut {
                            body.truncate(cut);
                            body.push_str("... (truncated)");
                        }
                        return Err(CellosError::ExportSink(format!(
                            "s3 put {redacted_url} returned {status}: {body}"
                        )));
                    }
                    tracing::warn!(
                        url = %redacted_url,
                        artifact = %name,
                        status = %status,
                        attempt,
                        max_attempts = self.max_attempts,
                        "transient S3 export failure; retrying"
                    );
                }
                Err(e) => {
                    // Strip the embedded (unredacted) request URL before displaying `e`.
                    let e = e.without_url();
                    if is_last_attempt {
                        return Err(CellosError::ExportSink(format!(
                            "s3 put {redacted_url}: {e}"
                        )));
                    }
                    tracing::warn!(
                        url = %redacted_url,
                        artifact = %name,
                        error = %e,
                        attempt,
                        max_attempts = self.max_attempts,
                        "S3 export transport error; retrying"
                    );
                }
            }
            if !self.retry_backoff.is_zero() {
                tokio::time::sleep(self.retry_backoff).await;
            }
        }
        // `new` rejects `max_attempts == 0`, so the loop runs at least once and its final
        // attempt always returns.
        unreachable!("retry loop must return or error")
    }
}
#[cfg(test)]
mod tests {
    use super::{pem_cert_blocks, resolve_timeout_ms, PresignedS3ExportSink};
    use cellos_core::ports::ExportSink;
    use reqwest::StatusCode;

    #[test]
    fn rejects_invalid_url() {
        let r = PresignedS3ExportSink::new("not a url", "c1", "bucket", None, None, None, 1, 0);
        assert!(r.is_err(), "expected parse error");
    }

    #[test]
    fn rejects_non_http_scheme() {
        let r = PresignedS3ExportSink::new(
            "ftp://example.com/object",
            "c1",
            "bucket",
            None,
            None,
            None,
            1,
            0,
        );
        assert!(r.is_err());
    }

    #[test]
    fn rejects_zero_attempts() {
        let r = PresignedS3ExportSink::new(
            "https://example.com/object",
            "c1",
            "bucket",
            None,
            None,
            None,
            0,
            0,
        );
        assert!(r.is_err());
    }

    #[test]
    fn preserves_exact_presigned_url() {
        let sink = PresignedS3ExportSink::new(
            "https://bucket.s3.amazonaws.com/object.txt?X-Amz-Signature=abc",
            "c1",
            "bucket",
            Some("prefix".into()),
            Some("artifacts".into()),
            Some("us-east-1".into()),
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://bucket.s3.amazonaws.com/object.txt?X-Amz-Signature=abc"
        );
        assert_eq!(
            sink.destination_hint("artifact.txt").unwrap(),
            "s3://bucket/prefix/artifact.txt"
        );
        assert_eq!(sink.region.as_deref(), Some("us-east-1"));
    }

    #[test]
    fn expands_placeholders_when_present() {
        let sink = PresignedS3ExportSink::new(
            "https://bucket.s3.amazonaws.com/{cell_id}/{artifact_name}?X-Amz-Signature=abc",
            "cell-42",
            "bucket",
            Some("prefix".into()),
            Some("artifacts".into()),
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://bucket.s3.amazonaws.com/cell-42/artifact.txt?X-Amz-Signature=abc"
        );
    }

    #[test]
    fn region_none_when_not_set() {
        let sink = PresignedS3ExportSink::new(
            "https://bucket.s3.amazonaws.com/obj",
            "c1",
            "bucket",
            None,
            None,
            None,
            1,
            0,
        )
        .unwrap();
        assert!(sink.region.is_none());
    }

    #[test]
    fn pem_blocks_extracts_each_begin_end_pair() {
        let pem = b"preamble\n-----BEGIN CERTIFICATE-----\nAAA\n-----END CERTIFICATE-----\n\n-----BEGIN CERTIFICATE-----\r\nBBB\r\n-----END CERTIFICATE-----\r\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 2);
        assert_eq!(
            blocks[0],
            b"-----BEGIN CERTIFICATE-----\nAAA\n-----END CERTIFICATE-----\n".to_vec()
        );
        // CRLF line endings are normalised to LF.
        assert_eq!(
            blocks[1],
            b"-----BEGIN CERTIFICATE-----\nBBB\n-----END CERTIFICATE-----\n".to_vec()
        );
    }

    #[test]
    fn pem_blocks_ignores_unterminated_and_empty_input() {
        assert!(pem_cert_blocks(b"").is_empty());
        assert!(pem_cert_blocks(b"-----BEGIN CERTIFICATE-----\nAAA\n").is_empty());
    }

    #[test]
    fn timeout_uses_default_when_env_unset() {
        assert_eq!(
            resolve_timeout_ms("CELLOS_TEST_TIMEOUT_VAR_NEVER_SET_7F3A", 1234),
            1234
        );
    }

    #[test]
    fn retries_only_transient_statuses() {
        for code in [408u16, 425, 429, 500, 502, 503, 504] {
            let status = StatusCode::from_u16(code).unwrap();
            assert!(
                PresignedS3ExportSink::should_retry_status(status),
                "{code} should be retried"
            );
        }
        for code in [400u16, 401, 403, 404, 412] {
            let status = StatusCode::from_u16(code).unwrap();
            assert!(
                !PresignedS3ExportSink::should_retry_status(status),
                "{code} should not be retried"
            );
        }
    }
}