1use bgpkit_parser::models::{NetworkPrefix, Origin, Peer as BGPkitPeer};
2use chrono::Utc;
3use core::net::IpAddr;
4use rand::Rng;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::error::Error;
8use std::hash::{Hash, Hasher};
9use std::sync::mpsc::Sender;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::{info, trace};
14
15use crate::update::{format_update, Update};
16
17pub type AsyncState = Arc<Mutex<State>>;
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state() -> AsyncState {
21 Arc::new(Mutex::new(State::new()))
22}
23
24pub struct State {
25 pub store: MemoryStore,
26}
27
28impl State {
29 pub fn new() -> State {
30 State {
31 store: MemoryStore::new(),
32 }
33 }
34
35 pub fn get_all(&self) -> Result<Vec<RouterPeerUpdate>, Box<dyn Error>> {
37 Ok(self.store.get_all())
38 }
39
40 pub fn get_updates_by_peer(
42 &self,
43 router_addr: &IpAddr,
44 peer: &BGPkitPeer,
45 ) -> Result<Vec<TimedPrefix>, Box<dyn Error>> {
46 Ok(self.store.get_updates_by_peer(router_addr, peer))
47 }
48
49 pub fn remove_updates(
51 &mut self,
52 router_addr: &IpAddr,
53 peer: &BGPkitPeer,
54 ) -> Result<(), Box<dyn Error>> {
55 self.store.remove_peer(router_addr, peer);
56 Ok(())
57 }
58
59 pub fn update(
61 &mut self,
62 router_addr: &IpAddr,
63 peer: &BGPkitPeer,
64 update: &Update,
65 ) -> Result<bool, Box<dyn Error>> {
66 let emit = self.store.update(router_addr, peer, update);
67 Ok(emit)
68 }
69}
70
71#[derive(Serialize, Deserialize, Eq, Clone)]
72pub struct TimedPrefix {
73 pub prefix: NetworkPrefix,
74 pub is_post_policy: bool,
75 pub is_adj_rib_out: bool,
76 pub timestamp: i64,
77}
78
79impl PartialEq for TimedPrefix {
80 fn eq(&self, other: &Self) -> bool {
81 self.prefix == other.prefix
82 && self.is_post_policy == other.is_post_policy
83 && self.is_adj_rib_out == other.is_adj_rib_out
84 }
85}
86
87impl Hash for TimedPrefix {
88 fn hash<H: Hasher>(&self, state: &mut H) {
89 self.prefix.hash(state);
90 self.is_post_policy.hash(state);
91 self.is_adj_rib_out.hash(state);
92 }
93}
94
95#[derive(Serialize, Deserialize)]
96pub struct MemoryStore {
97 routers: HashMap<IpAddr, Router>,
98}
99
100impl MemoryStore {
101 fn new() -> MemoryStore {
102 MemoryStore {
103 routers: HashMap::new(),
104 }
105 }
106
107 fn _get_router(&mut self, router_addr: &IpAddr) -> &mut Router {
108 let router = self.routers.entry(*router_addr).or_insert(Router::new());
109 router
110 }
111
112 fn get_all(&self) -> Vec<RouterPeerUpdate> {
113 let mut res = Vec::new();
114 for (router_addr, router) in &self.routers {
115 for (peer_addr, peer) in &router.peers {
116 for update in &peer.updates {
117 res.push((*router_addr, *peer_addr, update.clone()));
118 }
119 }
120 }
121 res
122 }
123
124 fn get_peer(&self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Option<Peer> {
125 let router_binding = Router::new();
126 let router = self.routers.get(router_addr).unwrap_or(&router_binding);
127 router.peers.get(peer_addr).cloned()
128 }
129
130 fn get_updates_by_peer(&self, router_addr: &IpAddr, peer: &BGPkitPeer) -> Vec<TimedPrefix> {
131 let router_binding = Router::new();
132 let router = self.routers.get(router_addr).unwrap_or(&router_binding);
133 let peer_binding = Peer {
134 details: *peer,
135 updates: HashSet::new(),
136 };
137 let updates = router
138 .peers
139 .get(&peer.peer_address)
140 .unwrap_or(&peer_binding);
141
142 updates.updates.iter().cloned().collect()
143 }
144
145 fn remove_peer(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer) {
146 let router = self._get_router(router_addr);
147 router.remove_peer(peer);
148 }
149
150 fn update(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer, update: &Update) -> bool {
151 let router = self._get_router(router_addr);
152 router.update(peer, update)
153 }
154}
155
156#[derive(Serialize, Deserialize, Clone)]
157pub struct Peer {
158 details: BGPkitPeer,
159 updates: HashSet<TimedPrefix>,
160}
161
162#[derive(Serialize, Deserialize)]
163struct Router {
164 peers: HashMap<IpAddr, Peer>,
165}
166
167impl Router {
168 fn new() -> Router {
169 Router {
170 peers: HashMap::new(),
171 }
172 }
173
174 fn add_peer(&mut self, peer: &BGPkitPeer) {
175 self.peers.entry(peer.peer_address).or_insert_with(|| Peer {
176 details: *peer,
177 updates: HashSet::new(),
178 });
179 }
180
181 fn remove_peer(&mut self, peer: &BGPkitPeer) {
182 self.peers.remove(&peer.peer_address);
183 }
184
185 fn update(&mut self, peer: &BGPkitPeer, update: &Update) -> bool {
186 self.add_peer(peer);
187 let peer = self.peers.get_mut(&peer.peer_address).unwrap();
188
189 let now: i64 = chrono::Utc::now().timestamp_millis();
190 let timed_prefix = TimedPrefix {
191 prefix: update.prefix,
192 is_post_policy: update.is_post_policy,
193 is_adj_rib_out: update.is_adj_rib_out,
194 timestamp: now,
195 };
196
197 let emit = update.announced ^ peer.updates.contains(&timed_prefix);
200
201 if update.announced {
202 peer.updates.replace(timed_prefix);
204 } else {
205 peer.updates.remove(&timed_prefix);
207 }
208 emit
209 }
210}
211
212pub fn synthesize_withdraw_update(prefix: TimedPrefix) -> Update {
213 Update {
214 prefix: prefix.prefix,
215 announced: false,
216 origin: Origin::INCOMPLETE,
217 path: None,
218 communities: vec![],
219 is_post_policy: prefix.is_post_policy,
220 is_adj_rib_out: prefix.is_adj_rib_out,
221 timestamp: Utc::now(),
222 synthetic: true,
223 }
224}
225
226pub async fn peer_up_withdraws_handler(
227 state: AsyncState,
228 router_addr: IpAddr,
229 router_port: u16,
230 bgp_peer: BGPkitPeer,
231 tx: Sender<String>,
232) {
233 let startup = chrono::Utc::now();
234 let random = {
235 let mut rng = rand::rng();
236 rng.random_range(-60.0..60.0) as i64
237 };
238 let sleep_time = 300 + random; sleep(Duration::from_secs(sleep_time as u64)).await;
241
242 info!(
243 "startup withdraws handler - {}:{} - {} removing updates older than {}",
244 router_addr, router_port, bgp_peer.peer_address, startup
245 );
246
247 let state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
248 let peer = match state_lock
249 .store
250 .get_peer(&router_addr, &bgp_peer.peer_address)
251 {
252 Some(peer) => peer,
253 None => return,
254 };
255
256 drop(state_lock);
257
258 let mut synthetic_updates = Vec::new();
259 for update in peer.updates {
260 if update.timestamp < startup.timestamp_millis() {
261 synthetic_updates.push((
264 router_addr,
265 peer.details,
266 synthesize_withdraw_update(update.clone()),
267 ));
268 }
269 }
270
271 info!(
272 "startup withdraws handler - {} - {} emitting {} synthetic withdraw updates",
273 router_addr,
274 bgp_peer.peer_address,
275 synthetic_updates.len()
276 );
277
278 let mut state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
279 for (router_addr, peer, update) in &mut synthetic_updates {
280 let update_str = format_update(*router_addr, 0, peer, update);
281 trace!("{:?}", update_str);
282
283 tx.send(update_str).unwrap();
285
286 state_lock.store.update(router_addr, peer, update);
288 }
289}