Skip to main content

drasi_source_ris_live/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! RIPE NCC RIS Live source plugin for Drasi.
17//!
18//! This source consumes real-time BGP messages from RIPE RIS Live over WebSocket
19//! and maps them to a graph model:
20//! - `(:Peer)` nodes
21//! - `(:Prefix)` nodes
22//! - `(peer)-[:ROUTES]->(prefix)` relationships
23
24pub mod config;
25pub mod descriptor;
26pub mod mapping;
27pub mod messages;
28pub mod stream;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33
34use anyhow::Result;
35use async_trait::async_trait;
36use log::{error, info};
37use tokio::sync::{watch, RwLock};
38use tokio::time::timeout;
39use tracing::Instrument;
40
41use drasi_lib::channels::{ComponentStatus, DispatchMode, SubscriptionResponse};
42use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
43use drasi_lib::state_store::StateStoreProvider;
44use drasi_lib::Source;
45
46pub use config::{RisLiveSourceConfig, StartFrom};
47
48/// RIS Live source implementation.
49pub struct RisLiveSource {
50    base: SourceBase,
51    config: RisLiveSourceConfig,
52    state_store: RwLock<Option<Arc<dyn StateStoreProvider>>>,
53    task_handle: RwLock<Option<tokio::task::JoinHandle<()>>>,
54    shutdown_tx: RwLock<Option<watch::Sender<bool>>>,
55}
56
57impl RisLiveSource {
58    /// Creates a new source with the given ID and config.
59    pub fn new(id: impl Into<String>, config: RisLiveSourceConfig) -> Result<Self> {
60        let id = id.into();
61        let params = SourceBaseParams::new(id);
62        Ok(Self {
63            base: SourceBase::new(params)?,
64            config,
65            state_store: RwLock::new(None),
66            task_handle: RwLock::new(None),
67            shutdown_tx: RwLock::new(None),
68        })
69    }
70
71    /// Creates a source builder.
72    pub fn builder(id: impl Into<String>) -> RisLiveSourceBuilder {
73        RisLiveSourceBuilder::new(id)
74    }
75}
76
77#[async_trait]
78impl Source for RisLiveSource {
79    fn id(&self) -> &str {
80        &self.base.id
81    }
82
83    fn type_name(&self) -> &str {
84        "ris-live"
85    }
86
87    fn properties(&self) -> HashMap<String, serde_json::Value> {
88        let mut properties = HashMap::new();
89        properties.insert(
90            "websocket_url".to_string(),
91            serde_json::Value::String(self.config.websocket_url.clone()),
92        );
93        if let Some(client_name) = &self.config.client_name {
94            properties.insert(
95                "client_name".to_string(),
96                serde_json::Value::String(client_name.clone()),
97            );
98        }
99        if let Some(host) = &self.config.host {
100            properties.insert("host".to_string(), serde_json::Value::String(host.clone()));
101        }
102        if let Some(message_type) = &self.config.message_type {
103            properties.insert(
104                "message_type".to_string(),
105                serde_json::Value::String(message_type.clone()),
106            );
107        }
108        if let Some(prefixes) = &self.config.prefixes {
109            properties.insert(
110                "prefixes".to_string(),
111                serde_json::Value::Array(
112                    prefixes
113                        .iter()
114                        .map(|prefix| serde_json::Value::String(prefix.clone()))
115                        .collect(),
116                ),
117            );
118        }
119        properties.insert(
120            "include_peer_state".to_string(),
121            serde_json::Value::Bool(self.config.include_peer_state),
122        );
123        properties.insert(
124            "reconnect_delay_secs".to_string(),
125            serde_json::Value::Number(self.config.reconnect_delay_secs.into()),
126        );
127        properties.insert(
128            "clear_state_on_start".to_string(),
129            serde_json::Value::Bool(self.config.clear_state_on_start),
130        );
131        properties
132    }
133
134    fn auto_start(&self) -> bool {
135        self.base.get_auto_start()
136    }
137
138    async fn start(&self) -> Result<()> {
139        if self.base.get_status().await == ComponentStatus::Running {
140            return Ok(());
141        }
142
143        self.base
144            .set_status(
145                ComponentStatus::Starting,
146                Some("Starting RIS Live source".to_string()),
147            )
148            .await;
149
150        let source_id = self.base.id.clone();
151        let config = self.config.clone();
152        let dispatchers = self.base.dispatchers.clone();
153        let state_store = self.state_store.read().await.clone();
154
155        let (shutdown_tx, shutdown_rx) = watch::channel(false);
156        let source_id_for_span = source_id.clone();
157        let instance_id = self
158            .base
159            .context()
160            .await
161            .map(|context| context.instance_id)
162            .unwrap_or_default();
163        let span = tracing::info_span!(
164            "ris_live_stream_task",
165            instance_id = %instance_id,
166            component_id = %source_id_for_span,
167            component_type = "source"
168        );
169
170        let task_handle = tokio::spawn(
171            async move {
172                if let Err(error) = stream::run_stream_loop(
173                    source_id.clone(),
174                    config,
175                    dispatchers,
176                    state_store,
177                    shutdown_rx,
178                )
179                .await
180                {
181                    error!("[{source_id}] RIS stream loop failed: {error}");
182                }
183            }
184            .instrument(span),
185        );
186
187        *self.shutdown_tx.write().await = Some(shutdown_tx);
188        *self.task_handle.write().await = Some(task_handle);
189
190        self.base
191            .set_status(
192                ComponentStatus::Running,
193                Some("RIS Live source started".to_string()),
194            )
195            .await;
196        info!("[{}] RIS Live source started", self.base.id);
197
198        Ok(())
199    }
200
201    async fn stop(&self) -> Result<()> {
202        if self.base.get_status().await == ComponentStatus::Stopped {
203            return Ok(());
204        }
205
206        self.base
207            .set_status(
208                ComponentStatus::Stopping,
209                Some("Stopping RIS Live source".to_string()),
210            )
211            .await;
212
213        if let Some(shutdown_tx) = self.shutdown_tx.write().await.take() {
214            let _ = shutdown_tx.send(true);
215        }
216
217        if let Some(task_handle) = self.task_handle.write().await.take() {
218            match timeout(Duration::from_secs(10), task_handle).await {
219                Ok(Ok(())) => {}
220                Ok(Err(error)) => {
221                    error!("[{}] RIS Live task panicked: {error}", self.base.id);
222                }
223                Err(_) => {
224                    error!(
225                        "[{}] Timed out while waiting for RIS task to stop",
226                        self.base.id
227                    );
228                }
229            }
230        }
231
232        self.base
233            .set_status(
234                ComponentStatus::Stopped,
235                Some("RIS Live source stopped".to_string()),
236            )
237            .await;
238
239        Ok(())
240    }
241
242    async fn status(&self) -> ComponentStatus {
243        self.base.get_status().await
244    }
245
246    async fn subscribe(
247        &self,
248        settings: drasi_lib::config::SourceSubscriptionSettings,
249    ) -> Result<SubscriptionResponse> {
250        self.base
251            .subscribe_with_bootstrap(&settings, "RIS Live")
252            .await
253    }
254
255    fn as_any(&self) -> &dyn std::any::Any {
256        self
257    }
258
259    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
260        self.base.initialize(context.clone()).await;
261        if self.state_store.read().await.is_none() {
262            if let Some(store) = context.state_store.as_ref() {
263                *self.state_store.write().await = Some(store.clone());
264            }
265        }
266    }
267
268    async fn set_bootstrap_provider(
269        &self,
270        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
271    ) {
272        self.base.set_bootstrap_provider(provider).await;
273    }
274}
275
276/// Builder for [`RisLiveSource`].
277pub struct RisLiveSourceBuilder {
278    id: String,
279    websocket_url: String,
280    client_name: Option<String>,
281    host: Option<String>,
282    message_type: Option<String>,
283    prefixes: Option<Vec<String>>,
284    more_specific: Option<bool>,
285    less_specific: Option<bool>,
286    path: Option<String>,
287    peer: Option<String>,
288    require: Option<String>,
289    include_peer_state: bool,
290    reconnect_delay_secs: u64,
291    clear_state_on_start: bool,
292    start_from: StartFrom,
293    state_store: Option<Arc<dyn StateStoreProvider>>,
294    dispatch_mode: Option<DispatchMode>,
295    dispatch_buffer_capacity: Option<usize>,
296    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
297    auto_start: bool,
298}
299
300impl RisLiveSourceBuilder {
301    /// Creates a new builder.
302    pub fn new(id: impl Into<String>) -> Self {
303        Self {
304            id: id.into(),
305            websocket_url: "wss://ris-live.ripe.net/v1/ws/".to_string(),
306            client_name: None,
307            host: None,
308            message_type: None,
309            prefixes: None,
310            more_specific: None,
311            less_specific: None,
312            path: None,
313            peer: None,
314            require: None,
315            include_peer_state: true,
316            reconnect_delay_secs: 5,
317            clear_state_on_start: false,
318            start_from: StartFrom::Now,
319            state_store: None,
320            dispatch_mode: None,
321            dispatch_buffer_capacity: None,
322            bootstrap_provider: None,
323            auto_start: true,
324        }
325    }
326
327    pub fn with_websocket_url(mut self, websocket_url: impl Into<String>) -> Self {
328        self.websocket_url = websocket_url.into();
329        self
330    }
331
332    pub fn with_client_name(mut self, client_name: impl Into<String>) -> Self {
333        self.client_name = Some(client_name.into());
334        self
335    }
336
337    pub fn with_optional_client_name(mut self, client_name: Option<String>) -> Self {
338        self.client_name = client_name;
339        self
340    }
341
342    pub fn with_host(mut self, host: impl Into<String>) -> Self {
343        self.host = Some(host.into());
344        self
345    }
346
347    pub fn with_optional_host(mut self, host: Option<String>) -> Self {
348        self.host = host;
349        self
350    }
351
352    pub fn with_message_type(mut self, message_type: impl Into<String>) -> Self {
353        self.message_type = Some(message_type.into());
354        self
355    }
356
357    pub fn with_optional_message_type(mut self, message_type: Option<String>) -> Self {
358        self.message_type = message_type;
359        self
360    }
361
362    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
363        let mut prefixes = self.prefixes.unwrap_or_default();
364        prefixes.push(prefix.into());
365        self.prefixes = Some(prefixes);
366        self
367    }
368
369    pub fn with_prefixes(mut self, prefixes: Vec<String>) -> Self {
370        self.prefixes = Some(prefixes);
371        self
372    }
373
374    pub fn with_optional_prefixes(mut self, prefixes: Option<Vec<String>>) -> Self {
375        self.prefixes = prefixes;
376        self
377    }
378
379    pub fn with_more_specific(mut self, more_specific: bool) -> Self {
380        self.more_specific = Some(more_specific);
381        self
382    }
383
384    pub fn with_optional_more_specific(mut self, more_specific: Option<bool>) -> Self {
385        self.more_specific = more_specific;
386        self
387    }
388
389    pub fn with_less_specific(mut self, less_specific: bool) -> Self {
390        self.less_specific = Some(less_specific);
391        self
392    }
393
394    pub fn with_optional_less_specific(mut self, less_specific: Option<bool>) -> Self {
395        self.less_specific = less_specific;
396        self
397    }
398
399    pub fn with_path(mut self, path: impl Into<String>) -> Self {
400        self.path = Some(path.into());
401        self
402    }
403
404    pub fn with_optional_path(mut self, path: Option<String>) -> Self {
405        self.path = path;
406        self
407    }
408
409    pub fn with_peer(mut self, peer: impl Into<String>) -> Self {
410        self.peer = Some(peer.into());
411        self
412    }
413
414    pub fn with_optional_peer(mut self, peer: Option<String>) -> Self {
415        self.peer = peer;
416        self
417    }
418
419    pub fn with_require(mut self, require: impl Into<String>) -> Self {
420        self.require = Some(require.into());
421        self
422    }
423
424    pub fn with_optional_require(mut self, require: Option<String>) -> Self {
425        self.require = require;
426        self
427    }
428
429    pub fn with_include_peer_state(mut self, include_peer_state: bool) -> Self {
430        self.include_peer_state = include_peer_state;
431        self
432    }
433
434    pub fn with_reconnect_delay_secs(mut self, reconnect_delay_secs: u64) -> Self {
435        self.reconnect_delay_secs = reconnect_delay_secs;
436        self
437    }
438
439    pub fn with_clear_state_on_start(mut self, clear_state_on_start: bool) -> Self {
440        self.clear_state_on_start = clear_state_on_start;
441        self
442    }
443
444    pub fn with_start_from_beginning(mut self) -> Self {
445        self.start_from = StartFrom::Beginning;
446        self
447    }
448
449    pub fn with_start_from_now(mut self) -> Self {
450        self.start_from = StartFrom::Now;
451        self
452    }
453
454    pub fn with_start_from_timestamp(mut self, timestamp_ms: i64) -> Self {
455        self.start_from = StartFrom::Timestamp { timestamp_ms };
456        self
457    }
458
459    pub fn with_start_from(mut self, start_from: StartFrom) -> Self {
460        self.start_from = start_from;
461        self
462    }
463
464    pub fn with_state_store(mut self, state_store: Arc<dyn StateStoreProvider>) -> Self {
465        self.state_store = Some(state_store);
466        self
467    }
468
469    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
470        self.dispatch_mode = Some(mode);
471        self
472    }
473
474    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
475        self.dispatch_buffer_capacity = Some(capacity);
476        self
477    }
478
479    pub fn with_bootstrap_provider(
480        mut self,
481        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
482    ) -> Self {
483        self.bootstrap_provider = Some(Box::new(provider));
484        self
485    }
486
487    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
488        self.auto_start = auto_start;
489        self
490    }
491
492    pub fn build(self) -> Result<RisLiveSource> {
493        let config = RisLiveSourceConfig {
494            websocket_url: self.websocket_url,
495            client_name: self.client_name,
496            host: self.host,
497            message_type: self.message_type,
498            prefixes: self.prefixes,
499            more_specific: self.more_specific,
500            less_specific: self.less_specific,
501            path: self.path,
502            peer: self.peer,
503            require: self.require,
504            include_peer_state: self.include_peer_state,
505            reconnect_delay_secs: self.reconnect_delay_secs.max(1),
506            clear_state_on_start: self.clear_state_on_start,
507            start_from: self.start_from,
508        };
509        config.validate()?;
510
511        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
512        if let Some(mode) = self.dispatch_mode {
513            params = params.with_dispatch_mode(mode);
514        }
515        if let Some(capacity) = self.dispatch_buffer_capacity {
516            params = params.with_dispatch_buffer_capacity(capacity);
517        }
518        if let Some(provider) = self.bootstrap_provider {
519            params = params.with_bootstrap_provider(provider);
520        }
521
522        Ok(RisLiveSource {
523            base: SourceBase::new(params)?,
524            config,
525            state_store: RwLock::new(self.state_store),
526            task_handle: RwLock::new(None),
527            shutdown_tx: RwLock::new(None),
528        })
529    }
530}
531
532#[cfg(test)]
533mod tests {
534    use drasi_lib::Source;
535
536    use super::{RisLiveSource, StartFrom};
537
538    #[test]
539    fn builder_sets_defaults() {
540        let source = RisLiveSource::builder("test-source")
541            .build()
542            .expect("source should build");
543
544        assert_eq!(source.id(), "test-source");
545        assert_eq!(source.type_name(), "ris-live");
546        assert_eq!(source.config.reconnect_delay_secs, 5);
547        assert!(source.config.include_peer_state);
548    }
549
550    #[test]
551    fn builder_sets_custom_values() {
552        let source = RisLiveSource::builder("test-source")
553            .with_websocket_url("wss://example.invalid/ws/")
554            .with_host("rrc00")
555            .with_message_type("UPDATE")
556            .with_prefix("203.0.113.0/24")
557            .with_reconnect_delay_secs(10)
558            .with_clear_state_on_start(true)
559            .with_start_from_timestamp(1_700_000_000_000)
560            .build()
561            .expect("source should build");
562
563        assert_eq!(source.config.websocket_url, "wss://example.invalid/ws/");
564        assert_eq!(source.config.host.as_deref(), Some("rrc00"));
565        assert_eq!(source.config.message_type.as_deref(), Some("UPDATE"));
566        assert_eq!(
567            source.config.prefixes,
568            Some(vec!["203.0.113.0/24".to_string()])
569        );
570        assert_eq!(source.config.reconnect_delay_secs, 10);
571        assert!(source.config.clear_state_on_start);
572        assert_eq!(
573            source.config.start_from,
574            StartFrom::Timestamp {
575                timestamp_ms: 1_700_000_000_000,
576            }
577        );
578    }
579}
580
581/// Dynamic plugin entry point.
582#[cfg(feature = "dynamic-plugin")]
583drasi_plugin_sdk::export_plugin!(
584    plugin_id = "ris-live-source",
585    core_version = env!("CARGO_PKG_VERSION"),
586    lib_version = env!("CARGO_PKG_VERSION"),
587    plugin_version = env!("CARGO_PKG_VERSION"),
588    source_descriptors = [descriptor::RisLiveSourceDescriptor],
589    reaction_descriptors = [],
590    bootstrap_descriptors = [],
591);