1use reqwest::Method;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use tracing::{debug, info};
5
6use super::error::WazuhApiError;
7use super::wazuh_client::WazuhApiClient;
8
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct ClusterStatus {
11 pub enabled: String,
12 pub running: String,
13}
14
15#[derive(Debug, Clone, Deserialize, Serialize)]
16pub struct ClusterNode {
17 pub name: String,
18 pub node_type: String,
19 pub version: String,
20 pub ip: String,
21 pub status: String,
22}
23
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct ManagerStatus {
26 pub wazuh_version: String,
27 pub openssl_version: String,
28 pub compilation_date: String,
29 pub version: String,
30}
31
32#[derive(Debug, Clone, Deserialize, Serialize)]
33pub struct ProcessStatus {
34 #[serde(rename = "wazuh-agentlessd")]
35 pub wazuh_agentlessd: String,
36 #[serde(rename = "wazuh-analysisd")]
37 pub wazuh_analysisd: String,
38 #[serde(rename = "wazuh-authd")]
39 pub wazuh_authd: String,
40 #[serde(rename = "wazuh-csyslogd")]
41 pub wazuh_csyslogd: String,
42 #[serde(rename = "wazuh-dbd")]
43 pub wazuh_dbd: String,
44 #[serde(rename = "wazuh-monitord")]
45 pub wazuh_monitord: String,
46 #[serde(rename = "wazuh-execd")]
47 pub wazuh_execd: String,
48 #[serde(rename = "wazuh-integratord")]
49 pub wazuh_integratord: String,
50 #[serde(rename = "wazuh-logcollector")]
51 pub wazuh_logcollector: String,
52 #[serde(rename = "wazuh-maild")]
53 pub wazuh_maild: String,
54 #[serde(rename = "wazuh-remoted")]
55 pub wazuh_remoted: String,
56 #[serde(rename = "wazuh-reportd")]
57 pub wazuh_reportd: String,
58 #[serde(rename = "wazuh-syscheckd")]
59 pub wazuh_syscheckd: String,
60 #[serde(rename = "wazuh-clusterd")]
61 pub wazuh_clusterd: String,
62 #[serde(rename = "wazuh-modulesd")]
63 pub wazuh_modulesd: String,
64 #[serde(rename = "wazuh-db")]
65 pub wazuh_db: String,
66 #[serde(rename = "wazuh-apid")]
67 pub wazuh_apid: String,
68}
69
70#[derive(Debug, Clone, Deserialize, Serialize)]
71pub struct ManagerInfo {
72 pub path: String,
73 pub version: String,
74 #[serde(rename = "type")]
75 pub node_type: String,
76 pub max_agents: String,
77 pub openssl_support: Option<String>,
78 pub tz_offset: Option<String>,
79 pub tz_name: Option<String>,
80 pub installation_date: Option<String>,
81 pub revision: Option<String>,
82 pub license_version: Option<String>,
83 pub license_path: Option<String>,
84 pub home_path: Option<String>,
85 pub share_path: Option<String>,
86 pub openssl_version: Option<String>,
87 pub node_name: Option<String>,
88 pub cluster_name: Option<String>,
89}
90
91#[derive(Debug, Clone, Deserialize, Serialize)]
92pub struct ClusterHealthcheck {
93 pub nodes: Vec<ClusterNodeHealth>,
94 pub n_connected_nodes: u32,
95}
96
97#[derive(Debug, Clone, Deserialize, Serialize)]
98pub struct ClusterNodeHealth {
99 pub info: ClusterNodeInfo,
100 pub status: ClusterNodeStatus,
101}
102
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct ClusterNodeInfo {
105 pub name: String,
106 pub node_type: String,
107 pub version: String,
108 pub ip: String,
109}
110
111#[derive(Debug, Clone, Deserialize, Serialize)]
112pub struct ClusterNodeStatus {
113 pub last_keep_alive: String,
114 pub sync_integrity_free: bool,
115 pub sync_agent_info_free: bool,
116 pub sync_extravalid_free: bool,
117}
118
119#[derive(Debug, Clone)]
120pub struct ClusterClient {
121 api_client: WazuhApiClient,
122}
123
124impl ClusterClient {
125 pub fn new(api_client: WazuhApiClient) -> Self {
126 Self { api_client }
127 }
128
129 pub async fn get_cluster_status(&mut self) -> Result<ClusterStatus, WazuhApiError> {
130 debug!("Getting cluster status");
131
132 let response = self
133 .api_client
134 .make_request(Method::GET, "/cluster/status", None, None)
135 .await?;
136
137 let status_data = response.get("data").ok_or_else(|| {
138 WazuhApiError::ApiError("Missing 'data' in cluster status response".to_string())
139 })?;
140
141 let status: ClusterStatus = serde_json::from_value(status_data.clone())?;
142 info!(
143 "Retrieved cluster status: enabled={}, running={}",
144 status.enabled, status.running
145 );
146 Ok(status)
147 }
148
149 pub async fn get_cluster_nodes(
150 &mut self,
151 limit: Option<u32>,
152 offset: Option<u32>,
153 node_type: Option<&str>,
154 ) -> Result<Vec<ClusterNode>, WazuhApiError> {
155 debug!(?node_type, "Getting cluster nodes");
156
157 let mut query_params = Vec::new();
158
159 if let Some(limit) = limit {
160 query_params.push(("limit", limit.to_string()));
161 }
162 if let Some(offset) = offset {
163 query_params.push(("offset", offset.to_string()));
164 }
165 if let Some(node_type) = node_type {
166 query_params.push(("type", node_type.to_string()));
167 }
168
169 let query_params_ref: Vec<(&str, &str)> =
170 query_params.iter().map(|(k, v)| (*k, v.as_str())).collect();
171
172 let response = self
173 .api_client
174 .make_request(
175 Method::GET,
176 "/cluster/nodes",
177 None,
178 if query_params_ref.is_empty() {
179 None
180 } else {
181 Some(&query_params_ref)
182 },
183 )
184 .await?;
185
186 let nodes_data = response
187 .get("data")
188 .and_then(|d| d.get("affected_items"))
189 .ok_or_else(|| {
190 WazuhApiError::ApiError(
191 "Missing 'data.affected_items' in cluster nodes response".to_string(),
192 )
193 })?;
194
195 let nodes: Vec<ClusterNode> = serde_json::from_value(nodes_data.clone())?;
196 info!("Retrieved {} cluster nodes", nodes.len());
197 Ok(nodes)
198 }
199
200 pub async fn get_cluster_node(
201 &mut self,
202 node_name: &str,
203 ) -> Result<ClusterNode, WazuhApiError> {
204 debug!(%node_name, "Getting specific cluster node");
205
206 let endpoint = format!("/cluster/nodes/{}", node_name);
207 let response = self
208 .api_client
209 .make_request(Method::GET, &endpoint, None, None)
210 .await?;
211
212 let node_data = response
213 .get("data")
214 .and_then(|d| d.get("affected_items"))
215 .and_then(|items| items.as_array())
216 .and_then(|arr| arr.first())
217 .ok_or_else(|| {
218 WazuhApiError::ApiError(format!("Cluster node {} not found", node_name))
219 })?;
220
221 let node: ClusterNode = serde_json::from_value(node_data.clone())?;
222 info!(%node_name, "Retrieved cluster node details");
223 Ok(node)
224 }
225
226 pub async fn get_cluster_healthcheck(&mut self) -> Result<ClusterHealthcheck, WazuhApiError> {
227 debug!("Getting cluster healthcheck");
228
229 let response = self
230 .api_client
231 .make_request(Method::GET, "/cluster/healthcheck", None, None)
232 .await?;
233
234 let healthcheck_data = response
235 .get("data")
236 .and_then(|d| d.get("affected_items"))
237 .and_then(|items| items.as_array())
238 .and_then(|arr| arr.first())
239 .ok_or_else(|| {
240 WazuhApiError::ApiError("Missing cluster healthcheck data".to_string())
241 })?;
242
243 let healthcheck: ClusterHealthcheck = serde_json::from_value(healthcheck_data.clone())?;
244 info!(
245 "Retrieved cluster healthcheck: {} connected nodes",
246 healthcheck.n_connected_nodes
247 );
248 Ok(healthcheck)
249 }
250
251 pub async fn get_manager_process_status(&mut self) -> Result<ProcessStatus, WazuhApiError> {
252 debug!("Getting manager process status");
253
254 let response = self
255 .api_client
256 .make_request(Method::GET, "/manager/status", None, None)
257 .await?;
258
259 let status_data = response
260 .get("data")
261 .and_then(|d| d.get("affected_items"))
262 .and_then(|items| items.as_array())
263 .and_then(|arr| arr.first())
264 .ok_or_else(|| {
265 WazuhApiError::ApiError("Missing manager process status data".to_string())
266 })?;
267
268 let status: ProcessStatus = serde_json::from_value(status_data.clone())?;
269 info!("Retrieved manager process status");
270 Ok(status)
271 }
272
273 pub async fn get_manager_status(&mut self) -> Result<ManagerStatus, WazuhApiError> {
274 debug!("Getting manager status");
275
276 let response = self
277 .api_client
278 .make_request(Method::GET, "/manager/info", None, None)
279 .await?;
280
281 let status_data = response
282 .get("data")
283 .and_then(|d| d.get("affected_items"))
284 .and_then(|items| items.as_array())
285 .and_then(|arr| arr.first())
286 .ok_or_else(|| WazuhApiError::ApiError("Missing manager status data".to_string()))?;
287
288 let manager_info: ManagerInfo = serde_json::from_value(status_data.clone())?;
289
290 let status = ManagerStatus {
291 wazuh_version: manager_info.version.clone(),
292 openssl_version: manager_info.openssl_version.unwrap_or_default(),
294 compilation_date: manager_info.installation_date.unwrap_or_default(),
296 version: manager_info.version,
297 };
298
299 info!("Retrieved manager status: version={}", status.wazuh_version);
300 Ok(status)
301 }
302
303 pub async fn get_manager_info(&mut self) -> Result<ManagerInfo, WazuhApiError> {
304 debug!("Getting manager information");
305
306 let response = self
307 .api_client
308 .make_request(Method::GET, "/manager/info", None, None)
309 .await?;
310
311 let info_data = response
312 .get("data")
313 .and_then(|d| d.get("affected_items"))
314 .and_then(|items| items.as_array())
315 .and_then(|arr| arr.first())
316 .ok_or_else(|| WazuhApiError::ApiError("Missing manager info data".to_string()))?;
317
318 let info: ManagerInfo = serde_json::from_value(info_data.clone())?;
319 info!(
320 "Retrieved manager info: version={}, node_name={}",
321 info.version,
322 info.node_name.as_deref().unwrap_or("unknown")
323 );
324 Ok(info)
325 }
326
327 pub async fn get_cluster_configuration(&mut self) -> Result<Value, WazuhApiError> {
328 debug!("Getting cluster configuration");
329
330 let response = self
331 .api_client
332 .make_request(Method::GET, "/cluster/configuration", None, None)
333 .await?;
334
335 info!("Retrieved cluster configuration");
336 Ok(response)
337 }
338
339 pub async fn get_master_nodes(&mut self) -> Result<Vec<ClusterNode>, WazuhApiError> {
340 debug!("Getting master nodes");
341 self.get_cluster_nodes(None, None, Some("master")).await
342 }
343
344 pub async fn get_worker_nodes(&mut self) -> Result<Vec<ClusterNode>, WazuhApiError> {
345 debug!("Getting worker nodes");
346 self.get_cluster_nodes(None, None, Some("worker")).await
347 }
348
349 pub async fn is_cluster_healthy(&mut self) -> Result<bool, WazuhApiError> {
350 debug!("Checking cluster health");
351
352 let status = self.get_cluster_status().await?;
353 let is_enabled = status.enabled.eq_ignore_ascii_case("yes");
354 let is_running = status.running.eq_ignore_ascii_case("yes");
355 let is_healthy = is_enabled && is_running;
356
357 if is_healthy {
358 match self.get_cluster_healthcheck().await {
360 Ok(healthcheck) => {
361 let healthy = healthcheck.n_connected_nodes > 0;
362 info!("Cluster health check: enabled={}, running={}, connected_nodes={}, healthy={}",
363 is_enabled, is_running, healthcheck.n_connected_nodes, healthy);
364 Ok(healthy)
365 }
366 Err(_) => {
367 info!(
368 "Cluster health check: enabled={}, running={}, healthcheck_failed=true",
369 is_enabled, is_running
370 );
371 Ok(false) }
373 }
374 } else {
375 info!(
376 "Cluster health check: enabled={}, running={}, healthy=false",
377 is_enabled, is_running
378 );
379 Ok(false)
380 }
381 }
382
383 pub async fn get_cluster_statistics(&mut self) -> Result<Value, WazuhApiError> {
384 debug!("Getting cluster statistics");
385
386 let response = self
387 .api_client
388 .make_request(Method::GET, "/cluster/stats", None, None)
389 .await?;
390
391 info!("Retrieved cluster statistics");
392 Ok(response)
393 }
394
395 pub async fn get_local_node_info(&mut self) -> Result<Value, WazuhApiError> {
396 debug!("Getting local node information");
397
398 let response = self
399 .api_client
400 .make_request(Method::GET, "/cluster/local/info", None, None)
401 .await?;
402
403 info!("Retrieved local node information");
404 Ok(response)
405 }
406
407 pub async fn restart_manager(&mut self) -> Result<Value, WazuhApiError> {
408 debug!("Restarting manager");
409
410 let response = self
411 .api_client
412 .make_request(Method::PUT, "/manager/restart", None, None)
413 .await?;
414
415 info!("Manager restart command sent");
416 Ok(response)
417 }
418
419 pub async fn get_manager_logs_summary(&mut self) -> Result<Value, WazuhApiError> {
420 debug!("Getting manager logs summary");
421
422 let response = self
423 .api_client
424 .make_request(Method::GET, "/manager/logs/summary", None, None)
425 .await?;
426
427 info!("Retrieved manager logs summary");
428 Ok(response)
429 }
430}