Skip to main content

redis_cloud/
client.rs

1//! Redis Cloud API client core implementation
2//!
3//! This module contains the core HTTP client for interacting with the Redis Cloud REST API.
4//! It provides authentication handling, request/response processing, and error management.
5//!
6//! The client is designed around a builder pattern for flexible configuration and supports
7//! both typed and untyped API interactions.
8
9use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
/// Default `User-Agent` header value for the Redis Cloud client
///
/// Expands at compile time to `redis-cloud/` followed by the `CARGO_PKG_VERSION`
/// of this crate. Used by `CloudClientBuilder::default()` unless overridden.
const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
/// Builder for constructing a `CloudClient` with custom configuration
///
/// Provides a fluent interface for configuring API credentials, base URL, timeouts,
/// and other client settings before creating the final `CloudClient` instance.
///
/// The `Debug` implementation redacts the API key and secret so that logging a
/// builder never exposes credentials.
///
/// # Examples
///
/// ```rust,no_run
/// use redis_cloud::CloudClient;
///
/// // Basic configuration
/// let client = CloudClient::builder()
///     .api_key("your-api-key")
///     .api_secret("your-api-secret")
///     .build()?;
///
/// // Advanced configuration
/// let client = CloudClient::builder()
///     .api_key("your-api-key")
///     .api_secret("your-api-secret")
///     .base_url("https://api.redislabs.com/v1".to_string())
///     .timeout(std::time::Duration::from_secs(120))
///     .build()?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Clone)]
pub struct CloudClientBuilder {
    /// API key; `build()` fails if this is still `None`
    api_key: Option<String>,
    /// API secret; `build()` fails if this is still `None`
    api_secret: Option<String>,
    /// Base URL that request paths are appended to
    base_url: String,
    /// Timeout handed to the underlying HTTP client
    timeout: std::time::Duration,
    /// Value sent in the `User-Agent` header
    user_agent: String,
}

