agent_api/
api.rs

1#![allow(warnings)]
2use anyhow::Context;
3use chrono::Local;
4use cortexbrain_common::{
5    formatters::{format_ipv4, format_ipv6},
6};
7use prost::bytes::BytesMut;
8use std::{str::FromStr, sync::Arc};
9use std::sync::Mutex;
10use tonic::{Request, Response, Status};
11use tracing::info;
12
13use aya::{
14    maps::{MapData, PerfEventArray},
15    util::online_cpus,
16};
17use std::result::Result::Ok;
18use tonic::async_trait;
19
20use aya::maps::HashMap as ayaHashMap;
21use std::collections::HashMap;
22use tokio::sync::mpsc;
23use tokio::task;
24
25use crate::{
26    agent::{
27        ConnectionEvent, DroppedPacketMetric, DroppedPacketsResponse,
28        LatencyMetric, LatencyMetricsResponse,
29    }, 
30};
31
32use crate::structs::{NetworkMetrics, PacketLog, TimeStampMetrics};
33
34// *  contains agent api configuration
35use crate::agent::{
36    agent_server::Agent, ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse,
37    RequestActiveConnections, RmIpFromBlocklistRequest, RmIpFromBlocklistResponse,
38};
39use crate::constants::PIN_BLOCKLIST_MAP_PATH;
40
41use crate::helpers::comm_to_string;
42use aya::maps::Map;
43use cortexbrain_common::constants::BPF_PATH;
44use cortexflow_identity::enums::IpProtocols;
45use std::net::Ipv4Addr;
46use tracing::warn;
47
48pub struct AgentApi {
49    //* event_rx is an istance of a mpsc receiver.
50    //* is used to receive the data from the transmitter (tx)
51    active_connection_event_rx: Mutex<mpsc::Receiver<Result<Vec<ConnectionEvent>, Status>>>,
52    active_connection_event_tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
53    latency_metrics_rx: Mutex<mpsc::Receiver<Result<Vec<LatencyMetric>, Status>>>,
54    latency_metrics_tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
55    dropped_packet_metrics_rx: Mutex<mpsc::Receiver<Result<Vec<DroppedPacketMetric>, Status>>>,
56    dropped_packet_metrics_tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
57}
58
59//* Event sender trait. Takes an event from a map and send that to the mpsc channel
60//* using the send_map function
61#[async_trait]
62pub trait EventSender: Send + Sync + 'static {
63    async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>);
64    async fn send_active_connection_event_map(
65        &self,
66        map: Vec<ConnectionEvent>,
67        tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
68    ) {
69        let status = Status::new(tonic::Code::Ok, "success");
70        let event = Ok(map);
71
72        let _ = tx.send(event).await;
73    }
74
75    async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>);
76    async fn send_latency_metrics_event_map(
77        &self,
78        map: Vec<LatencyMetric>,
79        tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
80    ) {
81        let status = Status::new(tonic::Code::Ok, "success");
82        let event = Ok(map);
83        let _ = tx.send(event).await;
84    }
85
86    async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>);
87    async fn send_dropped_packet_metrics_event_map(
88        &self,
89        map: Vec<DroppedPacketMetric>,
90        tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
91    ) {
92        let status = Status::new(tonic::Code::Ok, "success");
93        let event = Ok(map);
94        let _ = tx.send(event).await;
95    }
96
97}
98
99// send event function. takes an HashMap and send that using mpsc event_tx
100#[async_trait]
101impl EventSender for AgentApi {
102    async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>) {
103        self.send_active_connection_event_map(event, self.active_connection_event_tx.clone())
104            .await;
105    }
106
107    async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>) {
108        self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone())
109            .await;
110    }
111
112    async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>) {
113        self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone())
114            .await;
115    }
116}
117
118//initialize a default trait for AgentApi. Loads a name and a bpf istance.
119//this trait is essential for init the Agent.
120impl Default for AgentApi {
121    //TODO:this part needs a better error handling
122    fn default() -> Self {
123        // load connections maps mapdata
124        let active_connection_mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map")
125            .expect("cannot open events_map Mapdata");
126        let active_connection_map = Map::PerfEventArray(active_connection_mapdata); //creates a PerfEventArray from the mapdata
127
128        let mut active_connection_events_array = PerfEventArray::try_from(active_connection_map)
129            .expect("Error while initializing events array");
130
131        // load network metrics maps mapdata
132        let network_metrics_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/net_metrics")
133            .expect("cannot open net_metrics Mapdata");
134        let network_metrics_map = Map::PerfEventArray(network_metrics_mapdata); //creates a PerfEventArray from the mapdata
135        let mut network_metrics_events_array = PerfEventArray::try_from(network_metrics_map)
136            .expect("Error while initializing network metrics array");
137
138        // load time stamp events maps mapdata
139        let time_stamp_events_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/time_stamp_events")
140            .expect("cannot open time_stamp_events Mapdata");
141        let time_stamp_events_map = Map::PerfEventArray(time_stamp_events_mapdata); //
142        let mut time_stamp_events_array = PerfEventArray::try_from(time_stamp_events_map)
143            .expect("Error while initializing time stamp events array");
144
145        //init a mpsc channel
146        let (conn_tx, conn_rx) = mpsc::channel(1024);
147        let (lat_tx, lat_rx) = mpsc::channel(2048);
148        let (drop_tx, drop_rx) = mpsc::channel(2048);
149
150        let api = AgentApi {
151            active_connection_event_rx: conn_rx.into(),
152            active_connection_event_tx: conn_tx.clone(),
153            latency_metrics_rx: Mutex::new(lat_rx),
154            latency_metrics_tx: lat_tx.clone(),
155            dropped_packet_metrics_rx: Mutex::new(drop_rx),
156            dropped_packet_metrics_tx: drop_tx.clone(),
157        };
158
159        // For network metrics
160
161        //spawn an event readers
162        task::spawn(async move {
163            let mut net_events_buffer = Vec::new();
164            //scan the cpus to read the data
165
166            for cpu_id in online_cpus()
167                .map_err(|e| anyhow::anyhow!("Error {:?}", e))
168                .unwrap()
169            {
170                let buf = active_connection_events_array
171                    .open(cpu_id, None)
172                    .expect("Error during the creation of net_events_buf structure");
173
174                let buffers = vec![BytesMut::with_capacity(4096); 8];
175                net_events_buffer.push((buf, buffers));
176            }
177
178            info!("Starting event listener");
179            //send the data through a mpsc channel
180            loop {
181                for (buf, buffers) in net_events_buffer.iter_mut() {
182                    match buf.read_events(buffers) {
183                        Ok(events) => {
184                            //read the events, this function is similar to the one used in identity/helpers.rs/display_events
185                            if events.read > 0 {
186                                for i in 0..events.read {
187                                    let data = &buffers[i];
188                                    if data.len() >= std::mem::size_of::<PacketLog>() {
189                                        let pl: PacketLog =
190                                            unsafe { std::ptr::read(data.as_ptr() as *const _) };
191                                        let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
192                                        let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
193                                        let src_port = u16::from_be(pl.src_port as u16);
194                                        let dst_port = u16::from_be(pl.dst_port as u16);
195                                        let event_id = pl.pid;
196
197                                        match IpProtocols::try_from(pl.proto) {
198                                            Ok(proto) => {
199                                                info!(
200                                                    "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
201                                                    event_id,
202                                                    proto,
203                                                    src,
204                                                    src_port,
205                                                    dst,
206                                                    dst_port
207                                                );
208                                                info!("creating vector for the aggregated data");
209                                                let mut evt = Vec::new();
210                                                // insert event in the vector
211                                                info!("Inserting events into the vector");
212                                                //TODO: use a Arc<str> or Box<str> type instead of String type.
213                                                //The data doesn't need to implement any .copy() or .clone() trait
214                                                // using an Arc<str> type will also waste less resources
215                                                evt.push(ConnectionEvent {
216                                                    event_id: event_id.to_string(),
217                                                    src_ip_port: format!("{}:{}", src, src_port),
218                                                    dst_ip_port: format!("{}:{}", dst, dst_port),
219                                                });
220                                                info!("sending events to the MPSC channel");
221                                                let _ = conn_tx.send(Ok(evt)).await;
222                                            }
223                                            Err(_) => {
224                                                info!(
225                                                    "Event Id: {} Protocol: Unknown ({})",
226                                                    event_id, pl.proto
227                                                );
228                                            }
229                                        };
230                                    } else {
231                                        warn!(
232                                            "Received packet data too small: {} bytes",
233                                            data.len()
234                                        );
235                                    }
236                                }
237                            } else if events.read == 0 {
238                                info!("[Agent/API] 0 Events found");
239                            }
240                        }
241                        Err(e) => {
242                            eprintln!("Errore nella lettura eventi: {}", e);
243                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
244                        }
245                    }
246                }
247                // small delay to avoid cpu congestion
248                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
249            }
250        });
251
252        task::spawn(async move {
253            let mut net_metrics_buffer = Vec::new();
254
255            //scan the cpus to read the data
256            for cpu_id in online_cpus()
257                .map_err(|e| anyhow::anyhow!("Error {:?}", e))
258                .unwrap()
259            {
260                let buf = network_metrics_events_array
261                    .open(cpu_id, None)
262                    .expect("Error during the creation of net_metrics_buf structure");
263
264                let buffers = vec![BytesMut::with_capacity(4096); 8];
265                net_metrics_buffer.push((buf, buffers));
266            }
267
268            info!("Starting network metrics listener");
269
270            //send the data through a mpsc channel
271            loop {
272                for (buf, buffers) in net_metrics_buffer.iter_mut() {
273                    match buf.read_events(buffers) {
274                        Ok(events) => {
275                            //read the events, this function is similar to the one used in identity/helpers.rs/display_events
276                            if events.read > 0 {
277                                for i in 0..events.read {
278                                    let data = &buffers[i];
279                                    if data.len() >= std::mem::size_of::<NetworkMetrics>() {
280                                        let nm: NetworkMetrics =
281                                            unsafe { std::ptr::read(data.as_ptr() as *const _) };
282
283                                        let dropped_packet_metrics = DroppedPacketMetric {
284                                            tgid: nm.tgid,
285                                            process_name: comm_to_string(&nm.comm),
286                                            sk_drops: nm.sk_drops,
287                                            sk_err: nm.sk_err,
288                                            sk_err_soft: nm.sk_err_soft,
289                                            sk_backlog_len: nm.sk_backlog_len as u32,
290                                            sk_wmem_queued: nm.sk_write_memory_queued,
291                                            sk_rcvbuf: nm.sk_receive_buffer_size,
292                                            sk_ack_backlog: nm.sk_ack_backlog,
293                                            timestamp_us: nm.ts_us,
294                                        };
295
296                                        if dropped_packet_metrics.sk_drops > 0 {
297                                            let mut evt = Vec::new();
298                                            info!(
299                                                    "Dropped Packet Metric - tgid: {}, process_name: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_wmem_queued: {}, sk_rcvbuf: {}, sk_ack_backlog: {}, timestamp_us: {}",
300                                                    dropped_packet_metrics.tgid,
301                                                    dropped_packet_metrics.process_name,
302                                                    dropped_packet_metrics.sk_drops,
303                                                    dropped_packet_metrics.sk_err,
304                                                    dropped_packet_metrics.sk_err_soft,
305                                                    dropped_packet_metrics.sk_backlog_len,
306                                                    dropped_packet_metrics.sk_wmem_queued,
307                                                    dropped_packet_metrics.sk_rcvbuf,
308                                                    dropped_packet_metrics.sk_ack_backlog,
309                                                    dropped_packet_metrics.timestamp_us
310                                                );
311                                            evt.push(dropped_packet_metrics.clone());
312                                            let _ = drop_tx.send(Ok(evt)).await;
313                                        }
314                                    } else {
315                                        warn!(
316                                            "Received network metrics data too small: {} bytes",
317                                            data.len()
318                                        );
319                                    }
320                                }
321                            }
322                        }
323                        Err(e) => {
324                            eprintln!("Errore nella lettura network metrics eventi: {}", e);
325                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
326                        }
327                    }
328                }
329                // small delay to avoid cpu congestion
330                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
331            }
332        });
333
334        task::spawn(async move {
335            let mut ts_events_buffer = Vec::new();
336            //scan the cpus to read the data
337            for cpu_id in online_cpus()
338                .map_err(|e| anyhow::anyhow!("Error {:?}", e))
339                .unwrap()
340            {
341                let buf = time_stamp_events_array
342                    .open(cpu_id, None)
343                    .expect("Error during the creation of time stamp events buf structure");
344
345                let buffers = vec![BytesMut::with_capacity(4096); 8];
346                ts_events_buffer.push((buf, buffers));
347            }
348
349            info!("Starting time stamp events listener");
350
351            //send the data through a mpsc channel
352            loop {
353                for (buf, buffers) in ts_events_buffer.iter_mut() {
354                    match buf.read_events(buffers) {
355                        Ok(events) => {
356                            //read the events, this function is similar to the one used in identity/helpers.rs/display_events
357                            if events.read > 0 {
358                                for i in 0..events.read {
359                                    let data = &buffers[i];
360                                    if data.len() >= std::mem::size_of::<TimeStampMetrics>() {
361                                        let tsm: TimeStampMetrics =
362                                            unsafe { std::ptr::read(data.as_ptr() as *const _) };
363                                        let latency_metric = LatencyMetric {
364                                            delta_us: tsm.delta_us,
365                                            timestamp_us: tsm.ts_us,
366                                            tgid: tsm.tgid,
367                                            process_name: comm_to_string(&tsm.comm),
368                                            local_port: tsm.lport as u32,
369                                            remote_port: tsm.dport_be as u32,
370                                            address_family: tsm.af as u32,
371                                            src_address_v4: format_ipv4(tsm.saddr_v4),
372                                            dst_address_v4: format_ipv4(tsm.daddr_v4),
373                                            src_address_v6: format_ipv6(&tsm.saddr_v6),
374                                            dst_address_v6: format_ipv6(&tsm.daddr_v6),
375                                        };
376                                        info!(
377                                            "Latency Metric - tgid: {}, process_name: {}, delta_us: {}, timestamp_us: {}, local_port: {}, remote_port: {}, address_family: {}, src_address_v4: {}, dst_address_v4: {}, src_address_v6: {}, dst_address_v6: {}",
378                                            latency_metric.tgid,
379                                            latency_metric.process_name,
380                                            latency_metric.delta_us,
381                                            latency_metric.timestamp_us,
382                                            latency_metric.local_port,
383                                            latency_metric.remote_port,
384                                            latency_metric.address_family,
385                                            latency_metric.src_address_v4,
386                                            latency_metric.dst_address_v4,
387                                            latency_metric.src_address_v6,
388                                            latency_metric.dst_address_v6
389                                        );
390                                        let mut evt = Vec::new();
391                                        evt.push(latency_metric.clone());
392                                        let _ = lat_tx.send(Ok(evt)).await;
393                                    } else {
394                                        warn!(
395                                            "Received time stamp metrics data too small: {} bytes",
396                                            data.len()
397                                        );
398                                    }
399                                }
400                            }
401                        }
402                        Err(e) => {
403                            eprintln!("Errore nella lettura time stamp eventi: {}", e);
404                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
405                        }
406                    }
407                }
408            }
409        });
410
411        api
412    }
413}
414
415//declare the blocklist hashmap structure
416//TODO: finish the creation of a blocklist hashmap
417#[async_trait]
418impl Agent for AgentApi {
419    // * read the incoming active_connections requests and returns a response with the
420    // * active connections. The data are transformed and sent to the api with a mpsc channel
421    async fn active_connections(
422        &self,
423        request: Request<RequestActiveConnections>,
424    ) -> Result<Response<ActiveConnectionResponse>, Status> {
425        //read request
426        let req = request.into_inner();
427
428        //create the hashmap to process events from the mpsc channel queue
429        let mut aggregated_events: Vec<ConnectionEvent> = Vec::new();
430
431        //aggregate events
432        while let Ok(evt) = self.active_connection_event_rx.lock().unwrap().try_recv() {
433            if let Ok(vec) = evt {
434                aggregated_events.extend(vec);
435            }
436        }
437
438        //log response for debugging
439        info!(
440            "DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}",
441            aggregated_events
442        );
443
444        //return response
445        Ok(Response::new(ActiveConnectionResponse {
446            status: "success".to_string(),
447            events: aggregated_events,
448        }))
449    }
450
451    // * creates and add ip to the blocklist
452    async fn add_ip_to_blocklist(
453        &self,
454        request: Request<AddIpToBlocklistRequest>,
455    ) -> Result<Response<BlocklistResponse>, Status> {
456        //read request
457        let req = request.into_inner();
458
459        //open blocklist map
460        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
461            .expect("cannot open blocklist_map Mapdata");
462        let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata
463        let mut blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> =
464            ayaHashMap::try_from(blocklist_mapdata).unwrap();
465
466        if req.ip.is_none() {
467            // log blocklist event
468            info!("IP field in request is none");
469            info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
470        } else {
471            // add ip to the blocklist
472            // log blocklist event
473            let datetime = Local::now().to_string();
474            let ip = req.ip.unwrap();
475            //convert ip from string to [u8;4] type and insert into the bpf map
476            let u8_4_ip = Ipv4Addr::from_str(&ip).unwrap().octets();
477            //TODO: convert datetime in a kernel compatible format
478            blocklist_map.insert(u8_4_ip, u8_4_ip, 0);
479            info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
480        }
481        let path = std::env::var(PIN_BLOCKLIST_MAP_PATH)
482            .context("Blocklist map path not found!")
483            .unwrap();
484
485        //convert the maps with a buffer to match the protobuffer types
486        let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
487        for item in blocklist_map.iter() {
488            let (k, v) = item.unwrap();
489            // convert keys and values from [u8;4] to String
490            let key = Ipv4Addr::from(k).to_string();
491            let value = Ipv4Addr::from(k).to_string();
492            converted_blocklist_map.insert(key, value);
493        }
494
495        //save ip into the blocklist
496        Ok(Response::new(BlocklistResponse {
497            status: "success".to_string(),
498            events: converted_blocklist_map,
499        }))
500    }
501
502    async fn check_blocklist(
503        &self,
504        request: Request<()>,
505    ) -> Result<Response<BlocklistResponse>, Status> {
506        info!("Returning blocklist hashmap");
507        //open blocklist map
508        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
509            .expect("cannot open blocklist_map Mapdata");
510        let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata
511
512        let blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> =
513            ayaHashMap::try_from(blocklist_mapdata).unwrap();
514
515        //convert the maps with a buffer to match the protobuffer types
516
517        let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
518        for item in blocklist_map.iter() {
519            let (k, v) = item.unwrap();
520            // convert keys and values from [u8;4] to String
521            let key = Ipv4Addr::from(k).to_string();
522            let value = Ipv4Addr::from(k).to_string();
523            converted_blocklist_map.insert(key, value);
524        }
525        Ok(Response::new(BlocklistResponse {
526            status: "success".to_string(),
527            events: converted_blocklist_map,
528        }))
529    }
530    async fn rm_ip_from_blocklist(
531        &self,
532        request: Request<RmIpFromBlocklistRequest>,
533    ) -> Result<Response<RmIpFromBlocklistResponse>, Status> {
534        //read request
535        let req = request.into_inner();
536        info!("Removing ip from blocklist map");
537        //open blocklist map
538        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
539            .expect("cannot open blocklist_map Mapdata");
540        let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata
541        let mut blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> =
542            ayaHashMap::try_from(blocklist_mapdata).unwrap();
543        //remove the address
544        let ip_to_remove = req.ip;
545        let u8_4_ip_to_remove = Ipv4Addr::from_str(&ip_to_remove).unwrap().octets();
546        blocklist_map.remove(&u8_4_ip_to_remove);
547
548        //convert the maps with a buffer to match the protobuffer types
549        let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
550        for item in blocklist_map.iter() {
551            let (k, v) = item.unwrap();
552            // convert keys and values from [u8;4] to String
553            let key = Ipv4Addr::from(k).to_string();
554            let value = Ipv4Addr::from(k).to_string();
555            converted_blocklist_map.insert(key, value);
556        }
557
558        Ok(Response::new(RmIpFromBlocklistResponse {
559            status: "Ip removed from blocklist".to_string(),
560            events: converted_blocklist_map,
561        }))
562    }
563
564    async fn get_latency_metrics(
565        &self,
566        request: Request<()>,
567    ) -> Result<Response<LatencyMetricsResponse>, Status> {
568        // Extract the request parameters
569        let req = request.into_inner();
570        info!("Getting latency metrics");
571
572        // Here you would typically query your data source for the latency metrics
573        // For demonstration purposes, we'll return a dummy response
574
575        let mut aggregated_latency_metrics_events: Vec<LatencyMetric> = Vec::new();
576
577        while let Ok(evt) = self.latency_metrics_rx.lock().unwrap().try_recv() {
578            if let Ok(vec) = evt {
579                aggregated_latency_metrics_events.extend(vec);
580            }
581        }
582
583        let total_count = aggregated_latency_metrics_events.len() as u32;
584
585        let (average_latency_us, min_latency_us, max_latency_us) =
586            if !aggregated_latency_metrics_events.is_empty() {
587                let sum: u64 = aggregated_latency_metrics_events
588                    .iter()
589                    .map(|m| m.delta_us)
590                    .sum();
591                let avg = sum as f64 / aggregated_latency_metrics_events.len() as f64;
592
593                let min = aggregated_latency_metrics_events
594                    .iter()
595                    .map(|m| m.delta_us)
596                    .min()
597                    .unwrap_or(0) as f64;
598
599                let max = aggregated_latency_metrics_events
600                    .iter()
601                    .map(|m| m.delta_us)
602                    .max()
603                    .unwrap_or(0) as f64;
604
605                (avg, min, max)
606            } else {
607                (0.0, 0.0, 0.0)
608            };
609
610        info!(
611            "Latency metrics - total_count: {}, average: {:.2}us, min: {:.2}us, max: {:.2}us",
612            total_count, average_latency_us, min_latency_us, max_latency_us
613        );
614
615        let response = LatencyMetricsResponse {
616            status: "success".to_string(),
617            metrics: aggregated_latency_metrics_events,
618            total_count,
619            average_latency_us,
620            max_latency_us,
621            min_latency_us,
622        };
623
624        Ok(Response::new(response))
625    }
626
627    async fn get_dropped_packets_metrics(
628        &self,
629        request: Request<()>,
630    ) -> Result<Response<DroppedPacketsResponse>, Status> {
631        // Extract the request parameters
632        let req = request.into_inner();
633        info!("Getting dropped packets metrics");
634
635        let mut aggregated_dropped_packet_metrics: Vec<DroppedPacketMetric> = Vec::new();
636        let mut total_drops = 0u32;
637
638        // Collect all metrics from channel
639        while let Ok(evt) = self.dropped_packet_metrics_rx.lock().unwrap().try_recv() {
640            if let Ok(vec) = evt {
641                for metric in vec {
642                    total_drops += metric.sk_drops as u32;
643                    aggregated_dropped_packet_metrics.push(metric);
644                }
645            }
646        }
647
648        info!(
649            "Dropped packets metrics - total_metrics: {}, total_drops: {}",
650            aggregated_dropped_packet_metrics.len(),
651            total_drops
652        );
653
654        let response = DroppedPacketsResponse {
655            status: "success".to_string(),
656            metrics: aggregated_dropped_packet_metrics,
657            total_drops,
658        };
659
660        Ok(Response::new(response))
661    }
662}