1use async_trait::async_trait;
30use cellos_core::ports::ExportSink;
31use cellos_core::{
32 redact_url_credentials_for_logs, CellosError, ExportArtifactMetadata, ExportReceipt,
33 ExportReceiptTargetKind,
34};
35use reqwest::StatusCode;
36use std::time::Duration;
37use tracing::instrument;
38use zeroize::Zeroize;
39
/// Fallback whole-request timeout, in milliseconds (30 seconds), applied to the
/// HTTP client when [`ENV_REQUEST_TIMEOUT_MS`] is unset or unusable.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30 * 1_000;

/// Fallback connect timeout, in milliseconds (10 seconds), applied to the HTTP
/// client when [`ENV_CONNECT_TIMEOUT_MS`] is unset or unusable.
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10 * 1_000;

/// Name of the environment variable that overrides the whole-request timeout
/// (value is read as a positive integer number of milliseconds).
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_EXPORT_S3_TIMEOUT_MS";

/// Name of the environment variable that overrides the connect timeout
/// (value is read as a positive integer number of milliseconds).
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_EXPORT_S3_CONNECT_TIMEOUT_MS";
55
/// Resolves a timeout (in milliseconds) from the environment variable `env_var`.
///
/// The variable's value is trimmed and parsed as a `u64`. `default_ms` is
/// returned instead when the variable cannot be read (unset or not valid
/// Unicode), when it does not parse as an unsigned integer, or when it parses
/// to `0`.
pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
    let raw = match std::env::var(env_var) {
        Ok(value) => value,
        Err(_) => return default_ms,
    };
    match raw.trim().parse::<u64>() {
        // Zero is treated the same as "not configured".
        Ok(ms) if ms > 0 => ms,
        _ => default_ms,
    }
}
72
73fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
77 let request_timeout = Duration::from_millis(resolve_timeout_ms(
78 ENV_REQUEST_TIMEOUT_MS,
79 DEFAULT_REQUEST_TIMEOUT_MS,
80 ));
81 let connect_timeout = Duration::from_millis(resolve_timeout_ms(
82 ENV_CONNECT_TIMEOUT_MS,
83 DEFAULT_CONNECT_TIMEOUT_MS,
84 ));
85 let mut builder = reqwest::Client::builder()
86 .timeout(request_timeout)
87 .connect_timeout(connect_timeout);
88 if let Ok(path) = std::env::var("CELLOS_CA_BUNDLE") {
89 let pem =
90 std::fs::read(&path).map_err(|e| format!("CELLOS_CA_BUNDLE: read {path}: {e}"))?;
91 let mut added = 0usize;
92 for block in pem_cert_blocks(&pem) {
93 let cert = reqwest::Certificate::from_pem(&block)
94 .map_err(|e| format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}"))?;
95 builder = builder.add_root_certificate(cert);
96 added += 1;
97 }
98 if added == 0 {
99 return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
100 }
101 tracing::debug!(path = %path, count = added, "CELLOS_CA_BUNDLE: loaded CA certificates");
102 }
103 Ok(builder)
104}
105
/// Splits PEM text into its individual `-----BEGIN …` / `-----END …` blocks.
///
/// The input is decoded lossily as UTF-8 and scanned line by line. Each
/// returned block contains its BEGIN line, body lines and END line, every line
/// terminated by a single `\n`. Text outside blocks is discarded, a new BEGIN
/// line restarts the block being collected, and a block with no END line is
/// dropped. The label after BEGIN/END is not inspected.
fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
    let text = String::from_utf8_lossy(pem);
    let mut blocks: Vec<Vec<u8>> = Vec::new();
    // `Some(buffer)` while inside a block, `None` between blocks.
    let mut pending: Option<String> = None;

    for line in text.lines() {
        if line.starts_with("-----BEGIN ") {
            // Start (or restart) collecting from this line.
            pending = Some(String::new());
        }
        let closes_block = match pending.as_mut() {
            Some(buffer) => {
                buffer.push_str(line);
                buffer.push('\n');
                line.starts_with("-----END ")
            }
            None => false,
        };
        if closes_block {
            if let Some(finished) = pending.take() {
                blocks.push(finished.into_bytes());
            }
        }
    }
    blocks
}
127
/// Export sink that uploads artifacts with an HTTP `PUT` to a presigned URL.
///
/// The presigned URL is treated as a secret: it is wiped when the sink is
/// dropped and is passed through `redact_url_credentials_for_logs` before
/// being logged.
pub struct PresignedS3ExportSink {
    /// HTTP client used for the `PUT` requests.
    client: reqwest::Client,
    /// Trimmed presigned URL; may contain the literal placeholders
    /// `{cell_id}` and `{artifact_name}`, which are substituted per upload.
    presigned_url: String,
    /// Cell identifier substituted for `{cell_id}` and recorded on the tracing span.
    cell_id: String,
    /// Bucket name used to build the logical `s3://` destination.
    bucket: String,
    /// Optional key prefix placed between the bucket and the artifact name in
    /// the logical destination.
    key_prefix: Option<String>,
    /// Optional target name copied into each export receipt.
    target_name: Option<String>,
    /// Optional region; within this file it is only used as a log/tracing field.
    region: Option<String>,
    /// Total number of upload attempts (the constructor rejects `0`).
    max_attempts: usize,
    /// Fixed delay slept between attempts; a zero duration skips the sleep.
    retry_backoff: Duration,
}
143
144impl Drop for PresignedS3ExportSink {
145 fn drop(&mut self) {
146 self.presigned_url.zeroize();
147 }
148}
149
150impl PresignedS3ExportSink {
151 #[allow(clippy::too_many_arguments)]
152 pub fn new(
153 presigned_url: impl Into<String>,
154 cell_id: impl Into<String>,
155 bucket: impl Into<String>,
156 key_prefix: Option<String>,
157 target_name: Option<String>,
158 region: Option<String>,
159 max_attempts: usize,
160 retry_backoff_ms: u64,
161 ) -> Result<Self, CellosError> {
162 let raw = presigned_url.into();
163 let trimmed = raw.trim().to_string();
164 if trimmed.is_empty() {
165 return Err(CellosError::ExportSink(
166 "S3 presigned URL is empty after trim".into(),
167 ));
168 }
169 let parsed = reqwest::Url::parse(trimmed.as_str())
170 .map_err(|e| CellosError::ExportSink(format!("invalid S3 presigned URL: {e}")))?;
171 let scheme = parsed.scheme();
172 if scheme != "http" && scheme != "https" {
173 return Err(CellosError::ExportSink(format!(
174 "S3 presigned URL scheme must be http or https, got {scheme}"
175 )));
176 }
177 let client = http_client_builder()
178 .map_err(CellosError::ExportSink)?
179 .build()
180 .map_err(|e| CellosError::ExportSink(format!("s3 http client init: {e}")))?;
181 if max_attempts == 0 {
182 return Err(CellosError::ExportSink(
183 "S3 export max_attempts must be at least 1".into(),
184 ));
185 }
186 Ok(Self {
187 client,
188 presigned_url: trimmed,
189 cell_id: cell_id.into(),
190 bucket: bucket.into(),
191 key_prefix,
192 target_name,
193 region,
194 max_attempts,
195 retry_backoff: Duration::from_millis(retry_backoff_ms),
196 })
197 }
198
199 fn upload_url(&self, artifact_name: &str) -> String {
200 let safe = artifact_name.replace(['/', '\\'], "_");
201 if self.presigned_url.contains("{cell_id}")
202 || self.presigned_url.contains("{artifact_name}")
203 {
204 return self
205 .presigned_url
206 .replace("{cell_id}", &self.cell_id)
207 .replace("{artifact_name}", &safe);
208 }
209 self.presigned_url.clone()
210 }
211
212 fn logical_destination(&self, artifact_name: &str) -> String {
213 let mut key = self
214 .key_prefix
215 .as_deref()
216 .unwrap_or("")
217 .trim_matches('/')
218 .to_string();
219 if !key.is_empty() {
220 key.push('/');
221 }
222 key.push_str(artifact_name);
223 format!("s3://{}/{}", self.bucket, key)
224 }
225
226 fn should_retry_status(status: StatusCode) -> bool {
227 status.is_server_error() || matches!(status.as_u16(), 408 | 425 | 429)
228 }
229}
230
231#[async_trait]
232impl ExportSink for PresignedS3ExportSink {
233 fn target_kind(&self) -> Option<ExportReceiptTargetKind> {
234 Some(ExportReceiptTargetKind::S3)
235 }
236
237 fn destination_hint(&self, name: &str) -> Option<String> {
238 Some(self.logical_destination(name))
239 }
240
241 #[instrument(skip(self), fields(
242 cell_id = %self.cell_id,
243 artifact = %name,
244 region = self.region.as_deref().unwrap_or("(unset)"),
245 ))]
246 async fn push(
247 &self,
248 name: &str,
249 path: &str,
250 metadata: &ExportArtifactMetadata,
251 ) -> Result<ExportReceipt, CellosError> {
252 let bytes = tokio::fs::read(path)
253 .await
254 .map_err(|e| CellosError::ExportSink(format!("read artifact {path}: {e}")))?;
255 let bytes_written = bytes.len() as u64;
256
257 let url = self.upload_url(name);
258 for attempt in 1..=self.max_attempts {
259 let mut req = self.client.put(&url).body(bytes.clone());
260 if let Some(ref content_type) = metadata.content_type {
261 req = req.header(reqwest::header::CONTENT_TYPE, content_type);
262 }
263 match req.send().await {
264 Ok(resp) if resp.status().is_success() => {
265 tracing::info!(
266 url = %redact_url_credentials_for_logs(&url),
267 artifact = %name,
268 bucket = %self.bucket,
269 region = self.region.as_deref().unwrap_or("(unset)"),
270 attempts = attempt,
271 "artifact uploaded to S3"
272 );
273 return Ok(ExportReceipt {
274 target_kind: ExportReceiptTargetKind::S3,
275 target_name: self.target_name.clone(),
276 destination: self.logical_destination(name),
277 bytes_written,
278 });
279 }
280 Ok(resp) => {
281 let status = resp.status();
282 let body = resp.text().await.unwrap_or_default();
283 if attempt < self.max_attempts && Self::should_retry_status(status) {
284 tracing::warn!(
285 url = %redact_url_credentials_for_logs(&url),
286 artifact = %name,
287 status = %status,
288 attempt,
289 max_attempts = self.max_attempts,
290 "transient S3 export failure; retrying"
291 );
292 if !self.retry_backoff.is_zero() {
293 tokio::time::sleep(self.retry_backoff).await;
294 }
295 continue;
296 }
297 return Err(CellosError::ExportSink(format!(
298 "s3 put {} returned {status}: {body}",
299 redact_url_credentials_for_logs(&url),
300 )));
301 }
302 Err(e) => {
303 if attempt < self.max_attempts {
304 tracing::warn!(
305 url = %redact_url_credentials_for_logs(&url),
306 artifact = %name,
307 error = %e,
308 attempt,
309 max_attempts = self.max_attempts,
310 "S3 export transport error; retrying"
311 );
312 if !self.retry_backoff.is_zero() {
313 tokio::time::sleep(self.retry_backoff).await;
314 }
315 continue;
316 }
317 return Err(CellosError::ExportSink(format!("s3 put {url}: {e}")));
318 }
319 }
320 }
321
322 unreachable!("retry loop must return or error")
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::PresignedS3ExportSink;
    use cellos_core::ports::ExportSink;

    /// Constructs a sink for cell `c1` / bucket `bucket` with no prefix, target
    /// name or region and no retry backoff, varying only the URL and attempts.
    fn bare_sink(
        url: &str,
        max_attempts: usize,
    ) -> Result<PresignedS3ExportSink, cellos_core::CellosError> {
        PresignedS3ExportSink::new(url, "c1", "bucket", None, None, None, max_attempts, 0)
    }

    /// A string that is not a URL at all is rejected at construction.
    #[test]
    fn rejects_invalid_url() {
        let outcome = bare_sink("not a url", 1);
        assert!(outcome.is_err(), "expected parse error");
    }

    /// Only http/https URLs are accepted.
    #[test]
    fn rejects_non_http_scheme() {
        let outcome = bare_sink("ftp://example.com/object", 1);
        assert!(outcome.is_err());
    }

    /// Zero attempts would mean the upload is never tried.
    #[test]
    fn rejects_zero_attempts() {
        let outcome = bare_sink("https://example.com/object", 0);
        assert!(outcome.is_err());
    }

    /// A URL without placeholders is used verbatim; the destination hint is
    /// built from bucket, prefix and artifact name.
    #[test]
    fn preserves_exact_presigned_url() {
        let presigned = "https://bucket.s3.amazonaws.com/object.txt?X-Amz-Signature=abc";
        let sink = PresignedS3ExportSink::new(
            presigned,
            "c1",
            "bucket",
            Some("prefix".into()),
            Some("artifacts".into()),
            Some("us-east-1".into()),
            1,
            0,
        )
        .unwrap();

        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://bucket.s3.amazonaws.com/object.txt?X-Amz-Signature=abc"
        );
        assert_eq!(
            sink.destination_hint("artifact.txt").unwrap(),
            "s3://bucket/prefix/artifact.txt"
        );
        assert_eq!(sink.region.as_deref(), Some("us-east-1"));
    }

    /// `{cell_id}` and `{artifact_name}` are substituted into the upload URL.
    #[test]
    fn expands_placeholders_when_present() {
        let template =
            "https://bucket.s3.amazonaws.com/{cell_id}/{artifact_name}?X-Amz-Signature=abc";
        let sink = PresignedS3ExportSink::new(
            template,
            "cell-42",
            "bucket",
            Some("prefix".into()),
            Some("artifacts".into()),
            None,
            1,
            0,
        )
        .unwrap();

        assert_eq!(
            sink.upload_url("artifact.txt"),
            "https://bucket.s3.amazonaws.com/cell-42/artifact.txt?X-Amz-Signature=abc"
        );
    }

    /// The region stays `None` when the caller does not supply one.
    #[test]
    fn region_none_when_not_set() {
        let sink = bare_sink("https://bucket.s3.amazonaws.com/obj", 1).unwrap();
        assert!(sink.region.is_none());
    }
}