risotto_lib/
state.rs

1use bgpkit_parser::models::{NetworkPrefix, Origin, Peer as BGPkitPeer};
2use chrono::Utc;
3use core::net::IpAddr;
4use rand::Rng;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::error::Error;
8use std::hash::{Hash, Hasher};
9use std::sync::mpsc::Sender;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::{info, trace};
14
15use crate::update::{format_update, Update};
16
17pub type AsyncState = Arc<Mutex<State>>;
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state() -> AsyncState {
21    Arc::new(Mutex::new(State::new()))
22}
23
24pub struct State {
25    pub store: MemoryStore,
26}
27
28impl State {
29    pub fn new() -> State {
30        State {
31            store: MemoryStore::new(),
32        }
33    }
34
35    // Get all the updates from the state
36    pub fn get_all(&self) -> Result<Vec<RouterPeerUpdate>, Box<dyn Error>> {
37        Ok(self.store.get_all())
38    }
39
40    // Get the updates for a specific router and peer
41    pub fn get_updates_by_peer(
42        &self,
43        router_addr: &IpAddr,
44        peer: &BGPkitPeer,
45    ) -> Result<Vec<TimedPrefix>, Box<dyn Error>> {
46        Ok(self.store.get_updates_by_peer(router_addr, peer))
47    }
48
49    // Remove all updates for a specific router and peer
50    pub fn remove_updates(
51        &mut self,
52        router_addr: &IpAddr,
53        peer: &BGPkitPeer,
54    ) -> Result<(), Box<dyn Error>> {
55        self.store.remove_peer(router_addr, peer);
56        Ok(())
57    }
58
59    // Update the state with a new update
60    pub fn update(
61        &mut self,
62        router_addr: &IpAddr,
63        peer: &BGPkitPeer,
64        update: &Update,
65    ) -> Result<bool, Box<dyn Error>> {
66        let emit = self.store.update(router_addr, peer, update);
67        Ok(emit)
68    }
69}
70
71#[derive(Serialize, Deserialize, Eq, Clone)]
72pub struct TimedPrefix {
73    pub prefix: NetworkPrefix,
74    pub is_post_policy: bool,
75    pub is_adj_rib_out: bool,
76    pub timestamp: i64,
77}
78
79impl PartialEq for TimedPrefix {
80    fn eq(&self, other: &Self) -> bool {
81        self.prefix == other.prefix
82            && self.is_post_policy == other.is_post_policy
83            && self.is_adj_rib_out == other.is_adj_rib_out
84    }
85}
86
87impl Hash for TimedPrefix {
88    fn hash<H: Hasher>(&self, state: &mut H) {
89        self.prefix.hash(state);
90        self.is_post_policy.hash(state);
91        self.is_adj_rib_out.hash(state);
92    }
93}
94
95#[derive(Serialize, Deserialize)]
96pub struct MemoryStore {
97    routers: HashMap<IpAddr, Router>,
98}
99
100impl MemoryStore {
101    fn new() -> MemoryStore {
102        MemoryStore {
103            routers: HashMap::new(),
104        }
105    }
106
107    fn _get_router(&mut self, router_addr: &IpAddr) -> &mut Router {
108        let router = self.routers.entry(*router_addr).or_insert(Router::new());
109        router
110    }
111
112    fn get_all(&self) -> Vec<RouterPeerUpdate> {
113        let mut res = Vec::new();
114        for (router_addr, router) in &self.routers {
115            for (peer_addr, peer) in &router.peers {
116                for update in &peer.updates {
117                    res.push((*router_addr, *peer_addr, update.clone()));
118                }
119            }
120        }
121        res
122    }
123
124    fn get_peer(&self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Option<Peer> {
125        let router_binding = Router::new();
126        let router = self.routers.get(router_addr).unwrap_or(&router_binding);
127        router.peers.get(peer_addr).cloned()
128    }
129
130    fn get_updates_by_peer(&self, router_addr: &IpAddr, peer: &BGPkitPeer) -> Vec<TimedPrefix> {
131        let router_binding = Router::new();
132        let router = self.routers.get(router_addr).unwrap_or(&router_binding);
133        let peer_binding = Peer {
134            details: *peer,
135            updates: HashSet::new(),
136        };
137        let updates = router
138            .peers
139            .get(&peer.peer_address)
140            .unwrap_or(&peer_binding);
141
142        updates.updates.iter().cloned().collect()
143    }
144
145    fn remove_peer(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer) {
146        let router = self._get_router(router_addr);
147        router.remove_peer(peer);
148    }
149
150    fn update(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer, update: &Update) -> bool {
151        let router = self._get_router(router_addr);
152        router.update(peer, update)
153    }
154}
155
156#[derive(Serialize, Deserialize, Clone)]
157pub struct Peer {
158    details: BGPkitPeer,
159    updates: HashSet<TimedPrefix>,
160}
161
162#[derive(Serialize, Deserialize)]
163struct Router {
164    peers: HashMap<IpAddr, Peer>,
165}
166
167impl Router {
168    fn new() -> Router {
169        Router {
170            peers: HashMap::new(),
171        }
172    }
173
174    fn add_peer(&mut self, peer: &BGPkitPeer) {
175        self.peers.entry(peer.peer_address).or_insert_with(|| Peer {
176            details: *peer,
177            updates: HashSet::new(),
178        });
179    }
180
181    fn remove_peer(&mut self, peer: &BGPkitPeer) {
182        self.peers.remove(&peer.peer_address);
183    }
184
185    fn update(&mut self, peer: &BGPkitPeer, update: &Update) -> bool {
186        self.add_peer(peer);
187        let peer = self.peers.get_mut(&peer.peer_address).unwrap();
188
189        let now: i64 = chrono::Utc::now().timestamp_millis();
190        let timed_prefix = TimedPrefix {
191            prefix: update.prefix,
192            is_post_policy: update.is_post_policy,
193            is_adj_rib_out: update.is_adj_rib_out,
194            timestamp: now,
195        };
196
197        // Will emit the update only if (1) announced + not present or (2) withdrawn + present
198        // Which is a XOR operation
199        let emit = update.announced ^ peer.updates.contains(&timed_prefix);
200
201        if update.announced {
202            // Announced prefix: add the update or overwrite it if present
203            peer.updates.replace(timed_prefix);
204        } else {
205            // Withdrawn prefix: remove the update if present
206            peer.updates.remove(&timed_prefix);
207        }
208        emit
209    }
210}
211
212pub fn synthesize_withdraw_update(prefix: TimedPrefix) -> Update {
213    Update {
214        prefix: prefix.prefix,
215        announced: false,
216        origin: Origin::INCOMPLETE,
217        path: None,
218        communities: vec![],
219        is_post_policy: prefix.is_post_policy,
220        is_adj_rib_out: prefix.is_adj_rib_out,
221        timestamp: Utc::now(),
222        synthetic: true,
223    }
224}
225
226pub async fn peer_up_withdraws_handler(
227    state: AsyncState,
228    router_addr: IpAddr,
229    router_port: u16,
230    bgp_peer: BGPkitPeer,
231    tx: Sender<String>,
232) {
233    let startup = chrono::Utc::now();
234    let random = {
235        let mut rng = rand::rng();
236        rng.random_range(-60.0..60.0) as i64
237    };
238    let sleep_time = 300 + random; // 5 minutes +/- 1 minute
239
240    sleep(Duration::from_secs(sleep_time as u64)).await;
241
242    info!(
243        "startup withdraws handler - {}:{} - {} removing updates older than {}",
244        router_addr, router_port, bgp_peer.peer_address, startup
245    );
246
247    let state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
248    let peer = match state_lock
249        .store
250        .get_peer(&router_addr, &bgp_peer.peer_address)
251    {
252        Some(peer) => peer,
253        None => return,
254    };
255
256    drop(state_lock);
257
258    let mut synthetic_updates = Vec::new();
259    for update in peer.updates {
260        if update.timestamp < startup.timestamp_millis() {
261            // This update has been re-announced after startup
262            // Emit a synthetic withdraw update
263            synthetic_updates.push((
264                router_addr,
265                peer.details,
266                synthesize_withdraw_update(update.clone()),
267            ));
268        }
269    }
270
271    info!(
272        "startup withdraws handler - {} - {} emitting {} synthetic withdraw updates",
273        router_addr,
274        bgp_peer.peer_address,
275        synthetic_updates.len()
276    );
277
278    let mut state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
279    for (router_addr, peer, update) in &mut synthetic_updates {
280        let update_str = format_update(*router_addr, 0, peer, update);
281        trace!("{:?}", update_str);
282
283        // Sent to the event pipeline
284        tx.send(update_str).unwrap();
285
286        // Remove the update from the state
287        state_lock.store.update(router_addr, peer, update);
288    }
289}