1use std::collections::HashSet;
18use std::sync::Arc;
19
20use chrono::Utc;
21use drasi_core::models::{
22 Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
23};
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26
27use crate::messages::{message_timestamp_millis, AsPathSegment, RisMessageData};
28
/// In-memory bookkeeping of the graph elements a [`GraphMapper`] has already emitted.
///
/// The mapper consults these sets to decide whether an incoming message should
/// produce an insert, an update, a delete, or nothing at all.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamState {
    /// Peer node identifiers seen so far (the mapper stores `"{host}|{peer}"` keys).
    pub known_peers: HashSet<String>,
    /// Prefix strings for which a prefix node has already been inserted.
    pub known_prefixes: HashSet<String>,
    /// Route identifiers currently considered announced
    /// (the mapper stores `"{host}|{peer}|{prefix}"` keys).
    pub active_routes: HashSet<String>,
}
39
40#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
42pub struct PersistedStreamState {
43 pub known_peers: Vec<String>,
44 pub known_prefixes: Vec<String>,
45 pub active_routes: Vec<String>,
46}
47
48impl From<&StreamState> for PersistedStreamState {
49 fn from(state: &StreamState) -> Self {
50 let mut known_peers = state.known_peers.iter().cloned().collect::<Vec<_>>();
51 let mut known_prefixes = state.known_prefixes.iter().cloned().collect::<Vec<_>>();
52 let mut active_routes = state.active_routes.iter().cloned().collect::<Vec<_>>();
53 known_peers.sort();
54 known_prefixes.sort();
55 active_routes.sort();
56 Self {
57 known_peers,
58 known_prefixes,
59 active_routes,
60 }
61 }
62}
63
64impl From<PersistedStreamState> for StreamState {
65 fn from(state: PersistedStreamState) -> Self {
66 Self {
67 known_peers: state.known_peers.into_iter().collect(),
68 known_prefixes: state.known_prefixes.into_iter().collect(),
69 active_routes: state.active_routes.into_iter().collect(),
70 }
71 }
72}
73
74pub struct GraphMapper {
76 source_id: String,
77 state: StreamState,
78}
79
80impl GraphMapper {
81 pub fn new(source_id: impl Into<String>, state: StreamState) -> Self {
83 Self {
84 source_id: source_id.into(),
85 state,
86 }
87 }
88
89 pub fn state(&self) -> &StreamState {
91 &self.state
92 }
93
94 pub fn process_announcements(&mut self, message: &RisMessageData) -> Vec<SourceChange> {
96 let peer_id = match peer_node_id(message) {
97 Some(value) => value,
98 None => return Vec::new(),
99 };
100
101 let effective_from = effective_from(message);
102 let mut changes = Vec::new();
103
104 if self.state.known_peers.insert(peer_id.clone()) {
105 changes.push(SourceChange::Insert {
106 element: build_peer_node(&self.source_id, &peer_id, message, effective_from),
107 });
108 }
109
110 for announcement in message.announcements.as_ref().into_iter().flatten() {
111 for prefix in &announcement.prefixes {
112 if self.state.known_prefixes.insert(prefix.clone()) {
113 changes.push(SourceChange::Insert {
114 element: build_prefix_node(&self.source_id, prefix, effective_from),
115 });
116 }
117
118 let route_id = route_id(message, prefix);
119 let route_element = build_route_relation(
120 &self.source_id,
121 &peer_id,
122 prefix,
123 &route_id,
124 message,
125 &announcement.next_hop,
126 effective_from,
127 );
128
129 if self.state.active_routes.insert(route_id) {
130 changes.push(SourceChange::Insert {
131 element: route_element,
132 });
133 } else {
134 changes.push(SourceChange::Update {
135 element: route_element,
136 });
137 }
138 }
139 }
140
141 changes
142 }
143
144 pub fn process_withdrawals(&mut self, message: &RisMessageData) -> Vec<SourceChange> {
146 let effective_from = effective_from(message);
147 if message.peer.is_none() || message.host.is_none() {
148 return Vec::new();
149 }
150
151 let mut changes = Vec::new();
152 for prefix in message.withdrawals.as_ref().into_iter().flatten() {
153 let route_id = route_id(message, prefix);
154 if self.state.active_routes.remove(&route_id) {
155 changes.push(SourceChange::Delete {
156 metadata: relation_metadata(&self.source_id, &route_id, effective_from),
157 });
158 }
159 }
160 changes
161 }
162
163 pub fn process_peer_state(&mut self, message: &RisMessageData) -> Vec<SourceChange> {
165 let peer_id = match peer_node_id(message) {
166 Some(value) => value,
167 None => return Vec::new(),
168 };
169
170 let effective_from = effective_from(message);
171 let peer_node = build_peer_node(&self.source_id, &peer_id, message, effective_from);
172
173 if self.state.known_peers.insert(peer_id) {
174 vec![SourceChange::Insert { element: peer_node }]
175 } else {
176 vec![SourceChange::Update { element: peer_node }]
177 }
178 }
179}
180
181fn effective_from(message: &RisMessageData) -> u64 {
182 message_timestamp_millis(message)
183 .and_then(|ts| u64::try_from(ts).ok())
184 .unwrap_or_else(|| Utc::now().timestamp_millis().max(0) as u64)
185}
186
187fn peer_node_id(message: &RisMessageData) -> Option<String> {
188 let host = message.host.as_deref()?;
189 let peer = message.peer.as_deref()?;
190 Some(format!("{host}|{peer}"))
191}
192
193fn route_id(message: &RisMessageData, prefix: &str) -> String {
194 let host = message.host.as_deref().unwrap_or("unknown");
195 let peer = message.peer.as_deref().unwrap_or("unknown");
196 format!("{host}|{peer}|{prefix}")
197}
198
199fn relation_metadata(source_id: &str, route_id: &str, effective_from: u64) -> ElementMetadata {
200 ElementMetadata {
201 reference: ElementReference::new(source_id, route_id),
202 labels: Arc::from(vec![Arc::from("ROUTES")]),
203 effective_from,
204 }
205}
206
207fn build_peer_node(
208 source_id: &str,
209 peer_id: &str,
210 message: &RisMessageData,
211 effective_from: u64,
212) -> Element {
213 let mut properties = ElementPropertyMap::new();
214 if let Some(peer) = &message.peer {
215 properties.insert("peer_ip", ElementValue::String(Arc::from(peer.as_str())));
216 }
217 if let Some(peer_asn) = &message.peer_asn {
218 properties.insert(
219 "peer_asn",
220 ElementValue::String(Arc::from(peer_asn.as_str())),
221 );
222 }
223 if let Some(host) = &message.host {
224 properties.insert("host", ElementValue::String(Arc::from(host.as_str())));
225 }
226 if let Some(state) = &message.state {
227 properties.insert("state", ElementValue::String(Arc::from(state.as_str())));
228 }
229 if let Some(id) = &message.id {
230 properties.insert("msg_id", ElementValue::String(Arc::from(id.as_str())));
231 }
232 if let Some(timestamp) = message.timestamp {
233 properties.insert("timestamp", ElementValue::from(&json!(timestamp)));
234 }
235
236 Element::Node {
237 metadata: ElementMetadata {
238 reference: ElementReference::new(source_id, peer_id),
239 labels: Arc::from(vec![Arc::from("Peer")]),
240 effective_from,
241 },
242 properties,
243 }
244}
245
246fn build_prefix_node(source_id: &str, prefix: &str, effective_from: u64) -> Element {
247 let mut properties = ElementPropertyMap::new();
248 properties.insert("prefix", ElementValue::String(Arc::from(prefix)));
249
250 Element::Node {
251 metadata: ElementMetadata {
252 reference: ElementReference::new(source_id, prefix),
253 labels: Arc::from(vec![Arc::from("Prefix")]),
254 effective_from,
255 },
256 properties,
257 }
258}
259
260fn build_route_relation(
261 source_id: &str,
262 peer_id: &str,
263 prefix: &str,
264 route_id: &str,
265 message: &RisMessageData,
266 next_hop: &str,
267 effective_from: u64,
268) -> Element {
269 let mut properties = ElementPropertyMap::new();
270 properties.insert("next_hop", ElementValue::String(Arc::from(next_hop)));
271 properties.insert("prefix", ElementValue::String(Arc::from(prefix)));
272
273 if let Some(peer) = &message.peer {
274 properties.insert("peer", ElementValue::String(Arc::from(peer.as_str())));
275 }
276 if let Some(peer_asn) = &message.peer_asn {
277 properties.insert(
278 "peer_asn",
279 ElementValue::String(Arc::from(peer_asn.as_str())),
280 );
281 }
282 if let Some(host) = &message.host {
283 properties.insert("host", ElementValue::String(Arc::from(host.as_str())));
284 }
285 if let Some(origin) = &message.origin {
286 properties.insert("origin", ElementValue::String(Arc::from(origin.as_str())));
287 }
288 if let Some(timestamp) = message.timestamp {
289 properties.insert("timestamp", ElementValue::from(&json!(timestamp)));
290 }
291 if let Some(id) = &message.id {
292 properties.insert("msg_id", ElementValue::String(Arc::from(id.as_str())));
293 }
294
295 if let Some(path) = &message.path {
296 if let Ok(serialized) = serde_json::to_string(path) {
297 properties.insert("path", ElementValue::String(Arc::from(serialized.as_str())));
298 }
299 properties.insert("path_length", ElementValue::Integer(path_length(path)));
300 if let Some(origin_asn) = origin_asn(path) {
301 properties.insert("origin_asn", ElementValue::Integer(origin_asn));
302 }
303 }
304
305 if let Some(community) = &message.community {
306 if let Ok(serialized) = serde_json::to_string(community) {
307 properties.insert(
308 "community",
309 ElementValue::String(Arc::from(serialized.as_str())),
310 );
311 }
312 }
313
314 Element::Relation {
315 metadata: relation_metadata(source_id, route_id, effective_from),
316 in_node: ElementReference::new(source_id, peer_id),
317 out_node: ElementReference::new(source_id, prefix),
318 properties,
319 }
320}
321
322fn path_length(path: &[AsPathSegment]) -> i64 {
323 path.iter()
324 .map(|segment| match segment {
325 AsPathSegment::Asn(_) => 1i64,
326 AsPathSegment::AsSet(as_set) => i64::try_from(as_set.len()).unwrap_or(i64::MAX),
327 })
328 .sum()
329}
330
331fn origin_asn(path: &[AsPathSegment]) -> Option<i64> {
332 for segment in path.iter().rev() {
333 match segment {
334 AsPathSegment::Asn(asn) => return Some(*asn),
335 AsPathSegment::AsSet(as_set) => {
336 if let Some(last) = as_set.last() {
337 return Some(*last);
338 }
339 }
340 }
341 }
342 None
343}
344
#[cfg(test)]
mod tests {
    use drasi_core::models::Element;

