Skip to main content

drasi_source_ris_live/
messages.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! RIPE RIS Live protocol message types.
16
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19
20use crate::config::RisLiveSourceConfig;
21
/// Wrapper for incoming server messages.
///
/// Only the outer envelope is typed. The payload is kept as an untyped
/// [`Value`] so it can be decoded into a concrete type (for example
/// [`RisMessageData`]) once `msg_type` has been inspected.
#[derive(Debug, Clone, Deserialize)]
pub struct RisIncomingMessage {
    /// Message kind, read from the JSON `type` key (e.g. `"ris_message"`).
    #[serde(rename = "type")]
    pub msg_type: String,
    /// Raw payload; `None` when the `data` key is absent or `null`.
    pub data: Option<Value>,
}
29
/// Generic RIS error payload.
///
/// `message` is mandatory: deserialization fails if the key is missing.
#[derive(Debug, Clone, Deserialize)]
pub struct RisErrorData {
    /// Error text carried in the payload's `message` key.
    pub message: String,
}
35
/// BGP announcement structure for UPDATE messages.
///
/// Both fields are required when deserializing. Values are kept as the
/// strings received on the wire; no address or prefix parsing happens here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Announcement {
    /// Next hop, as a string (e.g. `"208.80.153.193"`).
    pub next_hop: String,
    /// Announced prefixes, as strings (e.g. `"38.190.126.0/24"`).
    pub prefixes: Vec<String>,
}
42
/// AS path segment can be either an ASN or an AS_SET.
///
/// The enum is `untagged`, so no discriminator appears in the JSON: a bare
/// integer maps to [`AsPathSegment::Asn`] and an array of integers maps to
/// [`AsPathSegment::AsSet`], e.g. `[14907, 3356, [64512, 64513]]`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum AsPathSegment {
    /// A single AS number.
    Asn(i64),
    /// A set of AS numbers, encoded as a nested JSON array.
    AsSet(Vec<i64>),
}
50
/// Parsed `ris_message` payload data.
///
/// Every field is optional, so one struct can hold the different message
/// shapes (an UPDATE with announcements, a withdraw-only UPDATE, a peer-state
/// change). Keys absent from the JSON deserialize to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RisMessageData {
    /// Message time in seconds, possibly fractional (see `message_timestamp_millis`).
    pub timestamp: Option<f64>,
    /// Peer address as a string (e.g. `"208.80.153.193"`).
    pub peer: Option<String>,
    /// Peer AS number, carried as a string (e.g. `"14907"`).
    pub peer_asn: Option<String>,
    /// Message identifier.
    pub id: Option<String>,
    /// Host name carried in the message (e.g. `"rrc00.ripe.net"`).
    pub host: Option<String>,
    /// Message kind from the JSON `type` key (e.g. `"UPDATE"`, `"RIS_PEER_STATE"`).
    #[serde(rename = "type")]
    pub msg_type: Option<String>,
    /// AS path; each element is either a single ASN or an AS_SET.
    pub path: Option<Vec<AsPathSegment>>,
    /// Origin value as a string (e.g. `"INCOMPLETE"`).
    pub origin: Option<String>,
    /// Communities, each encoded as a list of integers (e.g. `[3356, 5]`).
    pub community: Option<Vec<Vec<i64>>>,
    /// Announced prefixes grouped by next hop.
    pub announcements: Option<Vec<Announcement>>,
    /// Withdrawn prefixes, as strings.
    pub withdrawals: Option<Vec<String>>,
    /// State value (e.g. `"down"` in a `RIS_PEER_STATE` message).
    pub state: Option<String>,
}
68
/// Prefix filter in subscribe payload can be a single string or list.
///
/// The enum is `untagged`: [`PrefixFilter::One`] serializes as a bare JSON
/// string and [`PrefixFilter::Many`] as a JSON array of strings.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(untagged)]
pub enum PrefixFilter {
    /// Exactly one prefix.
    One(String),
    /// A list of prefixes.
    Many(Vec<String>),
}
76
/// Socket options in subscribe payload.
///
/// Keys are serialized in camelCase (`acknowledge`, `includeRaw`).
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SocketOptions {
    /// Always serialized.
    pub acknowledge: bool,
    /// Serialized as `includeRaw`; the key is omitted entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub include_raw: Option<bool>,
}
85
86/// Data payload for `ris_subscribe`.
87#[derive(Debug, Clone, Serialize, PartialEq)]
88#[serde(rename_all = "camelCase")]
89pub struct RisSubscribeData {
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub host: Option<String>,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    #[serde(rename = "type")]
94    pub msg_type: Option<String>,
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub require: Option<String>,
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub peer: Option<String>,
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub path: Option<String>,
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub prefix: Option<PrefixFilter>,
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub more_specific: Option<bool>,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub less_specific: Option<bool>,
107    #[serde(rename = "socketOptions")]
108    pub socket_options: SocketOptions,
109}
110
/// Message sent by client to subscribe to RIS Live.
///
/// Serializes to `{"type": ..., "data": {...}}`.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct RisSubscribeMessage {
    /// Message kind, written to the JSON `type` key.
    #[serde(rename = "type")]
    pub msg_type: &'static str,
    /// Subscription filters and socket options.
    pub data: RisSubscribeData,
}
118
119impl RisSubscribeMessage {
120    /// Build a `ris_subscribe` message from source config.
121    pub fn from_config(config: &RisLiveSourceConfig) -> Self {
122        let prefix = config.prefixes.as_ref().and_then(|prefixes| {
123            if prefixes.is_empty() {
124                None
125            } else if prefixes.len() == 1 {
126                Some(PrefixFilter::One(prefixes[0].clone()))
127            } else {
128                Some(PrefixFilter::Many(prefixes.clone()))
129            }
130        });
131
132        Self {
133            msg_type: "ris_subscribe",
134            data: RisSubscribeData {
135                host: config.host.clone(),
136                msg_type: config.message_type.clone(),
137                require: config.require.clone(),
138                peer: config.peer.clone(),
139                path: config.path.clone(),
140                prefix,
141                more_specific: config.more_specific,
142                less_specific: config.less_specific,
143                socket_options: SocketOptions {
144                    acknowledge: true,
145                    include_raw: Some(false),
146                },
147            },
148        }
149    }
150}
151
152/// Convert message timestamp in seconds to milliseconds.
153///
154/// Returns `None` for missing, NaN, or infinite timestamps.
155pub fn message_timestamp_millis(message: &RisMessageData) -> Option<i64> {
156    message.timestamp.and_then(|seconds| {
157        let ms = (seconds * 1000.0).round();
158        if ms.is_finite() {
159            i64::try_from(ms as i128).ok()
160        } else {
161            None
162        }
163    })
164}
165
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::RisLiveSourceConfig;

    use super::{
        message_timestamp_millis, AsPathSegment, RisIncomingMessage, RisMessageData,
        RisSubscribeMessage,
    };

    /// A full `ris_message` envelope decodes in two steps: envelope, then payload.
    #[test]
    fn deserialize_update_message_with_announcements() {
        let envelope_json = json!({
            "type": "ris_message",
            "data": {
                "timestamp": 1773098494.83,
                "peer": "208.80.153.193",
                "peer_asn": "14907",
                "id": "msg-1",
                "host": "rrc00.ripe.net",
                "type": "UPDATE",
                "path": [14907, 3356, [64512, 64513]],
                "origin": "INCOMPLETE",
                "community": [[3356, 5]],
                "announcements": [
                    {"next_hop": "208.80.153.193", "prefixes": ["38.190.126.0/24"]}
                ],
                "withdrawals": []
            }
        });

        let envelope: RisIncomingMessage =
            serde_json::from_value(envelope_json).expect("valid incoming payload");
        assert_eq!(envelope.msg_type, "ris_message");

        let inner = envelope.data.expect("message data");
        let update: RisMessageData = serde_json::from_value(inner).expect("valid message data");
        assert_eq!(update.msg_type.as_deref(), Some("UPDATE"));
        assert_eq!(update.peer.as_deref(), Some("208.80.153.193"));
        assert_eq!(update.announcements.as_ref().map(|list| list.len()), Some(1));

        // Plain ASNs and a nested AS_SET must both survive the untagged decode.
        let segments = update.path.expect("path should be present");
        assert_eq!(segments[0], AsPathSegment::Asn(14907));
        assert_eq!(segments[2], AsPathSegment::AsSet(vec![64512, 64513]));
    }

    /// An UPDATE that only withdraws prefixes leaves `announcements` unset.
    #[test]
    fn deserialize_withdraw_only_update() {
        let raw = json!({
            "timestamp": 1773098494.83,
            "peer": "208.80.153.193",
            "peer_asn": "14907",
            "id": "msg-2",
            "host": "rrc00.ripe.net",
            "type": "UPDATE",
            "withdrawals": ["203.0.113.0/24"]
        });

        let update: RisMessageData = serde_json::from_value(raw).expect("valid message data");
        let expected_withdrawals = vec!["203.0.113.0/24".to_string()];

        assert!(update.announcements.is_none());
        assert_eq!(update.withdrawals, Some(expected_withdrawals));
    }

    /// Peer-state notifications carry a `state` and no routing fields.
    #[test]
    fn deserialize_peer_state_message() {
        let raw = json!({
            "timestamp": 1773098494.83,
            "peer": "208.80.153.193",
            "peer_asn": "14907",
            "id": "msg-3",
            "host": "rrc00.ripe.net",
            "type": "RIS_PEER_STATE",
            "state": "down"
        });

        let peer_state: RisMessageData =
            serde_json::from_value(raw).expect("valid message data");
        assert_eq!(peer_state.msg_type.as_deref(), Some("RIS_PEER_STATE"));
        assert_eq!(peer_state.state.as_deref(), Some("down"));
    }

    /// Config values end up under the expected wire keys; a single prefix is a bare string.
    #[test]
    fn build_subscribe_message_from_config() {
        let source_config = RisLiveSourceConfig {
            host: Some("rrc00".to_string()),
            message_type: Some("UPDATE".to_string()),
            prefixes: Some(vec!["203.0.113.0/24".to_string()]),
            path: Some("3356".to_string()),
            ..Default::default()
        };

        let subscribe = RisSubscribeMessage::from_config(&source_config);
        let rendered = serde_json::to_value(&subscribe).expect("subscribe should serialize");
        let data = &rendered["data"];

        assert_eq!(rendered["type"], "ris_subscribe");
        assert_eq!(data["host"], "rrc00");
        assert_eq!(data["type"], "UPDATE");
        assert_eq!(data["path"], "3356");
        assert_eq!(data["prefix"], "203.0.113.0/24");
        assert_eq!(data["socketOptions"]["acknowledge"], true);
    }

    /// Fractional seconds are scaled to whole milliseconds.
    #[test]
    fn timestamp_conversion_to_millis() {
        let only_timestamp = RisMessageData {
            timestamp: Some(1773098494.83),
            peer: None,
            peer_asn: None,
            id: None,
            host: None,
            msg_type: None,
            path: None,
            origin: None,
            community: None,
            announcements: None,
            withdrawals: None,
            state: None,
        };

        let millis = message_timestamp_millis(&only_timestamp);
        assert_eq!(millis, Some(1_773_098_494_830));
    }
}