// aws_lite_rs/client.rs

1//! Core HTTP client for AWS API access.
2
3use crate::auth::credentials::AwsCredentials;
4use crate::auth::sigv4;
5use crate::error::AwsError;
6use base64::Engine;
7use cloud_lite_core::rate_limit::{RateLimitConfig, RateLimiter};
8use cloud_lite_core::retry::RetryConfig;
9use md5::{Digest, Md5};
10
11/// HTTP client for AWS API operations.
12///
13/// Provides automatic SigV4 signing, retry, and rate limiting.
14pub struct AwsHttpClient {
15    http: reqwest::Client,
16    credentials: AwsCredentials,
17    retry_config: RetryConfig,
18    rate_limit_config: RateLimitConfig,
19    rate_limiter: RateLimiter,
20    /// Override base URL for testing.
21    #[cfg(any(test, feature = "test-support"))]
22    pub(crate) base_url: Option<String>,
23    /// Mock client for testing.
24    #[cfg(any(test, feature = "test-support"))]
25    pub(crate) mock: Option<std::sync::Arc<crate::mock_client::MockClient>>,
26}
27
28/// Response wrapper that abstracts over real and mock responses.
29pub struct AwsResponse {
30    data: ResponseData,
31}
32
33enum ResponseData {
34    Real(reqwest::Response),
35    #[cfg(any(test, feature = "test-support"))]
36    Mock(Vec<u8>),
37}
38
39impl AwsResponse {
40    /// Get the HTTP status code of the response.
41    pub fn status(&self) -> u16 {
42        match &self.data {
43            ResponseData::Real(response) => response.status().as_u16(),
44            #[cfg(any(test, feature = "test-support"))]
45            ResponseData::Mock(_) => 200,
46        }
47    }
48
49    /// Check for HTTP error status and parse the error body.
50    ///
51    /// For 4xx/5xx responses, reads the body and parses the AWS error format
52    /// (XML or JSON) into a typed `AwsError`. For 2xx responses, returns self
53    /// so the caller can proceed with body parsing.
54    pub async fn error_for_status(self, content_type: &str) -> Result<Self, AwsError> {
55        let status = self.status();
56        if status < 400 {
57            return Ok(self);
58        }
59
60        let body_bytes = self
61            .bytes()
62            .await
63            .unwrap_or_else(|_| bytes::Bytes::from_static(b""));
64        let body_text = std::str::from_utf8(&body_bytes).unwrap_or("");
65
66        if content_type.contains("json") {
67            Err(crate::error::parse_json_error(status, body_text))
68        } else {
69            Err(crate::error::parse_xml_error(status, body_text))
70        }
71    }
72
73    /// Read the response body as bytes.
74    pub async fn bytes(self) -> Result<bytes::Bytes, AwsError> {
75        match self.data {
76            ResponseData::Real(response) => response
77                .bytes()
78                .await
79                .map_err(|e| AwsError::Network(e.to_string())),
80            #[cfg(any(test, feature = "test-support"))]
81            ResponseData::Mock(data) => Ok(bytes::Bytes::from(data)),
82        }
83    }
84}
85
86/// Builder for [`AwsHttpClient`].
87pub struct AwsHttpClientBuilder {
88    credentials: Option<AwsCredentials>,
89    retry_config: RetryConfig,
90    rate_limit: RateLimitConfig,
91}
92
93impl Default for AwsHttpClientBuilder {
94    fn default() -> Self {
95        Self {
96            credentials: None,
97            retry_config: RetryConfig::default(),
98            rate_limit: RateLimitConfig::new(20),
99        }
100    }
101}
102
103impl AwsHttpClientBuilder {
104    /// Set AWS credentials.
105    pub fn credentials(mut self, credentials: AwsCredentials) -> Self {
106        self.credentials = Some(credentials);
107        self
108    }
109
110    /// Set retry configuration.
111    pub fn retry_config(mut self, config: RetryConfig) -> Self {
112        self.retry_config = config;
113        self
114    }
115
116    /// Set rate limiting configuration.
117    pub fn rate_limit(mut self, config: RateLimitConfig) -> Self {
118        self.rate_limit = config;
119        self
120    }
121
122    /// Build the client.
123    pub fn build(self) -> Result<AwsHttpClient, AwsError> {
124        let credentials = self.credentials.ok_or(AwsError::Auth {
125            message: "Credentials required".into(),
126        })?;
127
128        let http = reqwest::Client::builder()
129            .build()
130            .map_err(|e| AwsError::Network(e.to_string()))?;
131
132        Ok(AwsHttpClient {
133            http,
134            credentials,
135            retry_config: self.retry_config,
136            rate_limit_config: self.rate_limit.clone(),
137            rate_limiter: RateLimiter::new(self.rate_limit),
138            #[cfg(any(test, feature = "test-support"))]
139            base_url: None,
140            #[cfg(any(test, feature = "test-support"))]
141            mock: None,
142        })
143    }
144}
145
146impl AwsHttpClient {
147    /// Create a new builder.
148    pub fn builder() -> AwsHttpClientBuilder {
149        AwsHttpClientBuilder::default()
150    }
151
152    /// Create a client from the default credential chain (sync).
153    ///
154    /// Checks environment variables and credentials file. For ECS/IMDS support,
155    /// use [`from_default_chain_async`](Self::from_default_chain_async).
156    pub fn from_default_chain(region: &str) -> Result<Self, AwsError> {
157        let credentials = AwsCredentials::from_default_chain(region)?;
158        Self::builder().credentials(credentials).build()
159    }
160
161    /// Create a client from the full async credential chain.
162    ///
163    /// Checks environment variables, credentials file, ECS container credentials,
164    /// and EC2 instance metadata (IMDSv2) in order.
165    pub async fn from_default_chain_async(region: &str) -> Result<Self, AwsError> {
166        let credentials = AwsCredentials::from_default_chain_async(region).await?;
167        Self::builder().credentials(credentials).build()
168    }
169
170    /// Create a client from a mock for testing.
171    #[cfg(any(test, feature = "test-support"))]
172    pub fn from_mock(mock: crate::mock_client::MockClient) -> Self {
173        let credentials =
174            AwsCredentials::new("AKID".into(), "SECRET".into(), None, "us-east-1".into());
175        Self {
176            http: reqwest::Client::new(),
177            credentials,
178            retry_config: RetryConfig::default(),
179            rate_limit_config: RateLimitConfig::disabled(),
180            rate_limiter: RateLimiter::new(RateLimitConfig::disabled()),
181            base_url: Some("http://mock".into()),
182            mock: Some(std::sync::Arc::new(mock)),
183        }
184    }
185
186    /// Get the configured region.
187    pub fn region(&self) -> &str {
188        &self.credentials.region
189    }
190
191    /// Create a new client targeting a different region.
192    ///
193    /// Clones the credentials with the new region and builds a fresh client
194    /// preserving the same retry and rate-limit configuration. Each regional
195    /// client gets its own rate limiter — AWS limits are per-account-per-region.
196    pub fn with_region(&self, region: &str) -> Result<Self, AwsError> {
197        let mut credentials = self.credentials.clone();
198        credentials.region = region.to_string();
199
200        let http = reqwest::Client::builder()
201            .build()
202            .map_err(|e| AwsError::Network(e.to_string()))?;
203
204        Ok(Self {
205            http,
206            credentials,
207            retry_config: self.retry_config.clone(),
208            rate_limit_config: self.rate_limit_config.clone(),
209            rate_limiter: RateLimiter::new(self.rate_limit_config.clone()),
210            #[cfg(any(test, feature = "test-support"))]
211            base_url: None,
212            #[cfg(any(test, feature = "test-support"))]
213            mock: None,
214        })
215    }
216
217    // === Generated API Accessors (do not edit) ===
218
219    /// Access the AWS IAM Access Analyzer API
220    pub fn accessanalyzer(&self) -> crate::api::AccessAnalyzerClient<'_> {
221        crate::api::AccessAnalyzerClient::new(self)
222    }
223
224    /// Access the Amazon API Gateway API
225    pub fn apigateway(&self) -> crate::api::ApigatewayClient<'_> {
226        crate::api::ApigatewayClient::new(self)
227    }
228
229    /// Access the Amazon Auto Scaling API
230    pub fn autoscaling(&self) -> crate::api::AutoScalingClient<'_> {
231        crate::api::AutoScalingClient::new(self)
232    }
233
234    /// Access the AWS Cost Explorer API
235    pub fn ce(&self) -> crate::api::CeClient<'_> {
236        crate::api::CeClient::new(self)
237    }
238
239    /// Access the Amazon CloudFront API
240    pub fn cloudfront(&self) -> crate::api::CloudfrontClient<'_> {
241        crate::api::CloudfrontClient::new(self)
242    }
243
244    /// Access the AWS CloudTrail API
245    pub fn cloudtrail(&self) -> crate::api::CloudtrailClient<'_> {
246        crate::api::CloudtrailClient::new(self)
247    }
248
249    /// Access the Amazon CloudWatch API
250    pub fn cloudwatch(&self) -> crate::api::CloudWatchClient<'_> {
251        crate::api::CloudWatchClient::new(self)
252    }
253
254    /// Access the AWS Config API
255    pub fn config(&self) -> crate::api::ConfigClient<'_> {
256        crate::api::ConfigClient::new(self)
257    }
258
259    /// Access the Amazon DynamoDB API
260    pub fn dynamodb(&self) -> crate::api::DynamodbClient<'_> {
261        crate::api::DynamodbClient::new(self)
262    }
263
264    /// Access the Amazon EC2 API
265    pub fn ec2(&self) -> crate::api::Ec2Client<'_> {
266        crate::api::Ec2Client::new(self)
267    }
268
269    /// Access the Amazon Elastic Container Registry API
270    pub fn ecr(&self) -> crate::api::EcrClient<'_> {
271        crate::api::EcrClient::new(self)
272    }
273
274    /// Access the Amazon Elastic Container Service API
275    pub fn ecs(&self) -> crate::api::EcsClient<'_> {
276        crate::api::EcsClient::new(self)
277    }
278
279    /// Access the Amazon Elastic File System API
280    pub fn efs(&self) -> crate::api::EfsClient<'_> {
281        crate::api::EfsClient::new(self)
282    }
283
284    /// Access the Amazon Elastic Kubernetes Service API
285    pub fn eks(&self) -> crate::api::EksClient<'_> {
286        crate::api::EksClient::new(self)
287    }
288
289    /// Access the Amazon ElastiCache API
290    pub fn elasticache(&self) -> crate::api::ElasticacheClient<'_> {
291        crate::api::ElasticacheClient::new(self)
292    }
293
294    /// Access the Elastic Load Balancing v2 API
295    pub fn elbv2(&self) -> crate::api::Elbv2Client<'_> {
296        crate::api::Elbv2Client::new(self)
297    }
298
299    /// Access the Amazon EMR API
300    pub fn emr(&self) -> crate::api::EmrClient<'_> {
301        crate::api::EmrClient::new(self)
302    }
303
304    /// Access the AWS Identity and Access Management API
305    pub fn iam(&self) -> crate::api::IamClient<'_> {
306        crate::api::IamClient::new(self)
307    }
308
309    /// Access the Amazon Kinesis API
310    pub fn kinesis(&self) -> crate::api::KinesisClient<'_> {
311        crate::api::KinesisClient::new(self)
312    }
313
314    /// Access the AWS Key Management Service API
315    pub fn kms(&self) -> crate::api::KmsClient<'_> {
316        crate::api::KmsClient::new(self)
317    }
318
319    /// Access the AWS Lambda API
320    pub fn lambda(&self) -> crate::api::LambdaClient<'_> {
321        crate::api::LambdaClient::new(self)
322    }
323
324    /// Access the Amazon CloudWatch Logs API
325    pub fn logs(&self) -> crate::api::LogsClient<'_> {
326        crate::api::LogsClient::new(self)
327    }
328
329    /// Access the Amazon OpenSearch Service API
330    pub fn opensearch(&self) -> crate::api::OpensearchClient<'_> {
331        crate::api::OpensearchClient::new(self)
332    }
333
334    /// Access the AWS Organizations API
335    pub fn organizations(&self) -> crate::api::OrganizationsClient<'_> {
336        crate::api::OrganizationsClient::new(self)
337    }
338
339    /// Access the Amazon Relational Database Service API
340    pub fn rds(&self) -> crate::api::RdsClient<'_> {
341        crate::api::RdsClient::new(self)
342    }
343
344    /// Access the Amazon Redshift API
345    pub fn redshift(&self) -> crate::api::RedshiftClient<'_> {
346        crate::api::RedshiftClient::new(self)
347    }
348
349    /// Access the Amazon Route 53 API
350    pub fn route53(&self) -> crate::api::Route53Client<'_> {
351        crate::api::Route53Client::new(self)
352    }
353
354    /// Access the Amazon S3 API
355    pub fn s3(&self) -> crate::api::S3Client<'_> {
356        crate::api::S3Client::new(self)
357    }
358
359    /// Access the Amazon SageMaker API
360    pub fn sagemaker(&self) -> crate::api::SagemakerClient<'_> {
361        crate::api::SagemakerClient::new(self)
362    }
363
364    /// Access the AWS Secrets Manager API
365    pub fn secretsmanager(&self) -> crate::api::SecretsmanagerClient<'_> {
366        crate::api::SecretsmanagerClient::new(self)
367    }
368
369    /// Access the AWS Security Hub API
370    pub fn securityhub(&self) -> crate::api::SecurityHubClient<'_> {
371        crate::api::SecurityHubClient::new(self)
372    }
373
374    /// Access the AWS Security Token Service API
375    pub fn sts(&self) -> crate::api::StsClient<'_> {
376        crate::api::StsClient::new(self)
377    }
378    // === End Generated API Accessors ===
379
380    /// Make a signed GET request with automatic retry.
381    pub async fn get(&self, url: &str, service: &str) -> Result<AwsResponse, AwsError> {
382        #[cfg(any(test, feature = "test-support"))]
383        if let Some(ref mock) = self.mock {
384            let result = mock.execute("GET", url, None).await?;
385            return Ok(AwsResponse {
386                data: ResponseData::Mock(result),
387            });
388        }
389
390        let response = self.signed_request("GET", url, service, b"", &[]).await?;
391        Ok(AwsResponse {
392            data: ResponseData::Real(response),
393        })
394    }
395
396    /// Make a signed GET request with `Accept: application/json`.
397    ///
398    /// Used for REST-JSON services (e.g. API Gateway) that return HAL format
399    /// by default and require `Accept: application/json` to get plain JSON.
400    pub async fn get_json(&self, url: &str, service: &str) -> Result<AwsResponse, AwsError> {
401        #[cfg(any(test, feature = "test-support"))]
402        if let Some(ref mock) = self.mock {
403            let result = mock.execute("GET", url, None).await?;
404            return Ok(AwsResponse {
405                data: ResponseData::Mock(result),
406            });
407        }
408
409        let response = self
410            .signed_request("GET", url, service, b"", &[("accept", "application/json")])
411            .await?;
412        Ok(AwsResponse {
413            data: ResponseData::Real(response),
414        })
415    }
416
417    /// Make a signed POST request with a body.
418    pub async fn post(
419        &self,
420        url: &str,
421        service: &str,
422        body: &[u8],
423        content_type: &str,
424    ) -> Result<AwsResponse, AwsError> {
425        #[cfg(any(test, feature = "test-support"))]
426        if let Some(ref mock) = self.mock {
427            let result = mock.execute("POST", url, None).await?;
428            return Ok(AwsResponse {
429                data: ResponseData::Mock(result),
430            });
431        }
432
433        let response = self
434            .signed_request(
435                "POST",
436                url,
437                service,
438                body,
439                &[("content-type", content_type)],
440            )
441            .await?;
442        Ok(AwsResponse {
443            data: ResponseData::Real(response),
444        })
445    }
446
447    /// Make a signed POST request for AWS JSON protocol services.
448    ///
449    /// Automatically sets `Content-Type: application/x-amz-json-{json_version}`
450    /// and `X-Amz-Target: {target}`, with both headers included in the SigV4
451    /// signature.
452    pub async fn post_json(
453        &self,
454        url: &str,
455        service: &str,
456        target: &str,
457        json_version: &str,
458        body: &[u8],
459    ) -> Result<AwsResponse, AwsError> {
460        #[cfg(any(test, feature = "test-support"))]
461        if let Some(ref mock) = self.mock {
462            let result = mock.execute("POST", url, None).await?;
463            return Ok(AwsResponse {
464                data: ResponseData::Mock(result),
465            });
466        }
467
468        let content_type = format!("application/x-amz-json-{json_version}");
469        let extra = [
470            ("content-type", content_type.as_str()),
471            ("x-amz-target", target),
472        ];
473        let response = self
474            .signed_request("POST", url, service, body, &extra)
475            .await?;
476        Ok(AwsResponse {
477            data: ResponseData::Real(response),
478        })
479    }
480
481    /// Make a signed PUT request with a body.
482    pub async fn put(
483        &self,
484        url: &str,
485        service: &str,
486        body: &[u8],
487        content_type: &str,
488    ) -> Result<AwsResponse, AwsError> {
489        #[cfg(any(test, feature = "test-support"))]
490        if let Some(ref mock) = self.mock {
491            let result = mock.execute("PUT", url, None).await?;
492            return Ok(AwsResponse {
493                data: ResponseData::Mock(result),
494            });
495        }
496
497        // AWS requires Content-MD5 for certain PUT operations (e.g., S3 lifecycle)
498        let md5_b64;
499        let extra: Vec<(&str, &str)> = if body.is_empty() {
500            vec![("content-type", content_type)]
501        } else {
502            let digest = Md5::digest(body);
503            md5_b64 = base64::engine::general_purpose::STANDARD.encode(digest);
504            vec![("content-type", content_type), ("content-md5", &md5_b64)]
505        };
506        let response = self
507            .signed_request("PUT", url, service, body, &extra)
508            .await?;
509        Ok(AwsResponse {
510            data: ResponseData::Real(response),
511        })
512    }
513
514    /// Make a signed DELETE request.
515    pub async fn delete(&self, url: &str, service: &str) -> Result<AwsResponse, AwsError> {
516        #[cfg(any(test, feature = "test-support"))]
517        if let Some(ref mock) = self.mock {
518            let result = mock.execute("DELETE", url, None).await?;
519            return Ok(AwsResponse {
520                data: ResponseData::Mock(result),
521            });
522        }
523
524        let response = self
525            .signed_request("DELETE", url, service, b"", &[])
526            .await?;
527        Ok(AwsResponse {
528            data: ResponseData::Real(response),
529        })
530    }
531
532    /// Make a signed PATCH request.
533    pub async fn patch(
534        &self,
535        url: &str,
536        service: &str,
537        body: &[u8],
538        content_type: &str,
539    ) -> Result<AwsResponse, AwsError> {
540        #[cfg(any(test, feature = "test-support"))]
541        if let Some(ref mock) = self.mock {
542            let result = mock.execute("PATCH", url, None).await?;
543            return Ok(AwsResponse {
544                data: ResponseData::Mock(result),
545            });
546        }
547
548        let response = self
549            .signed_request(
550                "PATCH",
551                url,
552                service,
553                body,
554                &[("content-type", content_type)],
555            )
556            .await?;
557        Ok(AwsResponse {
558            data: ResponseData::Real(response),
559        })
560    }
561
562    /// Internal: signed request with arbitrary method, headers, and automatic retry.
563    async fn signed_request(
564        &self,
565        method: &str,
566        url: &str,
567        service: &str,
568        body: &[u8],
569        extra_headers: &[(&str, &str)],
570    ) -> Result<reqwest::Response, AwsError> {
571        let _permit = self.rate_limiter.acquire(url).await;
572
573        let mut attempt = 0u32;
574        let mut backoff = self.retry_config.initial_backoff;
575        let body_bytes = if body.is_empty() {
576            None
577        } else {
578            Some(bytes::Bytes::copy_from_slice(body))
579        };
580
581        loop {
582            let now = chrono::Utc::now();
583            let signed = sigv4::sign_request(
584                method,
585                url,
586                extra_headers,
587                body,
588                service,
589                &self.credentials,
590                &now,
591            );
592
593            let mut request = self
594                .http
595                .request(method.parse().expect("invalid HTTP method"), url);
596            if let Some(ref b) = body_bytes {
597                request = request.body(b.clone());
598            }
599            for &(key, value) in extra_headers {
600                request = request.header(key, value);
601            }
602            request = request.header("Authorization", &signed.authorization);
603            for (key, value) in &signed.extra_headers {
604                request = request.header(key, value);
605            }
606
607            let result = match request.send().await {
608                Ok(response) => Self::classify_response(response).await,
609                Err(e) => Err(AwsError::from(e)),
610            };
611
612            match result {
613                Ok(response) => return Ok(response),
614                Err(aws_err) => {
615                    if aws_err.is_retryable() && attempt < self.retry_config.max_retries {
616                        let delay = self
617                            .retry_config
618                            .compute_backoff(backoff, aws_err.retry_after());
619                        tokio::time::sleep(delay).await;
620                        backoff = std::time::Duration::from_secs_f64(
621                            backoff.as_secs_f64() * self.retry_config.backoff_multiplier,
622                        );
623                        attempt += 1;
624                        continue;
625                    }
626                    return Err(aws_err);
627                }
628            }
629        }
630    }
631
632    /// Classify an HTTP response: return Ok for 2xx, parse and return Err for 4xx/5xx.
633    ///
634    /// Extracts `Retry-After` header for throttled responses. Tries JSON then XML
635    /// parsing for error bodies based on Content-Type.
636    async fn classify_response(response: reqwest::Response) -> Result<reqwest::Response, AwsError> {
637        let status = response.status().as_u16();
638        if status < 400 {
639            return Ok(response);
640        }
641
642        // Extract Retry-After header before consuming the response body
643        let retry_after_secs: Option<u64> = response
644            .headers()
645            .get("retry-after")
646            .and_then(|v| v.to_str().ok())
647            .and_then(|s| s.parse().ok());
648
649        // Detect content type for error parsing
650        let content_type = response
651            .headers()
652            .get("content-type")
653            .and_then(|v| v.to_str().ok())
654            .unwrap_or("")
655            .to_string();
656
657        let body_text = response.text().await.unwrap_or_default();
658
659        let mut err = if content_type.contains("json") {
660            crate::error::parse_json_error(status, &body_text)
661        } else {
662            crate::error::parse_xml_error(status, &body_text)
663        };
664
665        // Inject Retry-After duration if the header was present and the error is Throttled
666        if let Some(secs) = retry_after_secs
667            && let AwsError::Throttled { retry_after, .. } = &mut err
668        {
669            *retry_after = Some(std::time::Duration::from_secs(secs));
670        }
671
672        Err(err)
673    }
674}
675
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder credentials shared by the builder tests.
    fn test_credentials() -> AwsCredentials {
        AwsCredentials::new("AKID".into(), "SECRET".into(), None, "us-east-1".into())
    }

    /// Building without credentials must fail with an `Auth` error.
    #[test]
    fn builder_requires_credentials() {
        match AwsHttpClient::builder().build() {
            Ok(_) => panic!("expected error, got Ok"),
            Err(AwsError::Auth { .. }) => {} // expected
            Err(other) => panic!("expected Auth error, got: {other}"),
        }
    }

    /// With credentials the build succeeds and exposes their region.
    #[test]
    fn builder_succeeds_with_credentials() {
        let client = AwsHttpClient::builder()
            .credentials(test_credentials())
            .build();
        assert!(client.is_ok());
        assert_eq!(client.unwrap().region(), "us-east-1");
    }

    /// A custom retry policy ends up on the built client.
    #[test]
    fn builder_accepts_custom_retry_config() {
        let retry = RetryConfig {
            max_retries: 5,
            ..RetryConfig::default()
        };
        let client = AwsHttpClient::builder()
            .credentials(test_credentials())
            .retry_config(retry)
            .build()
            .unwrap();
        assert_eq!(client.retry_config.max_retries, 5);
    }

    /// A custom rate-limit configuration is accepted by the builder.
    #[test]
    fn builder_accepts_custom_rate_limit() {
        let built = AwsHttpClient::builder()
            .credentials(test_credentials())
            .rate_limit(RateLimitConfig::disabled())
            .build();
        assert!(built.is_ok());
    }
}