1use crate::auth::credentials::AwsCredentials;
4use crate::auth::sigv4;
5use crate::error::AwsError;
6use base64::Engine;
7use cloud_lite_core::rate_limit::{RateLimitConfig, RateLimiter};
8use cloud_lite_core::retry::RetryConfig;
9use md5::{Digest, Md5};
10
11pub struct AwsHttpClient {
15 http: reqwest::Client,
16 credentials: AwsCredentials,
17 retry_config: RetryConfig,
18 rate_limit_config: RateLimitConfig,
19 rate_limiter: RateLimiter,
20 #[cfg(any(test, feature = "test-support"))]
22 pub(crate) base_url: Option<String>,
23 #[cfg(any(test, feature = "test-support"))]
25 pub(crate) mock: Option<std::sync::Arc<crate::mock_client::MockClient>>,
26}
27
28pub struct AwsResponse {
30 data: ResponseData,
31}
32
33enum ResponseData {
34 Real(reqwest::Response),
35 #[cfg(any(test, feature = "test-support"))]
36 Mock(Vec<u8>),
37}
38
39impl AwsResponse {
40 pub fn status(&self) -> u16 {
42 match &self.data {
43 ResponseData::Real(response) => response.status().as_u16(),
44 #[cfg(any(test, feature = "test-support"))]
45 ResponseData::Mock(_) => 200,
46 }
47 }
48
49 pub async fn error_for_status(self, content_type: &str) -> Result<Self, AwsError> {
55 let status = self.status();
56 if status < 400 {
57 return Ok(self);
58 }
59
60 let body_bytes = self
61 .bytes()
62 .await
63 .unwrap_or_else(|_| bytes::Bytes::from_static(b""));
64 let body_text = std::str::from_utf8(&body_bytes).unwrap_or("");
65
66 if content_type.contains("json") {
67 Err(crate::error::parse_json_error(status, body_text))
68 } else {
69 Err(crate::error::parse_xml_error(status, body_text))
70 }
71 }
72
73 pub async fn bytes(self) -> Result<bytes::Bytes, AwsError> {
75 match self.data {
76 ResponseData::Real(response) => response
77 .bytes()
78 .await
79 .map_err(|e| AwsError::Network(e.to_string())),
80 #[cfg(any(test, feature = "test-support"))]
81 ResponseData::Mock(data) => Ok(bytes::Bytes::from(data)),
82 }
83 }
84}
85
86pub struct AwsHttpClientBuilder {
88 credentials: Option<AwsCredentials>,
89 retry_config: RetryConfig,
90 rate_limit: RateLimitConfig,
91}
92
93impl Default for AwsHttpClientBuilder {
94 fn default() -> Self {
95 Self {
96 credentials: None,
97 retry_config: RetryConfig::default(),
98 rate_limit: RateLimitConfig::new(20),
99 }
100 }
101}
102
103impl AwsHttpClientBuilder {
104 pub fn credentials(mut self, credentials: AwsCredentials) -> Self {
106 self.credentials = Some(credentials);
107 self
108 }
109
110 pub fn retry_config(mut self, config: RetryConfig) -> Self {
112 self.retry_config = config;
113 self
114 }
115
116 pub fn rate_limit(mut self, config: RateLimitConfig) -> Self {
118 self.rate_limit = config;
119 self
120 }
121
122 pub fn build(self) -> Result<AwsHttpClient, AwsError> {
124 let credentials = self.credentials.ok_or(AwsError::Auth {
125 message: "Credentials required".into(),
126 })?;
127
128 let http = reqwest::Client::builder()
129 .build()
130 .map_err(|e| AwsError::Network(e.to_string()))?;
131
132 Ok(AwsHttpClient {
133 http,
134 credentials,
135 retry_config: self.retry_config,
136 rate_limit_config: self.rate_limit.clone(),
137 rate_limiter: RateLimiter::new(self.rate_limit),
138 #[cfg(any(test, feature = "test-support"))]
139 base_url: None,
140 #[cfg(any(test, feature = "test-support"))]
141 mock: None,
142 })
143 }
144}
145
146impl AwsHttpClient {
147 pub fn builder() -> AwsHttpClientBuilder {
149 AwsHttpClientBuilder::default()
150 }
151
152 pub fn from_default_chain(region: &str) -> Result<Self, AwsError> {
157 let credentials = AwsCredentials::from_default_chain(region)?;
158 Self::builder().credentials(credentials).build()
159 }
160
161 pub async fn from_default_chain_async(region: &str) -> Result<Self, AwsError> {
166 let credentials = AwsCredentials::from_default_chain_async(region).await?;
167 Self::builder().credentials(credentials).build()
168 }
169
170 #[cfg(any(test, feature = "test-support"))]
172 pub fn from_mock(mock: crate::mock_client::MockClient) -> Self {
173 let credentials =
174 AwsCredentials::new("AKID".into(), "SECRET".into(), None, "us-east-1".into());
175 Self {
176 http: reqwest::Client::new(),
177 credentials,
178 retry_config: RetryConfig::default(),
179 rate_limit_config: RateLimitConfig::disabled(),
180 rate_limiter: RateLimiter::new(RateLimitConfig::disabled()),
181 base_url: Some("http://mock".into()),
182 mock: Some(std::sync::Arc::new(mock)),
183 }
184 }
185
186 pub fn region(&self) -> &str {
188 &self.credentials.region
189 }
190
191 pub fn with_region(&self, region: &str) -> Result<Self, AwsError> {
197 let mut credentials = self.credentials.clone();
198 credentials.region = region.to_string();
199
200 let http = reqwest::Client::builder()
201 .build()
202 .map_err(|e| AwsError::Network(e.to_string()))?;
203
204 Ok(Self {
205 http,
206 credentials,
207 retry_config: self.retry_config.clone(),
208 rate_limit_config: self.rate_limit_config.clone(),
209 rate_limiter: RateLimiter::new(self.rate_limit_config.clone()),
210 #[cfg(any(test, feature = "test-support"))]
211 base_url: None,
212 #[cfg(any(test, feature = "test-support"))]
213 mock: None,
214 })
215 }
216
217 pub fn accessanalyzer(&self) -> crate::api::AccessAnalyzerClient<'_> {
221 crate::api::AccessAnalyzerClient::new(self)
222 }
223
224 pub fn apigateway(&self) -> crate::api::ApigatewayClient<'_> {
226 crate::api::ApigatewayClient::new(self)
227 }
228
229 pub fn autoscaling(&self) -> crate::api::AutoScalingClient<'_> {
231 crate::api::AutoScalingClient::new(self)
232 }
233
234 pub fn ce(&self) -> crate::api::CeClient<'_> {
236 crate::api::CeClient::new(self)
237 }
238
239 pub fn cloudfront(&self) -> crate::api::CloudfrontClient<'_> {
241 crate::api::CloudfrontClient::new(self)
242 }
243
244 pub fn cloudtrail(&self) -> crate::api::CloudtrailClient<'_> {
246 crate::api::CloudtrailClient::new(self)
247 }
248
249 pub fn cloudwatch(&self) -> crate::api::CloudWatchClient<'_> {
251 crate::api::CloudWatchClient::new(self)
252 }
253
254 pub fn config(&self) -> crate::api::ConfigClient<'_> {
256 crate::api::ConfigClient::new(self)
257 }
258
259 pub fn dynamodb(&self) -> crate::api::DynamodbClient<'_> {
261 crate::api::DynamodbClient::new(self)
262 }
263
264 pub fn ec2(&self) -> crate::api::Ec2Client<'_> {
266 crate::api::Ec2Client::new(self)
267 }
268
269 pub fn ecr(&self) -> crate::api::EcrClient<'_> {
271 crate::api::EcrClient::new(self)
272 }
273
274 pub fn ecs(&self) -> crate::api::EcsClient<'_> {
276 crate::api::EcsClient::new(self)
277 }
278
279 pub fn efs(&self) -> crate::api::EfsClient<'_> {
281 crate::api::EfsClient::new(self)
282 }
283
284 pub fn eks(&self) -> crate::api::EksClient<'_> {
286 crate::api::EksClient::new(self)
287 }
288
289 pub fn elasticache(&self) -> crate::api::ElasticacheClient<'_> {
291 crate::api::ElasticacheClient::new(self)
292 }
293
294 pub fn elbv2(&self) -> crate::api::Elbv2Client<'_> {
296 crate::api::Elbv2Client::new(self)
297 }
298
299 pub fn emr(&self) -> crate::api::EmrClient<'_> {
301 crate::api::EmrClient::new(self)
302 }
303
304 pub fn iam(&self) -> crate::api::IamClient<'_> {
306 crate::api::IamClient::new(self)
307 }
308
309 pub fn kinesis(&self) -> crate::api::KinesisClient<'_> {
311 crate::api::KinesisClient::new(self)
312 }
313
314 pub fn kms(&self) -> crate::api::KmsClient<'_> {
316 crate::api::KmsClient::new(self)
317 }
318
319 pub fn lambda(&self) -> crate::api::LambdaClient<'_> {
321 crate::api::LambdaClient::new(self)
322 }
323
324 pub fn logs(&self) -> crate::api::LogsClient<'_> {
326 crate::api::LogsClient::new(self)
327 }
328
329 pub fn opensearch(&self) -> crate::api::OpensearchClient<'_> {
331 crate::api::OpensearchClient::new(self)
332 }
333
334 pub fn organizations(&self) -> crate::api::OrganizationsClient<'_> {
336 crate::api::OrganizationsClient::new(self)
337 }
338
339 pub fn rds(&self) -> crate::api::RdsClient<'_> {
341 crate::api::RdsClient::new(self)
342 }
343
344 pub fn redshift(&self) -> crate::api::RedshiftClient<'_> {
346 crate::api::RedshiftClient::new(self)
347 }
348
349 pub fn route53(&self) -> crate::api::Route53Client<'_> {
351 crate::api::Route53Client::new(self)
352 }
353
354 pub fn s3(&self) -> crate::api::S3Client<'_> {
356 crate::api::S3Client::new(self)
357 }
358
359 pub fn sagemaker(&self) -> crate::api::SagemakerClient<'_> {
361 crate::api::SagemakerClient::new(self)
362 }
363
364 pub fn secretsmanager(&self) -> crate::api::SecretsmanagerClient<'_> {
366 crate::api::SecretsmanagerClient::new(self)
367 }
368
369 pub fn securityhub(&self) -> crate::api::SecurityHubClient<'_> {
371 crate::api::SecurityHubClient::new(self)
372 }
373
374 pub fn sts(&self) -> crate::api::StsClient<'_> {
376 crate::api::StsClient::new(self)
377 }
378 pub async fn get(&self, url: &str, service: &str) -> Result<AwsResponse, AwsError> {
382 #[cfg(any(test, feature = "test-support"))]
383 if let Some(ref mock) = self.mock {
384 let result = mock.execute("GET", url, None).await?;
385 return Ok(AwsResponse {
386 data: ResponseData::Mock(result),
387 });
388 }
389
390 let response = self.signed_request("GET", url, service, b"", &[]).await?;
391 Ok(AwsResponse {
392 data: ResponseData::Real(response),
393 })
394 }
395
396 pub async fn get_json(&self, url: &str, service: &str) -> Result<AwsResponse, AwsError> {
401 #[cfg(any(test, feature = "test-support"))]
402 if let Some(ref mock) = self.mock {
403 let result = mock.execute("GET", url, None).await?;
404 return Ok(AwsResponse {
405 data: ResponseData::Mock(result),
406 });
407 }
408
409 let response = self
410 .signed_request("GET", url, service, b"", &[("accept", "application/json")])
411 .await?;
412 Ok(AwsResponse {
413 data: ResponseData::Real(response),
414 })
415 }
416
417 pub async fn post(
419 &self,
420 url: &str,
421 service: &str,
422 body: &[u8],
423 content_type: &str,
424 ) -> Result<AwsResponse, AwsError> {
425 #[cfg(any(test, feature = "test-support"))]
426 if let Some(ref mock) = self.mock {
427 let result = mock.execute("POST", url, None).await?;
428 return Ok(AwsResponse {
429 data: ResponseData::Mock(result),
430 });
431 }
432
433 let response = self
434 .signed_request(
435 "POST",
436 url,
437 service,
438 body,
439 &[("content-type", content_type)],
440 )
441 .await?;
442 Ok(AwsResponse {
443 data: ResponseData::Real(response),
444 })
445 }
446
447 pub async fn post_json(
453 &self,
454 url: &str,
455 service: &str,
456 target: &str,
457 json_version: &str,
458 body: &[u8],
459 ) -> Result<AwsResponse, AwsError> {
460 #[cfg(any(test, feature = "test-support"))]
461 if let Some(ref mock) = self.mock {
462 let result = mock.execute("POST", url, None).await?;
463 return Ok(AwsResponse {
464 data: ResponseData::Mock(result),
465 });
466 }
467
468 let content_type = format!("application/x-amz-json-{json_version}");
469 let extra = [
470 ("content-type", content_type.as_str()),
471 ("x-amz-target", target),
472 ];
473 let response = self
474 .signed_request("POST", url, service, body, &extra)
475 .await?;
476 Ok(AwsResponse {
477 data: ResponseData::Real(response),
478 })
479 }
480
481 pub async fn put(
483 &self,
484 url: &str,
485 service: &str,
486 body: &[u8],
487 content_type: &str,
488 ) -> Result<AwsResponse, AwsError> {
489 #[cfg(any(test, feature = "test-support"))]
490 if let Some(ref mock) = self.mock {
491 let result = mock.execute("PUT", url, None).await?;
492 return Ok(AwsResponse {
493 data: ResponseData::Mock(result),
494 });
495 }
496
497 let md5_b64;
499 let extra: Vec<(&str, &str)> = if body.is_empty() {
500 vec![("content-type", content_type)]
501 } else {
502 let digest = Md5::digest(body);
503 md5_b64 = base64::engine::general_purpose::STANDARD.encode(digest);
504 vec![("content-type", content_type), ("content-md5", &md5_b64)]
505 };
506 let response = self
507 .signed_request("PUT", url, service, body, &extra)
508 .await?;
509 Ok(AwsResponse {
510 data: ResponseData::Real(response),
511 })
512 }
513
514 pub async fn delete(&self, url: &str, service: &str) -> Result<AwsResponse, AwsError> {
516 #[cfg(any(test, feature = "test-support"))]
517 if let Some(ref mock) = self.mock {
518 let result = mock.execute("DELETE", url, None).await?;
519 return Ok(AwsResponse {
520 data: ResponseData::Mock(result),
521 });
522 }
523
524 let response = self
525 .signed_request("DELETE", url, service, b"", &[])
526 .await?;
527 Ok(AwsResponse {
528 data: ResponseData::Real(response),
529 })
530 }
531
532 pub async fn patch(
534 &self,
535 url: &str,
536 service: &str,
537 body: &[u8],
538 content_type: &str,
539 ) -> Result<AwsResponse, AwsError> {
540 #[cfg(any(test, feature = "test-support"))]
541 if let Some(ref mock) = self.mock {
542 let result = mock.execute("PATCH", url, None).await?;
543 return Ok(AwsResponse {
544 data: ResponseData::Mock(result),
545 });
546 }
547
548 let response = self
549 .signed_request(
550 "PATCH",
551 url,
552 service,
553 body,
554 &[("content-type", content_type)],
555 )
556 .await?;
557 Ok(AwsResponse {
558 data: ResponseData::Real(response),
559 })
560 }
561
562 async fn signed_request(
564 &self,
565 method: &str,
566 url: &str,
567 service: &str,
568 body: &[u8],
569 extra_headers: &[(&str, &str)],
570 ) -> Result<reqwest::Response, AwsError> {
571 let _permit = self.rate_limiter.acquire(url).await;
572
573 let mut attempt = 0u32;
574 let mut backoff = self.retry_config.initial_backoff;
575 let body_bytes = if body.is_empty() {
576 None
577 } else {
578 Some(bytes::Bytes::copy_from_slice(body))
579 };
580
581 loop {
582 let now = chrono::Utc::now();
583 let signed = sigv4::sign_request(
584 method,
585 url,
586 extra_headers,
587 body,
588 service,
589 &self.credentials,
590 &now,
591 );
592
593 let mut request = self
594 .http
595 .request(method.parse().expect("invalid HTTP method"), url);
596 if let Some(ref b) = body_bytes {
597 request = request.body(b.clone());
598 }
599 for &(key, value) in extra_headers {
600 request = request.header(key, value);
601 }
602 request = request.header("Authorization", &signed.authorization);
603 for (key, value) in &signed.extra_headers {
604 request = request.header(key, value);
605 }
606
607 let result = match request.send().await {
608 Ok(response) => Self::classify_response(response).await,
609 Err(e) => Err(AwsError::from(e)),
610 };
611
612 match result {
613 Ok(response) => return Ok(response),
614 Err(aws_err) => {
615 if aws_err.is_retryable() && attempt < self.retry_config.max_retries {
616 let delay = self
617 .retry_config
618 .compute_backoff(backoff, aws_err.retry_after());
619 tokio::time::sleep(delay).await;
620 backoff = std::time::Duration::from_secs_f64(
621 backoff.as_secs_f64() * self.retry_config.backoff_multiplier,
622 );
623 attempt += 1;
624 continue;
625 }
626 return Err(aws_err);
627 }
628 }
629 }
630 }
631
632 async fn classify_response(response: reqwest::Response) -> Result<reqwest::Response, AwsError> {
637 let status = response.status().as_u16();
638 if status < 400 {
639 return Ok(response);
640 }
641
642 let retry_after_secs: Option<u64> = response
644 .headers()
645 .get("retry-after")
646 .and_then(|v| v.to_str().ok())
647 .and_then(|s| s.parse().ok());
648
649 let content_type = response
651 .headers()
652 .get("content-type")
653 .and_then(|v| v.to_str().ok())
654 .unwrap_or("")
655 .to_string();
656
657 let body_text = response.text().await.unwrap_or_default();
658
659 let mut err = if content_type.contains("json") {
660 crate::error::parse_json_error(status, &body_text)
661 } else {
662 crate::error::parse_xml_error(status, &body_text)
663 };
664
665 if let Some(secs) = retry_after_secs
667 && let AwsError::Throttled { retry_after, .. } = &mut err
668 {
669 *retry_after = Some(std::time::Duration::from_secs(secs));
670 }
671
672 Err(err)
673 }
674}
675
676#[cfg(test)]
677mod tests {
678 use super::*;
679
680 #[test]
681 fn builder_requires_credentials() {
682 let result = AwsHttpClient::builder().build();
683 match result {
684 Err(AwsError::Auth { .. }) => {} Err(other) => panic!("expected Auth error, got: {other}"),
686 Ok(_) => panic!("expected error, got Ok"),
687 }
688 }
689
690 #[test]
691 fn builder_succeeds_with_credentials() {
692 let creds = AwsCredentials::new("AKID".into(), "SECRET".into(), None, "us-east-1".into());
693 let client = AwsHttpClient::builder().credentials(creds).build();
694 assert!(client.is_ok());
695 assert_eq!(client.unwrap().region(), "us-east-1");
696 }
697
698 #[test]
699 fn builder_accepts_custom_retry_config() {
700 let creds = AwsCredentials::new("AKID".into(), "SECRET".into(), None, "us-east-1".into());
701 let client = AwsHttpClient::builder()
702 .credentials(creds)
703 .retry_config(RetryConfig {
704 max_retries: 5,
705 ..RetryConfig::default()
706 })
707 .build()
708 .unwrap();
709 assert_eq!(client.retry_config.max_retries, 5);
710 }
711
712 #[test]
713 fn builder_accepts_custom_rate_limit() {
714 let creds = AwsCredentials::new("AKID".into(), "SECRET".into(), None, "us-east-1".into());
715 let client = AwsHttpClient::builder()
716 .credentials(creds)
717 .rate_limit(RateLimitConfig::disabled())
718 .build();
719 assert!(client.is_ok());
720 }
721}