Skip to main content

drasi_source_ris_live/
descriptor.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! RIS Live source plugin descriptor and configuration DTOs.
16
17use drasi_plugin_sdk::prelude::*;
18use utoipa::OpenApi;
19
20use crate::config::StartFrom;
21use crate::RisLiveSourceBuilder;
22
23fn default_websocket_url() -> ConfigValue<String> {
24    ConfigValue::Static("wss://ris-live.ripe.net/v1/ws/".to_string())
25}
26
27fn default_include_peer_state() -> ConfigValue<bool> {
28    ConfigValue::Static(true)
29}
30
31fn default_reconnect_delay_secs() -> ConfigValue<u64> {
32    ConfigValue::Static(5)
33}
34
35fn default_clear_state_on_start() -> ConfigValue<bool> {
36    ConfigValue::Static(false)
37}
38
39/// RIS Live source configuration DTO.
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
41#[schema(as = source::rislive::RisLiveSourceConfig)]
42#[serde(rename_all = "camelCase", deny_unknown_fields)]
43pub struct RisLiveSourceConfigDto {
44    #[serde(default = "default_websocket_url")]
45    pub websocket_url: ConfigValue<String>,
46    #[serde(default, skip_serializing_if = "Option::is_none")]
47    pub client_name: Option<ConfigValue<String>>,
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub host: Option<ConfigValue<String>>,
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub message_type: Option<ConfigValue<String>>,
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub prefixes: Option<Vec<ConfigValue<String>>>,
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub more_specific: Option<ConfigValue<bool>>,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub less_specific: Option<ConfigValue<bool>>,
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub path: Option<ConfigValue<String>>,
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub peer: Option<ConfigValue<String>>,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub require: Option<ConfigValue<String>>,
64    #[serde(default = "default_include_peer_state")]
65    pub include_peer_state: ConfigValue<bool>,
66    #[serde(default = "default_reconnect_delay_secs")]
67    pub reconnect_delay_secs: ConfigValue<u64>,
68    #[serde(default = "default_clear_state_on_start")]
69    pub clear_state_on_start: ConfigValue<bool>,
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub start_from_beginning: Option<ConfigValue<bool>>,
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub start_from_now: Option<ConfigValue<bool>>,
74    #[serde(default, skip_serializing_if = "Option::is_none")]
75    pub start_from_timestamp: Option<ConfigValue<i64>>,
76}
77
78#[derive(OpenApi)]
79#[openapi(components(schemas(RisLiveSourceConfigDto)))]
80struct RisLiveSourceSchemas;
81
/// Plugin descriptor for the RIS Live source.
///
/// A stateless unit type; all behavior lives in its
/// `SourcePluginDescriptor` implementation.
pub struct RisLiveSourceDescriptor;
84
85#[async_trait]
86impl SourcePluginDescriptor for RisLiveSourceDescriptor {
87    fn kind(&self) -> &str {
88        "ris-live"
89    }
90
91    fn config_version(&self) -> &str {
92        "1.0.0"
93    }
94
95    fn config_schema_name(&self) -> &str {
96        "source.rislive.RisLiveSourceConfig"
97    }
98
99    fn config_schema_json(&self) -> String {
100        let api = RisLiveSourceSchemas::openapi();
101        serde_json::to_string(
102            &api.components
103                .as_ref()
104                .expect("OpenAPI components missing")
105                .schemas,
106        )
107        .expect("Failed to serialize config schema")
108    }
109
110    async fn create_source(
111        &self,
112        id: &str,
113        config_json: &serde_json::Value,
114        auto_start: bool,
115    ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
116        let dto: RisLiveSourceConfigDto = serde_json::from_value(config_json.clone())?;
117        let mapper = DtoMapper::new();
118
119        let websocket_url = mapper.resolve_string(&dto.websocket_url).await?;
120        let client_name = mapper.resolve_optional_string(&dto.client_name).await?;
121        let host = mapper.resolve_optional_string(&dto.host).await?;
122        let message_type = mapper.resolve_optional_string(&dto.message_type).await?;
123        let prefixes = if let Some(values) = dto.prefixes.as_deref() {
124            Some(mapper.resolve_string_vec(values).await?)
125        } else {
126            None
127        };
128        let more_specific = mapper.resolve_optional(&dto.more_specific).await?;
129        let less_specific = mapper.resolve_optional(&dto.less_specific).await?;
130        let path = mapper.resolve_optional_string(&dto.path).await?;
131        let peer = mapper.resolve_optional_string(&dto.peer).await?;
132        let require = mapper.resolve_optional_string(&dto.require).await?;
133        let include_peer_state = mapper.resolve_typed(&dto.include_peer_state).await?;
134        let reconnect_delay_secs = mapper.resolve_typed(&dto.reconnect_delay_secs).await?;
135        let clear_state_on_start = mapper.resolve_typed(&dto.clear_state_on_start).await?;
136
137        // Precedence: start_from_timestamp > start_from_beginning > default (Now).
138        // start_from_now is accepted in the DTO for explicit configuration but
139        // is equivalent to the default behavior.
140        let start_from =
141            if let Some(timestamp) = mapper.resolve_optional(&dto.start_from_timestamp).await? {
142                StartFrom::Timestamp {
143                    timestamp_ms: timestamp,
144                }
145            } else if mapper
146                .resolve_optional(&dto.start_from_beginning)
147                .await?
148                .unwrap_or(false)
149            {
150                StartFrom::Beginning
151            } else {
152                StartFrom::Now
153            };
154
155        let source = RisLiveSourceBuilder::new(id)
156            .with_websocket_url(websocket_url)
157            .with_optional_client_name(client_name)
158            .with_optional_host(host)
159            .with_optional_message_type(message_type)
160            .with_optional_prefixes(prefixes)
161            .with_optional_more_specific(more_specific)
162            .with_optional_less_specific(less_specific)
163            .with_optional_path(path)
164            .with_optional_peer(peer)
165            .with_optional_require(require)
166            .with_include_peer_state(include_peer_state)
167            .with_reconnect_delay_secs(reconnect_delay_secs)
168            .with_clear_state_on_start(clear_state_on_start)
169            .with_start_from(start_from)
170            .with_auto_start(auto_start)
171            .build()?;
172
173        Ok(Box::new(source))
174    }
175}