Skip to main content

actionqueue_core/
event.rs

//! System events for subscription matching and dispatch coordination.
//!
//! [`ActionQueueEvent`](crate::event::ActionQueueEvent) is the canonical event type
//! evaluated by the dispatch loop each tick. Subscriptions declare
//! [`EventFilter`](crate::subscription::EventFilter)
//! predicates that are matched against fired events to trigger task promotion.
//!
//! This module lives in `actionqueue-core` (rather than `actionqueue-budget`) so that
//! `actionqueue-actor` and other extension crates can emit events without creating
//! a circular dependency on `actionqueue-budget`.
11
12use crate::budget::BudgetDimension;
13use crate::ids::{ActorId, TaskId, TenantId};
14use crate::run::state::RunState;
15use crate::subscription::{EventFilter, SubscriptionId};
16
17/// An event that occurred within the dispatch loop, evaluated against subscriptions.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum ActionQueueEvent {
20    /// A task reached terminal success (all runs completed).
21    TaskReachedTerminalSuccess {
22        /// The task that completed.
23        task_id: TaskId,
24    },
25    /// A run of the given task transitioned to a new state.
26    RunChangedState {
27        /// The owning task.
28        task_id: TaskId,
29        /// The new run state.
30        new_state: RunState,
31    },
32    /// A budget dimension crossed a percentage threshold for the given task.
33    BudgetThresholdCrossed {
34        /// The task whose budget crossed a threshold.
35        task_id: TaskId,
36        /// The budget dimension.
37        dimension: BudgetDimension,
38        /// The percentage (0-100) at which the threshold was crossed.
39        pct: u8,
40    },
41    /// An application-defined custom event.
42    CustomEvent {
43        /// The application-defined event key.
44        key: String,
45    },
46    /// A remote actor registered with the hub.
47    ActorRegistered {
48        /// The actor that registered.
49        actor_id: ActorId,
50    },
51    /// A remote actor deregistered (explicit or heartbeat timeout).
52    ActorDeregistered {
53        /// The actor that deregistered.
54        actor_id: ActorId,
55    },
56    /// A remote actor's heartbeat timed out.
57    ActorHeartbeatTimeout {
58        /// The actor whose heartbeat timed out.
59        actor_id: ActorId,
60    },
61    /// A ledger entry was appended in the platform layer.
62    LedgerEntryAppended {
63        /// The tenant whose ledger received the entry.
64        tenant_id: TenantId,
65        /// The ledger key (e.g. `"audit"`, `"decision"`).
66        ledger_key: String,
67    },
68}
69
70/// Checks a `ActionQueueEvent` against all active subscriptions in `registry`.
71///
72/// Returns the IDs of subscriptions whose filters match the event. Used by
73/// the dispatch loop to trigger task promotions when events fire.
74pub fn check_event<R>(event: &ActionQueueEvent, registry: &R) -> Vec<SubscriptionId>
75where
76    R: SubscriptionSource,
77{
78    registry
79        .active_subscriptions_iter()
80        .filter_map(|(id, filter)| if filter_matches(filter, event) { Some(id) } else { None })
81        .collect()
82}
83
84/// Trait for types that expose active subscription filters.
85///
86/// Implemented by `SubscriptionRegistry` in `actionqueue-budget`. This indirection
87/// keeps `actionqueue-core` free of a dependency on `actionqueue-budget`.
88pub trait SubscriptionSource {
89    /// Returns an iterator over (subscription_id, filter) pairs for all
90    /// active (non-canceled, non-triggered) subscriptions.
91    fn active_subscriptions_iter(
92        &self,
93    ) -> impl Iterator<Item = (SubscriptionId, &EventFilter)> + '_;
94}
95
96fn filter_matches(filter: &EventFilter, event: &ActionQueueEvent) -> bool {
97    match (filter, event) {
98        (
99            EventFilter::TaskCompleted { task_id: filter_task },
100            ActionQueueEvent::TaskReachedTerminalSuccess { task_id: event_task },
101        ) => filter_task == event_task,
102
103        (
104            EventFilter::RunStateChanged { task_id: filter_task, state: filter_state },
105            ActionQueueEvent::RunChangedState { task_id: event_task, new_state },
106        ) => filter_task == event_task && filter_state == new_state,
107
108        (
109            EventFilter::BudgetThreshold {
110                task_id: filter_task,
111                dimension: filter_dim,
112                threshold_pct,
113            },
114            ActionQueueEvent::BudgetThresholdCrossed { task_id: event_task, dimension, pct },
115        ) => filter_task == event_task && filter_dim == dimension && pct >= threshold_pct,
116
117        (
118            EventFilter::Custom { key: filter_key },
119            ActionQueueEvent::CustomEvent { key: event_key },
120        ) => filter_key == event_key,
121
122        _ => false,
123    }
124}