agent_api/
api.rs

1#![allow(warnings)]
2use prost::bytes::BytesMut;
3use std::sync::Mutex;
4use tonic::{Request, Response, Status};
5use tracing::info;
6
7use aya::{
8    maps::{MapData, PerfEventArray},
9    util::online_cpus,
10};
11use std::result::Result::Ok;
12use tonic::async_trait;
13
14use std::collections::HashMap;
15use tokio::sync::mpsc;
16use tokio::task;
17
18// *  contains agent api configuration
19use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections};
20use aya::maps::Map;
21use bytemuck_derive::Zeroable;
22use cortexflow_identity::enums::IpProtocols;
23use std::net::Ipv4Addr;
24use tracing::warn;
25
26#[repr(C)]
27#[derive(Clone, Copy, Zeroable)]
28pub struct PacketLog {
29    pub proto: u8,
30    pub src_ip: u32,
31    pub src_port: u16,
32    pub dst_ip: u32,
33    pub dst_port: u16,
34    pub pid: u32,
35}
36unsafe impl aya::Pod for PacketLog {}
37
38pub struct AgentApi {
39    //* event_rx is an istance of a mpsc receiver.
40    //* is used to receive the data from the transmitter (tx)
41    event_rx: Mutex<mpsc::Receiver<Result<HashMap<String, String>, Status>>>,
42    event_tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
43}
44
45//* Event sender trait. Takes an event from a map and send that to the mpsc channel
46//* using the send_map function
47#[async_trait]
48pub trait EventSender: Send + Sync + 'static {
49    async fn send_event(&self, event: HashMap<String, String>);
50    async fn send_map(
51        &self,
52        map: HashMap<String, String>,
53        tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
54    ) {
55        let status = Status::new(tonic::Code::Ok, "success");
56        let event = Ok(map);
57
58        let _ = tx.send(event).await;
59    }
60}
61// send event function. takes an HashMap and send that using mpsc event_tx
62#[async_trait]
63impl EventSender for AgentApi {
64    async fn send_event(&self, event: HashMap<String, String>) {
65        self.send_map(event, self.event_tx.clone()).await;
66    }
67}
68
/// The literal string `"BPF_PATH"`; no code visible in this file reads it.
/// NOTE(review): presumably the name of an environment variable that holds the BPF
/// filesystem path — confirm against the callers.
const BPF_PATH: &str = "BPF_PATH";
70
71//initialize a default trait for AgentApi. Loads a name and a bpf istance.
72//this trait is essential for init the Agent.
73impl Default for AgentApi {
74    //TODO:this part needs a better error handling
75    fn default() -> Self {
76        // load maps mapdata
77        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map")
78            .expect("cannot open events_map Mapdata");
79        let map = Map::PerfEventArray(mapdata); //creates a PerfEventArray from the mapdata
80
81        //init a mpsc channel
82        let (tx, rx) = mpsc::channel(1024);
83        let api = AgentApi {
84            event_rx: rx.into(),
85            event_tx: tx.clone(),
86        };
87
88        let mut events_array =
89            PerfEventArray::try_from(map).expect("Error while initializing events array");
90
91        //spawn an event reader
92        task::spawn(async move {
93            let mut net_events_buffer = Vec::new();
94            //scan the cpus to read the data
95
96            for cpu_id in online_cpus()
97                .map_err(|e| anyhow::anyhow!("Error {:?}", e))
98                .unwrap()
99            {
100                let buf = events_array
101                    .open(cpu_id, None)
102                    .expect("Error during the creation of net_events_buf structure");
103
104                let buffers = vec![BytesMut::with_capacity(4096); 8];
105                net_events_buffer.push((buf, buffers));
106            }
107
108            info!("Starting event listener");
109            //send the data through a mpsc channel
110            loop {
111                for (buf, buffers) in net_events_buffer.iter_mut() {
112                    match buf.read_events(buffers) {
113                        Ok(events) => {
114                            //read the events, this function is similar to the one used in identity/helpers.rs/display_events
115                            if events.read > 0 {
116                                for i in 0..events.read {
117                                    let data = &buffers[i];
118                                    if data.len() >= std::mem::size_of::<PacketLog>() {
119                                        let pl: PacketLog =
120                                            unsafe { std::ptr::read(data.as_ptr() as *const _) };
121                                        let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
122                                        let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
123                                        let src_port = u16::from_be(pl.src_port as u16);
124                                        let dst_port = u16::from_be(pl.dst_port as u16);
125                                        let event_id = pl.pid;
126
127                                        match IpProtocols::try_from(pl.proto) {
128                                            Ok(proto) => {
129                                                info!(
130                                            "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
131                                            event_id, proto, src, src_port, dst, dst_port
132                                        );
133                                                info!("creating hashmap for the aggregated data");
134                                                let mut evt = HashMap::new();
135                                                // insert event in the hashmap
136                                                info!("Inserting events into the hashmap");
137                                                //TODO: use a Arc<str> or Box<str> type instead of String type.
138                                                //The data doesn't need to implement any .copy() or .clone() trait
139                                                // using an Arc<str> type will also waste less resources
140                                                evt.insert(
141                                                    format!("{:?}", event_id.to_string()),
142                                                    format!("{:?}", src.to_string()),
143                                                );
144                                                info!("sending events to the MPSC channel");
145                                                let _ = tx.send(Ok(evt)).await;
146                                            }
147                                            Err(_) => {
148                                                info!(
149                                                    "Event Id: {} Protocol: Unknown ({})",
150                                                    event_id, pl.proto
151                                                );
152                                            }
153                                        };
154                                    } else {
155                                        warn!(
156                                            "Received packet data too small: {} bytes",
157                                            data.len()
158                                        );
159                                    }
160                                }
161                            } else if events.read == 0 {
162                                info!("[Agent/API] 0 Events found");
163                                let mut evt = HashMap::new();
164                                evt.insert("0".to_string(), "0".to_string());
165                                let _ = tx.send(Ok(evt)).await;
166                            }
167                        }
168                        Err(e) => {
169                            eprintln!("Errore nella lettura eventi: {}", e);
170                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
171                        }
172                    }
173                }
174                // small delay to avoid cpu congestion
175                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
176            }
177        });
178
179        api
180    }
181}
182
183#[async_trait]
184impl Agent for AgentApi {
185    // * read the incoming active_connections requests and returns a response with the
186    // * active connections. The data are transformed and sent to the api with a mpsc channel
187    async fn active_connections(
188        &self,
189        request: Request<RequestActiveConnections>,
190    ) -> Result<Response<ActiveConnectionResponse>, Status> {
191        //read request
192        let req = request.into_inner();
193
194        //create the hashmap to process events from the mpsc channel queue
195        let mut aggregated_events: HashMap<String, String> = HashMap::new();
196
197        //aggregate events
198        while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() {
199            if let Ok(map) = evt {
200                aggregated_events.extend(map);
201            }
202        }
203
204        //log response for debugging
205        info!(
206            "DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}",
207            aggregated_events
208        );
209
210        //return response
211        Ok(Response::new(ActiveConnectionResponse {
212            status: "success".to_string(),
213            events: aggregated_events,
214        }))
215    }
216}