Skip to main content

redis_cloud/
client.rs

1//! Redis Cloud API client core implementation
2//!
3//! This module contains the core HTTP client for interacting with the Redis Cloud REST API.
4//! It provides authentication handling, request/response processing, and error management.
5//!
6//! The client is designed around a builder pattern for flexible configuration and supports
7//! both typed and untyped API interactions.
8
9use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
16/// Default user agent for the Redis Cloud client
17const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
19/// Builder for constructing a `CloudClient` with custom configuration
20///
21/// Provides a fluent interface for configuring API credentials, base URL, timeouts,
22/// and other client settings before creating the final `CloudClient` instance.
23///
24/// # Examples
25///
26/// ```rust,no_run
27/// use redis_cloud::CloudClient;
28///
29/// // Basic configuration
30/// let client = CloudClient::builder()
31///     .api_key("your-api-key")
32///     .api_secret("your-api-secret")
33///     .build()?;
34///
35/// // Advanced configuration
36/// let client = CloudClient::builder()
37///     .api_key("your-api-key")
38///     .api_secret("your-api-secret")
39///     .base_url("https://api.redislabs.com/v1".to_string())
40///     .timeout(std::time::Duration::from_secs(120))
41///     .build()?;
42/// # Ok::<(), Box<dyn std::error::Error>>(())
43/// ```
44#[derive(Debug, Clone)]
45pub struct CloudClientBuilder {
46    api_key: Option<String>,
47    api_secret: Option<String>,
48    base_url: String,
49    timeout: std::time::Duration,
50    user_agent: String,
51}
52
53impl Default for CloudClientBuilder {
54    fn default() -> Self {
55        Self {
56            api_key: None,
57            api_secret: None,
58            base_url: "https://api.redislabs.com/v1".to_string(),
59            timeout: std::time::Duration::from_secs(30),
60            user_agent: DEFAULT_USER_AGENT.to_string(),
61        }
62    }
63}
64
65impl CloudClientBuilder {
66    /// Create a new builder
67    #[must_use]
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    /// Set the API key
73    #[must_use]
74    pub fn api_key(mut self, key: impl Into<String>) -> Self {
75        self.api_key = Some(key.into());
76        self
77    }
78
79    /// Set the API secret
80    #[must_use]
81    pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
82        self.api_secret = Some(secret.into());
83        self
84    }
85
86    /// Set the base URL
87    #[must_use]
88    pub fn base_url(mut self, url: impl Into<String>) -> Self {
89        self.base_url = url.into();
90        self
91    }
92
93    /// Set the timeout
94    #[must_use]
95    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
96        self.timeout = timeout;
97        self
98    }
99
100    /// Set the user agent string for HTTP requests
101    ///
102    /// The default user agent is `redis-cloud/{version}`.
103    /// This can be overridden to identify specific clients, for example:
104    /// `redisctl/1.2.3` or `my-app/1.0.0`.
105    #[must_use]
106    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
107        self.user_agent = user_agent.into();
108        self
109    }
110
111    /// Build the client
112    pub fn build(self) -> Result<CloudClient> {
113        let api_key = self
114            .api_key
115            .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
116        let api_secret = self
117            .api_secret
118            .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
119
120        let mut default_headers = HeaderMap::new();
121        default_headers.insert(
122            USER_AGENT,
123            HeaderValue::from_str(&self.user_agent)
124                .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {e}")))?,
125        );
126
127        let client = Client::builder()
128            .timeout(self.timeout)
129            .default_headers(default_headers)
130            .build()
131            .map_err(|e| RestError::ConnectionError(e.to_string()))?;
132
133        Ok(CloudClient {
134            api_key,
135            api_secret,
136            base_url: self.base_url,
137            timeout: self.timeout,
138            client: Arc::new(client),
139        })
140    }
141}
142
143/// Redis Cloud API client
144#[derive(Clone)]
145pub struct CloudClient {
146    pub(crate) api_key: String,
147    pub(crate) api_secret: String,
148    pub(crate) base_url: String,
149    pub(crate) timeout: std::time::Duration,
150    pub(crate) client: Arc<Client>,
151}
152
153impl CloudClient {
154    /// Create a new builder for the client
155    #[must_use]
156    pub fn builder() -> CloudClientBuilder {
157        CloudClientBuilder::new()
158    }
159
160    /// Get the configured request timeout
161    ///
162    /// Returns the timeout duration that was set when building the client.
163    /// This timeout is applied to all HTTP requests made by this client.
164    #[must_use]
165    pub fn timeout(&self) -> std::time::Duration {
166        self.timeout
167    }
168
169    // ========================================================================
170    // Fluent API - Handler accessors
171    // ========================================================================
172
173    /// Get an account handler for account management operations
174    ///
175    /// # Example
176    ///
177    /// ```rust,no_run
178    /// # use redis_cloud::CloudClient;
179    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
180    /// let client = CloudClient::builder()
181    ///     .api_key("key")
182    ///     .api_secret("secret")
183    ///     .build()?;
184    ///
185    /// let account = client.account().get_current_account().await?;
186    /// # Ok(())
187    /// # }
188    /// ```
189    #[must_use]
190    pub fn account(&self) -> crate::AccountHandler {
191        crate::AccountHandler::new(self.clone())
192    }
193
194    /// Get a subscription handler for Pro subscription operations
195    ///
196    /// # Example
197    ///
198    /// ```rust,no_run
199    /// # use redis_cloud::CloudClient;
200    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
201    /// let client = CloudClient::builder()
202    ///     .api_key("key")
203    ///     .api_secret("secret")
204    ///     .build()?;
205    ///
206    /// let subscriptions = client.subscriptions().get_all_subscriptions().await?;
207    /// # Ok(())
208    /// # }
209    /// ```
210    #[must_use]
211    pub fn subscriptions(&self) -> crate::SubscriptionHandler {
212        crate::SubscriptionHandler::new(self.clone())
213    }
214
215    /// Get a database handler for Pro database operations
216    ///
217    /// # Example
218    ///
219    /// ```rust,no_run
220    /// # use redis_cloud::CloudClient;
221    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
222    /// let client = CloudClient::builder()
223    ///     .api_key("key")
224    ///     .api_secret("secret")
225    ///     .build()?;
226    ///
227    /// let databases = client.databases().get_subscription_databases(123, None, None).await?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    #[must_use]
232    pub fn databases(&self) -> crate::DatabaseHandler {
233        crate::DatabaseHandler::new(self.clone())
234    }
235
236    /// Get a fixed subscription handler for Essentials subscription operations
237    ///
238    /// # Example
239    ///
240    /// ```rust,no_run
241    /// # use redis_cloud::CloudClient;
242    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
243    /// let client = CloudClient::builder()
244    ///     .api_key("key")
245    ///     .api_secret("secret")
246    ///     .build()?;
247    ///
248    /// let subscriptions = client.fixed_subscriptions().list().await?;
249    /// # Ok(())
250    /// # }
251    /// ```
252    #[must_use]
253    pub fn fixed_subscriptions(&self) -> crate::FixedSubscriptionHandler {
254        crate::FixedSubscriptionHandler::new(self.clone())
255    }
256
257    /// Get a fixed database handler for Essentials database operations
258    ///
259    /// # Example
260    ///
261    /// ```rust,no_run
262    /// # use redis_cloud::CloudClient;
263    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
264    /// let client = CloudClient::builder()
265    ///     .api_key("key")
266    ///     .api_secret("secret")
267    ///     .build()?;
268    ///
269    /// let databases = client.fixed_databases().list(123, None, None).await?;
270    /// # Ok(())
271    /// # }
272    /// ```
273    #[must_use]
274    pub fn fixed_databases(&self) -> crate::FixedDatabaseHandler {
275        crate::FixedDatabaseHandler::new(self.clone())
276    }
277
278    /// Get an ACL handler for access control operations
279    ///
280    /// # Example
281    ///
282    /// ```rust,no_run
283    /// # use redis_cloud::CloudClient;
284    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
285    /// let client = CloudClient::builder()
286    ///     .api_key("key")
287    ///     .api_secret("secret")
288    ///     .build()?;
289    ///
290    /// let users = client.acl().get_all_acl_users().await?;
291    /// # Ok(())
292    /// # }
293    /// ```
294    #[must_use]
295    pub fn acl(&self) -> crate::AclHandler {
296        crate::AclHandler::new(self.clone())
297    }
298
299    /// Get a users handler for user management operations
300    ///
301    /// # Example
302    ///
303    /// ```rust,no_run
304    /// # use redis_cloud::CloudClient;
305    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
306    /// let client = CloudClient::builder()
307    ///     .api_key("key")
308    ///     .api_secret("secret")
309    ///     .build()?;
310    ///
311    /// let users = client.users().get_all_users().await?;
312    /// # Ok(())
313    /// # }
314    /// ```
315    #[must_use]
316    pub fn users(&self) -> crate::UserHandler {
317        crate::UserHandler::new(self.clone())
318    }
319
320    /// Get a tasks handler for async operation tracking
321    ///
322    /// # Example
323    ///
324    /// ```rust,no_run
325    /// # use redis_cloud::CloudClient;
326    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327    /// let client = CloudClient::builder()
328    ///     .api_key("key")
329    ///     .api_secret("secret")
330    ///     .build()?;
331    ///
332    /// let tasks = client.tasks().get_all_tasks().await?;
333    /// # Ok(())
334    /// # }
335    /// ```
336    #[must_use]
337    pub fn tasks(&self) -> crate::TaskHandler {
338        crate::TaskHandler::new(self.clone())
339    }
340
341    /// Get a cloud accounts handler for cloud provider integration
342    ///
343    /// # Example
344    ///
345    /// ```rust,no_run
346    /// # use redis_cloud::CloudClient;
347    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
348    /// let client = CloudClient::builder()
349    ///     .api_key("key")
350    ///     .api_secret("secret")
351    ///     .build()?;
352    ///
353    /// let accounts = client.cloud_accounts().get_cloud_accounts().await?;
354    /// # Ok(())
355    /// # }
356    /// ```
357    #[must_use]
358    pub fn cloud_accounts(&self) -> crate::CloudAccountHandler {
359        crate::CloudAccountHandler::new(self.clone())
360    }
361
362    /// Get a VPC peering handler for VPC peering operations
363    ///
364    /// # Example
365    ///
366    /// ```rust,no_run
367    /// # use redis_cloud::CloudClient;
368    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
369    /// let client = CloudClient::builder()
370    ///     .api_key("key")
371    ///     .api_secret("secret")
372    ///     .build()?;
373    ///
374    /// let peering = client.vpc_peering().get(123).await?;
375    /// # Ok(())
376    /// # }
377    /// ```
378    #[must_use]
379    pub fn vpc_peering(&self) -> crate::VpcPeeringHandler {
380        crate::VpcPeeringHandler::new(self.clone())
381    }
382
383    /// Get a transit gateway handler for AWS Transit Gateway operations
384    ///
385    /// # Example
386    ///
387    /// ```rust,no_run
388    /// # use redis_cloud::CloudClient;
389    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
390    /// let client = CloudClient::builder()
391    ///     .api_key("key")
392    ///     .api_secret("secret")
393    ///     .build()?;
394    ///
395    /// let attachments = client.transit_gateway().get_attachments(123).await?;
396    /// # Ok(())
397    /// # }
398    /// ```
399    #[must_use]
400    pub fn transit_gateway(&self) -> crate::TransitGatewayHandler {
401        crate::TransitGatewayHandler::new(self.clone())
402    }
403
404    /// Get a Private Service Connect handler for GCP PSC operations
405    ///
406    /// # Example
407    ///
408    /// ```rust,no_run
409    /// # use redis_cloud::CloudClient;
410    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411    /// let client = CloudClient::builder()
412    ///     .api_key("key")
413    ///     .api_secret("secret")
414    ///     .build()?;
415    ///
416    /// let service = client.psc().get_service(123).await?;
417    /// # Ok(())
418    /// # }
419    /// ```
420    #[must_use]
421    pub fn psc(&self) -> crate::PscHandler {
422        crate::PscHandler::new(self.clone())
423    }
424
425    /// Get a `PrivateLink` handler for AWS `PrivateLink` operations
426    ///
427    /// # Example
428    ///
429    /// ```rust,no_run
430    /// # use redis_cloud::CloudClient;
431    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
432    /// let client = CloudClient::builder()
433    ///     .api_key("key")
434    ///     .api_secret("secret")
435    ///     .build()?;
436    ///
437    /// let config = client.private_link().get(123).await?;
438    /// # Ok(())
439    /// # }
440    /// ```
441    #[must_use]
442    pub fn private_link(&self) -> crate::PrivateLinkHandler {
443        crate::PrivateLinkHandler::new(self.clone())
444    }
445
446    /// Get a cost report handler for generating cost reports
447    ///
448    /// # Example
449    ///
450    /// ```rust,no_run
451    /// # use redis_cloud::CloudClient;
452    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
453    /// let client = CloudClient::builder()
454    ///     .api_key("key")
455    ///     .api_secret("secret")
456    ///     .build()?;
457    ///
458    /// let handler = client.cost_reports();
459    /// # Ok(())
460    /// # }
461    /// ```
462    #[must_use]
463    pub fn cost_reports(&self) -> crate::CostReportHandler {
464        crate::CostReportHandler::new(self.clone())
465    }
466
467    /// Normalize URL path concatenation to avoid double slashes
468    fn normalize_url(&self, path: &str) -> String {
469        let base = self.base_url.trim_end_matches('/');
470        let path = path.trim_start_matches('/');
471        format!("{base}/{path}")
472    }
473
474    /// Convert HTTP status code and response text to appropriate error
475    ///
476    /// This is a helper to avoid duplicating the error handling pattern
477    /// across multiple methods.
478    fn status_to_error(status: reqwest::StatusCode, text: String) -> RestError {
479        match status.as_u16() {
480            400 => RestError::BadRequest { message: text },
481            401 => RestError::AuthenticationFailed { message: text },
482            403 => RestError::Forbidden { message: text },
483            404 => RestError::NotFound { message: text },
484            412 => RestError::PreconditionFailed,
485            429 => RestError::RateLimited { message: text },
486            500 => RestError::InternalServerError { message: text },
487            503 => RestError::ServiceUnavailable { message: text },
488            _ => RestError::ApiError {
489                code: status.as_u16(),
490                message: text,
491            },
492        }
493    }
494
495    /// Make a GET request with API key authentication
496    #[instrument(skip(self), fields(method = "GET"))]
497    pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
498        let url = self.normalize_url(path);
499        debug!("GET {}", url);
500
501        // Redis Cloud API uses these headers for authentication
502        let response = self
503            .client
504            .get(&url)
505            .header("x-api-key", &self.api_key)
506            .header("x-api-secret-key", &self.api_secret)
507            .send()
508            .await?;
509
510        trace!("Response status: {}", response.status());
511        self.handle_response(response).await
512    }
513
514    /// Make a POST request
515    #[instrument(skip(self, body), fields(method = "POST"))]
516    pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
517        &self,
518        path: &str,
519        body: &B,
520    ) -> Result<T> {
521        let url = self.normalize_url(path);
522        debug!("POST {}", url);
523        trace!("Request body: {:?}", serde_json::to_value(body).ok());
524
525        // Same backwards header naming as GET
526        let response = self
527            .client
528            .post(&url)
529            .header("x-api-key", &self.api_key)
530            .header("x-api-secret-key", &self.api_secret)
531            .json(body)
532            .send()
533            .await?;
534
535        trace!("Response status: {}", response.status());
536        self.handle_response(response).await
537    }
538
539    /// Make a PUT request
540    #[instrument(skip(self, body), fields(method = "PUT"))]
541    pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
542        &self,
543        path: &str,
544        body: &B,
545    ) -> Result<T> {
546        let url = self.normalize_url(path);
547        debug!("PUT {}", url);
548        trace!("Request body: {:?}", serde_json::to_value(body).ok());
549
550        // Same backwards header naming as GET
551        let response = self
552            .client
553            .put(&url)
554            .header("x-api-key", &self.api_key)
555            .header("x-api-secret-key", &self.api_secret)
556            .json(body)
557            .send()
558            .await?;
559
560        trace!("Response status: {}", response.status());
561        self.handle_response(response).await
562    }
563
564    /// Make a DELETE request
565    #[instrument(skip(self), fields(method = "DELETE"))]
566    pub async fn delete(&self, path: &str) -> Result<()> {
567        let url = self.normalize_url(path);
568        debug!("DELETE {}", url);
569
570        // Same backwards header naming as GET
571        let response = self
572            .client
573            .delete(&url)
574            .header("x-api-key", &self.api_key)
575            .header("x-api-secret-key", &self.api_secret)
576            .send()
577            .await?;
578
579        trace!("Response status: {}", response.status());
580        if response.status().is_success() {
581            Ok(())
582        } else {
583            let status = response.status();
584            let text = response
585                .text()
586                .await
587                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
588            Err(Self::status_to_error(status, text))
589        }
590    }
591
592    /// Execute a bodyless DELETE request, deserializing the response body.
593    ///
594    /// Unlike [`Self::delete`] (which discards the body) this parses the
595    /// response into `T`. Connectivity deletes are asynchronous and the spec
596    /// returns a [`TaskStateUpdate`](crate::types::TaskStateUpdate) so callers
597    /// can poll the resulting task to completion.
598    #[instrument(skip(self), fields(method = "DELETE"))]
599    pub async fn delete_typed<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
600        let url = self.normalize_url(path);
601        debug!("DELETE {}", url);
602
603        let response = self
604            .client
605            .delete(&url)
606            .header("x-api-key", &self.api_key)
607            .header("x-api-secret-key", &self.api_secret)
608            .send()
609            .await?;
610
611        trace!("Response status: {}", response.status());
612        self.handle_response(response).await
613    }
614
615    /// Execute raw GET request returning JSON Value
616    #[instrument(skip(self), fields(method = "GET"))]
617    pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
618        self.get(path).await
619    }
620
621    /// Execute GET request returning raw bytes
622    ///
623    /// Useful for downloading binary content like cost reports or other files.
624    #[instrument(skip(self), fields(method = "GET"))]
625    pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
626        let url = self.normalize_url(path);
627        debug!("GET {} (bytes)", url);
628
629        let response = self
630            .client
631            .get(&url)
632            .header("x-api-key", &self.api_key)
633            .header("x-api-secret-key", &self.api_secret)
634            .send()
635            .await?;
636
637        trace!("Response status: {}", response.status());
638        let status = response.status();
639
640        if status.is_success() {
641            response
642                .bytes()
643                .await
644                .map(|b| b.to_vec())
645                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))
646        } else {
647            let text = response
648                .text()
649                .await
650                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
651            Err(Self::status_to_error(status, text))
652        }
653    }
654
655    /// Execute raw POST request with JSON body
656    #[instrument(skip(self, body), fields(method = "POST"))]
657    pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
658        self.post(path, &body).await
659    }
660
661    /// Execute raw PUT request with JSON body
662    #[instrument(skip(self, body), fields(method = "PUT"))]
663    pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
664        self.put(path, &body).await
665    }
666
667    /// Execute raw PATCH request with JSON body
668    #[instrument(skip(self, body), fields(method = "PATCH"))]
669    pub async fn patch_raw(
670        &self,
671        path: &str,
672        body: serde_json::Value,
673    ) -> Result<serde_json::Value> {
674        let url = self.normalize_url(path);
675        debug!("PATCH {}", url);
676        trace!("Request body: {:?}", body);
677
678        // Use backwards header names for compatibility
679        let response = self
680            .client
681            .patch(&url)
682            .header("x-api-key", &self.api_key)
683            .header("x-api-secret-key", &self.api_secret)
684            .json(&body)
685            .send()
686            .await?;
687
688        trace!("Response status: {}", response.status());
689        self.handle_response(response).await
690    }
691
692    /// Execute raw DELETE request returning any response body
693    #[instrument(skip(self), fields(method = "DELETE"))]
694    pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
695        let url = self.normalize_url(path);
696        debug!("DELETE {}", url);
697
698        // Use backwards header names for compatibility
699        let response = self
700            .client
701            .delete(&url)
702            .header("x-api-key", &self.api_key)
703            .header("x-api-secret-key", &self.api_secret)
704            .send()
705            .await?;
706
707        trace!("Response status: {}", response.status());
708        if response.status().is_success() {
709            if response.content_length() == Some(0) {
710                Ok(serde_json::json!({"status": "deleted"}))
711            } else {
712                response.json().await.map_err(Into::into)
713            }
714        } else {
715            let status = response.status();
716            let text = response
717                .text()
718                .await
719                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
720            Err(Self::status_to_error(status, text))
721        }
722    }
723
724    /// Execute DELETE request with JSON body (used by some endpoints like `PrivateLink` principals)
725    #[instrument(skip(self, body), fields(method = "DELETE"))]
726    pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
727        &self,
728        path: &str,
729        body: serde_json::Value,
730    ) -> Result<T> {
731        let url = self.normalize_url(path);
732        debug!("DELETE {} (with body)", url);
733        trace!("Request body: {:?}", body);
734
735        let response = self
736            .client
737            .delete(&url)
738            .header("x-api-key", &self.api_key)
739            .header("x-api-secret-key", &self.api_secret)
740            .json(&body)
741            .send()
742            .await?;
743
744        trace!("Response status: {}", response.status());
745        self.handle_response(response).await
746    }
747
748    /// Handle HTTP response and return both status code and body as JSON
749    ///
750    /// This is used internally by the Tower service implementation to preserve
751    /// the actual HTTP status code in responses.
752    #[cfg(feature = "tower-integration")]
753    async fn handle_response_with_status(
754        &self,
755        response: reqwest::Response,
756    ) -> Result<(u16, serde_json::Value)> {
757        let status = response.status();
758        let status_code = status.as_u16();
759
760        if status.is_success() {
761            let bytes = response
762                .bytes()
763                .await
764                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
765
766            let value: serde_json::Value = serde_json::from_slice(&bytes).map_err(|e| {
767                RestError::ConnectionError(format!("Failed to parse JSON response: {e}"))
768            })?;
769
770            Ok((status_code, value))
771        } else {
772            let text = response
773                .text()
774                .await
775                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
776            Err(Self::status_to_error(status, text))
777        }
778    }
779
780    /// Handle HTTP response
781    async fn handle_response<T: serde::de::DeserializeOwned>(
782        &self,
783        response: reqwest::Response,
784    ) -> Result<T> {
785        let status = response.status();
786
787        if status.is_success() {
788            // Get the response bytes for better error reporting
789            let bytes = response
790                .bytes()
791                .await
792                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
793
794            // Treat an empty success body (e.g. HTTP 204 No Content from the
795            // traffic-resume endpoints) as JSON `null` so it deserializes
796            // cleanly into `()`, `Option<T>`, or `serde_json::Value::Null`.
797            let bytes: &[u8] = if bytes.is_empty() { b"null" } else { &bytes };
798
799            // Use serde_path_to_error for better deserialization error messages
800            let deserializer = &mut serde_json::Deserializer::from_slice(bytes);
801            serde_path_to_error::deserialize(deserializer).map_err(|err| {
802                let path = err.path().to_string();
803                // Use ConnectionError to provide detailed error message with field path
804                RestError::ConnectionError(format!(
805                    "Failed to deserialize field '{}': {}",
806                    path,
807                    err.inner()
808                ))
809            })
810        } else {
811            let text = response
812                .text()
813                .await
814                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
815            Err(Self::status_to_error(status, text))
816        }
817    }
818}
819
820/// Tower Service integration for `CloudClient`
821///
822/// This module provides Tower Service implementations for `CloudClient`, enabling
823/// middleware composition with patterns like circuit breakers, retry, and rate limiting.
824///
825/// # Feature Flag
826///
827/// This module is only available when the `tower-integration` feature is enabled.
828///
829/// # Examples
830///
831/// ```rust,ignore
832/// use redis_cloud::CloudClient;
833/// use redis_cloud::tower_support::ApiRequest;
834/// use tower::ServiceExt;
835///
836/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
837/// let client = CloudClient::builder()
838///     .api_key("your-key")
839///     .api_secret("your-secret")
840///     .build()?;
841///
842/// // Convert to a Tower service
843/// let mut service = client.into_service();
844///
845/// // Use the service
846/// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
847/// println!("Status: {}", response.status);
848/// # Ok(())
849/// # }
850/// ```
851#[cfg(feature = "tower-integration")]
852pub mod tower_support {
853    use super::{CloudClient, RestError, Result};
854    use std::future::Future;
855    use std::pin::Pin;
856    use std::task::{Context, Poll};
857    use tower::Service;
858
859    /// HTTP method for API requests
860    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
861    pub enum Method {
862        /// GET request
863        Get,
864        /// POST request
865        Post,
866        /// PUT request
867        Put,
868        /// PATCH request
869        Patch,
870        /// DELETE request
871        Delete,
872    }
873
874    /// Tower-compatible request type for Redis Cloud API
875    ///
876    /// This wraps the essential components of an API request in a format
877    /// suitable for Tower middleware composition.
878    #[derive(Debug, Clone)]
879    pub struct ApiRequest {
880        /// HTTP method
881        pub method: Method,
882        /// API endpoint path (e.g., "/subscriptions")
883        pub path: String,
884        /// Optional JSON body for POST/PUT/PATCH requests
885        pub body: Option<serde_json::Value>,
886    }
887
888    impl ApiRequest {
889        /// Create a GET request
890        pub fn get(path: impl Into<String>) -> Self {
891            Self {
892                method: Method::Get,
893                path: path.into(),
894                body: None,
895            }
896        }
897
898        /// Create a POST request with a JSON body
899        pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
900            Self {
901                method: Method::Post,
902                path: path.into(),
903                body: Some(body),
904            }
905        }
906
907        /// Create a PUT request with a JSON body
908        pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
909            Self {
910                method: Method::Put,
911                path: path.into(),
912                body: Some(body),
913            }
914        }
915
916        /// Create a PATCH request with a JSON body
917        pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
918            Self {
919                method: Method::Patch,
920                path: path.into(),
921                body: Some(body),
922            }
923        }
924
925        /// Create a DELETE request
926        pub fn delete(path: impl Into<String>) -> Self {
927            Self {
928                method: Method::Delete,
929                path: path.into(),
930                body: None,
931            }
932        }
933    }
934
935    /// Tower-compatible response type
936    ///
937    /// Contains the HTTP status code and response body as JSON.
938    #[derive(Debug, Clone)]
939    pub struct ApiResponse {
940        /// HTTP status code
941        pub status: u16,
942        /// Response body as JSON
943        pub body: serde_json::Value,
944    }
945
946    impl CloudClient {
947        /// Convert this client into a Tower service
948        ///
949        /// This consumes the client and returns it wrapped in a Tower service
950        /// implementation, enabling middleware composition.
951        ///
952        /// # Examples
953        ///
954        /// ```rust,ignore
955        /// use redis_cloud::CloudClient;
956        /// use tower::ServiceExt;
957        /// use redis_cloud::tower_support::ApiRequest;
958        ///
959        /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
960        /// let client = CloudClient::builder()
961        ///     .api_key("key")
962        ///     .api_secret("secret")
963        ///     .build()?;
964        ///
965        /// let mut service = client.into_service();
966        /// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
967        /// # Ok(())
968        /// # }
969        /// ```
970        #[must_use]
971        pub fn into_service(self) -> Self {
972            self
973        }
974    }
975
976    impl Service<ApiRequest> for CloudClient {
977        type Response = ApiResponse;
978        type Error = RestError;
979        type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
980
981        fn poll_ready(
982            &mut self,
983            _cx: &mut Context<'_>,
984        ) -> Poll<std::result::Result<(), Self::Error>> {
985            // CloudClient is always ready since it uses an internal connection pool
986            Poll::Ready(Ok(()))
987        }
988
989        fn call(&mut self, req: ApiRequest) -> Self::Future {
990            let client = self.clone();
991            Box::pin(async move {
992                let url = client.normalize_url(&req.path);
993
994                let request_builder = match req.method {
995                    Method::Get => client.client.get(&url),
996                    Method::Post => {
997                        let body = req.body.ok_or_else(|| RestError::BadRequest {
998                            message: "POST request requires a body".to_string(),
999                        })?;
1000                        client.client.post(&url).json(&body)
1001                    }
1002                    Method::Put => {
1003                        let body = req.body.ok_or_else(|| RestError::BadRequest {
1004                            message: "PUT request requires a body".to_string(),
1005                        })?;
1006                        client.client.put(&url).json(&body)
1007                    }
1008                    Method::Patch => {
1009                        let body = req.body.ok_or_else(|| RestError::BadRequest {
1010                            message: "PATCH request requires a body".to_string(),
1011                        })?;
1012                        client.client.patch(&url).json(&body)
1013                    }
1014                    Method::Delete => client.client.delete(&url),
1015                };
1016
1017                let response = request_builder
1018                    .header("x-api-key", &client.api_key)
1019                    .header("x-api-secret-key", &client.api_secret)
1020                    .send()
1021                    .await?;
1022
1023                let (status, body) = client.handle_response_with_status(response).await?;
1024
1025                Ok(ApiResponse { status, body })
1026            })
1027        }
1028    }
1029}