1use anyhow::Result;
2use chrono::Utc;
3use core::net::IpAddr;
4use metrics::{counter, gauge};
5use serde::{Deserialize, Serialize};
6use std::hash::{Hash, Hasher};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::mpsc::Sender;
10use tokio::sync::Mutex;
11use tokio::time::sleep;
12use tracing::{debug, trace};
13
14use crate::state_store::store::StateStore;
15use crate::update::{map_to_ipv6, Update, UpdateMetadata};
16
17pub type AsyncState<T> = Arc<Mutex<State<T>>>;
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state<T: StateStore + Send>(store: T) -> AsyncState<T> {
21 Arc::new(Mutex::new(State::new(store)))
22}
23
24pub struct State<T: StateStore> {
25 pub store: T,
26}
27
28impl<T: StateStore> State<T> {
29 pub fn new(store: T) -> State<T> {
30 State { store }
31 }
32
33 pub fn get_updates_by_peer(
34 &self,
35 router_addr: &IpAddr,
36 peer_addr: &IpAddr,
37 ) -> Result<Vec<TimedPrefix>> {
38 Ok(self.store.get_updates_by_peer(router_addr, peer_addr))
39 }
40
41 pub fn add_peer(&mut self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Result<()> {
42 self.store.add_peer(router_addr, peer_addr);
43 Ok(())
44 }
45
46 pub fn remove_peer(&mut self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Result<()> {
47 self.store.remove_peer(router_addr, peer_addr);
48 gauge!(
49 "risotto_state_updates",
50 "router" => router_addr.to_string(),
51 "peer" => peer_addr.to_string()
52 )
53 .set(0);
54 Ok(())
55 }
56
57 pub fn update(
58 &mut self,
59 router_addr: &IpAddr,
60 peer_addr: &IpAddr,
61 update: &Update,
62 ) -> Result<bool> {
63 let emit = self.store.update(router_addr, &peer_addr, update);
64 if emit {
65 let delta = if update.announced { 1.0 } else { -1.0 };
66 gauge!(
67 "risotto_state_updates",
68 "router" => router_addr.to_string(),
69 "peer" => peer_addr.to_string()
70 )
71 .increment(delta);
72 }
73 Ok(emit)
74 }
75}
76
77#[derive(Serialize, Deserialize, Eq, Clone)]
78pub struct TimedPrefix {
79 pub prefix_addr: IpAddr,
80 pub prefix_len: u8,
81 pub is_post_policy: bool,
82 pub is_adj_rib_out: bool,
83 pub timestamp: i64,
84}
85
86impl PartialEq for TimedPrefix {
87 fn eq(&self, other: &Self) -> bool {
88 self.prefix_addr == other.prefix_addr
89 && self.prefix_len == other.prefix_len
90 && self.is_post_policy == other.is_post_policy
91 && self.is_adj_rib_out == other.is_adj_rib_out
92 }
93}
94
95impl Hash for TimedPrefix {
96 fn hash<H: Hasher>(&self, state: &mut H) {
97 self.prefix_addr.hash(state);
98 self.prefix_len.hash(state);
99 self.is_post_policy.hash(state);
100 self.is_adj_rib_out.hash(state);
101 }
102}
103
104pub fn synthesize_withdraw_update(prefix: TimedPrefix, metadata: UpdateMetadata) -> Update {
105 Update {
106 time_received_ns: Utc::now(),
107 time_bmp_header_ns: Utc::now(),
108 router_addr: map_to_ipv6(metadata.router_socket.ip()),
109 router_port: metadata.router_socket.port(),
110 peer_addr: map_to_ipv6(metadata.peer_addr),
111 peer_bgp_id: metadata.peer_bgp_id,
112 peer_asn: metadata.peer_asn,
113 prefix_addr: map_to_ipv6(prefix.prefix_addr),
114 prefix_len: prefix.prefix_len,
115 is_post_policy: prefix.is_post_policy,
116 is_adj_rib_out: prefix.is_adj_rib_out,
117 announced: false,
118 synthetic: true,
119
120 origin: "INCOMPLETE".to_string(),
122 as_path: vec![],
123 next_hop: None,
124 multi_exit_discriminator: None,
125 local_preference: None,
126 only_to_customer: None,
127 atomic_aggregate: false,
128 aggregator_asn: None,
129 aggregator_bgp_id: None,
130 communities: vec![],
131 extended_communities: vec![],
132 large_communities: vec![],
133 originator_id: None,
134 cluster_list: vec![],
135 mp_reach_afi: None,
136 mp_reach_safi: None,
137 mp_unreach_afi: None,
138 mp_unreach_safi: None,
139 }
140}
141
142pub async fn peer_up_withdraws_handler<T: StateStore>(
143 state: AsyncState<T>,
144 tx: Sender<Update>,
145 metadata: UpdateMetadata,
146 sleep_time: u64,
147) -> Result<()> {
148 let startup = chrono::Utc::now();
149 sleep(Duration::from_secs(sleep_time)).await;
150
151 debug!(
152 "[{} - {} - removing updates older than {} after waited {} seconds",
153 metadata.router_socket, metadata.peer_addr, startup, sleep_time
154 );
155
156 let state_lock = state.lock().await;
157 let timed_prefixes = state_lock
158 .store
159 .get_updates_by_peer(&metadata.router_socket.ip(), &metadata.peer_addr);
160
161 drop(state_lock);
162
163 let mut synthetic_updates = Vec::new();
164 for timed_prefix in timed_prefixes {
165 if timed_prefix.timestamp < startup.timestamp_millis() {
166 synthetic_updates.push(synthesize_withdraw_update(timed_prefix, metadata.clone()));
169 }
170 }
171
172 debug!(
173 "[{} - {} - emitting {} synthetic withdraw updates",
174 metadata.router_socket,
175 metadata.peer_addr,
176 synthetic_updates.len()
177 );
178
179 counter!(
180 "risotto_tx_updates_total",
181 "router" => metadata.router_socket.ip().to_string(),
182 "peer" => metadata.peer_addr.to_string(),
183 )
184 .increment(synthetic_updates.len() as u64);
185
186 let mut state_lock = state.lock().await;
187 for update in &mut synthetic_updates {
188 trace!("{:?}", update);
189
190 tx.send(update.clone()).await?;
192
193 state_lock
195 .store
196 .update(&update.router_addr, &metadata.peer_addr, update);
197 }
198
199 Ok(())
200}
201
202pub async fn process_updates<T: StateStore>(
205 state: Option<AsyncState<T>>,
206 tx: Sender<Update>,
207 updates: Vec<Update>,
208) -> Result<()> {
209 if updates.is_empty() {
210 return Ok(());
211 }
212
213 match state {
214 Some(state) => {
215 let mut state_lock = state.lock().await;
217
218 for update in updates {
219 debug!("Processing update: {:?}", update);
220 let should_emit =
221 state_lock.update(&update.router_addr, &update.peer_addr, &update)?;
222
223 if should_emit {
224 trace!("Emitting update: {:?}", update);
225 tx.send(update.clone()).await?;
226
227 counter!(
228 "risotto_tx_updates_total",
229 "router" => update.router_addr.to_string(),
230 "peer" => update.peer_addr.to_string(),
231 )
232 .increment(1);
233 }
234 }
235 }
236 None => {
237 for update in updates {
239 trace!("Forwarding update: {:?}", update);
240
241 counter!(
242 "risotto_tx_updates_total",
243 "router" => update.router_addr.to_string(),
244 "peer" => update.peer_addr.to_string(),
245 )
246 .increment(1);
247
248 tx.send(update).await?;
249 }
250 }
251 }
252
253 Ok(())
254}
255
256pub async fn process_update<T: StateStore>(
259 state: Option<AsyncState<T>>,
260 tx: Sender<Update>,
261 update: Update,
262) -> Result<()> {
263 process_updates(state, tx, vec![update]).await
264}