1use serde::{Deserialize, Serialize};
18use serde_json::Value;
19
20use crate::config::RisLiveSourceConfig;
21
/// Outer envelope of a message received from the RIS Live feed.
///
/// Only the `type` discriminator is decoded eagerly; the payload is kept as an untyped JSON
/// value so that it can be decoded afterwards into a concrete type such as [`RisMessageData`].
#[derive(Debug, Clone, Deserialize)]
pub struct RisIncomingMessage {
    /// Value of the JSON `type` key (for example `"ris_message"`).
    #[serde(rename = "type")]
    pub msg_type: String,
    /// Raw payload of the message; `None` when the `data` key is absent or `null`.
    pub data: Option<Value>,
}
29
/// Payload carrying a textual error description.
///
/// NOTE(review): presumably this is the `data` object of an error-type envelope; confirm
/// against the code that decodes [`RisIncomingMessage::data`].
#[derive(Debug, Clone, Deserialize)]
pub struct RisErrorData {
    /// Error text, read from the JSON `message` key.
    pub message: String,
}
35
/// One entry of the `announcements` array of an update message: a next hop together with the
/// prefixes listed for it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Announcement {
    /// Next-hop address, kept as the string found in the JSON.
    pub next_hop: String,
    /// Prefixes in textual form (for example `"38.190.126.0/24"`).
    pub prefixes: Vec<String>,
}
42
/// One element of the `path` array.
///
/// The enum is `untagged`, so the JSON shape selects the variant: a bare integer becomes
/// [`AsPathSegment::Asn`] and an array of integers becomes [`AsPathSegment::AsSet`].
/// Variants are tried in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum AsPathSegment {
    /// A single number in the path.
    Asn(i64),
    /// A nested array of numbers in the path.
    AsSet(Vec<i64>),
}
50
/// Decoded payload of a feed message.
///
/// Every field is optional so that one type can hold the different message kinds seen in the
/// feed (for example `UPDATE` and `RIS_PEER_STATE`); keys missing from the JSON become `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RisMessageData {
    /// Message time in seconds, possibly fractional (see [`message_timestamp_millis`]).
    pub timestamp: Option<f64>,
    /// Address of the peer, as a string.
    pub peer: Option<String>,
    /// AS number of the peer; carried as a string, not as a number.
    pub peer_asn: Option<String>,
    /// Identifier of the message.
    pub id: Option<String>,
    /// Name of the host that produced the message (for example `"rrc00.ripe.net"`).
    pub host: Option<String>,
    /// Value of the JSON `type` key inside the payload (for example `"UPDATE"`).
    #[serde(rename = "type")]
    pub msg_type: Option<String>,
    /// AS path; each element is either a single number or a nested set.
    pub path: Option<Vec<AsPathSegment>>,
    /// Origin attribute as text (for example `"INCOMPLETE"`).
    pub origin: Option<String>,
    /// Communities, each one a list of integers (for example `[3356, 5]`).
    pub community: Option<Vec<Vec<i64>>>,
    /// Announced prefixes grouped by next hop.
    pub announcements: Option<Vec<Announcement>>,
    /// Withdrawn prefixes in textual form.
    pub withdrawals: Option<Vec<String>>,
    /// Peer state as text (for example `"down"`).
    pub state: Option<String>,
}
68
/// Value of the `prefix` filter in a subscription.
///
/// The enum is `untagged`, so it serializes either as a bare JSON string or as a JSON array of
/// strings, depending on the variant.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(untagged)]
pub enum PrefixFilter {
    /// A single prefix, serialized as a plain string.
    One(String),
    /// Several prefixes, serialized as an array of strings.
    Many(Vec<String>),
}
76
/// The `socketOptions` object of a subscription.
///
/// Field names are serialized in camelCase, so `include_raw` is written as `includeRaw`.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SocketOptions {
    /// Serialized as `acknowledge`; always present in the output.
    pub acknowledge: bool,
    /// Serialized as `includeRaw`; left out of the output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub include_raw: Option<bool>,
}
85
86#[derive(Debug, Clone, Serialize, PartialEq)]
88#[serde(rename_all = "camelCase")]
89pub struct RisSubscribeData {
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub host: Option<String>,
92 #[serde(skip_serializing_if = "Option::is_none")]
93 #[serde(rename = "type")]
94 pub msg_type: Option<String>,
95 #[serde(skip_serializing_if = "Option::is_none")]
96 pub require: Option<String>,
97 #[serde(skip_serializing_if = "Option::is_none")]
98 pub peer: Option<String>,
99 #[serde(skip_serializing_if = "Option::is_none")]
100 pub path: Option<String>,
101 #[serde(skip_serializing_if = "Option::is_none")]
102 pub prefix: Option<PrefixFilter>,
103 #[serde(skip_serializing_if = "Option::is_none")]
104 pub more_specific: Option<bool>,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub less_specific: Option<bool>,
107 #[serde(rename = "socketOptions")]
108 pub socket_options: SocketOptions,
109}
110
/// Complete subscription request: a `type` discriminator plus the filter payload.
///
/// Use [`RisSubscribeMessage::from_config`] to build one from a source configuration.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct RisSubscribeMessage {
    /// Serialized under the key `type`; `from_config` sets it to `"ris_subscribe"`.
    #[serde(rename = "type")]
    pub msg_type: &'static str,
    /// Filters and socket options of the subscription.
    pub data: RisSubscribeData,
}
118
119impl RisSubscribeMessage {
120 pub fn from_config(config: &RisLiveSourceConfig) -> Self {
122 let prefix = config.prefixes.as_ref().and_then(|prefixes| {
123 if prefixes.is_empty() {
124 None
125 } else if prefixes.len() == 1 {
126 Some(PrefixFilter::One(prefixes[0].clone()))
127 } else {
128 Some(PrefixFilter::Many(prefixes.clone()))
129 }
130 });
131
132 Self {
133 msg_type: "ris_subscribe",
134 data: RisSubscribeData {
135 host: config.host.clone(),
136 msg_type: config.message_type.clone(),
137 require: config.require.clone(),
138 peer: config.peer.clone(),
139 path: config.path.clone(),
140 prefix,
141 more_specific: config.more_specific,
142 less_specific: config.less_specific,
143 socket_options: SocketOptions {
144 acknowledge: true,
145 include_raw: Some(false),
146 },
147 },
148 }
149 }
150}
151
152pub fn message_timestamp_millis(message: &RisMessageData) -> Option<i64> {
156 message.timestamp.and_then(|seconds| {
157 let ms = (seconds * 1000.0).round();
158 if ms.is_finite() {
159 i64::try_from(ms as i128).ok()
160 } else {
161 None
162 }
163 })
164}
165
// Unit tests for the wire types: decoding of incoming payloads, encoding of the subscribe
// request, and the timestamp conversion helper.
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::config::RisLiveSourceConfig;

    use super::{
        message_timestamp_millis, AsPathSegment, RisIncomingMessage, RisMessageData,
        RisSubscribeMessage,
    };

    // A full envelope is decoded in two steps: first the outer `type`/`data` wrapper, then the
    // payload. The path mixes plain numbers with a nested set to cover both enum variants.
    #[test]
    fn deserialize_update_message_with_announcements() {
        let payload = json!({
            "type": "ris_message",
            "data": {
                "timestamp": 1773098494.83,
                "peer": "208.80.153.193",
                "peer_asn": "14907",
                "id": "msg-1",
                "host": "rrc00.ripe.net",
                "type": "UPDATE",
                "path": [14907, 3356, [64512, 64513]],
                "origin": "INCOMPLETE",
                "community": [[3356, 5]],
                "announcements": [
                    {"next_hop": "208.80.153.193", "prefixes": ["38.190.126.0/24"]}
                ],
                "withdrawals": []
            }
        });

        let incoming: RisIncomingMessage =
            serde_json::from_value(payload).expect("valid incoming payload");
        assert_eq!(incoming.msg_type, "ris_message");

        let msg: RisMessageData = serde_json::from_value(incoming.data.expect("message data"))
            .expect("valid message data");
        assert_eq!(msg.msg_type.as_deref(), Some("UPDATE"));
        assert_eq!(msg.peer.as_deref(), Some("208.80.153.193"));
        assert_eq!(msg.announcements.as_ref().map(Vec::len), Some(1));

        let path = msg.path.expect("path should be present");
        assert_eq!(path[0], AsPathSegment::Asn(14907));
        assert_eq!(path[2], AsPathSegment::AsSet(vec![64512, 64513]));
    }

    // A payload without an `announcements` key must decode, leaving that field as `None`.
    #[test]
    fn deserialize_withdraw_only_update() {
        let payload = json!({
            "timestamp": 1773098494.83,
            "peer": "208.80.153.193",
            "peer_asn": "14907",
            "id": "msg-2",
            "host": "rrc00.ripe.net",
            "type": "UPDATE",
            "withdrawals": ["203.0.113.0/24"]
        });

        let msg: RisMessageData = serde_json::from_value(payload).expect("valid message data");
        assert!(msg.announcements.is_none());
        assert_eq!(msg.withdrawals, Some(vec!["203.0.113.0/24".to_string()]));
    }

    // A peer-state payload carries `state` and none of the routing fields.
    #[test]
    fn deserialize_peer_state_message() {
        let payload = json!({
            "timestamp": 1773098494.83,
            "peer": "208.80.153.193",
            "peer_asn": "14907",
            "id": "msg-3",
            "host": "rrc00.ripe.net",
            "type": "RIS_PEER_STATE",
            "state": "down"
        });

        let msg: RisMessageData = serde_json::from_value(payload).expect("valid message data");
        assert_eq!(msg.msg_type.as_deref(), Some("RIS_PEER_STATE"));
        assert_eq!(msg.state.as_deref(), Some("down"));
    }

    // With exactly one configured prefix the `prefix` key is a bare string, not an array.
    #[test]
    fn build_subscribe_message_from_config() {
        let config = RisLiveSourceConfig {
            host: Some("rrc00".to_string()),
            message_type: Some("UPDATE".to_string()),
            prefixes: Some(vec!["203.0.113.0/24".to_string()]),
            path: Some("3356".to_string()),
            ..Default::default()
        };

        let message = RisSubscribeMessage::from_config(&config);
        let as_json = serde_json::to_value(&message).expect("subscribe should serialize");

        assert_eq!(as_json["type"], "ris_subscribe");
        assert_eq!(as_json["data"]["host"], "rrc00");
        assert_eq!(as_json["data"]["type"], "UPDATE");
        assert_eq!(as_json["data"]["path"], "3356");
        assert_eq!(as_json["data"]["prefix"], "203.0.113.0/24");
        assert_eq!(as_json["data"]["socketOptions"]["acknowledge"], true);
    }

    // Fractional seconds are scaled to milliseconds and rounded to an integer.
    #[test]
    fn timestamp_conversion_to_millis() {
        let message = RisMessageData {
            timestamp: Some(1773098494.83),
            peer: None,
            peer_asn: None,
            id: None,
            host: None,
            msg_type: None,
            path: None,
            origin: None,
            community: None,
            announcements: None,
            withdrawals: None,
            state: None,
        };

        assert_eq!(message_timestamp_millis(&message), Some(1_773_098_494_830));
    }
}
287}