// Source file: redis_cloud/client.rs

//! Redis Cloud API client core implementation
//!
//! This module contains the core HTTP client for interacting with the Redis Cloud REST API.
//! It provides authentication handling, request/response processing, and error management.
//!
//! The client is designed around a builder pattern for flexible configuration and supports
//! both typed and untyped API interactions.

use crate::{CloudError as RestError, Result};
use reqwest::Client;
use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
use serde::Serialize;
use std::fmt;
use std::sync::Arc;
use tracing::{debug, instrument, trace};

16/// Default user agent for the Redis Cloud client
17const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
/// Builder for constructing a [`CloudClient`] with custom configuration
///
/// Provides a fluent interface for configuring API credentials, base URL, timeouts,
/// and other client settings before creating the final [`CloudClient`] instance.
///
/// The `Debug` implementation redacts the API key and secret so that logging a
/// builder never writes credentials to the output.
///
/// # Examples
///
/// ```rust,no_run
/// use redis_cloud::CloudClient;
///
/// // Basic configuration
/// let client = CloudClient::builder()
///     .api_key("your-api-key")
///     .api_secret("your-api-secret")
///     .build()?;
///
/// // Advanced configuration
/// let client = CloudClient::builder()
///     .api_key("your-api-key")
///     .api_secret("your-api-secret")
///     .base_url("https://api.redislabs.com/v1".to_string())
///     .timeout(std::time::Duration::from_secs(120))
///     .build()?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Clone)]
pub struct CloudClientBuilder {
    /// API key; `build` fails if this was never set.
    api_key: Option<String>,
    /// API secret; `build` fails if this was never set.
    api_secret: Option<String>,
    /// Base URL that request paths are appended to.
    base_url: String,
    /// Timeout handed to the underlying `reqwest` client.
    timeout: std::time::Duration,
    /// Value of the `User-Agent` header sent with every request.
    user_agent: String,
}

impl fmt::Debug for CloudClientBuilder {
    /// Formats the builder without exposing credentials: a configured key or
    /// secret is shown as `Some("[REDACTED]")`, an unset one as `None`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const REDACTED: &str = "[REDACTED]";
        f.debug_struct("CloudClientBuilder")
            .field("api_key", &self.api_key.as_ref().map(|_| REDACTED))
            .field("api_secret", &self.api_secret.as_ref().map(|_| REDACTED))
            .field("base_url", &self.base_url)
            .field("timeout", &self.timeout)
            .field("user_agent", &self.user_agent)
            .finish()
    }
}

