1#![allow(warnings)]
2use prost::bytes::BytesMut;
3use std::sync::Mutex;
4use tonic::{Request, Response, Status};
5use tracing::info;
6
7use aya::{
8 maps::{MapData, PerfEventArray},
9 util::online_cpus,
10};
11use std::result::Result::Ok;
12use tonic::async_trait;
13
14use std::collections::HashMap;
15use tokio::sync::mpsc;
16use tokio::task;
17
18use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections};
20use aya::maps::Map;
21use bytemuck_derive::Zeroable;
22use cortexflow_identity::enums::IpProtocols;
23use std::net::Ipv4Addr;
24use tracing::warn;
25
/// Fixed-layout network event record decoded from the perf event buffers.
///
/// The listener task started by `AgentApi::default` reinterprets the raw bytes
/// of each perf sample as a `PacketLog`, so the `#[repr(C)]` layout (field
/// order and field types) must not change independently of whatever writes
/// those samples.
// NOTE(review): presumably this mirrors the struct emitted by the eBPF program
// behind the `events_map` map; the producer is not visible here — confirm.
#[repr(C)]
#[derive(Clone, Copy, Zeroable)]
pub struct PacketLog {
    /// Protocol number; the reader converts it with `IpProtocols::try_from`.
    pub proto: u8,
    /// Source IPv4 address; the reader passes it through `u32::from_be`.
    pub src_ip: u32,
    /// Source port; the reader passes it through `u16::from_be`.
    pub src_port: u16,
    /// Destination IPv4 address; the reader passes it through `u32::from_be`.
    pub dst_ip: u32,
    /// Destination port; the reader passes it through `u16::from_be`.
    pub dst_port: u16,
    /// Process id; the reader logs and publishes it as the "Event Id".
    pub pid: u32,
}
// SAFETY: every field is a plain integer and the struct is `Copy` with a
// `#[repr(C)]` layout, so any byte pattern of the right length is a valid value.
// NOTE(review): the C layout contains padding bytes (e.g. after `proto`);
// confirm this is acceptable for `aya::Pod`.
unsafe impl aya::Pod for PacketLog {}
37
/// gRPC agent service state: both ends of the in-process event channel.
///
/// Each channel item is either a map of string key/value pairs describing an
/// event or a `Status` error.
pub struct AgentApi {
    /// Receiving half of the event channel, drained by `active_connections`.
    /// Wrapped in a `std::sync::Mutex` so it can be polled through `&self`.
    event_rx: Mutex<mpsc::Receiver<Result<HashMap<String, String>, Status>>>,
    /// Sending half of the event channel; cloned by `send_event` and by the
    /// listener task spawned in `Default::default`.
    event_tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
}
44
45#[async_trait]
48pub trait EventSender: Send + Sync + 'static {
49 async fn send_event(&self, event: HashMap<String, String>);
50 async fn send_map(
51 &self,
52 map: HashMap<String, String>,
53 tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
54 ) {
55 let status = Status::new(tonic::Code::Ok, "success");
56 let event = Ok(map);
57
58 let _ = tx.send(event).await;
59 }
60}
61#[async_trait]
63impl EventSender for AgentApi {
64 async fn send_event(&self, event: HashMap<String, String>) {
65 self.send_map(event, self.event_tx.clone()).await;
66 }
67}
68
// NOTE(review): not referenced anywhere in the visible code; presumably the
// name of an environment variable holding the BPF pin path — confirm or remove.
const BPF_PATH: &str = "BPF_PATH";
70
71impl Default for AgentApi {
74 fn default() -> Self {
76 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map")
78 .expect("cannot open events_map Mapdata");
79 let map = Map::PerfEventArray(mapdata); let (tx, rx) = mpsc::channel(1024);
83 let api = AgentApi {
84 event_rx: rx.into(),
85 event_tx: tx.clone(),
86 };
87
88 let mut events_array =
89 PerfEventArray::try_from(map).expect("Error while initializing events array");
90
91 task::spawn(async move {
93 let mut net_events_buffer = Vec::new();
94 for cpu_id in online_cpus()
97 .map_err(|e| anyhow::anyhow!("Error {:?}", e))
98 .unwrap()
99 {
100 let buf = events_array
101 .open(cpu_id, None)
102 .expect("Error during the creation of net_events_buf structure");
103
104 let buffers = vec![BytesMut::with_capacity(4096); 8];
105 net_events_buffer.push((buf, buffers));
106 }
107
108 info!("Starting event listener");
109 loop {
111 for (buf, buffers) in net_events_buffer.iter_mut() {
112 match buf.read_events(buffers) {
113 Ok(events) => {
114 if events.read > 0 {
116 for i in 0..events.read {
117 let data = &buffers[i];
118 if data.len() >= std::mem::size_of::<PacketLog>() {
119 let pl: PacketLog =
120 unsafe { std::ptr::read(data.as_ptr() as *const _) };
121 let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
122 let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
123 let src_port = u16::from_be(pl.src_port as u16);
124 let dst_port = u16::from_be(pl.dst_port as u16);
125 let event_id = pl.pid;
126
127 match IpProtocols::try_from(pl.proto) {
128 Ok(proto) => {
129 info!(
130 "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
131 event_id, proto, src, src_port, dst, dst_port
132 );
133 info!("creating hashmap for the aggregated data");
134 let mut evt = HashMap::new();
135 info!("Inserting events into the hashmap");
137 evt.insert(
141 format!("{:?}", event_id.to_string()),
142 format!("{:?}", src.to_string()),
143 );
144 info!("sending events to the MPSC channel");
145 let _ = tx.send(Ok(evt)).await;
146 }
147 Err(_) => {
148 info!(
149 "Event Id: {} Protocol: Unknown ({})",
150 event_id, pl.proto
151 );
152 }
153 };
154 } else {
155 warn!(
156 "Received packet data too small: {} bytes",
157 data.len()
158 );
159 }
160 }
161 } else if events.read == 0 {
162 info!("[Agent/API] 0 Events found");
163 let mut evt = HashMap::new();
164 evt.insert("0".to_string(), "0".to_string());
165 let _ = tx.send(Ok(evt)).await;
166 }
167 }
168 Err(e) => {
169 eprintln!("Errore nella lettura eventi: {}", e);
170 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
171 }
172 }
173 }
174 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
176 }
177 });
178
179 api
180 }
181}
182
183#[async_trait]
184impl Agent for AgentApi {
185 async fn active_connections(
188 &self,
189 request: Request<RequestActiveConnections>,
190 ) -> Result<Response<ActiveConnectionResponse>, Status> {
191 let req = request.into_inner();
193
194 let mut aggregated_events: HashMap<String, String> = HashMap::new();
196
197 while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() {
199 if let Ok(map) = evt {
200 aggregated_events.extend(map);
201 }
202 }
203
204 info!(
206 "DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}",
207 aggregated_events
208 );
209
210 Ok(Response::new(ActiveConnectionResponse {
212 status: "success".to_string(),
213 events: aggregated_events,
214 }))
215 }
216}