1use crate::{CloudError as RestError, Result};
10use reqwest::Client;
11use reqwest::header::{HeaderMap, HeaderValue, USER_AGENT};
12use serde::Serialize;
13use std::sync::Arc;
14use tracing::{debug, instrument, trace};
15
16const DEFAULT_USER_AGENT: &str = concat!("redis-cloud/", env!("CARGO_PKG_VERSION"));
18
19#[derive(Debug, Clone)]
45pub struct CloudClientBuilder {
46 api_key: Option<String>,
47 api_secret: Option<String>,
48 base_url: String,
49 timeout: std::time::Duration,
50 user_agent: String,
51}
52
53impl Default for CloudClientBuilder {
54 fn default() -> Self {
55 Self {
56 api_key: None,
57 api_secret: None,
58 base_url: "https://api.redislabs.com/v1".to_string(),
59 timeout: std::time::Duration::from_secs(30),
60 user_agent: DEFAULT_USER_AGENT.to_string(),
61 }
62 }
63}
64
65impl CloudClientBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn api_key(mut self, key: impl Into<String>) -> Self {
73 self.api_key = Some(key.into());
74 self
75 }
76
77 pub fn api_secret(mut self, secret: impl Into<String>) -> Self {
79 self.api_secret = Some(secret.into());
80 self
81 }
82
83 pub fn base_url(mut self, url: impl Into<String>) -> Self {
85 self.base_url = url.into();
86 self
87 }
88
89 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
91 self.timeout = timeout;
92 self
93 }
94
95 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
101 self.user_agent = user_agent.into();
102 self
103 }
104
105 pub fn build(self) -> Result<CloudClient> {
107 let api_key = self
108 .api_key
109 .ok_or_else(|| RestError::ConnectionError("API key is required".to_string()))?;
110 let api_secret = self
111 .api_secret
112 .ok_or_else(|| RestError::ConnectionError("API secret is required".to_string()))?;
113
114 let mut default_headers = HeaderMap::new();
115 default_headers.insert(
116 USER_AGENT,
117 HeaderValue::from_str(&self.user_agent)
118 .map_err(|e| RestError::ConnectionError(format!("Invalid user agent: {}", e)))?,
119 );
120
121 let client = Client::builder()
122 .timeout(self.timeout)
123 .default_headers(default_headers)
124 .build()
125 .map_err(|e| RestError::ConnectionError(e.to_string()))?;
126
127 Ok(CloudClient {
128 api_key,
129 api_secret,
130 base_url: self.base_url,
131 timeout: self.timeout,
132 client: Arc::new(client),
133 })
134 }
135}
136
137#[derive(Clone)]
139pub struct CloudClient {
140 pub(crate) api_key: String,
141 pub(crate) api_secret: String,
142 pub(crate) base_url: String,
143 #[allow(dead_code)]
144 pub(crate) timeout: std::time::Duration,
145 pub(crate) client: Arc<Client>,
146}
147
148impl CloudClient {
149 pub fn builder() -> CloudClientBuilder {
151 CloudClientBuilder::new()
152 }
153
154 fn normalize_url(&self, path: &str) -> String {
156 let base = self.base_url.trim_end_matches('/');
157 let path = path.trim_start_matches('/');
158 format!("{}/{}", base, path)
159 }
160
161 #[instrument(skip(self), fields(method = "GET"))]
163 pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T> {
164 let url = self.normalize_url(path);
165 debug!("GET {}", url);
166
167 let response = self
169 .client
170 .get(&url)
171 .header("x-api-key", &self.api_key)
172 .header("x-api-secret-key", &self.api_secret)
173 .send()
174 .await?;
175
176 trace!("Response status: {}", response.status());
177 self.handle_response(response).await
178 }
179
180 #[instrument(skip(self, body), fields(method = "POST"))]
182 pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
183 &self,
184 path: &str,
185 body: &B,
186 ) -> Result<T> {
187 let url = self.normalize_url(path);
188 debug!("POST {}", url);
189 trace!("Request body: {:?}", serde_json::to_value(body).ok());
190
191 let response = self
193 .client
194 .post(&url)
195 .header("x-api-key", &self.api_key)
196 .header("x-api-secret-key", &self.api_secret)
197 .json(body)
198 .send()
199 .await?;
200
201 trace!("Response status: {}", response.status());
202 self.handle_response(response).await
203 }
204
205 #[instrument(skip(self, body), fields(method = "PUT"))]
207 pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
208 &self,
209 path: &str,
210 body: &B,
211 ) -> Result<T> {
212 let url = self.normalize_url(path);
213 debug!("PUT {}", url);
214 trace!("Request body: {:?}", serde_json::to_value(body).ok());
215
216 let response = self
218 .client
219 .put(&url)
220 .header("x-api-key", &self.api_key)
221 .header("x-api-secret-key", &self.api_secret)
222 .json(body)
223 .send()
224 .await?;
225
226 trace!("Response status: {}", response.status());
227 self.handle_response(response).await
228 }
229
230 #[instrument(skip(self), fields(method = "DELETE"))]
232 pub async fn delete(&self, path: &str) -> Result<()> {
233 let url = self.normalize_url(path);
234 debug!("DELETE {}", url);
235
236 let response = self
238 .client
239 .delete(&url)
240 .header("x-api-key", &self.api_key)
241 .header("x-api-secret-key", &self.api_secret)
242 .send()
243 .await?;
244
245 trace!("Response status: {}", response.status());
246 if response.status().is_success() {
247 Ok(())
248 } else {
249 let status = response.status();
250 let text = response.text().await.unwrap_or_default();
251
252 match status.as_u16() {
253 400 => Err(RestError::BadRequest { message: text }),
254 401 => Err(RestError::AuthenticationFailed { message: text }),
255 403 => Err(RestError::Forbidden { message: text }),
256 404 => Err(RestError::NotFound { message: text }),
257 412 => Err(RestError::PreconditionFailed),
258 500 => Err(RestError::InternalServerError { message: text }),
259 503 => Err(RestError::ServiceUnavailable { message: text }),
260 _ => Err(RestError::ApiError {
261 code: status.as_u16(),
262 message: text,
263 }),
264 }
265 }
266 }
267
268 #[instrument(skip(self), fields(method = "GET"))]
270 pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
271 self.get(path).await
272 }
273
274 #[instrument(skip(self), fields(method = "GET"))]
278 pub async fn get_bytes(&self, path: &str) -> Result<Vec<u8>> {
279 let url = self.normalize_url(path);
280 debug!("GET {} (bytes)", url);
281
282 let response = self
283 .client
284 .get(&url)
285 .header("x-api-key", &self.api_key)
286 .header("x-api-secret-key", &self.api_secret)
287 .send()
288 .await?;
289
290 trace!("Response status: {}", response.status());
291 let status = response.status();
292
293 if status.is_success() {
294 response
295 .bytes()
296 .await
297 .map(|b| b.to_vec())
298 .map_err(|e| RestError::ConnectionError(format!("Failed to read response: {}", e)))
299 } else {
300 let text = response.text().await.unwrap_or_default();
301
302 match status.as_u16() {
303 400 => Err(RestError::BadRequest { message: text }),
304 401 => Err(RestError::AuthenticationFailed { message: text }),
305 403 => Err(RestError::Forbidden { message: text }),
306 404 => Err(RestError::NotFound { message: text }),
307 412 => Err(RestError::PreconditionFailed),
308 500 => Err(RestError::InternalServerError { message: text }),
309 503 => Err(RestError::ServiceUnavailable { message: text }),
310 _ => Err(RestError::ApiError {
311 code: status.as_u16(),
312 message: text,
313 }),
314 }
315 }
316 }
317
318 #[instrument(skip(self, body), fields(method = "POST"))]
320 pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
321 self.post(path, &body).await
322 }
323
324 #[instrument(skip(self, body), fields(method = "PUT"))]
326 pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
327 self.put(path, &body).await
328 }
329
330 #[instrument(skip(self, body), fields(method = "PATCH"))]
332 pub async fn patch_raw(
333 &self,
334 path: &str,
335 body: serde_json::Value,
336 ) -> Result<serde_json::Value> {
337 let url = self.normalize_url(path);
338 debug!("PATCH {}", url);
339 trace!("Request body: {:?}", body);
340
341 let response = self
343 .client
344 .patch(&url)
345 .header("x-api-key", &self.api_key)
346 .header("x-api-secret-key", &self.api_secret)
347 .json(&body)
348 .send()
349 .await?;
350
351 trace!("Response status: {}", response.status());
352 self.handle_response(response).await
353 }
354
355 #[instrument(skip(self), fields(method = "DELETE"))]
357 pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
358 let url = self.normalize_url(path);
359 debug!("DELETE {}", url);
360
361 let response = self
363 .client
364 .delete(&url)
365 .header("x-api-key", &self.api_key)
366 .header("x-api-secret-key", &self.api_secret)
367 .send()
368 .await?;
369
370 trace!("Response status: {}", response.status());
371 if response.status().is_success() {
372 if response.content_length() == Some(0) {
373 Ok(serde_json::json!({"status": "deleted"}))
374 } else {
375 response.json().await.map_err(Into::into)
376 }
377 } else {
378 let status = response.status();
379 let text = response.text().await.unwrap_or_default();
380
381 match status.as_u16() {
382 400 => Err(RestError::BadRequest { message: text }),
383 401 => Err(RestError::AuthenticationFailed { message: text }),
384 403 => Err(RestError::Forbidden { message: text }),
385 404 => Err(RestError::NotFound { message: text }),
386 412 => Err(RestError::PreconditionFailed),
387 500 => Err(RestError::InternalServerError { message: text }),
388 503 => Err(RestError::ServiceUnavailable { message: text }),
389 _ => Err(RestError::ApiError {
390 code: status.as_u16(),
391 message: text,
392 }),
393 }
394 }
395 }
396
397 #[instrument(skip(self, body), fields(method = "DELETE"))]
399 pub async fn delete_with_body<T: serde::de::DeserializeOwned>(
400 &self,
401 path: &str,
402 body: serde_json::Value,
403 ) -> Result<T> {
404 let url = self.normalize_url(path);
405 debug!("DELETE {} (with body)", url);
406 trace!("Request body: {:?}", body);
407
408 let response = self
409 .client
410 .delete(&url)
411 .header("x-api-key", &self.api_key)
412 .header("x-api-secret-key", &self.api_secret)
413 .json(&body)
414 .send()
415 .await?;
416
417 trace!("Response status: {}", response.status());
418 self.handle_response(response).await
419 }
420
421 async fn handle_response<T: serde::de::DeserializeOwned>(
423 &self,
424 response: reqwest::Response,
425 ) -> Result<T> {
426 let status = response.status();
427
428 if status.is_success() {
429 let bytes = response.bytes().await.map_err(|e| {
431 RestError::ConnectionError(format!("Failed to read response: {}", e))
432 })?;
433
434 let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
436 serde_path_to_error::deserialize(deserializer).map_err(|err| {
437 let path = err.path().to_string();
438 RestError::ConnectionError(format!(
440 "Failed to deserialize field '{}': {}",
441 path,
442 err.inner()
443 ))
444 })
445 } else {
446 let text = response.text().await.unwrap_or_default();
447
448 match status.as_u16() {
449 400 => Err(RestError::BadRequest { message: text }),
450 401 => Err(RestError::AuthenticationFailed { message: text }),
451 403 => Err(RestError::Forbidden { message: text }),
452 404 => Err(RestError::NotFound { message: text }),
453 412 => Err(RestError::PreconditionFailed),
454 500 => Err(RestError::InternalServerError { message: text }),
455 503 => Err(RestError::ServiceUnavailable { message: text }),
456 _ => Err(RestError::ApiError {
457 code: status.as_u16(),
458 message: text,
459 }),
460 }
461 }
462 }
463}
464
465#[cfg(feature = "tower-integration")]
497pub mod tower_support {
498 use super::*;
499 use std::future::Future;
500 use std::pin::Pin;
501 use std::task::{Context, Poll};
502 use tower::Service;
503
504 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
506 pub enum Method {
507 Get,
509 Post,
511 Put,
513 Patch,
515 Delete,
517 }
518
519 #[derive(Debug, Clone)]
524 pub struct ApiRequest {
525 pub method: Method,
527 pub path: String,
529 pub body: Option<serde_json::Value>,
531 }
532
533 impl ApiRequest {
534 pub fn get(path: impl Into<String>) -> Self {
536 Self {
537 method: Method::Get,
538 path: path.into(),
539 body: None,
540 }
541 }
542
543 pub fn post(path: impl Into<String>, body: serde_json::Value) -> Self {
545 Self {
546 method: Method::Post,
547 path: path.into(),
548 body: Some(body),
549 }
550 }
551
552 pub fn put(path: impl Into<String>, body: serde_json::Value) -> Self {
554 Self {
555 method: Method::Put,
556 path: path.into(),
557 body: Some(body),
558 }
559 }
560
561 pub fn patch(path: impl Into<String>, body: serde_json::Value) -> Self {
563 Self {
564 method: Method::Patch,
565 path: path.into(),
566 body: Some(body),
567 }
568 }
569
570 pub fn delete(path: impl Into<String>) -> Self {
572 Self {
573 method: Method::Delete,
574 path: path.into(),
575 body: None,
576 }
577 }
578 }
579
580 #[derive(Debug, Clone)]
584 pub struct ApiResponse {
585 pub status: u16,
587 pub body: serde_json::Value,
589 }
590
591 impl CloudClient {
592 pub fn into_service(self) -> Self {
616 self
617 }
618 }
619
620 impl Service<ApiRequest> for CloudClient {
621 type Response = ApiResponse;
622 type Error = RestError;
623 type Future = Pin<Box<dyn Future<Output = Result<Self::Response>> + Send>>;
624
625 fn poll_ready(
626 &mut self,
627 _cx: &mut Context<'_>,
628 ) -> Poll<std::result::Result<(), Self::Error>> {
629 Poll::Ready(Ok(()))
631 }
632
633 fn call(&mut self, req: ApiRequest) -> Self::Future {
634 let client = self.clone();
635 Box::pin(async move {
636 let response: serde_json::Value = match req.method {
637 Method::Get => client.get_raw(&req.path).await?,
638 Method::Post => {
639 let body = req.body.ok_or_else(|| RestError::BadRequest {
640 message: "POST request requires a body".to_string(),
641 })?;
642 client.post_raw(&req.path, body).await?
643 }
644 Method::Put => {
645 let body = req.body.ok_or_else(|| RestError::BadRequest {
646 message: "PUT request requires a body".to_string(),
647 })?;
648 client.put_raw(&req.path, body).await?
649 }
650 Method::Patch => {
651 let body = req.body.ok_or_else(|| RestError::BadRequest {
652 message: "PATCH request requires a body".to_string(),
653 })?;
654 client.patch_raw(&req.path, body).await?
655 }
656 Method::Delete => client.delete_raw(&req.path).await?,
657 };
658
659 Ok(ApiResponse {
660 status: 200,
661 body: response,
662 })
663 })
664 }
665 }
666}