redis_enterprise/
client.rs

1//! REST API client implementation
2
3use crate::error::{RestError, Result};
4use reqwest::{Client, Response};
5use serde::{Serialize, de::DeserializeOwned};
6use std::sync::Arc;
7use std::time::Duration;
8use tracing::{debug, trace};
9
/// Legacy alias for [`EnterpriseClientBuilder`], kept for backwards
/// compatibility during migration.
pub type RestConfig = EnterpriseClientBuilder;
12
/// Builder for EnterpriseClient
///
/// `Debug` is implemented by hand rather than derived so that the configured
/// password is never written out when the builder is debug-formatted or logged.
#[derive(Clone)]
pub struct EnterpriseClientBuilder {
    /// Base URL that request paths are appended to.
    base_url: String,
    /// Username for HTTP basic authentication, if one has been set.
    username: Option<String>,
    /// Password for HTTP basic authentication, if one has been set.
    password: Option<String>,
    /// Timeout handed to the underlying HTTP client.
    timeout: Duration,
    /// When `true`, invalid TLS certificates are accepted.
    insecure: bool,
}

impl std::fmt::Debug for EnterpriseClientBuilder {
    /// Format every field as the derived implementation would, except that a
    /// configured password is shown as `Some("<redacted>")`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing the secret.
        let password = self.password.as_ref().map(|_| "<redacted>");

