Skip to main content

cellos_export_http/
lib.rs

1//! [`ExportSink`] that HTTP PUTs artifact bytes to a configurable base URL.
2//!
3//! Supports:
4//! - S3 presigned PUT URLs
5//! - Generic HTTP artifact endpoints
6//! - CI systems that accept artifact uploads via HTTP
7//!
8//! Optional retry knobs:
9//! - `max_attempts` — total PUT attempts, including the first try
10//! - `retry_backoff_ms` — fixed delay between transient retries
11//!
12//! Upload URL behavior:
13//! - If `base_url` contains `{cell_id}` or `{artifact_name}`, those placeholders are replaced.
14//! - Else if `base_url` contains a query string, it is treated as an exact URL.
15//! - Else the sink appends `/{cell_id}/{artifact_name}`.
16//!
17//! Configure `CELLOS_EXPORT_HTTP_BASE_URL` and optionally
18//! `CELLOS_EXPORT_HTTP_BEARER_TOKEN` in the environment, then construct via
19//! [`HttpExportSink::new`] or [`HttpExportSink::from_env`]. Base URLs must parse as **`http` or
20//! `https`** after trim (empty or other schemes are rejected at construction).
21//!
22//! ## Timeout contract (EXPORT-HTTP-TIMEOUT)
23//!
24//! The reqwest client is built with **bounded** request and connect timeouts so a
25//! hung artifact endpoint cannot stall a cell's export phase indefinitely:
26//!
27//! - Request timeout: [`DEFAULT_REQUEST_TIMEOUT_MS`] (override via
28//!   `CELLOS_EXPORT_HTTP_TIMEOUT_MS`).
29//! - Connect timeout: [`DEFAULT_CONNECT_TIMEOUT_MS`] (override via
30//!   `CELLOS_EXPORT_HTTP_CONNECT_TIMEOUT_MS`).
31//!
32//! Both env vars accept a positive `u64` count of milliseconds; unparseable or
33//! zero values fall back to the default. Operators can raise the request
34//! timeout for slow large-artifact endpoints, but the client is **never**
35//! constructed without explicit timeouts.
36
37use async_trait::async_trait;
38use cellos_core::ports::ExportSink;
39use cellos_core::{CellosError, ExportArtifactMetadata, ExportReceipt, ExportReceiptTargetKind};
40use reqwest::StatusCode;
41use std::time::Duration;
42use tracing::instrument;
43use zeroize::Zeroize;
44
/// Fallback ceiling, in milliseconds, on the total time one artifact PUT may take.
///
/// Thirty seconds leaves room for a multi-megabyte upload over a modest link
/// while still ensuring a black-holed endpoint cannot hold up the export
/// phase indefinitely.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30 * 1_000;

/// Fallback ceiling, in milliseconds, on the TCP connect step of the reqwest client.
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10 * 1_000;

/// Name of the environment variable that overrides [`DEFAULT_REQUEST_TIMEOUT_MS`].
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_EXPORT_HTTP_TIMEOUT_MS";

