1use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
/// User-Agent value used when the builder is not given a custom one:
/// `redis-cloud/<version>`, where the version is this crate's
/// `CARGO_PKG_VERSION` captured at compile time.
const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
/// Builder that collects the settings needed to construct a `CloudClient`.
///
/// `api_key` and `api_secret` start out unset and must both be supplied
/// before `build` succeeds; the remaining fields carry defaults.
///
/// `Debug` is implemented by hand rather than derived so that formatting a
/// builder (for example in a log line or a panic message) never prints the
/// credentials themselves.
#[derive(Clone)]
pub struct CloudClientBuilder {
    /// API key credential; `None` until set.
    api_key: Option<String>,
    /// API secret credential; `None` until set.
    api_secret: Option<String>,
    /// Root URL that request paths are appended to.
    base_url: String,
    /// Timeout handed to the underlying HTTP client.
    timeout: std::time::Duration,
    /// Value sent in the `User-Agent` header.
    user_agent: String,
}

impl std::fmt::Debug for CloudClientBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only reveal whether each credential is present, never its value.
        let api_key = self.api_key.as_ref().map(|_| "<redacted>");
        let api_secret = self.api_secret.as_ref().map(|_| "<redacted>");

        f.debug_struct("CloudClientBuilder")
            .field("api_key", &api_key)
            .field("api_secret", &api_secret)
            .field("base_url", &self.base_url)
            .field("timeout", &self.timeout)
            .field("user_agent", &self.user_agent)
            .finish()
    }
}
52
53impl Default for CloudClientBuilder {
54 fn default() -> Self {
55 Self {
56 api_key: None,
57 api_secret: None,
58 base_url: "https://api.redislabs.com/v1".to_string(),
59 timeout: std::time::Duration::from_secs(30),
60 user_agent: DEFAULT_USER_AGENT.to_string(),
61 }
62 }
63}
64
65impl CloudClientBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn api_key(mut self, key: impl Into<String>) -> Self {
73 self.api_key = Some(key.into());
74 self
75 }
76
77 pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
79 self.api_secret = Some(secret.into());
80 self
81 }
82
83 pub fn base_url(mut self, url: impl Into<String>) -> Self {
85 self.base_url = url.into();
86 self
87 }
88
89 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
91 self.timeout = timeout;
92 self
93 }
94
95 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
101 self.user_agent = user_agent.into();
102 self
103 }
104
105 pub fn build(self) -> Result<CloudClient> {
107 let api_key = self
108 .api_key
109 .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
110 let api_secret = self
111 .api_secret
112 .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
113
114 let mut default_headers = HeaderMap::new();
115 default_headers.insert(
116 USER_AGENT,
117 HeaderValue::from_str(&self.user_agent)
118 .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {}", e)))?,
119 );
120
121 let client = Client::builder()
122 .timeout(self.timeout)
123 .default_headers(default_headers)
124 .build()
125 .map_err(|e| RestError::ConnectionError(e.to_string()))?;
126
127 Ok(CloudClient {
128 api_key,
129 api_secret,
130 base_url: self.base_url,
131 timeout: self.timeout,
132 client: Arc::new(client),
133 })
134 }
135}
136
/// HTTP client holding the credentials, base URL and shared `reqwest`
/// client used for every request.
///
/// Cloning copies the strings and bumps the `Arc` reference count, so all
/// clones share one underlying HTTP client.
#[derive(Clone)]
pub struct CloudClient {
    /// Credential sent in the `x-api-key` request header.
    pub(crate) api_key: String,
    /// Credential sent in the `x-api-secret-key` request header.
    pub(crate) api_secret: String,
    /// Root URL that request paths are joined onto.
    pub(crate) base_url: String,
    // The timeout is already applied to `client` when it is built; this copy
    // is not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    pub(crate) timeout: std::time::Duration,
    /// Shared HTTP client configured by the builder.
    pub(crate) client: Arc<Client>,
}
147
148impl CloudClient {
149 pub fn builder() -> CloudClientBuilder {
151 CloudClientBuilder::new()
152 }
153
154 fn normalize_url(&self, path: &str) -> String {
156 let base = self.base_url.trim_end_matches('/');
157 let path = path.trim_start_matches('/');
158 format!("{}/{}", base, path)
159 }
160
161 #[instrument(skip(self), fields(method = "GET"))]
163 pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
164 let url = self.normalize_url(path);
165 debug!("GET {}", url);
166
167 let response = self
169 .client
170 .get(&url)
171 .header("x-api-key", &self.api_key)
172 .header("x-api-secret-key", &self.api_secret)
173 .send()
174 .await?;
175
176 trace!("Response status: {}", response.status());
177 self.handle_response(response).await
178 }
179
180 #[instrument(skip(self, body), fields(method = "POST"))]
182 pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
183 &self,
184 path: &str,
185 body: &B,
186 ) -> Result<T> {
187 let url = self.normalize_url(path);
188 debug!("POST {}", url);
189 trace!("Request body: {:?}", serde_json::to_value(body).ok());
190
191 let response = self
193 .client
194 .post(&url)
195 .header("x-api-key", &self.api_key)
196 .header("x-api-secret-key", &self.api_secret)
197 .json(body)
198 .send()
199 .await?;
200
201 trace!("Response status: {}", response.status());
202 self.handle_response(response).await
203 }
204
205 #[instrument(skip(self, body), fields(method = "PUT"))]
207 pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
208 &self,
209 path: &str,
210 body: &B,
211 ) -> Result<T> {
212 let url = self.normalize_url(path);
213 debug!("PUT {}", url);
214 trace!("Request body: {:?}", serde_json::to_value(body).ok());
215
216 let response = self
218 .client
219 .put(&url)
220 .header("x-api-key", &self.api_key)
221 .header("x-api-secret-key", &self.api_secret)
222 .json(body)
223 .send()
224 .await?;
225
226 trace!("Response status: {}", response.status());
227 self.handle_response(response).await
228 }
229
230 #[instrument(skip(self), fields(method = "DELETE"))]
232 pub async fn delete(&self, path: &str) -> Result<()> {
233 let url = self.normalize_url(path);
234 debug!("DELETE {}", url);
235
236 let response = self
238 .client
239 .delete(&url)
240 .header("x-api-key", &self.api_key)
241 .header("x-api-secret-key", &self.api_secret)
242 .send()
243 .await?;
244
245 trace!("Response status: {}", response.status());
246 if response.status().is_success() {
247 Ok(())
248 } else {
249 let status = response.status();
250 let text = response.text().await.unwrap_or_default();
251
252 match status.as_u16() {
253 400 => Err(RestError::BadRequest { message: text }),
254 401 => Err(RestError::AuthenticationFailed { message: text }),
255 403 => Err(RestError::Forbidden { message: text }),
256 404 => Err(RestError::NotFound { message: text }),
257 412 => Err(RestError::PreconditionFailed),
258 429 => Err(RestError::RateLimited { message: text }),
259 500 => Err(RestError::InternalServerError { message: text }),
260 503 => Err(RestError::ServiceUnavailable { message: text }),
261 _ => Err(RestError::ApiError {
262 code: status.as_u16(),
263 message: text,
264 }),
265 }
266 }
267 }
268
269 #[instrument(skip(self), fields(method = "GET"))]
271 pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
272 self.get(path).await
273 }
274
275 #[instrument(skip(self), fields(method = "GET"))]
279 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
280 let url = self.normalize_url(path);
281 debug!("GET {} (bytes)", url);
282
283 let response = self
284 .client
285 .get(&url)
286 .header("x-api-key", &self.api_key)
287 .header("x-api-secret-key", &self.api_secret)
288 .send()
289 .await?;
290
291 trace!("Response status: {}", response.status());
292 let status = response.status();
293
294 if status.is_success() {
295 response
296 .bytes()
297 .await
298 .map(|b| b.to_vec())
299 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {}", e)))
300 } else {
301 let text = response.text().await.unwrap_or_default();
302
303 match status.as_u16() {
304 400 => Err(RestError::BadRequest { message: text }),
305 401 => Err(RestError::AuthenticationFailed { message: text }),
306 403 => Err(RestError::Forbidden { message: text }),
307 404 => Err(RestError::NotFound { message: text }),
308 412 => Err(RestError::PreconditionFailed),
309 429 => Err(RestError::RateLimited { message: text }),
310 500 => Err(RestError::InternalServerError { message: text }),
311 503 => Err(RestError::ServiceUnavailable { message: text }),
312 _ => Err(RestError::ApiError {
313 code: status.as_u16(),
314 message: text,
315 }),
316 }
317 }
318 }
319
320 #[instrument(skip(self, body), fields(method = "POST"))]
322 pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
323 self.post(path, &body).await
324 }
325
326 #[instrument(skip(self, body), fields(method = "PUT"))]
328 pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
329 self.put(path, &body).await
330 }
331
332 #[instrument(skip(self, body), fields(method = "PATCH"))]
334 pub async fn patch_raw(
335 &self,
336 path: &str,
337 body: serde_json::Value,
338 ) -> Result<serde_json::Value> {
339 let url = self.normalize_url(path);
340 debug!("PATCH {}", url);
341 trace!("Request body: {:?}", body);
342
343 let response = self
345 .client
346 .patch(&url)
347 .header("x-api-key", &self.api_key)
348 .header("x-api-secret-key", &self.api_secret)
349 .json(&body)
350 .send()
351 .await?;
352
353 trace!("Response status: {}", response.status());
354 self.handle_response(response).await
355 }
356
357 #[instrument(skip(self), fields(method = "DELETE"))]
359 pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
360 let url = self.normalize_url(path);
361 debug!("DELETE {}", url);
362
363 let response = self
365 .client
366 .delete(&url)
367 .header("x-api-key", &self.api_key)
368 .header("x-api-secret-key", &self.api_secret)
369 .send()
370 .await?;
371
372 trace!("Response status: {}", response.status());
373 if response.status().is_success() {
374 if response.content_length() == Some(0) {
375 Ok(serde_json::json!({"status": "deleted"}))
376 } else {
377 response.json().await.map_err(Into::into)
378 }
379 } else {
380 let status = response.status();
381 let text = response.text().await.unwrap_or_default();
382
383 match status.as_u16() {
384 400 => Err(RestError::BadRequest { message: text }),
385 401 => Err(RestError::AuthenticationFailed { message: text }),
386 403 => Err(RestError::Forbidden { message: text }),
387 404 => Err(RestError::NotFound { message: text }),
388 412 => Err(RestError::PreconditionFailed),
389 429 => Err(RestError::RateLimited { message: text }),
390 500 => Err(RestError::InternalServerError { message: text }),
391 503 => Err(RestError::ServiceUnavailable { message: text }),
392 _ => Err(RestError::ApiError {
393 code: status.as_u16(),
394 message: text,
395 }),
396 }
397 }
398 }
399
400 #[instrument(skip(self, body), fields(method = "DELETE"))]
402 pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
403 &self,
404 path: &str,
405 body: serde_json::Value,
406 ) -> Result<T> {
407 let url = self.normalize_url(path);
408 debug!("DELETE {} (with body)", url);
409 trace!("Request body: {:?}", body);
410
411 let response = self
412 .client
413 .delete(&url)
414 .header("x-api-key", &self.api_key)
415 .header("x-api-secret-key", &self.api_secret)
416 .json(&body)
417 .send()
418 .await?;
419
420 trace!("Response status: {}", response.status());
421 self.handle_response(response).await
422 }
423
424 async fn handle_response<T: serde::de::DeserializeOwned>(
426 &self,
427 response: reqwest::Response,
428 ) -> Result<T> {
429 let status = response.status();
430
431 if status.is_success() {
432 let bytes = response.bytes().await.map_err(|e| {
434 RestError::ConnectionError(format!("Failed to read response: {}", e))
435 })?;
436
437 let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
439 serde_path_to_error::deserialize(deserializer).map_err(|err| {
440 let path = err.path().to_string();
441 RestError::ConnectionError(format!(
443 "Failed to deserialize field '{}': {}",
444 path,
445 err.inner()
446 ))
447 })
448 } else {
449 let text = response.text().await.unwrap_or_default();
450
451 match status.as_u16() {
452 400 => Err(RestError::BadRequest { message: text }),
453 401 => Err(RestError::AuthenticationFailed { message: text }),
454 403 => Err(RestError::Forbidden { message: text }),
455 404 => Err(RestError::NotFound { message: text }),
456 412 => Err(RestError::PreconditionFailed),
457 429 => Err(RestError::RateLimited { message: text }),
458 500 => Err(RestError::InternalServerError { message: text }),
459 503 => Err(RestError::ServiceUnavailable { message: text }),
460 _ => Err(RestError::ApiError {
461 code: status.as_u16(),
462 message: text,
463 }),
464 }
465 }
466 }
467}
468
#[cfg(feature = "tower-integration")]
pub mod tower_support {
    //! Adapter that lets a [`CloudClient`] be driven as a `tower::Service`.

