redis_cloud/
client.rs

1//! Redis Cloud API client core implementation
2//!
3//! This module contains the core HTTP client for interacting with the Redis Cloud REST API.
4//! It provides authentication handling, request/response processing, and error management.
5//!
6//! The client is designed around a builder pattern for flexible configuration and supports
7//! both typed and untyped API interactions.
8
9use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
16/// Default user agent for the Redis Cloud client
17const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
/// Builder for constructing a CloudClient with custom configuration
///
/// Provides a fluent interface for configuring API credentials, base URL, timeouts,
/// and other client settings before creating the final CloudClient instance.
///
/// The `Debug` output of a builder never contains the API key or secret: both are
/// printed as `Some("<redacted>")` when set and `None` when missing, so builders can
/// be logged safely while still showing whether credentials were supplied.
///
/// # Examples
///
/// ```rust,no_run
/// use redis_cloud::CloudClient;
///
/// // Basic configuration
/// let client = CloudClient::builder()
///     .api_key("your-api-key")
///     .api_secret("your-api-secret")
///     .build()?;
///
/// // Advanced configuration
/// let client = CloudClient::builder()
///     .api_key("your-api-key")
///     .api_secret("your-api-secret")
///     .base_url("https://api.redislabs.com/v1")
///     .timeout(std::time::Duration::from_secs(120))
///     .build()?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Clone)]
pub struct CloudClientBuilder {
    /// API key; must be set before `build` succeeds.
    api_key: Option<String>,
    /// API secret; must be set before `build` succeeds.
    api_secret: Option<String>,
    /// Root URL that request paths are appended to.
    base_url: String,
    /// Timeout handed to the underlying HTTP client.
    timeout: std::time::Duration,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
}

