actionqueue_core/event.rs
1//! System events for subscription matching and dispatch coordination.
2//!
3//! [`ActionQueueEvent`](crate::event::ActionQueueEvent) is the canonical event type
4//! evaluated by the dispatch loop each tick. Subscriptions declare
5//! [`EventFilter`](crate::subscription::EventFilter)
6//! predicates that are matched against fired events to trigger task promotion.
7//!
8//! This module lives in `actionqueue-core` (rather than `actionqueue-budget`) so that
9//! `actionqueue-actor` and other extension crates can emit events without creating
10//! a circular dependency on `actionqueue-budget`.
11
12use crate::budget::BudgetDimension;
13use crate::ids::{ActorId, TaskId, TenantId};
14use crate::run::state::RunState;
15use crate::subscription::{EventFilter, SubscriptionId};
16
17/// An event that occurred within the dispatch loop, evaluated against subscriptions.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum ActionQueueEvent {
20 /// A task reached terminal success (all runs completed).
21 TaskReachedTerminalSuccess {
22 /// The task that completed.
23 task_id: TaskId,
24 },
25 /// A run of the given task transitioned to a new state.
26 RunChangedState {
27 /// The owning task.
28 task_id: TaskId,
29 /// The new run state.
30 new_state: RunState,
31 },
32 /// A budget dimension crossed a percentage threshold for the given task.
33 BudgetThresholdCrossed {
34 /// The task whose budget crossed a threshold.
35 task_id: TaskId,
36 /// The budget dimension.
37 dimension: BudgetDimension,
38 /// The percentage (0-100) at which the threshold was crossed.
39 pct: u8,
40 },
41 /// An application-defined custom event.
42 CustomEvent {
43 /// The application-defined event key.
44 key: String,
45 },
46 /// A remote actor registered with the hub.
47 ActorRegistered {
48 /// The actor that registered.
49 actor_id: ActorId,
50 },
51 /// A remote actor deregistered (explicit or heartbeat timeout).
52 ActorDeregistered {
53 /// The actor that deregistered.
54 actor_id: ActorId,
55 },
56 /// A remote actor's heartbeat timed out.
57 ActorHeartbeatTimeout {
58 /// The actor whose heartbeat timed out.
59 actor_id: ActorId,
60 },
61 /// A ledger entry was appended in the platform layer.
62 LedgerEntryAppended {
63 /// The tenant whose ledger received the entry.
64 tenant_id: TenantId,
65 /// The ledger key (e.g. `"audit"`, `"decision"`).
66 ledger_key: String,
67 },
68}
69
70/// Checks a `ActionQueueEvent` against all active subscriptions in `registry`.
71///
72/// Returns the IDs of subscriptions whose filters match the event. Used by
73/// the dispatch loop to trigger task promotions when events fire.
74pub fn check_event<R>(event: &ActionQueueEvent, registry: &R) -> Vec<SubscriptionId>
75where
76 R: SubscriptionSource,
77{
78 registry
79 .active_subscriptions_iter()
80 .filter_map(|(id, filter)| if filter_matches(filter, event) { Some(id) } else { None })
81 .collect()
82}
83
84/// Trait for types that expose active subscription filters.
85///
86/// Implemented by `SubscriptionRegistry` in `actionqueue-budget`. This indirection
87/// keeps `actionqueue-core` free of a dependency on `actionqueue-budget`.
88pub trait SubscriptionSource {
89 /// Returns an iterator over (subscription_id, filter) pairs for all
90 /// active (non-canceled, non-triggered) subscriptions.
91 fn active_subscriptions_iter(
92 &self,
93 ) -> impl Iterator<Item = (SubscriptionId, &EventFilter)> + '_;
94}
95
96fn filter_matches(filter: &EventFilter, event: &ActionQueueEvent) -> bool {
97 match (filter, event) {
98 (
99 EventFilter::TaskCompleted { task_id: filter_task },
100 ActionQueueEvent::TaskReachedTerminalSuccess { task_id: event_task },
101 ) => filter_task == event_task,
102
103 (
104 EventFilter::RunStateChanged { task_id: filter_task, state: filter_state },
105 ActionQueueEvent::RunChangedState { task_id: event_task, new_state },
106 ) => filter_task == event_task && filter_state == new_state,
107
108 (
109 EventFilter::BudgetThreshold {
110 task_id: filter_task,
111 dimension: filter_dim,
112 threshold_pct,
113 },
114 ActionQueueEvent::BudgetThresholdCrossed { task_id: event_task, dimension, pct },
115 ) => filter_task == event_task && filter_dim == dimension && pct >= threshold_pct,
116
117 (
118 EventFilter::Custom { key: filter_key },
119 ActionQueueEvent::CustomEvent { key: event_key },
120 ) => filter_key == event_key,
121
122 _ => false,
123 }
124}