Skip to main content

cellos_export_s3/
lib.rs

1//! [`ExportSink`] that uploads artifact bytes to S3 via a presigned PUT URL.
2//!
3//! Upload URL behavior:
4//! - If `presigned_url` contains `{cell_id}` or `{artifact_name}`, those placeholders are replaced.
5//! - Else the URL is treated as exact. This is the normal presigned-S3 path.
6//!
7//! Receipts and destination hints remain logical `s3://bucket/key` paths even though the transport
8//! is an HTTP PUT.
9//!
10//! Optional retry knobs:
11//! - `max_attempts` — total PUT attempts, including the first try
12//! - `retry_backoff_ms` — fixed delay between transient retries
13//!
14//! ## Timeout contract (EXPORT-S3-TIMEOUT)
15//!
16//! The reqwest client is built with **bounded** request and connect timeouts so a
17//! hung S3 endpoint cannot stall a cell's export phase indefinitely:
18//!
19//! - Request timeout: [`DEFAULT_REQUEST_TIMEOUT_MS`] (override via
20//!   `CELLOS_EXPORT_S3_TIMEOUT_MS`).
21//! - Connect timeout: [`DEFAULT_CONNECT_TIMEOUT_MS`] (override via
22//!   `CELLOS_EXPORT_S3_CONNECT_TIMEOUT_MS`).
23//!
24//! Both env vars accept a positive `u64` count of milliseconds; unparseable or
25//! zero values fall back to the default. Operators can raise the request
26//! timeout for slow large-artifact endpoints, but the client is **never**
27//! constructed without explicit timeouts.
28
29use async_trait::async_trait;
30use cellos_core::ports::ExportSink;
31use cellos_core::{
32    redact_url_credentials_for_logs, CellosError, ExportArtifactMetadata, ExportReceipt,
33    ExportReceiptTargetKind,
34};
35use reqwest::StatusCode;
36use std::time::Duration;
37use tracing::instrument;
38use zeroize::Zeroize;
39
/// Upper bound, in milliseconds, on one complete artifact PUT when no
/// override is configured.
///
/// Thirty seconds leaves room for a multi-megabyte upload over a modest
/// link, yet keeps a black-holed endpoint from holding up the export phase
/// indefinitely.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30 * 1_000;

/// Upper bound, in milliseconds, on establishing the TCP connection when no
/// override is configured.
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10 * 1_000;

/// Name of the environment variable that overrides
/// [`DEFAULT_REQUEST_TIMEOUT_MS`].
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_EXPORT_S3_TIMEOUT_MS";

