use crate::budget::BudgetDimension;
use crate::ids::{ActorId, TaskId, TenantId};
use crate::run::state::RunState;
use crate::subscription::{EventFilter, SubscriptionId};
/// Events that can be tested against subscription filters with [`check_event`].
///
/// Each variant is a plain data record; the code that produces these events
/// lives outside this file. As implemented in this file's `filter_matches`,
/// only `TaskReachedTerminalSuccess`, `RunChangedState`,
/// `BudgetThresholdCrossed` and `CustomEvent` can satisfy an [`EventFilter`];
/// the remaining variants currently match no filter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActionQueueEvent {
    /// Names the task (`task_id`) that reached terminal success.
    TaskReachedTerminalSuccess { task_id: TaskId },

    /// Reports that the run associated with `task_id` is now in `new_state`.
    RunChangedState { task_id: TaskId, new_state: RunState },

    /// Reports a budget reading for `task_id` along one `dimension`.
    ///
    /// `pct` is compared against a filter's threshold with `>=` during
    /// matching. NOTE(review): presumably a percentage in `0..=100`, but the
    /// type only guarantees `0..=255` — confirm with the producer.
    BudgetThresholdCrossed {
        task_id: TaskId,
        dimension: BudgetDimension,
        pct: u8,
    },

    /// An application-defined event identified solely by its string `key`.
    CustomEvent { key: String },

    /// Identifies the actor (`actor_id`) that was registered.
    ActorRegistered { actor_id: ActorId },

    /// Identifies the actor (`actor_id`) that was deregistered.
    ActorDeregistered { actor_id: ActorId },

    /// Identifies the actor (`actor_id`) whose heartbeat timed out.
    ActorHeartbeatTimeout { actor_id: ActorId },

    /// Identifies the tenant and ledger key for which an entry was appended.
    LedgerEntryAppended {
        tenant_id: TenantId,
        ledger_key: String,
    },
}
/// Collects the ids of every subscription whose filter is satisfied by `event`.
///
/// The candidates are exactly the `(id, filter)` pairs produced by
/// `registry.active_subscriptions_iter()`. Ids are returned in the order the
/// registry yields them; the result is empty when nothing matches.
pub fn check_event<R>(event: &ActionQueueEvent, registry: &R) -> Vec<SubscriptionId>
where
    R: SubscriptionSource,
{
    let candidates = registry.active_subscriptions_iter();

    candidates
        // Keep only the subscriptions whose filter accepts this event...
        .filter(|(_, filter)| filter_matches(filter, event))
        // ...and drop the filter reference, leaving just the id.
        .map(|(id, _)| id)
        .collect()
}
/// Supplier of the subscriptions that [`check_event`] evaluates an event against.
///
/// Abstracting the registry behind this trait lets `check_event` work with any
/// type that can enumerate `(SubscriptionId, &EventFilter)` pairs.
pub trait SubscriptionSource {
/// Returns an iterator over `(id, filter)` pairs.
///
/// Each filter is borrowed from `self`, and the iterator itself may borrow
/// from `self` as well (the `'_` bound), so it cannot outlive the source.
///
/// NOTE(review): the method name implies only *active* subscriptions are
/// yielded; the trait cannot enforce that, so implementors must uphold it.
fn active_subscriptions_iter(
&self,
) -> impl Iterator<Item = (SubscriptionId, &EventFilter)> + '_;
}
/// Decides whether a single subscription `filter` is satisfied by `event`.
///
/// A filter can only match the one event kind it corresponds to:
///
/// * `TaskReachedTerminalSuccess` matches `EventFilter::TaskCompleted` for the
///   same task.
/// * `RunChangedState` matches `EventFilter::RunStateChanged` for the same task
///   and an equal state.
/// * `BudgetThresholdCrossed` matches `EventFilter::BudgetThreshold` for the
///   same task and dimension when the event's `pct` is at or above the
///   filter's `threshold_pct`.
/// * `CustomEvent` matches `EventFilter::Custom` with an equal key.
///
/// Every other combination yields `false`.
fn filter_matches(filter: &EventFilter, event: &ActionQueueEvent) -> bool {
    match event {
        ActionQueueEvent::TaskReachedTerminalSuccess { task_id: event_task } => matches!(
            filter,
            EventFilter::TaskCompleted { task_id: wanted_task } if wanted_task == event_task
        ),
        ActionQueueEvent::RunChangedState { task_id: event_task, new_state } => matches!(
            filter,
            EventFilter::RunStateChanged { task_id: wanted_task, state: wanted_state }
                if wanted_task == event_task && wanted_state == new_state
        ),
        ActionQueueEvent::BudgetThresholdCrossed { task_id: event_task, dimension, pct } => {
            matches!(
                filter,
                EventFilter::BudgetThreshold {
                    task_id: wanted_task,
                    dimension: wanted_dim,
                    threshold_pct,
                } if wanted_task == event_task && wanted_dim == dimension && pct >= threshold_pct
            )
        }
        ActionQueueEvent::CustomEvent { key: event_key } => matches!(
            filter,
            EventFilter::Custom { key: wanted_key } if wanted_key == event_key
        ),
        // No filter targets these events. They are listed explicitly (rather
        // than via `_`) so that adding a new event variant forces a decision
        // here instead of silently never matching.
        ActionQueueEvent::ActorRegistered { .. }
        | ActionQueueEvent::ActorDeregistered { .. }
        | ActionQueueEvent::ActorHeartbeatTimeout { .. }
        | ActionQueueEvent::LedgerEntryAppended { .. } => false,
    }
}