    use crate::messages::Announcement;

    use super::{GraphMapper, RisMessageData, SourceChange, StreamState};

    const SOURCE_ID: &str = "ris-source";
    const PEER_IP: &str = "208.80.153.193";
    const PREFIX: &str = "203.0.113.0/24";

    /// An UPDATE message announcing `PREFIX` from `PEER_IP` with no withdrawals.
    fn base_update() -> RisMessageData {
        RisMessageData {
            timestamp: Some(1_773_098_494.83),
            peer: Some(PEER_IP.to_string()),
            peer_asn: Some("14907".to_string()),
            id: Some("msg-1".to_string()),
            host: Some("rrc00.ripe.net".to_string()),
            msg_type: Some("UPDATE".to_string()),
            path: None,
            origin: Some("IGP".to_string()),
            community: None,
            announcements: Some(vec![Announcement {
                next_hop: PEER_IP.to_string(),
                prefixes: vec![PREFIX.to_string()],
            }]),
            withdrawals: Some(Vec::new()),
            state: None,
        }
    }

    /// A mapper with no prior state.
    fn fresh_mapper() -> GraphMapper {
        GraphMapper::new(SOURCE_ID, StreamState::default())
    }

    /// True when `changes` contains an insert of a node carrying label `wanted`.
    fn inserts_node_labelled(changes: &[SourceChange], wanted: &str) -> bool {
        changes.iter().any(|change| match change {
            SourceChange::Insert {
                element: Element::Node { metadata, .. },
            } => metadata.labels.iter().any(|label| label.as_ref() == wanted),
            _ => false,
        })
    }

