actionqueue_budget/subscription/
matcher.rs1pub use actionqueue_core::event::{check_event, ActionQueueEvent, SubscriptionSource};
9use actionqueue_core::subscription::EventFilter;
10
11use crate::subscription::registry::SubscriptionRegistry;
12
13impl SubscriptionSource for SubscriptionRegistry {
14 fn active_subscriptions_iter(
15 &self,
16 ) -> impl Iterator<Item = (actionqueue_core::subscription::SubscriptionId, &EventFilter)> + '_
17 {
18 self.active_subscriptions().map(|(id, entry)| (id, &entry.filter))
19 }
20}
21
22#[cfg(test)]
23mod tests {
24 use actionqueue_core::budget::BudgetDimension;
25 use actionqueue_core::ids::TaskId;
26 use actionqueue_core::run::state::RunState;
27 use actionqueue_core::subscription::{EventFilter, SubscriptionId};
28
29 use super::{check_event, ActionQueueEvent};
30 use crate::subscription::registry::SubscriptionRegistry;
31
32 #[test]
33 fn task_completed_filter_matches_terminal_success_event() {
34 let mut registry = SubscriptionRegistry::new();
35 let task_a = TaskId::new();
36 let task_b = TaskId::new();
37 let sub = SubscriptionId::new();
38 registry.register(sub, task_b, EventFilter::TaskCompleted { task_id: task_a });
39
40 let event = ActionQueueEvent::TaskReachedTerminalSuccess { task_id: task_a };
41 let matches = check_event(&event, ®istry);
42 assert_eq!(matches, vec![sub]);
43 }
44
45 #[test]
46 fn task_completed_filter_does_not_match_wrong_task() {
47 let mut registry = SubscriptionRegistry::new();
48 let task_a = TaskId::new();
49 let task_b = TaskId::new();
50 let sub = SubscriptionId::new();
51 registry.register(sub, task_b, EventFilter::TaskCompleted { task_id: task_a });
52
53 let event = ActionQueueEvent::TaskReachedTerminalSuccess { task_id: task_b };
54 assert!(check_event(&event, ®istry).is_empty());
55 }
56
57 #[test]
58 fn run_state_changed_filter_matches_correct_state() {
59 let mut registry = SubscriptionRegistry::new();
60 let task_a = TaskId::new();
61 let task_b = TaskId::new();
62 let sub = SubscriptionId::new();
63 registry.register(
64 sub,
65 task_b,
66 EventFilter::RunStateChanged { task_id: task_a, state: RunState::Suspended },
67 );
68
69 let event =
70 ActionQueueEvent::RunChangedState { task_id: task_a, new_state: RunState::Suspended };
71 assert_eq!(check_event(&event, ®istry), vec![sub]);
72
73 let wrong_state =
74 ActionQueueEvent::RunChangedState { task_id: task_a, new_state: RunState::Completed };
75 assert!(check_event(&wrong_state, ®istry).is_empty());
76 }
77
78 #[test]
79 fn budget_threshold_filter_matches_at_or_above_threshold() {
80 let mut registry = SubscriptionRegistry::new();
81 let task_a = TaskId::new();
82 let task_b = TaskId::new();
83 let sub = SubscriptionId::new();
84 registry.register(
85 sub,
86 task_b,
87 EventFilter::BudgetThreshold {
88 task_id: task_a,
89 dimension: BudgetDimension::Token,
90 threshold_pct: 80,
91 },
92 );
93
94 let at_threshold = ActionQueueEvent::BudgetThresholdCrossed {
95 task_id: task_a,
96 dimension: BudgetDimension::Token,
97 pct: 80,
98 };
99 assert_eq!(check_event(&at_threshold, ®istry), vec![sub]);
100
101 let above_threshold = ActionQueueEvent::BudgetThresholdCrossed {
102 task_id: task_a,
103 dimension: BudgetDimension::Token,
104 pct: 95,
105 };
106 assert_eq!(check_event(&above_threshold, ®istry), vec![sub]);
107
108 let below_threshold = ActionQueueEvent::BudgetThresholdCrossed {
109 task_id: task_a,
110 dimension: BudgetDimension::Token,
111 pct: 79,
112 };
113 assert!(check_event(&below_threshold, ®istry).is_empty());
114 }
115
116 #[test]
117 fn custom_event_filter_matches_by_key() {
118 let mut registry = SubscriptionRegistry::new();
119 let task = TaskId::new();
120 let sub = SubscriptionId::new();
121 registry.register(
122 sub,
123 task,
124 EventFilter::Custom { key: "caelum.thread.paused".to_string() },
125 );
126
127 let matching = ActionQueueEvent::CustomEvent { key: "caelum.thread.paused".to_string() };
128 assert_eq!(check_event(&matching, ®istry), vec![sub]);
129
130 let non_matching = ActionQueueEvent::CustomEvent { key: "caelum.thread.other".to_string() };
131 assert!(check_event(&non_matching, ®istry).is_empty());
132 }
133}