/// Name of the environment variable that overrides
/// [`DEFAULT_CONNECT_TIMEOUT_MS`].
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_EXPORT_S3_CONNECT_TIMEOUT_MS";
55
/// Read a millisecond timeout from the environment variable `env_var`.
///
/// The value is trimmed and parsed as a `u64`. `default_ms` is returned
/// instead whenever the variable cannot be read (unset or not valid
/// Unicode), does not parse as a `u64` (empty, non-numeric, negative,
/// fractional), or parses to `0`.
///
/// Kept public and free of client construction so callers and contract
/// tests can check the resolution policy on its own.
pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
    std::env::var(env_var)
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // A zero timeout is treated as "not configured", never as "no wait".
        .filter(|&ms| ms > 0)
        .unwrap_or(default_ms)
}
72
73/// Build a reqwest client that honours `CELLOS_CA_BUNDLE` (path to a PEM CA bundle).
74///
75/// Always installs **bounded** request and connect timeouts (see module docs).
76fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
77    let request_timeout = Duration::from_millis(resolve_timeout_ms(
78        ENV_REQUEST_TIMEOUT_MS,
79        DEFAULT_REQUEST_TIMEOUT_MS,
80    ));
81    let connect_timeout = Duration::from_millis(resolve_timeout_ms(
82        ENV_CONNECT_TIMEOUT_MS,
83        DEFAULT_CONNECT_TIMEOUT_MS,
84    ));
85    let mut builder = reqwest::Client::builder()
86        .timeout(request_timeout)
87        .connect_timeout(connect_timeout);
88    if let Ok(path) = std::env::var("CELLOS_CA_BUNDLE") {
89        let pem =
90            std::fs::read(&path).map_err(|e| format!("CELLOS_CA_BUNDLE: read {path}: {e}"))?;
91        let mut added = 0usize;
92        for block in pem_cert_blocks(&pem) {
93            let cert = reqwest::Certificate::from_pem(&block)
94                .map_err(|e| format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}"))?;
95            builder = builder.add_root_certificate(cert);
96            added += 1;
97        }
98        if added == 0 {
99            return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
100        }
101        tracing::debug!(path = %path, count = added, "CELLOS_CA_BUNDLE: loaded CA certificates");
102    }
103    Ok(builder)
104}
105
/// Split a PEM bundle into its individual `-----BEGIN …` / `-----END …` blocks.
///
/// The input is decoded lossily as UTF-8 and scanned line by line. Text
/// outside a block is ignored, a new `BEGIN` line discards any unterminated
/// block before it, and a block with no `END` line is dropped. Each returned
/// block has its lines re-joined with `\n` and ends with a trailing newline.
fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
    let decoded = String::from_utf8_lossy(pem);
    let mut found: Vec<Vec<u8>> = Vec::new();
    // `Some(buf)` while inside a block, `None` between blocks.
    let mut pending: Option<String> = None;

    for line in decoded.lines() {
        if line.starts_with("-----BEGIN ") {
            pending = Some(String::new());
        }
        if let Some(mut buf) = pending.take() {
            buf.push_str(line);
            buf.push('\n');
            if line.starts_with("-----END ") {
                found.push(buf.into_bytes());
            } else {
                pending = Some(buf);
            }
        }
    }
    found
}
127
128/// S3 export sink using presigned PUT URLs.
129pub struct PresignedS3ExportSink {
130    client: reqwest::Client,
131    /// Presigned URL zeroized on drop — credential bytes wiped when export phase ends.
132    presigned_url: String,
133    cell_id: String,
134    bucket: String,
135    key_prefix: Option<String>,
136    target_name: Option<String>,
137    /// AWS region for the bucket (optional — used for log attribution only; the
138    /// presigned URL already encodes the endpoint, so this does not change routing).
139    region: Option<String>,
140    max_attempts: usize,
141    retry_backoff: Duration,
142}
143
144impl Drop for PresignedS3ExportSink {
145    fn drop(&mut self) {
146        self.presigned_url.zeroize();
147    }
148}
149
150impl PresignedS3ExportSink {
151    #[allow(clippy::too_many_arguments)]
152    pub fn new(
153        presigned_url: impl Into<String>,
154        cell_id: impl Into<String>,
155        bucket: impl Into<String>,
156        key_prefix: Option<String>,
157        target_name: Option<String>,
158        region: Option<String>,
159        max_attempts: usize,
160        retry_backoff_ms: u64,
161    ) -> Result<Self, CellosError> {
162        let raw = presigned_url.into();
163        let trimmed = raw.trim().to_string();
164        if trimmed.is_empty() {
165            return Err(CellosError::ExportSink(
166                "S3 presigned URL is empty after trim".into(),
167            ));
168        }
169        let parsed = reqwest::Url::parse(trimmed.as_str())
170            .map_err(|e| CellosError::ExportSink(format!("invalid S3 presigned URL: {e}")))?;
171        let scheme = parsed.scheme();
172        if scheme != "http" && scheme != "https" {
173            return Err(CellosError::ExportSink(format!(
174                "S3 presigned URL scheme must be http or https, got {scheme}"
175            )));
176        }
177        let client = http_client_builder()
178            .map_err(CellosError::ExportSink)?
179            .build()
180            .map_err(|e| CellosError::ExportSink(format!("s3 http client init: {e}")))?;
181        if max_attempts == 0 {
182            return Err(CellosError::ExportSink(
183                "S3 export max_attempts must be at least 1".into(),
184            ));
185        }
186        Ok(Self {
187            client,
188            presigned_url: trimmed,
189            cell_id: cell_id.into(),
190            bucket: bucket.into(),
191            key_prefix,
192            target_name,
193            region,
194            max_attempts,
195            retry_backoff: Duration::from_millis(retry_backoff_ms),
196        })
197    }
198
199    fn upload_url(&self, artifact_name: &str) -> String {
200        let safe = artifact_name.replace(['/', '\\'], "_");
201        if self.presigned_url.contains("{cell_id}")
202            || self.presigned_url.contains("{artifact_name}")
203        {
204            return self
205                .presigned_url
206                .replace("{cell_id}", &self.cell_id)
207                .replace("{artifact_name}", &safe);
208        }
209        self.presigned_url.clone()
210    }
211
212    fn logical_destination(&self, artifact_name: &str) -> String {
213        let mut key = self
214            .key_prefix
215            .as_deref()
216            .unwrap_or("")
217            .trim_matches('/')
218            .to_string();
219        if !key.is_empty() {
220            key.push('/');
221        }
222        key.push_str(artifact_name);
223        format!("s3://{}/{}", self.bucket, key)
224    }
225
226    fn should_retry_status(status: StatusCode) -> bool {
227        status.is_server_error() || matches!(status.as_u16(), 408 | 425 | 429)
228    }
229}
230
231#[async_trait]
232impl ExportSink for PresignedS3ExportSink {
233    fn target_kind(&self) -> Option<ExportReceiptTargetKind> {
234        Some(ExportReceiptTargetKind::S3)
235    }
236
237    fn destination_hint(&self, name: &str) -> Option<String> {
238        Some(self.logical_destination(name))
239    }
240
241    #[instrument(skip(self), fields(
242        cell_id = %self.cell_id,
243        artifact = %name,
244        region = self.region.as_deref().unwrap_or("(unset)"),
245    ))]
246    async fn push(
247        &self,
248        name: &str,
249        path: &str,
250        metadata: &ExportArtifactMetadata,
251    ) -> Result<ExportReceipt, CellosError> {
252        let bytes = tokio::fs::read(path)
253            .await
254            .map_err(|e| CellosError::ExportSink(format!("read artifact {path}: {e}")))?;
255        let bytes_written = bytes.len() as u64;
256
257        let url = self.upload_url(name);
258        for attempt in 1..=self.max_attempts {
259            let mut req = self.client.put(&url).body(bytes.clone());
260            if let Some(ref content_type) = metadata.content_type {
261                req = req.header(reqwest::header::CONTENT_TYPE, content_type);
262            }
263            match req.send().await {
264                Ok(resp) if resp.status().is_success() => {
265                    tracing::info!(
266                        url = %redact_url_credentials_for_logs(&url),
267                        artifact = %name,
268                        bucket = %self.bucket,
269                        region = self.region.as_deref().unwrap_or("(unset)"),
270                        attempts = attempt,
271                        "artifact uploaded to S3"
272                    );
273                    return Ok(ExportReceipt {
274                        target_kind: ExportReceiptTargetKind::S3,
275                        target_name: self.target_name.clone(),
276                        destination: self.logical_destination(name),
277                        bytes_written,
278                    });
279                }
280                Ok(resp) => {
281                    let status = resp.status();
282                    let body = resp.text().await.unwrap_or_default();
283                    if attempt < self.max_attempts && Self::should_retry_status(status) {
284                        tracing::warn!(
285                            url = %redact_url_credentials_for_logs(&url),
286                            artifact = %name,
287                            status = %status,
288                            attempt,
289                            max_attempts = self.max_attempts,
290                            "transient S3 export failure; retrying"
291                        );
292                        if !self.retry_backoff.is_zero() {
293                            tokio::time::sleep(self.retry_backoff).await;
294                        }
295                        continue;
296                    }
297                    return Err(CellosError::ExportSink(format!(
298                        "s3 put {} returned {status}: {body}",
299                        redact_url_credentials_for_logs(&url),
300                    )));
301                }
302                Err(e) => {
303                    if attempt < self.max_attempts {
304                        tracing::warn!(
305                            url = %redact_url_credentials_for_logs(&url),
306                            artifact = %name,
307                            error = %e,
308                            attempt,
309                            max_attempts = self.max_attempts,
310                            "S3 export transport error; retrying"
311                        );
312                        if !self.retry_backoff.is_zero() {
313                            tokio::time::sleep(self.retry_backoff).await;
314                        }
315                        continue;
316                    }
317                    return Err(CellosError::ExportSink(format!("s3 put {url}: {e}")));
318                }
319            }
320        }
321
322        unreachable!("retry loop must return or error")
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::PresignedS3ExportSink;
    use cellos_core::ports::ExportSink;

    /// Build a sink for bucket `"bucket"` with zero retry backoff; only the
    /// arguments that vary between tests are parameters.
    fn build(
        url: &str,
        cell_id: &str,
        key_prefix: Option<&str>,
        target_name: Option<&str>,
        region: Option<&str>,
        max_attempts: usize,
    ) -> Result<PresignedS3ExportSink, super::CellosError> {
        PresignedS3ExportSink::new(
            url,
            cell_id,
            "bucket",
            key_prefix.map(String::from),
            target_name.map(String::from),
            region.map(String::from),
            max_attempts,
            0,
        )
    }

    #[test]
    fn rejects_invalid_url() {
        let outcome = build("not a url", "c1", None, None, None, 1);
        assert!(outcome.is_err(), "expected parse error");
    }

    #[test]
    fn rejects_non_http_scheme() {
        let outcome = build("ftp://example.com/object", "c1", None, None, None, 1);
        assert!(outcome.is_err());
    }

    #[test]
    fn rejects_zero_attempts() {
        let outcome = build("https://example.com/object", "c1", None, None, None, 0);
        assert!(outcome.is_err());
    }

    #[test]
    fn preserves_exact_presigned_url() {
        let exact = "https://bucket.s3.amazonaws.com/object.txt?X-Amz-Signature=abc";
        let sink = build(
            exact,
            "c1",
            Some("prefix"),
            Some("artifacts"),
            Some("us-east-1"),
            1,
        )
        .unwrap();

        // Without placeholders the URL is used verbatim.
        assert_eq!(sink.upload_url("artifact.txt"), exact);
        assert_eq!(
            sink.destination_hint("artifact.txt").unwrap(),
            "s3://bucket/prefix/artifact.txt"
        );
        // Region is stored for log attribution.
        assert_eq!(sink.region.as_deref(), Some("us-east-1"));
    }

    #[test]
    fn expands_placeholders_when_present() {
        let sink = build(
            "https://bucket.s3.amazonaws.com/{cell_id}/{artifact_name}?X-Amz-Signature=abc",
            "cell-42",
            Some("prefix"),
            Some("artifacts"),
            None,
            1,
        )
        .unwrap();

        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://bucket.s3.amazonaws.com/cell-42/artifact.txt?X-Amz-Signature=abc"
        );
    }

    #[test]
    fn region_none_when_not_set() {
        let sink = build("https://bucket.s3.amazonaws.com/obj", "c1", None, None, None, 1).unwrap();
        assert!(sink.region.is_none());
    }
}
426}