Skip to main content

actionqueue_budget/subscription/
registry.rs

1//! In-memory subscription state registry.
2//!
3//! Tracks the lifecycle of event subscriptions: active, triggered, canceled.
4//! The dispatch loop consults this registry to:
5//! 1. Find tasks with triggered subscriptions for early promotion.
6//! 2. Match new events against active subscriptions.
7
8use std::collections::{HashMap, HashSet};
9
10use actionqueue_core::ids::TaskId;
11use actionqueue_core::subscription::{EventFilter, SubscriptionId};
12
13/// State record for a single subscription.
14#[derive(Debug, Clone)]
15pub struct SubscriptionEntry {
16    /// The subscribing task.
17    pub task_id: TaskId,
18    /// The event filter.
19    pub filter: EventFilter,
20    /// Set when the subscription has been triggered. One-shot: cleared after promotion.
21    pub triggered: bool,
22    /// Set when the subscription has been explicitly canceled.
23    pub canceled: bool,
24}
25
26/// In-memory registry of event subscriptions.
27///
28/// Reconstructed from WAL events at bootstrap via [`SubscriptionRegistry::register`],
29/// [`SubscriptionRegistry::trigger`], and [`SubscriptionRegistry::cancel`] calls.
30///
31/// Maintains a secondary `task_subscriptions` index for O(S) per-task lookups
32/// where S is the number of subscriptions for that task (not O(N) total).
33#[derive(Debug, Default)]
34pub struct SubscriptionRegistry {
35    subscriptions: HashMap<SubscriptionId, SubscriptionEntry>,
36    /// Secondary index: task_id → set of subscription_ids for that task.
37    task_subscriptions: HashMap<TaskId, HashSet<SubscriptionId>>,
38}
39
40impl SubscriptionRegistry {
41    /// Creates an empty registry.
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// Registers a new active subscription.
47    pub fn register(
48        &mut self,
49        subscription_id: SubscriptionId,
50        task_id: TaskId,
51        filter: EventFilter,
52    ) {
53        self.subscriptions.insert(
54            subscription_id,
55            SubscriptionEntry { task_id, filter, triggered: false, canceled: false },
56        );
57        self.task_subscriptions.entry(task_id).or_default().insert(subscription_id);
58    }
59
60    /// Marks a subscription as triggered (promotion eligible).
61    pub fn trigger(&mut self, subscription_id: SubscriptionId) {
62        if let Some(entry) = self.subscriptions.get_mut(&subscription_id) {
63            entry.triggered = true;
64        }
65    }
66
67    /// Marks a subscription as canceled (no longer active).
68    pub fn cancel(&mut self, subscription_id: SubscriptionId) {
69        if let Some(entry) = self.subscriptions.get_mut(&subscription_id) {
70            entry.canceled = true;
71        }
72    }
73
74    /// Returns an iterator over non-canceled, non-triggered active subscriptions.
75    pub fn active_subscriptions(
76        &self,
77    ) -> impl Iterator<Item = (SubscriptionId, &SubscriptionEntry)> {
78        self.subscriptions
79            .iter()
80            .filter(|(_, entry)| !entry.canceled && !entry.triggered)
81            .map(|(id, entry)| (*id, entry))
82    }
83
84    /// Returns `true` if any subscription for this task has been triggered.
85    ///
86    /// O(S) where S = number of subscriptions for this task (via secondary index).
87    pub fn is_triggered(&self, task_id: TaskId) -> bool {
88        let Some(ids) = self.task_subscriptions.get(&task_id) else {
89            return false;
90        };
91        ids.iter().any(|id| self.subscriptions.get(id).is_some_and(|e| e.triggered && !e.canceled))
92    }
93
94    /// Clears the triggered flag for all triggered subscriptions for this task.
95    ///
96    /// Called after the task has been promoted to Ready so the one-shot
97    /// trigger is consumed and won't re-promote on the next tick.
98    ///
99    /// O(S) where S = number of subscriptions for this task (via secondary index).
100    pub fn clear_triggered(&mut self, task_id: TaskId) {
101        let Some(ids) = self.task_subscriptions.get(&task_id) else {
102            return;
103        };
104        let ids: Vec<_> = ids.iter().copied().collect();
105        for id in ids {
106            if let Some(entry) = self.subscriptions.get_mut(&id) {
107                if entry.triggered {
108                    entry.triggered = false;
109                }
110            }
111        }
112    }
113
114    /// Removes all subscription state for a fully-terminal task.
115    ///
116    /// Called by the dispatch loop after a task reaches terminal state.
117    /// Removes all subscriptions (including canceled ones) from both the
118    /// primary map and the secondary index.
119    pub fn gc_task(&mut self, task_id: TaskId) {
120        if let Some(ids) = self.task_subscriptions.remove(&task_id) {
121            for id in ids {
122                self.subscriptions.remove(&id);
123            }
124        }
125    }
126
127    /// Returns the subscription entry for the given ID.
128    pub fn get(&self, subscription_id: &SubscriptionId) -> Option<&SubscriptionEntry> {
129        self.subscriptions.get(subscription_id)
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use actionqueue_core::ids::TaskId;
136    use actionqueue_core::subscription::{EventFilter, SubscriptionId};
137
138    use super::SubscriptionRegistry;
139
140    #[test]
141    fn register_and_is_triggered_lifecycle() {
142        let mut registry = SubscriptionRegistry::new();
143        let sub_id = SubscriptionId::new();
144        let task_id = TaskId::new();
145        let filter = EventFilter::TaskCompleted { task_id };
146
147        registry.register(sub_id, task_id, filter);
148        assert!(!registry.is_triggered(task_id));
149
150        registry.trigger(sub_id);
151        assert!(registry.is_triggered(task_id));
152
153        registry.clear_triggered(task_id);
154        assert!(!registry.is_triggered(task_id));
155    }
156
157    #[test]
158    fn cancel_removes_from_active() {
159        let mut registry = SubscriptionRegistry::new();
160        let sub_id = SubscriptionId::new();
161        let task_id = TaskId::new();
162        let filter = EventFilter::TaskCompleted { task_id };
163        registry.register(sub_id, task_id, filter);
164
165        registry.cancel(sub_id);
166        assert_eq!(registry.active_subscriptions().count(), 0);
167    }
168
169    #[test]
170    fn trigger_nonexistent_subscription_is_noop() {
171        let mut registry = SubscriptionRegistry::new();
172        let nonexistent = SubscriptionId::new();
173        registry.trigger(nonexistent); // should not panic
174        assert!(!registry.is_triggered(TaskId::new()));
175    }
176
177    #[test]
178    fn cancel_nonexistent_subscription_is_noop() {
179        let mut registry = SubscriptionRegistry::new();
180        let nonexistent = SubscriptionId::new();
181        registry.cancel(nonexistent); // should not panic
182        assert_eq!(registry.active_subscriptions().count(), 0);
183    }
184
185    #[test]
186    fn get_returns_entry() {
187        let mut registry = SubscriptionRegistry::new();
188        let sub_id = SubscriptionId::new();
189        let task_id = TaskId::new();
190        let filter = EventFilter::TaskCompleted { task_id };
191        registry.register(sub_id, task_id, filter.clone());
192
193        let entry = registry.get(&sub_id).expect("subscription should exist");
194        assert_eq!(entry.task_id, task_id);
195        assert_eq!(entry.filter, filter);
196        assert!(!entry.triggered);
197        assert!(!entry.canceled);
198    }
199
200    #[test]
201    fn get_nonexistent_returns_none() {
202        let registry = SubscriptionRegistry::new();
203        assert!(registry.get(&SubscriptionId::new()).is_none());
204    }
205
206    #[test]
207    fn duplicate_registration_overwrites() {
208        let mut registry = SubscriptionRegistry::new();
209        let sub_id = SubscriptionId::new();
210        let task1 = TaskId::new();
211        let task2 = TaskId::new();
212        registry.register(sub_id, task1, EventFilter::TaskCompleted { task_id: task1 });
213        registry.register(sub_id, task2, EventFilter::TaskCompleted { task_id: task2 });
214
215        let entry = registry.get(&sub_id).unwrap();
216        assert_eq!(entry.task_id, task2);
217        assert_eq!(registry.active_subscriptions().count(), 1);
218    }
219
220    #[test]
221    fn is_triggered_uses_secondary_index() {
222        let mut registry = SubscriptionRegistry::new();
223        let task = TaskId::new();
224        let sub1 = SubscriptionId::new();
225        let sub2 = SubscriptionId::new();
226        registry.register(sub1, task, EventFilter::TaskCompleted { task_id: task });
227        registry.register(sub2, task, EventFilter::TaskCompleted { task_id: task });
228
229        assert!(!registry.is_triggered(task));
230        registry.trigger(sub1);
231        assert!(registry.is_triggered(task));
232        registry.clear_triggered(task);
233        assert!(!registry.is_triggered(task));
234    }
235
236    #[test]
237    fn gc_task_removes_subscriptions_and_index() {
238        let mut registry = SubscriptionRegistry::new();
239        let task = TaskId::new();
240        let sub_id = SubscriptionId::new();
241        registry.register(sub_id, task, EventFilter::TaskCompleted { task_id: task });
242
243        registry.gc_task(task);
244
245        assert!(registry.get(&sub_id).is_none());
246        assert!(!registry.is_triggered(task));
247        assert_eq!(registry.active_subscriptions().count(), 0);
248    }
249
250    #[test]
251    fn gc_task_does_not_affect_other_tasks() {
252        let mut registry = SubscriptionRegistry::new();
253        let task1 = TaskId::new();
254        let task2 = TaskId::new();
255        let sub1 = SubscriptionId::new();
256        let sub2 = SubscriptionId::new();
257        registry.register(sub1, task1, EventFilter::TaskCompleted { task_id: task1 });
258        registry.register(sub2, task2, EventFilter::TaskCompleted { task_id: task2 });
259
260        registry.gc_task(task1);
261
262        assert!(registry.get(&sub1).is_none());
263        assert!(registry.get(&sub2).is_some());
264        assert_eq!(registry.active_subscriptions().count(), 1);
265    }
266
267    #[test]
268    fn gc_task_is_idempotent() {
269        let mut registry = SubscriptionRegistry::new();
270        let task = TaskId::new();
271        let sub_id = SubscriptionId::new();
272        registry.register(sub_id, task, EventFilter::TaskCompleted { task_id: task });
273        registry.gc_task(task);
274        registry.gc_task(task); // must not panic
275    }
276
277    #[test]
278    fn active_subscriptions_excludes_triggered_and_canceled() {
279        let mut registry = SubscriptionRegistry::new();
280        let task = TaskId::new();
281
282        let active = SubscriptionId::new();
283        let triggered = SubscriptionId::new();
284        let canceled = SubscriptionId::new();
285
286        registry.register(active, task, EventFilter::TaskCompleted { task_id: task });
287        registry.register(triggered, task, EventFilter::TaskCompleted { task_id: task });
288        registry.register(canceled, task, EventFilter::TaskCompleted { task_id: task });
289
290        registry.trigger(triggered);
291        registry.cancel(canceled);
292
293        let active_subs: Vec<_> = registry.active_subscriptions().collect();
294        assert_eq!(active_subs.len(), 1);
295        assert_eq!(active_subs[0].0, active);
296    }
297}