1#![allow(warnings)]
2use anyhow::Context;
3use chrono::Local;
4use prost::bytes::BytesMut;
5use std::str::FromStr;
6use std::{sync::Mutex };
7use tonic::{ Request, Response, Status };
8use tracing::info;
9
10use aya::{ maps::{ MapData, PerfEventArray }, util::online_cpus };
11use std::result::Result::Ok;
12use tonic::async_trait;
13
14use std::collections::HashMap;
15use aya::maps::HashMap as ayaHashMap;
16use tokio::sync::mpsc;
17use tokio::task;
18
19use crate::agent::{
21 agent_server::Agent,
22 ActiveConnectionResponse,
23 RequestActiveConnections,
24 AddIpToBlocklistRequest,
25 BlocklistResponse,
26};
27use aya::maps::Map;
28use bytemuck_derive::Zeroable;
29use cortexflow_identity::enums::IpProtocols;
30use std::net::Ipv4Addr;
31use tracing::warn;
32
33#[repr(C)]
34#[derive(Clone, Copy, Zeroable)]
35pub struct PacketLog {
36 pub proto: u8,
37 pub src_ip: u32,
38 pub src_port: u16,
39 pub dst_ip: u32,
40 pub dst_port: u16,
41 pub pid: u32,
42}
43unsafe impl aya::Pod for PacketLog {}
44
45pub struct AgentApi {
46 event_rx: Mutex<mpsc::Receiver<Result<HashMap<String, String>, Status>>>,
49 event_tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
50}
51
52#[async_trait]
55pub trait EventSender: Send + Sync + 'static {
56 async fn send_event(&self, event: HashMap<String, String>);
57 async fn send_map(
58 &self,
59 map: HashMap<String, String>,
60 tx: mpsc::Sender<Result<HashMap<String, String>, Status>>
61 ) {
62 let status = Status::new(tonic::Code::Ok, "success");
63 let event = Ok(map);
64
65 let _ = tx.send(event).await;
66 }
67}
68#[async_trait]
70impl EventSender for AgentApi {
71 async fn send_event(&self, event: HashMap<String, String>) {
72 self.send_map(event, self.event_tx.clone()).await;
73 }
74}
75
76const BPF_PATH: &str = "BPF_PATH";
77const PIN_BLOCKLIST_MAP_PATH: &str = "PIN_BLOCKLIST_MAP_PATH";
78
79impl Default for AgentApi {
82 fn default() -> Self {
84 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map").expect(
86 "cannot open events_map Mapdata"
87 );
88 let map = Map::PerfEventArray(mapdata); let (tx, rx) = mpsc::channel(1024);
92 let api = AgentApi {
93 event_rx: rx.into(),
94 event_tx: tx.clone(),
95 };
96
97 let mut events_array = PerfEventArray::try_from(map).expect(
98 "Error while initializing events array"
99 );
100
101 task::spawn(async move {
103 let mut net_events_buffer = Vec::new();
104 for cpu_id in online_cpus()
107 .map_err(|e| anyhow::anyhow!("Error {:?}", e))
108 .unwrap() {
109 let buf = events_array
110 .open(cpu_id, None)
111 .expect("Error during the creation of net_events_buf structure");
112
113 let buffers = vec![BytesMut::with_capacity(4096); 8];
114 net_events_buffer.push((buf, buffers));
115 }
116
117 info!("Starting event listener");
118 loop {
120 for (buf, buffers) in net_events_buffer.iter_mut() {
121 match buf.read_events(buffers) {
122 Ok(events) => {
123 if events.read > 0 {
125 for i in 0..events.read {
126 let data = &buffers[i];
127 if data.len() >= std::mem::size_of::<PacketLog>() {
128 let pl: PacketLog = unsafe {
129 std::ptr::read(data.as_ptr() as *const _)
130 };
131 let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
132 let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
133 let src_port = u16::from_be(pl.src_port as u16);
134 let dst_port = u16::from_be(pl.dst_port as u16);
135 let event_id = pl.pid;
136
137 match IpProtocols::try_from(pl.proto) {
138 Ok(proto) => {
139 info!(
140 "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
141 event_id,
142 proto,
143 src,
144 src_port,
145 dst,
146 dst_port
147 );
148 info!("creating hashmap for the aggregated data");
149 let mut evt = HashMap::new();
150 info!("Inserting events into the hashmap");
152 evt.insert(
156 format!("{:?}", src.to_string()),
157 format!("{:?}", event_id.to_string())
158 );
159 info!("sending events to the MPSC channel");
160 let _ = tx.send(Ok(evt)).await;
161 }
162 Err(_) => {
163 info!(
164 "Event Id: {} Protocol: Unknown ({})",
165 event_id,
166 pl.proto
167 );
168 }
169 };
170 } else {
171 warn!(
172 "Received packet data too small: {} bytes",
173 data.len()
174 );
175 }
176 }
177 } else if events.read == 0 {
178 info!("[Agent/API] 0 Events found");
179 let mut evt = HashMap::new();
180 evt.insert("0".to_string(), "0".to_string());
181 let _ = tx.send(Ok(evt)).await;
182 }
183 }
184 Err(e) => {
185 eprintln!("Errore nella lettura eventi: {}", e);
186 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
187 }
188 }
189 }
190 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
192 }
193 });
194
195 api
196 }
197}
198
199#[async_trait]
202impl Agent for AgentApi {
203 async fn active_connections(
206 &self,
207 request: Request<RequestActiveConnections>
208 ) -> Result<Response<ActiveConnectionResponse>, Status> {
209 let req = request.into_inner();
211
212 let mut aggregated_events: HashMap<String, String> = HashMap::new();
214
215 while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() {
217 if let Ok(map) = evt {
218 aggregated_events.extend(map);
219 }
220 }
221
222 let block_list = "135.171.168.192".to_string();
226 if aggregated_events.contains_key(&block_list) {
227 aggregated_events.remove(&block_list);
228 info!("Blocked ip from block_list: {:?}", block_list);
229 }
230
231 info!("DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}", aggregated_events);
233
234 Ok(
236 Response::new(ActiveConnectionResponse {
237 status: "success".to_string(),
238 events: aggregated_events,
239 })
240 )
241 }
242
243 async fn add_ip_to_blocklist(
245 &self,
246 request: Request<AddIpToBlocklistRequest>
247 ) -> Result<Response<BlocklistResponse>, Status> {
248 let req = request.into_inner();
250
251 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect(
253 "cannot open blocklist_map Mapdata"
254 );
255 let blocklist_mapdata = Map::HashMap(mapdata); let mut blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> = ayaHashMap
257 ::try_from(blocklist_mapdata)
258 .unwrap();
259
260 if req.ip.is_none() {
261 info!("IP field in request is none");
263 info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
264 } else {
265 let datetime = Local::now().to_string();
268 let ip = req.ip.unwrap();
269 let u8_4_ip = Ipv4Addr::from_str(&ip).unwrap().octets();
271 blocklist_map.insert(u8_4_ip, u8_4_ip, 0);
273 info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
274 }
275 let path = std::env
276 ::var(PIN_BLOCKLIST_MAP_PATH)
277 .context("Blocklist map path not found!")
278 .unwrap();
279
280 let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
282 for item in blocklist_map.iter() {
283 let (k, v) = item.unwrap();
284 let key = String::from_utf8(k.to_vec()).unwrap();
286 let value = String::from_utf8(v.to_vec()).unwrap();
287 converted_blocklist_map.insert(key, value);
288 }
289
290 Ok(
292 Response::new(BlocklistResponse {
293 status: "success".to_string(),
294 events: converted_blocklist_map,
295 })
296 )
297 }
298
299 async fn check_blocklist(
300 &self,
301 request: Request<()>
302 ) -> Result<Response<BlocklistResponse>, Status> {
303 info!("Returning blocklist hashmap");
304 let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect(
306 "cannot open blocklist_map Mapdata"
307 );
308 let blocklist_mapdata = Map::HashMap(mapdata); let blocklist_map: ayaHashMap<MapData, [u8; 4], [u8; 4]> = ayaHashMap
311 ::try_from(blocklist_mapdata)
312 .unwrap();
313
314 let mut converted_blocklist_map: HashMap<String, String> = HashMap::new();
317 for item in blocklist_map.iter() {
318 let (k, v) = item.unwrap();
319 let key = String::from_utf8(k.to_vec()).unwrap();
321 let value = String::from_utf8(v.to_vec()).unwrap();
322 converted_blocklist_map.insert(key, value);
323 }
324 Ok(
325 Response::new(BlocklistResponse {
326 status: "success".to_string(),
327 events: converted_blocklist_map,
328 })
329 )
330 }
331}