redis_cloud/client.rs
1//! Redis Cloud API client core implementation
2//!
3//! This module contains the core HTTP client for interacting with the Redis Cloud REST API.
4//! It provides authentication handling, request/response processing, and error management.
5//!
6//! The client is designed around a builder pattern for flexible configuration and supports
7//! both typed and untyped API interactions.
8
9use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
16/// Default user agent for the Redis Cloud client
17const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
19/// Builder for constructing a `CloudClient` with custom configuration
20///
21/// Provides a fluent interface for configuring API credentials, base URL, timeouts,
22/// and other client settings before creating the final `CloudClient` instance.
23///
24/// # Examples
25///
26/// ```rust,no_run
27/// use redis_cloud::CloudClient;
28///
29/// // Basic configuration
30/// let client = CloudClient::builder()
31/// .api_key("your-api-key")
32/// .api_secret("your-api-secret")
33/// .build()?;
34///
35/// // Advanced configuration
36/// let client = CloudClient::builder()
37/// .api_key("your-api-key")
38/// .api_secret("your-api-secret")
39/// .base_url("https://api.redislabs.com/v1".to_string())
40/// .timeout(std::time::Duration::from_secs(120))
41/// .build()?;
42/// # Ok::<(), Box<dyn std::error::Error>>(())
43/// ```
44#[derive(Debug, Clone)]
45pub struct CloudClientBuilder {
46 api_key: Option<String>,
47 api_secret: Option<String>,
48 base_url: String,
49 timeout: std::time::Duration,
50 user_agent: String,
51}
52
53impl Default for CloudClientBuilder {
54 fn default() -> Self {
55 Self {
56 api_key: None,
57 api_secret: None,
58 base_url: "https://api.redislabs.com/v1".to_string(),
59 timeout: std::time::Duration::from_secs(30),
60 user_agent: DEFAULT_USER_AGENT.to_string(),
61 }
62 }
63}
64
65impl CloudClientBuilder {
66 /// Create a new builder
67 #[must_use]
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 /// Set the API key
73 #[must_use]
74 pub fn api_key(mut self, key: impl Into<String>) -> Self {
75 self.api_key = Some(key.into());
76 self
77 }
78
79 /// Set the API secret
80 #[must_use]
81 pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
82 self.api_secret = Some(secret.into());
83 self
84 }
85
86 /// Set the base URL
87 #[must_use]
88 pub fn base_url(mut self, url: impl Into<String>) -> Self {
89 self.base_url = url.into();
90 self
91 }
92
93 /// Set the timeout
94 #[must_use]
95 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
96 self.timeout = timeout;
97 self
98 }
99
100 /// Set the user agent string for HTTP requests
101 ///
102 /// The default user agent is `redis-cloud/{version}`.
103 /// This can be overridden to identify specific clients, for example:
104 /// `redisctl/1.2.3` or `my-app/1.0.0`.
105 #[must_use]
106 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
107 self.user_agent = user_agent.into();
108 self
109 }
110
111 /// Build the client
112 pub fn build(self) -> Result<CloudClient> {
113 let api_key = self
114 .api_key
115 .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
116 let api_secret = self
117 .api_secret
118 .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
119
120 let mut default_headers = HeaderMap::new();
121 default_headers.insert(
122 USER_AGENT,
123 HeaderValue::from_str(&self.user_agent)
124 .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {e}")))?,
125 );
126
127 let client = Client::builder()
128 .timeout(self.timeout)
129 .default_headers(default_headers)
130 .build()
131 .map_err(|e| RestError::ConnectionError(e.to_string()))?;
132
133 Ok(CloudClient {
134 api_key,
135 api_secret,
136 base_url: self.base_url,
137 timeout: self.timeout,
138 client: Arc::new(client),
139 })
140 }
141}
142
143/// Redis Cloud API client
144#[derive(Clone)]
145pub struct CloudClient {
146 pub(crate) api_key: String,
147 pub(crate) api_secret: String,
148 pub(crate) base_url: String,
149 pub(crate) timeout: std::time::Duration,
150 pub(crate) client: Arc<Client>,
151}
152
153impl CloudClient {
154 /// Create a new builder for the client
155 #[must_use]
156 pub fn builder() -> CloudClientBuilder {
157 CloudClientBuilder::new()
158 }
159
160 /// Get the configured request timeout
161 ///
162 /// Returns the timeout duration that was set when building the client.
163 /// This timeout is applied to all HTTP requests made by this client.
164 #[must_use]
165 pub fn timeout(&self) -> std::time::Duration {
166 self.timeout
167 }
168
169 // ========================================================================
170 // Fluent API - Handler accessors
171 // ========================================================================
172
173 /// Get an account handler for account management operations
174 ///
175 /// # Example
176 ///
177 /// ```rust,no_run
178 /// # use redis_cloud::CloudClient;
179 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
180 /// let client = CloudClient::builder()
181 /// .api_key("key")
182 /// .api_secret("secret")
183 /// .build()?;
184 ///
185 /// let account = client.account().get_current_account().await?;
186 /// # Ok(())
187 /// # }
188 /// ```
189 #[must_use]
190 pub fn account(&self) -> crate::AccountHandler {
191 crate::AccountHandler::new(self.clone())
192 }
193
194 /// Get a subscription handler for Pro subscription operations
195 ///
196 /// # Example
197 ///
198 /// ```rust,no_run
199 /// # use redis_cloud::CloudClient;
200 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
201 /// let client = CloudClient::builder()
202 /// .api_key("key")
203 /// .api_secret("secret")
204 /// .build()?;
205 ///
206 /// let subscriptions = client.subscriptions().get_all_subscriptions().await?;
207 /// # Ok(())
208 /// # }
209 /// ```
210 #[must_use]
211 pub fn subscriptions(&self) -> crate::SubscriptionHandler {
212 crate::SubscriptionHandler::new(self.clone())
213 }
214
215 /// Get a database handler for Pro database operations
216 ///
217 /// # Example
218 ///
219 /// ```rust,no_run
220 /// # use redis_cloud::CloudClient;
221 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
222 /// let client = CloudClient::builder()
223 /// .api_key("key")
224 /// .api_secret("secret")
225 /// .build()?;
226 ///
227 /// let databases = client.databases().get_subscription_databases(123, None, None).await?;
228 /// # Ok(())
229 /// # }
230 /// ```
231 #[must_use]
232 pub fn databases(&self) -> crate::DatabaseHandler {
233 crate::DatabaseHandler::new(self.clone())
234 }
235
236 /// Get a fixed subscription handler for Essentials subscription operations
237 ///
238 /// # Example
239 ///
240 /// ```rust,no_run
241 /// # use redis_cloud::CloudClient;
242 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
243 /// let client = CloudClient::builder()
244 /// .api_key("key")
245 /// .api_secret("secret")
246 /// .build()?;
247 ///
248 /// let subscriptions = client.fixed_subscriptions().list().await?;
249 /// # Ok(())
250 /// # }
251 /// ```
252 #[must_use]
253 pub fn fixed_subscriptions(&self) -> crate::FixedSubscriptionHandler {
254 crate::FixedSubscriptionHandler::new(self.clone())
255 }
256
257 /// Get a fixed database handler for Essentials database operations
258 ///
259 /// # Example
260 ///
261 /// ```rust,no_run
262 /// # use redis_cloud::CloudClient;
263 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
264 /// let client = CloudClient::builder()
265 /// .api_key("key")
266 /// .api_secret("secret")
267 /// .build()?;
268 ///
269 /// let databases = client.fixed_databases().list(123, None, None).await?;
270 /// # Ok(())
271 /// # }
272 /// ```
273 #[must_use]
274 pub fn fixed_databases(&self) -> crate::FixedDatabaseHandler {
275 crate::FixedDatabaseHandler::new(self.clone())
276 }
277
278 /// Get an ACL handler for access control operations
279 ///
280 /// # Example
281 ///
282 /// ```rust,no_run
283 /// # use redis_cloud::CloudClient;
284 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
285 /// let client = CloudClient::builder()
286 /// .api_key("key")
287 /// .api_secret("secret")
288 /// .build()?;
289 ///
290 /// let users = client.acl().get_all_acl_users().await?;
291 /// # Ok(())
292 /// # }
293 /// ```
294 #[must_use]
295 pub fn acl(&self) -> crate::AclHandler {
296 crate::AclHandler::new(self.clone())
297 }
298
299 /// Get a users handler for user management operations
300 ///
301 /// # Example
302 ///
303 /// ```rust,no_run
304 /// # use redis_cloud::CloudClient;
305 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
306 /// let client = CloudClient::builder()
307 /// .api_key("key")
308 /// .api_secret("secret")
309 /// .build()?;
310 ///
311 /// let users = client.users().get_all_users().await?;
312 /// # Ok(())
313 /// # }
314 /// ```
315 #[must_use]
316 pub fn users(&self) -> crate::UserHandler {
317 crate::UserHandler::new(self.clone())
318 }
319
320 /// Get a tasks handler for async operation tracking
321 ///
322 /// # Example
323 ///
324 /// ```rust,no_run
325 /// # use redis_cloud::CloudClient;
326 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327 /// let client = CloudClient::builder()
328 /// .api_key("key")
329 /// .api_secret("secret")
330 /// .build()?;
331 ///
332 /// let tasks = client.tasks().get_all_tasks().await?;
333 /// # Ok(())
334 /// # }
335 /// ```
336 #[must_use]
337 pub fn tasks(&self) -> crate::TaskHandler {
338 crate::TaskHandler::new(self.clone())
339 }
340
341 /// Get a cloud accounts handler for cloud provider integration
342 ///
343 /// # Example
344 ///
345 /// ```rust,no_run
346 /// # use redis_cloud::CloudClient;
347 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
348 /// let client = CloudClient::builder()
349 /// .api_key("key")
350 /// .api_secret("secret")
351 /// .build()?;
352 ///
353 /// let accounts = client.cloud_accounts().get_cloud_accounts().await?;
354 /// # Ok(())
355 /// # }
356 /// ```
357 #[must_use]
358 pub fn cloud_accounts(&self) -> crate::CloudAccountHandler {
359 crate::CloudAccountHandler::new(self.clone())
360 }
361
362 /// Get a VPC peering handler for VPC peering operations
363 ///
364 /// # Example
365 ///
366 /// ```rust,no_run
367 /// # use redis_cloud::CloudClient;
368 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
369 /// let client = CloudClient::builder()
370 /// .api_key("key")
371 /// .api_secret("secret")
372 /// .build()?;
373 ///
374 /// let peering = client.vpc_peering().get(123).await?;
375 /// # Ok(())
376 /// # }
377 /// ```
378 #[must_use]
379 pub fn vpc_peering(&self) -> crate::VpcPeeringHandler {
380 crate::VpcPeeringHandler::new(self.clone())
381 }
382
383 /// Get a transit gateway handler for AWS Transit Gateway operations
384 ///
385 /// # Example
386 ///
387 /// ```rust,no_run
388 /// # use redis_cloud::CloudClient;
389 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
390 /// let client = CloudClient::builder()
391 /// .api_key("key")
392 /// .api_secret("secret")
393 /// .build()?;
394 ///
395 /// let attachments = client.transit_gateway().get_attachments(123).await?;
396 /// # Ok(())
397 /// # }
398 /// ```
399 #[must_use]
400 pub fn transit_gateway(&self) -> crate::TransitGatewayHandler {
401 crate::TransitGatewayHandler::new(self.clone())
402 }
403
404 /// Get a Private Service Connect handler for GCP PSC operations
405 ///
406 /// # Example
407 ///
408 /// ```rust,no_run
409 /// # use redis_cloud::CloudClient;
410 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411 /// let client = CloudClient::builder()
412 /// .api_key("key")
413 /// .api_secret("secret")
414 /// .build()?;
415 ///
416 /// let service = client.psc().get_service(123).await?;
417 /// # Ok(())
418 /// # }
419 /// ```
420 #[must_use]
421 pub fn psc(&self) -> crate::PscHandler {
422 crate::PscHandler::new(self.clone())
423 }
424
425 /// Get a `PrivateLink` handler for AWS `PrivateLink` operations
426 ///
427 /// # Example
428 ///
429 /// ```rust,no_run
430 /// # use redis_cloud::CloudClient;
431 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
432 /// let client = CloudClient::builder()
433 /// .api_key("key")
434 /// .api_secret("secret")
435 /// .build()?;
436 ///
437 /// let config = client.private_link().get(123).await?;
438 /// # Ok(())
439 /// # }
440 /// ```
441 #[must_use]
442 pub fn private_link(&self) -> crate::PrivateLinkHandler {
443 crate::PrivateLinkHandler::new(self.clone())
444 }
445
446 /// Get a cost report handler for generating cost reports
447 ///
448 /// # Example
449 ///
450 /// ```rust,no_run
451 /// # use redis_cloud::CloudClient;
452 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
453 /// let client = CloudClient::builder()
454 /// .api_key("key")
455 /// .api_secret("secret")
456 /// .build()?;
457 ///
458 /// let handler = client.cost_reports();
459 /// # Ok(())
460 /// # }
461 /// ```
462 #[must_use]
463 pub fn cost_reports(&self) -> crate::CostReportHandler {
464 crate::CostReportHandler::new(self.clone())
465 }
466
467 /// Normalize URL path concatenation to avoid double slashes
468 fn normalize_url(&self, path: &str) -> String {
469 let base = self.base_url.trim_end_matches('/');
470 let path = path.trim_start_matches('/');
471 format!("{base}/{path}")
472 }
473
474 /// Convert HTTP status code and response text to appropriate error
475 ///
476 /// This is a helper to avoid duplicating the error handling pattern
477 /// across multiple methods.
478 fn status_to_error(status: reqwest::StatusCode, text: String) -> RestError {
479 match status.as_u16() {
480 400 => RestError::BadRequest { message: text },
481 401 => RestError::AuthenticationFailed { message: text },
482 403 => RestError::Forbidden { message: text },
483 404 => RestError::NotFound { message: text },
484 412 => RestError::PreconditionFailed,
485 429 => RestError::RateLimited { message: text },
486 500 => RestError::InternalServerError { message: text },
487 503 => RestError::ServiceUnavailable { message: text },
488 _ => RestError::ApiError {
489 code: status.as_u16(),
490 message: text,
491 },
492 }
493 }
494
495 /// Make a GET request with API key authentication
496 #[instrument(skip(self), fields(method = "GET"))]
497 pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
498 let url = self.normalize_url(path);
499 debug!("GET {}", url);
500
501 // Redis Cloud API uses these headers for authentication
502 let response = self
503 .client
504 .get(&url)
505 .header("x-api-key", &self.api_key)
506 .header("x-api-secret-key", &self.api_secret)
507 .send()
508 .await?;
509
510 trace!("Response status: {}", response.status());
511 self.handle_response(response).await
512 }
513
514 /// Make a POST request
515 #[instrument(skip(self, body), fields(method = "POST"))]
516 pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
517 &self,
518 path: &str,
519 body: &B,
520 ) -> Result<T> {
521 let url = self.normalize_url(path);
522 debug!("POST {}", url);
523 trace!("Request body: {:?}", serde_json::to_value(body).ok());
524
525 // Same backwards header naming as GET
526 let response = self
527 .client
528 .post(&url)
529 .header("x-api-key", &self.api_key)
530 .header("x-api-secret-key", &self.api_secret)
531 .json(body)
532 .send()
533 .await?;
534
535 trace!("Response status: {}", response.status());
536 self.handle_response(response).await
537 }
538
539 /// Make a PUT request
540 #[instrument(skip(self, body), fields(method = "PUT"))]
541 pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
542 &self,
543 path: &str,
544 body: &B,
545 ) -> Result<T> {
546 let url = self.normalize_url(path);
547 debug!("PUT {}", url);
548 trace!("Request body: {:?}", serde_json::to_value(body).ok());
549
550 // Same backwards header naming as GET
551 let response = self
552 .client
553 .put(&url)
554 .header("x-api-key", &self.api_key)
555 .header("x-api-secret-key", &self.api_secret)
556 .json(body)
557 .send()
558 .await?;
559
560 trace!("Response status: {}", response.status());
561 self.handle_response(response).await
562 }
563
564 /// Make a DELETE request
565 #[instrument(skip(self), fields(method = "DELETE"))]
566 pub async fn delete(&self, path: &str) -> Result<()> {
567 let url = self.normalize_url(path);
568 debug!("DELETE {}", url);
569
570 // Same backwards header naming as GET
571 let response = self
572 .client
573 .delete(&url)
574 .header("x-api-key", &self.api_key)
575 .header("x-api-secret-key", &self.api_secret)
576 .send()
577 .await?;
578
579 trace!("Response status: {}", response.status());
580 if response.status().is_success() {
581 Ok(())
582 } else {
583 let status = response.status();
584 let text = response
585 .text()
586 .await
587 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
588 Err(Self::status_to_error(status, text))
589 }
590 }
591
592 /// Execute raw GET request returning JSON Value
593 #[instrument(skip(self), fields(method = "GET"))]
594 pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
595 self.get(path).await
596 }
597
598 /// Execute GET request returning raw bytes
599 ///
600 /// Useful for downloading binary content like cost reports or other files.
601 #[instrument(skip(self), fields(method = "GET"))]
602 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
603 let url = self.normalize_url(path);
604 debug!("GET {} (bytes)", url);
605
606 let response = self
607 .client
608 .get(&url)
609 .header("x-api-key", &self.api_key)
610 .header("x-api-secret-key", &self.api_secret)
611 .send()
612 .await?;
613
614 trace!("Response status: {}", response.status());
615 let status = response.status();
616
617 if status.is_success() {
618 response
619 .bytes()
620 .await
621 .map(|b| b.to_vec())
622 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))
623 } else {
624 let text = response
625 .text()
626 .await
627 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
628 Err(Self::status_to_error(status, text))
629 }
630 }
631
632 /// Execute raw POST request with JSON body
633 #[instrument(skip(self, body), fields(method = "POST"))]
634 pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
635 self.post(path, &body).await
636 }
637
638 /// Execute raw PUT request with JSON body
639 #[instrument(skip(self, body), fields(method = "PUT"))]
640 pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
641 self.put(path, &body).await
642 }
643
644 /// Execute raw PATCH request with JSON body
645 #[instrument(skip(self, body), fields(method = "PATCH"))]
646 pub async fn patch_raw(
647 &self,
648 path: &str,
649 body: serde_json::Value,
650 ) -> Result<serde_json::Value> {
651 let url = self.normalize_url(path);
652 debug!("PATCH {}", url);
653 trace!("Request body: {:?}", body);
654
655 // Use backwards header names for compatibility
656 let response = self
657 .client
658 .patch(&url)
659 .header("x-api-key", &self.api_key)
660 .header("x-api-secret-key", &self.api_secret)
661 .json(&body)
662 .send()
663 .await?;
664
665 trace!("Response status: {}", response.status());
666 self.handle_response(response).await
667 }
668
669 /// Execute raw DELETE request returning any response body
670 #[instrument(skip(self), fields(method = "DELETE"))]
671 pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
672 let url = self.normalize_url(path);
673 debug!("DELETE {}", url);
674
675 // Use backwards header names for compatibility
676 let response = self
677 .client
678 .delete(&url)
679 .header("x-api-key", &self.api_key)
680 .header("x-api-secret-key", &self.api_secret)
681 .send()
682 .await?;
683
684 trace!("Response status: {}", response.status());
685 if response.status().is_success() {
686 if response.content_length() == Some(0) {
687 Ok(serde_json::json!({"status": "deleted"}))
688 } else {
689 response.json().await.map_err(Into::into)
690 }
691 } else {
692 let status = response.status();
693 let text = response
694 .text()
695 .await
696 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
697 Err(Self::status_to_error(status, text))
698 }
699 }
700
701 /// Execute DELETE request with JSON body (used by some endpoints like `PrivateLink` principals)
702 #[instrument(skip(self, body), fields(method = "DELETE"))]
703 pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
704 &self,
705 path: &str,
706 body: serde_json::Value,
707 ) -> Result<T> {
708 let url = self.normalize_url(path);
709 debug!("DELETE {} (with body)", url);
710 trace!("Request body: {:?}", body);
711
712 let response = self
713 .client
714 .delete(&url)
715 .header("x-api-key", &self.api_key)
716 .header("x-api-secret-key", &self.api_secret)
717 .json(&body)
718 .send()
719 .await?;
720
721 trace!("Response status: {}", response.status());
722 self.handle_response(response).await
723 }
724
725 /// Handle HTTP response and return both status code and body as JSON
726 ///
727 /// This is used internally by the Tower service implementation to preserve
728 /// the actual HTTP status code in responses.
729 #[cfg(feature = "tower-integration")]
730 async fn handle_response_with_status(
731 &self,
732 response: reqwest::Response,
733 ) -> Result<(u16, serde_json::Value)> {
734 let status = response.status();
735 let status_code = status.as_u16();
736
737 if status.is_success() {
738 let bytes = response
739 .bytes()
740 .await
741 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
742
743 let value: serde_json::Value = serde_json::from_slice(&bytes).map_err(|e| {
744 RestError::ConnectionError(format!("Failed to parse JSON response: {e}"))
745 })?;
746
747 Ok((status_code, value))
748 } else {
749 let text = response
750 .text()
751 .await
752 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
753 Err(Self::status_to_error(status, text))
754 }
755 }
756
757 /// Handle HTTP response
758 async fn handle_response<T: serde::de::DeserializeOwned>(
759 &self,
760 response: reqwest::Response,
761 ) -> Result<T> {
762 let status = response.status();
763
764 if status.is_success() {
765 // Get the response bytes for better error reporting
766 let bytes = response
767 .bytes()
768 .await
769 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
770
771 // Use serde_path_to_error for better deserialization error messages
772 let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
773 serde_path_to_error::deserialize(deserializer).map_err(|err| {
774 let path = err.path().to_string();
775 // Use ConnectionError to provide detailed error message with field path
776 RestError::ConnectionError(format!(
777 "Failed to deserialize field '{}': {}",
778 path,
779 err.inner()
780 ))
781 })
782 } else {
783 let text = response
784 .text()
785 .await
786 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
787 Err(Self::status_to_error(status, text))
788 }
789 }
790}
791
792/// Tower Service integration for `CloudClient`
793///
794/// This module provides Tower Service implementations for `CloudClient`, enabling
795/// middleware composition with patterns like circuit breakers, retry, and rate limiting.
796///
797/// # Feature Flag
798///
799/// This module is only available when the `tower-integration` feature is enabled.
800///
801/// # Examples
802///
803/// ```rust,ignore
804/// use redis_cloud::CloudClient;
805/// use redis_cloud::tower_support::ApiRequest;
806/// use tower::ServiceExt;
807///
808/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
809/// let client = CloudClient::builder()
810/// .api_key("your-key")
811/// .api_secret("your-secret")
812/// .build()?;
813///
814/// // Convert to a Tower service
815/// let mut service = client.into_service();
816///
817/// // Use the service
818/// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
819/// println!("Status: {}", response.status);
820/// # Ok(())
821/// # }
822/// ```
823#[cfg(feature = "tower-integration")]
824pub mod tower_support {
825 use super::{CloudClient, RestError, Result};
826 use std::future::Future;
827 use std::pin::Pin;
828 use std::task::{Context, Poll};
829 use tower::Service;
830
831 /// HTTP method for API requests
832 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
833 pub enum Method {
834 /// GET request
835 Get,
836 /// POST request
837 Post,
838 /// PUT request
839 Put,
840 /// PATCH request
841 Patch,
842 /// DELETE request
843 Delete,
844 }
845
846 /// Tower-compatible request type for Redis Cloud API
847 ///
848 /// This wraps the essential components of an API request in a format
849 /// suitable for Tower middleware composition.
850 #[derive(Debug, Clone)]
851 pub struct ApiRequest {
852 /// HTTP method
853 pub method: Method,
854 /// API endpoint path (e.g., "/subscriptions")
855 pub path: String,
856 /// Optional JSON body for POST/PUT/PATCH requests
857 pub body: Option<serde_json::Value>,
858 }
859
860 impl ApiRequest {
861 /// Create a GET request
862 pub fn get(path: impl Into<String>) -> Self {
863 Self {
864 method: Method::Get,
865 path: path.into(),
866 body: None,
867 }
868 }
869
870 /// Create a POST request with a JSON body
871 pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
872 Self {
873 method: Method::Post,
874 path: path.into(),
875 body: Some(body),
876 }
877 }
878
879 /// Create a PUT request with a JSON body
880 pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
881 Self {
882 method: Method::Put,
883 path: path.into(),
884 body: Some(body),
885 }
886 }
887
888 /// Create a PATCH request with a JSON body
889 pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
890 Self {
891 method: Method::Patch,
892 path: path.into(),
893 body: Some(body),
894 }
895 }
896
897 /// Create a DELETE request
898 pub fn delete(path: impl Into<String>) -> Self {
899 Self {
900 method: Method::Delete,
901 path: path.into(),
902 body: None,
903 }
904 }
905 }
906
907 /// Tower-compatible response type
908 ///
909 /// Contains the HTTP status code and response body as JSON.
910 #[derive(Debug, Clone)]
911 pub struct ApiResponse {
912 /// HTTP status code
913 pub status: u16,
914 /// Response body as JSON
915 pub body: serde_json::Value,
916 }
917
918 impl CloudClient {
919 /// Convert this client into a Tower service
920 ///
921 /// This consumes the client and returns it wrapped in a Tower service
922 /// implementation, enabling middleware composition.
923 ///
924 /// # Examples
925 ///
926 /// ```rust,ignore
927 /// use redis_cloud::CloudClient;
928 /// use tower::ServiceExt;
929 /// use redis_cloud::tower_support::ApiRequest;
930 ///
931 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
932 /// let client = CloudClient::builder()
933 /// .api_key("key")
934 /// .api_secret("secret")
935 /// .build()?;
936 ///
937 /// let mut service = client.into_service();
938 /// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
939 /// # Ok(())
940 /// # }
941 /// ```
942 #[must_use]
943 pub fn into_service(self) -> Self {
944 self
945 }
946 }
947
948 impl Service<ApiRequest> for CloudClient {
949 type Response = ApiResponse;
950 type Error = RestError;
951 type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
952
953 fn poll_ready(
954 &mut self,
955 _cx: &mut Context<'_>,
956 ) -> Poll<std::result::Result<(), Self::Error>> {
957 // CloudClient is always ready since it uses an internal connection pool
958 Poll::Ready(Ok(()))
959 }
960
961 fn call(&mut self, req: ApiRequest) -> Self::Future {
962 let client = self.clone();
963 Box::pin(async move {
964 let url = client.normalize_url(&req.path);
965
966 let request_builder = match req.method {
967 Method::Get => client.client.get(&url),
968 Method::Post => {
969 let body = req.body.ok_or_else(|| RestError::BadRequest {
970 message: "POST request requires a body".to_string(),
971 })?;
972 client.client.post(&url).json(&body)
973 }
974 Method::Put => {
975 let body = req.body.ok_or_else(|| RestError::BadRequest {
976 message: "PUT request requires a body".to_string(),
977 })?;
978 client.client.put(&url).json(&body)
979 }
980 Method::Patch => {
981 let body = req.body.ok_or_else(|| RestError::BadRequest {
982 message: "PATCH request requires a body".to_string(),
983 })?;
984 client.client.patch(&url).json(&body)
985 }
986 Method::Delete => client.client.delete(&url),
987 };
988
989 let response = request_builder
990 .header("x-api-key", &client.api_key)
991 .header("x-api-secret-key", &client.api_secret)
992 .send()
993 .await?;
994
995 let (status, body) = client.handle_response_with_status(response).await?;
996
997 Ok(ApiResponse { status, body })
998 })
999 }
1000 }
1001}