drasi_source_ris_live/
config.rs1use serde::{Deserialize, Serialize};
18
19fn default_websocket_url() -> String {
20 "wss://ris-live.ripe.net/v1/ws/".to_string()
21}
22
23fn default_include_peer_state() -> bool {
24 true
25}
26
27fn default_reconnect_delay_secs() -> u64 {
28 5
29}
30
31fn default_clear_state_on_start() -> bool {
32 false
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
40#[serde(tag = "mode", rename_all = "snake_case")]
41pub enum StartFrom {
42 Beginning,
45 #[default]
48 Now,
49 Timestamp { timestamp_ms: i64 },
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55pub struct RisLiveSourceConfig {
56 #[serde(default = "default_websocket_url")]
58 pub websocket_url: String,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub client_name: Option<String>,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub host: Option<String>,
65 #[serde(default, skip_serializing_if = "Option::is_none")]
67 pub message_type: Option<String>,
68 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub prefixes: Option<Vec<String>>,
71 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub more_specific: Option<bool>,
74 #[serde(default, skip_serializing_if = "Option::is_none")]
76 pub less_specific: Option<bool>,
77 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub path: Option<String>,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub peer: Option<String>,
83 #[serde(default, skip_serializing_if = "Option::is_none")]
85 pub require: Option<String>,
86 #[serde(default = "default_include_peer_state")]
88 pub include_peer_state: bool,
89 #[serde(default = "default_reconnect_delay_secs")]
91 pub reconnect_delay_secs: u64,
92 #[serde(default = "default_clear_state_on_start")]
94 pub clear_state_on_start: bool,
95 #[serde(default)]
97 pub start_from: StartFrom,
98}
99
100impl Default for RisLiveSourceConfig {
101 fn default() -> Self {
102 Self {
103 websocket_url: default_websocket_url(),
104 client_name: None,
105 host: None,
106 message_type: None,
107 prefixes: None,
108 more_specific: None,
109 less_specific: None,
110 path: None,
111 peer: None,
112 require: None,
113 include_peer_state: default_include_peer_state(),
114 reconnect_delay_secs: default_reconnect_delay_secs(),
115 clear_state_on_start: default_clear_state_on_start(),
116 start_from: StartFrom::default(),
117 }
118 }
119}
120
121impl RisLiveSourceConfig {
122 pub fn validate(&self) -> anyhow::Result<()> {
124 let parsed = url::Url::parse(self.websocket_url.trim())
125 .map_err(|e| anyhow::anyhow!("websocket_url is not a valid URL: {e}"))?;
126 match parsed.scheme() {
127 "wss" | "ws" => {}
128 other => {
129 return Err(anyhow::anyhow!(
130 "websocket_url scheme must be ws or wss, got: {other}"
131 ));
132 }
133 }
134 Ok(())
135 }
136
137 pub fn should_process_timestamp(&self, message_timestamp_ms: Option<i64>) -> bool {
139 match self.start_from {
140 StartFrom::Timestamp { timestamp_ms } => match message_timestamp_ms {
141 Some(ts) => ts >= timestamp_ms,
142 None => true,
143 },
144 StartFrom::Beginning | StartFrom::Now => true,
145 }
146 }
147
148 pub fn reconnect_delay_secs(&self) -> u64 {
150 self.reconnect_delay_secs.max(1)
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::{RisLiveSourceConfig, StartFrom};
157
158 #[test]
159 fn default_values_are_set() {
160 let config = RisLiveSourceConfig::default();
161 assert_eq!(config.websocket_url, "wss://ris-live.ripe.net/v1/ws/");
162 assert!(config.include_peer_state);
163 assert_eq!(config.reconnect_delay_secs, 5);
164 assert_eq!(config.start_from, StartFrom::Now);
165 }
166
167 #[test]
168 fn start_from_timestamp_filters_older_messages() {
169 let config = RisLiveSourceConfig {
170 start_from: StartFrom::Timestamp {
171 timestamp_ms: 1_700_000_000_000,
172 },
173 ..Default::default()
174 };
175
176 assert!(!config.should_process_timestamp(Some(1_699_999_999_999)));
177 assert!(config.should_process_timestamp(Some(1_700_000_000_001)));
178 assert!(config.should_process_timestamp(None));
179 }
180
181 #[test]
182 fn reconnect_delay_is_never_zero() {
183 let config = RisLiveSourceConfig {
184 reconnect_delay_secs: 0,
185 ..Default::default()
186 };
187 assert_eq!(config.reconnect_delay_secs(), 1);
188 }
189}