impl std::fmt::Debug for CloudClientBuilder {
    /// Formats the builder with both credentials masked.
    ///
    /// Hand-written instead of derived so that `{:?}` (for example in logs or
    /// tracing fields) cannot leak the API key or secret.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";
        f.debug_struct("CloudClientBuilder")
            // Keep the Some/None distinction so a missing credential stays diagnosable.
            .field("api_key", &self.api_key.as_ref().map(|_| REDACTED))
            .field("api_secret", &self.api_secret.as_ref().map(|_| REDACTED))
            .field("base_url", &self.base_url)
            .field("timeout", &self.timeout)
            .field("user_agent", &self.user_agent)
            .finish()
    }
}
52
53impl Default for CloudClientBuilder {
54    fn default() -> Self {
55        Self {
56            api_key: None,
57            api_secret: None,
58            base_url: "https://api.redislabs.com/v1".to_string(),
59            timeout: std::time::Duration::from_secs(30),
60            user_agent: DEFAULT_USER_AGENT.to_string(),
61        }
62    }
63}
64
65impl CloudClientBuilder {
66    /// Create a new builder
67    pub fn new() -> Self {
68        Self::default()
69    }
70
71    /// Set the API key
72    pub fn api_key(mut self, key: impl Into<String>) -> Self {
73        self.api_key = Some(key.into());
74        self
75    }
76
77    /// Set the API secret
78    pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
79        self.api_secret = Some(secret.into());
80        self
81    }
82
83    /// Set the base URL
84    pub fn base_url(mut self, url: impl Into<String>) -> Self {
85        self.base_url = url.into();
86        self
87    }
88
89    /// Set the timeout
90    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
91        self.timeout = timeout;
92        self
93    }
94
95    /// Set the user agent string for HTTP requests
96    ///
97    /// The default user agent is `redis-cloud/{version}`.
98    /// This can be overridden to identify specific clients, for example:
99    /// `redisctl/1.2.3` or `my-app/1.0.0`.
100    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
101        self.user_agent = user_agent.into();
102        self
103    }
104
105    /// Build the client
106    pub fn build(self) -> Result<CloudClient> {
107        let api_key = self
108            .api_key
109            .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
110        let api_secret = self
111            .api_secret
112            .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
113
114        let mut default_headers = HeaderMap::new();
115        default_headers.insert(
116            USER_AGENT,
117            HeaderValue::from_str(&self.user_agent)
118                .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {}", e)))?,
119        );
120
121        let client = Client::builder()
122            .timeout(self.timeout)
123            .default_headers(default_headers)
124            .build()
125            .map_err(|e| RestError::ConnectionError(e.to_string()))?;
126
127        Ok(CloudClient {
128            api_key,
129            api_secret,
130            base_url: self.base_url,
131            timeout: self.timeout,
132            client: Arc::new(client),
133        })
134    }
135}
136
137/// Redis Cloud API client
138#[derive(Clone)]
139pub struct CloudClient {
140    pub(crate) api_key: String,
141    pub(crate) api_secret: String,
142    pub(crate) base_url: String,
143    #[allow(dead_code)]
144    pub(crate) timeout: std::time::Duration,
145    pub(crate) client: Arc<Client>,
146}
147
148impl CloudClient {
149    /// Create a new builder for the client
150    pub fn builder() -> CloudClientBuilder {
151        CloudClientBuilder::new()
152    }
153
154    /// Normalize URL path concatenation to avoid double slashes
155    fn normalize_url(&self, path: &str) -> String {
156        let base = self.base_url.trim_end_matches('/');
157        let path = path.trim_start_matches('/');
158        format!("{}/{}", base, path)
159    }
160
161    /// Make a GET request with API key authentication
162    #[instrument(skip(self), fields(method = "GET"))]
163    pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
164        let url = self.normalize_url(path);
165        debug!("GET {}", url);
166
167        // Redis Cloud API uses these headers for authentication
168        let response = self
169            .client
170            .get(&url)
171            .header("x-api-key", &self.api_key)
172            .header("x-api-secret-key", &self.api_secret)
173            .send()
174            .await?;
175
176        trace!("Response status: {}", response.status());
177        self.handle_response(response).await
178    }
179
180    /// Make a POST request
181    #[instrument(skip(self, body), fields(method = "POST"))]
182    pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
183        &self,
184        path: &str,
185        body: &B,
186    ) -> Result<T> {
187        let url = self.normalize_url(path);
188        debug!("POST {}", url);
189        trace!("Request body: {:?}", serde_json::to_value(body).ok());
190
191        // Same backwards header naming as GET
192        let response = self
193            .client
194            .post(&url)
195            .header("x-api-key", &self.api_key)
196            .header("x-api-secret-key", &self.api_secret)
197            .json(body)
198            .send()
199            .await?;
200
201        trace!("Response status: {}", response.status());
202        self.handle_response(response).await
203    }
204
205    /// Make a PUT request
206    #[instrument(skip(self, body), fields(method = "PUT"))]
207    pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
208        &self,
209        path: &str,
210        body: &B,
211    ) -> Result<T> {
212        let url = self.normalize_url(path);
213        debug!("PUT {}", url);
214        trace!("Request body: {:?}", serde_json::to_value(body).ok());
215
216        // Same backwards header naming as GET
217        let response = self
218            .client
219            .put(&url)
220            .header("x-api-key", &self.api_key)
221            .header("x-api-secret-key", &self.api_secret)
222            .json(body)
223            .send()
224            .await?;
225
226        trace!("Response status: {}", response.status());
227        self.handle_response(response).await
228    }
229
230    /// Make a DELETE request
231    #[instrument(skip(self), fields(method = "DELETE"))]
232    pub async fn delete(&self, path: &str) -> Result<()> {
233        let url = self.normalize_url(path);
234        debug!("DELETE {}", url);
235
236        // Same backwards header naming as GET
237        let response = self
238            .client
239            .delete(&url)
240            .header("x-api-key", &self.api_key)
241            .header("x-api-secret-key", &self.api_secret)
242            .send()
243            .await?;
244
245        trace!("Response status: {}", response.status());
246        if response.status().is_success() {
247            Ok(())
248        } else {
249            let status = response.status();
250            let text = response.text().await.unwrap_or_default();
251
252            match status.as_u16() {
253                400 => Err(RestError::BadRequest { message: text }),
254                401 => Err(RestError::AuthenticationFailed { message: text }),
255                403 => Err(RestError::Forbidden { message: text }),
256                404 => Err(RestError::NotFound { message: text }),
257                412 => Err(RestError::PreconditionFailed),
258                500 => Err(RestError::InternalServerError { message: text }),
259                503 => Err(RestError::ServiceUnavailable { message: text }),
260                _ => Err(RestError::ApiError {
261                    code: status.as_u16(),
262                    message: text,
263                }),
264            }
265        }
266    }
267
268    /// Execute raw GET request returning JSON Value
269    #[instrument(skip(self), fields(method = "GET"))]
270    pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
271        self.get(path).await
272    }
273
274    /// Execute GET request returning raw bytes
275    ///
276    /// Useful for downloading binary content like cost reports or other files.
277    #[instrument(skip(self), fields(method = "GET"))]
278    pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
279        let url = self.normalize_url(path);
280        debug!("GET {} (bytes)", url);
281
282        let response = self
283            .client
284            .get(&url)
285            .header("x-api-key", &self.api_key)
286            .header("x-api-secret-key", &self.api_secret)
287            .send()
288            .await?;
289
290        trace!("Response status: {}", response.status());
291        let status = response.status();
292
293        if status.is_success() {
294            response
295                .bytes()
296                .await
297                .map(|b| b.to_vec())
298                .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {}", e)))
299        } else {
300            let text = response.text().await.unwrap_or_default();
301
302            match status.as_u16() {
303                400 => Err(RestError::BadRequest { message: text }),
304                401 => Err(RestError::AuthenticationFailed { message: text }),
305                403 => Err(RestError::Forbidden { message: text }),
306                404 => Err(RestError::NotFound { message: text }),
307                412 => Err(RestError::PreconditionFailed),
308                500 => Err(RestError::InternalServerError { message: text }),
309                503 => Err(RestError::ServiceUnavailable { message: text }),
310                _ => Err(RestError::ApiError {
311                    code: status.as_u16(),
312                    message: text,
313                }),
314            }
315        }
316    }
317
318    /// Execute raw POST request with JSON body
319    #[instrument(skip(self, body), fields(method = "POST"))]
320    pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
321        self.post(path, &body).await
322    }
323
324    /// Execute raw PUT request with JSON body
325    #[instrument(skip(self, body), fields(method = "PUT"))]
326    pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
327        self.put(path, &body).await
328    }
329
330    /// Execute raw PATCH request with JSON body
331    #[instrument(skip(self, body), fields(method = "PATCH"))]
332    pub async fn patch_raw(
333        &self,
334        path: &str,
335        body: serde_json::Value,
336    ) -> Result<serde_json::Value> {
337        let url = self.normalize_url(path);
338        debug!("PATCH {}", url);
339        trace!("Request body: {:?}", body);
340
341        // Use backwards header names for compatibility
342        let response = self
343            .client
344            .patch(&url)
345            .header("x-api-key", &self.api_key)
346            .header("x-api-secret-key", &self.api_secret)
347            .json(&body)
348            .send()
349            .await?;
350
351        trace!("Response status: {}", response.status());
352        self.handle_response(response).await
353    }
354
355    /// Execute raw DELETE request returning any response body
356    #[instrument(skip(self), fields(method = "DELETE"))]
357    pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
358        let url = self.normalize_url(path);
359        debug!("DELETE {}", url);
360
361        // Use backwards header names for compatibility
362        let response = self
363            .client
364            .delete(&url)
365            .header("x-api-key", &self.api_key)
366            .header("x-api-secret-key", &self.api_secret)
367            .send()
368            .await?;
369
370        trace!("Response status: {}", response.status());
371        if response.status().is_success() {
372            if response.content_length() == Some(0) {
373                Ok(serde_json::json!({"status": "deleted"}))
374            } else {
375                response.json().await.map_err(Into::into)
376            }
377        } else {
378            let status = response.status();
379            let text = response.text().await.unwrap_or_default();
380
381            match status.as_u16() {
382                400 => Err(RestError::BadRequest { message: text }),
383                401 => Err(RestError::AuthenticationFailed { message: text }),
384                403 => Err(RestError::Forbidden { message: text }),
385                404 => Err(RestError::NotFound { message: text }),
386                412 => Err(RestError::PreconditionFailed),
387                500 => Err(RestError::InternalServerError { message: text }),
388                503 => Err(RestError::ServiceUnavailable { message: text }),
389                _ => Err(RestError::ApiError {
390                    code: status.as_u16(),
391                    message: text,
392                }),
393            }
394        }
395    }
396
397    /// Execute DELETE request with JSON body (used by some endpoints like PrivateLink principals)
398    #[instrument(skip(self, body), fields(method = "DELETE"))]
399    pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
400        &self,
401        path: &str,
402        body: serde_json::Value,
403    ) -> Result<T> {
404        let url = self.normalize_url(path);
405        debug!("DELETE {} (with body)", url);
406        trace!("Request body: {:?}", body);
407
408        let response = self
409            .client
410            .delete(&url)
411            .header("x-api-key", &self.api_key)
412            .header("x-api-secret-key", &self.api_secret)
413            .json(&body)
414            .send()
415            .await?;
416
417        trace!("Response status: {}", response.status());
418        self.handle_response(response).await
419    }
420
421    /// Handle HTTP response
422    async fn handle_response<T: serde::de::DeserializeOwned>(
423        &self,
424        response: reqwest::Response,
425    ) -> Result<T> {
426        let status = response.status();
427
428        if status.is_success() {
429            // Get the response bytes for better error reporting
430            let bytes = response.bytes().await.map_err(|e| {
431                RestError::ConnectionError(format!("Failed to read response: {}", e))
432            })?;
433
434            // Use serde_path_to_error for better deserialization error messages
435            let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
436            serde_path_to_error::deserialize(deserializer).map_err(|err| {
437                let path = err.path().to_string();
438                // Use ConnectionError to provide detailed error message with field path
439                RestError::ConnectionError(format!(
440                    "Failed to deserialize field '{}': {}",
441                    path,
442                    err.inner()
443                ))
444            })
445        } else {
446            let text = response.text().await.unwrap_or_default();
447
448            match status.as_u16() {
449                400 => Err(RestError::BadRequest { message: text }),
450                401 => Err(RestError::AuthenticationFailed { message: text }),
451                403 => Err(RestError::Forbidden { message: text }),
452                404 => Err(RestError::NotFound { message: text }),
453                412 => Err(RestError::PreconditionFailed),
454                500 => Err(RestError::InternalServerError { message: text }),
455                503 => Err(RestError::ServiceUnavailable { message: text }),
456                _ => Err(RestError::ApiError {
457                    code: status.as_u16(),
458                    message: text,
459                }),
460            }
461        }
462    }
463}
464
465/// Tower Service integration for CloudClient
466///
467/// This module provides Tower Service implementations for CloudClient, enabling
468/// middleware composition with patterns like circuit breakers, retry, and rate limiting.
469///
470/// # Feature Flag
471///
472/// This module is only available when the `tower-integration` feature is enabled.
473///
474/// # Examples
475///
476/// ```rust,ignore
477/// use redis_cloud::CloudClient;
478/// use redis_cloud::tower_support::ApiRequest;
479/// use tower::ServiceExt;
480///
481/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
482/// let client = CloudClient::builder()
483///     .api_key("your-key")
484///     .api_secret("your-secret")
485///     .build()?;
486///
487/// // Convert to a Tower service
488/// let mut service = client.into_service();
489///
490/// // Use the service
491/// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
492/// println!("Status: {}", response.status);
493/// # Ok(())
494/// # }
495/// ```
496#[cfg(feature = "tower-integration")]
497pub mod tower_support {
498    use super::*;
499    use std::future::Future;
500    use std::pin::Pin;
501    use std::task::{Context, Poll};
502    use tower::Service;
503
504    /// HTTP method for API requests
505    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
506    pub enum Method {
507        /// GET request
508        Get,
509        /// POST request
510        Post,
511        /// PUT request
512        Put,
513        /// PATCH request
514        Patch,
515        /// DELETE request
516        Delete,
517    }
518
519    /// Tower-compatible request type for Redis Cloud API
520    ///
521    /// This wraps the essential components of an API request in a format
522    /// suitable for Tower middleware composition.
523    #[derive(Debug, Clone)]
524    pub struct ApiRequest {
525        /// HTTP method
526        pub method: Method,
527        /// API endpoint path (e.g., "/subscriptions")
528        pub path: String,
529        /// Optional JSON body for POST/PUT/PATCH requests
530        pub body: Option<serde_json::Value>,
531    }
532
533    impl ApiRequest {
534        /// Create a GET request
535        pub fn get(path: impl Into<String>) -> Self {
536            Self {
537                method: Method::Get,
538                path: path.into(),
539                body: None,
540            }
541        }
542
543        /// Create a POST request with a JSON body
544        pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
545            Self {
546                method: Method::Post,
547                path: path.into(),
548                body: Some(body),
549            }
550        }
551
552        /// Create a PUT request with a JSON body
553        pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
554            Self {
555                method: Method::Put,
556                path: path.into(),
557                body: Some(body),
558            }
559        }
560
561        /// Create a PATCH request with a JSON body
562        pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
563            Self {
564                method: Method::Patch,
565                path: path.into(),
566                body: Some(body),
567            }
568        }
569
570        /// Create a DELETE request
571        pub fn delete(path: impl Into<String>) -> Self {
572            Self {
573                method: Method::Delete,
574                path: path.into(),
575                body: None,
576            }
577        }
578    }
579
580    /// Tower-compatible response type
581    ///
582    /// Contains the HTTP status code and response body as JSON.
583    #[derive(Debug, Clone)]
584    pub struct ApiResponse {
585        /// HTTP status code
586        pub status: u16,
587        /// Response body as JSON
588        pub body: serde_json::Value,
589    }
590
591    impl CloudClient {
592        /// Convert this client into a Tower service
593        ///
594        /// This consumes the client and returns it wrapped in a Tower service
595        /// implementation, enabling middleware composition.
596        ///
597        /// # Examples
598        ///
599        /// ```rust,ignore
600        /// use redis_cloud::CloudClient;
601        /// use tower::ServiceExt;
602        /// use redis_cloud::tower_support::ApiRequest;
603        ///
604        /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
605        /// let client = CloudClient::builder()
606        ///     .api_key("key")
607        ///     .api_secret("secret")
608        ///     .build()?;
609        ///
610        /// let mut service = client.into_service();
611        /// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
612        /// # Ok(())
613        /// # }
614        /// ```
615        pub fn into_service(self) -> Self {
616            self
617        }
618    }
619
620    impl Service<ApiRequest> for CloudClient {
621        type Response = ApiResponse;
622        type Error = RestError;
623        type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
624
625        fn poll_ready(
626            &mut self,
627            _cx: &mut Context<'_>,
628        ) -> Poll<std::result::Result<(), Self::Error>> {
629            // CloudClient is always ready since it uses an internal connection pool
630            Poll::Ready(Ok(()))
631        }
632
633        fn call(&mut self, req: ApiRequest) -> Self::Future {
634            let client = self.clone();
635            Box::pin(async move {
636                let response: serde_json::Value = match req.method {
637                    Method::Get => client.get_raw(&req.path).await?,
638                    Method::Post => {
639                        let body = req.body.ok_or_else(|| RestError::BadRequest {
640                            message: "POST request requires a body".to_string(),
641                        })?;
642                        client.post_raw(&req.path, body).await?
643                    }
644                    Method::Put => {
645                        let body = req.body.ok_or_else(|| RestError::BadRequest {
646                            message: "PUT request requires a body".to_string(),
647                        })?;
648                        client.put_raw(&req.path, body).await?
649                    }
650                    Method::Patch => {
651                        let body = req.body.ok_or_else(|| RestError::BadRequest {
652                            message: "PATCH request requires a body".to_string(),
653                        })?;
654                        client.patch_raw(&req.path, body).await?
655                    }
656                    Method::Delete => client.delete_raw(&req.path).await?,
657                };
658
659                Ok(ApiResponse {
660                    status: 200,
661                    body: response,
662                })
663            })
664        }
665    }
666}