/// Name of the environment variable that overrides [`DEFAULT_CONNECT_TIMEOUT_MS`].
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_EXPORT_HTTP_CONNECT_TIMEOUT_MS";
60
/// Look up a millisecond timeout in the environment variable `env_var`.
///
/// The variable's value is trimmed and parsed as a `u64`. `default_ms` is
/// returned instead whenever the variable cannot be read (for example because
/// it is unset), its value does not parse (empty, non-numeric, negative, ...),
/// or it parses to `0`.
///
/// The function only reads the environment; it is public so callers and
/// contract tests can check the resolution policy without building a client.
pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
    let configured = std::env::var(env_var)
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok());
    match configured {
        // Zero would mean "time out immediately", so it is treated as "not configured".
        Some(ms) if ms > 0 => ms,
        _ => default_ms,
    }
}
77
78/// Build a reqwest client that honours `CELLOS_CA_BUNDLE` (path to a PEM CA bundle).
79///
80/// Corporate / private-PKI deployments set this env var to point at a CA chain so
81/// that HTTP export sinks can reach HTTPS artifact endpoints signed by an internal CA.
82/// `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` are respected by reqwest automatically.
83///
84/// Always installs **bounded** request and connect timeouts (see module docs).
85fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
86    let request_timeout = Duration::from_millis(resolve_timeout_ms(
87        ENV_REQUEST_TIMEOUT_MS,
88        DEFAULT_REQUEST_TIMEOUT_MS,
89    ));
90    let connect_timeout = Duration::from_millis(resolve_timeout_ms(
91        ENV_CONNECT_TIMEOUT_MS,
92        DEFAULT_CONNECT_TIMEOUT_MS,
93    ));
94    let mut builder = reqwest::Client::builder()
95        .timeout(request_timeout)
96        .connect_timeout(connect_timeout);
97    if let Ok(path) = std::env::var("CELLOS_CA_BUNDLE") {
98        let pem =
99            std::fs::read(&path).map_err(|e| format!("CELLOS_CA_BUNDLE: read {path}: {e}"))?;
100        // Split on PEM block boundaries to support bundles with multiple root / intermediate CAs.
101        let mut added = 0usize;
102        for block in pem_cert_blocks(&pem) {
103            let cert = reqwest::Certificate::from_pem(&block)
104                .map_err(|e| format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}"))?;
105            builder = builder.add_root_certificate(cert);
106            added += 1;
107        }
108        if added == 0 {
109            return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
110        }
111        tracing::debug!(path = %path, count = added, "CELLOS_CA_BUNDLE: loaded CA certificates");
112    }
113    Ok(builder)
114}
115
/// Split a concatenated PEM byte slice into one byte vector per `BEGIN`/`END` block.
///
/// Each returned block starts at a `-----BEGIN ` line and ends at the next
/// `-----END ` line, with every line trimmed and terminated by `\n`.
///
/// - Invalid UTF-8 is replaced lossily before scanning.
/// - A leading UTF-8 byte-order mark and whitespace around lines are ignored,
///   so bundles saved by BOM-emitting editors or pasted with indentation
///   still yield all of their blocks.
/// - Text outside blocks is skipped. A block that never reaches an `END` line
///   is dropped, and a second `BEGIN` inside an open block restarts it.
/// - The block label is not inspected: any PEM block is returned, not only
///   `CERTIFICATE` ones. Rejecting other kinds is left to the caller's parser.
fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
    let text = String::from_utf8_lossy(pem);
    // U+FEFF is not whitespace, so `trim` alone would leave it glued to the
    // first marker and that block would never be recognised.
    let body = text.trim_start_matches('\u{feff}');

    let mut blocks = Vec::new();
    let mut current = String::new();
    let mut in_block = false;
    for line in body.lines() {
        let line = line.trim();
        if line.starts_with("-----BEGIN ") {
            in_block = true;
            current.clear();
        }
        if !in_block {
            continue;
        }
        current.push_str(line);
        current.push('\n');
        if line.starts_with("-----END ") {
            blocks.push(std::mem::take(&mut current).into_bytes());
            in_block = false;
        }
    }
    blocks
}
138
139/// HTTP PUT export sink — uploads artifact file bytes to a resolved URL derived from `base_url`.
140pub struct HttpExportSink {
141    client: reqwest::Client,
142    base_url: String,
143    cell_id: String,
144    /// Bearer token zeroized on drop — credential should not outlive the export phase.
145    bearer_token: Option<String>,
146    max_attempts: usize,
147    retry_backoff: Duration,
148}
149
150impl Drop for HttpExportSink {
151    fn drop(&mut self) {
152        if let Some(ref mut tok) = self.bearer_token {
153            tok.zeroize();
154        }
155    }
156}
157
158impl HttpExportSink {
159    pub fn new(
160        base_url: impl Into<String>,
161        cell_id: impl Into<String>,
162        bearer_token: Option<String>,
163        max_attempts: usize,
164        retry_backoff_ms: u64,
165    ) -> Result<Self, CellosError> {
166        let raw = base_url.into();
167        let trimmed = raw.trim().trim_end_matches(['/', '\\']).to_string();
168        if trimmed.is_empty() {
169            return Err(CellosError::ExportSink(
170                "HTTP base URL is empty after trim".into(),
171            ));
172        }
173        let parsed = reqwest::Url::parse(trimmed.as_str())
174            .map_err(|e| CellosError::ExportSink(format!("invalid HTTP export base URL: {e}")))?;
175        let scheme = parsed.scheme();
176        if scheme != "http" && scheme != "https" {
177            return Err(CellosError::ExportSink(format!(
178                "HTTP export base URL scheme must be http or https, got {scheme}"
179            )));
180        }
181        let client = http_client_builder()
182            .map_err(CellosError::ExportSink)?
183            .build()
184            .map_err(|e| CellosError::ExportSink(format!("http client init: {e}")))?;
185        if max_attempts == 0 {
186            return Err(CellosError::ExportSink(
187                "HTTP export max_attempts must be at least 1".into(),
188            ));
189        }
190        Ok(Self {
191            client,
192            base_url: trimmed,
193            cell_id: cell_id.into(),
194            bearer_token,
195            max_attempts,
196            retry_backoff: Duration::from_millis(retry_backoff_ms),
197        })
198    }
199
200    /// Construct from environment variables.
201    ///
202    /// - `CELLOS_EXPORT_HTTP_BASE_URL` — required
203    /// - `CELLOS_EXPORT_HTTP_BEARER_TOKEN` — optional
204    pub fn from_env(cell_id: impl Into<String>) -> Result<Self, CellosError> {
205        let base_url = std::env::var("CELLOS_EXPORT_HTTP_BASE_URL")
206            .map_err(|_| CellosError::ExportSink("CELLOS_EXPORT_HTTP_BASE_URL not set".into()))?;
207        let bearer_token = std::env::var("CELLOS_EXPORT_HTTP_BEARER_TOKEN").ok();
208        Self::new(base_url, cell_id, bearer_token, 1, 0)
209    }
210
211    fn upload_url(&self, name: &str) -> String {
212        let safe = name.replace(['/', '\\'], "_");
213        if self.base_url.contains("{cell_id}") || self.base_url.contains("{artifact_name}") {
214            return self
215                .base_url
216                .replace("{cell_id}", &self.cell_id)
217                .replace("{artifact_name}", &safe);
218        }
219        if self.base_url.contains('?') {
220            return self.base_url.clone();
221        }
222        format!("{}/{}/{safe}", self.base_url, self.cell_id)
223    }
224
225    fn should_retry_status(status: StatusCode) -> bool {
226        status.is_server_error() || matches!(status.as_u16(), 408 | 425 | 429)
227    }
228}
229
230#[async_trait]
231impl ExportSink for HttpExportSink {
232    fn target_kind(&self) -> Option<ExportReceiptTargetKind> {
233        Some(ExportReceiptTargetKind::Http)
234    }
235
236    fn destination_hint(&self, name: &str) -> Option<String> {
237        Some(self.upload_url(name))
238    }
239
240    #[instrument(skip(self), fields(cell_id = %self.cell_id, artifact = %name))]
241    async fn push(
242        &self,
243        name: &str,
244        path: &str,
245        metadata: &ExportArtifactMetadata,
246    ) -> Result<ExportReceipt, CellosError> {
247        let bytes = tokio::fs::read(path)
248            .await
249            .map_err(|e| CellosError::ExportSink(format!("read artifact {path}: {e}")))?;
250        let bytes_written = bytes.len() as u64;
251        let url = self.upload_url(name);
252
253        for attempt in 1..=self.max_attempts {
254            let mut req = self.client.put(&url).body(bytes.clone());
255
256            if let Some(ref token) = self.bearer_token {
257                req = req.bearer_auth(token);
258            }
259            if let Some(ref content_type) = metadata.content_type {
260                req = req.header(reqwest::header::CONTENT_TYPE, content_type);
261            }
262
263            match req.send().await {
264                Ok(resp) if resp.status().is_success() => {
265                    tracing::info!(url = %url, artifact = %name, attempts = attempt, "artifact uploaded");
266                    return Ok(ExportReceipt {
267                        target_kind: ExportReceiptTargetKind::Http,
268                        target_name: None,
269                        destination: url,
270                        bytes_written,
271                    });
272                }
273                Ok(resp) => {
274                    let status = resp.status();
275                    let body = resp.text().await.unwrap_or_default();
276                    if attempt < self.max_attempts && Self::should_retry_status(status) {
277                        tracing::warn!(
278                            url = %url,
279                            artifact = %name,
280                            status = %status,
281                            attempt,
282                            max_attempts = self.max_attempts,
283                            "transient HTTP export failure; retrying"
284                        );
285                        if !self.retry_backoff.is_zero() {
286                            tokio::time::sleep(self.retry_backoff).await;
287                        }
288                        continue;
289                    }
290                    return Err(CellosError::ExportSink(format!(
291                        "http put {url} returned {status}: {body}"
292                    )));
293                }
294                Err(e) => {
295                    if attempt < self.max_attempts {
296                        tracing::warn!(
297                            url = %url,
298                            artifact = %name,
299                            error = %e,
300                            attempt,
301                            max_attempts = self.max_attempts,
302                            "HTTP export transport error; retrying"
303                        );
304                        if !self.retry_backoff.is_zero() {
305                            tokio::time::sleep(self.retry_backoff).await;
306                        }
307                        continue;
308                    }
309                    return Err(CellosError::ExportSink(format!("http put {url}: {e}")));
310                }
311            }
312        }
313
314        unreachable!("retry loop must return or error")
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::{http_client_builder, pem_cert_blocks, resolve_timeout_ms, HttpExportSink};
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes every test that touches process environment variables
    /// (`CELLOS_CA_BUNDLE`, and anything else read while a client is built).
    /// `std::env::set_var` affects the whole process and Rust tests run in
    /// parallel threads by default.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Take [`ENV_LOCK`], ignoring poisoning.
    ///
    /// The mutex guards no data, so a panic in one test must not turn every
    /// later test into a `PoisonError` failure that hides the real culprit.
    fn env_guard() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    #[test]
    fn rejects_invalid_base_url() {
        let _g = env_guard();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("not a url", "c1", None, 1, 0);
        assert!(r.is_err(), "expected parse error");
    }

    #[test]
    fn rejects_non_http_scheme() {
        let _g = env_guard();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("ftp://example.com/put", "c1", None, 1, 0);
        assert!(r.is_err());
    }

    #[test]
    fn accepts_https_base() {
        let _g = env_guard();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("https://example.com/artifacts/", "c1", None, 1, 0);
        assert!(r.is_ok());
    }

    #[test]
    fn rejects_zero_attempts() {
        let _g = env_guard();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let r = HttpExportSink::new("https://example.com/artifacts/", "c1", None, 0, 0);
        assert!(r.is_err());
    }

    #[test]
    fn preserves_exact_url_when_query_present() {
        let _g = env_guard();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink = HttpExportSink::new(
            "https://example.com/upload/object.txt?X-Amz-Signature=abc",
            "c1",
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://example.com/upload/object.txt?X-Amz-Signature=abc"
        );
    }

    #[test]
    fn expands_placeholders_when_present() {
        let _g = env_guard();
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let sink = HttpExportSink::new(
            "https://example.com/upload/{cell_id}/{artifact_name}",
            "cell-42",
            None,
            1,
            0,
        )
        .unwrap();
        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://example.com/upload/cell-42/artifact.txt"
        );
    }

    // --- resolve_timeout_ms (uses its own env var, still serialized) ---

    #[test]
    fn resolve_timeout_uses_default_when_unset_or_invalid() {
        let _g = env_guard();
        let var = "CELLOS_TEST_RESOLVE_TIMEOUT_FALLBACK_MS";
        std::env::remove_var(var);
        assert_eq!(resolve_timeout_ms(var, 1234), 1234);
        for bad in ["", "0", "abc", "-5", "1.5"] {
            std::env::set_var(var, bad);
            let resolved = resolve_timeout_ms(var, 1234);
            std::env::remove_var(var);
            assert_eq!(resolved, 1234, "value {bad:?} should fall back");
        }
    }

    #[test]
    fn resolve_timeout_accepts_positive_value_with_whitespace() {
        let _g = env_guard();
        let var = "CELLOS_TEST_RESOLVE_TIMEOUT_OVERRIDE_MS";
        std::env::set_var(var, " 2500 ");
        let resolved = resolve_timeout_ms(var, 1234);
        std::env::remove_var(var);
        assert_eq!(resolved, 2500);
    }

    // --- pem_cert_blocks unit tests (no env var access, run in parallel) ---

    #[test]
    fn pem_cert_blocks_empty_input_returns_zero() {
        assert_eq!(pem_cert_blocks(b""), Vec::<Vec<u8>>::new());
    }

    #[test]
    fn pem_cert_blocks_single_cert_returns_one_block() {
        let pem = b"-----BEGIN CERTIFICATE-----\nMIIFake==\n-----END CERTIFICATE-----\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 1);
        assert!(blocks[0].starts_with(b"-----BEGIN CERTIFICATE-----"));
    }

    #[test]
    fn pem_cert_blocks_two_certs_returns_two_blocks() {
        let pem = b"-----BEGIN CERTIFICATE-----\nMIIFirst==\n-----END CERTIFICATE-----\n\
                    -----BEGIN CERTIFICATE-----\nMIISecond==\n-----END CERTIFICATE-----\n";
        let blocks = pem_cert_blocks(pem);
        assert_eq!(blocks.len(), 2);
    }

    #[test]
    fn pem_cert_blocks_no_markers_returns_zero() {
        let pem = b"this is not a PEM file\nno BEGIN or END markers here\n";
        assert_eq!(pem_cert_blocks(pem), Vec::<Vec<u8>>::new());
    }

    // --- CELLOS_CA_BUNDLE env var tests ---

    #[test]
    fn ca_bundle_nonexistent_file_returns_error_with_path() {
        let _g = env_guard();
        let path = "/tmp/cellos_test_nonexistent_ca_bundle_99999.pem";
        std::env::set_var("CELLOS_CA_BUNDLE", path);
        let result = http_client_builder();
        // Restore the environment before asserting so a failure cannot leak the variable.
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let err = result.unwrap_err();
        assert!(
            err.contains(path),
            "expected path in error message, got: {err}"
        );
    }

    #[test]
    fn ca_bundle_file_with_no_pem_blocks_returns_error() {
        let _g = env_guard();
        let path = std::env::temp_dir().join("cellos_test_no_pem_blocks.txt");
        std::fs::write(&path, b"not a pem bundle\n").unwrap();
        let path_str = path.to_str().unwrap().to_string();
        std::env::set_var("CELLOS_CA_BUNDLE", &path_str);
        let result = http_client_builder();
        // Clean up the variable and the temp file before asserting.
        std::env::remove_var("CELLOS_CA_BUNDLE");
        let _ = std::fs::remove_file(&path);
        let err = result.unwrap_err();
        assert!(
            err.contains("no certificates found"),
            "expected 'no certificates found' in error, got: {err}"
        );
    }
}