53impl Default for CloudClientBuilder {
54    fn default() -> Self {
55        Self {
56            api_key: None,
57            api_secret: None,
58            base_url: "https://api.redislabs.com/v1".to_string(),
59            timeout: std::time::Duration::from_secs(30),
60            user_agent: DEFAULT_USER_AGENT.to_string(),
61        }
62    }
63}
64
65impl CloudClientBuilder {
66    /// Create a new builder
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Set the API key
72    pub fn api_key(mut self, key: impl Into<String>) -> Self {
73        self.api_key = Some(key.into());
74        self
75    }
76
77    /// Set the API secret
78    pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
79        self.api_secret = Some(secret.into());
80        self
81    }
82
83    /// Set the base URL
84    pub fn base_url(mut self, url: impl Into<String>) -> Self {
85        self.base_url = url.into();
86        self
87    }
88
89    /// Set the timeout
90    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
91        self.timeout = timeout;
92        self
93    }
94
95    /// Set the user agent string for HTTP requests
96    ///
97    /// The default user agent is `redis-cloud/{version}`.
98    /// This can be overridden to identify specific clients, for example:
99    /// `redisctl/1.2.3` or `my-app/1.0.0`.
100    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
101        self.user_agent = user_agent.into();
102        self
103    }
104
105    /// Build the client
106    pub fn build(self) -> Result<CloudClient> {
107        let api_key = self
108            .api_key
109            .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
110        let api_secret = self
111            .api_secret
112            .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
113
114        let mut default_headers = HeaderMap::new();
115        default_headers.insert(
116            USER_AGENT,
117            HeaderValue::from_str(&self.user_agent)
118                .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {}", e)))?,
119        );
120
121        let client = Client::builder()
122            .timeout(self.timeout)
123            .default_headers(default_headers)
124            .build()
125            .map_err(|e| RestError::ConnectionError(e.to_string()))?;
126
127        Ok(CloudClient {
128            api_key,
129            api_secret,
130            base_url: self.base_url,
131            timeout: self.timeout,
132            client: Arc::new(client),
133        })
134    }
135}
136
137/// Redis Cloud API client
138#[derive(Clone)]
139pub struct CloudClient {
140    pub(crate) api_key: String,
141    pub(crate) api_secret: String,
142    pub(crate) base_url: String,
143    #[allow(dead_code)]
144    pub(crate) timeout: std::time::Duration,
145    pub(crate) client: Arc<Client>,
146}
147
148impl CloudClient {
149    /// Create a new builder for the client
150    pub fn builder() -> CloudClientBuilder {
151        CloudClientBuilder::new()
152    }
153
154    /// Normalize URL path concatenation to avoid double slashes
155    fn normalize_url(&self, path: &str) -> String {
156        let base = self.base_url.trim_end_matches('/');
157        let path = path.trim_start_matches('/');
158        format!("{}/{}", base, path)
159    }
160
161    /// Make a GET request with API key authentication
162    #[instrument(skip(self), fields(method = "GET"))]
163    pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
164        let url = self.normalize_url(path);
165        debug!("GET {}", url);
166
167        // Redis Cloud API uses these headers for authentication
168        let response = self
169            .client
170            .get(&url)
171            .header("x-api-key", &self.api_key)
172            .header("x-api-secret-key", &self.api_secret)
173            .send()
174            .await?;
175
176        trace!("Response status: {}", response.status());
177        self.handle_response(response).await
178    }
179
180    /// Make a POST request
181    #[instrument(skip(self, body), fields(method = "POST"))]
182    pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
183        &self,
184        path: &str,
185        body: &B,
186    ) -> Result<T> {
187        let url = self.normalize_url(path);
188        debug!("POST {}", url);
189        trace!("Request body: {:?}", serde_json::to_value(body).ok());
190
191        // Same backwards header naming as GET
192        let response = self
193            .client
194            .post(&url)
195            .header("x-api-key", &self.api_key)
196            .header("x-api-secret-key", &self.api_secret)
197            .json(body)
198            .send()
199            .await?;
200
201        trace!("Response status: {}", response.status());
202        self.handle_response(response).await
203    }
204
205    /// Make a PUT request
206    #[instrument(skip(self, body), fields(method = "PUT"))]
207    pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
208        &self,
209        path: &str,
210        body: &B,
211    ) -> Result<T> {
212        let url = self.normalize_url(path);
213        debug!("PUT {}", url);
214        trace!("Request body: {:?}", serde_json::to_value(body).ok());
215
216        // Same backwards header naming as GET
217        let response = self
218            .client
219            .put(&url)
220            .header("x-api-key", &self.api_key)
221            .header("x-api-secret-key", &self.api_secret)
222            .json(body)
223            .send()
224            .await?;
225
226        trace!("Response status: {}", response.status());
227        self.handle_response(response).await
228    }
229
230    /// Make a DELETE request
231    #[instrument(skip(self), fields(method = "DELETE"))]
232    pub async fn delete(&self, path: &str) -> Result<()> {
233        let url = self.normalize_url(path);
234        debug!("DELETE {}", url);
235
236        // Same backwards header naming as GET
237        let response = self
238            .client
239            .delete(&url)
240            .header("x-api-key", &self.api_key)
241            .header("x-api-secret-key", &self.api_secret)
242            .send()
243            .await?;
244
245        trace!("Response status: {}", response.status());
246        if response.status().is_success() {
247            Ok(())
248        } else {
249            let status = response.status();
250            let text = response.text().await.unwrap_or_default();
251
252            match status.as_u16() {
253                400 => Err(RestError::BadRequest { message: text }),
254                401 => Err(RestError::AuthenticationFailed { message: text }),
255                403 => Err(RestError::Forbidden { message: text }),
256                404 => Err(RestError::NotFound { message: text }),
257                412 => Err(RestError::PreconditionFailed),
258                429 => Err(RestError::RateLimited { message: text }),
259                500 => Err(RestError::InternalServerError { message: text }),
260                503 => Err(RestError::ServiceUnavailable { message: text }),
261                _ => Err(RestError::ApiError {
262                    code: status.as_u16(),
263                    message: text,
264                }),
265            }
266        }
267    }
268
269    /// Execute raw GET request returning JSON Value
270    #[instrument(skip(self), fields(method = "GET"))]
271    pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
272        self.get(path).await
273    }
274
275    /// Execute GET request returning raw bytes
276    ///
277    /// Useful for downloading binary content like cost reports or other files.
278    #[instrument(skip(self), fields(method = "GET"))]
279    pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
280        let url = self.normalize_url(path);
281        debug!("GET {} (bytes)", url);
282
283        let response = self
284            .client
285            .get(&url)
286            .header("x-api-key", &self.api_key)
287            .header("x-api-secret-key", &self.api_secret)
288            .send()
289            .await?;
290
291        trace!("Response status: {}", response.status());
292        let status = response.status();
293
294        if status.is_success() {
295            response
296                .bytes()
297                .await
298                .map(|b| b.to_vec())
299                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {}", e)))
300        } else {
301            let text = response.text().await.unwrap_or_default();
302
303            match status.as_u16() {
304                400 => Err(RestError::BadRequest { message: text }),
305                401 => Err(RestError::AuthenticationFailed { message: text }),
306                403 => Err(RestError::Forbidden { message: text }),
307                404 => Err(RestError::NotFound { message: text }),
308                412 => Err(RestError::PreconditionFailed),
309                429 => Err(RestError::RateLimited { message: text }),
310                500 => Err(RestError::InternalServerError { message: text }),
311                503 => Err(RestError::ServiceUnavailable { message: text }),
312                _ => Err(RestError::ApiError {
313                    code: status.as_u16(),
314                    message: text,
315                }),
316            }
317        }
318    }
319
320    /// Execute raw POST request with JSON body
321    #[instrument(skip(self, body), fields(method = "POST"))]
322    pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
323        self.post(path, &body).await
324    }
325
326    /// Execute raw PUT request with JSON body
327    #[instrument(skip(self, body), fields(method = "PUT"))]
328    pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
329        self.put(path, &body).await
330    }
331
332    /// Execute raw PATCH request with JSON body
333    #[instrument(skip(self, body), fields(method = "PATCH"))]
334    pub async fn patch_raw(
335        &self,
336        path: &str,
337        body: serde_json::Value,
338    ) -> Result<serde_json::Value> {
339        let url = self.normalize_url(path);
340        debug!("PATCH {}", url);
341        trace!("Request body: {:?}", body);
342
343        // Use backwards header names for compatibility
344        let response = self
345            .client
346            .patch(&url)
347            .header("x-api-key", &self.api_key)
348            .header("x-api-secret-key", &self.api_secret)
349            .json(&body)
350            .send()
351            .await?;
352
353        trace!("Response status: {}", response.status());
354        self.handle_response(response).await
355    }
356
357    /// Execute raw DELETE request returning any response body
358    #[instrument(skip(self), fields(method = "DELETE"))]
359    pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
360        let url = self.normalize_url(path);
361        debug!("DELETE {}", url);
362
363        // Use backwards header names for compatibility
364        let response = self
365            .client
366            .delete(&url)
367            .header("x-api-key", &self.api_key)
368            .header("x-api-secret-key", &self.api_secret)
369            .send()
370            .await?;
371
372        trace!("Response status: {}", response.status());
373        if response.status().is_success() {
374            if response.content_length() == Some(0) {
375                Ok(serde_json::json!({"status": "deleted"}))
376            } else {
377                response.json().await.map_err(Into::into)
378            }
379        } else {
380            let status = response.status();
381            let text = response.text().await.unwrap_or_default();
382
383            match status.as_u16() {
384                400 => Err(RestError::BadRequest { message: text }),
385                401 => Err(RestError::AuthenticationFailed { message: text }),
386                403 => Err(RestError::Forbidden { message: text }),
387                404 => Err(RestError::NotFound { message: text }),
388                412 => Err(RestError::PreconditionFailed),
389                429 => Err(RestError::RateLimited { message: text }),
390                500 => Err(RestError::InternalServerError { message: text }),
391                503 => Err(RestError::ServiceUnavailable { message: text }),
392                _ => Err(RestError::ApiError {
393                    code: status.as_u16(),
394                    message: text,
395                }),
396            }
397        }
398    }
399
400    /// Execute DELETE request with JSON body (used by some endpoints like PrivateLink principals)
401    #[instrument(skip(self, body), fields(method = "DELETE"))]
402    pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
403        &self,
404        path: &str,
405        body: serde_json::Value,
406    ) -> Result<T> {
407        let url = self.normalize_url(path);
408        debug!("DELETE {} (with body)", url);
409        trace!("Request body: {:?}", body);
410
411        let response = self
412            .client
413            .delete(&url)
414            .header("x-api-key", &self.api_key)
415            .header("x-api-secret-key", &self.api_secret)
416            .json(&body)
417            .send()
418            .await?;
419
420        trace!("Response status: {}", response.status());
421        self.handle_response(response).await
422    }
423
424    /// Handle HTTP response
425    async fn handle_response<T: serde::de::DeserializeOwned>(
426        &self,
427        response: reqwest::Response,
428    ) -> Result<T> {
429        let status = response.status();
430
431        if status.is_success() {
432            // Get the response bytes for better error reporting
433            let bytes = response.bytes().await.map_err(|e| {
434                RestError::ConnectionError(format!("Failed to read response: {}", e))
435            })?;
436
437            // Use serde_path_to_error for better deserialization error messages
438            let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
439            serde_path_to_error::deserialize(deserializer).map_err(|err| {
440                let path = err.path().to_string();
441                // Use ConnectionError to provide detailed error message with field path
442                RestError::ConnectionError(format!(
443                    "Failed to deserialize field '{}': {}",
444                    path,
445                    err.inner()
446                ))
447            })
448        } else {
449            let text = response.text().await.unwrap_or_default();
450
451            match status.as_u16() {
452                400 => Err(RestError::BadRequest { message: text }),
453                401 => Err(RestError::AuthenticationFailed { message: text }),
454                403 => Err(RestError::Forbidden { message: text }),
455                404 => Err(RestError::NotFound { message: text }),
456                412 => Err(RestError::PreconditionFailed),
457                429 => Err(RestError::RateLimited { message: text }),
458                500 => Err(RestError::InternalServerError { message: text }),
459                503 => Err(RestError::ServiceUnavailable { message: text }),
460                _ => Err(RestError::ApiError {
461                    code: status.as_u16(),
462                    message: text,
463                }),
464            }
465        }
466    }
467}
468
/// Tower Service integration for CloudClient
///
/// This module provides Tower Service implementations for CloudClient, enabling
/// middleware composition with patterns like circuit breakers, retry, and rate limiting.
///
/// # Feature Flag
///
/// This module is only available when the `tower-integration` feature is enabled.
///
/// # Examples
///
/// ```rust,ignore
/// use redis_cloud::CloudClient;
/// use redis_cloud::tower_support::ApiRequest;
/// use tower::ServiceExt;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let client = CloudClient::builder()
///     .api_key("your-key")
///     .api_secret("your-secret")
///     .build()?;
///
/// // Convert to a Tower service
/// let mut service = client.into_service();
///
/// // Use the service
/// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
/// println!("Status: {}", response.status);
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "tower-integration")]
pub mod tower_support {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tower::Service;

