Skip to main content

actionqueue_budget/subscription/
matcher.rs

1//! Subscription source implementation and re-exports for event matching.
2//!
3//! [`ActionQueueEvent`] and [`check_event`] are defined in `actionqueue-core` and
4//! re-exported from `actionqueue-budget` for backward compatibility. This module
5//! implements [`SubscriptionSource`] for [`SubscriptionRegistry`] so that
6//! `check_event` can iterate active subscriptions.
7
8pub use actionqueue_core::event::{check_event, ActionQueueEvent, SubscriptionSource};
9use actionqueue_core::subscription::EventFilter;
10
11use crate::subscription::registry::SubscriptionRegistry;
12
13impl SubscriptionSource for SubscriptionRegistry {
14    fn active_subscriptions_iter(
15        &self,
16    ) -> impl Iterator<Item = (actionqueue_core::subscription::SubscriptionId, &EventFilter)> + '_
17    {
18        self.active_subscriptions().map(|(id, entry)| (id, &entry.filter))
19    }
20}
21
22#[cfg(test)]
23mod tests {
24    use actionqueue_core::budget::BudgetDimension;
25    use actionqueue_core::ids::TaskId;
26    use actionqueue_core::run::state::RunState;
27    use actionqueue_core::subscription::{EventFilter, SubscriptionId};
28
29    use super::{check_event, ActionQueueEvent};
30    use crate::subscription::registry::SubscriptionRegistry;
31
32    #[test]
33    fn task_completed_filter_matches_terminal_success_event() {
34        let mut registry = SubscriptionRegistry::new();
35        let task_a = TaskId::new();
36        let task_b = TaskId::new();
37        let sub = SubscriptionId::new();
38        registry.register(sub, task_b, EventFilter::TaskCompleted { task_id: task_a });
39
40        let event = ActionQueueEvent::TaskReachedTerminalSuccess { task_id: task_a };
41        let matches = check_event(&event, &registry);
42        assert_eq!(matches, vec![sub]);
43    }
44
45    #[test]
46    fn task_completed_filter_does_not_match_wrong_task() {
47        let mut registry = SubscriptionRegistry::new();
48        let task_a = TaskId::new();
49        let task_b = TaskId::new();
50        let sub = SubscriptionId::new();
51        registry.register(sub, task_b, EventFilter::TaskCompleted { task_id: task_a });
52
53        let event = ActionQueueEvent::TaskReachedTerminalSuccess { task_id: task_b };
54        assert!(check_event(&event, &registry).is_empty());
55    }
56
57    #[test]
58    fn run_state_changed_filter_matches_correct_state() {
59        let mut registry = SubscriptionRegistry::new();
60        let task_a = TaskId::new();
61        let task_b = TaskId::new();
62        let sub = SubscriptionId::new();
63        registry.register(
64            sub,
65            task_b,
66            EventFilter::RunStateChanged { task_id: task_a, state: RunState::Suspended },
67        );
68
69        let event =
70            ActionQueueEvent::RunChangedState { task_id: task_a, new_state: RunState::Suspended };
71        assert_eq!(check_event(&event, &registry), vec![sub]);
72
73        let wrong_state =
74            ActionQueueEvent::RunChangedState { task_id: task_a, new_state: RunState::Completed };
75        assert!(check_event(&wrong_state, &registry).is_empty());
76    }
77
78    #[test]
79    fn budget_threshold_filter_matches_at_or_above_threshold() {
80        let mut registry = SubscriptionRegistry::new();
81        let task_a = TaskId::new();
82        let task_b = TaskId::new();
83        let sub = SubscriptionId::new();
84        registry.register(
85            sub,
86            task_b,
87            EventFilter::BudgetThreshold {
88                task_id: task_a,
89                dimension: BudgetDimension::Token,
90                threshold_pct: 80,
91            },
92        );
93
94        let at_threshold = ActionQueueEvent::BudgetThresholdCrossed {
95            task_id: task_a,
96            dimension: BudgetDimension::Token,
97            pct: 80,
98        };
99        assert_eq!(check_event(&at_threshold, &registry), vec![sub]);
100
101        let above_threshold = ActionQueueEvent::BudgetThresholdCrossed {
102            task_id: task_a,
103            dimension: BudgetDimension::Token,
104            pct: 95,
105        };
106        assert_eq!(check_event(&above_threshold, &registry), vec![sub]);
107
108        let below_threshold = ActionQueueEvent::BudgetThresholdCrossed {
109            task_id: task_a,
110            dimension: BudgetDimension::Token,
111            pct: 79,
112        };
113        assert!(check_event(&below_threshold, &registry).is_empty());
114    }
115
116    #[test]
117    fn custom_event_filter_matches_by_key() {
118        let mut registry = SubscriptionRegistry::new();
119        let task = TaskId::new();
120        let sub = SubscriptionId::new();
121        registry.register(
122            sub,
123            task,
124            EventFilter::Custom { key: "caelum.thread.paused".to_string() },
125        );
126
127        let matching = ActionQueueEvent::CustomEvent { key: "caelum.thread.paused".to_string() };
128        assert_eq!(check_event(&matching, &registry), vec![sub]);
129
130        let non_matching = ActionQueueEvent::CustomEvent { key: "caelum.thread.other".to_string() };
131        assert!(check_event(&non_matching, &registry).is_empty());
132    }
133}