    #[test]
    fn announcement_creates_peer_prefix_and_route() {
        let mut mapper = fresh_mapper();

        let changes = mapper.process_announcements(&base_update());
        assert_eq!(changes.len(), 3);

        assert!(inserts_node_labelled(&changes, "Peer"));
        assert!(inserts_node_labelled(&changes, "Prefix"));

        let route_inserted = changes.iter().any(|change| match change {
            SourceChange::Insert {
                element:
                    Element::Relation {
                        metadata,
                        in_node,
                        out_node,
                        ..
                    },
            } => {
                metadata.labels.iter().any(|label| label.as_ref() == "ROUTES")
                    && in_node.element_id.as_ref() == "rrc00.ripe.net|208.80.153.193"
                    && out_node.element_id.as_ref() == "203.0.113.0/24"
            }
            _ => false,
        });
        assert!(route_inserted);
    }

    #[test]
    fn reannouncement_updates_existing_route() {
        let mut mapper = fresh_mapper();
        let _ = mapper.process_announcements(&base_update());

        // Same peer and prefix, different message id and next hop.
        let second = RisMessageData {
            id: Some("msg-2".to_string()),
            announcements: Some(vec![Announcement {
                next_hop: "198.51.100.10".to_string(),
                prefixes: vec![PREFIX.to_string()],
            }]),
            ..base_update()
        };

        let changes = mapper.process_announcements(&second);
        assert!(matches!(
            changes.as_slice(),
            [SourceChange::Update {
                element: Element::Relation { .. }
            }]
        ));
    }

    #[test]
    fn withdrawal_deletes_existing_route() {
        let mut mapper = fresh_mapper();
        let _ = mapper.process_announcements(&base_update());

        let withdraw = RisMessageData {
            announcements: None,
            withdrawals: Some(vec![PREFIX.to_string()]),
            ..base_update()
        };

        let changes = mapper.process_withdrawals(&withdraw);
        assert!(matches!(
            changes.as_slice(),
            [SourceChange::Delete { .. }]
        ));
    }

    #[test]
    fn peer_state_upserts_peer_node() {
        let mut mapper = fresh_mapper();
        let peer_state = RisMessageData {
            msg_type: Some("RIS_PEER_STATE".to_string()),
            state: Some("down".to_string()),
            announcements: None,
            withdrawals: None,
            ..base_update()
        };

        // First sighting of the peer is an insert.
        let first = mapper.process_peer_state(&peer_state);
        assert!(matches!(
            first.first(),
            Some(SourceChange::Insert {
                element: Element::Node { .. }
            })
        ));

        // The same peer again is an update.
        let second = mapper.process_peer_state(&peer_state);
        assert!(matches!(
            second.first(),
            Some(SourceChange::Update {
                element: Element::Node { .. }
            })
        ));
    }
}