risotto_lib/
state.rs

1use bgpkit_parser::models::Peer as BGPkitPeer;
2use chrono::Utc;
3use core::net::IpAddr;
4use rand::Rng;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::error::Error;
8use std::hash::{Hash, Hasher};
9use std::sync::mpsc::Sender;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::{info, trace};
14
15use crate::update::{Update, UpdateMetadata};
16
17pub type AsyncState = Arc<Mutex<State>>;
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state() -> AsyncState {
21    Arc::new(Mutex::new(State::new()))
22}
23
24pub struct State {
25    pub store: MemoryStore,
26}
27
28impl State {
29    pub fn new() -> State {
30        State {
31            store: MemoryStore::new(),
32        }
33    }
34
35    // Get all the updates from the state
36    pub fn get_all(&self) -> Result<Vec<RouterPeerUpdate>, Box<dyn Error>> {
37        Ok(self.store.get_all())
38    }
39
40    // Get the updates for a specific router and peer
41    pub fn get_updates_by_peer(
42        &self,
43        router_addr: &IpAddr,
44        peer: &BGPkitPeer,
45    ) -> Result<Vec<TimedPrefix>, Box<dyn Error>> {
46        Ok(self.store.get_updates_by_peer(router_addr, peer))
47    }
48
49    // Remove all updates for a specific router and peer
50    pub fn remove_updates(
51        &mut self,
52        router_addr: &IpAddr,
53        peer: &BGPkitPeer,
54    ) -> Result<(), Box<dyn Error>> {
55        self.store.remove_peer(router_addr, peer);
56        Ok(())
57    }
58
59    // Update the state with a new update
60    pub fn update(
61        &mut self,
62        router_addr: &IpAddr,
63        peer: &BGPkitPeer,
64        update: &Update,
65    ) -> Result<bool, Box<dyn Error>> {
66        Ok(self.store.update(router_addr, &peer, update))
67    }
68}
69
70#[derive(Serialize, Deserialize, Eq, Clone)]
71pub struct TimedPrefix {
72    pub prefix_addr: IpAddr,
73    pub prefix_len: u8,
74    pub is_post_policy: bool,
75    pub is_adj_rib_out: bool,
76    pub timestamp: i64,
77}
78
79impl PartialEq for TimedPrefix {
80    fn eq(&self, other: &Self) -> bool {
81        self.prefix_addr == other.prefix_addr
82            && self.prefix_len == other.prefix_len
83            && self.is_post_policy == other.is_post_policy
84            && self.is_adj_rib_out == other.is_adj_rib_out
85    }
86}
87
88impl Hash for TimedPrefix {
89    fn hash<H: Hasher>(&self, state: &mut H) {
90        self.prefix_addr.hash(state);
91        self.prefix_len.hash(state);
92        self.is_post_policy.hash(state);
93        self.is_adj_rib_out.hash(state);
94    }
95}
96
97#[derive(Serialize, Deserialize)]
98pub struct MemoryStore {
99    routers: HashMap<IpAddr, Router>,
100}
101
102impl MemoryStore {
103    fn new() -> MemoryStore {
104        MemoryStore {
105            routers: HashMap::new(),
106        }
107    }
108
109    fn _get_router(&mut self, router_addr: &IpAddr) -> &mut Router {
110        let router = self.routers.entry(*router_addr).or_insert(Router::new());
111        router
112    }
113
114    fn get_all(&self) -> Vec<RouterPeerUpdate> {
115        let mut res = Vec::new();
116        for (router_addr, router) in &self.routers {
117            for (peer_addr, peer) in &router.peers {
118                for update in &peer.updates {
119                    res.push((*router_addr, *peer_addr, update.clone()));
120                }
121            }
122        }
123        res
124    }
125
126    fn get_peer(&self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Option<Peer> {
127        let router_binding = Router::new();
128        let router = self.routers.get(router_addr).unwrap_or(&router_binding);
129        router.peers.get(peer_addr).cloned()
130    }
131
132    fn get_updates_by_peer(&self, router_addr: &IpAddr, peer: &BGPkitPeer) -> Vec<TimedPrefix> {
133        let router_binding = Router::new();
134        let router = self.routers.get(router_addr).unwrap_or(&router_binding);
135        let peer_binding = Peer {
136            details: *peer,
137            updates: HashSet::new(),
138        };
139        let updates = router
140            .peers
141            .get(&peer.peer_address)
142            .unwrap_or(&peer_binding);
143
144        updates.updates.iter().cloned().collect()
145    }
146
147    fn remove_peer(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer) {
148        let router = self._get_router(router_addr);
149        router.remove_peer(peer);
150    }
151
152    fn update(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer, update: &Update) -> bool {
153        let router = self._get_router(router_addr);
154        router.update(peer, update)
155    }
156}
157
158#[derive(Serialize, Deserialize, Clone)]
159pub struct Peer {
160    details: BGPkitPeer,
161    updates: HashSet<TimedPrefix>,
162}
163
164#[derive(Serialize, Deserialize)]
165struct Router {
166    peers: HashMap<IpAddr, Peer>,
167}
168
169impl Router {
170    fn new() -> Router {
171        Router {
172            peers: HashMap::new(),
173        }
174    }
175
176    fn add_peer(&mut self, peer: &BGPkitPeer) {
177        self.peers.entry(peer.peer_address).or_insert_with(|| Peer {
178            details: *peer,
179            updates: HashSet::new(),
180        });
181    }
182
183    fn remove_peer(&mut self, peer: &BGPkitPeer) {
184        self.peers.remove(&peer.peer_address);
185    }
186
187    fn update(&mut self, peer: &BGPkitPeer, update: &Update) -> bool {
188        self.add_peer(peer);
189        let peer = self.peers.get_mut(&peer.peer_address).unwrap();
190
191        let now: i64 = chrono::Utc::now().timestamp_millis();
192        let timed_prefix = TimedPrefix {
193            prefix_addr: update.prefix_addr,
194            prefix_len: update.prefix_len,
195            is_post_policy: update.is_post_policy,
196            is_adj_rib_out: update.is_adj_rib_out,
197            timestamp: now,
198        };
199
200        // Will emit the update only if (1) announced + not present or (2) withdrawn + present
201        // Which is a XOR operation
202        let emit = update.announced ^ peer.updates.contains(&timed_prefix);
203
204        if update.announced {
205            // Announced prefix: add the update or overwrite it if present
206            peer.updates.replace(timed_prefix);
207        } else {
208            // Withdrawn prefix: remove the update if present
209            peer.updates.remove(&timed_prefix);
210        }
211        emit
212    }
213}
214
215pub fn synthesize_withdraw_update(prefix: TimedPrefix, metadata: UpdateMetadata) -> Update {
216    Update {
217        timestamp: Utc::now(),
218        router_addr: metadata.router_addr,
219        router_port: metadata.router_port,
220        peer_addr: metadata.peer_addr,
221        peer_bgp_id: metadata.peer_bgp_id,
222        peer_asn: metadata.peer_asn,
223        prefix_addr: prefix.prefix_addr,
224        prefix_len: prefix.prefix_len,
225        announced: false,
226        origin: "INCOMPLETE".to_string(),
227        path: vec![],
228        communities: vec![],
229        is_post_policy: prefix.is_post_policy,
230        is_adj_rib_out: prefix.is_adj_rib_out,
231        synthetic: true,
232    }
233}
234
235pub async fn peer_up_withdraws_handler(
236    state: AsyncState,
237    tx: Sender<Update>,
238    metadata: UpdateMetadata,
239) {
240    let startup = chrono::Utc::now();
241    let random = {
242        let mut rng = rand::rng();
243        rng.random_range(-60.0..60.0) as i64
244    };
245    let sleep_time = 300 + random; // 5 minutes +/- 1 minute
246
247    sleep(Duration::from_secs(sleep_time as u64)).await;
248
249    info!(
250        "startup withdraws handler - {}:{} - {} removing updates older than {}",
251        metadata.router_addr, metadata.router_port, metadata.peer_addr, startup
252    );
253
254    let state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
255    let peer = match state_lock
256        .store
257        .get_peer(&metadata.router_addr, &metadata.peer_addr)
258    {
259        Some(peer) => peer,
260        None => return,
261    };
262
263    drop(state_lock);
264
265    let mut synthetic_updates = Vec::new();
266    for update in peer.updates {
267        if update.timestamp < startup.timestamp_millis() {
268            // This update has not been re-announced after startup
269            // Emit a synthetic withdraw update
270            synthetic_updates.push(synthesize_withdraw_update(update.clone(), metadata.clone()));
271        }
272    }
273
274    info!(
275        "startup withdraws handler - {} - {} emitting {} synthetic withdraw updates",
276        metadata.router_addr,
277        peer.details.peer_address,
278        synthetic_updates.len()
279    );
280
281    let mut state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
282    for update in &mut synthetic_updates {
283        trace!("{:?}", update);
284
285        // Sent to the event pipeline
286        tx.send(update.clone()).unwrap();
287
288        // Remove the update from the state
289        state_lock
290            .store
291            .update(&update.router_addr, &peer.details, update);
292    }
293}