use async_trait::async_trait;
use cellos_core::ports::ExportSink;
use cellos_core::{CellosError, ExportArtifactMetadata, ExportReceipt, ExportReceiptTargetKind};
use reqwest::StatusCode;
use std::time::Duration;
use tracing::instrument;
use zeroize::Zeroize;
/// Whole-request timeout, in milliseconds, used when `ENV_REQUEST_TIMEOUT_MS`
/// is unset or does not hold a positive integer.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;
/// Connect timeout, in milliseconds, used when `ENV_CONNECT_TIMEOUT_MS` is
/// unset or does not hold a positive integer.
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10_000;
/// Name of the environment variable that overrides the whole-request timeout (milliseconds).
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_EXPORT_HTTP_TIMEOUT_MS";
/// Name of the environment variable that overrides the connect timeout (milliseconds).
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_EXPORT_HTTP_CONNECT_TIMEOUT_MS";
/// Resolves a timeout in milliseconds from the environment variable `env_var`.
///
/// The variable's value is trimmed and parsed as a `u64`. `default_ms` is
/// returned when the variable is missing or unreadable, when the value does
/// not parse, or when it parses to zero.
pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
    std::env::var(env_var)
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok())
        // Zero is treated as "not configured" rather than as a real timeout.
        .filter(|&millis| millis > 0)
        .unwrap_or(default_ms)
}
/// Creates the `reqwest::ClientBuilder` used by the HTTP export sink.
///
/// The request and connect timeouts come from `resolve_timeout_ms`, so they
/// can be overridden through the environment. When `CELLOS_CA_BUNDLE` is set,
/// the file it names is read and every PEM block found in it is registered as
/// an additional root certificate.
///
/// # Errors
///
/// Returns a message prefixed with `CELLOS_CA_BUNDLE:` when the bundle file
/// cannot be read, contains no PEM blocks, or contains a block that fails to
/// parse as a certificate.
fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
    let request_ms = resolve_timeout_ms(ENV_REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT_MS);
    let connect_ms = resolve_timeout_ms(ENV_CONNECT_TIMEOUT_MS, DEFAULT_CONNECT_TIMEOUT_MS);
    let base = reqwest::Client::builder()
        .timeout(Duration::from_millis(request_ms))
        .connect_timeout(Duration::from_millis(connect_ms));

    // Without a configured bundle the builder is returned untouched.
    let path = match std::env::var("CELLOS_CA_BUNDLE") {
        Ok(value) => value,
        Err(_) => return Ok(base),
    };

    let pem = match std::fs::read(&path) {
        Ok(contents) => contents,
        Err(e) => return Err(format!("CELLOS_CA_BUNDLE: read {path}: {e}")),
    };

    let blocks = pem_cert_blocks(&pem);
    if blocks.is_empty() {
        return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
    }

    let total = blocks.len();
    let mut builder = base;
    for block in &blocks {
        match reqwest::Certificate::from_pem(block) {
            Ok(cert) => {
                builder = builder.add_root_certificate(cert);
            }
            Err(e) => return Err(format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}")),
        }
    }

    tracing::debug!(path = %path, count = total, "CELLOS_CA_BUNDLE: loaded CA certificates");
    Ok(builder)
}
/// Splits PEM text into its individual `-----BEGIN …` / `-----END …` blocks.
///
/// The input is decoded lossily as UTF-8 and scanned line by line. Each
/// returned block holds the lines from a `-----BEGIN ` line through the next
/// `-----END ` line inclusive, each terminated by `\n`. Text outside blocks is
/// ignored, a second `-----BEGIN ` line discards the block in progress, and a
/// block that never reaches an `-----END ` line is dropped.
fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
    let decoded = String::from_utf8_lossy(pem);
    let mut found: Vec<Vec<u8>> = Vec::new();
    // `Some(buffer)` while inside a block, `None` between blocks.
    let mut open: Option<String> = None;

    for line in decoded.lines() {
        if line.starts_with("-----BEGIN ") {
            // A new header always starts from an empty buffer.
            open = Some(String::new());
        }

        if let Some(mut buffer) = open.take() {
            buffer.push_str(line);
            buffer.push('\n');
            if line.starts_with("-----END ") {
                found.push(buffer.into_bytes());
            } else {
                open = Some(buffer);
            }
        }
    }

    found
}
/// Export sink that uploads artifacts with HTTP `PUT` requests.
///
/// The optional bearer token is wiped with `zeroize` when the sink is dropped.
pub struct HttpExportSink {
// HTTP client used for every upload request.
client: reqwest::Client,
// Base URL with surrounding whitespace and trailing `/` or `\` removed; may
// contain `{cell_id}` / `{artifact_name}` placeholders or a query string.
base_url: String,
// Cell identifier substituted into, or appended to, the upload URL.
cell_id: String,
// Token sent as `Authorization: Bearer …` when present.
bearer_token: Option<String>,
// Total number of upload attempts per artifact; the constructor rejects 0.
max_attempts: usize,
// Pause between attempts; a zero duration means no pause.
retry_backoff: Duration,
}
impl Drop for HttpExportSink {
fn drop(&mut self) {
if let Some(ref mut tok) = self.bearer_token {
tok.zeroize();
}
}
}
impl HttpExportSink {
    /// Creates a sink that uploads artifacts to `base_url`.
    ///
    /// `base_url` is trimmed of surrounding whitespace and of trailing `/` or
    /// `\` characters, and must then parse as an `http` or `https` URL.
    /// `max_attempts` is the total number of tries per artifact and
    /// `retry_backoff_ms` the pause between tries (0 disables the pause).
    ///
    /// # Errors
    ///
    /// Returns `CellosError::ExportSink` when the base URL is empty, does not
    /// parse, or uses another scheme; when `max_attempts` is 0; or when the
    /// HTTP client cannot be built (including `CELLOS_CA_BUNDLE` problems).
    pub fn new(
        base_url: impl Into<String>,
        cell_id: impl Into<String>,
        bearer_token: Option<String>,
        max_attempts: usize,
        retry_backoff_ms: u64,
    ) -> Result<Self, CellosError> {
        let raw = base_url.into();
        let trimmed = raw.trim().trim_end_matches(['/', '\\']).to_string();
        if trimmed.is_empty() {
            return Err(CellosError::ExportSink(
                "HTTP base URL is empty after trim".into(),
            ));
        }
        let parsed = reqwest::Url::parse(trimmed.as_str())
            .map_err(|e| CellosError::ExportSink(format!("invalid HTTP export base URL: {e}")))?;
        let scheme = parsed.scheme();
        if scheme != "http" && scheme != "https" {
            return Err(CellosError::ExportSink(format!(
                "HTTP export base URL scheme must be http or https, got {scheme}"
            )));
        }
        // Reject bad arguments before building the client: that step reads
        // environment variables and possibly a CA bundle from disk, none of
        // which should happen for a call that is going to fail anyway.
        if max_attempts == 0 {
            return Err(CellosError::ExportSink(
                "HTTP export max_attempts must be at least 1".into(),
            ));
        }
        let client = http_client_builder()
            .map_err(CellosError::ExportSink)?
            .build()
            .map_err(|e| CellosError::ExportSink(format!("http client init: {e}")))?;
        Ok(Self {
            client,
            base_url: trimmed,
            cell_id: cell_id.into(),
            bearer_token,
            max_attempts,
            retry_backoff: Duration::from_millis(retry_backoff_ms),
        })
    }

