risotto_lib/
state.rs

1use anyhow::Result;
2use chrono::Utc;
3use core::net::IpAddr;
4use metrics::{counter, gauge};
5use serde::{Deserialize, Serialize};
6use std::hash::{Hash, Hasher};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::mpsc::Sender;
10use tokio::sync::Mutex;
11use tokio::time::sleep;
12use tracing::{debug, trace};
13
14use crate::state_store::store::StateStore;
15use crate::update::{map_to_ipv6, Update, UpdateMetadata};
16
17pub type AsyncState<T> = Arc<Mutex<State<T>>>;
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state<T: StateStore + Send>(store: T) -> AsyncState<T> {
21    Arc::new(Mutex::new(State::new(store)))
22}
23
24pub struct State<T: StateStore> {
25    pub store: T,
26}
27
28impl<T: StateStore> State<T> {
29    pub fn new(store: T) -> State<T> {
30        State { store }
31    }
32
33    pub fn get_updates_by_peer(
34        &self,
35        router_addr: &IpAddr,
36        peer_addr: &IpAddr,
37    ) -> Result<Vec<TimedPrefix>> {
38        Ok(self.store.get_updates_by_peer(router_addr, peer_addr))
39    }
40
41    pub fn add_peer(&mut self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Result<()> {
42        self.store.add_peer(router_addr, peer_addr);
43        Ok(())
44    }
45
46    pub fn remove_peer(&mut self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Result<()> {
47        self.store.remove_peer(router_addr, peer_addr);
48        gauge!(
49            "risotto_state_updates",
50            "router" => router_addr.to_string(),
51            "peer" => peer_addr.to_string()
52        )
53        .set(0);
54        Ok(())
55    }
56
57    pub fn update(
58        &mut self,
59        router_addr: &IpAddr,
60        peer_addr: &IpAddr,
61        update: &Update,
62    ) -> Result<bool> {
63        let emit = self.store.update(router_addr, &peer_addr, update);
64        if emit {
65            let delta = if update.announced { 1.0 } else { -1.0 };
66            gauge!(
67                "risotto_state_updates",
68                "router" => router_addr.to_string(),
69                "peer" => peer_addr.to_string()
70            )
71            .increment(delta);
72        }
73        Ok(emit)
74    }
75}
76
77#[derive(Serialize, Deserialize, Eq, Clone)]
78pub struct TimedPrefix {
79    pub prefix_addr: IpAddr,
80    pub prefix_len: u8,
81    pub is_post_policy: bool,
82    pub is_adj_rib_out: bool,
83    pub timestamp: i64,
84}
85
86impl PartialEq for TimedPrefix {
87    fn eq(&self, other: &Self) -> bool {
88        self.prefix_addr == other.prefix_addr
89            && self.prefix_len == other.prefix_len
90            && self.is_post_policy == other.is_post_policy
91            && self.is_adj_rib_out == other.is_adj_rib_out
92    }
93}
94
95impl Hash for TimedPrefix {
96    fn hash<H: Hasher>(&self, state: &mut H) {
97        self.prefix_addr.hash(state);
98        self.prefix_len.hash(state);
99        self.is_post_policy.hash(state);
100        self.is_adj_rib_out.hash(state);
101    }
102}
103
104pub fn synthesize_withdraw_update(prefix: TimedPrefix, metadata: UpdateMetadata) -> Update {
105    Update {
106        time_received_ns: Utc::now(),
107        time_bmp_header_ns: Utc::now(),
108        router_addr: map_to_ipv6(metadata.router_socket.ip()),
109        router_port: metadata.router_socket.port(),
110        peer_addr: map_to_ipv6(metadata.peer_addr),
111        peer_bgp_id: metadata.peer_bgp_id,
112        peer_asn: metadata.peer_asn,
113        prefix_addr: map_to_ipv6(prefix.prefix_addr),
114        prefix_len: prefix.prefix_len,
115        is_post_policy: prefix.is_post_policy,
116        is_adj_rib_out: prefix.is_adj_rib_out,
117        announced: false,
118        synthetic: true,
119        
120        // BGP Attributes - all empty/default for synthetic withdraws
121        origin: "INCOMPLETE".to_string(),
122        as_path: vec![],
123        next_hop: None,
124        multi_exit_discriminator: None,
125        local_preference: None,
126        only_to_customer: None,
127        atomic_aggregate: false,
128        aggregator_asn: None,
129        aggregator_bgp_id: None,
130        communities: vec![],
131        extended_communities: vec![],
132        large_communities: vec![],
133        originator_id: None,
134        cluster_list: vec![],
135        mp_reach_afi: None,
136        mp_reach_safi: None,
137        mp_unreach_afi: None,
138        mp_unreach_safi: None,
139    }
140}
141
142pub async fn peer_up_withdraws_handler<T: StateStore>(
143    state: AsyncState<T>,
144    tx: Sender<Update>,
145    metadata: UpdateMetadata,
146    sleep_time: u64,
147) -> Result<()> {
148    let startup = chrono::Utc::now();
149    sleep(Duration::from_secs(sleep_time)).await;
150
151    debug!(
152        "[{} - {} - removing updates older than {} after waited {} seconds",
153        metadata.router_socket, metadata.peer_addr, startup, sleep_time
154    );
155
156    let state_lock = state.lock().await;
157    let timed_prefixes = state_lock
158        .store
159        .get_updates_by_peer(&metadata.router_socket.ip(), &metadata.peer_addr);
160
161    drop(state_lock);
162
163    let mut synthetic_updates = Vec::new();
164    for timed_prefix in timed_prefixes {
165        if timed_prefix.timestamp < startup.timestamp_millis() {
166            // This update has not been re-announced after startup
167            // Emit a synthetic withdraw update
168            synthetic_updates.push(synthesize_withdraw_update(timed_prefix, metadata.clone()));
169        }
170    }
171
172    debug!(
173        "[{} - {} - emitting {} synthetic withdraw updates",
174        metadata.router_socket,
175        metadata.peer_addr,
176        synthetic_updates.len()
177    );
178
179    counter!(
180        "risotto_tx_updates_total",
181        "router" => metadata.router_socket.ip().to_string(),
182        "peer" => metadata.peer_addr.to_string(),
183    )
184    .increment(synthetic_updates.len() as u64);
185
186    let mut state_lock = state.lock().await;
187    for update in &mut synthetic_updates {
188        trace!("{:?}", update);
189
190        // Sent to the event pipeline
191        tx.send(update.clone()).await?;
192
193        // Remove the update from the state
194        state_lock
195            .store
196            .update(&update.router_addr, &metadata.peer_addr, update);
197    }
198
199    Ok(())
200}
201
202/// Process pre-parsed updates through the state machine
203/// This is the core processing logic used by both collectors and curators
204pub async fn process_updates<T: StateStore>(
205    state: Option<AsyncState<T>>,
206    tx: Sender<Update>,
207    updates: Vec<Update>,
208) -> Result<()> {
209    if updates.is_empty() {
210        return Ok(());
211    }
212
213    match state {
214        Some(state) => {
215            // Stateful mode: deduplicate updates
216            let mut state_lock = state.lock().await;
217
218            for update in updates {
219                debug!("Processing update: {:?}", update);
220                let should_emit =
221                    state_lock.update(&update.router_addr, &update.peer_addr, &update)?;
222
223                if should_emit {
224                    trace!("Emitting update: {:?}", update);
225                    tx.send(update.clone()).await?;
226
227                    counter!(
228                        "risotto_tx_updates_total",
229                        "router" => update.router_addr.to_string(),
230                        "peer" => update.peer_addr.to_string(),
231                    )
232                    .increment(1);
233                }
234            }
235        }
236        None => {
237            // Stateless mode: forward all updates as-is
238            for update in updates {
239                trace!("Forwarding update: {:?}", update);
240
241                counter!(
242                    "risotto_tx_updates_total",
243                    "router" => update.router_addr.to_string(),
244                    "peer" => update.peer_addr.to_string(),
245                )
246                .increment(1);
247
248                tx.send(update).await?;
249            }
250        }
251    }
252
253    Ok(())
254}
255
256/// Process a single pre-parsed update through the state machine
257/// Convenience wrapper for single update processing
258pub async fn process_update<T: StateStore>(
259    state: Option<AsyncState<T>>,
260    tx: Sender<Update>,
261    update: Update,
262) -> Result<()> {
263    process_updates(state, tx, vec![update]).await
264}