wazuh_client/
logs.rs

1use reqwest::Method;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use tracing::{debug, info};
5
6use super::error::WazuhApiError;
7use super::wazuh_client::WazuhApiClient;
8
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct LogEntry {
11    pub timestamp: String,
12    pub tag: String,
13    pub level: String,
14    pub description: String,
15}
16
17#[derive(Debug, Clone, Deserialize, Serialize)]
18pub struct LogCollectorStats {
19    pub global: LogCollectorPeriod,
20    pub interval: LogCollectorPeriod,
21}
22
23#[derive(Debug, Clone, Deserialize, Serialize)]
24pub struct LogCollectorPeriod {
25    pub start: String,
26    pub end: String,
27    pub files: Vec<LogFile>,
28}
29
30#[derive(Debug, Clone, Deserialize, Serialize)]
31pub struct LogFile {
32    pub location: String,
33    pub events: u64,
34    pub bytes: u64,
35    pub targets: Vec<LogTarget>,
36}
37
38#[derive(Debug, Clone, Deserialize, Serialize)]
39pub struct LogTarget {
40    pub name: String,
41    pub drops: u64,
42}
43
44#[derive(Debug, Clone, Deserialize, Serialize)]
45pub struct AnalysisdStats {
46    pub total_events_decoded: u64,
47    pub syscheck_events_decoded: u64,
48    pub syscheck_edps: f64,
49    pub syscollector_events_decoded: u64,
50    pub syscollector_edps: f64,
51    pub rootcheck_events_decoded: u64,
52    pub rootcheck_edps: f64,
53    pub sca_events_decoded: u64,
54    pub sca_edps: f64,
55    pub hostinfo_events_decoded: u64,
56    pub hostinfo_edps: f64,
57    pub winevt_events_decoded: u64,
58    pub winevt_edps: f64,
59    pub other_events_decoded: u64,
60    pub other_edps: f64,
61    pub events_processed: u64,
62    pub events_edps: f64,
63    pub events_received: u64,
64    pub events_dropped: u64,
65    pub alerts_written: u64,
66    pub firewall_written: u64,
67    pub fts_written: u64,
68    pub syscheck_queue_usage: f64,
69    pub syscheck_queue_size: u64,
70    pub syscollector_queue_usage: f64,
71    pub syscollector_queue_size: u64,
72    pub rootcheck_queue_usage: f64,
73    pub rootcheck_queue_size: u64,
74    pub sca_queue_usage: f64,
75    pub sca_queue_size: u64,
76    pub hostinfo_queue_usage: f64,
77    pub hostinfo_queue_size: u64,
78    pub winevt_queue_usage: f64,
79    pub winevt_queue_size: u64,
80    pub dbsync_queue_usage: f64,
81    pub dbsync_queue_size: u64,
82    pub upgrade_queue_usage: f64,
83    pub upgrade_queue_size: u64,
84    pub event_queue_usage: f64,
85    pub event_queue_size: u64,
86    pub rule_matching_queue_usage: f64,
87    pub rule_matching_queue_size: u64,
88    pub alerts_queue_usage: f64,
89    pub alerts_queue_size: u64,
90    pub firewall_queue_usage: f64,
91    pub firewall_queue_size: u64,
92    pub statistical_queue_usage: f64,
93    pub statistical_queue_size: u64,
94    pub archives_queue_usage: f64,
95    pub archives_queue_size: u64,
96}
97
98#[derive(Debug, Clone, Deserialize, Serialize)]
99pub struct RemotedStats {
100    pub queue_size: f64,
101    pub total_queue_size: f64,
102    pub tcp_sessions: f64,
103    pub ctrl_msg_count: f64,
104    pub discarded_count: f64,
105    pub sent_bytes: f64,
106    pub recv_bytes: f64,
107    pub dequeued_after_close: f64,
108}
109
110#[derive(Debug, Clone)]
111pub struct LogsClient {
112    api_client: WazuhApiClient,
113}
114
115impl LogsClient {
116    pub fn new(api_client: WazuhApiClient) -> Self {
117        Self { api_client }
118    }
119
120    pub async fn get_manager_logs(
121        &mut self,
122        limit: Option<u32>,
123        offset: Option<u32>,
124        level: Option<&str>,
125        tag: Option<&str>,
126        search: Option<&str>,
127    ) -> Result<Vec<LogEntry>, WazuhApiError> {
128        debug!(?level, ?tag, ?search, "Getting manager logs");
129
130        let mut query_params = Vec::new();
131
132        if let Some(limit) = limit {
133            query_params.push(("limit", limit.to_string()));
134        }
135        if let Some(offset) = offset {
136            query_params.push(("offset", offset.to_string()));
137        }
138        if let Some(level) = level {
139            query_params.push(("level", level.to_string()));
140        }
141        if let Some(tag) = tag {
142            query_params.push(("tag", tag.to_string()));
143        }
144        if let Some(search) = search {
145            query_params.push(("search", search.to_string()));
146        }
147
148        let query_params_ref: Vec<(&str, &str)> =
149            query_params.iter().map(|(k, v)| (*k, v.as_str())).collect();
150
151        let response = self
152            .api_client
153            .make_request(
154                Method::GET,
155                "/manager/logs",
156                None,
157                if query_params_ref.is_empty() {
158                    None
159                } else {
160                    Some(&query_params_ref)
161                },
162            )
163            .await?;
164
165        let logs_data = response
166            .get("data")
167            .and_then(|d| d.get("affected_items"))
168            .ok_or_else(|| {
169                WazuhApiError::ApiError(
170                    "Missing 'data.affected_items' in manager logs response".to_string(),
171                )
172            })?;
173
174        let logs: Vec<LogEntry> = serde_json::from_value(logs_data.clone())?;
175        info!("Retrieved {} manager log entries", logs.len());
176        Ok(logs)
177    }
178
179    pub async fn get_error_logs(
180        &mut self,
181        limit: Option<u32>,
182    ) -> Result<Vec<LogEntry>, WazuhApiError> {
183        debug!("Getting error logs");
184        self.get_manager_logs(limit, None, Some("error"), None, None)
185            .await
186    }
187
188    pub async fn get_warning_logs(
189        &mut self,
190        limit: Option<u32>,
191    ) -> Result<Vec<LogEntry>, WazuhApiError> {
192        debug!("Getting warning logs");
193        self.get_manager_logs(limit, None, Some("warning"), None, None)
194            .await
195    }
196
197    pub async fn get_critical_logs(
198        &mut self,
199        limit: Option<u32>,
200    ) -> Result<Vec<LogEntry>, WazuhApiError> {
201        debug!("Getting critical logs");
202        self.get_manager_logs(limit, None, Some("critical"), None, None)
203            .await
204    }
205
206    pub async fn get_logs_by_tag(
207        &mut self,
208        tag: &str,
209        limit: Option<u32>,
210    ) -> Result<Vec<LogEntry>, WazuhApiError> {
211        debug!(%tag, "Getting logs by tag");
212        self.get_manager_logs(limit, None, None, Some(tag), None)
213            .await
214    }
215
216    pub async fn search_logs(
217        &mut self,
218        search_term: &str,
219        limit: Option<u32>,
220    ) -> Result<Vec<LogEntry>, WazuhApiError> {
221        debug!(%search_term, "Searching logs");
222        self.get_manager_logs(limit, None, None, None, Some(search_term))
223            .await
224    }
225
226    pub async fn get_logcollector_stats(
227        &mut self,
228        agent_id: &str,
229    ) -> Result<LogCollectorStats, WazuhApiError> {
230        debug!(%agent_id, "Getting log collector statistics");
231
232        let endpoint = format!("/agents/{}/stats/logcollector", agent_id);
233        let response = self
234            .api_client
235            .make_request(Method::GET, &endpoint, None, None)
236            .await?;
237
238        let stats_data = response
239            .get("data")
240            .and_then(|d| d.get("affected_items"))
241            .and_then(|items| items.as_array())
242            .and_then(|arr| arr.first())
243            .ok_or_else(|| {
244                WazuhApiError::ApiError(format!(
245                    "Log collector stats for agent {} not found",
246                    agent_id
247                ))
248            })?;
249
250        let stats: LogCollectorStats = serde_json::from_value(stats_data.clone())?;
251
252        info!(%agent_id, "Retrieved log collector statistics");
253        Ok(stats)
254    }
255
256    pub async fn get_analysisd_stats(&mut self) -> Result<AnalysisdStats, WazuhApiError> {
257        debug!("Getting analysis daemon statistics");
258
259        let response = self
260            .api_client
261            .make_request(Method::GET, "/manager/stats/analysisd", None, None)
262            .await?;
263
264        let stats_data = response
265            .get("data")
266            .and_then(|d| d.get("affected_items"))
267            .and_then(|items| items.as_array())
268            .and_then(|arr| arr.first())
269            .ok_or_else(|| {
270                WazuhApiError::ApiError("Analysis daemon statistics not found".to_string())
271            })?;
272
273        let stats: AnalysisdStats = serde_json::from_value(stats_data.clone())?;
274        info!("Retrieved analysis daemon statistics");
275        Ok(stats)
276    }
277
278    pub async fn get_remoted_stats(&mut self) -> Result<RemotedStats, WazuhApiError> {
279        debug!("Getting remote daemon statistics");
280
281        let response = self
282            .api_client
283            .make_request(Method::GET, "/manager/stats/remoted", None, None)
284            .await?;
285
286        let stats_data = response
287            .get("data")
288            .and_then(|d| d.get("affected_items"))
289            .and_then(|items| items.as_array())
290            .and_then(|arr| arr.first())
291            .ok_or_else(|| {
292                WazuhApiError::ApiError("Remote daemon statistics not found".to_string())
293            })?;
294
295        let stats: RemotedStats = serde_json::from_value(stats_data.clone())?;
296        info!("Retrieved remote daemon statistics");
297        Ok(stats)
298    }
299
300    pub async fn get_hourly_stats(&mut self) -> Result<Value, WazuhApiError> {
301        debug!("Getting hourly statistics");
302
303        let response = self
304            .api_client
305            .make_request(Method::GET, "/manager/stats/hourly", None, None)
306            .await?;
307
308        info!("Retrieved hourly statistics");
309        Ok(response)
310    }
311
312    pub async fn get_weekly_stats(&mut self) -> Result<Value, WazuhApiError> {
313        debug!("Getting weekly statistics");
314
315        let response = self
316            .api_client
317            .make_request(Method::GET, "/manager/stats/weekly", None, None)
318            .await?;
319
320        info!("Retrieved weekly statistics");
321        Ok(response)
322    }
323
324    pub async fn get_logs_summary(&mut self) -> Result<Value, WazuhApiError> {
325        debug!("Getting logs summary");
326
327        let response = self
328            .api_client
329            .make_request(Method::GET, "/manager/logs/summary", None, None)
330            .await?;
331
332        info!("Retrieved logs summary");
333        Ok(response)
334    }
335
336    pub async fn get_recent_errors(
337        &mut self,
338        limit: Option<u32>,
339    ) -> Result<Vec<LogEntry>, WazuhApiError> {
340        debug!("Getting recent error logs");
341        self.get_error_logs(limit).await
342    }
343
344    pub async fn get_recent_warnings(
345        &mut self,
346        limit: Option<u32>,
347    ) -> Result<Vec<LogEntry>, WazuhApiError> {
348        debug!("Getting recent warning logs");
349        self.get_warning_logs(limit).await
350    }
351
352    pub async fn get_performance_metrics(&mut self) -> Result<Value, WazuhApiError> {
353        debug!("Getting performance metrics");
354
355        let analysisd_stats = self.get_analysisd_stats().await?;
356        let remoted_stats = self.get_remoted_stats().await?;
357
358        let metrics = serde_json::json!({
359            "analysisd": {
360                "events_per_second": analysisd_stats.events_edps,
361                "total_events_processed": analysisd_stats.events_processed,
362                "events_dropped": analysisd_stats.events_dropped,
363                "alerts_written": analysisd_stats.alerts_written,
364                "queue_usage": {
365                    "event_queue": analysisd_stats.event_queue_usage,
366                    "alerts_queue": analysisd_stats.alerts_queue_usage,
367                    "rule_matching_queue": analysisd_stats.rule_matching_queue_usage
368                }
369            },
370            "remoted": {
371                "tcp_sessions": remoted_stats.tcp_sessions,
372                "bytes_sent": remoted_stats.sent_bytes,
373                "bytes_received": remoted_stats.recv_bytes,
374                "discarded_count": remoted_stats.discarded_count
375            }
376        });
377
378        info!("Retrieved performance metrics");
379        Ok(metrics)
380    }
381
382    pub async fn monitor_agent_ingestion(
383        &mut self,
384        agent_id: &str,
385    ) -> Result<Value, WazuhApiError> {
386        debug!(%agent_id, "Monitoring agent log ingestion");
387
388        let logcollector_stats = self.get_logcollector_stats(agent_id).await?;
389
390        let total_events: u64 = logcollector_stats
391            .global
392            .files
393            .iter()
394            .map(|f| f.events)
395            .sum();
396        let total_bytes: u64 = logcollector_stats
397            .global
398            .files
399            .iter()
400            .map(|f| f.bytes)
401            .sum();
402        let total_drops: u64 = logcollector_stats
403            .global
404            .files
405            .iter()
406            .flat_map(|f| &f.targets)
407            .map(|t| t.drops)
408            .sum();
409
410        let ingestion_info = serde_json::json!({
411            "agent_id": agent_id,
412            "total_events": total_events,
413            "events_dropped": total_drops,
414            "bytes_processed": total_bytes,
415            "drop_rate": if total_events > 0 {
416                (total_drops as f64 / total_events as f64) * 100.0
417            } else {
418                0.0
419            },
420            "global_period": {
421                "start": logcollector_stats.global.start,
422                "end": logcollector_stats.global.end,
423                "files": logcollector_stats.global.files
424            },
425            "interval_period": {
426                "start": logcollector_stats.interval.start,
427                "end": logcollector_stats.interval.end,
428                "files": logcollector_stats.interval.files
429            }
430        });
431
432        info!(%agent_id, "Retrieved agent ingestion monitoring data");
433        Ok(ingestion_info)
434    }
435}