1use crate::sink::EventSink;
12use crate::subject::subject_matches;
13use crate::types::Event;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
/// Criteria used to decide whether an event should fire a trigger.
///
/// Every criterion that is set must hold for an event to match; criteria that
/// are left unset (`None` / empty) place no constraint on the event, so the
/// default filter matches every event.
#[derive(Debug, Clone, Default)]
pub struct TriggerFilter {
    /// When set, the event's `event_type` must equal this value.
    pub event_type: Option<String>,

    /// When set, the event's `source` must equal this value.
    pub source: Option<String>,

    /// When set, the event's `subject` must match this pattern as decided by
    /// `subject_matches` (the tests in this file exercise `*` and a trailing
    /// `>` as wildcards).
    pub subject_pattern: Option<String>,

    /// Key/value pairs that must all be present, with equal values, in the
    /// event's metadata.
    pub attributes: Vec<(String, String)>,
}
35
36impl TriggerFilter {
37 pub fn by_type(event_type: impl Into<String>) -> Self {
39 Self {
40 event_type: Some(event_type.into()),
41 ..Default::default()
42 }
43 }
44
45 pub fn by_source(source: impl Into<String>) -> Self {
47 Self {
48 source: Some(source.into()),
49 ..Default::default()
50 }
51 }
52
53 pub fn by_subject(pattern: impl Into<String>) -> Self {
55 Self {
56 subject_pattern: Some(pattern.into()),
57 ..Default::default()
58 }
59 }
60
61 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
63 self.attributes.push((key.into(), value.into()));
64 self
65 }
66
67 pub fn matches(&self, event: &Event) -> bool {
69 if let Some(ref et) = self.event_type {
71 if event.event_type != *et {
72 return false;
73 }
74 }
75
76 if let Some(ref src) = self.source {
78 if event.source != *src {
79 return false;
80 }
81 }
82
83 if let Some(ref pattern) = self.subject_pattern {
85 if !subject_matches(&event.subject, pattern) {
86 return false;
87 }
88 }
89
90 for (key, value) in &self.attributes {
92 match event.metadata.get(key) {
93 Some(v) if v == value => {}
94 _ => return false,
95 }
96 }
97
98 true
99 }
100}
101
102pub struct Trigger {
104 pub name: String,
106
107 pub filter: TriggerFilter,
109
110 pub sink: Arc<dyn EventSink>,
112}
113
114impl Trigger {
115 pub fn new(
117 name: impl Into<String>,
118 filter: TriggerFilter,
119 sink: Arc<dyn EventSink>,
120 ) -> Self {
121 Self {
122 name: name.into(),
123 filter,
124 sink,
125 }
126 }
127}
128
129pub struct Broker {
136 triggers: Arc<RwLock<Vec<Trigger>>>,
137}
138
139impl Broker {
140 pub fn new() -> Self {
142 Self {
143 triggers: Arc::new(RwLock::new(Vec::new())),
144 }
145 }
146
147 pub async fn add_trigger(&self, trigger: Trigger) {
149 self.triggers.write().await.push(trigger);
150 }
151
152 pub async fn remove_trigger(&self, name: &str) -> bool {
154 let mut triggers = self.triggers.write().await;
155 let len_before = triggers.len();
156 triggers.retain(|t| t.name != name);
157 triggers.len() < len_before
158 }
159
160 pub async fn trigger_count(&self) -> usize {
162 self.triggers.read().await.len()
163 }
164
165 pub async fn route(&self, event: &Event) -> RouteResult {
171 let triggers = self.triggers.read().await;
172 let mut delivered = 0usize;
173 let mut failed = 0usize;
174
175 let matching: Vec<(&str, Arc<dyn EventSink>)> = triggers
177 .iter()
178 .filter(|t| t.filter.matches(event))
179 .map(|t| (t.name.as_str(), t.sink.clone()))
180 .collect();
181
182 let matched = matching.len();
183
184 if matching.is_empty() {
185 return RouteResult {
186 matched: 0,
187 delivered: 0,
188 failed: 0,
189 };
190 }
191
192 let mut handles = Vec::with_capacity(matching.len());
194 for (name, sink) in matching {
195 let event = event.clone();
196 let trigger_name = name.to_string();
197 handles.push(tokio::spawn(async move {
198 match sink.deliver(&event).await {
199 Ok(()) => {
200 tracing::debug!(
201 trigger = %trigger_name,
202 event_id = %event.id,
203 sink = %sink.name(),
204 "Event delivered via trigger"
205 );
206 true
207 }
208 Err(e) => {
209 tracing::warn!(
210 trigger = %trigger_name,
211 event_id = %event.id,
212 sink = %sink.name(),
213 error = %e,
214 "Trigger delivery failed"
215 );
216 false
217 }
218 }
219 }));
220 }
221
222 for handle in handles {
223 match handle.await {
224 Ok(true) => delivered += 1,
225 Ok(false) => failed += 1,
226 Err(e) => {
227 tracing::warn!(error = %e, "Trigger delivery task panicked");
228 failed += 1;
229 }
230 }
231 }
232
233 RouteResult {
234 matched,
235 delivered,
236 failed,
237 }
238 }
239}
240
241impl Default for Broker {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
/// Outcome of routing one event through a broker.
///
/// The type is a plain set of counters, so it is `Copy`, and its `Default`
/// value is the all-zero result (nothing matched, nothing delivered).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RouteResult {
    /// Number of triggers whose filter matched the event.
    pub matched: usize,
    /// Number of matched triggers whose sink delivery succeeded.
    pub delivered: usize,
    /// Number of matched triggers whose delivery failed or did not complete.
    pub failed: usize,
}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::{CollectorSink, FailingSink, LogSink};

    /// Builds a minimal event with the given type, source and subject.
    fn test_event(event_type: &str, source: &str, subject: &str) -> Event {
        let payload = serde_json::json!({});
        Event::typed(subject, "test", event_type, 1, "Test", source, payload)
    }

    /// Checks `filter` against an event published on each listed subject.
    fn assert_subject_cases(filter: &TriggerFilter, cases: &[(&str, bool)]) {
        for &(subject, expected) in cases {
            let event = test_event("t", "s", subject);
            assert_eq!(filter.matches(&event), expected, "subject: {}", subject);
        }
    }

    // ---- TriggerFilter ----

    #[test]
    fn test_filter_empty_matches_all() {
        let event = test_event("any.type", "any-src", "events.any.subject");
        assert!(TriggerFilter::default().matches(&event));
    }

    #[test]
    fn test_filter_by_type() {
        let filter = TriggerFilter::by_type("a3s.gateway.scale.up");
        let scale_up = test_event("a3s.gateway.scale.up", "gw", "events.scale.up");
        let scale_down = test_event("a3s.gateway.scale.down", "gw", "events.scale.down");

        assert!(filter.matches(&scale_up));
        assert!(!filter.matches(&scale_down));
    }

    #[test]
    fn test_filter_by_source() {
        let filter = TriggerFilter::by_source("gateway");
        let from_gateway = test_event("any", "gateway", "events.a");
        let from_box = test_event("any", "box", "events.a");

        assert!(filter.matches(&from_gateway));
        assert!(!filter.matches(&from_box));
    }

    #[test]
    fn test_filter_by_subject_exact() {
        let filter = TriggerFilter::by_subject("events.market.forex");
        assert_subject_cases(
            &filter,
            &[
                ("events.market.forex", true),
                ("events.market.crypto", false),
            ],
        );
    }

    #[test]
    fn test_filter_by_subject_wildcard() {
        let filter = TriggerFilter::by_subject("events.market.>");
        assert_subject_cases(
            &filter,
            &[
                ("events.market.forex", true),
                ("events.market.crypto.btc", true),
                ("events.system.deploy", false),
            ],
        );
    }

    #[test]
    fn test_filter_by_subject_single_wildcard() {
        let filter = TriggerFilter::by_subject("events.*.forex");
        assert_subject_cases(
            &filter,
            &[
                ("events.market.forex", true),
                ("events.market.crypto", false),
            ],
        );
    }

    #[test]
    fn test_filter_with_attributes() {
        let filter = TriggerFilter::default()
            .with_attribute("env", "prod")
            .with_attribute("region", "us-east");

        // Both attributes present with the expected values.
        let complete = test_event("t", "s", "events.a")
            .with_metadata("env", "prod")
            .with_metadata("region", "us-east");
        assert!(filter.matches(&complete));

        // One required attribute is missing.
        let missing_region = test_event("t", "s", "events.a").with_metadata("env", "prod");
        assert!(!filter.matches(&missing_region));

        // Attribute present but with a different value.
        let wrong_env = test_event("t", "s", "events.a")
            .with_metadata("env", "staging")
            .with_metadata("region", "us-east");
        assert!(!filter.matches(&wrong_env));
    }

    #[test]
    fn test_filter_combined() {
        let filter = TriggerFilter {
            event_type: Some("scale.up".to_string()),
            source: Some("gateway".to_string()),
            subject_pattern: Some("events.scaling.>".to_string()),
            attributes: vec![("priority".to_string(), "high".to_string())],
        };

        // Events that differ only in their type.
        let scaling_event = |event_type: &str| {
            Event::typed(
                "events.scaling.web",
                "test",
                event_type,
                1,
                "Scale",
                "gateway",
                serde_json::json!({}),
            )
            .with_metadata("priority", "high")
        };

        assert!(filter.matches(&scaling_event("scale.up")));
        assert!(!filter.matches(&scaling_event("scale.down")));
    }

    // ---- Broker ----

    #[tokio::test]
    async fn test_broker_add_remove_triggers() {
        let broker = Broker::new();
        assert_eq!(broker.trigger_count().await, 0);

        let sink = Arc::new(LogSink::default());
        let first = Trigger::new("t1", TriggerFilter::default(), sink.clone());
        let second = Trigger::new("t2", TriggerFilter::by_type("x"), sink);
        broker.add_trigger(first).await;
        broker.add_trigger(second).await;
        assert_eq!(broker.trigger_count().await, 2);

        let removed = broker.remove_trigger("t1").await;
        assert!(removed);
        assert_eq!(broker.trigger_count().await, 1);

        let removed_unknown = broker.remove_trigger("nonexistent").await;
        assert!(!removed_unknown);
    }

    #[tokio::test]
    async fn test_broker_route_to_matching_sink() {
        let broker = Broker::new();
        let collector = Arc::new(CollectorSink::new("matched"));

        let trigger = Trigger::new(
            "scale-trigger",
            TriggerFilter::by_type("a3s.gateway.scale.up"),
            collector.clone(),
        );
        broker.add_trigger(trigger).await;

        // A matching event reaches the collector.
        let matching = test_event("a3s.gateway.scale.up", "gateway", "events.scaling.up");
        let outcome = broker.route(&matching).await;
        assert_eq!(outcome.matched, 1);
        assert_eq!(outcome.delivered, 1);
        assert_eq!(outcome.failed, 0);
        assert_eq!(collector.count().await, 1);

        // A non-matching event leaves the collector untouched.
        let unrelated = test_event("a3s.box.instance.ready", "box", "events.instance.ready");
        let outcome = broker.route(&unrelated).await;
        assert_eq!(outcome.matched, 0);
        assert_eq!(outcome.delivered, 0);
        assert_eq!(collector.count().await, 1);
    }

    #[tokio::test]
    async fn test_broker_route_multiple_triggers() {
        let broker = Broker::new();
        let sink1 = Arc::new(CollectorSink::new("sink1"));
        let sink2 = Arc::new(CollectorSink::new("sink2"));
        let sink3 = Arc::new(CollectorSink::new("sink3"));

        let registrations = vec![
            ("t1", TriggerFilter::by_type("scale.up"), sink1.clone()),
            ("t2", TriggerFilter::by_source("gateway"), sink2.clone()),
            ("t3", TriggerFilter::by_type("scale.down"), sink3.clone()),
        ];
        for (name, filter, sink) in registrations {
            broker.add_trigger(Trigger::new(name, filter, sink)).await;
        }

        let event = test_event("scale.up", "gateway", "events.a");
        let outcome = broker.route(&event).await;

        // t1 matches on type and t2 on source; t3 does not match.
        assert_eq!(outcome.matched, 2);
        assert_eq!(outcome.delivered, 2);
        assert_eq!(sink1.count().await, 1);
        assert_eq!(sink2.count().await, 1);
        assert_eq!(sink3.count().await, 0);
    }

    #[tokio::test]
    async fn test_broker_route_with_failing_sink() {
        let broker = Broker::new();
        let good_sink = Arc::new(CollectorSink::new("good"));
        let bad_sink = Arc::new(FailingSink::new("bad", "network error"));

        let good = Trigger::new("good-trigger", TriggerFilter::default(), good_sink.clone());
        let bad = Trigger::new("bad-trigger", TriggerFilter::default(), bad_sink);
        broker.add_trigger(good).await;
        broker.add_trigger(bad).await;

        let event = test_event("any", "any", "events.a");
        let outcome = broker.route(&event).await;

        assert_eq!(outcome.matched, 2);
        assert_eq!(outcome.delivered, 1);
        assert_eq!(outcome.failed, 1);
        // The healthy sink still received the event.
        assert_eq!(good_sink.count().await, 1);
    }

    #[tokio::test]
    async fn test_broker_route_no_triggers() {
        let broker = Broker::new();
        let event = test_event("any", "any", "events.a");

        let outcome = broker.route(&event).await;

        assert_eq!(outcome.matched, 0);
        assert_eq!(outcome.delivered, 0);
        assert_eq!(outcome.failed, 0);
    }

    #[tokio::test]
    async fn test_broker_default() {
        let broker = Broker::default();
        assert_eq!(broker.trigger_count().await, 0);
    }

    #[tokio::test]
    async fn test_route_result_equality() {
        let left = RouteResult {
            matched: 2,
            delivered: 1,
            failed: 1,
        };
        let right = RouteResult {
            matched: 2,
            delivered: 1,
            failed: 1,
        };
        assert_eq!(left, right);
    }
}