Skip to main content

drasi_source_ris_live/
mapping.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! BGP-to-graph mapping logic for RIS Live source messages.
16
17use std::collections::HashSet;
18use std::sync::Arc;
19
20use chrono::Utc;
21use drasi_core::models::{
22    Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
23};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26
27use crate::messages::{message_timestamp_millis, AsPathSegment, RisMessageData};
28
/// In-memory record of which graph elements have already been emitted.
///
/// The mapper consults these sets to choose between emitting an insert, an
/// update, or a delete for a given element ID.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StreamState {
    /// IDs of peer nodes seen so far, formatted as `{host}|{peer_ip}`.
    pub known_peers: HashSet<String>,
    /// IDs of prefix nodes seen so far, formatted as `{prefix}`.
    pub known_prefixes: HashSet<String>,
    /// IDs of ROUTES relationships currently active, formatted as
    /// `{host}|{peer_ip}|{prefix}`.
    pub active_routes: HashSet<String>,
}
39
40/// Persisted representation of [`StreamState`].
41#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
42pub struct PersistedStreamState {
43    pub known_peers: Vec<String>,
44    pub known_prefixes: Vec<String>,
45    pub active_routes: Vec<String>,
46}
47
48impl From<&StreamState> for PersistedStreamState {
49    fn from(state: &StreamState) -> Self {
50        let mut known_peers = state.known_peers.iter().cloned().collect::<Vec<_>>();
51        let mut known_prefixes = state.known_prefixes.iter().cloned().collect::<Vec<_>>();
52        let mut active_routes = state.active_routes.iter().cloned().collect::<Vec<_>>();
53        known_peers.sort();
54        known_prefixes.sort();
55        active_routes.sort();
56        Self {
57            known_peers,
58            known_prefixes,
59            active_routes,
60        }
61    }
62}
63
64impl From<PersistedStreamState> for StreamState {
65    fn from(state: PersistedStreamState) -> Self {
66        Self {
67            known_peers: state.known_peers.into_iter().collect(),
68            known_prefixes: state.known_prefixes.into_iter().collect(),
69            active_routes: state.active_routes.into_iter().collect(),
70        }
71    }
72}
73
74/// Converts RIS messages to graph `SourceChange` events.
75pub struct GraphMapper {
76    source_id: String,
77    state: StreamState,
78}
79
80impl GraphMapper {
81    /// Create a mapper with initial state.
82    pub fn new(source_id: impl Into<String>, state: StreamState) -> Self {
83        Self {
84            source_id: source_id.into(),
85            state,
86        }
87    }
88
89    /// Returns current state.
90    pub fn state(&self) -> &StreamState {
91        &self.state
92    }
93
94    /// Process announcement entries from an UPDATE message.
95    pub fn process_announcements(&mut self, message: &RisMessageData) -> Vec<SourceChange> {
96        let peer_id = match peer_node_id(message) {
97            Some(value) => value,
98            None => return Vec::new(),
99        };
100
101        let effective_from = effective_from(message);
102        let mut changes = Vec::new();
103
104        if self.state.known_peers.insert(peer_id.clone()) {
105            changes.push(SourceChange::Insert {
106                element: build_peer_node(&self.source_id, &peer_id, message, effective_from),
107            });
108        }
109
110        for announcement in message.announcements.as_ref().into_iter().flatten() {
111            for prefix in &announcement.prefixes {
112                if self.state.known_prefixes.insert(prefix.clone()) {
113                    changes.push(SourceChange::Insert {
114                        element: build_prefix_node(&self.source_id, prefix, effective_from),
115                    });
116                }
117
118                let route_id = route_id(message, prefix);
119                let route_element = build_route_relation(
120                    &self.source_id,
121                    &peer_id,
122                    prefix,
123                    &route_id,
124                    message,
125                    &announcement.next_hop,
126                    effective_from,
127                );
128
129                if self.state.active_routes.insert(route_id) {
130                    changes.push(SourceChange::Insert {
131                        element: route_element,
132                    });
133                } else {
134                    changes.push(SourceChange::Update {
135                        element: route_element,
136                    });
137                }
138            }
139        }
140
141        changes
142    }
143
144    /// Process withdrawals from an UPDATE message.
145    pub fn process_withdrawals(&mut self, message: &RisMessageData) -> Vec<SourceChange> {
146        let effective_from = effective_from(message);
147        if message.peer.is_none() || message.host.is_none() {
148            return Vec::new();
149        }
150
151        let mut changes = Vec::new();
152        for prefix in message.withdrawals.as_ref().into_iter().flatten() {
153            let route_id = route_id(message, prefix);
154            if self.state.active_routes.remove(&route_id) {
155                changes.push(SourceChange::Delete {
156                    metadata: relation_metadata(&self.source_id, &route_id, effective_from),
157                });
158            }
159        }
160        changes
161    }
162
163    /// Process `RIS_PEER_STATE` messages to upsert peer state.
164    pub fn process_peer_state(&mut self, message: &RisMessageData) -> Vec<SourceChange> {
165        let peer_id = match peer_node_id(message) {
166            Some(value) => value,
167            None => return Vec::new(),
168        };
169
170        let effective_from = effective_from(message);
171        let peer_node = build_peer_node(&self.source_id, &peer_id, message, effective_from);
172
173        if self.state.known_peers.insert(peer_id) {
174            vec![SourceChange::Insert { element: peer_node }]
175        } else {
176            vec![SourceChange::Update { element: peer_node }]
177        }
178    }
179}
180
181fn effective_from(message: &RisMessageData) -> u64 {
182    message_timestamp_millis(message)
183        .and_then(|ts| u64::try_from(ts).ok())
184        .unwrap_or_else(|| Utc::now().timestamp_millis().max(0) as u64)
185}
186
187fn peer_node_id(message: &RisMessageData) -> Option<String> {
188    let host = message.host.as_deref()?;
189    let peer = message.peer.as_deref()?;
190    Some(format!("{host}|{peer}"))
191}
192
193fn route_id(message: &RisMessageData, prefix: &str) -> String {
194    let host = message.host.as_deref().unwrap_or("unknown");
195    let peer = message.peer.as_deref().unwrap_or("unknown");
196    format!("{host}|{peer}|{prefix}")
197}
198
199fn relation_metadata(source_id: &str, route_id: &str, effective_from: u64) -> ElementMetadata {
200    ElementMetadata {
201        reference: ElementReference::new(source_id, route_id),
202        labels: Arc::from(vec![Arc::from("ROUTES")]),
203        effective_from,
204    }
205}
206
207fn build_peer_node(
208    source_id: &str,
209    peer_id: &str,
210    message: &RisMessageData,
211    effective_from: u64,
212) -> Element {
213    let mut properties = ElementPropertyMap::new();
214    if let Some(peer) = &message.peer {
215        properties.insert("peer_ip", ElementValue::String(Arc::from(peer.as_str())));
216    }
217    if let Some(peer_asn) = &message.peer_asn {
218        properties.insert(
219            "peer_asn",
220            ElementValue::String(Arc::from(peer_asn.as_str())),
221        );
222    }
223    if let Some(host) = &message.host {
224        properties.insert("host", ElementValue::String(Arc::from(host.as_str())));
225    }
226    if let Some(state) = &message.state {
227        properties.insert("state", ElementValue::String(Arc::from(state.as_str())));
228    }
229    if let Some(id) = &message.id {
230        properties.insert("msg_id", ElementValue::String(Arc::from(id.as_str())));
231    }
232    if let Some(timestamp) = message.timestamp {
233        properties.insert("timestamp", ElementValue::from(&json!(timestamp)));
234    }
235
236    Element::Node {
237        metadata: ElementMetadata {
238            reference: ElementReference::new(source_id, peer_id),
239            labels: Arc::from(vec![Arc::from("Peer")]),
240            effective_from,
241        },
242        properties,
243    }
244}
245
246fn build_prefix_node(source_id: &str, prefix: &str, effective_from: u64) -> Element {
247    let mut properties = ElementPropertyMap::new();
248    properties.insert("prefix", ElementValue::String(Arc::from(prefix)));
249
250    Element::Node {
251        metadata: ElementMetadata {
252            reference: ElementReference::new(source_id, prefix),
253            labels: Arc::from(vec![Arc::from("Prefix")]),
254            effective_from,
255        },
256        properties,
257    }
258}
259
260fn build_route_relation(
261    source_id: &str,
262    peer_id: &str,
263    prefix: &str,
264    route_id: &str,
265    message: &RisMessageData,
266    next_hop: &str,
267    effective_from: u64,
268) -> Element {
269    let mut properties = ElementPropertyMap::new();
270    properties.insert("next_hop", ElementValue::String(Arc::from(next_hop)));
271    properties.insert("prefix", ElementValue::String(Arc::from(prefix)));
272
273    if let Some(peer) = &message.peer {
274        properties.insert("peer", ElementValue::String(Arc::from(peer.as_str())));
275    }
276    if let Some(peer_asn) = &message.peer_asn {
277        properties.insert(
278            "peer_asn",
279            ElementValue::String(Arc::from(peer_asn.as_str())),
280        );
281    }
282    if let Some(host) = &message.host {
283        properties.insert("host", ElementValue::String(Arc::from(host.as_str())));
284    }
285    if let Some(origin) = &message.origin {
286        properties.insert("origin", ElementValue::String(Arc::from(origin.as_str())));
287    }
288    if let Some(timestamp) = message.timestamp {
289        properties.insert("timestamp", ElementValue::from(&json!(timestamp)));
290    }
291    if let Some(id) = &message.id {
292        properties.insert("msg_id", ElementValue::String(Arc::from(id.as_str())));
293    }
294
295    if let Some(path) = &message.path {
296        if let Ok(serialized) = serde_json::to_string(path) {
297            properties.insert("path", ElementValue::String(Arc::from(serialized.as_str())));
298        }
299        properties.insert("path_length", ElementValue::Integer(path_length(path)));
300        if let Some(origin_asn) = origin_asn(path) {
301            properties.insert("origin_asn", ElementValue::Integer(origin_asn));
302        }
303    }
304
305    if let Some(community) = &message.community {
306        if let Ok(serialized) = serde_json::to_string(community) {
307            properties.insert(
308                "community",
309                ElementValue::String(Arc::from(serialized.as_str())),
310            );
311        }
312    }
313
314    Element::Relation {
315        metadata: relation_metadata(source_id, route_id, effective_from),
316        in_node: ElementReference::new(source_id, peer_id),
317        out_node: ElementReference::new(source_id, prefix),
318        properties,
319    }
320}
321
322fn path_length(path: &[AsPathSegment]) -> i64 {
323    path.iter()
324        .map(|segment| match segment {
325            AsPathSegment::Asn(_) => 1i64,
326            AsPathSegment::AsSet(as_set) => i64::try_from(as_set.len()).unwrap_or(i64::MAX),
327        })
328        .sum()
329}
330
331fn origin_asn(path: &[AsPathSegment]) -> Option<i64> {
332    for segment in path.iter().rev() {
333        match segment {
334            AsPathSegment::Asn(asn) => return Some(*asn),
335            AsPathSegment::AsSet(as_set) => {
336                if let Some(last) = as_set.last() {
337                    return Some(*last);
338                }
339            }
340        }
341    }
342    None
343}
344
#[cfg(test)]
mod tests {
    use drasi_core::models::Element;

