redis_cloud/client.rs
1//! Redis Cloud API client core implementation
2//!
3//! This module contains the core HTTP client for interacting with the Redis Cloud REST API.
4//! It provides authentication handling, request/response processing, and error management.
5//!
6//! The client is designed around a builder pattern for flexible configuration and supports
7//! both typed and untyped API interactions.
8
9use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
16/// Default user agent for the Redis Cloud client
17const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
19/// Builder for constructing a `CloudClient` with custom configuration
20///
21/// Provides a fluent interface for configuring API credentials, base URL, timeouts,
22/// and other client settings before creating the final `CloudClient` instance.
23///
24/// # Examples
25///
26/// ```rust,no_run
27/// use redis_cloud::CloudClient;
28///
29/// // Basic configuration
30/// let client = CloudClient::builder()
31/// .api_key("your-api-key")
32/// .api_secret("your-api-secret")
33/// .build()?;
34///
35/// // Advanced configuration
36/// let client = CloudClient::builder()
37/// .api_key("your-api-key")
38/// .api_secret("your-api-secret")
39/// .base_url("https://api.redislabs.com/v1".to_string())
40/// .timeout(std::time::Duration::from_secs(120))
41/// .build()?;
42/// # Ok::<(), Box<dyn std::error::Error>>(())
43/// ```
44#[derive(Debug, Clone)]
45pub struct CloudClientBuilder {
46 api_key: Option<String>,
47 api_secret: Option<String>,
48 base_url: String,
49 timeout: std::time::Duration,
50 user_agent: String,
51}
52
53impl Default for CloudClientBuilder {
54 fn default() -> Self {
55 Self {
56 api_key: None,
57 api_secret: None,
58 base_url: "https://api.redislabs.com/v1".to_string(),
59 timeout: std::time::Duration::from_secs(30),
60 user_agent: DEFAULT_USER_AGENT.to_string(),
61 }
62 }
63}
64
65impl CloudClientBuilder {
66 /// Create a new builder
67 #[must_use]
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 /// Set the API key
73 #[must_use]
74 pub fn api_key(mut self, key: impl Into<String>) -> Self {
75 self.api_key = Some(key.into());
76 self
77 }
78
79 /// Set the API secret
80 #[must_use]
81 pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
82 self.api_secret = Some(secret.into());
83 self
84 }
85
86 /// Set the base URL
87 #[must_use]
88 pub fn base_url(mut self, url: impl Into<String>) -> Self {
89 self.base_url = url.into();
90 self
91 }
92
93 /// Set the timeout
94 #[must_use]
95 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
96 self.timeout = timeout;
97 self
98 }
99
100 /// Set the user agent string for HTTP requests
101 ///
102 /// The default user agent is `redis-cloud/{version}`.
103 /// This can be overridden to identify specific clients, for example:
104 /// `redisctl/1.2.3` or `my-app/1.0.0`.
105 #[must_use]
106 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
107 self.user_agent = user_agent.into();
108 self
109 }
110
111 /// Build the client
112 pub fn build(self) -> Result<CloudClient> {
113 let api_key = self
114 .api_key
115 .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
116 let api_secret = self
117 .api_secret
118 .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
119
120 let mut default_headers = HeaderMap::new();
121 default_headers.insert(
122 USER_AGENT,
123 HeaderValue::from_str(&self.user_agent)
124 .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {e}")))?,
125 );
126
127 let client = Client::builder()
128 .timeout(self.timeout)
129 .default_headers(default_headers)
130 .build()
131 .map_err(|e| RestError::ConnectionError(e.to_string()))?;
132
133 Ok(CloudClient {
134 api_key,
135 api_secret,
136 base_url: self.base_url,
137 timeout: self.timeout,
138 client: Arc::new(client),
139 })
140 }
141}
142
143/// Redis Cloud API client
144#[derive(Clone)]
145pub struct CloudClient {
146 pub(crate) api_key: String,
147 pub(crate) api_secret: String,
148 pub(crate) base_url: String,
149 pub(crate) timeout: std::time::Duration,
150 pub(crate) client: Arc<Client>,
151}
152
153impl CloudClient {
154 /// Create a new builder for the client
155 #[must_use]
156 pub fn builder() -> CloudClientBuilder {
157 CloudClientBuilder::new()
158 }
159
160 /// Get the configured request timeout
161 ///
162 /// Returns the timeout duration that was set when building the client.
163 /// This timeout is applied to all HTTP requests made by this client.
164 #[must_use]
165 pub fn timeout(&self) -> std::time::Duration {
166 self.timeout
167 }
168
169 // ========================================================================
170 // Fluent API - Handler accessors
171 // ========================================================================
172
173 /// Get an account handler for account management operations
174 ///
175 /// # Example
176 ///
177 /// ```rust,no_run
178 /// # use redis_cloud::CloudClient;
179 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
180 /// let client = CloudClient::builder()
181 /// .api_key("key")
182 /// .api_secret("secret")
183 /// .build()?;
184 ///
185 /// let account = client.account().get_current_account().await?;
186 /// # Ok(())
187 /// # }
188 /// ```
189 #[must_use]
190 pub fn account(&self) -> crate::AccountHandler {
191 crate::AccountHandler::new(self.clone())
192 }
193
194 /// Get a subscription handler for Pro subscription operations
195 ///
196 /// # Example
197 ///
198 /// ```rust,no_run
199 /// # use redis_cloud::CloudClient;
200 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
201 /// let client = CloudClient::builder()
202 /// .api_key("key")
203 /// .api_secret("secret")
204 /// .build()?;
205 ///
206 /// let subscriptions = client.subscriptions().get_all_subscriptions().await?;
207 /// # Ok(())
208 /// # }
209 /// ```
210 #[must_use]
211 pub fn subscriptions(&self) -> crate::SubscriptionHandler {
212 crate::SubscriptionHandler::new(self.clone())
213 }
214
215 /// Get a database handler for Pro database operations
216 ///
217 /// # Example
218 ///
219 /// ```rust,no_run
220 /// # use redis_cloud::CloudClient;
221 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
222 /// let client = CloudClient::builder()
223 /// .api_key("key")
224 /// .api_secret("secret")
225 /// .build()?;
226 ///
227 /// let databases = client.databases().get_subscription_databases(123, None, None).await?;
228 /// # Ok(())
229 /// # }
230 /// ```
231 #[must_use]
232 pub fn databases(&self) -> crate::DatabaseHandler {
233 crate::DatabaseHandler::new(self.clone())
234 }
235
236 /// Get a fixed subscription handler for Essentials subscription operations
237 ///
238 /// # Example
239 ///
240 /// ```rust,no_run
241 /// # use redis_cloud::CloudClient;
242 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
243 /// let client = CloudClient::builder()
244 /// .api_key("key")
245 /// .api_secret("secret")
246 /// .build()?;
247 ///
248 /// let subscriptions = client.fixed_subscriptions().list().await?;
249 /// # Ok(())
250 /// # }
251 /// ```
252 #[must_use]
253 pub fn fixed_subscriptions(&self) -> crate::FixedSubscriptionHandler {
254 crate::FixedSubscriptionHandler::new(self.clone())
255 }
256
257 /// Get a fixed database handler for Essentials database operations
258 ///
259 /// # Example
260 ///
261 /// ```rust,no_run
262 /// # use redis_cloud::CloudClient;
263 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
264 /// let client = CloudClient::builder()
265 /// .api_key("key")
266 /// .api_secret("secret")
267 /// .build()?;
268 ///
269 /// let databases = client.fixed_databases().list(123, None, None).await?;
270 /// # Ok(())
271 /// # }
272 /// ```
273 #[must_use]
274 pub fn fixed_databases(&self) -> crate::FixedDatabaseHandler {
275 crate::FixedDatabaseHandler::new(self.clone())
276 }
277
278 /// Get an ACL handler for access control operations
279 ///
280 /// # Example
281 ///
282 /// ```rust,no_run
283 /// # use redis_cloud::CloudClient;
284 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
285 /// let client = CloudClient::builder()
286 /// .api_key("key")
287 /// .api_secret("secret")
288 /// .build()?;
289 ///
290 /// let users = client.acl().get_all_acl_users().await?;
291 /// # Ok(())
292 /// # }
293 /// ```
294 #[must_use]
295 pub fn acl(&self) -> crate::AclHandler {
296 crate::AclHandler::new(self.clone())
297 }
298
299 /// Get a users handler for user management operations
300 ///
301 /// # Example
302 ///
303 /// ```rust,no_run
304 /// # use redis_cloud::CloudClient;
305 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
306 /// let client = CloudClient::builder()
307 /// .api_key("key")
308 /// .api_secret("secret")
309 /// .build()?;
310 ///
311 /// let users = client.users().get_all_users().await?;
312 /// # Ok(())
313 /// # }
314 /// ```
315 #[must_use]
316 pub fn users(&self) -> crate::UserHandler {
317 crate::UserHandler::new(self.clone())
318 }
319
320 /// Get a tasks handler for async operation tracking
321 ///
322 /// # Example
323 ///
324 /// ```rust,no_run
325 /// # use redis_cloud::CloudClient;
326 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
327 /// let client = CloudClient::builder()
328 /// .api_key("key")
329 /// .api_secret("secret")
330 /// .build()?;
331 ///
332 /// let tasks = client.tasks().get_all_tasks().await?;
333 /// # Ok(())
334 /// # }
335 /// ```
336 #[must_use]
337 pub fn tasks(&self) -> crate::TaskHandler {
338 crate::TaskHandler::new(self.clone())
339 }
340
341 /// Get a cloud accounts handler for cloud provider integration
342 ///
343 /// # Example
344 ///
345 /// ```rust,no_run
346 /// # use redis_cloud::CloudClient;
347 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
348 /// let client = CloudClient::builder()
349 /// .api_key("key")
350 /// .api_secret("secret")
351 /// .build()?;
352 ///
353 /// let accounts = client.cloud_accounts().get_cloud_accounts().await?;
354 /// # Ok(())
355 /// # }
356 /// ```
357 #[must_use]
358 pub fn cloud_accounts(&self) -> crate::CloudAccountHandler {
359 crate::CloudAccountHandler::new(self.clone())
360 }
361
362 /// Get a VPC peering handler for VPC peering operations
363 ///
364 /// # Example
365 ///
366 /// ```rust,no_run
367 /// # use redis_cloud::CloudClient;
368 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
369 /// let client = CloudClient::builder()
370 /// .api_key("key")
371 /// .api_secret("secret")
372 /// .build()?;
373 ///
374 /// let peering = client.vpc_peering().get(123).await?;
375 /// # Ok(())
376 /// # }
377 /// ```
378 #[must_use]
379 pub fn vpc_peering(&self) -> crate::VpcPeeringHandler {
380 crate::VpcPeeringHandler::new(self.clone())
381 }
382
383 /// Get a transit gateway handler for AWS Transit Gateway operations
384 ///
385 /// # Example
386 ///
387 /// ```rust,no_run
388 /// # use redis_cloud::CloudClient;
389 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
390 /// let client = CloudClient::builder()
391 /// .api_key("key")
392 /// .api_secret("secret")
393 /// .build()?;
394 ///
395 /// let attachments = client.transit_gateway().get_attachments(123).await?;
396 /// # Ok(())
397 /// # }
398 /// ```
399 #[must_use]
400 pub fn transit_gateway(&self) -> crate::TransitGatewayHandler {
401 crate::TransitGatewayHandler::new(self.clone())
402 }
403
404 /// Get a Private Service Connect handler for GCP PSC operations
405 ///
406 /// # Example
407 ///
408 /// ```rust,no_run
409 /// # use redis_cloud::CloudClient;
410 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411 /// let client = CloudClient::builder()
412 /// .api_key("key")
413 /// .api_secret("secret")
414 /// .build()?;
415 ///
416 /// let service = client.psc().get_service(123).await?;
417 /// # Ok(())
418 /// # }
419 /// ```
420 #[must_use]
421 pub fn psc(&self) -> crate::PscHandler {
422 crate::PscHandler::new(self.clone())
423 }
424
425 /// Get a `PrivateLink` handler for AWS `PrivateLink` operations
426 ///
427 /// # Example
428 ///
429 /// ```rust,no_run
430 /// # use redis_cloud::CloudClient;
431 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
432 /// let client = CloudClient::builder()
433 /// .api_key("key")
434 /// .api_secret("secret")
435 /// .build()?;
436 ///
437 /// let config = client.private_link().get(123).await?;
438 /// # Ok(())
439 /// # }
440 /// ```
441 #[must_use]
442 pub fn private_link(&self) -> crate::PrivateLinkHandler {
443 crate::PrivateLinkHandler::new(self.clone())
444 }
445
446 /// Get a cost report handler for generating cost reports
447 ///
448 /// # Example
449 ///
450 /// ```rust,no_run
451 /// # use redis_cloud::CloudClient;
452 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
453 /// let client = CloudClient::builder()
454 /// .api_key("key")
455 /// .api_secret("secret")
456 /// .build()?;
457 ///
458 /// let handler = client.cost_reports();
459 /// # Ok(())
460 /// # }
461 /// ```
462 #[must_use]
463 pub fn cost_reports(&self) -> crate::CostReportHandler {
464 crate::CostReportHandler::new(self.clone())
465 }
466
467 /// Normalize URL path concatenation to avoid double slashes
468 fn normalize_url(&self, path: &str) -> String {
469 let base = self.base_url.trim_end_matches('/');
470 let path = path.trim_start_matches('/');
471 format!("{base}/{path}")
472 }
473
474 /// Convert HTTP status code and response text to appropriate error
475 ///
476 /// This is a helper to avoid duplicating the error handling pattern
477 /// across multiple methods.
478 fn status_to_error(status: reqwest::StatusCode, text: String) -> RestError {
479 match status.as_u16() {
480 400 => RestError::BadRequest { message: text },
481 401 => RestError::AuthenticationFailed { message: text },
482 403 => RestError::Forbidden { message: text },
483 404 => RestError::NotFound { message: text },
484 412 => RestError::PreconditionFailed,
485 429 => RestError::RateLimited { message: text },
486 500 => RestError::InternalServerError { message: text },
487 503 => RestError::ServiceUnavailable { message: text },
488 _ => RestError::ApiError {
489 code: status.as_u16(),
490 message: text,
491 },
492 }
493 }
494
495 /// Make a GET request with API key authentication
496 #[instrument(skip(self), fields(method = "GET"))]
497 pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
498 let url = self.normalize_url(path);
499 debug!("GET {}", url);
500
501 // Redis Cloud API uses these headers for authentication
502 let response = self
503 .client
504 .get(&url)
505 .header("x-api-key", &self.api_key)
506 .header("x-api-secret-key", &self.api_secret)
507 .send()
508 .await?;
509
510 trace!("Response status: {}", response.status());
511 self.handle_response(response).await
512 }
513
514 /// Make a POST request
515 #[instrument(skip(self, body), fields(method = "POST"))]
516 pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
517 &self,
518 path: &str,
519 body: &B,
520 ) -> Result<T> {
521 let url = self.normalize_url(path);
522 debug!("POST {}", url);
523 trace!("Request body: {:?}", serde_json::to_value(body).ok());
524
525 // Same backwards header naming as GET
526 let response = self
527 .client
528 .post(&url)
529 .header("x-api-key", &self.api_key)
530 .header("x-api-secret-key", &self.api_secret)
531 .json(body)
532 .send()
533 .await?;
534
535 trace!("Response status: {}", response.status());
536 self.handle_response(response).await
537 }
538
539 /// Make a PUT request
540 #[instrument(skip(self, body), fields(method = "PUT"))]
541 pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
542 &self,
543 path: &str,
544 body: &B,
545 ) -> Result<T> {
546 let url = self.normalize_url(path);
547 debug!("PUT {}", url);
548 trace!("Request body: {:?}", serde_json::to_value(body).ok());
549
550 // Same backwards header naming as GET
551 let response = self
552 .client
553 .put(&url)
554 .header("x-api-key", &self.api_key)
555 .header("x-api-secret-key", &self.api_secret)
556 .json(body)
557 .send()
558 .await?;
559
560 trace!("Response status: {}", response.status());
561 self.handle_response(response).await
562 }
563
564 /// Make a DELETE request
565 #[instrument(skip(self), fields(method = "DELETE"))]
566 pub async fn delete(&self, path: &str) -> Result<()> {
567 let url = self.normalize_url(path);
568 debug!("DELETE {}", url);
569
570 // Same backwards header naming as GET
571 let response = self
572 .client
573 .delete(&url)
574 .header("x-api-key", &self.api_key)
575 .header("x-api-secret-key", &self.api_secret)
576 .send()
577 .await?;
578
579 trace!("Response status: {}", response.status());
580 if response.status().is_success() {
581 Ok(())
582 } else {
583 let status = response.status();
584 let text = response
585 .text()
586 .await
587 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
588 Err(Self::status_to_error(status, text))
589 }
590 }
591
592 /// Execute a bodyless DELETE request, deserializing the response body.
593 ///
594 /// Unlike [`Self::delete`] (which discards the body) this parses the
595 /// response into `T`. Connectivity deletes are asynchronous and the spec
596 /// returns a [`TaskStateUpdate`](crate::types::TaskStateUpdate) so callers
597 /// can poll the resulting task to completion.
598 #[instrument(skip(self), fields(method = "DELETE"))]
599 pub async fn delete_typed<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
600 let url = self.normalize_url(path);
601 debug!("DELETE {}", url);
602
603 let response = self
604 .client
605 .delete(&url)
606 .header("x-api-key", &self.api_key)
607 .header("x-api-secret-key", &self.api_secret)
608 .send()
609 .await?;
610
611 trace!("Response status: {}", response.status());
612 self.handle_response(response).await
613 }
614
615 /// Execute raw GET request returning JSON Value
616 #[instrument(skip(self), fields(method = "GET"))]
617 pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
618 self.get(path).await
619 }
620
621 /// Execute GET request returning raw bytes
622 ///
623 /// Useful for downloading binary content like cost reports or other files.
624 #[instrument(skip(self), fields(method = "GET"))]
625 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
626 let url = self.normalize_url(path);
627 debug!("GET {} (bytes)", url);
628
629 let response = self
630 .client
631 .get(&url)
632 .header("x-api-key", &self.api_key)
633 .header("x-api-secret-key", &self.api_secret)
634 .send()
635 .await?;
636
637 trace!("Response status: {}", response.status());
638 let status = response.status();
639
640 if status.is_success() {
641 response
642 .bytes()
643 .await
644 .map(|b| b.to_vec())
645 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))
646 } else {
647 let text = response
648 .text()
649 .await
650 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
651 Err(Self::status_to_error(status, text))
652 }
653 }
654
655 /// Execute raw POST request with JSON body
656 #[instrument(skip(self, body), fields(method = "POST"))]
657 pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
658 self.post(path, &body).await
659 }
660
661 /// Execute raw PUT request with JSON body
662 #[instrument(skip(self, body), fields(method = "PUT"))]
663 pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
664 self.put(path, &body).await
665 }
666
667 /// Execute raw PATCH request with JSON body
668 #[instrument(skip(self, body), fields(method = "PATCH"))]
669 pub async fn patch_raw(
670 &self,
671 path: &str,
672 body: serde_json::Value,
673 ) -> Result<serde_json::Value> {
674 let url = self.normalize_url(path);
675 debug!("PATCH {}", url);
676 trace!("Request body: {:?}", body);
677
678 // Use backwards header names for compatibility
679 let response = self
680 .client
681 .patch(&url)
682 .header("x-api-key", &self.api_key)
683 .header("x-api-secret-key", &self.api_secret)
684 .json(&body)
685 .send()
686 .await?;
687
688 trace!("Response status: {}", response.status());
689 self.handle_response(response).await
690 }
691
692 /// Execute raw DELETE request returning any response body
693 #[instrument(skip(self), fields(method = "DELETE"))]
694 pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
695 let url = self.normalize_url(path);
696 debug!("DELETE {}", url);
697
698 // Use backwards header names for compatibility
699 let response = self
700 .client
701 .delete(&url)
702 .header("x-api-key", &self.api_key)
703 .header("x-api-secret-key", &self.api_secret)
704 .send()
705 .await?;
706
707 trace!("Response status: {}", response.status());
708 if response.status().is_success() {
709 if response.content_length() == Some(0) {
710 Ok(serde_json::json!({"status": "deleted"}))
711 } else {
712 response.json().await.map_err(Into::into)
713 }
714 } else {
715 let status = response.status();
716 let text = response
717 .text()
718 .await
719 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
720 Err(Self::status_to_error(status, text))
721 }
722 }
723
724 /// Execute DELETE request with JSON body (used by some endpoints like `PrivateLink` principals)
725 #[instrument(skip(self, body), fields(method = "DELETE"))]
726 pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
727 &self,
728 path: &str,
729 body: serde_json::Value,
730 ) -> Result<T> {
731 let url = self.normalize_url(path);
732 debug!("DELETE {} (with body)", url);
733 trace!("Request body: {:?}", body);
734
735 let response = self
736 .client
737 .delete(&url)
738 .header("x-api-key", &self.api_key)
739 .header("x-api-secret-key", &self.api_secret)
740 .json(&body)
741 .send()
742 .await?;
743
744 trace!("Response status: {}", response.status());
745 self.handle_response(response).await
746 }
747
748 /// Handle HTTP response and return both status code and body as JSON
749 ///
750 /// This is used internally by the Tower service implementation to preserve
751 /// the actual HTTP status code in responses.
752 #[cfg(feature = "tower-integration")]
753 async fn handle_response_with_status(
754 &self,
755 response: reqwest::Response,
756 ) -> Result<(u16, serde_json::Value)> {
757 let status = response.status();
758 let status_code = status.as_u16();
759
760 if status.is_success() {
761 let bytes = response
762 .bytes()
763 .await
764 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
765
766 let value: serde_json::Value = serde_json::from_slice(&bytes).map_err(|e| {
767 RestError::ConnectionError(format!("Failed to parse JSON response: {e}"))
768 })?;
769
770 Ok((status_code, value))
771 } else {
772 let text = response
773 .text()
774 .await
775 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
776 Err(Self::status_to_error(status, text))
777 }
778 }
779
780 /// Handle HTTP response
781 async fn handle_response<T: serde::de::DeserializeOwned>(
782 &self,
783 response: reqwest::Response,
784 ) -> Result<T> {
785 let status = response.status();
786
787 if status.is_success() {
788 // Get the response bytes for better error reporting
789 let bytes = response
790 .bytes()
791 .await
792 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {e}")))?;
793
794 // Treat an empty success body (e.g. HTTP 204 No Content from the
795 // traffic-resume endpoints) as JSON `null` so it deserializes
796 // cleanly into `()`, `Option<T>`, or `serde_json::Value::Null`.
797 let bytes: &[u8] = if bytes.is_empty() { b"null" } else { &bytes };
798
799 // Use serde_path_to_error for better deserialization error messages
800 let deserializer = &mut serde_json::Deserializer::from_slice(bytes);
801 serde_path_to_error::deserialize(deserializer).map_err(|err| {
802 let path = err.path().to_string();
803 // Use ConnectionError to provide detailed error message with field path
804 RestError::ConnectionError(format!(
805 "Failed to deserialize field '{}': {}",
806 path,
807 err.inner()
808 ))
809 })
810 } else {
811 let text = response
812 .text()
813 .await
814 .unwrap_or_else(|e| format!("(failed to read response body: {e})"));
815 Err(Self::status_to_error(status, text))
816 }
817 }
818}
819
820/// Tower Service integration for `CloudClient`
821///
822/// This module provides Tower Service implementations for `CloudClient`, enabling
823/// middleware composition with patterns like circuit breakers, retry, and rate limiting.
824///
825/// # Feature Flag
826///
827/// This module is only available when the `tower-integration` feature is enabled.
828///
829/// # Examples
830///
831/// ```rust,ignore
832/// use redis_cloud::CloudClient;
833/// use redis_cloud::tower_support::ApiRequest;
834/// use tower::ServiceExt;
835///
836/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
837/// let client = CloudClient::builder()
838/// .api_key("your-key")
839/// .api_secret("your-secret")
840/// .build()?;
841///
842/// // Convert to a Tower service
843/// let mut service = client.into_service();
844///
845/// // Use the service
846/// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
847/// println!("Status: {}", response.status);
848/// # Ok(())
849/// # }
850/// ```
851#[cfg(feature = "tower-integration")]
852pub mod tower_support {
853 use super::{CloudClient, RestError, Result};
854 use std::future::Future;
855 use std::pin::Pin;
856 use std::task::{Context, Poll};
857 use tower::Service;
858
859 /// HTTP method for API requests
860 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
861 pub enum Method {
862 /// GET request
863 Get,
864 /// POST request
865 Post,
866 /// PUT request
867 Put,
868 /// PATCH request
869 Patch,
870 /// DELETE request
871 Delete,
872 }
873
874 /// Tower-compatible request type for Redis Cloud API
875 ///
876 /// This wraps the essential components of an API request in a format
877 /// suitable for Tower middleware composition.
878 #[derive(Debug, Clone)]
879 pub struct ApiRequest {
880 /// HTTP method
881 pub method: Method,
882 /// API endpoint path (e.g., "/subscriptions")
883 pub path: String,
884 /// Optional JSON body for POST/PUT/PATCH requests
885 pub body: Option<serde_json::Value>,
886 }
887
888 impl ApiRequest {
889 /// Create a GET request
890 pub fn get(path: impl Into<String>) -> Self {
891 Self {
892 method: Method::Get,
893 path: path.into(),
894 body: None,
895 }
896 }
897
898 /// Create a POST request with a JSON body
899 pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
900 Self {
901 method: Method::Post,
902 path: path.into(),
903 body: Some(body),
904 }
905 }
906
907 /// Create a PUT request with a JSON body
908 pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
909 Self {
910 method: Method::Put,
911 path: path.into(),
912 body: Some(body),
913 }
914 }
915
916 /// Create a PATCH request with a JSON body
917 pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
918 Self {
919 method: Method::Patch,
920 path: path.into(),
921 body: Some(body),
922 }
923 }
924
925 /// Create a DELETE request
926 pub fn delete(path: impl Into<String>) -> Self {
927 Self {
928 method: Method::Delete,
929 path: path.into(),
930 body: None,
931 }
932 }
933 }
934
935 /// Tower-compatible response type
936 ///
937 /// Contains the HTTP status code and response body as JSON.
938 #[derive(Debug, Clone)]
939 pub struct ApiResponse {
940 /// HTTP status code
941 pub status: u16,
942 /// Response body as JSON
943 pub body: serde_json::Value,
944 }
945
946 impl CloudClient {
947 /// Convert this client into a Tower service
948 ///
949 /// This consumes the client and returns it wrapped in a Tower service
950 /// implementation, enabling middleware composition.
951 ///
952 /// # Examples
953 ///
954 /// ```rust,ignore
955 /// use redis_cloud::CloudClient;
956 /// use tower::ServiceExt;
957 /// use redis_cloud::tower_support::ApiRequest;
958 ///
959 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
960 /// let client = CloudClient::builder()
961 /// .api_key("key")
962 /// .api_secret("secret")
963 /// .build()?;
964 ///
965 /// let mut service = client.into_service();
966 /// let response = service.oneshot(ApiRequest::get("/subscriptions")).await?;
967 /// # Ok(())
968 /// # }
969 /// ```
970 #[must_use]
971 pub fn into_service(self) -> Self {
972 self
973 }
974 }
975
976 impl Service<ApiRequest> for CloudClient {
977 type Response = ApiResponse;
978 type Error = RestError;
979 type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
980
981 fn poll_ready(
982 &mut self,
983 _cx: &mut Context<'_>,
984 ) -> Poll<std::result::Result<(), Self::Error>> {
985 // CloudClient is always ready since it uses an internal connection pool
986 Poll::Ready(Ok(()))
987 }
988
989 fn call(&mut self, req: ApiRequest) -> Self::Future {
990 let client = self.clone();
991 Box::pin(async move {
992 let url = client.normalize_url(&req.path);
993
994 let request_builder = match req.method {
995 Method::Get => client.client.get(&url),
996 Method::Post => {
997 let body = req.body.ok_or_else(|| RestError::BadRequest {
998 message: "POST request requires a body".to_string(),
999 })?;
1000 client.client.post(&url).json(&body)
1001 }
1002 Method::Put => {
1003 let body = req.body.ok_or_else(|| RestError::BadRequest {
1004 message: "PUT request requires a body".to_string(),
1005 })?;
1006 client.client.put(&url).json(&body)
1007 }
1008 Method::Patch => {
1009 let body = req.body.ok_or_else(|| RestError::BadRequest {
1010 message: "PATCH request requires a body".to_string(),
1011 })?;
1012 client.client.patch(&url).json(&body)
1013 }
1014 Method::Delete => client.client.delete(&url),
1015 };
1016
1017 let response = request_builder
1018 .header("x-api-key", &client.api_key)
1019 .header("x-api-secret-key", &client.api_secret)
1020 .send()
1021 .await?;
1022
1023 let (status, body) = client.handle_response_with_status(response).await?;
1024
1025 Ok(ApiResponse { status, body })
1026 })
1027 }
1028 }
1029}