use serde::{Deserialize, Serialize};
fn default_websocket_url() -> String {
"wss://ris-live.ripe.net/v1/ws/".to_string()
}
fn default_include_peer_state() -> bool {
true
}
fn default_reconnect_delay_secs() -> u64 {
5
}
fn default_clear_state_on_start() -> bool {
false
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum StartFrom {
Beginning,
#[default]
Now,
Timestamp { timestamp_ms: i64 },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RisLiveSourceConfig {
#[serde(default = "default_websocket_url")]
pub websocket_url: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub client_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message_type: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prefixes: Option<Vec<String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub more_specific: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub less_specific: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub peer: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub require: Option<String>,
#[serde(default = "default_include_peer_state")]
pub include_peer_state: bool,
#[serde(default = "default_reconnect_delay_secs")]
pub reconnect_delay_secs: u64,
#[serde(default = "default_clear_state_on_start")]
pub clear_state_on_start: bool,
#[serde(default)]
pub start_from: StartFrom,
}
impl Default for RisLiveSourceConfig {
fn default() -> Self {
Self {
websocket_url: default_websocket_url(),
client_name: None,
host: None,
message_type: None,
prefixes: None,
more_specific: None,
less_specific: None,
path: None,
peer: None,
require: None,
include_peer_state: default_include_peer_state(),
reconnect_delay_secs: default_reconnect_delay_secs(),
clear_state_on_start: default_clear_state_on_start(),
start_from: StartFrom::default(),
}
}
}
impl RisLiveSourceConfig {
pub fn validate(&self) -> anyhow::Result<()> {
let parsed = url::Url::parse(self.websocket_url.trim())
.map_err(|e| anyhow::anyhow!("websocket_url is not a valid URL: {e}"))?;
match parsed.scheme() {
"wss" | "ws" => {}
other => {
return Err(anyhow::anyhow!(
"websocket_url scheme must be ws or wss, got: {other}"
));
}
}
Ok(())
}
pub fn should_process_timestamp(&self, message_timestamp_ms: Option<i64>) -> bool {
match self.start_from {
StartFrom::Timestamp { timestamp_ms } => match message_timestamp_ms {
Some(ts) => ts >= timestamp_ms,
None => true,
},
StartFrom::Beginning | StartFrom::Now => true,
}
}
pub fn reconnect_delay_secs(&self) -> u64 {
self.reconnect_delay_secs.max(1)
}
}
#[cfg(test)]
mod tests {
use super::{RisLiveSourceConfig, StartFrom};
#[test]
fn default_values_are_set() {
let config = RisLiveSourceConfig::default();
assert_eq!(config.websocket_url, "wss://ris-live.ripe.net/v1/ws/");
assert!(config.include_peer_state);
assert_eq!(config.reconnect_delay_secs, 5);
assert_eq!(config.start_from, StartFrom::Now);
}
#[test]
fn start_from_timestamp_filters_older_messages() {
let config = RisLiveSourceConfig {
start_from: StartFrom::Timestamp {
timestamp_ms: 1_700_000_000_000,
},
..Default::default()
};
assert!(!config.should_process_timestamp(Some(1_699_999_999_999)));
assert!(config.should_process_timestamp(Some(1_700_000_000_001)));
assert!(config.should_process_timestamp(None));
}
#[test]
fn reconnect_delay_is_never_zero() {
let config = RisLiveSourceConfig {
reconnect_delay_secs: 0,
..Default::default()
};
assert_eq!(config.reconnect_delay_secs(), 1);
}
}