Skip to main content

a3s_event/
broker.rs

1//! Broker/Trigger event routing
2//!
3//! Implements the Knative-inspired Broker/Trigger pattern:
4//! - Publishers emit events to a **Broker**
5//! - **Triggers** filter events by type, source, subject pattern, and metadata
6//! - Matching events are delivered to the Trigger's **EventSink** in parallel
7//!
//! Delivery is best-effort — `Broker::route` waits for every matching sink to
//! finish, but failures are only logged and counted; they never break the
//! publish path.
10
11use crate::sink::EventSink;
12use crate::subject::subject_matches;
13use crate::types::Event;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
17/// Filter criteria for a Trigger
18///
19/// All non-None fields must match (AND logic). A filter with all fields
20/// set to None matches every event.
21#[derive(Debug, Clone, Default)]
22pub struct TriggerFilter {
23    /// Match events with this exact event_type
24    pub event_type: Option<String>,
25
26    /// Match events from this source
27    pub source: Option<String>,
28
29    /// Match events whose subject matches this pattern (supports `>` and `*` wildcards)
30    pub subject_pattern: Option<String>,
31
32    /// Match events that contain all of these metadata key-value pairs
33    pub attributes: Vec<(String, String)>,
34}
35
36impl TriggerFilter {
37    /// Create a filter matching a specific event type
38    pub fn by_type(event_type: impl Into<String>) -> Self {
39        Self {
40            event_type: Some(event_type.into()),
41            ..Default::default()
42        }
43    }
44
45    /// Create a filter matching a specific source
46    pub fn by_source(source: impl Into<String>) -> Self {
47        Self {
48            source: Some(source.into()),
49            ..Default::default()
50        }
51    }
52
53    /// Create a filter matching a subject pattern
54    pub fn by_subject(pattern: impl Into<String>) -> Self {
55        Self {
56            subject_pattern: Some(pattern.into()),
57            ..Default::default()
58        }
59    }
60
61    /// Add a required metadata attribute
62    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
63        self.attributes.push((key.into(), value.into()));
64        self
65    }
66
67    /// Check if an event matches this filter
68    pub fn matches(&self, event: &Event) -> bool {
69        // Check event_type
70        if let Some(ref et) = self.event_type {
71            if event.event_type != *et {
72                return false;
73            }
74        }
75
76        // Check source
77        if let Some(ref src) = self.source {
78            if event.source != *src {
79                return false;
80            }
81        }
82
83        // Check subject pattern
84        if let Some(ref pattern) = self.subject_pattern {
85            if !subject_matches(&event.subject, pattern) {
86                return false;
87            }
88        }
89
90        // Check metadata attributes (AND logic)
91        for (key, value) in &self.attributes {
92            match event.metadata.get(key) {
93                Some(v) if v == value => {}
94                _ => return false,
95            }
96        }
97
98        true
99    }
100}
101
102/// A Trigger pairs a filter with a delivery sink
103pub struct Trigger {
104    /// Trigger name for identification and logging
105    pub name: String,
106
107    /// Filter criteria — events must match to be delivered
108    pub filter: TriggerFilter,
109
110    /// Delivery target for matching events
111    pub sink: Arc<dyn EventSink>,
112}
113
114impl Trigger {
115    /// Create a new trigger
116    pub fn new(
117        name: impl Into<String>,
118        filter: TriggerFilter,
119        sink: Arc<dyn EventSink>,
120    ) -> Self {
121        Self {
122            name: name.into(),
123            filter,
124            sink,
125        }
126    }
127}
128
129/// Event broker — receives events and routes them through matching triggers
130///
131/// The Broker evaluates all registered triggers against each incoming event.
132/// Matching events are delivered to their triggers' sinks in parallel.
133/// Delivery errors are logged but do not propagate — the broker is
134/// fire-and-forget to avoid blocking publishers.
135pub struct Broker {
136    triggers: Arc<RwLock<Vec<Trigger>>>,
137}
138
139impl Broker {
140    /// Create an empty broker
141    pub fn new() -> Self {
142        Self {
143            triggers: Arc::new(RwLock::new(Vec::new())),
144        }
145    }
146
147    /// Add a trigger to the broker
148    pub async fn add_trigger(&self, trigger: Trigger) {
149        self.triggers.write().await.push(trigger);
150    }
151
152    /// Remove a trigger by name, returns true if found
153    pub async fn remove_trigger(&self, name: &str) -> bool {
154        let mut triggers = self.triggers.write().await;
155        let len_before = triggers.len();
156        triggers.retain(|t| t.name != name);
157        triggers.len() < len_before
158    }
159
160    /// Get the number of registered triggers
161    pub async fn trigger_count(&self) -> usize {
162        self.triggers.read().await.len()
163    }
164
165    /// Route an event through all matching triggers
166    ///
167    /// Evaluates each trigger's filter. For matching triggers, delivers
168    /// the event to the sink. All deliveries happen in parallel.
169    /// Errors are logged but do not propagate.
170    pub async fn route(&self, event: &Event) -> RouteResult {
171        let triggers = self.triggers.read().await;
172        let mut delivered = 0usize;
173        let mut failed = 0usize;
174
175        // Collect matching triggers and their sinks
176        let matching: Vec<(&str, Arc<dyn EventSink>)> = triggers
177            .iter()
178            .filter(|t| t.filter.matches(event))
179            .map(|t| (t.name.as_str(), t.sink.clone()))
180            .collect();
181
182        let matched = matching.len();
183
184        if matching.is_empty() {
185            return RouteResult {
186                matched: 0,
187                delivered: 0,
188                failed: 0,
189            };
190        }
191
192        // Deliver to all matching sinks in parallel
193        let mut handles = Vec::with_capacity(matching.len());
194        for (name, sink) in matching {
195            let event = event.clone();
196            let trigger_name = name.to_string();
197            handles.push(tokio::spawn(async move {
198                match sink.deliver(&event).await {
199                    Ok(()) => {
200                        tracing::debug!(
201                            trigger = %trigger_name,
202                            event_id = %event.id,
203                            sink = %sink.name(),
204                            "Event delivered via trigger"
205                        );
206                        true
207                    }
208                    Err(e) => {
209                        tracing::warn!(
210                            trigger = %trigger_name,
211                            event_id = %event.id,
212                            sink = %sink.name(),
213                            error = %e,
214                            "Trigger delivery failed"
215                        );
216                        false
217                    }
218                }
219            }));
220        }
221
222        for handle in handles {
223            match handle.await {
224                Ok(true) => delivered += 1,
225                Ok(false) => failed += 1,
226                Err(e) => {
227                    tracing::warn!(error = %e, "Trigger delivery task panicked");
228                    failed += 1;
229                }
230            }
231        }
232
233        RouteResult {
234            matched,
235            delivered,
236            failed,
237        }
238    }
239}
240
241impl Default for Broker {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
/// Outcome of routing a single event through the broker
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RouteResult {
    /// How many triggers had a filter that matched the event
    pub matched: usize,
    /// How many deliveries succeeded
    pub delivered: usize,
    /// How many deliveries failed
    pub failed: usize,
}
257
// Unit tests for `TriggerFilter` matching and `Broker` routing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::{CollectorSink, FailingSink, LogSink};

    // Builds an event with the given type, source, and subject. The other
    // `Event::typed` arguments are fixed placeholder values.
    fn test_event(event_type: &str, source: &str, subject: &str) -> Event {
        Event::typed(
            subject,
            "test",
            event_type,
            1,
            "Test",
            source,
            serde_json::json!({}),
        )
    }

    // ─── TriggerFilter tests ─────────────────────────────────────

    // A default filter sets no criteria and therefore accepts any event.
    #[test]
    fn test_filter_empty_matches_all() {
        let filter = TriggerFilter::default();
        let event = test_event("any.type", "any-src", "events.any.subject");
        assert!(filter.matches(&event));
    }

    // `by_type` accepts only the exact event type.
    #[test]
    fn test_filter_by_type() {
        let filter = TriggerFilter::by_type("a3s.gateway.scale.up");
        assert!(filter.matches(&test_event("a3s.gateway.scale.up", "gw", "events.scale.up")));
        assert!(!filter.matches(&test_event("a3s.gateway.scale.down", "gw", "events.scale.down")));
    }

    // `by_source` accepts only the exact source.
    #[test]
    fn test_filter_by_source() {
        let filter = TriggerFilter::by_source("gateway");
        assert!(filter.matches(&test_event("any", "gateway", "events.a")));
        assert!(!filter.matches(&test_event("any", "box", "events.a")));
    }

    // A pattern without wildcards matches only the identical subject.
    #[test]
    fn test_filter_by_subject_exact() {
        let filter = TriggerFilter::by_subject("events.market.forex");
        assert!(filter.matches(&test_event("t", "s", "events.market.forex")));
        assert!(!filter.matches(&test_event("t", "s", "events.market.crypto")));
    }

    // A trailing `>` matches subjects with one or more further tokens under
    // the prefix, and rejects subjects under a different prefix.
    #[test]
    fn test_filter_by_subject_wildcard() {
        let filter = TriggerFilter::by_subject("events.market.>");
        assert!(filter.matches(&test_event("t", "s", "events.market.forex")));
        assert!(filter.matches(&test_event("t", "s", "events.market.crypto.btc")));
        assert!(!filter.matches(&test_event("t", "s", "events.system.deploy")));
    }

    // `*` stands in for the middle token; the remaining tokens must still match.
    #[test]
    fn test_filter_by_subject_single_wildcard() {
        let filter = TriggerFilter::by_subject("events.*.forex");
        assert!(filter.matches(&test_event("t", "s", "events.market.forex")));
        assert!(!filter.matches(&test_event("t", "s", "events.market.crypto")));
    }

    // Every required attribute must be present with an equal value: a missing
    // key or a differing value rejects the event.
    #[test]
    fn test_filter_with_attributes() {
        let filter = TriggerFilter::default()
            .with_attribute("env", "prod")
            .with_attribute("region", "us-east");

        let event = test_event("t", "s", "events.a")
            .with_metadata("env", "prod")
            .with_metadata("region", "us-east");
        assert!(filter.matches(&event));

        // Missing the "region" attribute.
        let partial = test_event("t", "s", "events.a").with_metadata("env", "prod");
        assert!(!filter.matches(&partial));

        // "env" is present but carries a different value.
        let wrong = test_event("t", "s", "events.a")
            .with_metadata("env", "staging")
            .with_metadata("region", "us-east");
        assert!(!filter.matches(&wrong));
    }

    // With all four criteria set, a single mismatch (here the event type) is
    // enough to reject the event.
    #[test]
    fn test_filter_combined() {
        let filter = TriggerFilter {
            event_type: Some("scale.up".to_string()),
            source: Some("gateway".to_string()),
            subject_pattern: Some("events.scaling.>".to_string()),
            attributes: vec![("priority".to_string(), "high".to_string())],
        };

        let good = Event::typed(
            "events.scaling.web",
            "test",
            "scale.up",
            1,
            "Scale",
            "gateway",
            serde_json::json!({}),
        )
        .with_metadata("priority", "high");
        assert!(filter.matches(&good));

        // Wrong type
        let bad_type = Event::typed(
            "events.scaling.web",
            "test",
            "scale.down",
            1,
            "Scale",
            "gateway",
            serde_json::json!({}),
        )
        .with_metadata("priority", "high");
        assert!(!filter.matches(&bad_type));
    }

    // ─── Broker tests ────────────────────────────────────────────

    // Adding and removing triggers is reflected in `trigger_count`; removing
    // an unknown name reports `false`.
    #[tokio::test]
    async fn test_broker_add_remove_triggers() {
        let broker = Broker::new();
        assert_eq!(broker.trigger_count().await, 0);

        let sink = Arc::new(LogSink::default());
        broker
            .add_trigger(Trigger::new("t1", TriggerFilter::default(), sink.clone()))
            .await;
        broker
            .add_trigger(Trigger::new("t2", TriggerFilter::by_type("x"), sink))
            .await;

        assert_eq!(broker.trigger_count().await, 2);

        assert!(broker.remove_trigger("t1").await);
        assert_eq!(broker.trigger_count().await, 1);

        assert!(!broker.remove_trigger("nonexistent").await);
    }

    // A matching event reaches the trigger's sink; a non-matching event is
    // counted as unmatched and leaves the sink untouched.
    #[tokio::test]
    async fn test_broker_route_to_matching_sink() {
        let broker = Broker::new();
        let collector = Arc::new(CollectorSink::new("matched"));

        broker
            .add_trigger(Trigger::new(
                "scale-trigger",
                TriggerFilter::by_type("a3s.gateway.scale.up"),
                collector.clone(),
            ))
            .await;

        // Matching event
        let event = test_event("a3s.gateway.scale.up", "gateway", "events.scaling.up");
        let result = broker.route(&event).await;
        assert_eq!(result.matched, 1);
        assert_eq!(result.delivered, 1);
        assert_eq!(result.failed, 0);
        assert_eq!(collector.count().await, 1);

        // Non-matching event
        let other = test_event("a3s.box.instance.ready", "box", "events.instance.ready");
        let result = broker.route(&other).await;
        assert_eq!(result.matched, 0);
        assert_eq!(result.delivered, 0);
        assert_eq!(collector.count().await, 1); // unchanged
    }

    // One event fans out to every trigger whose filter matches and to no others.
    #[tokio::test]
    async fn test_broker_route_multiple_triggers() {
        let broker = Broker::new();
        let sink1 = Arc::new(CollectorSink::new("sink1"));
        let sink2 = Arc::new(CollectorSink::new("sink2"));
        let sink3 = Arc::new(CollectorSink::new("sink3"));

        broker
            .add_trigger(Trigger::new(
                "t1",
                TriggerFilter::by_type("scale.up"),
                sink1.clone(),
            ))
            .await;
        broker
            .add_trigger(Trigger::new(
                "t2",
                TriggerFilter::by_source("gateway"),
                sink2.clone(),
            ))
            .await;
        broker
            .add_trigger(Trigger::new(
                "t3",
                TriggerFilter::by_type("scale.down"),
                sink3.clone(),
            ))
            .await;

        let event = test_event("scale.up", "gateway", "events.a");
        let result = broker.route(&event).await;

        // t1 and t2 match, t3 does not
        assert_eq!(result.matched, 2);
        assert_eq!(result.delivered, 2);
        assert_eq!(sink1.count().await, 1);
        assert_eq!(sink2.count().await, 1);
        assert_eq!(sink3.count().await, 0);
    }

    // A failing sink is counted in `failed` and does not prevent delivery to
    // the other matching sink.
    #[tokio::test]
    async fn test_broker_route_with_failing_sink() {
        let broker = Broker::new();
        let good_sink = Arc::new(CollectorSink::new("good"));
        let bad_sink = Arc::new(FailingSink::new("bad", "network error"));

        broker
            .add_trigger(Trigger::new("good-trigger", TriggerFilter::default(), good_sink.clone()))
            .await;
        broker
            .add_trigger(Trigger::new("bad-trigger", TriggerFilter::default(), bad_sink))
            .await;

        let event = test_event("any", "any", "events.a");
        let result = broker.route(&event).await;

        assert_eq!(result.matched, 2);
        assert_eq!(result.delivered, 1);
        assert_eq!(result.failed, 1);
        assert_eq!(good_sink.count().await, 1); // good sink still delivered
    }

    // Routing through a broker with no triggers yields an all-zero result.
    #[tokio::test]
    async fn test_broker_route_no_triggers() {
        let broker = Broker::new();
        let event = test_event("any", "any", "events.a");
        let result = broker.route(&event).await;

        assert_eq!(result.matched, 0);
        assert_eq!(result.delivered, 0);
        assert_eq!(result.failed, 0);
    }

    // `Broker::default()` starts with no triggers, like `Broker::new()`.
    #[tokio::test]
    async fn test_broker_default() {
        let broker = Broker::default();
        assert_eq!(broker.trigger_count().await, 0);
    }

    // Two `RouteResult` values with equal fields compare equal.
    #[tokio::test]
    async fn test_route_result_equality() {
        let a = RouteResult {
            matched: 2,
            delivered: 1,
            failed: 1,
        };
        let b = RouteResult {
            matched: 2,
            delivered: 1,
            failed: 1,
        };
        assert_eq!(a, b);
    }
}