    /// Creates a sink from the process environment.
    ///
    /// `CELLOS_EXPORT_HTTP_BASE_URL` is required and
    /// `CELLOS_EXPORT_HTTP_BEARER_TOKEN` is optional. The sink makes a single
    /// attempt per artifact with no backoff.
    ///
    /// # Errors
    ///
    /// Returns `CellosError::ExportSink` when the base URL variable is not
    /// set, plus every error [`HttpExportSink::new`] can return.
    pub fn from_env(cell_id: impl Into<String>) -> Result<Self, CellosError> {
        let base_url = std::env::var("CELLOS_EXPORT_HTTP_BASE_URL")
            .map_err(|_| CellosError::ExportSink("CELLOS_EXPORT_HTTP_BASE_URL not set".into()))?;
        let bearer_token = std::env::var("CELLOS_EXPORT_HTTP_BEARER_TOKEN").ok();
        Self::new(base_url, cell_id, bearer_token, 1, 0)
    }

    /// Turns an artifact name into a value that is safe to use as one URL
    /// path segment.
    ///
    /// Path separators (`/`, `\`) become `_`. A name that is exactly a dot
    /// segment (`.` or `..`, including the percent-encoded `%2e` spellings
    /// that URL parsers treat the same way) is replaced by underscores.
    /// Otherwise URL normalisation would collapse `<base>/<cell>/..` to
    /// `<base>/` and the upload would land outside the per-cell path.
    fn sanitize_artifact_name(name: &str) -> String {
        let flattened = name.replace(['/', '\\'], "_");
        let decoded = flattened.to_ascii_lowercase().replace("%2e", ".");
        if decoded == "." || decoded == ".." {
            "_".repeat(decoded.len())
        } else {
            flattened
        }
    }

