wazuh_client/
cluster.rs

1use reqwest::Method;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use tracing::{debug, info};
5
6use super::error::WazuhApiError;
7use super::wazuh_client::WazuhApiClient;
8
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClusterStatus {
11    pub enabled: String,
12    pub running: String,
13}
14
15#[derive(Debug, Clone, Deserialize, Serialize)]
16pub struct ClusterNode {
17    pub name: String,
18    pub node_type: String,
19    pub version: String,
20    pub ip: String,
21    pub status: String,
22}
23
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct ManagerStatus {
26    pub wazuh_version: String,
27    pub openssl_version: String,
28    pub compilation_date: String,
29    pub version: String,
30}
31
32#[derive(Debug, Clone, Deserialize, Serialize)]
33pub struct ProcessStatus {
34    #[serde(rename = "wazuh-agentlessd")]
35    pub wazuh_agentlessd: String,
36    #[serde(rename = "wazuh-analysisd")]
37    pub wazuh_analysisd: String,
38    #[serde(rename = "wazuh-authd")]
39    pub wazuh_authd: String,
40    #[serde(rename = "wazuh-csyslogd")]
41    pub wazuh_csyslogd: String,
42    #[serde(rename = "wazuh-dbd")]
43    pub wazuh_dbd: String,
44    #[serde(rename = "wazuh-monitord")]
45    pub wazuh_monitord: String,
46    #[serde(rename = "wazuh-execd")]
47    pub wazuh_execd: String,
48    #[serde(rename = "wazuh-integratord")]
49    pub wazuh_integratord: String,
50    #[serde(rename = "wazuh-logcollector")]
51    pub wazuh_logcollector: String,
52    #[serde(rename = "wazuh-maild")]
53    pub wazuh_maild: String,
54    #[serde(rename = "wazuh-remoted")]
55    pub wazuh_remoted: String,
56    #[serde(rename = "wazuh-reportd")]
57    pub wazuh_reportd: String,
58    #[serde(rename = "wazuh-syscheckd")]
59    pub wazuh_syscheckd: String,
60    #[serde(rename = "wazuh-clusterd")]
61    pub wazuh_clusterd: String,
62    #[serde(rename = "wazuh-modulesd")]
63    pub wazuh_modulesd: String,
64    #[serde(rename = "wazuh-db")]
65    pub wazuh_db: String,
66    #[serde(rename = "wazuh-apid")]
67    pub wazuh_apid: String,
68}
69
70#[derive(Debug, Clone, Deserialize, Serialize)]
71pub struct ManagerInfo {
72    pub path: String,
73    pub version: String,
74    #[serde(rename = "type")]
75    pub node_type: String,
76    pub max_agents: String,
77    pub openssl_support: Option<String>,
78    pub tz_offset: Option<String>,
79    pub tz_name: Option<String>,
80    pub installation_date: Option<String>,
81    pub revision: Option<String>,
82    pub license_version: Option<String>,
83    pub license_path: Option<String>,
84    pub home_path: Option<String>,
85    pub share_path: Option<String>,
86    pub openssl_version: Option<String>,
87    pub node_name: Option<String>,
88    pub cluster_name: Option<String>,
89}
90
91#[derive(Debug, Clone, Deserialize, Serialize)]
92pub struct ClusterHealthcheck {
93    pub nodes: Vec<ClusterNodeHealth>,
94    pub n_connected_nodes: u32,
95}
96
97#[derive(Debug, Clone, Deserialize, Serialize)]
98pub struct ClusterNodeHealth {
99    pub info: ClusterNodeInfo,
100    pub status: ClusterNodeStatus,
101}
102
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct ClusterNodeInfo {
105    pub name: String,
106    pub node_type: String,
107    pub version: String,
108    pub ip: String,
109}
110
111#[derive(Debug, Clone, Deserialize, Serialize)]
112pub struct ClusterNodeStatus {
113    pub last_keep_alive: String,
114    pub sync_integrity_free: bool,
115    pub sync_agent_info_free: bool,
116    pub sync_extravalid_free: bool,
117}
118
119#[derive(Debug, Clone)]
120pub struct ClusterClient {
121    api_client: WazuhApiClient,
122}
123
124impl ClusterClient {
125    pub fn new(api_client: WazuhApiClient) -> Self {
126        Self { api_client }
127    }
128
129    pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus, WazuhApiError> {
130        debug!("Getting cluster status");
131
132        let response = self
133            .api_client
134            .make_request(Method::GET, "/cluster/status", None, None)
135            .await?;
136
137        let status_data = response.get("data").ok_or_else(|| {
138            WazuhApiError::ApiError("Missing 'data' in cluster status response".to_string())
139        })?;
140
141        let status: ClusterStatus = serde_json::from_value(status_data.clone())?;
142        info!(
143            "Retrieved cluster status: enabled={}, running={}",
144            status.enabled, status.running
145        );
146        Ok(status)
147    }
148
149    pub async fn get_cluster_nodes(
150        &mut self,
151        limit: Option<u32>,
152        offset: Option<u32>,
153        node_type: Option<&str>,
154    ) -> Result<Vec<ClusterNode>, WazuhApiError> {
155        debug!(?node_type, "Getting cluster nodes");
156
157        let mut query_params = Vec::new();
158
159        if let Some(limit) = limit {
160            query_params.push(("limit", limit.to_string()));
161        }
162        if let Some(offset) = offset {
163            query_params.push(("offset", offset.to_string()));
164        }
165        if let Some(node_type) = node_type {
166            query_params.push(("type", node_type.to_string()));
167        }
168
169        let query_params_ref: Vec<(&str, &str)> =
170            query_params.iter().map(|(k, v)| (*k, v.as_str())).collect();
171
172        let response = self
173            .api_client
174            .make_request(
175                Method::GET,
176                "/cluster/nodes",
177                None,
178                if query_params_ref.is_empty() {
179                    None
180                } else {
181                    Some(&query_params_ref)
182                },
183            )
184            .await?;
185
186        let nodes_data = response
187            .get("data")
188            .and_then(|d| d.get("affected_items"))
189            .ok_or_else(|| {
190                WazuhApiError::ApiError(
191                    "Missing 'data.affected_items' in cluster nodes response".to_string(),
192                )
193            })?;
194
195        let nodes: Vec<ClusterNode> = serde_json::from_value(nodes_data.clone())?;
196        info!("Retrieved {} cluster nodes", nodes.len());
197        Ok(nodes)
198    }
199
200    pub async fn get_cluster_node(
201        &mut self,
202        node_name: &str,
203    ) -> Result<ClusterNode, WazuhApiError> {
204        debug!(%node_name, "Getting specific cluster node");
205
206        let endpoint = format!("/cluster/nodes/{}", node_name);
207        let response = self
208            .api_client
209            .make_request(Method::GET, &endpoint, None, None)
210            .await?;
211
212        let node_data = response
213            .get("data")
214            .and_then(|d| d.get("affected_items"))
215            .and_then(|items| items.as_array())
216            .and_then(|arr| arr.first())
217            .ok_or_else(|| {
218                WazuhApiError::ApiError(format!("Cluster node {} not found", node_name))
219            })?;
220
221        let node: ClusterNode = serde_json::from_value(node_data.clone())?;
222        info!(%node_name, "Retrieved cluster node details");
223        Ok(node)
224    }
225
226    pub async fn get_cluster_healthcheck(&mut self) -> Result<ClusterHealthcheck, WazuhApiError> {
227        debug!("Getting cluster healthcheck");
228
229        let response = self
230            .api_client
231            .make_request(Method::GET, "/cluster/healthcheck", None, None)
232            .await?;
233
234        let healthcheck_data = response
235            .get("data")
236            .and_then(|d| d.get("affected_items"))
237            .and_then(|items| items.as_array())
238            .and_then(|arr| arr.first())
239            .ok_or_else(|| {
240                WazuhApiError::ApiError("Missing cluster healthcheck data".to_string())
241            })?;
242
243        let healthcheck: ClusterHealthcheck = serde_json::from_value(healthcheck_data.clone())?;
244        info!(
245            "Retrieved cluster healthcheck: {} connected nodes",
246            healthcheck.n_connected_nodes
247        );
248        Ok(healthcheck)
249    }
250
251    pub async fn get_manager_process_status(&mut self) -> Result<ProcessStatus, WazuhApiError> {
252        debug!("Getting manager process status");
253
254        let response = self
255            .api_client
256            .make_request(Method::GET, "/manager/status", None, None)
257            .await?;
258
259        let status_data = response
260            .get("data")
261            .and_then(|d| d.get("affected_items"))
262            .and_then(|items| items.as_array())
263            .and_then(|arr| arr.first())
264            .ok_or_else(|| {
265                WazuhApiError::ApiError("Missing manager process status data".to_string())
266            })?;
267
268        let status: ProcessStatus = serde_json::from_value(status_data.clone())?;
269        info!("Retrieved manager process status");
270        Ok(status)
271    }
272
273    pub async fn get_manager_status(&mut self) -> Result<ManagerStatus, WazuhApiError> {
274        debug!("Getting manager status");
275
276        let response = self
277            .api_client
278            .make_request(Method::GET, "/manager/info", None, None)
279            .await?;
280
281        let status_data = response
282            .get("data")
283            .and_then(|d| d.get("affected_items"))
284            .and_then(|items| items.as_array())
285            .and_then(|arr| arr.first())
286            .ok_or_else(|| WazuhApiError::ApiError("Missing manager status data".to_string()))?;
287
288        let manager_info: ManagerInfo = serde_json::from_value(status_data.clone())?;
289
290        let status = ManagerStatus {
291            wazuh_version: manager_info.version.clone(),
292            // Ensure ManagerInfo.openssl_version is what's needed or adjust source
293            openssl_version: manager_info.openssl_version.unwrap_or_default(),
294            // Ensure ManagerInfo.installation_date is what's needed or adjust source
295            compilation_date: manager_info.installation_date.unwrap_or_default(),
296            version: manager_info.version,
297        };
298
299        info!("Retrieved manager status: version={}", status.wazuh_version);
300        Ok(status)
301    }
302
303    pub async fn get_manager_info(&mut self) -> Result<ManagerInfo, WazuhApiError> {
304        debug!("Getting manager information");
305
306        let response = self
307            .api_client
308            .make_request(Method::GET, "/manager/info", None, None)
309            .await?;
310
311        let info_data = response
312            .get("data")
313            .and_then(|d| d.get("affected_items"))
314            .and_then(|items| items.as_array())
315            .and_then(|arr| arr.first())
316            .ok_or_else(|| WazuhApiError::ApiError("Missing manager info data".to_string()))?;
317
318        let info: ManagerInfo = serde_json::from_value(info_data.clone())?;
319        info!(
320            "Retrieved manager info: version={}, node_name={}",
321            info.version,
322            info.node_name.as_deref().unwrap_or("unknown")
323        );
324        Ok(info)
325    }
326
327    pub async fn get_cluster_configuration(&mut self) -> Result<Value, WazuhApiError> {
328        debug!("Getting cluster configuration");
329
330        let response = self
331            .api_client
332            .make_request(Method::GET, "/cluster/configuration", None, None)
333            .await?;
334
335        info!("Retrieved cluster configuration");
336        Ok(response)
337    }
338
339    pub async fn get_master_nodes(&mut self) -> Result<Vec<ClusterNode>, WazuhApiError> {
340        debug!("Getting master nodes");
341        self.get_cluster_nodes(None, None, Some("master")).await
342    }
343
344    pub async fn get_worker_nodes(&mut self) -> Result<Vec<ClusterNode>, WazuhApiError> {
345        debug!("Getting worker nodes");
346        self.get_cluster_nodes(None, None, Some("worker")).await
347    }
348
349    pub async fn is_cluster_healthy(&mut self) -> Result<bool, WazuhApiError> {
350        debug!("Checking cluster health");
351
352        let status = self.get_cluster_status().await?;
353        let is_enabled = status.enabled.eq_ignore_ascii_case("yes");
354        let is_running = status.running.eq_ignore_ascii_case("yes");
355        let is_healthy = is_enabled && is_running;
356
357        if is_healthy {
358            // Additional check: verify nodes are connected
359            match self.get_cluster_healthcheck().await {
360                Ok(healthcheck) => {
361                    let healthy = healthcheck.n_connected_nodes > 0;
362                    info!("Cluster health check: enabled={}, running={}, connected_nodes={}, healthy={}", 
363                          is_enabled, is_running, healthcheck.n_connected_nodes, healthy);
364                    Ok(healthy)
365                }
366                Err(_) => {
367                    info!(
368                        "Cluster health check: enabled={}, running={}, healthcheck_failed=true",
369                        is_enabled, is_running
370                    );
371                    Ok(false) // Or handle error more explicitly if healthcheck failure means unhealthy
372                }
373            }
374        } else {
375            info!(
376                "Cluster health check: enabled={}, running={}, healthy=false",
377                is_enabled, is_running
378            );
379            Ok(false)
380        }
381    }
382
383    pub async fn get_cluster_statistics(&mut self) -> Result<Value, WazuhApiError> {
384        debug!("Getting cluster statistics");
385
386        let response = self
387            .api_client
388            .make_request(Method::GET, "/cluster/stats", None, None)
389            .await?;
390
391        info!("Retrieved cluster statistics");
392        Ok(response)
393    }
394
395    pub async fn get_local_node_info(&mut self) -> Result<Value, WazuhApiError> {
396        debug!("Getting local node information");
397
398        let response = self
399            .api_client
400            .make_request(Method::GET, "/cluster/local/info", None, None)
401            .await?;
402
403        info!("Retrieved local node information");
404        Ok(response)
405    }
406
407    pub async fn restart_manager(&mut self) -> Result<Value, WazuhApiError> {
408        debug!("Restarting manager");
409
410        let response = self
411            .api_client
412            .make_request(Method::PUT, "/manager/restart", None, None)
413            .await?;
414
415        info!("Manager restart command sent");
416        Ok(response)
417    }
418
419    pub async fn get_manager_logs_summary(&mut self) -> Result<Value, WazuhApiError> {
420        debug!("Getting manager logs summary");
421
422        let response = self
423            .api_client
424            .make_request(Method::GET, "/manager/logs/summary", None, None)
425            .await?;
426
427        info!("Retrieved manager logs summary");
428        Ok(response)
429    }
430}