    use super::*;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tower::Service;

    /// HTTP verb carried by an [`ApiRequest`].
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Method {
        Get,
        Post,
        Put,
        Patch,
        Delete,
    }

    /// A request to be executed through the `Service` implementation.
    #[derive(Debug, Clone)]
    pub struct ApiRequest {
        /// Verb to issue.
        pub method: Method,
        /// Path joined onto the client's base URL.
        pub path: String,
        /// JSON payload; required for POST, PUT and PATCH.
        pub body: Option<serde_json::Value>,
    }

    impl ApiRequest {
        /// Shared constructor behind the per-verb helpers.
        fn with_parts(
            method: Method,
            path: impl Into<String>,
            body: Option<serde_json::Value>,
        ) -> Self {
            Self {
                method,
                path: path.into(),
                body,
            }
        }

        /// Builds a GET request without a body.
        pub fn get(path: impl Into<String>) -> Self {
            Self::with_parts(Method::Get, path, None)
        }

        /// Builds a POST request carrying `body`.
        pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
            Self::with_parts(Method::Post, path, Some(body))
        }

        /// Builds a PUT request carrying `body`.
        pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
            Self::with_parts(Method::Put, path, Some(body))
        }

        /// Builds a PATCH request carrying `body`.
        pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
            Self::with_parts(Method::Patch, path, Some(body))
        }

        /// Builds a DELETE request without a body.
        pub fn delete(path: impl Into<String>) -> Self {
            Self::with_parts(Method::Delete, path, None)
        }
    }

    /// Result of a successfully executed [`ApiRequest`].
    #[derive(Debug, Clone)]
    pub struct ApiResponse {
        /// Status code reported to the caller (see `Service::call` below).
        pub status: u16,
        /// Response payload as untyped JSON.
        pub body: serde_json::Value,
    }

    impl CloudClient {
        /// Returns the client itself; it already implements `Service`.
        pub fn into_service(self) -> Self {
            self
        }
    }

    /// Error returned when a verb that needs a payload was given none.
    fn missing_body(verb: &str) -> RestError {
        RestError::BadRequest {
            message: format!("{} request requires a body", verb),
        }
    }

    impl Service<ApiRequest> for CloudClient {
        type Response = ApiResponse;
        type Error = RestError;
        type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;

        /// Always ready: no backpressure is applied here.
        fn poll_ready(
            &mut self,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        /// Dispatches the request to the matching `*_raw` helper.
        ///
        /// The future owns a clone of the client so it does not borrow `self`.
        fn call(&mut self, req: ApiRequest) -> Self::Future {
            let client = self.clone();
            Box::pin(async move {
                let ApiRequest { method, path, body } = req;

                let payload = match (method, body) {
                    (Method::Get, _) => client.get_raw(&path).await,
                    (Method::Post, Some(json)) => client.post_raw(&path, json).await,
                    (Method::Post, None) => Err(missing_body("POST")),
                    (Method::Put, Some(json)) => client.put_raw(&path, json).await,
                    (Method::Put, None) => Err(missing_body("PUT")),
                    (Method::Patch, Some(json)) => client.patch_raw(&path, json).await,
                    (Method::Patch, None) => Err(missing_body("PATCH")),
                    (Method::Delete, _) => client.delete_raw(&path).await,
                }?;

                // The raw helpers only return the parsed body, so the real
                // status code is not available here; every success is
                // reported as 200.
                Ok(ApiResponse {
                    status: 200,
                    body: payload,
                })
            })
        }
    }
}