1use bgpkit_parser::models::Peer as BGPkitPeer;
2use chrono::Utc;
3use core::net::IpAddr;
4use rand::Rng;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::error::Error;
8use std::hash::{Hash, Hasher};
9use std::sync::mpsc::Sender;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use tokio::time::sleep;
13use tracing::{info, trace};
14
15use crate::update::{Update, UpdateMetadata};
16
17pub type AsyncState = Arc<Mutex<State>>;
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state() -> AsyncState {
21 Arc::new(Mutex::new(State::new()))
22}
23
24pub struct State {
25 pub store: MemoryStore,
26}
27
28impl State {
29 pub fn new() -> State {
30 State {
31 store: MemoryStore::new(),
32 }
33 }
34
35 pub fn get_all(&self) -> Result<Vec<RouterPeerUpdate>, Box<dyn Error>> {
37 Ok(self.store.get_all())
38 }
39
40 pub fn get_updates_by_peer(
42 &self,
43 router_addr: &IpAddr,
44 peer: &BGPkitPeer,
45 ) -> Result<Vec<TimedPrefix>, Box<dyn Error>> {
46 Ok(self.store.get_updates_by_peer(router_addr, peer))
47 }
48
49 pub fn remove_updates(
51 &mut self,
52 router_addr: &IpAddr,
53 peer: &BGPkitPeer,
54 ) -> Result<(), Box<dyn Error>> {
55 self.store.remove_peer(router_addr, peer);
56 Ok(())
57 }
58
59 pub fn update(
61 &mut self,
62 router_addr: &IpAddr,
63 peer: &BGPkitPeer,
64 update: &Update,
65 ) -> Result<bool, Box<dyn Error>> {
66 Ok(self.store.update(router_addr, &peer, update))
67 }
68}
69
70#[derive(Serialize, Deserialize, Eq, Clone)]
71pub struct TimedPrefix {
72 pub prefix_addr: IpAddr,
73 pub prefix_len: u8,
74 pub is_post_policy: bool,
75 pub is_adj_rib_out: bool,
76 pub timestamp: i64,
77}
78
79impl PartialEq for TimedPrefix {
80 fn eq(&self, other: &Self) -> bool {
81 self.prefix_addr == other.prefix_addr
82 && self.prefix_len == other.prefix_len
83 && self.is_post_policy == other.is_post_policy
84 && self.is_adj_rib_out == other.is_adj_rib_out
85 }
86}
87
88impl Hash for TimedPrefix {
89 fn hash<H: Hasher>(&self, state: &mut H) {
90 self.prefix_addr.hash(state);
91 self.prefix_len.hash(state);
92 self.is_post_policy.hash(state);
93 self.is_adj_rib_out.hash(state);
94 }
95}
96
97#[derive(Serialize, Deserialize)]
98pub struct MemoryStore {
99 routers: HashMap<IpAddr, Router>,
100}
101
102impl MemoryStore {
103 fn new() -> MemoryStore {
104 MemoryStore {
105 routers: HashMap::new(),
106 }
107 }
108
109 fn _get_router(&mut self, router_addr: &IpAddr) -> &mut Router {
110 let router = self.routers.entry(*router_addr).or_insert(Router::new());
111 router
112 }
113
114 fn get_all(&self) -> Vec<RouterPeerUpdate> {
115 let mut res = Vec::new();
116 for (router_addr, router) in &self.routers {
117 for (peer_addr, peer) in &router.peers {
118 for update in &peer.updates {
119 res.push((*router_addr, *peer_addr, update.clone()));
120 }
121 }
122 }
123 res
124 }
125
126 fn get_peer(&self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Option<Peer> {
127 let router_binding = Router::new();
128 let router = self.routers.get(router_addr).unwrap_or(&router_binding);
129 router.peers.get(peer_addr).cloned()
130 }
131
132 fn get_updates_by_peer(&self, router_addr: &IpAddr, peer: &BGPkitPeer) -> Vec<TimedPrefix> {
133 let router_binding = Router::new();
134 let router = self.routers.get(router_addr).unwrap_or(&router_binding);
135 let peer_binding = Peer {
136 details: *peer,
137 updates: HashSet::new(),
138 };
139 let updates = router
140 .peers
141 .get(&peer.peer_address)
142 .unwrap_or(&peer_binding);
143
144 updates.updates.iter().cloned().collect()
145 }
146
147 fn remove_peer(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer) {
148 let router = self._get_router(router_addr);
149 router.remove_peer(peer);
150 }
151
152 fn update(&mut self, router_addr: &IpAddr, peer: &BGPkitPeer, update: &Update) -> bool {
153 let router = self._get_router(router_addr);
154 router.update(peer, update)
155 }
156}
157
158#[derive(Serialize, Deserialize, Clone)]
159pub struct Peer {
160 details: BGPkitPeer,
161 updates: HashSet<TimedPrefix>,
162}
163
164#[derive(Serialize, Deserialize)]
165struct Router {
166 peers: HashMap<IpAddr, Peer>,
167}
168
169impl Router {
170 fn new() -> Router {
171 Router {
172 peers: HashMap::new(),
173 }
174 }
175
176 fn add_peer(&mut self, peer: &BGPkitPeer) {
177 self.peers.entry(peer.peer_address).or_insert_with(|| Peer {
178 details: *peer,
179 updates: HashSet::new(),
180 });
181 }
182
183 fn remove_peer(&mut self, peer: &BGPkitPeer) {
184 self.peers.remove(&peer.peer_address);
185 }
186
187 fn update(&mut self, peer: &BGPkitPeer, update: &Update) -> bool {
188 self.add_peer(peer);
189 let peer = self.peers.get_mut(&peer.peer_address).unwrap();
190
191 let now: i64 = chrono::Utc::now().timestamp_millis();
192 let timed_prefix = TimedPrefix {
193 prefix_addr: update.prefix_addr,
194 prefix_len: update.prefix_len,
195 is_post_policy: update.is_post_policy,
196 is_adj_rib_out: update.is_adj_rib_out,
197 timestamp: now,
198 };
199
200 let emit = update.announced ^ peer.updates.contains(&timed_prefix);
203
204 if update.announced {
205 peer.updates.replace(timed_prefix);
207 } else {
208 peer.updates.remove(&timed_prefix);
210 }
211 emit
212 }
213}
214
215pub fn synthesize_withdraw_update(prefix: TimedPrefix, metadata: UpdateMetadata) -> Update {
216 Update {
217 timestamp: Utc::now(),
218 router_addr: metadata.router_addr,
219 router_port: metadata.router_port,
220 peer_addr: metadata.peer_addr,
221 peer_bgp_id: metadata.peer_bgp_id,
222 peer_asn: metadata.peer_asn,
223 prefix_addr: prefix.prefix_addr,
224 prefix_len: prefix.prefix_len,
225 announced: false,
226 origin: "INCOMPLETE".to_string(),
227 path: vec![],
228 communities: vec![],
229 is_post_policy: prefix.is_post_policy,
230 is_adj_rib_out: prefix.is_adj_rib_out,
231 synthetic: true,
232 }
233}
234
235pub async fn peer_up_withdraws_handler(
236 state: AsyncState,
237 tx: Sender<Update>,
238 metadata: UpdateMetadata,
239) {
240 let startup = chrono::Utc::now();
241 let random = {
242 let mut rng = rand::rng();
243 rng.random_range(-60.0..60.0) as i64
244 };
245 let sleep_time = 300 + random; sleep(Duration::from_secs(sleep_time as u64)).await;
248
249 info!(
250 "startup withdraws handler - {}:{} - {} removing updates older than {}",
251 metadata.router_addr, metadata.router_port, metadata.peer_addr, startup
252 );
253
254 let state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
255 let peer = match state_lock
256 .store
257 .get_peer(&metadata.router_addr, &metadata.peer_addr)
258 {
259 Some(peer) => peer,
260 None => return,
261 };
262
263 drop(state_lock);
264
265 let mut synthetic_updates = Vec::new();
266 for update in peer.updates {
267 if update.timestamp < startup.timestamp_millis() {
268 synthetic_updates.push(synthesize_withdraw_update(update.clone(), metadata.clone()));
271 }
272 }
273
274 info!(
275 "startup withdraws handler - {} - {} emitting {} synthetic withdraw updates",
276 metadata.router_addr,
277 peer.details.peer_address,
278 synthetic_updates.len()
279 );
280
281 let mut state_lock: std::sync::MutexGuard<'_, State> = state.lock().unwrap();
282 for update in &mut synthetic_updates {
283 trace!("{:?}", update);
284
285 tx.send(update.clone()).unwrap();
287
288 state_lock
290 .store
291 .update(&update.router_addr, &peer.details, update);
292 }
293}