    /// Computes the URL an artifact called `name` is uploaded to.
    ///
    /// * If the base URL contains `{cell_id}` or `{artifact_name}`, those
    ///   placeholders are substituted.
    /// * Otherwise, if the base URL contains a query string, it is used
    ///   verbatim (for example a pre-signed URL).
    /// * Otherwise the result is `<base_url>/<cell_id>/<name>`.
    fn upload_url(&self, name: &str) -> String {
        let safe = Self::sanitize_artifact_name(name);
        if self.base_url.contains("{cell_id}") || self.base_url.contains("{artifact_name}") {
            return self
                .base_url
                .replace("{cell_id}", &self.cell_id)
                .replace("{artifact_name}", &safe);
        }
        if self.base_url.contains('?') {
            return self.base_url.clone();
        }
        format!("{}/{}/{safe}", self.base_url, self.cell_id)
    }

    /// Returns `true` for statuses worth retrying: any 5xx, plus 408
    /// (Request Timeout), 425 (Too Early) and 429 (Too Many Requests).
    fn should_retry_status(status: StatusCode) -> bool {
        status.is_server_error() || matches!(status.as_u16(), 408 | 425 | 429)
    }
}
#[async_trait]
impl ExportSink for HttpExportSink {
    /// Receipts produced by this sink always carry the HTTP target kind.
    fn target_kind(&self) -> Option<ExportReceiptTargetKind> {
        Some(ExportReceiptTargetKind::Http)
    }

    /// Reports the URL that `push` would upload `name` to.
    fn destination_hint(&self, name: &str) -> Option<String> {
        Some(self.upload_url(name))
    }

    /// Uploads the file at `path` as artifact `name` with an HTTP `PUT`.
    ///
    /// The file is read fully into memory and sent as the request body, with
    /// the bearer token and the metadata's content type attached when
    /// present. While attempts remain, transport errors and retryable
    /// statuses (see `should_retry_status`) are retried after the configured
    /// backoff.
    ///
    /// # Errors
    ///
    /// Returns `CellosError::ExportSink` when the file cannot be read, when
    /// the final attempt fails at the transport level, or when the server
    /// answers with a non-success status that is not retried.
    #[instrument(skip(self), fields(cell_id = %self.cell_id, artifact = %name))]
    async fn push(
        &self,
        name: &str,
        path: &str,
        metadata: &ExportArtifactMetadata,
    ) -> Result<ExportReceipt, CellosError> {
        let payload = match tokio::fs::read(path).await {
            Ok(contents) => contents,
            Err(e) => {
                return Err(CellosError::ExportSink(format!(
                    "read artifact {path}: {e}"
                )))
            }
        };
        let bytes_written = payload.len() as u64;
        let url = self.upload_url(name);

        for attempt in 1..=self.max_attempts {
            let attempts_remain = attempt < self.max_attempts;

            // Each attempt needs its own copy of the body.
            let mut request = self.client.put(&url).body(payload.clone());
            if let Some(token) = self.bearer_token.as_ref() {
                request = request.bearer_auth(token);
            }
            if let Some(content_type) = metadata.content_type.as_ref() {
                request = request.header(reqwest::header::CONTENT_TYPE, content_type);
            }

            let response = match request.send().await {
                Ok(response) => response,
                Err(e) => {
                    if !attempts_remain {
                        return Err(CellosError::ExportSink(format!("http put {url}: {e}")));
                    }
                    tracing::warn!(
                        url = %url,
                        artifact = %name,
                        error = %e,
                        attempt,
                        max_attempts = self.max_attempts,
                        "HTTP export transport error; retrying"
                    );
                    if !self.retry_backoff.is_zero() {
                        tokio::time::sleep(self.retry_backoff).await;
                    }
                    continue;
                }
            };

            let status = response.status();
            if status.is_success() {
                tracing::info!(url = %url, artifact = %name, attempts = attempt, "artifact uploaded");
                return Ok(ExportReceipt {
                    target_kind: ExportReceiptTargetKind::Http,
                    target_name: None,
                    destination: url,
                    bytes_written,
                });
            }

            // The body is only surfaced in the terminal error message.
            let body = response.text().await.unwrap_or_default();
            let retryable = attempts_remain && Self::should_retry_status(status);
            if !retryable {
                return Err(CellosError::ExportSink(format!(
                    "http put {url} returned {status}: {body}"
                )));
            }

            tracing::warn!(
                url = %url,
                artifact = %name,
                status = %status,
                attempt,
                max_attempts = self.max_attempts,
                "transient HTTP export failure; retrying"
            );
            if !self.retry_backoff.is_zero() {
                tokio::time::sleep(self.retry_backoff).await;
            }
        }

        // `HttpExportSink::new` rejects `max_attempts == 0`, so the loop runs
        // at least once and its last iteration always returns.
        unreachable!("retry loop must return or error")
    }
}
#[cfg(test)]
mod tests {
    use super::{http_client_builder, pem_cert_blocks, HttpExportSink};
    use std::sync::{Mutex, MutexGuard};

