Skip to main content

drasi_source_ris_live/
config.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Configuration for the RIPE NCC RIS Live source.
16
17use serde::{Deserialize, Serialize};
18
/// Serde default for `websocket_url`: the RIS Live WebSocket endpoint.
fn default_websocket_url() -> String {
    "wss://ris-live.ripe.net/v1/ws/".to_string()
}
22
/// Serde default for `include_peer_state`: peer state handling is on unless disabled.
fn default_include_peer_state() -> bool {
    true
}
26
/// Serde default for `reconnect_delay_secs`: five seconds.
fn default_reconnect_delay_secs() -> u64 {
    5
}
30
/// Serde default for `clear_state_on_start`: state is kept unless explicitly cleared.
fn default_clear_state_on_start() -> bool {
    false
}
34
35/// Initial behavior for stream processing.
36///
37/// RIS Live does not expose replay-by-offset semantics, but this setting controls
38/// how incoming events are filtered when the source starts.
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
40#[serde(tag = "mode", rename_all = "snake_case")]
41pub enum StartFrom {
42    /// Process all stream events from the moment the connection is established,
43    /// including any buffered events the server delivers on connect.
44    Beginning,
45    /// Start processing from the current stream position, ignoring any
46    /// buffered history the server may deliver.
47    #[default]
48    Now,
49    /// Ignore events whose `timestamp` is older than this Unix timestamp in milliseconds.
50    Timestamp { timestamp_ms: i64 },
51}
52
53/// RIPE RIS Live source configuration.
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
55pub struct RisLiveSourceConfig {
56    /// WebSocket endpoint for RIS Live.
57    #[serde(default = "default_websocket_url")]
58    pub websocket_url: String,
59    /// Optional client identifier passed as `?client=` query parameter.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub client_name: Option<String>,
62    /// Optional route collector filter (e.g. `rrc00`).
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub host: Option<String>,
65    /// Optional BGP message type filter (e.g. `UPDATE`).
66    #[serde(default, skip_serializing_if = "Option::is_none")]
67    pub message_type: Option<String>,
68    /// Optional prefix filter(s).
69    #[serde(default, skip_serializing_if = "Option::is_none")]
70    pub prefixes: Option<Vec<String>>,
71    /// Whether to match more specific prefixes.
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub more_specific: Option<bool>,
74    /// Whether to match less specific prefixes.
75    #[serde(default, skip_serializing_if = "Option::is_none")]
76    pub less_specific: Option<bool>,
77    /// Optional AS path filter.
78    #[serde(default, skip_serializing_if = "Option::is_none")]
79    pub path: Option<String>,
80    /// Optional peer IP filter.
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub peer: Option<String>,
83    /// Optional required field filter (e.g. announcements/withdrawals).
84    #[serde(default, skip_serializing_if = "Option::is_none")]
85    pub require: Option<String>,
86    /// Emit peer node updates from `RIS_PEER_STATE` messages.
87    #[serde(default = "default_include_peer_state")]
88    pub include_peer_state: bool,
89    /// Delay before reconnect attempts after disconnects.
90    #[serde(default = "default_reconnect_delay_secs")]
91    pub reconnect_delay_secs: u64,
92    /// Clear persisted graph state at startup.
93    #[serde(default = "default_clear_state_on_start")]
94    pub clear_state_on_start: bool,
95    /// Initial stream behavior.
96    #[serde(default)]
97    pub start_from: StartFrom,
98}
99
100impl Default for RisLiveSourceConfig {
101    fn default() -> Self {
102        Self {
103            websocket_url: default_websocket_url(),
104            client_name: None,
105            host: None,
106            message_type: None,
107            prefixes: None,
108            more_specific: None,
109            less_specific: None,
110            path: None,
111            peer: None,
112            require: None,
113            include_peer_state: default_include_peer_state(),
114            reconnect_delay_secs: default_reconnect_delay_secs(),
115            clear_state_on_start: default_clear_state_on_start(),
116            start_from: StartFrom::default(),
117        }
118    }
119}
120
121impl RisLiveSourceConfig {
122    /// Validate configuration fields.
123    pub fn validate(&self) -> anyhow::Result<()> {
124        let parsed = url::Url::parse(self.websocket_url.trim())
125            .map_err(|e| anyhow::anyhow!("websocket_url is not a valid URL: {e}"))?;
126        match parsed.scheme() {
127            "wss" | "ws" => {}
128            other => {
129                return Err(anyhow::anyhow!(
130                    "websocket_url scheme must be ws or wss, got: {other}"
131                ));
132            }
133        }
134        Ok(())
135    }
136
137    /// Returns whether a message with the given timestamp should be processed.
138    pub fn should_process_timestamp(&self, message_timestamp_ms: Option<i64>) -> bool {
139        match self.start_from {
140            StartFrom::Timestamp { timestamp_ms } => match message_timestamp_ms {
141                Some(ts) => ts >= timestamp_ms,
142                None => true,
143            },
144            StartFrom::Beginning | StartFrom::Now => true,
145        }
146    }
147
148    /// Returns a non-zero reconnect delay in seconds.
149    pub fn reconnect_delay_secs(&self) -> u64 {
150        self.reconnect_delay_secs.max(1)
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::{RisLiveSourceConfig, StartFrom};

    /// The `Default` impl must agree with the documented defaults.
    #[test]
    fn default_values_are_set() {
        let config = RisLiveSourceConfig::default();
        assert_eq!(config.websocket_url, "wss://ris-live.ripe.net/v1/ws/");
        assert!(config.include_peer_state);
        assert_eq!(config.reconnect_delay_secs, 5);
        assert!(!config.clear_state_on_start);
        assert_eq!(config.start_from, StartFrom::Now);
    }

    /// Messages older than the cutoff are dropped; newer or untimestamped ones pass.
    #[test]
    fn start_from_timestamp_filters_older_messages() {
        let config = RisLiveSourceConfig {
            start_from: StartFrom::Timestamp {
                timestamp_ms: 1_700_000_000_000,
            },
            ..Default::default()
        };

        assert!(!config.should_process_timestamp(Some(1_699_999_999_999)));
        assert!(config.should_process_timestamp(Some(1_700_000_000_001)));
        assert!(config.should_process_timestamp(None));
    }

    /// The cutoff is inclusive: a message stamped exactly at it is processed.
    #[test]
    fn start_from_timestamp_cutoff_is_inclusive() {
        let config = RisLiveSourceConfig {
            start_from: StartFrom::Timestamp {
                timestamp_ms: 1_700_000_000_000,
            },
            ..Default::default()
        };

        assert!(config.should_process_timestamp(Some(1_700_000_000_000)));
    }

    /// `Beginning` and `Now` never reject a message based on its timestamp.
    #[test]
    fn beginning_and_now_accept_every_timestamp() {
        for start_from in [StartFrom::Beginning, StartFrom::Now] {
            let config = RisLiveSourceConfig {
                start_from,
                ..Default::default()
            };

            assert!(config.should_process_timestamp(Some(0)));
            assert!(config.should_process_timestamp(Some(i64::MAX)));
            assert!(config.should_process_timestamp(None));
        }
    }

    /// A zero delay in the config is raised to one second by the accessor.
    #[test]
    fn reconnect_delay_is_never_zero() {
        let config = RisLiveSourceConfig {
            reconnect_delay_secs: 0,
            ..Default::default()
        };
        assert_eq!(config.reconnect_delay_secs(), 1);
    }

    /// Both WebSocket schemes validate, including the default endpoint.
    #[test]
    fn validate_accepts_ws_and_wss() {
        assert!(RisLiveSourceConfig::default().validate().is_ok());

        let plain = RisLiveSourceConfig {
            websocket_url: "ws://localhost:8080/".to_string(),
            ..Default::default()
        };
        assert!(plain.validate().is_ok());
    }

    /// Surrounding whitespace is trimmed before the URL is parsed.
    #[test]
    fn validate_ignores_surrounding_whitespace() {
        let config = RisLiveSourceConfig {
            websocket_url: "  wss://ris-live.ripe.net/v1/ws/\n".to_string(),
            ..Default::default()
        };
        assert!(config.validate().is_ok());
    }

    /// Non-WebSocket schemes and unparseable strings are rejected.
    #[test]
    fn validate_rejects_other_schemes_and_garbage() {
        let https = RisLiveSourceConfig {
            websocket_url: "https://ris-live.ripe.net/v1/ws/".to_string(),
            ..Default::default()
        };
        assert!(https.validate().is_err());

        let garbage = RisLiveSourceConfig {
            websocket_url: "not a url".to_string(),
            ..Default::default()
        };
        assert!(garbage.validate().is_err());
    }
}