    /// HTTP method for API requests
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Method {
        /// GET request
        Get,
        /// POST request
        Post,
        /// PUT request
        Put,
        /// PATCH request
        Patch,
        /// DELETE request
        Delete,
    }

    /// Tower-compatible request type for Redis Cloud API
    ///
    /// This wraps the essential components of an API request in a format
    /// suitable for Tower middleware composition.
    #[derive(Debug, Clone)]
    pub struct ApiRequest {
        /// HTTP method
        pub method: Method,
        /// API endpoint path (e.g., "/subscriptions")
        pub path: String,
        /// Optional JSON body.
        ///
        /// Required for POST/PUT/PATCH, ignored for GET, and sent with DELETE
        /// when present.
        pub body: Option<serde_json::Value>,
    }

    impl ApiRequest {
        /// Create a GET request
        pub fn get(path: impl Into<String>) -> Self {
            Self {
                method: Method::Get,
                path: path.into(),
                body: None,
            }
        }

        /// Create a POST request with a JSON body
        pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
            Self {
                method: Method::Post,
                path: path.into(),
                body: Some(body),
            }
        }

        /// Create a PUT request with a JSON body
        pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
            Self {
                method: Method::Put,
                path: path.into(),
                body: Some(body),
            }
        }

        /// Create a PATCH request with a JSON body
        pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
            Self {
                method: Method::Patch,
                path: path.into(),
                body: Some(body),
            }
        }

        /// Create a DELETE request
        pub fn delete(path: impl Into<String>) -> Self {
            Self {
                method: Method::Delete,
                path: path.into(),
                body: None,
            }
        }
    }

    /// Tower-compatible response type
    ///
    /// Contains the HTTP status code and response body as JSON.
    #[derive(Debug, Clone)]
    pub struct ApiResponse {
        /// HTTP status code.
        ///
        /// The service implementation in this module always reports `200` for a
        /// successful call, because the client methods it delegates to return
        /// only the parsed body; failures are surfaced as errors instead.
        pub status: u16,
        /// Response body as JSON
        pub body: serde_json::Value,
    }

    impl CloudClient {
        /// Convert this client into a Tower service
        ///
        /// This consumes the client and returns it unchanged: `CloudClient`
        /// itself implements `Service<ApiRequest>`, enabling middleware composition.
        ///
        /// # Examples
        ///
        /// ```rust,ignore
        /// use redis_cloud::CloudClient;
        /// use tower::ServiceExt;
        /// use redis_cloud::tower_support::ApiRequest;
        ///
        /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
        /// let client = CloudClient::builder()
        ///     .api_key("key")
        ///     .api_secret("secret")
        ///     .build()?;
        ///
        /// let mut service = client.into_service();
        /// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
        /// # Ok(())
        /// # }
        /// ```
        pub fn into_service(self) -> Self {
            self
        }
    }

    /// Unwrap a request body, or fail with `BadRequest` naming the HTTP verb
    /// that requires one.
    fn require_body(body: Option<serde_json::Value>, verb: &str) -> Result<serde_json::Value> {
        body.ok_or_else(|| RestError::BadRequest {
            message: format!("{} request requires a body", verb),
        })
    }

    impl Service<ApiRequest> for CloudClient {
        type Response = ApiResponse;
        type Error = RestError;
        type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;

        fn poll_ready(
            &mut self,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), Self::Error>> {
            // CloudClient is always ready since it uses an internal connection pool
            Poll::Ready(Ok(()))
        }

        /// Dispatch the request to the matching raw client method.
        ///
        /// POST, PUT and PATCH fail with `BadRequest` when no body is supplied.
        /// DELETE sends the body when one is present and is bodiless otherwise.
        fn call(&mut self, req: ApiRequest) -> Self::Future {
            // The future must own its client; clone so it does not borrow `self`.
            let client = self.clone();
            Box::pin(async move {
                let ApiRequest { method, path, body } = req;

                let response: serde_json::Value = match method {
                    Method::Get => client.get_raw(&path).await?,
                    Method::Post => client.post_raw(&path, require_body(body, "POST")?).await?,
                    Method::Put => client.put_raw(&path, require_body(body, "PUT")?).await?,
                    Method::Patch => {
                        client
                            .patch_raw(&path, require_body(body, "PATCH")?)
                            .await?
                    }
                    Method::Delete => match body {
                        // Previously a body on DELETE was silently dropped.
                        Some(body) => client.delete_with_body(&path, body).await?,
                        None => client.delete_raw(&path).await?,
                    },
                };

                Ok(ApiResponse {
                    status: 200,
                    body: response,
                })
            })
        }
    }
}