/// Hand-written instead of derived so that credentials are never printed.
///
/// The key and secret are rendered as `Some("[REDACTED]")` when set and `None`
/// otherwise, which still shows whether they were configured.
impl std::fmt::Debug for CloudClientBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "[REDACTED]";
        f.debug_struct("CloudClientBuilder")
            .field("api_key", &self.api_key.as_ref().map(|_| REDACTED))
            .field("api_secret", &self.api_secret.as_ref().map(|_| REDACTED))
            .field("base_url", &self.base_url)
            .field("timeout", &self.timeout)
            .field("user_agent", &self.user_agent)
            .finish()
    }
}
52
53impl Default for CloudClientBuilder {
54    fn default() -> Self {
55        Self {
56            api_key: None,
57            api_secret: None,
58            base_url: "https://api.redislabs.com/v1".to_string(),
59            timeout: std::time::Duration::from_secs(30),
60            user_agent: DEFAULT_USER_AGENT.to_string(),
61        }
62    }
63}
64
65impl CloudClientBuilder {
66    /// Create a new builder
67    #[must_use]
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    /// Set the API key
73    #[must_use]
74    pub fn api_key(mut self, key: impl Into<String>) -> Self {
75        self.api_key = Some(key.into());
76        self
77    }
78
79    /// Set the API secret
80    #[must_use]
81    pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
82        self.api_secret = Some(secret.into());
83        self
84    }
85
86    /// Set the base URL
87    #[must_use]
88    pub fn base_url(mut self, url: impl Into<String>) -> Self {
89        self.base_url = url.into();
90        self
91    }
92
93    /// Set the timeout
94    #[must_use]
95    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
96        self.timeout = timeout;
97        self
98    }
99
100    /// Set the user agent string for HTTP requests
101    ///
102    /// The default user agent is `redis-cloud/{version}`.
103    /// This can be overridden to identify specific clients, for example:
104    /// `redisctl/1.2.3` or `my-app/1.0.0`.
105    #[must_use]
106    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
107        self.user_agent = user_agent.into();
108        self
109    }
110
111    /// Build the client
112    pub fn build(self) -> Result<CloudClient> {
113        let api_key = self
114            .api_key
115            .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
116        let api_secret = self
117            .api_secret
118            .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
119
120        let mut default_headers = HeaderMap::new();
121        default_headers.insert(
122            USER_AGENT,
123            HeaderValue::from_str(&self.user_agent)
124                .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {e}")))?,
125        );
126
127        let client = Client::builder()
128            .timeout(self.timeout)
129            .default_headers(default_headers)
130            .build()
131            .map_err(|e| RestError::ConnectionError(e.to_string()))?;
132
133        Ok(CloudClient {
134            api_key,
135            api_secret,
136            base_url: self.base_url,
137            timeout: self.timeout,
138            client: Arc::new(client),
139        })
140    }
141}
142
/// Redis Cloud API client
///
/// Holds the credentials, base URL and shared HTTP client used for every request.
/// Construct one through `CloudClient::builder()`.
#[derive(Clone)]
pub struct CloudClient {
    /// Sent as the `x-api-key` header on each request
    pub(crate) api_key: String,
    /// Sent as the `x-api-secret-key` header on each request
    pub(crate) api_secret: String,
    /// Prefix that request paths are joined onto
    pub(crate) base_url: String,
    /// Timeout the client was built with; exposed through `timeout()`
    pub(crate) timeout: std::time::Duration,
    /// Underlying HTTP client, shared between clones via `Arc`
    pub(crate) client: Arc<Client>,
}
152
153impl CloudClient {
154    /// Create a new builder for the client
155    #[must_use]
156    pub fn builder() -> CloudClientBuilder {
157        CloudClientBuilder::new()
158    }
159
160    /// Get the configured request timeout
161    ///
162    /// Returns the timeout duration that was set when building the client.
163    /// This timeout is applied to all HTTP requests made by this client.
164    #[must_use]
165    pub fn timeout(&self) -> std::time::Duration {
166        self.timeout
167    }
168
169    // ========================================================================
170    // Fluent API - Handler accessors
171    // ========================================================================
172
173    /// Get an account handler for account management operations
174    ///
175    /// # Example
176    ///
177    /// ```rust,no_run
178    /// # use redis_cloud::CloudClient;
179    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
180    /// let client = CloudClient::builder()
181    ///     .api_key("key")
182    ///     .api_secret("secret")
183    ///     .build()?;
184    ///
185    /// let account = client.account().get_current_account().await?;
186    /// # Ok(())
187    /// # }
188    /// ```
189    #[must_use]
190    pub fn account(&self) -> crate::AccountHandler {
191        crate::AccountHandler::new(self.clone())
192    }
193
194    /// Get a subscription handler for Pro subscription operations
195    ///
196    /// # Example
197    ///
198    /// ```rust,no_run
199    /// # use redis_cloud::CloudClient;
200    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
201    /// let client = CloudClient::builder()
202    ///     .api_key("key")
203    ///     .api_secret("secret")
204    ///     .build()?;
205    ///
206    /// let subscriptions = client.subscriptions().get_all_subscriptions().await?;
207    /// # Ok(())
208    /// # }
209    /// ```
210    #[must_use]
211    pub fn subscriptions(&self) -> crate::SubscriptionHandler {
212        crate::SubscriptionHandler::new(self.clone())
213    }
214
215    /// Get a database handler for Pro database operations
216    ///
217    /// # Example
218    ///
219    /// ```rust,no_run
220    /// # use redis_cloud::CloudClient;
221    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
222    /// let client = CloudClient::builder()
223    ///     .api_key("key")
224    ///     .api_secret("secret")
225    ///     .build()?;
226    ///
227    /// let databases = client.databases().get_subscription_databases(123, None, None).await?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    #[must_use]
232    pub fn databases(&self) -> crate::DatabaseHandler {
233        crate::DatabaseHandler::new(self.clone())
234    }
235
236    /// Get a fixed subscription handler for Essentials subscription operations
237    ///
238    /// # Example
239    ///
240    /// ```rust,no_run
241    /// # use redis_cloud::CloudClient;
242    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
243    /// let client = CloudClient::builder()
244    ///     .api_key("key")
245    ///     .api_secret("secret")
246    ///     .build()?;
247    ///
248    /// let subscriptions = client.fixed_subscriptions().list().await?;
249    /// # Ok(())
250    /// # }
251    /// ```
252    #[must_use]
253    pub fn fixed_subscriptions(&self) -> crate::FixedSubscriptionHandler {
254        crate::FixedSubscriptionHandler::new(self.clone())
255    }
256
257    /// Get a fixed database handler for Essentials database operations
258    ///
259    /// # Example
260    ///
261    /// ```rust,no_run
262    /// # use redis_cloud::CloudClient;
263    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
264    /// let client = CloudClient::builder()
265    ///     .api_key("key")
266    ///     .api_secret("secret")
267    ///     .build()?;
268    ///
269    /// let databases = client.fixed_databases().list(123, None, None).await?;
270    /// # Ok(())
271    /// # }
272    /// ```
273    #[must_use]
274    pub fn fixed_databases(&self) -> crate::FixedDatabaseHandler {
275        crate::FixedDatabaseHandler::new(self.clone())
276    }
277
278    /// Get an ACL handler for access control operations
279    ///
280    /// # Example
281    ///
282    /// ```rust,no_run
283    /// # use redis_cloud::CloudClient;
284    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
285    /// let client = CloudClient::builder()
286    ///     .api_key("key")
287    ///     .api_secret("secret")
288    ///     .build()?;
289    ///
290    /// let users = client.acl().get_all_acl_users().await?;
291    /// # Ok(())
292    /// # }
293    /// ```
294    #[must_use]
295    pub fn acl(&self) -> crate::AclHandler {
296        crate::AclHandler::new(self.clone())
297    }
298
299    /// Get a users handler for user management operations
300    ///
301    /// # Example
302    ///
303    /// ```rust,no_run
304    /// # use redis_cloud::CloudClient;
305    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
306    /// let client = CloudClient::builder()
307    ///     .api_key("key")
308    ///     .api_secret("secret")
309    ///     .build()?;
310    ///
311    /// let users = client.users().get_all_users().await?;
312    /// # Ok(())
313    /// # }
314    /// ```
315    #[must_use]
316    pub fn users(&self) -> crate::UserHandler {
317        crate::UserHandler::new(self.clone())
318    }
319
320    /// Get a tasks handler for async operation tracking
321    ///
322    /// # Example
323    ///
324    /// ```rust,no_run
325    /// # use redis_cloud::CloudClient;
326    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327    /// let client = CloudClient::builder()
328    ///     .api_key("key")
329    ///     .api_secret("secret")
330    ///     .build()?;
331    ///
332    /// let tasks = client.tasks().get_all_tasks().await?;
333    /// # Ok(())
334    /// # }
335    /// ```
336    #[must_use]
337    pub fn tasks(&self) -> crate::TaskHandler {
338        crate::TaskHandler::new(self.clone())
339    }
340
341    /// Get a cloud accounts handler for cloud provider integration
342    ///
343    /// # Example
344    ///
345    /// ```rust,no_run
346    /// # use redis_cloud::CloudClient;
347    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
348    /// let client = CloudClient::builder()
349    ///     .api_key("key")
350    ///     .api_secret("secret")
351    ///     .build()?;
352    ///
353    /// let accounts = client.cloud_accounts().get_cloud_accounts().await?;
354    /// # Ok(())
355    /// # }
356    /// ```
357    #[must_use]
358    pub fn cloud_accounts(&self) -> crate::CloudAccountHandler {
359        crate::CloudAccountHandler::new(self.clone())
360    }
361
362    /// Get a VPC peering handler for VPC peering operations
363    ///
364    /// # Example
365    ///
366    /// ```rust,no_run
367    /// # use redis_cloud::CloudClient;
368    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
369    /// let client = CloudClient::builder()
370    ///     .api_key("key")
371    ///     .api_secret("secret")
372    ///     .build()?;
373    ///
374    /// let peering = client.vpc_peering().get(123).await?;
375    /// # Ok(())
376    /// # }
377    /// ```
378    #[must_use]
379    pub fn vpc_peering(&self) -> crate::VpcPeeringHandler {
380        crate::VpcPeeringHandler::new(self.clone())
381    }
382
383    /// Get a transit gateway handler for AWS Transit Gateway operations
384    ///
385    /// # Example
386    ///
387    /// ```rust,no_run
388    /// # use redis_cloud::CloudClient;
389    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
390    /// let client = CloudClient::builder()
391    ///     .api_key("key")
392    ///     .api_secret("secret")
393    ///     .build()?;
394    ///
395    /// let attachments = client.transit_gateway().get_attachments(123).await?;
396    /// # Ok(())
397    /// # }
398    /// ```
399    #[must_use]
400    pub fn transit_gateway(&self) -> crate::TransitGatewayHandler {
401        crate::TransitGatewayHandler::new(self.clone())
402    }
403
404    /// Get a Private Service Connect handler for GCP PSC operations
405    ///
406    /// # Example
407    ///
408    /// ```rust,no_run
409    /// # use redis_cloud::CloudClient;
410    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411    /// let client = CloudClient::builder()
412    ///     .api_key("key")
413    ///     .api_secret("secret")
414    ///     .build()?;
415    ///
416    /// let service = client.psc().get_service(123).await?;
417    /// # Ok(())
418    /// # }
419    /// ```
420    #[must_use]
421    pub fn psc(&self) -> crate::PscHandler {
422        crate::PscHandler::new(self.clone())
423    }
424
425    /// Get a `PrivateLink` handler for AWS `PrivateLink` operations
426    ///
427    /// # Example
428    ///
429    /// ```rust,no_run
430    /// # use redis_cloud::CloudClient;
431    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
432    /// let client = CloudClient::builder()
433    ///     .api_key("key")
434    ///     .api_secret("secret")
435    ///     .build()?;
436    ///
437    /// let config = client.private_link().get(123).await?;
438    /// # Ok(())
439    /// # }
440    /// ```
441    #[must_use]
442    pub fn private_link(&self) -> crate::PrivateLinkHandler {
443        crate::PrivateLinkHandler::new(self.clone())
444    }
445
446    /// Get a cost report handler for generating cost reports
447    ///
448    /// # Example
449    ///
450    /// ```rust,no_run
451    /// # use redis_cloud::CloudClient;
452    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
453    /// let client = CloudClient::builder()
454    ///     .api_key("key")
455    ///     .api_secret("secret")
456    ///     .build()?;
457    ///
458    /// let handler = client.cost_reports();
459    /// # Ok(())
460    /// # }
461    /// ```
462    #[must_use]
463    pub fn cost_reports(&self) -> crate::CostReportHandler {
464        crate::CostReportHandler::new(self.clone())
465    }
466
467    /// Normalize URL path concatenation to avoid double slashes
468    fn normalize_url(&self, path: &str) -> String {
469        let base = self.base_url.trim_end_matches('/');
470        let path = path.trim_start_matches('/');
471        format!("{base}/{path}")
472    }
473
474    /// Convert HTTP status code and response text to appropriate error
475    ///
476    /// This is a helper to avoid duplicating the error handling pattern
477    /// across multiple methods.
478    fn status_to_error(status: reqwest::StatusCode, text: String) -> RestError {
479        match status.as_u16() {
480            400 => RestError::BadRequest { message: text },
481            401 => RestError::AuthenticationFailed { message: text },
482            403 => RestError::Forbidden { message: text },
483            404 => RestError::NotFound { message: text },
484            412 => RestError::PreconditionFailed,
485            429 => RestError::RateLimited { message: text },
486            500 => RestError::InternalServerError { message: text },
487            503 => RestError::ServiceUnavailable { message: text },
488            _ => RestError::ApiError {
489                code: status.as_u16(),
490                message: text,
491            },
492        }
493    }
494
495    /// Make a GET request with API key authentication
496    #[instrument(skip(self), fields(method = "GET"))]
497    pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
498        let url = self.normalize_url(path);
499        debug!("GET {}", url);
500
501        // Redis Cloud API uses these headers for authentication
502        let response = self
503            .client
504            .get(&url)
505            .header("x-api-key", &self.api_key)
506            .header("x-api-secret-key", &self.api_secret)
507            .send()
508            .await?;
509
510        trace!("Response status: {}", response.status());
511        self.handle_response(response).await
512    }
513
514    /// Make a POST request
515    #[instrument(skip(self, body), fields(method = "POST"))]
516    pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
517        &self,
518        path: &str,
519        body: &B,
520    ) -> Result<T> {
521        let url = self.normalize_url(path);
522        debug!("POST {}", url);
523        trace!("Request body: {:?}", serde_json::to_value(body).ok());
524
525        // Same backwards header naming as GET
526        let response = self
527            .client
528            .post(&url)
529            .header("x-api-key", &self.api_key)
530            .header("x-api-secret-key", &self.api_secret)
531            .json(body)
532            .send()
533            .await?;
534
535        trace!("Response status: {}", response.status());
536        self.handle_response(response).await
537    }
538
539    /// Make a PUT request
540    #[instrument(skip(self, body), fields(method = "PUT"))]
541    pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
542        &self,
543        path: &str,
544        body: &B,
545    ) -> Result<T> {
546        let url = self.normalize_url(path);
547        debug!("PUT {}", url);
548        trace!("Request body: {:?}", serde_json::to_value(body).ok());
549
550        // Same backwards header naming as GET
551        let response = self
552            .client
553            .put(&url)
554            .header("x-api-key", &self.api_key)
555            .header("x-api-secret-key", &self.api_secret)
556            .json(body)
557            .send()
558            .await?;
559
560        trace!("Response status: {}", response.status());
561        self.handle_response(response).await
562    }
563
564    /// Make a DELETE request
565    #[instrument(skip(self), fields(method = "DELETE"))]
566    pub async fn delete(&self, path: &str) -> Result<()> {
567        let url = self.normalize_url(path);
568        debug!("DELETE {}", url);
569
570        // Same backwards header naming as GET
571        let response = self
572            .client
573            .delete(&url)
574            .header("x-api-key", &self.api_key)
575            .header("x-api-secret-key", &self.api_secret)
576            .send()
577            .await?;
578
579        trace!("Response status: {}", response.status());
580        if response.status().is_success() {
581            Ok(())
582        } else {
583            let status = response.status();
584            let text = response
585                .text()
586                .await
587                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
588            Err(Self::status_to_error(status, text))
589        }
590    }
591
592    /// Execute raw GET request returning JSON Value
593    #[instrument(skip(self), fields(method = "GET"))]
594    pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
595        self.get(path).await
596    }
597
598    /// Execute GET request returning raw bytes
599    ///
600    /// Useful for downloading binary content like cost reports or other files.
601    #[instrument(skip(self), fields(method = "GET"))]
602    pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
603        let url = self.normalize_url(path);
604        debug!("GET {} (bytes)", url);
605
606        let response = self
607            .client
608            .get(&url)
609            .header("x-api-key", &self.api_key)
610            .header("x-api-secret-key", &self.api_secret)
611            .send()
612            .await?;
613
614        trace!("Response status: {}", response.status());
615        let status = response.status();
616
617        if status.is_success() {
618            response
619                .bytes()
620                .await
621                .map(|b| b.to_vec())
622                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))
623        } else {
624            let text = response
625                .text()
626                .await
627                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
628            Err(Self::status_to_error(status, text))
629        }
630    }
631
632    /// Execute raw POST request with JSON body
633    #[instrument(skip(self, body), fields(method = "POST"))]
634    pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
635        self.post(path, &body).await
636    }
637
638    /// Execute raw PUT request with JSON body
639    #[instrument(skip(self, body), fields(method = "PUT"))]
640    pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
641        self.put(path, &body).await
642    }
643
644    /// Execute raw PATCH request with JSON body
645    #[instrument(skip(self, body), fields(method = "PATCH"))]
646    pub async fn patch_raw(
647        &self,
648        path: &str,
649        body: serde_json::Value,
650    ) -> Result<serde_json::Value> {
651        let url = self.normalize_url(path);
652        debug!("PATCH {}", url);
653        trace!("Request body: {:?}", body);
654
655        // Use backwards header names for compatibility
656        let response = self
657            .client
658            .patch(&url)
659            .header("x-api-key", &self.api_key)
660            .header("x-api-secret-key", &self.api_secret)
661            .json(&body)
662            .send()
663            .await?;
664
665        trace!("Response status: {}", response.status());
666        self.handle_response(response).await
667    }
668
669    /// Execute raw DELETE request returning any response body
670    #[instrument(skip(self), fields(method = "DELETE"))]
671    pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
672        let url = self.normalize_url(path);
673        debug!("DELETE {}", url);
674
675        // Use backwards header names for compatibility
676        let response = self
677            .client
678            .delete(&url)
679            .header("x-api-key", &self.api_key)
680            .header("x-api-secret-key", &self.api_secret)
681            .send()
682            .await?;
683
684        trace!("Response status: {}", response.status());
685        if response.status().is_success() {
686            if response.content_length() == Some(0) {
687                Ok(serde_json::json!({"status": "deleted"}))
688            } else {
689                response.json().await.map_err(Into::into)
690            }
691        } else {
692            let status = response.status();
693            let text = response
694                .text()
695                .await
696                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
697            Err(Self::status_to_error(status, text))
698        }
699    }
700
701    /// Execute DELETE request with JSON body (used by some endpoints like `PrivateLink` principals)
702    #[instrument(skip(self, body), fields(method = "DELETE"))]
703    pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
704        &self,
705        path: &str,
706        body: serde_json::Value,
707    ) -> Result<T> {
708        let url = self.normalize_url(path);
709        debug!("DELETE {} (with body)", url);
710        trace!("Request body: {:?}", body);
711
712        let response = self
713            .client
714            .delete(&url)
715            .header("x-api-key", &self.api_key)
716            .header("x-api-secret-key", &self.api_secret)
717            .json(&body)
718            .send()
719            .await?;
720
721        trace!("Response status: {}", response.status());
722        self.handle_response(response).await
723    }
724
725    /// Handle HTTP response and return both status code and body as JSON
726    ///
727    /// This is used internally by the Tower service implementation to preserve
728    /// the actual HTTP status code in responses.
729    #[cfg(feature = "tower-integration")]
730    async fn handle_response_with_status(
731        &self,
732        response: reqwest::Response,
733    ) -> Result<(u16, serde_json::Value)> {
734        let status = response.status();
735        let status_code = status.as_u16();
736
737        if status.is_success() {
738            let bytes = response
739                .bytes()
740                .await
741                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
742
743            let value: serde_json::Value = serde_json::from_slice(&bytes).map_err(|e| {
744                RestError::ConnectionError(format!("Failed to parse JSON response: {e}"))
745            })?;
746
747            Ok((status_code, value))
748        } else {
749            let text = response
750                .text()
751                .await
752                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
753            Err(Self::status_to_error(status, text))
754        }
755    }
756
757    /// Handle HTTP response
758    async fn handle_response<T: serde::de::DeserializeOwned>(
759        &self,
760        response: reqwest::Response,
761    ) -> Result<T> {
762        let status = response.status();
763
764        if status.is_success() {
765            // Get the response bytes for better error reporting
766            let bytes = response
767                .bytes()
768                .await
769                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
770
771            // Use serde_path_to_error for better deserialization error messages
772            let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
773            serde_path_to_error::deserialize(deserializer).map_err(|err| {
774                let path = err.path().to_string();
775                // Use ConnectionError to provide detailed error message with field path
776                RestError::ConnectionError(format!(
777                    "Failed to deserialize field '{}': {}",
778                    path,
779                    err.inner()
780                ))
781            })
782        } else {
783            let text = response
784                .text()
785                .await
786                .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
787            Err(Self::status_to_error(status, text))
788        }
789    }
790}
791
792/// Tower Service integration for `CloudClient`
793///
794/// This module provides Tower Service implementations for `CloudClient`, enabling
795/// middleware composition with patterns like circuit breakers, retry, and rate limiting.
796///
797/// # Feature Flag
798///
799/// This module is only available when the `tower-integration` feature is enabled.
800///
801/// # Examples
802///
803/// ```rust,ignore
804/// use redis_cloud::CloudClient;
805/// use redis_cloud::tower_support::ApiRequest;
806/// use tower::ServiceExt;
807///
808/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
809/// let client = CloudClient::builder()
810///     .api_key("your-key")
811///     .api_secret("your-secret")
812///     .build()?;
813///
814/// // Convert to a Tower service
815/// let mut service = client.into_service();
816///
817/// // Use the service
818/// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
819/// println!("Status: {}", response.status);
820/// # Ok(())
821/// # }
822/// ```
823#[cfg(feature = "tower-integration")]
824pub mod tower_support {
825    use super::{CloudClient, RestError, Result};
826    use std::future::Future;
827    use std::pin::Pin;
828    use std::task::{Context, Poll};
829    use tower::Service;
830
    /// HTTP method for API requests
    ///
    /// Stored in `ApiRequest::method`; the `Service` implementation matches on it
    /// to choose which request to issue.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Method {
        /// GET request
        Get,
        /// POST request
        Post,
        /// PUT request
        Put,
        /// PATCH request
        Patch,
        /// DELETE request
        Delete,
    }
