1use std::path::{Path, PathBuf};
7
8use super::error::{AWS_CREDENTIALS_REFRESH_HELP, BundleUploadError, BundleUploadResult};
9use super::types::{UploadOptions, UploadedBundle};
10use super::uploader::BundleUploader;
11
12#[derive(Debug, Clone)]
13pub struct S3Target {
14 pub bucket: String,
15 pub key_prefix: String,
16}
17
18impl S3Target {
19 pub fn parse(url: &str) -> BundleUploadResult<Self> {
20 let parsed =
21 url::Url::parse(url).map_err(|_| BundleUploadError::InvalidUrl(url.to_string()))?;
22 if parsed.scheme() != "s3" {
23 return Err(BundleUploadError::InvalidUrl(url.to_string()));
24 }
25 let bucket = parsed
26 .host_str()
27 .ok_or_else(|| BundleUploadError::InvalidUrl(url.to_string()))?
28 .to_string();
29 if bucket.is_empty() {
30 return Err(BundleUploadError::InvalidUrl(url.to_string()));
31 }
32 let key_prefix = parsed.path().trim_start_matches('/').to_string();
34 Ok(Self { bucket, key_prefix })
35 }
36
37 pub fn compose_key(&self, filename: &str) -> String {
39 if self.key_prefix.is_empty() {
40 filename.to_string()
41 } else if self.key_prefix.ends_with('/') {
42 format!("{}{}", self.key_prefix, filename)
43 } else {
44 format!("{}/{}", self.key_prefix, filename)
45 }
46 }
47}
48
49#[derive(Debug)]
50pub struct S3Uploader {
51 target: S3Target,
52}
53
54impl S3Uploader {
55 pub fn from_url(url: &str) -> BundleUploadResult<Self> {
56 Ok(Self {
57 target: S3Target::parse(url)?,
58 })
59 }
60
61 async fn s3_client(&self) -> BundleUploadResult<aws_sdk_s3::Client> {
62 let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest());
63 if !env_has_explicit_aws_access_key()
64 && let Some(credentials) = load_shared_file_static_credentials()
65 {
66 loader = loader.credentials_provider(credentials);
67 }
68 let config = loader.load().await;
69 if config.credentials_provider().is_none() {
70 return Err(BundleUploadError::CredentialsUnresolved);
71 }
72 Ok(aws_sdk_s3::Client::new(&config))
73 }
74
75 fn region_or_error(client: &aws_sdk_s3::Client) -> BundleUploadResult<String> {
76 client
77 .config()
78 .region()
79 .map(|r| r.to_string())
80 .ok_or_else(|| {
81 BundleUploadError::Other(
82 "AWS region not configured; set AWS_REGION or ~/.aws/config region".to_string(),
83 )
84 })
85 }
86
87 fn aws_operation_error(action: &str, detail: impl std::fmt::Debug) -> BundleUploadError {
88 let detail = format!("{detail:?}");
89 if is_aws_credentials_refresh_required(&detail) {
90 BundleUploadError::AwsCredentialsRefreshRequired {
91 action: action.to_string(),
92 help: &AWS_CREDENTIALS_REFRESH_HELP,
93 }
94 } else {
95 BundleUploadError::Other(format!("{action}: {detail}"))
96 }
97 }
98
99 async fn ensure_bucket(&self, client: &aws_sdk_s3::Client) -> BundleUploadResult<()> {
102 use aws_sdk_s3::operation::head_bucket::HeadBucketError;
103 use aws_sdk_s3::types::*;
104
105 let bucket = &self.target.bucket;
106 let head = client.head_bucket().bucket(bucket).send().await;
107 let must_create = match head {
108 Ok(_) => false,
109 Err(sdk_err) => {
110 let status = sdk_err
111 .raw_response()
112 .map(|response| response.status().as_u16());
113 let bucket_region = sdk_err
114 .raw_response()
115 .and_then(|response| response.headers().get("x-amz-bucket-region"))
116 .map(str::to_string);
117 match sdk_err.into_service_error() {
118 HeadBucketError::NotFound(_) => true,
119 other => {
120 return Err(Self::head_bucket_error(
121 bucket,
122 status,
123 bucket_region.as_deref(),
124 Self::region_or_error(client).ok().as_deref(),
125 other,
126 ));
127 }
128 }
129 }
130 };
131
132 if must_create {
133 let region = Self::region_or_error(client)?;
134 let mut create = client.create_bucket().bucket(bucket);
135 if region != "us-east-1" {
137 let constraint = BucketLocationConstraint::from(region.as_str());
138 let cfg = CreateBucketConfiguration::builder()
139 .location_constraint(constraint)
140 .build();
141 create = create.create_bucket_configuration(cfg);
142 }
143 create.send().await.map_err(|err| {
144 let svc = err.into_service_error();
145 if let aws_sdk_s3::operation::create_bucket::CreateBucketError::BucketAlreadyExists(_) = svc {
146 BundleUploadError::BucketAlreadyExistsInOtherAccount(bucket.clone())
147 } else {
148 Self::aws_operation_error(&format!("creating S3 bucket {bucket}"), svc)
149 }
150 })?;
151 }
152
153 client
155 .put_public_access_block()
156 .bucket(bucket)
157 .public_access_block_configuration(
158 PublicAccessBlockConfiguration::builder()
159 .block_public_acls(true)
160 .ignore_public_acls(true)
161 .block_public_policy(true)
162 .restrict_public_buckets(true)
163 .build(),
164 )
165 .send()
166 .await
167 .map_err(|err| {
168 Self::aws_operation_error(
169 "configuring S3 bucket public access block",
170 err.into_service_error(),
171 )
172 })?;
173
174 client
176 .put_bucket_versioning()
177 .bucket(bucket)
178 .versioning_configuration(
179 VersioningConfiguration::builder()
180 .status(BucketVersioningStatus::Enabled)
181 .build(),
182 )
183 .send()
184 .await
185 .map_err(|err| {
186 Self::aws_operation_error(
187 "configuring S3 bucket versioning",
188 err.into_service_error(),
189 )
190 })?;
191
192 client
194 .put_bucket_encryption()
195 .bucket(bucket)
196 .server_side_encryption_configuration(
197 ServerSideEncryptionConfiguration::builder()
198 .rules(
199 ServerSideEncryptionRule::builder()
200 .apply_server_side_encryption_by_default(
201 ServerSideEncryptionByDefault::builder()
202 .sse_algorithm(ServerSideEncryption::Aes256)
203 .build()
204 .map_err(|e| {
205 BundleUploadError::Other(format!("SSE config: {e}"))
206 })?,
207 )
208 .build(),
209 )
210 .build()
211 .map_err(|e| BundleUploadError::Other(format!("encryption config: {e}")))?,
212 )
213 .send()
214 .await
215 .map_err(|err| {
216 Self::aws_operation_error(
217 "configuring S3 bucket encryption",
218 err.into_service_error(),
219 )
220 })?;
221
222 Ok(())
223 }
224
225 async fn presign_get(
226 &self,
227 client: &aws_sdk_s3::Client,
228 key: &str,
229 digest: &str,
230 object_ref: &str,
231 opts: &UploadOptions,
232 ) -> BundleUploadResult<UploadedBundle> {
233 use aws_sdk_s3::presigning::PresigningConfig;
234 use std::time::Duration;
235
236 let expires_secs = opts.clamped_for_s3();
237 let presigning = PresigningConfig::expires_in(Duration::from_secs(expires_secs))
238 .map_err(|e| BundleUploadError::Other(format!("presigning config: {e}")))?;
239 let presigned = client
240 .get_object()
241 .bucket(&self.target.bucket)
242 .key(key)
243 .presigned(presigning)
244 .await
245 .map_err(|err| {
246 Self::aws_operation_error(
247 "creating presigned S3 download URL",
248 err.into_service_error(),
249 )
250 })?;
251
252 let url = presigned.uri().to_string();
253 let expires_at = chrono::Utc::now() + chrono::Duration::seconds(expires_secs as i64);
254
255 Ok(UploadedBundle {
256 url,
257 digest: digest.to_string(),
258 expires_at: Some(expires_at),
259 object_ref: object_ref.to_string(),
260 })
261 }
262
263 fn head_bucket_error(
264 bucket: &str,
265 status: Option<u16>,
266 bucket_region: Option<&str>,
267 configured_region: Option<&str>,
268 detail: impl std::fmt::Debug,
269 ) -> BundleUploadError {
270 match status {
271 Some(301) | Some(400) if bucket_region.is_some() => {
272 let bucket_region = bucket_region.unwrap();
273 let configured_region = configured_region.unwrap_or("<not configured>");
274 BundleUploadError::Other(format!(
275 "S3 bucket {bucket} is in region {bucket_region}, but the deployer is using region {configured_region}; set AWS_REGION={bucket_region} and retry"
276 ))
277 }
278 Some(403) => BundleUploadError::AccessDenied {
279 action: "checking S3 bucket".to_string(),
280 resource: format!("s3://{bucket}"),
281 required_perms: "s3:ListBucket on the bucket, or use a bucket owned by the configured AWS account".to_string(),
282 },
283 _ => Self::aws_operation_error(&format!("checking S3 bucket {bucket}"), detail),
284 }
285 }
286}
287
288#[async_trait::async_trait]
289impl BundleUploader for S3Uploader {
290 async fn upload(
291 &self,
292 bundle_path: &Path,
293 opts: &UploadOptions,
294 ) -> BundleUploadResult<UploadedBundle> {
295 let client = self.s3_client().await?;
296 self.ensure_bucket(&client).await?;
297
298 let (full_digest, short_digest) = digest_file(bundle_path).await?;
299 let key = self.target.compose_key(&format!("{short_digest}.gtbundle"));
300
301 let head = client
303 .head_object()
304 .bucket(&self.target.bucket)
305 .key(&key)
306 .send()
307 .await;
308
309 let must_upload = match head {
310 Ok(out) => {
311 let existing = out
312 .metadata()
313 .and_then(|m| m.get("greentic-bundle-digest"))
314 .map(|s| s.as_str());
315 existing != Some(full_digest.as_str())
316 }
317 Err(sdk_err) => match sdk_err.into_service_error() {
318 aws_sdk_s3::operation::head_object::HeadObjectError::NotFound(_) => true,
319 other => {
320 return Err(Self::aws_operation_error(
321 &format!("checking S3 object {key}"),
322 other,
323 ));
324 }
325 },
326 };
327
328 if must_upload {
329 let body = aws_sdk_s3::primitives::ByteStream::from_path(bundle_path)
330 .await
331 .map_err(|e| BundleUploadError::Other(format!("read bundle: {e}")))?;
332 client
333 .put_object()
334 .bucket(&self.target.bucket)
335 .key(&key)
336 .body(body)
337 .metadata("greentic-bundle-digest", &full_digest)
338 .content_type("application/octet-stream")
339 .send()
340 .await
341 .map_err(|err| {
342 Self::aws_operation_error("uploading bundle to S3", err.into_service_error())
343 })?;
344 }
345
346 let object_ref = format!("s3://{}/{}", self.target.bucket, key);
347 let uploaded = self
348 .presign_get(&client, &key, &full_digest, &object_ref, opts)
349 .await?;
350 Ok(uploaded)
351 }
352
353 async fn refresh_url(
354 &self,
355 object_ref: &str,
356 opts: &UploadOptions,
357 ) -> BundleUploadResult<UploadedBundle> {
358 let parsed = url::Url::parse(object_ref)
360 .map_err(|_| BundleUploadError::InvalidUrl(object_ref.to_string()))?;
361 if parsed.scheme() != "s3" {
362 return Err(BundleUploadError::InvalidUrl(object_ref.to_string()));
363 }
364 let bucket = parsed
365 .host_str()
366 .ok_or_else(|| BundleUploadError::InvalidUrl(object_ref.to_string()))?;
367 let key = parsed.path().trim_start_matches('/');
368 if bucket != self.target.bucket {
369 return Err(BundleUploadError::Other(format!(
370 "object_ref bucket {bucket} does not match uploader bucket {}",
371 self.target.bucket
372 )));
373 }
374
375 let client = self.s3_client().await?;
376 let head = client
378 .head_object()
379 .bucket(bucket)
380 .key(key)
381 .send()
382 .await
383 .map_err(|err| {
384 let svc = err.into_service_error();
385 if let aws_sdk_s3::operation::head_object::HeadObjectError::NotFound(_) = svc {
386 BundleUploadError::ObjectMissing(object_ref.to_string())
387 } else {
388 Self::aws_operation_error(&format!("checking S3 object {key}"), svc)
389 }
390 })?;
391 let digest = head
392 .metadata()
393 .and_then(|m| m.get("greentic-bundle-digest"))
394 .cloned()
395 .unwrap_or_else(|| "sha256:unknown".to_string());
396
397 self.presign_get(&client, key, &digest, object_ref, opts)
398 .await
399 }
400}
401
402fn is_aws_credentials_refresh_required(detail: &str) -> bool {
403 detail.contains("TokenExpired")
404 || detail.contains("ExpiredToken")
405 || detail.contains("The refresh token has expired")
406 || detail.contains("Your session has expired")
407 || detail.contains("security token included in the request is expired")
408}
409
410fn env_has_explicit_aws_access_key() -> bool {
411 std::env::var_os("AWS_ACCESS_KEY_ID").is_some()
412}
413
414fn load_shared_file_static_credentials() -> Option<aws_sdk_s3::config::Credentials> {
415 let profile = selected_aws_profile();
416 let path = shared_credentials_path()?;
417 let contents = std::fs::read_to_string(path).ok()?;
418 let credentials = parse_shared_credentials_profile(&contents, &profile)?;
419 Some(aws_sdk_s3::config::Credentials::new(
420 credentials.access_key_id,
421 credentials.secret_access_key,
422 credentials.session_token,
423 None,
424 "shared-credentials-file",
425 ))
426}
427
428fn selected_aws_profile() -> String {
429 std::env::var("AWS_PROFILE")
430 .ok()
431 .filter(|value| !value.trim().is_empty())
432 .or_else(|| {
433 std::env::var("AWS_DEFAULT_PROFILE")
434 .ok()
435 .filter(|value| !value.trim().is_empty())
436 })
437 .unwrap_or_else(|| "default".to_string())
438}
439
440fn shared_credentials_path() -> Option<PathBuf> {
441 if let Some(path) = std::env::var_os("AWS_SHARED_CREDENTIALS_FILE")
442 && !path.is_empty()
443 {
444 return Some(PathBuf::from(path));
445 }
446 std::env::var_os("HOME")
447 .filter(|home| !home.is_empty())
448 .map(PathBuf::from)
449 .or_else(|| {
450 std::env::var_os("USERPROFILE")
451 .filter(|home| !home.is_empty())
452 .map(PathBuf::from)
453 })
454 .map(|home| home.join(".aws").join("credentials"))
455}
456
457#[derive(Debug, PartialEq, Eq)]
458struct SharedFileCredentials {
459 access_key_id: String,
460 secret_access_key: String,
461 session_token: Option<String>,
462}
463
464fn parse_shared_credentials_profile(
465 contents: &str,
466 profile: &str,
467) -> Option<SharedFileCredentials> {
468 let mut in_selected_profile = false;
469 let mut access_key_id = None;
470 let mut secret_access_key = None;
471 let mut session_token = None;
472
473 for raw_line in contents.lines() {
474 let line = raw_line.trim();
475 if line.is_empty() || line.starts_with('#') || line.starts_with(';') {
476 continue;
477 }
478 if let Some(section) = line
479 .strip_prefix('[')
480 .and_then(|value| value.strip_suffix(']'))
481 {
482 let section = section
483 .trim()
484 .strip_prefix("profile ")
485 .unwrap_or_else(|| section.trim())
486 .trim();
487 in_selected_profile = section == profile;
488 continue;
489 }
490 if !in_selected_profile {
491 continue;
492 }
493 let Some((key, value)) = line.split_once('=') else {
494 continue;
495 };
496 let key = key.trim();
497 let value = value.trim().trim_matches('"').to_string();
498 match key {
499 "aws_access_key_id" if !value.is_empty() => access_key_id = Some(value),
500 "aws_secret_access_key" if !value.is_empty() => secret_access_key = Some(value),
501 "aws_session_token" if !value.is_empty() => session_token = Some(value),
502 _ => {}
503 }
504 }
505
506 Some(SharedFileCredentials {
507 access_key_id: access_key_id?,
508 secret_access_key: secret_access_key?,
509 session_token,
510 })
511}
512
513pub(crate) async fn digest_file(path: &Path) -> BundleUploadResult<(String, String)> {
516 use sha2::{Digest, Sha256};
517 use tokio::io::AsyncReadExt;
518
519 let mut file = tokio::fs::File::open(path).await?;
520 let mut hasher = Sha256::new();
521 let mut buf = vec![0u8; 64 * 1024];
522 loop {
523 let n = file.read(&mut buf).await?;
524 if n == 0 {
525 break;
526 }
527 hasher.update(&buf[..n]);
528 }
529 let full_hex = hex::encode(hasher.finalize());
530 let short_hex = full_hex[..16].to_string();
531 Ok((format!("sha256:{full_hex}"), short_hex))
532}
533
534#[cfg(test)]
535mod tests {
536 use super::*;
537
538 #[test]
539 fn parses_bucket_and_empty_prefix() {
540 let target = S3Target::parse("s3://my-bucket/").unwrap();
541 assert_eq!(target.bucket, "my-bucket");
542 assert_eq!(target.key_prefix, "");
543 }
544
545 #[test]
546 fn parses_bucket_and_simple_prefix() {
547 let target = S3Target::parse("s3://my-bucket/bundles/").unwrap();
548 assert_eq!(target.bucket, "my-bucket");
549 assert_eq!(target.key_prefix, "bundles/");
550 }
551
552 #[test]
553 fn parses_bucket_and_nested_prefix_no_trailing_slash() {
554 let target = S3Target::parse("s3://my-bucket/path/to/bundles").unwrap();
555 assert_eq!(target.bucket, "my-bucket");
556 assert_eq!(target.key_prefix, "path/to/bundles");
557 }
558
559 #[test]
560 fn rejects_non_s3_scheme() {
561 assert!(S3Target::parse("https://my-bucket/").is_err());
562 }
563
564 #[test]
565 fn rejects_empty_bucket() {
566 assert!(S3Target::parse("s3:///key").is_err());
568 }
569
570 #[test]
571 fn compose_key_with_trailing_slash_prefix() {
572 let target = S3Target {
573 bucket: "b".into(),
574 key_prefix: "bundles/".into(),
575 };
576 assert_eq!(target.compose_key("abc.gtbundle"), "bundles/abc.gtbundle");
577 }
578
579 #[test]
580 fn compose_key_without_trailing_slash_prefix() {
581 let target = S3Target {
582 bucket: "b".into(),
583 key_prefix: "bundles".into(),
584 };
585 assert_eq!(target.compose_key("abc.gtbundle"), "bundles/abc.gtbundle");
586 }
587
588 #[test]
589 fn compose_key_empty_prefix() {
590 let target = S3Target {
591 bucket: "b".into(),
592 key_prefix: "".into(),
593 };
594 assert_eq!(target.compose_key("abc.gtbundle"), "abc.gtbundle");
595 }
596
597 #[test]
598 fn expired_aws_credentials_map_to_actionable_error() {
599 let err = S3Uploader::aws_operation_error(
600 "checking S3 bucket demo",
601 "ProviderError RefreshFailed AccessDeniedException TokenExpired The refresh token has expired",
602 );
603 match err {
604 BundleUploadError::AwsCredentialsRefreshRequired { action, help } => {
605 assert_eq!(action, "checking S3 bucket demo");
606 assert_eq!(help.configure_command, "aws configure");
607 assert_eq!(
608 help.session_token_check_command,
609 "aws configure get aws_session_token"
610 );
611 assert_eq!(
612 help.session_token_unset_command,
613 "unset AWS_SESSION_TOKEN AWS_SECURITY_TOKEN"
614 );
615 assert_eq!(help.sso_login_command, "aws sso login");
616 assert_eq!(help.profile_env_command, "export AWS_PROFILE=<profile>");
617 assert_eq!(
618 help.profile_configure_command,
619 "aws configure --profile <profile>"
620 );
621 assert_eq!(
622 help.profile_sso_login_command,
623 "aws sso login --profile <profile>"
624 );
625 assert_eq!(help.verify_command, "aws sts get-caller-identity");
626 let rendered =
627 BundleUploadError::AwsCredentialsRefreshRequired { action, help }.to_string();
628 assert!(rendered.contains("AWS credentials need to be refreshed"));
629 assert!(rendered.contains("aws configure"));
630 assert!(rendered.contains("aws configure get aws_session_token"));
631 assert!(rendered.contains("unset AWS_SESSION_TOKEN AWS_SECURITY_TOKEN"));
632 assert!(rendered.contains("aws sso login"));
633 assert!(rendered.contains("aws sts get-caller-identity"));
634 }
635 other => panic!("expected credentials refresh error, got {other:?}"),
636 }
637 }
638
639 #[test]
640 fn bundle_upload_errors_expose_message_keys() {
641 let err = S3Uploader::aws_operation_error(
642 "checking S3 bucket demo",
643 "Your session has expired. Please reauthenticate.",
644 );
645 assert_eq!(
646 err.message_key(),
647 "bundle_upload.aws.credentials_refresh_required"
648 );
649 }
650
651 #[test]
652 fn generic_refresh_failure_stays_unclassified() {
653 let err = S3Uploader::aws_operation_error(
654 "checking S3 bucket demo",
655 "ProviderError RefreshFailed without expiration details",
656 );
657 assert!(matches!(err, BundleUploadError::Other(_)));
658 }
659
660 #[test]
661 fn head_bucket_forbidden_maps_to_access_denied() {
662 let err =
663 S3Uploader::head_bucket_error("demo", Some(403), None, Some("eu-north-1"), "no body");
664 match err {
665 BundleUploadError::AccessDenied {
666 action,
667 resource,
668 required_perms,
669 } => {
670 assert_eq!(action, "checking S3 bucket");
671 assert_eq!(resource, "s3://demo");
672 assert!(required_perms.contains("s3:ListBucket"));
673 }
674 other => panic!("expected access denied, got {other:?}"),
675 }
676 }
677
678 #[test]
679 fn head_bucket_region_mismatch_includes_bucket_region() {
680 let err = S3Uploader::head_bucket_error(
681 "demo",
682 Some(301),
683 Some("us-east-1"),
684 Some("eu-north-1"),
685 "no body",
686 );
687 let rendered = err.to_string();
688 assert!(rendered.contains("us-east-1"));
689 assert!(rendered.contains("eu-north-1"));
690 assert!(rendered.contains("AWS_REGION=us-east-1"));
691 }
692
693 #[test]
694 fn parses_default_shared_file_static_credentials() {
695 let credentials = parse_shared_credentials_profile(
696 r#"
697[default]
698aws_access_key_id = AKIADEFAULT
699aws_secret_access_key = default-secret
700
701[prod]
702aws_access_key_id = AKIAPROD
703aws_secret_access_key = prod-secret
704"#,
705 "default",
706 )
707 .unwrap();
708 assert_eq!(
709 credentials,
710 SharedFileCredentials {
711 access_key_id: "AKIADEFAULT".to_string(),
712 secret_access_key: "default-secret".to_string(),
713 session_token: None,
714 }
715 );
716 }
717
718 #[test]
719 fn parses_named_shared_file_static_credentials_with_session_token() {
720 let credentials = parse_shared_credentials_profile(
721 r#"
722[profile dev]
723aws_access_key_id = AKIADEV
724aws_secret_access_key = dev-secret
725aws_session_token = dev-token
726"#,
727 "dev",
728 )
729 .unwrap();
730 assert_eq!(
731 credentials,
732 SharedFileCredentials {
733 access_key_id: "AKIADEV".to_string(),
734 secret_access_key: "dev-secret".to_string(),
735 session_token: Some("dev-token".to_string()),
736 }
737 );
738 }
739
740 #[test]
741 fn ignores_incomplete_shared_file_credentials() {
742 assert!(
743 parse_shared_credentials_profile("[default]\naws_access_key_id = AKIA\n", "default")
744 .is_none()
745 );
746 }
747
748 #[tokio::test]
749 async fn digest_known_fixture() {
750 use std::io::Write;
751 let mut tmp = tempfile::NamedTempFile::new().unwrap();
752 tmp.write_all(b"hello world").unwrap();
753 tmp.flush().unwrap();
754 let (full, short) = digest_file(tmp.path()).await.unwrap();
755 assert_eq!(
757 full,
758 "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
759 );
760 assert_eq!(short, "b94d27b9934d3e08");
761 }
762}