1use reqwest::Method;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use tracing::{debug, info};
5
6use super::error::WazuhApiError;
7use super::wazuh_client::WazuhApiClient;
8
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct LogEntry {
11 pub timestamp: String,
12 pub tag: String,
13 pub level: String,
14 pub description: String,
15}
16
17#[derive(Debug, Clone, Deserialize, Serialize)]
18pub struct LogCollectorStats {
19 pub global: LogCollectorPeriod,
20 pub interval: LogCollectorPeriod,
21}
22
23#[derive(Debug, Clone, Deserialize, Serialize)]
24pub struct LogCollectorPeriod {
25 pub start: String,
26 pub end: String,
27 pub files: Vec<LogFile>,
28}
29
30#[derive(Debug, Clone, Deserialize, Serialize)]
31pub struct LogFile {
32 pub location: String,
33 pub events: u64,
34 pub bytes: u64,
35 pub targets: Vec<LogTarget>,
36}
37
38#[derive(Debug, Clone, Deserialize, Serialize)]
39pub struct LogTarget {
40 pub name: String,
41 pub drops: u64,
42}
43
44#[derive(Debug, Clone, Deserialize, Serialize)]
45pub struct AnalysisdStats {
46 pub total_events_decoded: u64,
47 pub syscheck_events_decoded: u64,
48 pub syscheck_edps: f64,
49 pub syscollector_events_decoded: u64,
50 pub syscollector_edps: f64,
51 pub rootcheck_events_decoded: u64,
52 pub rootcheck_edps: f64,
53 pub sca_events_decoded: u64,
54 pub sca_edps: f64,
55 pub hostinfo_events_decoded: u64,
56 pub hostinfo_edps: f64,
57 pub winevt_events_decoded: u64,
58 pub winevt_edps: f64,
59 pub other_events_decoded: u64,
60 pub other_edps: f64,
61 pub events_processed: u64,
62 pub events_edps: f64,
63 pub events_received: u64,
64 pub events_dropped: u64,
65 pub alerts_written: u64,
66 pub firewall_written: u64,
67 pub fts_written: u64,
68 pub syscheck_queue_usage: f64,
69 pub syscheck_queue_size: u64,
70 pub syscollector_queue_usage: f64,
71 pub syscollector_queue_size: u64,
72 pub rootcheck_queue_usage: f64,
73 pub rootcheck_queue_size: u64,
74 pub sca_queue_usage: f64,
75 pub sca_queue_size: u64,
76 pub hostinfo_queue_usage: f64,
77 pub hostinfo_queue_size: u64,
78 pub winevt_queue_usage: f64,
79 pub winevt_queue_size: u64,
80 pub dbsync_queue_usage: f64,
81 pub dbsync_queue_size: u64,
82 pub upgrade_queue_usage: f64,
83 pub upgrade_queue_size: u64,
84 pub event_queue_usage: f64,
85 pub event_queue_size: u64,
86 pub rule_matching_queue_usage: f64,
87 pub rule_matching_queue_size: u64,
88 pub alerts_queue_usage: f64,
89 pub alerts_queue_size: u64,
90 pub firewall_queue_usage: f64,
91 pub firewall_queue_size: u64,
92 pub statistical_queue_usage: f64,
93 pub statistical_queue_size: u64,
94 pub archives_queue_usage: f64,
95 pub archives_queue_size: u64,
96}
97
98#[derive(Debug, Clone, Deserialize, Serialize)]
99pub struct RemotedStats {
100 pub queue_size: f64,
101 pub total_queue_size: f64,
102 pub tcp_sessions: f64,
103 pub ctrl_msg_count: f64,
104 pub discarded_count: f64,
105 pub sent_bytes: f64,
106 pub recv_bytes: f64,
107 pub dequeued_after_close: f64,
108}
109
110#[derive(Debug, Clone)]
111pub struct LogsClient {
112 api_client: WazuhApiClient,
113}
114
115impl LogsClient {
116 pub fn new(api_client: WazuhApiClient) -> Self {
117 Self { api_client }
118 }
119
120 pub async fn get_manager_logs(
121 &mut self,
122 limit: Option<u32>,
123 offset: Option<u32>,
124 level: Option<&str>,
125 tag: Option<&str>,
126 search: Option<&str>,
127 ) -> Result<Vec<LogEntry>, WazuhApiError> {
128 debug!(?level, ?tag, ?search, "Getting manager logs");
129
130 let mut query_params = Vec::new();
131
132 if let Some(limit) = limit {
133 query_params.push(("limit", limit.to_string()));
134 }
135 if let Some(offset) = offset {
136 query_params.push(("offset", offset.to_string()));
137 }
138 if let Some(level) = level {
139 query_params.push(("level", level.to_string()));
140 }
141 if let Some(tag) = tag {
142 query_params.push(("tag", tag.to_string()));
143 }
144 if let Some(search) = search {
145 query_params.push(("search", search.to_string()));
146 }
147
148 let query_params_ref: Vec<(&str, &str)> =
149 query_params.iter().map(|(k, v)| (*k, v.as_str())).collect();
150
151 let response = self
152 .api_client
153 .make_request(
154 Method::GET,
155 "/manager/logs",
156 None,
157 if query_params_ref.is_empty() {
158 None
159 } else {
160 Some(&query_params_ref)
161 },
162 )
163 .await?;
164
165 let logs_data = response
166 .get("data")
167 .and_then(|d| d.get("affected_items"))
168 .ok_or_else(|| {
169 WazuhApiError::ApiError(
170 "Missing 'data.affected_items' in manager logs response".to_string(),
171 )
172 })?;
173
174 let logs: Vec<LogEntry> = serde_json::from_value(logs_data.clone())?;
175 info!("Retrieved {} manager log entries", logs.len());
176 Ok(logs)
177 }
178
179 pub async fn get_error_logs(
180 &mut self,
181 limit: Option<u32>,
182 ) -> Result<Vec<LogEntry>, WazuhApiError> {
183 debug!("Getting error logs");
184 self.get_manager_logs(limit, None, Some("error"), None, None)
185 .await
186 }
187
188 pub async fn get_warning_logs(
189 &mut self,
190 limit: Option<u32>,
191 ) -> Result<Vec<LogEntry>, WazuhApiError> {
192 debug!("Getting warning logs");
193 self.get_manager_logs(limit, None, Some("warning"), None, None)
194 .await
195 }
196
197 pub async fn get_critical_logs(
198 &mut self,
199 limit: Option<u32>,
200 ) -> Result<Vec<LogEntry>, WazuhApiError> {
201 debug!("Getting critical logs");
202 self.get_manager_logs(limit, None, Some("critical"), None, None)
203 .await
204 }
205
206 pub async fn get_logs_by_tag(
207 &mut self,
208 tag: &str,
209 limit: Option<u32>,
210 ) -> Result<Vec<LogEntry>, WazuhApiError> {
211 debug!(%tag, "Getting logs by tag");
212 self.get_manager_logs(limit, None, None, Some(tag), None)
213 .await
214 }
215
216 pub async fn search_logs(
217 &mut self,
218 search_term: &str,
219 limit: Option<u32>,
220 ) -> Result<Vec<LogEntry>, WazuhApiError> {
221 debug!(%search_term, "Searching logs");
222 self.get_manager_logs(limit, None, None, None, Some(search_term))
223 .await
224 }
225
226 pub async fn get_logcollector_stats(
227 &mut self,
228 agent_id: &str,
229 ) -> Result<LogCollectorStats, WazuhApiError> {
230 debug!(%agent_id, "Getting log collector statistics");
231
232 let endpoint = format!("/agents/{}/stats/logcollector", agent_id);
233 let response = self
234 .api_client
235 .make_request(Method::GET, &endpoint, None, None)
236 .await?;
237
238 let stats_data = response
239 .get("data")
240 .and_then(|d| d.get("affected_items"))
241 .and_then(|items| items.as_array())
242 .and_then(|arr| arr.first())
243 .ok_or_else(|| {
244 WazuhApiError::ApiError(format!(
245 "Log collector stats for agent {} not found",
246 agent_id
247 ))
248 })?;
249
250 let stats: LogCollectorStats = serde_json::from_value(stats_data.clone())?;
251
252 info!(%agent_id, "Retrieved log collector statistics");
253 Ok(stats)
254 }
255
256 pub async fn get_analysisd_stats(&mut self) -> Result<AnalysisdStats, WazuhApiError> {
257 debug!("Getting analysis daemon statistics");
258
259 let response = self
260 .api_client
261 .make_request(Method::GET, "/manager/stats/analysisd", None, None)
262 .await?;
263
264 let stats_data = response
265 .get("data")
266 .and_then(|d| d.get("affected_items"))
267 .and_then(|items| items.as_array())
268 .and_then(|arr| arr.first())
269 .ok_or_else(|| {
270 WazuhApiError::ApiError("Analysis daemon statistics not found".to_string())
271 })?;
272
273 let stats: AnalysisdStats = serde_json::from_value(stats_data.clone())?;
274 info!("Retrieved analysis daemon statistics");
275 Ok(stats)
276 }
277
278 pub async fn get_remoted_stats(&mut self) -> Result<RemotedStats, WazuhApiError> {
279 debug!("Getting remote daemon statistics");
280
281 let response = self
282 .api_client
283 .make_request(Method::GET, "/manager/stats/remoted", None, None)
284 .await?;
285
286 let stats_data = response
287 .get("data")
288 .and_then(|d| d.get("affected_items"))
289 .and_then(|items| items.as_array())
290 .and_then(|arr| arr.first())
291 .ok_or_else(|| {
292 WazuhApiError::ApiError("Remote daemon statistics not found".to_string())
293 })?;
294
295 let stats: RemotedStats = serde_json::from_value(stats_data.clone())?;
296 info!("Retrieved remote daemon statistics");
297 Ok(stats)
298 }
299
300 pub async fn get_hourly_stats(&mut self) -> Result<Value, WazuhApiError> {
301 debug!("Getting hourly statistics");
302
303 let response = self
304 .api_client
305 .make_request(Method::GET, "/manager/stats/hourly", None, None)
306 .await?;
307
308 info!("Retrieved hourly statistics");
309 Ok(response)
310 }
311
312 pub async fn get_weekly_stats(&mut self) -> Result<Value, WazuhApiError> {
313 debug!("Getting weekly statistics");
314
315 let response = self
316 .api_client
317 .make_request(Method::GET, "/manager/stats/weekly", None, None)
318 .await?;
319
320 info!("Retrieved weekly statistics");
321 Ok(response)
322 }
323
324 pub async fn get_logs_summary(&mut self) -> Result<Value, WazuhApiError> {
325 debug!("Getting logs summary");
326
327 let response = self
328 .api_client
329 .make_request(Method::GET, "/manager/logs/summary", None, None)
330 .await?;
331
332 info!("Retrieved logs summary");
333 Ok(response)
334 }
335
336 pub async fn get_recent_errors(
337 &mut self,
338 limit: Option<u32>,
339 ) -> Result<Vec<LogEntry>, WazuhApiError> {
340 debug!("Getting recent error logs");
341 self.get_error_logs(limit).await
342 }
343
344 pub async fn get_recent_warnings(
345 &mut self,
346 limit: Option<u32>,
347 ) -> Result<Vec<LogEntry>, WazuhApiError> {
348 debug!("Getting recent warning logs");
349 self.get_warning_logs(limit).await
350 }
351
352 pub async fn get_performance_metrics(&mut self) -> Result<Value, WazuhApiError> {
353 debug!("Getting performance metrics");
354
355 let analysisd_stats = self.get_analysisd_stats().await?;
356 let remoted_stats = self.get_remoted_stats().await?;
357
358 let metrics = serde_json::json!({
359 "analysisd": {
360 "events_per_second": analysisd_stats.events_edps,
361 "total_events_processed": analysisd_stats.events_processed,
362 "events_dropped": analysisd_stats.events_dropped,
363 "alerts_written": analysisd_stats.alerts_written,
364 "queue_usage": {
365 "event_queue": analysisd_stats.event_queue_usage,
366 "alerts_queue": analysisd_stats.alerts_queue_usage,
367 "rule_matching_queue": analysisd_stats.rule_matching_queue_usage
368 }
369 },
370 "remoted": {
371 "tcp_sessions": remoted_stats.tcp_sessions,
372 "bytes_sent": remoted_stats.sent_bytes,
373 "bytes_received": remoted_stats.recv_bytes,
374 "discarded_count": remoted_stats.discarded_count
375 }
376 });
377
378 info!("Retrieved performance metrics");
379 Ok(metrics)
380 }
381
382 pub async fn monitor_agent_ingestion(
383 &mut self,
384 agent_id: &str,
385 ) -> Result<Value, WazuhApiError> {
386 debug!(%agent_id, "Monitoring agent log ingestion");
387
388 let logcollector_stats = self.get_logcollector_stats(agent_id).await?;
389
390 let total_events: u64 = logcollector_stats
391 .global
392 .files
393 .iter()
394 .map(|f| f.events)
395 .sum();
396 let total_bytes: u64 = logcollector_stats
397 .global
398 .files
399 .iter()
400 .map(|f| f.bytes)
401 .sum();
402 let total_drops: u64 = logcollector_stats
403 .global
404 .files
405 .iter()
406 .flat_map(|f| &f.targets)
407 .map(|t| t.drops)
408 .sum();
409
410 let ingestion_info = serde_json::json!({
411 "agent_id": agent_id,
412 "total_events": total_events,
413 "events_dropped": total_drops,
414 "bytes_processed": total_bytes,
415 "drop_rate": if total_events > 0 {
416 (total_drops as f64 / total_events as f64) * 100.0
417 } else {
418 0.0
419 },
420 "global_period": {
421 "start": logcollector_stats.global.start,
422 "end": logcollector_stats.global.end,
423 "files": logcollector_stats.global.files
424 },
425 "interval_period": {
426 "start": logcollector_stats.interval.start,
427 "end": logcollector_stats.interval.end,
428 "files": logcollector_stats.interval.files
429 }
430 });
431
432 info!(%agent_id, "Retrieved agent ingestion monitoring data");
433 Ok(ingestion_info)
434 }
435}