drasi_source_ris_live/
descriptor.rs1use drasi_plugin_sdk::prelude::*;
18use utoipa::OpenApi;
19
20use crate::config::StartFrom;
21use crate::RisLiveSourceBuilder;
22
23fn default_websocket_url() -> ConfigValue<String> {
24 ConfigValue::Static("wss://ris-live.ripe.net/v1/ws/".to_string())
25}
26
27fn default_include_peer_state() -> ConfigValue<bool> {
28 ConfigValue::Static(true)
29}
30
31fn default_reconnect_delay_secs() -> ConfigValue<u64> {
32 ConfigValue::Static(5)
33}
34
35fn default_clear_state_on_start() -> ConfigValue<bool> {
36 ConfigValue::Static(false)
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
41#[schema(as = source::rislive::RisLiveSourceConfig)]
42#[serde(rename_all = "camelCase", deny_unknown_fields)]
43pub struct RisLiveSourceConfigDto {
44 #[serde(default = "default_websocket_url")]
45 pub websocket_url: ConfigValue<String>,
46 #[serde(default, skip_serializing_if = "Option::is_none")]
47 pub client_name: Option<ConfigValue<String>>,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub host: Option<ConfigValue<String>>,
50 #[serde(default, skip_serializing_if = "Option::is_none")]
51 pub message_type: Option<ConfigValue<String>>,
52 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub prefixes: Option<Vec<ConfigValue<String>>>,
54 #[serde(default, skip_serializing_if = "Option::is_none")]
55 pub more_specific: Option<ConfigValue<bool>>,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub less_specific: Option<ConfigValue<bool>>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub path: Option<ConfigValue<String>>,
60 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub peer: Option<ConfigValue<String>>,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub require: Option<ConfigValue<String>>,
64 #[serde(default = "default_include_peer_state")]
65 pub include_peer_state: ConfigValue<bool>,
66 #[serde(default = "default_reconnect_delay_secs")]
67 pub reconnect_delay_secs: ConfigValue<u64>,
68 #[serde(default = "default_clear_state_on_start")]
69 pub clear_state_on_start: ConfigValue<bool>,
70 #[serde(default, skip_serializing_if = "Option::is_none")]
71 pub start_from_beginning: Option<ConfigValue<bool>>,
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub start_from_now: Option<ConfigValue<bool>>,
74 #[serde(default, skip_serializing_if = "Option::is_none")]
75 pub start_from_timestamp: Option<ConfigValue<i64>>,
76}
77
78#[derive(OpenApi)]
79#[openapi(components(schemas(RisLiveSourceConfigDto)))]
80struct RisLiveSourceSchemas;
81
/// Stateless descriptor type for the `ris-live` source plugin; its
/// behaviour lives in the `SourcePluginDescriptor` implementation.
pub struct RisLiveSourceDescriptor;
84
85#[async_trait]
86impl SourcePluginDescriptor for RisLiveSourceDescriptor {
87 fn kind(&self) -> &str {
88 "ris-live"
89 }
90
91 fn config_version(&self) -> &str {
92 "1.0.0"
93 }
94
95 fn config_schema_name(&self) -> &str {
96 "source.rislive.RisLiveSourceConfig"
97 }
98
99 fn config_schema_json(&self) -> String {
100 let api = RisLiveSourceSchemas::openapi();
101 serde_json::to_string(
102 &api.components
103 .as_ref()
104 .expect("OpenAPI components missing")
105 .schemas,
106 )
107 .expect("Failed to serialize config schema")
108 }
109
110 async fn create_source(
111 &self,
112 id: &str,
113 config_json: &serde_json::Value,
114 auto_start: bool,
115 ) -> anyhow::Result<Box<dyn drasi_lib::sources::Source>> {
116 let dto: RisLiveSourceConfigDto = serde_json::from_value(config_json.clone())?;
117 let mapper = DtoMapper::new();
118
119 let websocket_url = mapper.resolve_string(&dto.websocket_url).await?;
120 let client_name = mapper.resolve_optional_string(&dto.client_name).await?;
121 let host = mapper.resolve_optional_string(&dto.host).await?;
122 let message_type = mapper.resolve_optional_string(&dto.message_type).await?;
123 let prefixes = if let Some(values) = dto.prefixes.as_deref() {
124 Some(mapper.resolve_string_vec(values).await?)
125 } else {
126 None
127 };
128 let more_specific = mapper.resolve_optional(&dto.more_specific).await?;
129 let less_specific = mapper.resolve_optional(&dto.less_specific).await?;
130 let path = mapper.resolve_optional_string(&dto.path).await?;
131 let peer = mapper.resolve_optional_string(&dto.peer).await?;
132 let require = mapper.resolve_optional_string(&dto.require).await?;
133 let include_peer_state = mapper.resolve_typed(&dto.include_peer_state).await?;
134 let reconnect_delay_secs = mapper.resolve_typed(&dto.reconnect_delay_secs).await?;
135 let clear_state_on_start = mapper.resolve_typed(&dto.clear_state_on_start).await?;
136
137 let start_from =
141 if let Some(timestamp) = mapper.resolve_optional(&dto.start_from_timestamp).await? {
142 StartFrom::Timestamp {
143 timestamp_ms: timestamp,
144 }
145 } else if mapper
146 .resolve_optional(&dto.start_from_beginning)
147 .await?
148 .unwrap_or(false)
149 {
150 StartFrom::Beginning
151 } else {
152 StartFrom::Now
153 };
154
155 let source = RisLiveSourceBuilder::new(id)
156 .with_websocket_url(websocket_url)
157 .with_optional_client_name(client_name)
158 .with_optional_host(host)
159 .with_optional_message_type(message_type)
160 .with_optional_prefixes(prefixes)
161 .with_optional_more_specific(more_specific)
162 .with_optional_less_specific(less_specific)
163 .with_optional_path(path)
164 .with_optional_peer(peer)
165 .with_optional_require(require)
166 .with_include_peer_state(include_peer_state)
167 .with_reconnect_delay_secs(reconnect_delay_secs)
168 .with_clear_state_on_start(clear_state_on_start)
169 .with_start_from(start_from)
170 .with_auto_start(auto_start)
171 .build()?;
172
173 Ok(Box::new(source))
174 }
175}