1#![allow(unexpected_cfgs)]
2pub mod config;
25pub mod descriptor;
26pub mod mapping;
27pub mod messages;
28pub mod stream;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33
34use anyhow::Result;
35use async_trait::async_trait;
36use log::{error, info};
37use tokio::sync::{watch, RwLock};
38use tokio::time::timeout;
39use tracing::Instrument;
40
41use drasi_lib::channels::{ComponentStatus, DispatchMode, SubscriptionResponse};
42use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
43use drasi_lib::state_store::StateStoreProvider;
44use drasi_lib::Source;
45
46pub use config::{RisLiveSourceConfig, StartFrom};
47
48pub struct RisLiveSource {
50 base: SourceBase,
51 config: RisLiveSourceConfig,
52 state_store: RwLock<Option<Arc<dyn StateStoreProvider>>>,
53 task_handle: RwLock<Option<tokio::task::JoinHandle<()>>>,
54 shutdown_tx: RwLock<Option<watch::Sender<bool>>>,
55}
56
57impl RisLiveSource {
58 pub fn new(id: impl Into<String>, config: RisLiveSourceConfig) -> Result<Self> {
60 let id = id.into();
61 let params = SourceBaseParams::new(id);
62 Ok(Self {
63 base: SourceBase::new(params)?,
64 config,
65 state_store: RwLock::new(None),
66 task_handle: RwLock::new(None),
67 shutdown_tx: RwLock::new(None),
68 })
69 }
70
71 pub fn builder(id: impl Into<String>) -> RisLiveSourceBuilder {
73 RisLiveSourceBuilder::new(id)
74 }
75}
76
77#[async_trait]
78impl Source for RisLiveSource {
79 fn id(&self) -> &str {
80 &self.base.id
81 }
82
83 fn type_name(&self) -> &str {
84 "ris-live"
85 }
86
87 fn properties(&self) -> HashMap<String, serde_json::Value> {
88 let mut properties = HashMap::new();
89 properties.insert(
90 "websocket_url".to_string(),
91 serde_json::Value::String(self.config.websocket_url.clone()),
92 );
93 if let Some(client_name) = &self.config.client_name {
94 properties.insert(
95 "client_name".to_string(),
96 serde_json::Value::String(client_name.clone()),
97 );
98 }
99 if let Some(host) = &self.config.host {
100 properties.insert("host".to_string(), serde_json::Value::String(host.clone()));
101 }
102 if let Some(message_type) = &self.config.message_type {
103 properties.insert(
104 "message_type".to_string(),
105 serde_json::Value::String(message_type.clone()),
106 );
107 }
108 if let Some(prefixes) = &self.config.prefixes {
109 properties.insert(
110 "prefixes".to_string(),
111 serde_json::Value::Array(
112 prefixes
113 .iter()
114 .map(|prefix| serde_json::Value::String(prefix.clone()))
115 .collect(),
116 ),
117 );
118 }
119 properties.insert(
120 "include_peer_state".to_string(),
121 serde_json::Value::Bool(self.config.include_peer_state),
122 );
123 properties.insert(
124 "reconnect_delay_secs".to_string(),
125 serde_json::Value::Number(self.config.reconnect_delay_secs.into()),
126 );
127 properties.insert(
128 "clear_state_on_start".to_string(),
129 serde_json::Value::Bool(self.config.clear_state_on_start),
130 );
131 properties
132 }
133
134 fn auto_start(&self) -> bool {
135 self.base.get_auto_start()
136 }
137
138 async fn start(&self) -> Result<()> {
139 if self.base.get_status().await == ComponentStatus::Running {
140 return Ok(());
141 }
142
143 self.base
144 .set_status(
145 ComponentStatus::Starting,
146 Some("Starting RIS Live source".to_string()),
147 )
148 .await;
149
150 let source_id = self.base.id.clone();
151 let config = self.config.clone();
152 let dispatchers = self.base.dispatchers.clone();
153 let state_store = self.state_store.read().await.clone();
154
155 let (shutdown_tx, shutdown_rx) = watch::channel(false);
156 let source_id_for_span = source_id.clone();
157 let instance_id = self
158 .base
159 .context()
160 .await
161 .map(|context| context.instance_id)
162 .unwrap_or_default();
163 let span = tracing::info_span!(
164 "ris_live_stream_task",
165 instance_id = %instance_id,
166 component_id = %source_id_for_span,
167 component_type = "source"
168 );
169
170 let task_handle = tokio::spawn(
171 async move {
172 if let Err(error) = stream::run_stream_loop(
173 source_id.clone(),
174 config,
175 dispatchers,
176 state_store,
177 shutdown_rx,
178 )
179 .await
180 {
181 error!("[{source_id}] RIS stream loop failed: {error}");
182 }
183 }
184 .instrument(span),
185 );
186
187 *self.shutdown_tx.write().await = Some(shutdown_tx);
188 *self.task_handle.write().await = Some(task_handle);
189
190 self.base
191 .set_status(
192 ComponentStatus::Running,
193 Some("RIS Live source started".to_string()),
194 )
195 .await;
196 info!("[{}] RIS Live source started", self.base.id);
197
198 Ok(())
199 }
200
201 async fn stop(&self) -> Result<()> {
202 if self.base.get_status().await == ComponentStatus::Stopped {
203 return Ok(());
204 }
205
206 self.base
207 .set_status(
208 ComponentStatus::Stopping,
209 Some("Stopping RIS Live source".to_string()),
210 )
211 .await;
212
213 if let Some(shutdown_tx) = self.shutdown_tx.write().await.take() {
214 let _ = shutdown_tx.send(true);
215 }
216
217 if let Some(task_handle) = self.task_handle.write().await.take() {
218 match timeout(Duration::from_secs(10), task_handle).await {
219 Ok(Ok(())) => {}
220 Ok(Err(error)) => {
221 error!("[{}] RIS Live task panicked: {error}", self.base.id);
222 }
223 Err(_) => {
224 error!(
225 "[{}] Timed out while waiting for RIS task to stop",
226 self.base.id
227 );
228 }
229 }
230 }
231
232 self.base
233 .set_status(
234 ComponentStatus::Stopped,
235 Some("RIS Live source stopped".to_string()),
236 )
237 .await;
238
239 Ok(())
240 }
241
242 async fn status(&self) -> ComponentStatus {
243 self.base.get_status().await
244 }
245
246 async fn subscribe(
247 &self,
248 settings: drasi_lib::config::SourceSubscriptionSettings,
249 ) -> Result<SubscriptionResponse> {
250 self.base
251 .subscribe_with_bootstrap(&settings, "RIS Live")
252 .await
253 }
254
255 fn as_any(&self) -> &dyn std::any::Any {
256 self
257 }
258
259 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
260 self.base.initialize(context.clone()).await;
261 if self.state_store.read().await.is_none() {
262 if let Some(store) = context.state_store.as_ref() {
263 *self.state_store.write().await = Some(store.clone());
264 }
265 }
266 }
267
268 async fn set_bootstrap_provider(
269 &self,
270 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
271 ) {
272 self.base.set_bootstrap_provider(provider).await;
273 }
274}
275
276pub struct RisLiveSourceBuilder {
278 id: String,
279 websocket_url: String,
280 client_name: Option<String>,
281 host: Option<String>,
282 message_type: Option<String>,
283 prefixes: Option<Vec<String>>,
284 more_specific: Option<bool>,
285 less_specific: Option<bool>,
286 path: Option<String>,
287 peer: Option<String>,
288 require: Option<String>,
289 include_peer_state: bool,
290 reconnect_delay_secs: u64,
291 clear_state_on_start: bool,
292 start_from: StartFrom,
293 state_store: Option<Arc<dyn StateStoreProvider>>,
294 dispatch_mode: Option<DispatchMode>,
295 dispatch_buffer_capacity: Option<usize>,
296 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
297 auto_start: bool,
298}
299
300impl RisLiveSourceBuilder {
301 pub fn new(id: impl Into<String>) -> Self {
303 Self {
304 id: id.into(),
305 websocket_url: "wss://ris-live.ripe.net/v1/ws/".to_string(),
306 client_name: None,
307 host: None,
308 message_type: None,
309 prefixes: None,
310 more_specific: None,
311 less_specific: None,
312 path: None,
313 peer: None,
314 require: None,
315 include_peer_state: true,
316 reconnect_delay_secs: 5,
317 clear_state_on_start: false,
318 start_from: StartFrom::Now,
319 state_store: None,
320 dispatch_mode: None,
321 dispatch_buffer_capacity: None,
322 bootstrap_provider: None,
323 auto_start: true,
324 }
325 }
326
327 pub fn with_websocket_url(mut self, websocket_url: impl Into<String>) -> Self {
328 self.websocket_url = websocket_url.into();
329 self
330 }
331
332 pub fn with_client_name(mut self, client_name: impl Into<String>) -> Self {
333 self.client_name = Some(client_name.into());
334 self
335 }
336
337 pub fn with_optional_client_name(mut self, client_name: Option<String>) -> Self {
338 self.client_name = client_name;
339 self
340 }
341
342 pub fn with_host(mut self, host: impl Into<String>) -> Self {
343 self.host = Some(host.into());
344 self
345 }
346
347 pub fn with_optional_host(mut self, host: Option<String>) -> Self {
348 self.host = host;
349 self
350 }
351
352 pub fn with_message_type(mut self, message_type: impl Into<String>) -> Self {
353 self.message_type = Some(message_type.into());
354 self
355 }
356
357 pub fn with_optional_message_type(mut self, message_type: Option<String>) -> Self {
358 self.message_type = message_type;
359 self
360 }
361
362 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
363 let mut prefixes = self.prefixes.unwrap_or_default();
364 prefixes.push(prefix.into());
365 self.prefixes = Some(prefixes);
366 self
367 }
368
369 pub fn with_prefixes(mut self, prefixes: Vec<String>) -> Self {
370 self.prefixes = Some(prefixes);
371 self
372 }
373
374 pub fn with_optional_prefixes(mut self, prefixes: Option<Vec<String>>) -> Self {
375 self.prefixes = prefixes;
376 self
377 }
378
379 pub fn with_more_specific(mut self, more_specific: bool) -> Self {
380 self.more_specific = Some(more_specific);
381 self
382 }
383
384 pub fn with_optional_more_specific(mut self, more_specific: Option<bool>) -> Self {
385 self.more_specific = more_specific;
386 self
387 }
388
389 pub fn with_less_specific(mut self, less_specific: bool) -> Self {
390 self.less_specific = Some(less_specific);
391 self
392 }
393
394 pub fn with_optional_less_specific(mut self, less_specific: Option<bool>) -> Self {
395 self.less_specific = less_specific;
396 self
397 }
398
399 pub fn with_path(mut self, path: impl Into<String>) -> Self {
400 self.path = Some(path.into());
401 self
402 }
403
404 pub fn with_optional_path(mut self, path: Option<String>) -> Self {
405 self.path = path;
406 self
407 }
408
409 pub fn with_peer(mut self, peer: impl Into<String>) -> Self {
410 self.peer = Some(peer.into());
411 self
412 }
413
414 pub fn with_optional_peer(mut self, peer: Option<String>) -> Self {
415 self.peer = peer;
416 self
417 }
418
419 pub fn with_require(mut self, require: impl Into<String>) -> Self {
420 self.require = Some(require.into());
421 self
422 }
423
424 pub fn with_optional_require(mut self, require: Option<String>) -> Self {
425 self.require = require;
426 self
427 }
428
429 pub fn with_include_peer_state(mut self, include_peer_state: bool) -> Self {
430 self.include_peer_state = include_peer_state;
431 self
432 }
433
434 pub fn with_reconnect_delay_secs(mut self, reconnect_delay_secs: u64) -> Self {
435 self.reconnect_delay_secs = reconnect_delay_secs;
436 self
437 }
438
439 pub fn with_clear_state_on_start(mut self, clear_state_on_start: bool) -> Self {
440 self.clear_state_on_start = clear_state_on_start;
441 self
442 }
443
444 pub fn with_start_from_beginning(mut self) -> Self {
445 self.start_from = StartFrom::Beginning;
446 self
447 }
448
449 pub fn with_start_from_now(mut self) -> Self {
450 self.start_from = StartFrom::Now;
451 self
452 }
453
454 pub fn with_start_from_timestamp(mut self, timestamp_ms: i64) -> Self {
455 self.start_from = StartFrom::Timestamp { timestamp_ms };
456 self
457 }
458
459 pub fn with_start_from(mut self, start_from: StartFrom) -> Self {
460 self.start_from = start_from;
461 self
462 }
463
464 pub fn with_state_store(mut self, state_store: Arc<dyn StateStoreProvider>) -> Self {
465 self.state_store = Some(state_store);
466 self
467 }
468
469 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
470 self.dispatch_mode = Some(mode);
471 self
472 }
473
474 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
475 self.dispatch_buffer_capacity = Some(capacity);
476 self
477 }
478
479 pub fn with_bootstrap_provider(
480 mut self,
481 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
482 ) -> Self {
483 self.bootstrap_provider = Some(Box::new(provider));
484 self
485 }
486
487 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
488 self.auto_start = auto_start;
489 self
490 }
491
492 pub fn build(self) -> Result<RisLiveSource> {
493 let config = RisLiveSourceConfig {
494 websocket_url: self.websocket_url,
495 client_name: self.client_name,
496 host: self.host,
497 message_type: self.message_type,
498 prefixes: self.prefixes,
499 more_specific: self.more_specific,
500 less_specific: self.less_specific,
501 path: self.path,
502 peer: self.peer,
503 require: self.require,
504 include_peer_state: self.include_peer_state,
505 reconnect_delay_secs: self.reconnect_delay_secs.max(1),
506 clear_state_on_start: self.clear_state_on_start,
507 start_from: self.start_from,
508 };
509 config.validate()?;
510
511 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
512 if let Some(mode) = self.dispatch_mode {
513 params = params.with_dispatch_mode(mode);
514 }
515 if let Some(capacity) = self.dispatch_buffer_capacity {
516 params = params.with_dispatch_buffer_capacity(capacity);
517 }
518 if let Some(provider) = self.bootstrap_provider {
519 params = params.with_bootstrap_provider(provider);
520 }
521
522 Ok(RisLiveSource {
523 base: SourceBase::new(params)?,
524 config,
525 state_store: RwLock::new(self.state_store),
526 task_handle: RwLock::new(None),
527 shutdown_tx: RwLock::new(None),
528 })
529 }
530}
531
532#[cfg(test)]
533mod tests {
534 use drasi_lib::Source;
535
536 use super::{RisLiveSource, StartFrom};
537
538 #[test]
539 fn builder_sets_defaults() {
540 let source = RisLiveSource::builder("test-source")
541 .build()
542 .expect("source should build");
543
544 assert_eq!(source.id(), "test-source");
545 assert_eq!(source.type_name(), "ris-live");
546 assert_eq!(source.config.reconnect_delay_secs, 5);
547 assert!(source.config.include_peer_state);
548 }
549
550 #[test]
551 fn builder_sets_custom_values() {
552 let source = RisLiveSource::builder("test-source")
553 .with_websocket_url("wss://example.invalid/ws/")
554 .with_host("rrc00")
555 .with_message_type("UPDATE")
556 .with_prefix("203.0.113.0/24")
557 .with_reconnect_delay_secs(10)
558 .with_clear_state_on_start(true)
559 .with_start_from_timestamp(1_700_000_000_000)
560 .build()
561 .expect("source should build");
562
563 assert_eq!(source.config.websocket_url, "wss://example.invalid/ws/");
564 assert_eq!(source.config.host.as_deref(), Some("rrc00"));
565 assert_eq!(source.config.message_type.as_deref(), Some("UPDATE"));
566 assert_eq!(
567 source.config.prefixes,
568 Some(vec!["203.0.113.0/24".to_string()])
569 );
570 assert_eq!(source.config.reconnect_delay_secs, 10);
571 assert!(source.config.clear_state_on_start);
572 assert_eq!(
573 source.config.start_from,
574 StartFrom::Timestamp {
575 timestamp_ms: 1_700_000_000_000,
576 }
577 );
578 }
579}
580
581#[cfg(feature = "dynamic-plugin")]
583drasi_plugin_sdk::export_plugin!(
584 plugin_id = "ris-live-source",
585 core_version = env!("CARGO_PKG_VERSION"),
586 lib_version = env!("CARGO_PKG_VERSION"),
587 plugin_version = env!("CARGO_PKG_VERSION"),
588 source_descriptors = [descriptor::RisLiveSourceDescriptor],
589 reaction_descriptors = [],
590 bootstrap_descriptors = [],
591);