redis_enterprise/
client.rs

1//! REST API client implementation
2
3use crate::error::{RestError, Result};
4use reqwest::{Client, Response};
5use serde::{Serialize, de::DeserializeOwned};
6use std::sync::Arc;
7use std::time::Duration;
8use tracing::{debug, trace};
9
10// Legacy alias for backwards compatibility during migration
11pub type RestConfig = EnterpriseClientBuilder;
12
13/// Builder for EnterpriseClient
14#[derive(Debug, Clone)]
15pub struct EnterpriseClientBuilder {
16    base_url: String,
17    username: Option<String>,
18    password: Option<String>,
19    timeout: Duration,
20    insecure: bool,
21}
22
23impl Default for EnterpriseClientBuilder {
24    fn default() -> Self {
25        Self {
26            base_url: "https://localhost:9443".to_string(),
27            username: None,
28            password: None,
29            timeout: Duration::from_secs(30),
30            insecure: false,
31        }
32    }
33}
34
35impl EnterpriseClientBuilder {
36    /// Create a new builder
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Set the base URL
42    pub fn base_url(mut self, url: impl Into<String>) -> Self {
43        self.base_url = url.into();
44        self
45    }
46
47    /// Set the username
48    pub fn username(mut self, username: impl Into<String>) -> Self {
49        self.username = Some(username.into());
50        self
51    }
52
53    /// Set the password
54    pub fn password(mut self, password: impl Into<String>) -> Self {
55        self.password = Some(password.into());
56        self
57    }
58
59    /// Set the timeout
60    pub fn timeout(mut self, timeout: Duration) -> Self {
61        self.timeout = timeout;
62        self
63    }
64
65    /// Allow insecure TLS connections (self-signed certificates)
66    pub fn insecure(mut self, insecure: bool) -> Self {
67        self.insecure = insecure;
68        self
69    }
70
71    /// Build the client
72    pub fn build(self) -> Result<EnterpriseClient> {
73        let username = self.username.unwrap_or_default();
74        let password = self.password.unwrap_or_default();
75
76        let client_builder = Client::builder()
77            .timeout(self.timeout)
78            .danger_accept_invalid_certs(self.insecure);
79
80        let client = client_builder
81            .build()
82            .map_err(|e| RestError::ConnectionError(e.to_string()))?;
83
84        Ok(EnterpriseClient {
85            base_url: self.base_url,
86            username,
87            password,
88            timeout: self.timeout,
89            client: Arc::new(client),
90        })
91    }
92}
93
94/// REST API client for Redis Enterprise
95#[derive(Clone)]
96pub struct EnterpriseClient {
97    base_url: String,
98    username: String,
99    password: String,
100    timeout: Duration,
101    client: Arc<Client>,
102}
103
104// Alias for backwards compatibility
105pub type RestClient = EnterpriseClient;
106
107impl EnterpriseClient {
108    /// Create a new builder for the client
109    pub fn builder() -> EnterpriseClientBuilder {
110        EnterpriseClientBuilder::new()
111    }
112
113    /// Get a reference to the underlying client (for use with handlers)
114    pub fn client(&self) -> Arc<Client> {
115        self.client.clone()
116    }
117
118    /// Normalize URL path concatenation to avoid double slashes
119    fn normalize_url(&self, path: &str) -> String {
120        let base = self.base_url.trim_end_matches('/');
121        let path = path.trim_start_matches('/');
122        format!("{}/{}", base, path)
123    }
124
125    /// Create a client from environment variables
126    ///
127    /// Reads configuration from:
128    /// - `REDIS_ENTERPRISE_URL`: Base URL for the cluster (default: "https://localhost:9443")
129    /// - `REDIS_ENTERPRISE_USER`: Username for authentication (default: "admin@redis.local")
130    /// - `REDIS_ENTERPRISE_PASSWORD`: Password for authentication (required)
131    /// - `REDIS_ENTERPRISE_INSECURE`: Set to "true" to skip SSL verification (default: "false")
132    pub fn from_env() -> Result<Self> {
133        use std::env;
134
135        let base_url = env::var("REDIS_ENTERPRISE_URL")
136            .unwrap_or_else(|_| "https://localhost:9443".to_string());
137        let username =
138            env::var("REDIS_ENTERPRISE_USER").unwrap_or_else(|_| "admin@redis.local".to_string());
139        let password =
140            env::var("REDIS_ENTERPRISE_PASSWORD").map_err(|_| RestError::AuthenticationFailed)?;
141        let insecure = env::var("REDIS_ENTERPRISE_INSECURE")
142            .unwrap_or_else(|_| "false".to_string())
143            .parse::<bool>()
144            .unwrap_or(false);
145
146        Self::builder()
147            .base_url(base_url)
148            .username(username)
149            .password(password)
150            .insecure(insecure)
151            .build()
152    }
153
154    /// Make a GET request
155    pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
156        let url = self.normalize_url(path);
157        debug!("GET {}", url);
158
159        let response = self
160            .client
161            .get(&url)
162            .basic_auth(&self.username, Some(&self.password))
163            .send()
164            .await
165            .map_err(|e| self.map_reqwest_error(e, &url))?;
166
167        trace!("Response status: {}", response.status());
168        self.handle_response(response).await
169    }
170
171    /// Make a GET request for text content
172    pub async fn get_text(&self, path: &str) -> Result<String> {
173        let url = self.normalize_url(path);
174        debug!("GET {} (text)", url);
175
176        let response = self
177            .client
178            .get(&url)
179            .basic_auth(&self.username, Some(&self.password))
180            .send()
181            .await
182            .map_err(|e| self.map_reqwest_error(e, &url))?;
183
184        trace!("Response status: {}", response.status());
185
186        if response.status().is_success() {
187            let text = response
188                .text()
189                .await
190                .map_err(crate::error::RestError::RequestFailed)?;
191            Ok(text)
192        } else {
193            let status = response.status();
194            let error_text = response
195                .text()
196                .await
197                .unwrap_or_else(|_| "Unknown error".to_string());
198            Err(crate::error::RestError::ApiError {
199                code: status.as_u16(),
200                message: error_text,
201            })
202        }
203    }
204
205    /// Make a GET request for binary content (e.g., tar.gz files)
206    pub async fn get_binary(&self, path: &str) -> Result<Vec<u8>> {
207        let url = self.normalize_url(path);
208        debug!("GET {} (binary)", url);
209
210        let response = self
211            .client
212            .get(&url)
213            .basic_auth(&self.username, Some(&self.password))
214            .send()
215            .await
216            .map_err(|e| self.map_reqwest_error(e, &url))?;
217
218        trace!("Response status: {}", response.status());
219        trace!(
220            "Response content-type: {:?}",
221            response.headers().get("content-type")
222        );
223
224        if response.status().is_success() {
225            let bytes = response
226                .bytes()
227                .await
228                .map_err(crate::error::RestError::RequestFailed)?;
229            Ok(bytes.to_vec())
230        } else {
231            let status = response.status();
232            let error_text = response
233                .text()
234                .await
235                .unwrap_or_else(|_| "Unknown error".to_string());
236            Err(crate::error::RestError::ApiError {
237                code: status.as_u16(),
238                message: error_text,
239            })
240        }
241    }
242
243    /// Make a POST request
244    pub async fn post<B: Serialize, T: DeserializeOwned>(&self, path: &str, body: &B) -> Result<T> {
245        let url = self.normalize_url(path);
246        debug!("POST {}", url);
247        trace!("Request body: {:?}", serde_json::to_value(body).ok());
248
249        let response = self
250            .client
251            .post(&url)
252            .basic_auth(&self.username, Some(&self.password))
253            .json(body)
254            .send()
255            .await
256            .map_err(|e| self.map_reqwest_error(e, &url))?;
257
258        trace!("Response status: {}", response.status());
259        self.handle_response(response).await
260    }
261
262    /// Make a PUT request
263    pub async fn put<B: Serialize, T: DeserializeOwned>(&self, path: &str, body: &B) -> Result<T> {
264        let url = self.normalize_url(path);
265        debug!("PUT {}", url);
266        trace!("Request body: {:?}", serde_json::to_value(body).ok());
267
268        let response = self
269            .client
270            .put(&url)
271            .basic_auth(&self.username, Some(&self.password))
272            .json(body)
273            .send()
274            .await
275            .map_err(|e| self.map_reqwest_error(e, &url))?;
276
277        trace!("Response status: {}", response.status());
278        self.handle_response(response).await
279    }
280
281    /// Make a DELETE request
282    pub async fn delete(&self, path: &str) -> Result<()> {
283        let url = self.normalize_url(path);
284        debug!("DELETE {}", url);
285
286        let response = self
287            .client
288            .delete(&url)
289            .basic_auth(&self.username, Some(&self.password))
290            .send()
291            .await
292            .map_err(|e| self.map_reqwest_error(e, &url))?;
293
294        trace!("Response status: {}", response.status());
295        if response.status().is_success() {
296            Ok(())
297        } else {
298            let status = response.status();
299            let text = response.text().await.unwrap_or_default();
300            Err(RestError::ApiError {
301                code: status.as_u16(),
302                message: text,
303            })
304        }
305    }
306
307    /// Execute raw GET request returning JSON Value
308    pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
309        self.get(path).await
310    }
311
312    /// Execute raw POST request with JSON body
313    pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
314        self.post(path, &body).await
315    }
316
317    /// Execute raw PUT request with JSON body
318    pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
319        self.put(path, &body).await
320    }
321
322    /// POST request for actions that return no content
323    pub async fn post_action<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
324        let url = self.normalize_url(path);
325        debug!("POST {}", url);
326        trace!("Request body: {:?}", serde_json::to_value(body).ok());
327
328        let response = self
329            .client
330            .post(&url)
331            .basic_auth(&self.username, Some(&self.password))
332            .json(body)
333            .send()
334            .await
335            .map_err(|e| self.map_reqwest_error(e, &url))?;
336
337        trace!("Response status: {}", response.status());
338        if response.status().is_success() {
339            Ok(())
340        } else {
341            let status = response.status();
342            let text = response.text().await.unwrap_or_default();
343            Err(RestError::ApiError {
344                code: status.as_u16(),
345                message: text,
346            })
347        }
348    }
349
350    /// POST request with multipart/form-data for file uploads
351    pub async fn post_multipart<T: DeserializeOwned>(
352        &self,
353        path: &str,
354        file_data: Vec<u8>,
355        field_name: &str,
356        file_name: &str,
357    ) -> Result<T> {
358        let url = self.normalize_url(path);
359        debug!("POST {} (multipart)", url);
360
361        let part = reqwest::multipart::Part::bytes(file_data).file_name(file_name.to_string());
362
363        let form = reqwest::multipart::Form::new().part(field_name.to_string(), part);
364
365        let response = self
366            .client
367            .post(&url)
368            .basic_auth(&self.username, Some(&self.password))
369            .multipart(form)
370            .send()
371            .await
372            .map_err(|e| self.map_reqwest_error(e, &url))?;
373
374        trace!("Response status: {}", response.status());
375        self.handle_response(response).await
376    }
377
378    /// Get a reference to self for handler construction
379    pub fn rest_client(&self) -> Self {
380        self.clone()
381    }
382
383    /// POST request for bootstrap - handles empty response
384    pub async fn post_bootstrap<B: Serialize>(
385        &self,
386        path: &str,
387        body: &B,
388    ) -> Result<serde_json::Value> {
389        let url = self.normalize_url(path);
390
391        let response = self
392            .client
393            .post(&url)
394            .basic_auth(&self.username, Some(&self.password))
395            .json(body)
396            .send()
397            .await
398            .map_err(|e| self.map_reqwest_error(e, &url))?;
399
400        let status = response.status();
401        if status.is_success() {
402            // Try to parse JSON, but if empty/invalid, return success
403            let text = response.text().await.unwrap_or_default();
404            if text.is_empty() || text.trim().is_empty() {
405                Ok(serde_json::json!({"status": "success"}))
406            } else {
407                Ok(serde_json::from_str(&text)
408                    .unwrap_or_else(|_| serde_json::json!({"status": "success", "response": text})))
409            }
410        } else {
411            let text = response.text().await.unwrap_or_default();
412            Err(RestError::ApiError {
413                code: status.as_u16(),
414                message: text,
415            })
416        }
417    }
418
419    /// Execute raw PATCH request with JSON body
420    pub async fn patch_raw(
421        &self,
422        path: &str,
423        body: serde_json::Value,
424    ) -> Result<serde_json::Value> {
425        let url = self.normalize_url(path);
426        let response = self
427            .client
428            .patch(&url)
429            .basic_auth(&self.username, Some(&self.password))
430            .json(&body)
431            .send()
432            .await
433            .map_err(|e| self.map_reqwest_error(e, &url))?;
434
435        if response.status().is_success() {
436            response
437                .json()
438                .await
439                .map_err(|e| RestError::ParseError(e.to_string()))
440        } else {
441            let status = response.status();
442            let text = response.text().await.unwrap_or_default();
443            Err(RestError::ApiError {
444                code: status.as_u16(),
445                message: text,
446            })
447        }
448    }
449
450    /// Execute raw DELETE request returning any response body
451    pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
452        let url = self.normalize_url(path);
453        let response = self
454            .client
455            .delete(&url)
456            .basic_auth(&self.username, Some(&self.password))
457            .send()
458            .await
459            .map_err(|e| self.map_reqwest_error(e, &url))?;
460
461        if response.status().is_success() {
462            if response.content_length() == Some(0) {
463                Ok(serde_json::json!({"status": "deleted"}))
464            } else {
465                response
466                    .json()
467                    .await
468                    .map_err(|e| RestError::ParseError(e.to_string()))
469            }
470        } else {
471            let status = response.status();
472            let text = response.text().await.unwrap_or_default();
473            Err(RestError::ApiError {
474                code: status.as_u16(),
475                message: text,
476            })
477        }
478    }
479
480    /// Map reqwest errors to more specific error messages
481    fn map_reqwest_error(&self, error: reqwest::Error, url: &str) -> RestError {
482        if error.is_connect() {
483            RestError::ConnectionError(format!(
484                "Failed to connect to {}: Connection refused or host unreachable. Check if the Redis Enterprise server is running and accessible.",
485                url
486            ))
487        } else if error.is_timeout() {
488            RestError::ConnectionError(format!(
489                "Request to {} timed out after {:?}. Check network connectivity or increase timeout.",
490                url, self.timeout
491            ))
492        } else if error.is_decode() {
493            RestError::ConnectionError(format!(
494                "Failed to decode JSON response from {}: {}. Server may have returned invalid JSON or HTML error page.",
495                url, error
496            ))
497        } else if let Some(status) = error.status() {
498            RestError::ApiError {
499                code: status.as_u16(),
500                message: format!("HTTP {} from {}: {}", status.as_u16(), url, error),
501            }
502        } else if error.is_request() {
503            RestError::ConnectionError(format!(
504                "Request to {} failed: {}. Check URL format and network settings.",
505                url, error
506            ))
507        } else {
508            RestError::RequestFailed(error)
509        }
510    }
511
512    /// Handle HTTP response
513    async fn handle_response<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
514        if response.status().is_success() {
515            // Get the response bytes for better error reporting
516            let bytes = response.bytes().await.map_err(Into::<RestError>::into)?;
517
518            // Use serde_path_to_error for better deserialization error messages
519            let deserializer = &mut serde_json::Deserializer::from_slice(&bytes);
520            serde_path_to_error::deserialize(deserializer).map_err(|err| {
521                let path = err.path().to_string();
522                RestError::ParseError(format!(
523                    "Failed to deserialize field '{}': {}",
524                    path,
525                    err.inner()
526                ))
527            })
528        } else {
529            let status = response.status();
530            let text = response.text().await.unwrap_or_default();
531
532            match status.as_u16() {
533                401 => Err(RestError::Unauthorized),
534                404 => Err(RestError::NotFound),
535                500..=599 => Err(RestError::ServerError(text)),
536                _ => Err(RestError::ApiError {
537                    code: status.as_u16(),
538                    message: text,
539                }),
540            }
541        }
542    }
543
544    /// Execute a Redis command on a specific database (internal use only)
545    /// This uses the /v1/bdbs/{uid}/command endpoint which may not be publicly documented
546    pub async fn execute_command(&self, db_uid: u32, command: &str) -> Result<serde_json::Value> {
547        let url = self.normalize_url(&format!("/v1/bdbs/{}/command", db_uid));
548        let body = serde_json::json!({
549            "command": command
550        });
551
552        debug!("Executing command on database {}: {}", db_uid, command);
553
554        let response = self
555            .client
556            .post(&url)
557            .basic_auth(&self.username, Some(&self.password))
558            .json(&body)
559            .send()
560            .await
561            .map_err(|e| self.map_reqwest_error(e, &url))?;
562
563        self.handle_response(response).await
564    }
565}