845
    /// Tower-compatible request type for Redis Cloud API
    ///
    /// This wraps the essential components of an API request in a format
    /// suitable for Tower middleware composition. All fields are public, so a
    /// request can be built either with the constructor functions or directly.
    #[derive(Debug, Clone)]
    pub struct ApiRequest {
        /// HTTP method
        pub method: Method,
        /// API endpoint path (e.g., "/subscriptions"), joined onto the client's base URL
        pub path: String,
        /// Optional JSON body; the `Service` implementation returns `BadRequest`
        /// when it is missing for POST/PUT/PATCH requests
        pub body: Option<serde_json::Value>,
    }
859
860    impl ApiRequest {
861        /// Create a GET request
862        pub fn get(path: impl Into<String>) -> Self {
863            Self {
864                method: Method::Get,
865                path: path.into(),
866                body: None,
867            }
868        }
869
870        /// Create a POST request with a JSON body
871        pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
872            Self {
873                method: Method::Post,
874                path: path.into(),
875                body: Some(body),
876            }
877        }
878
879        /// Create a PUT request with a JSON body
880        pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
881            Self {
882                method: Method::Put,
883                path: path.into(),
884                body: Some(body),
885            }
886        }
887
888        /// Create a PATCH request with a JSON body
889        pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
890            Self {
891                method: Method::Patch,
892                path: path.into(),
893                body: Some(body),
894            }
895        }
896
897        /// Create a DELETE request
898        pub fn delete(path: impl Into<String>) -> Self {
899            Self {
900                method: Method::Delete,
901                path: path.into(),
902                body: None,
903            }
904        }
905    }
906
    /// Tower-compatible response type
    ///
    /// Contains the HTTP status code and response body as JSON. Produced by the
    /// `Service` implementation for responses with a success status; failures
    /// are reported through the service's error type instead.
    #[derive(Debug, Clone)]
    pub struct ApiResponse {
        /// HTTP status code
        pub status: u16,
        /// Response body as JSON
        pub body: serde_json::Value,
    }