    /// Serializes tests that read or change process-wide environment
    /// variables, because the test harness runs tests on several threads.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_LOCK`], recovering the guard when the mutex is poisoned.
    ///
    /// The lock protects no data of its own, so a panic in one test must not
    /// cascade into `PoisonError` failures in every later test.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn rejects_invalid_base_url() {
        let _g = lock_env();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("not a url", "c1", None, 1, 0);
        assert!(r.is_err(), "expected parse error");
    }

    #[test]
    fn rejects_non_http_scheme() {
        let _g = lock_env();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("ftp://example.com/put", "c1", None, 1, 0);
        assert!(r.is_err());
    }

    #[test]
    fn accepts_https_base() {
        let _g = lock_env();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("https://example.com/artifacts/", "c1", None, 1, 0);
        assert!(r.is_ok());
    }

    #[test]
    fn rejects_zero_attempts() {
        let _g = lock_env();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("https://example.com/artifacts/", "c1", None, 0, 0);
        assert!(r.is_err());
    }

    #[test]
    fn preserves_exact_url_when_query_present() {
        let _g = lock_env();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink = HttpExportSink::new(
            "https://example.com/upload/object.txt?X-Amz-Signature=abc",
            "c1",
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://example.com/upload/object.txt?X-Amz-Signature=abc"
        );
    }

    #[test]
    fn expands_placeholders_when_present() {
        let _g = lock_env();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink = HttpExportSink::new(
            "https://example.com/upload/{cell_id}/{artifact_name}",
            "cell-42",
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://example.com/upload/cell-42/artifact.txt"
        );
    }

    #[test]
    fn pem_cert_blocks_empty_input_returns_zero() {
        assert_eq!(pem_cert_blocks(b""), Vec::<Vec<u8>>::new());
    }

    #[test]
    fn pem_cert_blocks_single_cert_returns_one_block() {
        let pem = b"-----BEGIN CERTIFICATE-----\nMIIFake==\n-----END CERTIFICATE-----\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 1);
        assert!(blocks[0].starts_with(b"-----BEGIN CERTIFICATE-----"));
    }

    #[test]
    fn pem_cert_blocks_two_certs_returns_two_blocks() {
        let pem = b"-----BEGIN CERTIFICATE-----\nMIIFirst==\n-----END CERTIFICATE-----\n\
-----BEGIN CERTIFICATE-----\nMIISecond==\n-----END CERTIFICATE-----\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 2);
    }

    #[test]
    fn pem_cert_blocks_no_markers_returns_zero() {
        let pem = b"this is not a PEM file\nno BEGIN or END markers here\n";
        assert_eq!(pem_cert_blocks(pem), Vec::<Vec<u8>>::new());
    }

    #[test]
    fn ca_bundle_nonexistent_file_returns_error_with_path() {
        let _g = lock_env();
        let path = "/tmp/cellos_test_nonexistent_ca_bundle_99999.pem";
        std::env::set_var("CELLOS_CA_BUNDLE", path);
        let result = http_client_builder();
        // Restore the environment before asserting so a failure cannot leak
        // the variable into other tests.
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let err = result.unwrap_err();
        assert!(
            err.contains(path),
            "expected path in error message, got: {err}"
        );
    }

    #[test]
    fn ca_bundle_file_with_no_pem_blocks_returns_error() {
        let _g = lock_env();
        let path = std::env::temp_dir().join("cellos_test_no_pem_blocks.txt");
        std::fs::write(&path, b"not a pem bundle\n").unwrap();
        let path_str = path.to_str().unwrap().to_string();
        std::env::set_var("CELLOS_CA_BUNDLE", &path_str);
        let result = http_client_builder();
        // Clean up the variable and the fixture before asserting.
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let _ = std::fs::remove_file(&path);
        let err = result.unwrap_err();
        assert!(
            err.contains("no certificates found"),
            "expected 'no certificates found' in error, got: {err}"
        );
    }
}