actionqueue_budget/subscription/
registry.rs1use std::collections::{HashMap, HashSet};
9
10use actionqueue_core::ids::TaskId;
11use actionqueue_core::subscription::{EventFilter, SubscriptionId};
12
13#[derive(Debug, Clone)]
15pub struct SubscriptionEntry {
16 pub task_id: TaskId,
18 pub filter: EventFilter,
20 pub triggered: bool,
22 pub canceled: bool,
24}
25
26#[derive(Debug, Default)]
34pub struct SubscriptionRegistry {
35 subscriptions: HashMap<SubscriptionId, SubscriptionEntry>,
36 task_subscriptions: HashMap<TaskId, HashSet<SubscriptionId>>,
38}
39
40impl SubscriptionRegistry {
41 pub fn new() -> Self {
43 Self::default()
44 }
45
46 pub fn register(
48 &mut self,
49 subscription_id: SubscriptionId,
50 task_id: TaskId,
51 filter: EventFilter,
52 ) {
53 self.subscriptions.insert(
54 subscription_id,
55 SubscriptionEntry { task_id, filter, triggered: false, canceled: false },
56 );
57 self.task_subscriptions.entry(task_id).or_default().insert(subscription_id);
58 }
59
60 pub fn trigger(&mut self, subscription_id: SubscriptionId) {
62 if let Some(entry) = self.subscriptions.get_mut(&subscription_id) {
63 entry.triggered = true;
64 }
65 }
66
67 pub fn cancel(&mut self, subscription_id: SubscriptionId) {
69 if let Some(entry) = self.subscriptions.get_mut(&subscription_id) {
70 entry.canceled = true;
71 }
72 }
73
74 pub fn active_subscriptions(
76 &self,
77 ) -> impl Iterator<Item = (SubscriptionId, &SubscriptionEntry)> {
78 self.subscriptions
79 .iter()
80 .filter(|(_, entry)| !entry.canceled && !entry.triggered)
81 .map(|(id, entry)| (*id, entry))
82 }
83
84 pub fn is_triggered(&self, task_id: TaskId) -> bool {
88 let Some(ids) = self.task_subscriptions.get(&task_id) else {
89 return false;
90 };
91 ids.iter().any(|id| self.subscriptions.get(id).is_some_and(|e| e.triggered && !e.canceled))
92 }
93
94 pub fn clear_triggered(&mut self, task_id: TaskId) {
101 let Some(ids) = self.task_subscriptions.get(&task_id) else {
102 return;
103 };
104 let ids: Vec<_> = ids.iter().copied().collect();
105 for id in ids {
106 if let Some(entry) = self.subscriptions.get_mut(&id) {
107 if entry.triggered {
108 entry.triggered = false;
109 }
110 }
111 }
112 }
113
114 pub fn gc_task(&mut self, task_id: TaskId) {
120 if let Some(ids) = self.task_subscriptions.remove(&task_id) {
121 for id in ids {
122 self.subscriptions.remove(&id);
123 }
124 }
125 }
126
127 pub fn get(&self, subscription_id: &SubscriptionId) -> Option<&SubscriptionEntry> {
129 self.subscriptions.get(subscription_id)
130 }
131}
132
133#[cfg(test)]
134mod tests {
135 use actionqueue_core::ids::TaskId;
136 use actionqueue_core::subscription::{EventFilter, SubscriptionId};
137
138 use super::SubscriptionRegistry;
139
140 #[test]
141 fn register_and_is_triggered_lifecycle() {
142 let mut registry = SubscriptionRegistry::new();
143 let sub_id = SubscriptionId::new();
144 let task_id = TaskId::new();
145 let filter = EventFilter::TaskCompleted { task_id };
146
147 registry.register(sub_id, task_id, filter);
148 assert!(!registry.is_triggered(task_id));
149
150 registry.trigger(sub_id);
151 assert!(registry.is_triggered(task_id));
152
153 registry.clear_triggered(task_id);
154 assert!(!registry.is_triggered(task_id));
155 }
156
157 #[test]
158 fn cancel_removes_from_active() {
159 let mut registry = SubscriptionRegistry::new();
160 let sub_id = SubscriptionId::new();
161 let task_id = TaskId::new();
162 let filter = EventFilter::TaskCompleted { task_id };
163 registry.register(sub_id, task_id, filter);
164
165 registry.cancel(sub_id);
166 assert_eq!(registry.active_subscriptions().count(), 0);
167 }
168
169 #[test]
170 fn trigger_nonexistent_subscription_is_noop() {
171 let mut registry = SubscriptionRegistry::new();
172 let nonexistent = SubscriptionId::new();
173 registry.trigger(nonexistent); assert!(!registry.is_triggered(TaskId::new()));
175 }
176
177 #[test]
178 fn cancel_nonexistent_subscription_is_noop() {
179 let mut registry = SubscriptionRegistry::new();
180 let nonexistent = SubscriptionId::new();
181 registry.cancel(nonexistent); assert_eq!(registry.active_subscriptions().count(), 0);
183 }
184
185 #[test]
186 fn get_returns_entry() {
187 let mut registry = SubscriptionRegistry::new();
188 let sub_id = SubscriptionId::new();
189 let task_id = TaskId::new();
190 let filter = EventFilter::TaskCompleted { task_id };
191 registry.register(sub_id, task_id, filter.clone());
192
193 let entry = registry.get(&sub_id).expect("subscription should exist");
194 assert_eq!(entry.task_id, task_id);
195 assert_eq!(entry.filter, filter);
196 assert!(!entry.triggered);
197 assert!(!entry.canceled);
198 }
199
200 #[test]
201 fn get_nonexistent_returns_none() {
202 let registry = SubscriptionRegistry::new();
203 assert!(registry.get(&SubscriptionId::new()).is_none());
204 }
205
206 #[test]
207 fn duplicate_registration_overwrites() {
208 let mut registry = SubscriptionRegistry::new();
209 let sub_id = SubscriptionId::new();
210 let task1 = TaskId::new();
211 let task2 = TaskId::new();
212 registry.register(sub_id, task1, EventFilter::TaskCompleted { task_id: task1 });
213 registry.register(sub_id, task2, EventFilter::TaskCompleted { task_id: task2 });
214
215 let entry = registry.get(&sub_id).unwrap();
216 assert_eq!(entry.task_id, task2);
217 assert_eq!(registry.active_subscriptions().count(), 1);
218 }
219
220 #[test]
221 fn is_triggered_uses_secondary_index() {
222 let mut registry = SubscriptionRegistry::new();
223 let task = TaskId::new();
224 let sub1 = SubscriptionId::new();
225 let sub2 = SubscriptionId::new();
226 registry.register(sub1, task, EventFilter::TaskCompleted { task_id: task });
227 registry.register(sub2, task, EventFilter::TaskCompleted { task_id: task });
228
229 assert!(!registry.is_triggered(task));
230 registry.trigger(sub1);
231 assert!(registry.is_triggered(task));
232 registry.clear_triggered(task);
233 assert!(!registry.is_triggered(task));
234 }
235
236 #[test]
237 fn gc_task_removes_subscriptions_and_index() {
238 let mut registry = SubscriptionRegistry::new();
239 let task = TaskId::new();
240 let sub_id = SubscriptionId::new();
241 registry.register(sub_id, task, EventFilter::TaskCompleted { task_id: task });
242
243 registry.gc_task(task);
244
245 assert!(registry.get(&sub_id).is_none());
246 assert!(!registry.is_triggered(task));
247 assert_eq!(registry.active_subscriptions().count(), 0);
248 }
249
250 #[test]
251 fn gc_task_does_not_affect_other_tasks() {
252 let mut registry = SubscriptionRegistry::new();
253 let task1 = TaskId::new();
254 let task2 = TaskId::new();
255 let sub1 = SubscriptionId::new();
256 let sub2 = SubscriptionId::new();
257 registry.register(sub1, task1, EventFilter::TaskCompleted { task_id: task1 });
258 registry.register(sub2, task2, EventFilter::TaskCompleted { task_id: task2 });
259
260 registry.gc_task(task1);
261
262 assert!(registry.get(&sub1).is_none());
263 assert!(registry.get(&sub2).is_some());
264 assert_eq!(registry.active_subscriptions().count(), 1);
265 }
266
267 #[test]
268 fn gc_task_is_idempotent() {
269 let mut registry = SubscriptionRegistry::new();
270 let task = TaskId::new();
271 let sub_id = SubscriptionId::new();
272 registry.register(sub_id, task, EventFilter::TaskCompleted { task_id: task });
273 registry.gc_task(task);
274 registry.gc_task(task); }
276
277 #[test]
278 fn active_subscriptions_excludes_triggered_and_canceled() {
279 let mut registry = SubscriptionRegistry::new();
280 let task = TaskId::new();
281
282 let active = SubscriptionId::new();
283 let triggered = SubscriptionId::new();
284 let canceled = SubscriptionId::new();
285
286 registry.register(active, task, EventFilter::TaskCompleted { task_id: task });
287 registry.register(triggered, task, EventFilter::TaskCompleted { task_id: task });
288 registry.register(canceled, task, EventFilter::TaskCompleted { task_id: task });
289
290 registry.trigger(triggered);
291 registry.cancel(canceled);
292
293 let active_subs: Vec<_> = registry.active_subscriptions().collect();
294 assert_eq!(active_subs.len(), 1);
295 assert_eq!(active_subs[0].0, active);
296 }
297}