917
    impl CloudClient {
        /// Convert this client into a Tower service
        ///
        /// `CloudClient` itself implements `tower::Service<ApiRequest>`, so this
        /// consumes the client and returns it unchanged. It exists to make the
        /// intent explicit at the call site when composing middleware.
        ///
        /// # Examples
        ///
        /// ```rust,ignore
        /// use redis_cloud::CloudClient;
        /// use tower::ServiceExt;
        /// use redis_cloud::tower_support::ApiRequest;
        ///
        /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
        /// let client = CloudClient::builder()
        ///     .api_key("key")
        ///     .api_secret("secret")
        ///     .build()?;
        ///
        /// let service = client.into_service();
        /// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
        /// # Ok(())
        /// # }
        /// ```
        #[must_use]
        pub fn into_service(self) -> Self {
            self
        }
    }
947
948    impl Service<ApiRequest> for CloudClient {
949        type Response = ApiResponse;
950        type Error = RestError;
951        type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
952
953        fn poll_ready(
954            &mut self,
955            _cx: &mut Context<'_>,
956        ) -> Poll<std::result::Result<(), Self::Error>> {
957            // CloudClient is always ready since it uses an internal connection pool
958            Poll::Ready(Ok(()))
959        }
960
961        fn call(&mut self, req: ApiRequest) -> Self::Future {
962            let client = self.clone();
963            Box::pin(async move {
964                let url = client.normalize_url(&req.path);
965
966                let request_builder = match req.method {
967                    Method::Get => client.client.get(&url),
968                    Method::Post => {
969                        let body = req.body.ok_or_else(|| RestError::BadRequest {
970                            message: "POST request requires a body".to_string(),
971                        })?;
972                        client.client.post(&url).json(&body)
973                    }
974                    Method::Put => {
975                        let body = req.body.ok_or_else(|| RestError::BadRequest {
976                            message: "PUT request requires a body".to_string(),
977                        })?;
978                        client.client.put(&url).json(&body)
979                    }
980                    Method::Patch => {
981                        let body = req.body.ok_or_else(|| RestError::BadRequest {
982                            message: "PATCH request requires a body".to_string(),
983                        })?;
984                        client.client.patch(&url).json(&body)
985                    }
986                    Method::Delete => client.client.delete(&url),
987                };
988
989                let response = request_builder
990                    .header("x-api-key", &client.api_key)
991                    .header("x-api-secret-key", &client.api_secret)
992                    .send()
993                    .await?;
994
995                let (status, body) = client.handle_response_with_status(response).await?;
996
997                Ok(ApiResponse { status, body })
998            })
999        }
1000    }
1001}