risotto_lib/
state.rs

1use anyhow::Result;
2use chrono::Utc;
3use core::net::IpAddr;
4use serde::{Deserialize, Serialize};
5use std::error::Error;
6use std::hash::{Hash, Hasher};
7use std::sync::mpsc::Sender;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::Mutex;
11use tokio::time::sleep;
12use tracing::{info, trace};
13
14use crate::state_store::store::StateStore;
15use crate::update::{Update, UpdateMetadata};
16
/// Shared handle to a [`State`]: an `Arc` around a tokio `Mutex`, so every clone of the handle
/// refers to the same state and access goes through `lock().await`.
17pub type AsyncState<T> = Arc<Mutex<State<T>>>;
/// Element type of the list returned by `State::get_all`: two addresses plus the stored prefix.
/// NOTE(review): presumably (router address, peer address, prefix) — confirm against the
/// `StateStore` implementation.
18pub type RouterPeerUpdate = (IpAddr, IpAddr, TimedPrefix);
19
20pub fn new_state<T: StateStore + Send>(store: T) -> AsyncState<T> {
21    Arc::new(Mutex::new(State::new(store)))
22}
23
24pub struct State<T: StateStore> {
25    pub store: T,
26}
27
28impl<T: StateStore> State<T> {
29    pub fn new(store: T) -> State<T> {
30        State { store }
31    }
32
33    // Get all the updates from the state
34    pub fn get_all(&self) -> Result<Vec<RouterPeerUpdate>> {
35        Ok(self.store.get_all())
36    }
37
38    // Get the updates for a specific router and peer
39    pub fn get_updates_by_peer(
40        &self,
41        router_addr: &IpAddr,
42        peer_addr: &IpAddr,
43    ) -> Result<Vec<TimedPrefix>> {
44        Ok(self.store.get_updates_by_peer(router_addr, peer_addr))
45    }
46
47    // Remove all updates for a specific router and peer
48    pub fn remove_updates(&mut self, router_addr: &IpAddr, peer_addr: &IpAddr) -> Result<()> {
49        self.store.remove_peer(router_addr, peer_addr);
50        Ok(())
51    }
52
53    // Update the state with a new update
54    pub fn update(
55        &mut self,
56        router_addr: &IpAddr,
57        peer_addr: &IpAddr,
58        update: &Update,
59    ) -> Result<bool, Box<dyn Error>> {
60        Ok(self.store.update(router_addr, &peer_addr, update))
61    }
62}
63
64#[derive(Serialize, Deserialize, Eq, Clone)]
65pub struct TimedPrefix {
66    pub prefix_addr: IpAddr,
67    pub prefix_len: u8,
68    pub is_post_policy: bool,
69    pub is_adj_rib_out: bool,
70    pub timestamp: i64,
71}
72
73impl PartialEq for TimedPrefix {
74    fn eq(&self, other: &Self) -> bool {
75        self.prefix_addr == other.prefix_addr
76            && self.prefix_len == other.prefix_len
77            && self.is_post_policy == other.is_post_policy
78            && self.is_adj_rib_out == other.is_adj_rib_out
79    }
80}
81
82impl Hash for TimedPrefix {
83    fn hash<H: Hasher>(&self, state: &mut H) {
84        self.prefix_addr.hash(state);
85        self.prefix_len.hash(state);
86        self.is_post_policy.hash(state);
87        self.is_adj_rib_out.hash(state);
88    }
89}
90
91pub fn synthesize_withdraw_update(prefix: TimedPrefix, metadata: UpdateMetadata) -> Update {
92    Update {
93        time_received_ns: Utc::now(),
94        time_bmp_header_ns: Utc::now(),
95        router_addr: metadata.router_addr,
96        router_port: metadata.router_port,
97        peer_addr: metadata.peer_addr,
98        peer_bgp_id: metadata.peer_bgp_id,
99        peer_asn: metadata.peer_asn,
100        prefix_addr: prefix.prefix_addr,
101        prefix_len: prefix.prefix_len,
102        is_post_policy: prefix.is_post_policy,
103        is_adj_rib_out: prefix.is_adj_rib_out,
104        announced: false,
105        next_hop: None,
106        origin: "INCOMPLETE".to_string(),
107        path: vec![],
108        local_preference: None,
109        med: None,
110        communities: vec![],
111        synthetic: true,
112    }
113}
114
115pub async fn peer_up_withdraws_handler<T: StateStore>(
116    state: AsyncState<T>,
117    tx: Sender<Update>,
118    metadata: UpdateMetadata,
119    sleep_time: u64,
120) {
121    let startup = chrono::Utc::now();
122    sleep(Duration::from_secs(sleep_time)).await;
123
124    info!(
125        "[{}]:{} - {} - removing updates older than {} after waited {} seconds",
126        metadata.router_addr, metadata.router_port, metadata.peer_addr, startup, sleep_time
127    );
128
129    let state_lock = state.lock().await;
130    let timed_prefixes = state_lock
131        .store
132        .get_updates_by_peer(&metadata.router_addr, &metadata.peer_addr);
133
134    drop(state_lock);
135
136    let mut synthetic_updates = Vec::new();
137    for timed_prefix in timed_prefixes {
138        if timed_prefix.timestamp < startup.timestamp_millis() {
139            // This update has not been re-announced after startup
140            // Emit a synthetic withdraw update
141            synthetic_updates.push(synthesize_withdraw_update(timed_prefix, metadata.clone()));
142        }
143    }
144
145    info!(
146        "[{}]:{} - {} - emitting {} synthetic withdraw updates",
147        metadata.router_addr,
148        metadata.router_port,
149        metadata.peer_addr,
150        synthetic_updates.len()
151    );
152
153    let mut state_lock = state.lock().await;
154    for update in &mut synthetic_updates {
155        trace!("{:?}", update);
156
157        // Sent to the event pipeline
158        tx.send(update.clone()).unwrap();
159
160        // Remove the update from the state
161        state_lock
162            .store
163            .update(&update.router_addr, &metadata.peer_addr, update);
164    }
165}