Skip to main content

drasi_reaction_sse/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Server-Sent Events (SSE) reaction plugin for Drasi
17//!
18//! This plugin implements SSE reactions for Drasi.
19//!
20//! # Example
21//!
22//! ```rust,ignore
23//! use drasi_reaction_sse::SseReaction;
24//!
25//! let reaction = SseReaction::builder("my-sse-reaction")
26//!     .with_queries(vec!["query1".to_string()])
27//!     .with_host("0.0.0.0")
28//!     .with_port(8080)
29//!     .build()?;
30//! ```
31
32use std::collections::HashMap;
33
34pub mod config;
35pub mod descriptor;
36pub mod sse;
37
38pub use config::{QueryConfig, SseExtension, SseReactionConfig, TemplateSpec};
39pub use sse::SseReaction;
40
41/// Helper function to register the json helper in a Handlebars instance
42/// This helper serializes values to JSON format in templates
43fn register_json_helper(handlebars: &mut handlebars::Handlebars) {
44    handlebars.register_helper(
45        "json",
46        Box::new(
47            |h: &handlebars::Helper,
48             _: &handlebars::Handlebars,
49             _: &handlebars::Context,
50             _: &mut handlebars::RenderContext,
51             out: &mut dyn handlebars::Output|
52             -> handlebars::HelperResult {
53                if let Some(value) = h.param(0) {
54                    match serde_json::to_string(&value.value()) {
55                        Ok(json_str) => out.write(&json_str)?,
56                        Err(_) => {
57                            // On serialization error, output null
58                            out.write("null")?;
59                        }
60                    }
61                } else {
62                    // No parameter provided to json helper
63                    out.write("null")?;
64                }
65                Ok(())
66            },
67        ),
68    );
69}
70
71/// Builder for SSE reaction
72pub struct SseReactionBuilder {
73    id: String,
74    queries: Vec<String>,
75    host: String,
76    port: u16,
77    sse_path: String,
78    heartbeat_interval_ms: u64,
79    priority_queue_capacity: Option<usize>,
80    auto_start: bool,
81    routes: HashMap<String, QueryConfig>,
82    default_template: Option<QueryConfig>,
83}
84
85impl SseReactionBuilder {
86    /// Create a new SSE reaction builder with the given ID
87    pub fn new(id: impl Into<String>) -> Self {
88        Self {
89            id: id.into(),
90            queries: Vec::new(),
91            host: "0.0.0.0".to_string(),
92            port: 8080,
93            sse_path: "/events".to_string(),
94            heartbeat_interval_ms: 30000,
95            priority_queue_capacity: None,
96            auto_start: true,
97            routes: HashMap::new(),
98            default_template: None,
99        }
100    }
101
102    /// Set the query IDs to subscribe to
103    pub fn with_queries(mut self, queries: Vec<String>) -> Self {
104        self.queries = queries;
105        self
106    }
107
108    /// Add a query ID to subscribe to
109    pub fn with_query(mut self, query_id: impl Into<String>) -> Self {
110        self.queries.push(query_id.into());
111        self
112    }
113
114    /// Set the host to bind to
115    pub fn with_host(mut self, host: impl Into<String>) -> Self {
116        self.host = host.into();
117        self
118    }
119
120    /// Set the port to bind to
121    pub fn with_port(mut self, port: u16) -> Self {
122        self.port = port;
123        self
124    }
125
126    /// Set the SSE path
127    pub fn with_sse_path(mut self, path: impl Into<String>) -> Self {
128        self.sse_path = path.into();
129        self
130    }
131
132    /// Set the heartbeat interval in milliseconds
133    pub fn with_heartbeat_interval_ms(mut self, interval_ms: u64) -> Self {
134        self.heartbeat_interval_ms = interval_ms;
135        self
136    }
137
138    /// Set the priority queue capacity
139    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
140        self.priority_queue_capacity = Some(capacity);
141        self
142    }
143
144    /// Set whether the reaction should auto-start
145    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
146        self.auto_start = auto_start;
147        self
148    }
149
150    /// Add a route configuration for a specific query
151    pub fn with_route(mut self, query_id: impl Into<String>, config: QueryConfig) -> Self {
152        self.routes.insert(query_id.into(), config);
153        self
154    }
155
156    /// Set the default template configuration used when no query-specific route is defined
157    pub fn with_default_template(mut self, config: QueryConfig) -> Self {
158        self.default_template = Some(config);
159        self
160    }
161
162    /// Set the full configuration at once
163    pub fn with_config(mut self, config: SseReactionConfig) -> Self {
164        self.host = config.host;
165        self.port = config.port;
166        self.sse_path = config.sse_path;
167        self.heartbeat_interval_ms = config.heartbeat_interval_ms;
168        self.routes = config.routes;
169        self.default_template = config.default_template;
170        self
171    }
172
173    /// Validate a template by attempting to compile it with Handlebars
174    fn validate_template(
175        handlebars: &handlebars::Handlebars,
176        template: &str,
177    ) -> anyhow::Result<()> {
178        if template.is_empty() {
179            return Ok(());
180        }
181        // Validate the template by attempting to render it with empty data
182        handlebars
183            .render_template(template, &serde_json::json!({}))
184            .map_err(|e| anyhow::anyhow!("Invalid template: {e}"))?;
185        Ok(())
186    }
187
188    /// Validate all templates in a QueryConfig
189    fn validate_query_config(
190        handlebars: &handlebars::Handlebars,
191        config: &QueryConfig,
192    ) -> anyhow::Result<()> {
193        if let Some(added) = &config.added {
194            // Validate body template
195            Self::validate_template(handlebars, &added.template)?;
196            // Validate path template (if present)
197            if let Some(path) = &added.extension.path {
198                Self::validate_template(handlebars, path)?;
199            }
200        }
201        if let Some(updated) = &config.updated {
202            // Validate body template
203            Self::validate_template(handlebars, &updated.template)?;
204            // Validate path template (if present)
205            if let Some(path) = &updated.extension.path {
206                Self::validate_template(handlebars, path)?;
207            }
208        }
209        if let Some(deleted) = &config.deleted {
210            // Validate body template
211            Self::validate_template(handlebars, &deleted.template)?;
212            // Validate path template (if present)
213            if let Some(path) = &deleted.extension.path {
214                Self::validate_template(handlebars, path)?;
215            }
216        }
217        Ok(())
218    }
219
220    /// Build the SSE reaction
221    pub fn build(self) -> anyhow::Result<SseReaction> {
222        // Create a single Handlebars instance for all validation with json helper
223        let mut handlebars = handlebars::Handlebars::new();
224
225        // Register the json helper for template validation
226        register_json_helper(&mut handlebars);
227
228        // Validate all templates in routes
229        for (query_id, config) in &self.routes {
230            Self::validate_query_config(&handlebars, config)
231                .map_err(|e| anyhow::anyhow!("Invalid template in route '{query_id}': {e}"))?;
232        }
233
234        // Validate default template if provided
235        if let Some(default_template) = &self.default_template {
236            Self::validate_query_config(&handlebars, default_template)
237                .map_err(|e| anyhow::anyhow!("Invalid default template: {e}"))?;
238        }
239
240        // Validate that all routes correspond to subscribed queries
241        if !self.routes.is_empty() && !self.queries.is_empty() {
242            for route_query in self.routes.keys() {
243                // Check exact match or if the query ends with the route (for dotted notation)
244                let matches = self
245                    .queries
246                    .iter()
247                    .any(|q| q == route_query || q.ends_with(&format!(".{route_query}")));
248                if !matches {
249                    return Err(anyhow::anyhow!(
250                        "Route '{}' does not match any subscribed query. Subscribed queries: {:?}",
251                        route_query,
252                        self.queries
253                    ));
254                }
255            }
256        }
257
258        let config = SseReactionConfig {
259            host: self.host,
260            port: self.port,
261            sse_path: self.sse_path,
262            heartbeat_interval_ms: self.heartbeat_interval_ms,
263            routes: self.routes,
264            default_template: self.default_template,
265        };
266
267        Ok(SseReaction::from_builder(
268            self.id,
269            self.queries,
270            config,
271            self.priority_queue_capacity,
272            self.auto_start,
273        ))
274    }
275}
276
277#[cfg(test)]
278mod tests;
279
// Dynamic plugin entry point, compiled only when the `dynamic-plugin` feature
// is enabled. It exports this crate under the plugin ID "sse-reaction" with
// `descriptor::SseReactionDescriptor` as its only reaction descriptor.
//
// A regular comment is used on purpose: rustdoc does not generate
// documentation for macro invocations, so a `///` comment here would only
// trigger the `unused_doc_comments` lint.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "sse-reaction",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [],
    reaction_descriptors = [descriptor::SseReactionDescriptor],
    bootstrap_descriptors = [],
);