        f.debug_struct("EnterpriseClientBuilder")
            .field("base_url", &self.base_url)
            .field("username", &self.username)
            .field("password", &password)
            .field("timeout", &self.timeout)
            .field("insecure", &self.insecure)
            .finish()
    }
}
22
23impl Default for EnterpriseClientBuilder {
24    fn default() -> Self {
25        Self {
26            base_url: "https://localhost:9443".to_string(),
27            username: None,
28            password: None,
29            timeout: Duration::from_secs(30),
30            insecure: false,
31        }
32    }
33}
34
35impl EnterpriseClientBuilder {
36    /// Create a new builder
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Set the base URL
42    pub fn base_url(mut self, url: impl Into<String>) -> Self {
43        self.base_url = url.into();
44        self
45    }
46
47    /// Set the username
48    pub fn username(mut self, username: impl Into<String>) -> Self {
49        self.username = Some(username.into());
50        self
51    }
52
53    /// Set the password
54    pub fn password(mut self, password: impl Into<String>) -> Self {
55        self.password = Some(password.into());
56        self
57    }
58
59    /// Set the timeout
60    pub fn timeout(mut self, timeout: Duration) -> Self {
61        self.timeout = timeout;
62        self
63    }
64
65    /// Allow insecure TLS connections (self-signed certificates)
66    pub fn insecure(mut self, insecure: bool) -> Self {
67        self.insecure = insecure;
68        self
69    }
70
71    /// Build the client
72    pub fn build(self) -> Result<EnterpriseClient> {
73        let username = self.username.unwrap_or_default();
74        let password = self.password.unwrap_or_default();
75
76        let client_builder = Client::builder()
77            .timeout(self.timeout)
78            .danger_accept_invalid_certs(self.insecure);
79
80        let client = client_builder
81            .build()
82            .map_err(|e| RestError::ConnectionError(e.to_string()))?;
83
84        Ok(EnterpriseClient {
85            base_url: self.base_url,
86            username,
87            password,
88            timeout: self.timeout,
89            client: Arc::new(client),
90        })
91    }
92}
93
/// REST API client for Redis Enterprise
///
/// Bundles the connection settings with a shared HTTP client. Every request
/// method on this type authenticates with HTTP basic auth using the stored
/// username and password.
#[derive(Clone)]
pub struct EnterpriseClient {
    /// Base URL that request paths are appended to.
    base_url: String,
    /// Username sent with HTTP basic authentication.
    username: String,
    /// Password sent with HTTP basic authentication.
    password: String,
    /// Timeout the HTTP client was built with; also reported in timeout error messages.
    timeout: Duration,
    /// Shared `reqwest` client used to issue requests.
    client: Arc<Client>,
}
103
/// Alias for [`EnterpriseClient`], kept for backwards compatibility.
pub type RestClient = EnterpriseClient;
106
107impl EnterpriseClient {
108    /// Create a new builder for the client
109    pub fn builder() -> EnterpriseClientBuilder {
110        EnterpriseClientBuilder::new()
111    }
112
113    /// Normalize URL path concatenation to avoid double slashes
114    fn normalize_url(&self, path: &str) -> String {
115        let base = self.base_url.trim_end_matches('/');
116        let path = path.trim_start_matches('/');
117        format!("{}/{}", base, path)
118    }
119
120    /// Create a client from environment variables
121    ///
122    /// Reads configuration from:
123    /// - `REDIS_ENTERPRISE_URL`: Base URL for the cluster (default: "https://localhost:9443")
124    /// - `REDIS_ENTERPRISE_USER`: Username for authentication (default: "admin@redis.local")
125    /// - `REDIS_ENTERPRISE_PASSWORD`: Password for authentication (required)
126    /// - `REDIS_ENTERPRISE_INSECURE`: Set to "true" to skip SSL verification (default: "false")
127    pub fn from_env() -> Result<Self> {
128        use std::env;
129
130        let base_url = env::var("REDIS_ENTERPRISE_URL")
131            .unwrap_or_else(|_| "https://localhost:9443".to_string());
132        let username =
133            env::var("REDIS_ENTERPRISE_USER").unwrap_or_else(|_| "admin@redis.local".to_string());
134        let password =
135            env::var("REDIS_ENTERPRISE_PASSWORD").map_err(|_| RestError::AuthenticationFailed)?;
136        let insecure = env::var("REDIS_ENTERPRISE_INSECURE")
137            .unwrap_or_else(|_| "false".to_string())
138            .parse::<bool>()
139            .unwrap_or(false);
140
141        Self::builder()
142            .base_url(base_url)
143            .username(username)
144            .password(password)
145            .insecure(insecure)
146            .build()
147    }
148
149    /// Make a GET request
150    pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
151        let url = self.normalize_url(path);
152        debug!("GET {}", url);
153
154        let response = self
155            .client
156            .get(&url)
157            .basic_auth(&self.username, Some(&self.password))
158            .send()
159            .await
160            .map_err(|e| self.map_reqwest_error(e, &url))?;
161
162        trace!("Response status: {}", response.status());
163        self.handle_response(response).await
164    }
165
166    /// Make a GET request for text content
167    pub async fn get_text(&self, path: &str) -> Result<String> {
168        let url = self.normalize_url(path);
169        debug!("GET {} (text)", url);
170
171        let response = self
172            .client
173            .get(&url)
174            .basic_auth(&self.username, Some(&self.password))
175            .send()
176            .await
177            .map_err(|e| self.map_reqwest_error(e, &url))?;
178
179        trace!("Response status: {}", response.status());
180
181        if response.status().is_success() {
182            let text = response
183                .text()
184                .await
185                .map_err(crate::error::RestError::RequestFailed)?;
186            Ok(text)
187        } else {
188            let status = response.status();
189            let error_text = response
190                .text()
191                .await
192                .unwrap_or_else(|_| "Unknown error".to_string());
193            Err(crate::error::RestError::ApiError {
194                code: status.as_u16(),
195                message: error_text,
196            })
197        }
198    }
199
200    /// Make a POST request
201    pub async fn post<B: Serialize, T: DeserializeOwned>(&self, path: &str, body: &B) -> Result<T> {
202        let url = self.normalize_url(path);
203        debug!("POST {}", url);
204        trace!("Request body: {:?}", serde_json::to_value(body).ok());
205
206        let response = self
207            .client
208            .post(&url)
209            .basic_auth(&self.username, Some(&self.password))
210            .json(body)
211            .send()
212            .await
213            .map_err(|e| self.map_reqwest_error(e, &url))?;
214
215        trace!("Response status: {}", response.status());
216        self.handle_response(response).await
217    }
218
219    /// Make a PUT request
220    pub async fn put<B: Serialize, T: DeserializeOwned>(&self, path: &str, body: &B) -> Result<T> {
221        let url = self.normalize_url(path);
222        debug!("PUT {}", url);
223        trace!("Request body: {:?}", serde_json::to_value(body).ok());
224
225        let response = self
226            .client
227            .put(&url)
228            .basic_auth(&self.username, Some(&self.password))
229            .json(body)
230            .send()
231            .await
232            .map_err(|e| self.map_reqwest_error(e, &url))?;
233
234        trace!("Response status: {}", response.status());
235        self.handle_response(response).await
236    }
237
238    /// Make a DELETE request
239    pub async fn delete(&self, path: &str) -> Result<()> {
240        let url = self.normalize_url(path);
241        debug!("DELETE {}", url);
242
243        let response = self
244            .client
245            .delete(&url)
246            .basic_auth(&self.username, Some(&self.password))
247            .send()
248            .await
249            .map_err(|e| self.map_reqwest_error(e, &url))?;
250
251        trace!("Response status: {}", response.status());
252        if response.status().is_success() {
253            Ok(())
254        } else {
255            let status = response.status();
256            let text = response.text().await.unwrap_or_default();
257            Err(RestError::ApiError {
258                code: status.as_u16(),
259                message: text,
260            })
261        }
262    }
263
264    /// Execute raw GET request returning JSON Value
265    pub async fn get_raw(&self, path: &str) -> Result<serde_json::Value> {
266        self.get(path).await
267    }
268
269    /// Execute raw POST request with JSON body
270    pub async fn post_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
271        self.post(path, &body).await
272    }
273
274    /// Execute raw PUT request with JSON body
275    pub async fn put_raw(&self, path: &str, body: serde_json::Value) -> Result<serde_json::Value> {
276        self.put(path, &body).await
277    }
278
279    /// POST request for actions that return no content
280    pub async fn post_action<B: Serialize>(&self, path: &str, body: &B) -> Result<()> {
281        let url = self.normalize_url(path);
282        debug!("POST {}", url);
283        trace!("Request body: {:?}", serde_json::to_value(body).ok());
284
285        let response = self
286            .client
287            .post(&url)
288            .basic_auth(&self.username, Some(&self.password))
289            .json(body)
290            .send()
291            .await
292            .map_err(|e| self.map_reqwest_error(e, &url))?;
293
294        trace!("Response status: {}", response.status());
295        if response.status().is_success() {
296            Ok(())
297        } else {
298            let status = response.status();
299            let text = response.text().await.unwrap_or_default();
300            Err(RestError::ApiError {
301                code: status.as_u16(),
302                message: text,
303            })
304        }
305    }
306
307    /// POST request with multipart/form-data for file uploads
308    pub async fn post_multipart<T: DeserializeOwned>(
309        &self,
310        path: &str,
311        file_data: Vec<u8>,
312        field_name: &str,
313        file_name: &str,
314    ) -> Result<T> {
315        let url = self.normalize_url(path);
316        debug!("POST {} (multipart)", url);
317
318        let part = reqwest::multipart::Part::bytes(file_data).file_name(file_name.to_string());
319
320        let form = reqwest::multipart::Form::new().part(field_name.to_string(), part);
321
322        let response = self
323            .client
324            .post(&url)
325            .basic_auth(&self.username, Some(&self.password))
326            .multipart(form)
327            .send()
328            .await
329            .map_err(|e| self.map_reqwest_error(e, &url))?;
330
331        trace!("Response status: {}", response.status());
332        self.handle_response(response).await
333    }
334
335    /// Get a reference to self for handler construction
336    pub fn rest_client(&self) -> Self {
337        self.clone()
338    }
339
340    /// POST request for bootstrap - handles empty response
341    pub async fn post_bootstrap<B: Serialize>(
342        &self,
343        path: &str,
344        body: &B,
345    ) -> Result<serde_json::Value> {
346        let url = self.normalize_url(path);
347
348        let response = self
349            .client
350            .post(&url)
351            .basic_auth(&self.username, Some(&self.password))
352            .json(body)
353            .send()
354            .await
355            .map_err(|e| self.map_reqwest_error(e, &url))?;
356
357        let status = response.status();
358        if status.is_success() {
359            // Try to parse JSON, but if empty/invalid, return success
360            let text = response.text().await.unwrap_or_default();
361            if text.is_empty() || text.trim().is_empty() {
362                Ok(serde_json::json!({"status": "success"}))
363            } else {
364                Ok(serde_json::from_str(&text)
365                    .unwrap_or_else(|_| serde_json::json!({"status": "success", "response": text})))
366            }
367        } else {
368            let text = response.text().await.unwrap_or_default();
369            Err(RestError::ApiError {
370                code: status.as_u16(),
371                message: text,
372            })
373        }
374    }
375
376    /// Execute raw PATCH request with JSON body
377    pub async fn patch_raw(
378        &self,
379        path: &str,
380        body: serde_json::Value,
381    ) -> Result<serde_json::Value> {
382        let url = self.normalize_url(path);
383        let response = self
384            .client
385            .patch(&url)
386            .basic_auth(&self.username, Some(&self.password))
387            .json(&body)
388            .send()
389            .await
390            .map_err(|e| self.map_reqwest_error(e, &url))?;
391
392        if response.status().is_success() {
393            response
394                .json()
395                .await
396                .map_err(|e| RestError::ParseError(e.to_string()))
397        } else {
398            let status = response.status();
399            let text = response.text().await.unwrap_or_default();
400            Err(RestError::ApiError {
401                code: status.as_u16(),
402                message: text,
403            })
404        }
405    }
406
407    /// Execute raw DELETE request returning any response body
408    pub async fn delete_raw(&self, path: &str) -> Result<serde_json::Value> {
409        let url = self.normalize_url(path);
410        let response = self
411            .client
412            .delete(&url)
413            .basic_auth(&self.username, Some(&self.password))
414            .send()
415            .await
416            .map_err(|e| self.map_reqwest_error(e, &url))?;
417
418        if response.status().is_success() {
419            if response.content_length() == Some(0) {
420                Ok(serde_json::json!({"status": "deleted"}))
421            } else {
422                response
423                    .json()
424                    .await
425                    .map_err(|e| RestError::ParseError(e.to_string()))
426            }
427        } else {
428            let status = response.status();
429            let text = response.text().await.unwrap_or_default();
430            Err(RestError::ApiError {
431                code: status.as_u16(),
432                message: text,
433            })
434        }
435    }
436
437    /// Map reqwest errors to more specific error messages
438    fn map_reqwest_error(&self, error: reqwest::Error, url: &str) -> RestError {
439        if error.is_connect() {
440            RestError::ConnectionError(format!(
441                "Failed to connect to {}: Connection refused or host unreachable. Check if the Redis Enterprise server is running and accessible.",
442                url
443            ))
444        } else if error.is_timeout() {
445            RestError::ConnectionError(format!(
446                "Request to {} timed out after {:?}. Check network connectivity or increase timeout.",
447                url, self.timeout
448            ))
449        } else if error.is_decode() {
450            RestError::ConnectionError(format!(
451                "Failed to decode JSON response from {}: {}. Server may have returned invalid JSON or HTML error page.",
452                url, error
453            ))
454        } else if let Some(status) = error.status() {
455            RestError::ApiError {
456                code: status.as_u16(),
457                message: format!("HTTP {} from {}: {}", status.as_u16(), url, error),
458            }
459        } else if error.is_request() {
460            RestError::ConnectionError(format!(
461                "Request to {} failed: {}. Check URL format and network settings.",
462                url, error
463            ))
464        } else {
465            RestError::RequestFailed(error)
466        }
467    }
468
469    /// Handle HTTP response
470    async fn handle_response<T: DeserializeOwned>(&self, response: Response) -> Result<T> {
471        if response.status().is_success() {
472            response.json::<T>().await.map_err(Into::into)
473        } else {
474            let status = response.status();
475            let text = response.text().await.unwrap_or_default();
476
477            match status.as_u16() {
478                401 => Err(RestError::Unauthorized),
479                404 => Err(RestError::NotFound),
480                500..=599 => Err(RestError::ServerError(text)),
481                _ => Err(RestError::ApiError {
482                    code: status.as_u16(),
483                    message: text,
484                }),
485            }
486        }
487    }
488
489    /// Execute a Redis command on a specific database (internal use only)
490    /// This uses the /v1/bdbs/{uid}/command endpoint which may not be publicly documented
491    pub async fn execute_command(&self, db_uid: u32, command: &str) -> Result<serde_json::Value> {
492        let url = self.normalize_url(&format!("/v1/bdbs/{}/command", db_uid));
493        let body = serde_json::json!({
494            "command": command
495        });
496
497        debug!("Executing command on database {}: {}", db_uid, command);
498
499        let response = self
500            .client
501            .post(&url)
502            .basic_auth(&self.username, Some(&self.password))
503            .json(&body)
504            .send()
505            .await
506            .map_err(|e| self.map_reqwest_error(e, &url))?;
507
508        self.handle_response(response).await
509    }
510}