1#![allow(warnings)]
2use anyhow::Context;
3use chrono::Local;
4use cortexbrain_common::{
5 formatters::{format_ipv4, format_ipv6},
6};
7use prost::bytes::BytesMut;
8use std::{str::FromStr, sync::Arc};
9use std::sync::Mutex;
10use tonic::{Request, Response, Status};
11use tracing::info;
12
13use aya::{
14 maps::{MapData, PerfEventArray},
15 util::online_cpus,
16};
17use std::result::Result::Ok;
18use tonic::async_trait;
19
20use aya::maps::HashMap as ayaHashMap;
21use std::collections::HashMap;
22use tokio::sync::mpsc;
23use tokio::task;
24
25use crate::{
26 agent::{
27 ConnectionEvent, DroppedPacketMetric, DroppedPacketsResponse,
28 LatencyMetric, LatencyMetricsResponse,
29 },
30};
31
32use crate::structs::{NetworkMetrics, PacketLog, TimeStampMetrics};
33
34use crate::agent::{
36 agent_server::Agent, ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse,
37 RequestActiveConnections, RmIpFromBlocklistRequest, RmIpFromBlocklistResponse,
38};
39use crate::constants::PIN_BLOCKLIST_MAP_PATH;
40
41use crate::helpers::comm_to_string;
42use aya::maps::Map;
43use cortexbrain_common::constants::BPF_PATH;
44use cortexflow_identity::enums::IpProtocols;
45use std::net::Ipv4Addr;
46use tracing::warn;
47
/// State of the gRPC agent service: one `tokio::sync::mpsc` channel per event
/// family (active connections, latency metrics, dropped-packet metrics).
///
/// Every channel carries batches of type `Result<Vec<_>, Status>`. For each
/// family both halves are stored: the sender, which can be cloned to feed the
/// channel, and the receiver, wrapped in a `std::sync::Mutex` so it can be
/// polled through a shared `&self` reference.
pub struct AgentApi {
    /// Receiving half of the active-connection channel.
    active_connection_event_rx: Mutex<mpsc::Receiver<Result<Vec<ConnectionEvent>, Status>>>,
    /// Sending half of the active-connection channel.
    active_connection_event_tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
    /// Receiving half of the latency-metrics channel.
    latency_metrics_rx: Mutex<mpsc::Receiver<Result<Vec<LatencyMetric>, Status>>>,
    /// Sending half of the latency-metrics channel.
    latency_metrics_tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
    /// Receiving half of the dropped-packet-metrics channel.
    dropped_packet_metrics_rx: Mutex<mpsc::Receiver<Result<Vec<DroppedPacketMetric>, Status>>>,
    /// Sending half of the dropped-packet-metrics channel.
    dropped_packet_metrics_tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
}
58
59#[async_trait]
62pub trait EventSender: Send + Sync + 'static {
63 async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>);
64 async fn send_active_connection_event_map(
65 &self,
66 map: Vec<ConnectionEvent>,
67 tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
68 ) {
69 let status = Status::new(tonic::Code::Ok, "success");
70 let event = Ok(map);
71
72 let _ = tx.send(event).await;
73 }
74
75 async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>);
76 async fn send_latency_metrics_event_map(
77 &self,
78 map: Vec<LatencyMetric>,
79 tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
80 ) {
81 let status = Status::new(tonic::Code::Ok, "success");
82 let event = Ok(map);
83 let _ = tx.send(event).await;
84 }
85
86 async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>);
87 async fn send_dropped_packet_metrics_event_map(
88 &self,
89 map: Vec<DroppedPacketMetric>,
90 tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
91 ) {
92 let status = Status::new(tonic::Code::Ok, "success");
93 let event = Ok(map);
94 let _ = tx.send(event).await;
95 }
96
97}
98
99#[async_trait]
101impl EventSender for AgentApi {
102 async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>) {
103 self.send_active_connection_event_map(event, self.active_connection_event_tx.clone())
104 .await;
105 }
106
107 async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>) {
108 self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone())
109 .await;
110 }
111
112 async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>) {
113 self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone())
114 .await;
115 }
116}
117
118impl Default for AgentApi {
121 fn default() -> Self {
123 let active_connection_mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map")
125 .expect("cannot open events_map Mapdata");
126 let active_connection_map = Map::PerfEventArray(active_connection_mapdata); let mut active_connection_events_array = PerfEventArray::try_from(active_connection_map)
129 .expect("Error while initializing events array");
130
131 let network_metrics_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/net_metrics")
133 .expect("cannot open net_metrics Mapdata");
134 let network_metrics_map = Map::PerfEventArray(network_metrics_mapdata); let mut network_metrics_events_array = PerfEventArray::try_from(network_metrics_map)
136 .expect("Error while initializing network metrics array");
137
138 let time_stamp_events_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/time_stamp_events")
140 .expect("cannot open time_stamp_events Mapdata");
141 let time_stamp_events_map = Map::PerfEventArray(time_stamp_events_mapdata); let mut time_stamp_events_array = PerfEventArray::try_from(time_stamp_events_map)
143 .expect("Error while initializing time stamp events array");
144
145 let (conn_tx, conn_rx) = mpsc::channel(1024);
147 let (lat_tx, lat_rx) = mpsc::channel(2048);
148 let (drop_tx, drop_rx) = mpsc::channel(2048);
149
150 let api = AgentApi {
151 active_connection_event_rx: conn_rx.into(),
152 active_connection_event_tx: conn_tx.clone(),
153 latency_metrics_rx: Mutex::new(lat_rx),
154 latency_metrics_tx: lat_tx.clone(),
155 dropped_packet_metrics_rx: Mutex::new(drop_rx),
156 dropped_packet_metrics_tx: drop_tx.clone(),
157 };
158
159 task::spawn(async move {
163 let mut net_events_buffer = Vec::new();
164 for cpu_id in online_cpus()
167 .map_err(|e| anyhow::anyhow!("Error {:?}", e))
168 .unwrap()
169 {
170 let buf = active_connection_events_array
171 .open(cpu_id, None)
172 .expect("Error during the creation of net_events_buf structure");
173
174 let buffers = vec![BytesMut::with_capacity(4096); 8];
175 net_events_buffer.push((buf, buffers));
176 }
177
178 info!("Starting event listener");
179 loop {
181 for (buf, buffers) in net_events_buffer.iter_mut() {
182 match buf.read_events(buffers) {
183 Ok(events) => {
184 if events.read > 0 {
186 for i in 0..events.read {
187 let data = &buffers[i];
188 if data.len() >= std::mem::size_of::<PacketLog>() {
189 let pl: PacketLog =
190 unsafe { std::ptr::read(data.as_ptr() as *const _) };
191 let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
192 let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
193 let src_port = u16::from_be(pl.src_port as u16);
194 let dst_port = u16::from_be(pl.dst_port as u16);
195 let event_id = pl.pid;
196
197 match IpProtocols::try_from(pl.proto) {
198 Ok(proto) => {
199 info!(
200 "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
201 event_id,
202 proto,
203 src,
204 src_port,
205 dst,
206 dst_port
207 );
208 info!("creating vector for the aggregated data");
209 let mut evt = Vec::new();
210 info!("Inserting events into the vector");
212 evt.push(ConnectionEvent {
216 event_id: event_id.to_string(),
217 src_ip_port: format!("{}:{}", src, src_port),
218 dst_ip_port: format!("{}:{}", dst, dst_port),
219 });
220 info!("sending events to the MPSC channel");
221 let _ = conn_tx.send(Ok(evt)).await;
222 }
223 Err(_) => {
224 info!(
225 "Event Id: {} Protocol: Unknown ({})",
226 event_id, pl.proto
227 );
228 }
229 };
230 } else {
231 warn!(
232 "Received packet data too small: {} bytes",
233 data.len()
234 );
235 }
236 }
237 } else if events.read == 0 {
238 info!("[Agent/API] 0 Events found");
239 }
240 }
241 Err(e) => {
242 eprintln!("Errore nella lettura eventi: {}", e);
243 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
244 }
245 }
246 }
247 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
249 }
250 });
251
252 task::spawn(async move {
253 let mut net_metrics_buffer = Vec::new();
254
255 for cpu_id in online_cpus()
257 .map_err(|e| anyhow::anyhow!("Error {:?}", e))
258 .unwrap()
259 {
260 let buf = network_metrics_events_array
261 .open(cpu_id, None)
262 .expect("Error during the creation of net_metrics_buf structure");
263
264 let buffers = vec![BytesMut::with_capacity(4096); 8];
265 net_metrics_buffer.push((buf, buffers));
266 }
267
268 info!("Starting network metrics listener");
269
270 loop {
272 for (buf, buffers) in net_metrics_buffer.iter_mut() {
273 match buf.read_events(buffers) {
274 Ok(events) => {
275 if events.read > 0 {
277 for i in 0..events.read {
278 let data = &buffers[i];
279 if data.len() >= std::mem::size_of::<NetworkMetrics>() {
280 let nm: NetworkMetrics =
281 unsafe { std::ptr::read(data.as_ptr() as *const _) };
282
283 let dropped_packet_metrics = DroppedPacketMetric {
284 tgid: nm.tgid,
285 process_name: comm_to_string(&nm.comm),
286 sk_drops: nm.sk_drops,
287 sk_err: nm.sk_err,
288 sk_err_soft: nm.sk_err_soft,
289 sk_backlog_len: nm.sk_backlog_len as u32,
290 sk_wmem_queued: nm.sk_write_memory_queued,
291 sk_rcvbuf: nm.sk_receive_buffer_size,
292 sk_ack_backlog: nm.sk_ack_backlog,
293 timestamp_us: nm.ts_us,
294 };
295
296 if dropped_packet_metrics.sk_drops > 0 {
297 let mut evt = Vec::new();
298 info!(
299 "Dropped Packet Metric - tgid: {}, process_name: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_wmem_queued: {}, sk_rcvbuf: {}, sk_ack_backlog: {}, timestamp_us: {}",
300 dropped_packet_metrics.tgid,
301 dropped_packet_metrics.process_name,
302 dropped_packet_metrics.sk_drops,
303 dropped_packet_metrics.sk_err,
304 dropped_packet_metrics.sk_err_soft,
305 dropped_packet_metrics.sk_backlog_len,
306 dropped_packet_metrics.sk_wmem_queued,
307 dropped_packet_metrics.sk_rcvbuf,
308 dropped_packet_metrics.sk_ack_backlog,
309 dropped_packet_metrics.timestamp_us
310 );
311 evt.push(dropped_packet_metrics.clone());
312 let _ = drop_tx.send(Ok(evt)).await;
313 }
314 } else {
315 warn!(
316 "Received network metrics data too small: {} bytes",
317 data.len()
318 );
319 }
320 }
321 }
322 }
323 Err(e) => {
324 eprintln!("Errore nella lettura network metrics eventi: {}", e);
325 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
326 }
327 }
328 }
329 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
331 }
332 });
333
334 task::spawn(async move {
335 let mut ts_events_buffer = Vec::new();
336 for cpu_id in online_cpus()
338 .map_err(|e| anyhow::anyhow!("Error {:?}", e))
339 .unwrap()
340 {
341 let buf = time_stamp_events_array
342 .open(cpu_id, None)
343 .expect("Error during the creation of time stamp events buf structure");
344
345 let buffers = vec![BytesMut::with_capacity(4096); 8];
346 ts_events_buffer.push((buf, buffers));
347 }
348
349 info!("Starting time stamp events listener");
350
351 loop {
353 for (buf, buffers) in ts_events_buffer.iter_mut() {
354 match buf.read_events(buffers) {
355 Ok(events) => {
356 if events.read > 0 {
358 for i in 0..events.read {
359 let data = &buffers[i];
360 if data.len() >= std::mem::size_of::<TimeStampMetrics>() {
361 let tsm: TimeStampMetrics =
362 unsafe { std::ptr::read(data.as_ptr() as *const _) };
363 let latency_metric = LatencyMetric {
364 delta_us: tsm.delta_us,
365 timestamp_us: tsm.ts_us,
366 tgid: tsm.tgid,
367 process_name: comm_to_string(&tsm.comm),
368 local_port: tsm.lport as u32,
369 remote_port: tsm.dport_be as u32,
370 address_family: tsm.af as u32,
371 src_address_v4: format_ipv4(tsm.saddr_v4),
372 dst_address_v4: format_ipv4(tsm.daddr_v4),
373 src_address_v6: format_ipv6(&tsm.saddr_v6),
374 dst_address_v6: format_ipv6(&tsm.daddr_v6),
375 };
376 info!(
377 "Latency Metric - tgid: {}, process_name: {}, delta_us: {}, timestamp_us: {}, local_port: {}, remote_port: {}, address_family: {}, src_address_v4: {}, dst_address_v4: {}, src_address_v6: {}, dst_address_v6: {}",
378 latency_metric.tgid,
379 latency_metric.process_name,
380 latency_metric.delta_us,
381 latency_metric.timestamp_us,
382 latency_metric.local_port,
383 latency_metric.remote_port,
384 latency_metric.address_family,
385 latency_metric.src_address_v4,
386 latency_metric.dst_address_v4,
387 latency_metric.src_address_v6,
388 latency_metric.dst_address_v6
389 );
390 let mut evt = Vec::new();
391 evt.push(latency_metric.clone());
392 let _ = lat_tx.send(Ok(evt)).await;
393 } else {
394 warn!(
395 "Received time stamp metrics data too small: {} bytes",
396 data.len()
397 );
398 }
399 }
400 }
401 }
402 Err(e) => {
403 eprintln!("Errore nella lettura time stamp eventi: {}", e);
404 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
405 }
406 }
407 }
408 }
409 });
410
411 api
412 }
413}
414
415#[async_trait]
418impl Agent for AgentApi {
419 async fn active_connections(
422 &self,
423 request: Request<RequestActiveConnections>,
424 ) -> Result<Response<ActiveConnectionResponse>, Status> {
425 let req = request.into_inner();
427
428 let mut aggregated_events: Vec<ConnectionEvent> = Vec::new();
430
431 while let Ok(evt) = self.active_connection_event_rx.lock().unwrap().try_recv() {
433 if let Ok(vec) = evt {
434 aggregated_events.extend(vec);
435 }
436 }
437
438 info!(
440 "DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}",
441 aggregated_events
442 );
443
444 Ok(Response::new(ActiveConnectionResponse {
446 status: "success".to_string(),
447 events: aggregated_events,
448 }))
449 }
450
451 async fn add_ip_to_blocklist(
453 &self,
454 request: Request<AddIpToBlocklistRequest>,
455 ) -> Result<Response<BlocklistResponse>, Status> {
456 let req = request.into_inner();
458
459 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
461 .expect("cannot open blocklist_map Mapdata");
462 let blocklist_mapdata = Map::HashMap(mapdata); let mut blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> =
464 ayaHashMap::try_from(blocklist_mapdata).unwrap();
465
466 if req.ip.is_none() {
467 info!("IP field in request is none");
469 info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
470 } else {
471 let datetime = Local::now().to_string();
474 let ip = req.ip.unwrap();
475 let u8_4_ip = Ipv4Addr::from_str(&ip).unwrap().octets();
477 blocklist_map.insert(u8_4_ip, u8_4_ip, 0);
479 info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
480 }
481 let path = std::env::var(PIN_BLOCKLIST_MAP_PATH)
482 .context("Blocklist map path not found!")
483 .unwrap();
484
485 let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
487 for item in blocklist_map.iter() {
488 let (k, v) = item.unwrap();
489 let key = Ipv4Addr::from(k).to_string();
491 let value = Ipv4Addr::from(k).to_string();
492 converted_blocklist_map.insert(key, value);
493 }
494
495 Ok(Response::new(BlocklistResponse {
497 status: "success".to_string(),
498 events: converted_blocklist_map,
499 }))
500 }
501
502 async fn check_blocklist(
503 &self,
504 request: Request<()>,
505 ) -> Result<Response<BlocklistResponse>, Status> {
506 info!("Returning blocklist hashmap");
507 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
509 .expect("cannot open blocklist_map Mapdata");
510 let blocklist_mapdata = Map::HashMap(mapdata); let blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> =
513 ayaHashMap::try_from(blocklist_mapdata).unwrap();
514
515 let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
518 for item in blocklist_map.iter() {
519 let (k, v) = item.unwrap();
520 let key = Ipv4Addr::from(k).to_string();
522 let value = Ipv4Addr::from(k).to_string();
523 converted_blocklist_map.insert(key, value);
524 }
525 Ok(Response::new(BlocklistResponse {
526 status: "success".to_string(),
527 events: converted_blocklist_map,
528 }))
529 }
530 async fn rm_ip_from_blocklist(
531 &self,
532 request: Request<RmIpFromBlocklistRequest>,
533 ) -> Result<Response<RmIpFromBlocklistResponse>, Status> {
534 let req = request.into_inner();
536 info!("Removing ip from blocklist map");
537 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
539 .expect("cannot open blocklist_map Mapdata");
540 let blocklist_mapdata = Map::HashMap(mapdata); let mut blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> =
542 ayaHashMap::try_from(blocklist_mapdata).unwrap();
543 let ip_to_remove = req.ip;
545 let u8_4_ip_to_remove = Ipv4Addr::from_str(&ip_to_remove).unwrap().octets();
546 blocklist_map.remove(&u8_4_ip_to_remove);
547
548 let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
550 for item in blocklist_map.iter() {
551 let (k, v) = item.unwrap();
552 let key = Ipv4Addr::from(k).to_string();
554 let value = Ipv4Addr::from(k).to_string();
555 converted_blocklist_map.insert(key, value);
556 }
557
558 Ok(Response::new(RmIpFromBlocklistResponse {
559 status: "Ip removed from blocklist".to_string(),
560 events: converted_blocklist_map,
561 }))
562 }
563
564 async fn get_latency_metrics(
565 &self,
566 request: Request<()>,
567 ) -> Result<Response<LatencyMetricsResponse>, Status> {
568 let req = request.into_inner();
570 info!("Getting latency metrics");
571
572 let mut aggregated_latency_metrics_events: Vec<LatencyMetric> = Vec::new();
576
577 while let Ok(evt) = self.latency_metrics_rx.lock().unwrap().try_recv() {
578 if let Ok(vec) = evt {
579 aggregated_latency_metrics_events.extend(vec);
580 }
581 }
582
583 let total_count = aggregated_latency_metrics_events.len() as u32;
584
585 let (average_latency_us, min_latency_us, max_latency_us) =
586 if !aggregated_latency_metrics_events.is_empty() {
587 let sum: u64 = aggregated_latency_metrics_events
588 .iter()
589 .map(|m| m.delta_us)
590 .sum();
591 let avg = sum as f64 / aggregated_latency_metrics_events.len() as f64;
592
593 let min = aggregated_latency_metrics_events
594 .iter()
595 .map(|m| m.delta_us)
596 .min()
597 .unwrap_or(0) as f64;
598
599 let max = aggregated_latency_metrics_events
600 .iter()
601 .map(|m| m.delta_us)
602 .max()
603 .unwrap_or(0) as f64;
604
605 (avg, min, max)
606 } else {
607 (0.0, 0.0, 0.0)
608 };
609
610 info!(
611 "Latency metrics - total_count: {}, average: {:.2}us, min: {:.2}us, max: {:.2}us",
612 total_count, average_latency_us, min_latency_us, max_latency_us
613 );
614
615 let response = LatencyMetricsResponse {
616 status: "success".to_string(),
617 metrics: aggregated_latency_metrics_events,
618 total_count,
619 average_latency_us,
620 max_latency_us,
621 min_latency_us,
622 };
623
624 Ok(Response::new(response))
625 }
626
627 async fn get_dropped_packets_metrics(
628 &self,
629 request: Request<()>,
630 ) -> Result<Response<DroppedPacketsResponse>, Status> {
631 let req = request.into_inner();
633 info!("Getting dropped packets metrics");
634
635 let mut aggregated_dropped_packet_metrics: Vec<DroppedPacketMetric> = Vec::new();
636 let mut total_drops = 0u32;
637
638 while let Ok(evt) = self.dropped_packet_metrics_rx.lock().unwrap().try_recv() {
640 if let Ok(vec) = evt {
641 for metric in vec {
642 total_drops += metric.sk_drops as u32;
643 aggregated_dropped_packet_metrics.push(metric);
644 }
645 }
646 }
647
648 info!(
649 "Dropped packets metrics - total_metrics: {}, total_drops: {}",
650 aggregated_dropped_packet_metrics.len(),
651 total_drops
652 );
653
654 let response = DroppedPacketsResponse {
655 status: "success".to_string(),
656 metrics: aggregated_dropped_packet_metrics,
657 total_drops,
658 };
659
660 Ok(Response::new(response))
661 }
662}