drasi_reaction_sse/
lib.rs1#![allow(unexpected_cfgs)]
2use std::collections::HashMap;
33
34pub mod config;
35pub mod descriptor;
36pub mod sse;
37
38pub use config::{QueryConfig, SseExtension, SseReactionConfig, TemplateSpec};
39pub use sse::SseReaction;
40
41fn register_json_helper(handlebars: &mut handlebars::Handlebars) {
44 handlebars.register_helper(
45 "json",
46 Box::new(
47 |h: &handlebars::Helper,
48 _: &handlebars::Handlebars,
49 _: &handlebars::Context,
50 _: &mut handlebars::RenderContext,
51 out: &mut dyn handlebars::Output|
52 -> handlebars::HelperResult {
53 if let Some(value) = h.param(0) {
54 match serde_json::to_string(&value.value()) {
55 Ok(json_str) => out.write(&json_str)?,
56 Err(_) => {
57 out.write("null")?;
59 }
60 }
61 } else {
62 out.write("null")?;
64 }
65 Ok(())
66 },
67 ),
68 );
69}
70
/// Builder that collects the settings for an [`SseReaction`].
///
/// Values are set through the `with_*` methods and checked in `build`, which validates the
/// configured templates and route keys before constructing the reaction.
pub struct SseReactionBuilder {
    /// Reaction identifier, handed to `SseReaction::from_builder` unchanged.
    id: String,
    /// Query ids the reaction subscribes to; `build` checks route keys against this list.
    queries: Vec<String>,
    /// Host of the SSE endpoint (`"0.0.0.0"` by default) — presumably the bind address.
    host: String,
    /// Port of the SSE endpoint (`8080` by default).
    port: u16,
    /// Path of the SSE endpoint (`"/events"` by default).
    sse_path: String,
    /// Heartbeat interval in milliseconds (`30000` by default).
    heartbeat_interval_ms: u64,
    /// Optional priority queue capacity; `None` unless set explicitly.
    /// NOTE(review): how `None` is interpreted is decided by `SseReaction` — confirm there.
    priority_queue_capacity: Option<usize>,
    /// Auto-start flag forwarded to `SseReaction::from_builder` (`true` by default).
    auto_start: bool,
    /// Per-query template configuration, keyed by query id.
    routes: HashMap<String, QueryConfig>,
    /// Optional template configuration stored as the config's `default_template`;
    /// presumably used for queries without a dedicated route — confirm in `SseReaction`.
    default_template: Option<QueryConfig>,
}
84
85impl SseReactionBuilder {
86 pub fn new(id: impl Into<String>) -> Self {
88 Self {
89 id: id.into(),
90 queries: Vec::new(),
91 host: "0.0.0.0".to_string(),
92 port: 8080,
93 sse_path: "/events".to_string(),
94 heartbeat_interval_ms: 30000,
95 priority_queue_capacity: None,
96 auto_start: true,
97 routes: HashMap::new(),
98 default_template: None,
99 }
100 }
101
102 pub fn with_queries(mut self, queries: Vec<String>) -> Self {
104 self.queries = queries;
105 self
106 }
107
108 pub fn with_query(mut self, query_id: impl Into<String>) -> Self {
110 self.queries.push(query_id.into());
111 self
112 }
113
114 pub fn with_host(mut self, host: impl Into<String>) -> Self {
116 self.host = host.into();
117 self
118 }
119
120 pub fn with_port(mut self, port: u16) -> Self {
122 self.port = port;
123 self
124 }
125
126 pub fn with_sse_path(mut self, path: impl Into<String>) -> Self {
128 self.sse_path = path.into();
129 self
130 }
131
132 pub fn with_heartbeat_interval_ms(mut self, interval_ms: u64) -> Self {
134 self.heartbeat_interval_ms = interval_ms;
135 self
136 }
137
138 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
140 self.priority_queue_capacity = Some(capacity);
141 self
142 }
143
144 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
146 self.auto_start = auto_start;
147 self
148 }
149
150 pub fn with_route(mut self, query_id: impl Into<String>, config: QueryConfig) -> Self {
152 self.routes.insert(query_id.into(), config);
153 self
154 }
155
156 pub fn with_default_template(mut self, config: QueryConfig) -> Self {
158 self.default_template = Some(config);
159 self
160 }
161
162 pub fn with_config(mut self, config: SseReactionConfig) -> Self {
164 self.host = config.host;
165 self.port = config.port;
166 self.sse_path = config.sse_path;
167 self.heartbeat_interval_ms = config.heartbeat_interval_ms;
168 self.routes = config.routes;
169 self.default_template = config.default_template;
170 self
171 }
172
173 fn validate_template(
175 handlebars: &handlebars::Handlebars,
176 template: &str,
177 ) -> anyhow::Result<()> {
178 if template.is_empty() {
179 return Ok(());
180 }
181 handlebars
183 .render_template(template, &serde_json::json!({}))
184 .map_err(|e| anyhow::anyhow!("Invalid template: {e}"))?;
185 Ok(())
186 }
187
188 fn validate_query_config(
190 handlebars: &handlebars::Handlebars,
191 config: &QueryConfig,
192 ) -> anyhow::Result<()> {
193 if let Some(added) = &config.added {
194 Self::validate_template(handlebars, &added.template)?;
196 if let Some(path) = &added.extension.path {
198 Self::validate_template(handlebars, path)?;
199 }
200 }
201 if let Some(updated) = &config.updated {
202 Self::validate_template(handlebars, &updated.template)?;
204 if let Some(path) = &updated.extension.path {
206 Self::validate_template(handlebars, path)?;
207 }
208 }
209 if let Some(deleted) = &config.deleted {
210 Self::validate_template(handlebars, &deleted.template)?;
212 if let Some(path) = &deleted.extension.path {
214 Self::validate_template(handlebars, path)?;
215 }
216 }
217 Ok(())
218 }
219
220 pub fn build(self) -> anyhow::Result<SseReaction> {
222 let mut handlebars = handlebars::Handlebars::new();
224
225 register_json_helper(&mut handlebars);
227
228 for (query_id, config) in &self.routes {
230 Self::validate_query_config(&handlebars, config)
231 .map_err(|e| anyhow::anyhow!("Invalid template in route '{query_id}': {e}"))?;
232 }
233
234 if let Some(default_template) = &self.default_template {
236 Self::validate_query_config(&handlebars, default_template)
237 .map_err(|e| anyhow::anyhow!("Invalid default template: {e}"))?;
238 }
239
240 if !self.routes.is_empty() && !self.queries.is_empty() {
242 for route_query in self.routes.keys() {
243 let matches = self
245 .queries
246 .iter()
247 .any(|q| q == route_query || q.ends_with(&format!(".{route_query}")));
248 if !matches {
249 return Err(anyhow::anyhow!(
250 "Route '{}' does not match any subscribed query. Subscribed queries: {:?}",
251 route_query,
252 self.queries
253 ));
254 }
255 }
256 }
257
258 let config = SseReactionConfig {
259 host: self.host,
260 port: self.port,
261 sse_path: self.sse_path,
262 heartbeat_interval_ms: self.heartbeat_interval_ms,
263 routes: self.routes,
264 default_template: self.default_template,
265 };
266
267 Ok(SseReaction::from_builder(
268 self.id,
269 self.queries,
270 config,
271 self.priority_queue_capacity,
272 self.auto_start,
273 ))
274 }
275}
276
277#[cfg(test)]
278mod tests;
279
// Only compiled when the `dynamic-plugin` feature is enabled: invokes
// `drasi_plugin_sdk::export_plugin!` for the plugin id "sse-reaction". All three version
// fields are taken from this crate's `CARGO_PKG_VERSION`, and the only descriptor listed
// is the reaction descriptor `descriptor::SseReactionDescriptor` (the source and bootstrap
// descriptor lists are empty).
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "sse-reaction",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [],
    reaction_descriptors = [descriptor::SseReactionDescriptor],
    bootstrap_descriptors = [],
);