    use crate::messages::Announcement;

    use super::{GraphMapper, RisMessageData, SourceChange, StreamState};

    /// UPDATE message announcing one prefix from one peer, with no withdrawals.
    fn base_update() -> RisMessageData {
        let announcement = Announcement {
            next_hop: "208.80.153.193".to_string(),
            prefixes: vec!["203.0.113.0/24".to_string()],
        };

        RisMessageData {
            timestamp: Some(1_773_098_494.83),
            peer: Some("208.80.153.193".to_string()),
            peer_asn: Some("14907".to_string()),
            id: Some("msg-1".to_string()),
            host: Some("rrc00.ripe.net".to_string()),
            msg_type: Some("UPDATE".to_string()),
            path: None,
            origin: Some("IGP".to_string()),
            community: None,
            announcements: Some(vec![announcement]),
            withdrawals: Some(Vec::new()),
            state: None,
        }
    }

    /// True when `changes` contains an insert of a node carrying the label `wanted`.
    fn inserted_node_labelled(changes: &[SourceChange], wanted: &str) -> bool {
        changes.iter().any(|change| match change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => metadata.labels.iter().any(|label| label.as_ref() == wanted),
            _ => false,
        })
    }

    #[test]
    fn announcement_creates_peer_prefix_and_route() {
        let mut mapper = GraphMapper::new("ris-source", StreamState::default());

        let changes = mapper.process_announcements(&base_update());
        assert_eq!(changes.len(), 3);

        assert!(inserted_node_labelled(&changes, "Peer"));
        assert!(inserted_node_labelled(&changes, "Prefix"));

        let route_inserted = changes.iter().any(|change| match change {
            SourceChange::Insert {
                element:
                    Element::Relation {
                        metadata,
                        in_node,
                        out_node,
                        ..
                    },
            } => {
                metadata.labels.iter().any(|label| label.as_ref() == "ROUTES")
                    && in_node.element_id.as_ref() == "rrc00.ripe.net|208.80.153.193"
                    && out_node.element_id.as_ref() == "203.0.113.0/24"
            }
            _ => false,
        });
        assert!(route_inserted);
    }

    #[test]
    fn reannouncement_updates_existing_route() {
        let mut mapper = GraphMapper::new("ris-source", StreamState::default());
        let _ = mapper.process_announcements(&base_update());

        // Same peer and prefix, different message ID and next hop.
        let second = RisMessageData {
            id: Some("msg-2".to_string()),
            announcements: Some(vec![Announcement {
                next_hop: "198.51.100.10".to_string(),
                prefixes: vec!["203.0.113.0/24".to_string()],
            }]),
            ..base_update()
        };

        let changes = mapper.process_announcements(&second);
        assert_eq!(changes.len(), 1);
        assert!(matches!(
            changes.first(),
            Some(SourceChange::Update {
                element: Element::Relation { .. }
            })
        ));
    }

    #[test]
    fn withdrawal_deletes_existing_route() {
        let mut mapper = GraphMapper::new("ris-source", StreamState::default());
        let _ = mapper.process_announcements(&base_update());

        let withdraw = RisMessageData {
            announcements: None,
            withdrawals: Some(vec!["203.0.113.0/24".to_string()]),
            ..base_update()
        };

        let changes = mapper.process_withdrawals(&withdraw);
        assert_eq!(changes.len(), 1);
        assert!(matches!(
            changes.first(),
            Some(SourceChange::Delete { .. })
        ));
    }

    #[test]
    fn peer_state_upserts_peer_node() {
        let mut mapper = GraphMapper::new("ris-source", StreamState::default());
        let peer_state = RisMessageData {
            msg_type: Some("RIS_PEER_STATE".to_string()),
            state: Some("down".to_string()),
            announcements: None,
            withdrawals: None,
            ..base_update()
        };

        // First sighting of the peer is an insert.
        let first = mapper.process_peer_state(&peer_state);
        assert!(matches!(
            first.first(),
            Some(SourceChange::Insert {
                element: Element::Node { .. }
            })
        ));

        // The same peer again is an update.
        let second = mapper.process_peer_state(&peer_state);
        assert!(matches!(
            second.first(),
            Some(SourceChange::Update {
                element: Element::Node { .. }
            })
        ));
    }
}