1use async_trait::async_trait;
38use cellos_core::ports::ExportSink;
39use cellos_core::{CellosError, ExportArtifactMetadata, ExportReceipt, ExportReceiptTargetKind};
40use reqwest::StatusCode;
41use std::time::Duration;
42use tracing::instrument;
43use zeroize::Zeroize;
44
/// Default per-request timeout, in milliseconds, configured on the HTTP export
/// client via `ClientBuilder::timeout`.
///
/// Overridden by a positive integer in [`ENV_REQUEST_TIMEOUT_MS`] (see
/// [`resolve_timeout_ms`]).
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;

/// Default connect timeout, in milliseconds, configured on the HTTP export
/// client via `ClientBuilder::connect_timeout`.
///
/// Overridden by a positive integer in [`ENV_CONNECT_TIMEOUT_MS`] (see
/// [`resolve_timeout_ms`]).
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10_000;

/// Name of the environment variable that overrides [`DEFAULT_REQUEST_TIMEOUT_MS`].
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_EXPORT_HTTP_TIMEOUT_MS";

/// Name of the environment variable that overrides [`DEFAULT_CONNECT_TIMEOUT_MS`].
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_EXPORT_HTTP_CONNECT_TIMEOUT_MS";
60
/// Reads a timeout in milliseconds from the environment variable `env_var`.
///
/// The value is trimmed of surrounding whitespace and parsed as a `u64`.
/// `default_ms` is returned instead when:
/// - the variable is unset or not valid Unicode;
/// - the value does not parse;
/// - the value is `0` (a zero timeout is never honoured).
pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
    let configured = std::env::var(env_var)
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok());
    match configured {
        Some(ms) if ms > 0 => ms,
        _ => default_ms,
    }
}
77
78fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
86 let request_timeout = Duration::from_millis(resolve_timeout_ms(
87 ENV_REQUEST_TIMEOUT_MS,
88 DEFAULT_REQUEST_TIMEOUT_MS,
89 ));
90 let connect_timeout = Duration::from_millis(resolve_timeout_ms(
91 ENV_CONNECT_TIMEOUT_MS,
92 DEFAULT_CONNECT_TIMEOUT_MS,
93 ));
94 let mut builder = reqwest::Client::builder()
95 .timeout(request_timeout)
96 .connect_timeout(connect_timeout);
97 if let Ok(path) = std::env::var("CELLOS_CA_BUNDLE") {
98 let pem =
99 std::fs::read(&path).map_err(|e| format!("CELLOS_CA_BUNDLE: read {path}: {e}"))?;
100 let mut added = 0usize;
102 for block in pem_cert_blocks(&pem) {
103 let cert = reqwest::Certificate::from_pem(&block)
104 .map_err(|e| format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}"))?;
105 builder = builder.add_root_certificate(cert);
106 added += 1;
107 }
108 if added == 0 {
109 return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
110 }
111 tracing::debug!(path = %path, count = added, "CELLOS_CA_BUNDLE: loaded CA certificates");
112 }
113 Ok(builder)
114}
115
/// Splits a PEM bundle into its individual `CERTIFICATE` blocks.
///
/// Each returned block holds three kinds of line, each trimmed of surrounding
/// whitespace and terminated by `\n`:
/// - the `-----BEGIN CERTIFICATE-----` line;
/// - the base64 body lines;
/// - the matching `-----END CERTIFICATE-----` line.
///
/// The following are ignored:
/// - text outside blocks (comments, blank lines);
/// - PEM blocks of any other type (private keys, public keys, CRLs, ...).
///   Such blocks must not be handed to `Certificate::from_pem` or counted as
///   CA roots.
///
/// A block that is never terminated, or is closed by a mismatched `END`
/// marker, is dropped. Input that is not valid UTF-8 is decoded lossily.
fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
    const BEGIN_CERT: &str = "-----BEGIN CERTIFICATE-----";
    const END_CERT: &str = "-----END CERTIFICATE-----";

    let text = String::from_utf8_lossy(pem);
    let mut blocks = Vec::new();
    // `Some(buf)` while inside a certificate block; `None` otherwise.
    let mut current: Option<String> = None;
    for raw_line in text.lines() {
        // Tolerate indented bundles and stray `\r` from CRLF files.
        let line = raw_line.trim();
        if line.starts_with("-----BEGIN ") {
            // Any BEGIN marker abandons an unterminated block; only a
            // certificate BEGIN starts a new one.
            current = (line == BEGIN_CERT).then(String::new);
        }
        if let Some(buf) = current.as_mut() {
            buf.push_str(line);
            buf.push('\n');
            if line.starts_with("-----END ") {
                if line == END_CERT {
                    blocks.push(std::mem::take(buf).into_bytes());
                }
                current = None;
            }
        }
    }
    blocks
}
138
/// Export sink that uploads artifacts to an HTTP(S) endpoint with `PUT` requests.
///
/// The optional bearer token is zeroized when the sink is dropped.
/// NOTE(review): no `Debug` impl is visible here. If one is added, make sure
/// it does not print `bearer_token`.
pub struct HttpExportSink {
    /// HTTP client configured by `http_client_builder` (timeouts and optional
    /// extra CA roots).
    client: reqwest::Client,
    /// Upload base URL, with surrounding whitespace and trailing `/`/`\`
    /// removed. It may contain `{cell_id}` / `{artifact_name}` placeholders
    /// or a query string; see `upload_url`.
    base_url: String,
    /// Identifier of the cell whose artifacts are exported; inserted into upload URLs.
    cell_id: String,
    /// Optional token sent via `bearer_auth` on every upload; zeroized on drop.
    bearer_token: Option<String>,
    /// Total number of PUT attempts per artifact; `new` rejects zero.
    max_attempts: usize,
    /// Fixed delay between retry attempts; a zero duration skips the sleep.
    retry_backoff: Duration,
}
149
150impl Drop for HttpExportSink {
151 fn drop(&mut self) {
152 if let Some(ref mut tok) = self.bearer_token {
153 tok.zeroize();
154 }
155 }
156}
157
158impl HttpExportSink {
159 pub fn new(
160 base_url: impl Into<String>,
161 cell_id: impl Into<String>,
162 bearer_token: Option<String>,
163 max_attempts: usize,
164 retry_backoff_ms: u64,
165 ) -> Result<Self, CellosError> {
166 let raw = base_url.into();
167 let trimmed = raw.trim().trim_end_matches(['/', '\\']).to_string();
168 if trimmed.is_empty() {
169 return Err(CellosError::ExportSink(
170 "HTTP base URL is empty after trim".into(),
171 ));
172 }
173 let parsed = reqwest::Url::parse(trimmed.as_str())
174 .map_err(|e| CellosError::ExportSink(format!("invalid HTTP export base URL: {e}")))?;
175 let scheme = parsed.scheme();
176 if scheme != "http" && scheme != "https" {
177 return Err(CellosError::ExportSink(format!(
178 "HTTP export base URL scheme must be http or https, got {scheme}"
179 )));
180 }
181 let client = http_client_builder()
182 .map_err(CellosError::ExportSink)?
183 .build()
184 .map_err(|e| CellosError::ExportSink(format!("http client init: {e}")))?;
185 if max_attempts == 0 {
186 return Err(CellosError::ExportSink(
187 "HTTP export max_attempts must be at least 1".into(),
188 ));
189 }
190 Ok(Self {
191 client,
192 base_url: trimmed,
193 cell_id: cell_id.into(),
194 bearer_token,
195 max_attempts,
196 retry_backoff: Duration::from_millis(retry_backoff_ms),
197 })
198 }
199
200 pub fn from_env(cell_id: impl Into<String>) -> Result<Self, CellosError> {
205 let base_url = std::env::var("CELLOS_EXPORT_HTTP_BASE_URL")
206 .map_err(|_| CellosError::ExportSink("CELLOS_EXPORT_HTTP_BASE_URL not set".into()))?;
207 let bearer_token = std::env::var("CELLOS_EXPORT_HTTP_BEARER_TOKEN").ok();
208 Self::new(base_url, cell_id, bearer_token, 1, 0)
209 }
210
211 fn upload_url(&self, name: &str) -> String {
212 let safe = name.replace(['/', '\\'], "_");
213 if self.base_url.contains("{cell_id}") || self.base_url.contains("{artifact_name}") {
214 return self
215 .base_url
216 .replace("{cell_id}", &self.cell_id)
217 .replace("{artifact_name}", &safe);
218 }
219 if self.base_url.contains('?') {
220 return self.base_url.clone();
221 }
222 format!("{}/{}/{safe}", self.base_url, self.cell_id)
223 }
224
225 fn should_retry_status(status: StatusCode) -> bool {
226 status.is_server_error() || matches!(status.as_u16(), 408 | 425 | 429)
227 }
228}
229
230#[async_trait]
231impl ExportSink for HttpExportSink {
232 fn target_kind(&self) -> Option<ExportReceiptTargetKind> {
233 Some(ExportReceiptTargetKind::Http)
234 }
235
236 fn destination_hint(&self, name: &str) -> Option<String> {
237 Some(self.upload_url(name))
238 }
239
240 #[instrument(skip(self), fields(cell_id = %self.cell_id, artifact = %name))]
241 async fn push(
242 &self,
243 name: &str,
244 path: &str,
245 metadata: &ExportArtifactMetadata,
246 ) -> Result<ExportReceipt, CellosError> {
247 let bytes = tokio::fs::read(path)
248 .await
249 .map_err(|e| CellosError::ExportSink(format!("read artifact {path}: {e}")))?;
250 let bytes_written = bytes.len() as u64;
251 let url = self.upload_url(name);
252
253 for attempt in 1..=self.max_attempts {
254 let mut req = self.client.put(&url).body(bytes.clone());
255
256 if let Some(ref token) = self.bearer_token {
257 req = req.bearer_auth(token);
258 }
259 if let Some(ref content_type) = metadata.content_type {
260 req = req.header(reqwest::header::CONTENT_TYPE, content_type);
261 }
262
263 match req.send().await {
264 Ok(resp) if resp.status().is_success() => {
265 tracing::info!(url = %url, artifact = %name, attempts = attempt, "artifact uploaded");
266 return Ok(ExportReceipt {
267 target_kind: ExportReceiptTargetKind::Http,
268 target_name: None,
269 destination: url,
270 bytes_written,
271 });
272 }
273 Ok(resp) => {
274 let status = resp.status();
275 let body = resp.text().await.unwrap_or_default();
276 if attempt < self.max_attempts && Self::should_retry_status(status) {
277 tracing::warn!(
278 url = %url,
279 artifact = %name,
280 status = %status,
281 attempt,
282 max_attempts = self.max_attempts,
283 "transient HTTP export failure; retrying"
284 );
285 if !self.retry_backoff.is_zero() {
286 tokio::time::sleep(self.retry_backoff).await;
287 }
288 continue;
289 }
290 return Err(CellosError::ExportSink(format!(
291 "http put {url} returned {status}: {body}"
292 )));
293 }
294 Err(e) => {
295 if attempt < self.max_attempts {
296 tracing::warn!(
297 url = %url,
298 artifact = %name,
299 error = %e,
300 attempt,
301 max_attempts = self.max_attempts,
302 "HTTP export transport error; retrying"
303 );
304 if !self.retry_backoff.is_zero() {
305 tokio::time::sleep(self.retry_backoff).await;
306 }
307 continue;
308 }
309 return Err(CellosError::ExportSink(format!("http put {url}: {e}")));
310 }
311 }
312 }
313
314 unreachable!("retry loop must return or error")
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::{http_client_builder, pem_cert_blocks, resolve_timeout_ms, HttpExportSink};
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that read or mutate process-wide environment variables
    /// (`CELLOS_CA_BUNDLE`, timeout overrides). The test harness runs tests on
    /// multiple threads.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_LOCK`], recovering the guard if an earlier test panicked
    /// while holding it. This keeps one failure from cascading into every
    /// other environment-dependent test.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    #[test]
    fn rejects_invalid_base_url() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("not a url", "c1", None, 1, 0);
        assert!(r.is_err(), "expected parse error");
    }

    #[test]
    fn rejects_non_http_scheme() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("ftp://example.com/put", "c1", None, 1, 0);
        assert!(r.is_err());
    }

    #[test]
    fn accepts_https_base() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("https://example.com/artifacts/", "c1", None, 1, 0);
        assert!(r.is_ok());
    }

    #[test]
    fn rejects_zero_attempts() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("https://example.com/artifacts/", "c1", None, 0, 0);
        assert!(r.is_err());
    }

    #[test]
    fn preserves_exact_url_when_query_present() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink = HttpExportSink::new(
            "https://example.com/upload/object.txt?X-Amz-Signature=abc",
            "c1",
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://example.com/upload/object.txt?X-Amz-Signature=abc"
        );
    }

    #[test]
    fn expands_placeholders_when_present() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink = HttpExportSink::new(
            "https://example.com/upload/{cell_id}/{artifact_name}",
            "cell-42",
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://example.com/upload/cell-42/artifact.txt"
        );
    }

    #[test]
    fn default_layout_appends_cell_and_sanitized_name() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink =
            HttpExportSink::new("https://example.com/artifacts/", "c1", None, 1, 0).unwrap();
        assert_eq!(
            sink.upload_url("dir/sub\\file.txt"),
            "https://example.com/artifacts/c1/dir_sub_file.txt"
        );
    }

    #[test]
    fn retries_only_transient_statuses() {
        use reqwest::StatusCode;
        for code in [408u16, 425, 429, 500, 502, 503, 504] {
            let status = StatusCode::from_u16(code).unwrap();
            assert!(HttpExportSink::should_retry_status(status), "{code} should retry");
        }
        for code in [200u16, 400, 401, 403, 404, 409, 413] {
            let status = StatusCode::from_u16(code).unwrap();
            assert!(!HttpExportSink::should_retry_status(status), "{code} should not retry");
        }
    }

    #[test]
    fn resolve_timeout_ms_falls_back_on_missing_zero_or_garbage() {
        let _g = env_lock();
        let var = "CELLOS_TEST_RESOLVE_TIMEOUT_MS";
        std::env::remove_var(var);
        assert_eq!(resolve_timeout_ms(var, 7), 7);
        std::env::set_var(var, " 250 ");
        assert_eq!(resolve_timeout_ms(var, 7), 250);
        for bad in ["0", "-1", "abc", ""] {
            std::env::set_var(var, bad);
            assert_eq!(resolve_timeout_ms(var, 7), 7, "value {bad:?}");
        }
        std::env::remove_var(var);
    }

    // pem_cert_blocks: pure splitting logic, no environment access.

    #[test]
    fn pem_cert_blocks_empty_input_returns_zero() {
        assert_eq!(pem_cert_blocks(b""), Vec::<Vec<u8>>::new());
    }

    #[test]
    fn pem_cert_blocks_single_cert_returns_one_block() {
        let pem = b"-----BEGIN CERTIFICATE-----\nMIIFake==\n-----END CERTIFICATE-----\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 1);
        assert!(blocks[0].starts_with(b"-----BEGIN CERTIFICATE-----"));
    }

    #[test]
    fn pem_cert_blocks_two_certs_returns_two_blocks() {
        let pem = b"-----BEGIN CERTIFICATE-----\nMIIFirst==\n-----END CERTIFICATE-----\n\
                    -----BEGIN CERTIFICATE-----\nMIISecond==\n-----END CERTIFICATE-----\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 2);
    }

    #[test]
    fn pem_cert_blocks_no_markers_returns_zero() {
        let pem = b"this is not a PEM file\nno BEGIN or END markers here\n";
        assert_eq!(pem_cert_blocks(pem), Vec::<Vec<u8>>::new());
    }

    // CELLOS_CA_BUNDLE error paths: each test sets the variable under the lock
    // and removes it again before asserting.

    #[test]
    fn ca_bundle_nonexistent_file_returns_error_with_path() {
        let _g = env_lock();
        // Use the platform temp dir rather than a hard-coded `/tmp` so the
        // test also runs on Windows.
        let path = std::env::temp_dir().join(format!(
            "cellos_test_nonexistent_ca_bundle_{}.pem",
            std::process::id()
        ));
        let _ = std::fs::remove_file(&path);
        let path_str = path.to_str().unwrap().to_string();
        std::env::set_var("CELLOS_CA_BUNDLE", &path_str);
        let result = http_client_builder();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let err = result.unwrap_err();
        assert!(
            err.contains(&path_str),
            "expected path in error message, got: {err}"
        );
    }

    #[test]
    fn ca_bundle_file_with_no_pem_blocks_returns_error() {
        let _g = env_lock();
        // Per-process file name avoids clashes between concurrent test runs.
        let path = std::env::temp_dir().join(format!(
            "cellos_test_no_pem_blocks_{}.txt",
            std::process::id()
        ));
        std::fs::write(&path, b"not a pem bundle\n").unwrap();
        let path_str = path.to_str().unwrap().to_string();
        std::env::set_var("CELLOS_CA_BUNDLE", &path_str);
        let result = http_client_builder();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let _ = std::fs::remove_file(&path);
        let err = result.unwrap_err();
        assert!(
            err.contains("no certificates found"),
            "expected 'no certificates found' in error, got: {err}"
        );
    }
}